In [1]:
import matplotlib.pyplot as plt
import numpy as np
import os, random
import pandas as pd
from sklearn import datasets  
import cv2
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.transforms import ToTensor
from torchvision.datasets import ImageFolder, DatasetFolder
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision.io import read_image
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


In [2]:
data_dir = 'Lab3_galaxies/training_images/training_images/training_images_resized'
#file path
annotations_file = 'Lab3_galaxies/training_classifications.csv'

len(os.listdir(data_dir))

61578

In [3]:
def image_label_generator(data_dir, batch_size):
    # List all image files in the directory
    image_files = [f for f in os.listdir(data_dir) if f.endswith('.jpg')]
    num_images = len(image_files)
    
    # Calculate number of batches
    num_batches = num_images // batch_size
    
    while True:
        # Shuffle the list of image files for each epoch
        np.random.shuffle(image_files)
        
        # Split data into training and validation sets (80% training, 20% validation)
        split_index = int(0.8 * num_images)
        train_files = image_files[:split_index]
        valid_files = image_files[split_index:]

        for i in range(num_batches):
            # Initialize lists to store batch images and labels
            batch_images = []
            batch_labels = []
            
            # Read and process images and labels for the current batch
            for j in range(batch_size):
                idx = i * batch_size + j
                image_path = os.path.join(data_dir, image_files[idx])
                image = cv2.imread(image_path)  # Read image
                label = pd.read_csv(annotations_file).iloc[idx,0]
                batch_images.append(image)
                batch_labels.append(label)
                
            # Convert lists to numpy arrays for efficiency
            batch_images = np.array(batch_images)
            batch_labels = np.array(batch_labels)
            
            yield batch_images, batch_labels


data_dir = 'Lab3_galaxies/training_images/training_images/training_images_resized'
batch_size = 100

generator = image_label_generator(data_dir, batch_size)

# Fetch one batch from the training set
train_batch_images, train_batch_labels = next(generator)

# Fetch one batch from the validation set
valid_batch_images, valid_batch_labels = next(generator)

In [126]:
import torch.nn as nn
import torch.nn.functional as F

# Define the neural network architecture
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3)
        self.conv2 = nn.Conv2d(32, 64, 3)
        self.conv3 = nn.Conv2d(64, 128, 3)
        self.conv4 = nn.Conv2d(128, 256, 3)
        
        self.dropout1 = nn.Dropout2d(0.2)
        self.dropout2 = nn.Dropout2d(0.15)
        self.dropout3 = nn.Dropout2d(0.12)
        self.dropout4 = nn.Dropout2d(0.1)
        
        self.fc1 = nn.Linear(32*32, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, 37)  # Adjusted to match the desired output size


    def forward(self, x):
        x = self.conv1(x)
        x = F.max_pool2d(x,2)
        
        x = self.conv2(x)
        x = F.max_pool2d(x,2)        

        x = self.conv3(x)
        x = F.max_pool2d(x,2)
        
        x = self.conv4(x)
        x = F.relu(x)
        x = F.max_pool2d(x,2)
        
        x = torch.flatten(x, 1)
        
        x = self.fc1(x)
        x = F.relu(x)

        x = self.dropout1(x)
        
        x = F.relu(x)

        x = self.fc2(x)
        x = F.relu(x)

        x = self.dropout2(x)
        
        x = self.fc3(x)
        x = F.relu(x)

        x = self.dropout3(x)
        
        x = self.fc4(x)
        x = self.dropout4(x)
        
        output = torch.sigmoid(x)
        return output
    
"""    def _calculate_flatten_size(self):
        # Dummy input tensor to calculate the size after convolutional layers
        x = torch.randn(1, 3, 224, 224)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        #x = self.conv4(x)
        # Flatten the output tensor
        return x.view(1, -1).size(1)"""

# Create an instance of the network
net = Net()
    
class CustomImageDataset(Dataset):
    def __init__(self, img_labels, img_dir, transform=None, target_transform=None):
        self.img_labels = img_labels
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform
        

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_name = str(self.img_labels.iloc[idx, 0])  # Assuming img_labels is a DataFrame        
        img_path = os.path.join(self.img_dir, img_name + '.jpg')       
        image = read_image(img_path)
        label = self.img_labels.iloc[idx, 0]        
        image = Image.open(img_path).convert("RGB")  # Open image using PIL and convert to RGB
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

In [127]:
# python image library of range [0, 1] 
# transform them to tensors of normalized range[-1, 1]

transform = transforms.Compose([
    transforms.Resize((77, 77)),  # Resize image to 77x77
    transforms.ToTensor(),           # Convert PIL Image to tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize image
])

# set batch_size
batch_size = 100

# set number of workers
num_workers = 2


# Convert 'GalaxyID' column to strings
label = pd.read_csv(annotations_file)
trainset = CustomImageDataset(label,
    img_dir='Lab3_galaxies/training_images/training_images/training_images_resized/',
    transform=transform)

# load test data
testset = CustomImageDataset(label,
    img_dir="Lab3_galaxies/test_images/test_images",
    transform=transform)


# put 10 classes into a set
classes = ('GalaxyID', 'Class1.1', 'Class1.2', 'Class1.3', 'Class2.1', 'Class2.2',
       'Class3.1', 'Class3.2', 'Class4.1', 'Class4.2', 'Class5.1', 'Class5.2',
       'Class5.3', 'Class5.4', 'Class6.1', 'Class6.2', 'Class7.1', 'Class7.2',
       'Class7.3', 'Class8.1', 'Class8.2', 'Class8.3', 'Class8.4', 'Class8.5',
       'Class8.6', 'Class8.7', 'Class9.1', 'Class9.2', 'Class9.3', 'Class10.1',
       'Class10.2', 'Class10.3', 'Class11.1', 'Class11.2', 'Class11.3',
       'Class11.4', 'Class11.5', 'Class11.6')

In [128]:
criterion = nn.MSELoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

train_dataloader = DataLoader(trainset, batch_size=100, shuffle=True)
test_dataloader = DataLoader(testset, batch_size=100, shuffle=True)

In [130]:
device = ("cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu")
model = Net().to(device)
print(model)

X = torch.rand((3,3, 64, 64), device=device)
logits = model(X)
pred_probab = nn.Softmax(dim=1)(logits)
y_pred = pred_probab.argmax(1)
print(f"Predicted class: {y_pred}")

Net(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
  (conv4): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1))
  (dropout1): Dropout2d(p=0.2, inplace=False)
  (dropout2): Dropout2d(p=0.15, inplace=False)
  (dropout3): Dropout2d(p=0.12, inplace=False)
  (dropout4): Dropout2d(p=0.1, inplace=False)
  (fc1): Linear(in_features=1024, out_features=1024, bias=True)
  (fc2): Linear(in_features=1024, out_features=512, bias=True)
  (fc3): Linear(in_features=512, out_features=256, bias=True)
  (fc4): Linear(in_features=256, out_features=37, bias=True)
)
Predicted class: tensor([24, 24, 24])




In [131]:
def train_loop(dataloader, model, loss_fn, optimizer):
    for batch_idx, (inputs, labels) in enumerate(dataloader):
        optimizer.zero_grad()
        
        # Reshape inputs
        inputs = inputs.reshape(-1, 3, 77, 77)
        
        outputs = model(inputs)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()
        if batch_idx % 10 == 0:
            print(f"Batch {batch_idx}, Loss: {loss.item()}")


def test_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [132]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

epochs = 1
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, criterion, optimizer)
    test_loop(test_dataloader,model, criterion)
print("Done!")

Epoch 1
-------------------------------


  return F.mse_loss(input, target, reduction=self.reduction)


RuntimeError: The size of tensor a (37) must match the size of tensor b (100) at non-singleton dimension 1