In [None]:
The multi-label models were not performing well, so this notebook tries multiclass classification for the `setting` label only.

In [None]:
# rawpy (used below to read the .dng photos): https://pypi.org/project/rawpy/

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
from pandas import DataFrame
from torch import nn
import torchvision.transforms.v2 as transforms
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.utils.class_weight import compute_class_weight
import rawpy
import os

In [None]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning) #Numpy and Pandas FutureWarnings - ignore them for now, we can fix them later

In [None]:
# Load the label table. Later cells rely on it having an 'img_name' column first,
# plus 'location', 'time_of_day', 'skyCondition' and 'setting' label columns.
df = pd.read_csv('categories.csv')
df.head()

In [None]:
# Only 'setting' is used as the target (the input is the image itself),
# so the remaining label columns are dropped.
UNUSED_LABEL_COLUMNS = ['location', 'time_of_day', 'skyCondition']
df = df.drop(columns=UNUSED_LABEL_COLUMNS)

In [None]:
# One-hot encode 'setting' into integer 0/1 columns.
# The order of the resulting columns (everything after 'img_name') is the class
# order used by the dataset labels, the class weights and the model's output layer.
df = pd.get_dummies(df, columns=['setting'], dtype='int')
df.columns

In [None]:
# Preview the frame after one-hot encoding.
df.head()

In [None]:
# Pick the best available backend instead of hard-coding Apple's MPS, so the
# notebook also runs on CUDA machines (e.g. Colab) and on plain CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
#Rewrite the dataset class
class MainDataset(Dataset):
    """Map-style dataset that pairs each row of a label table with a .dng photo.

    Photos are looked up under ``image_dir`` in sub-folders whose names start
    with ``HQa`` and encode an inclusive image-number range as
    ``HQa<start>to<end>``. Each sample is ``(image, label)`` where ``label`` is
    a float32 tensor built from every column of the row except the first.

    Args:
        data: DataFrame whose first column is ``img_name`` and whose remaining
            columns are the label values.
        image_dir: Directory containing the ``HQa<start>to<end>`` folders.
        transform: Optional callable applied to the decoded image.

    Raises:
        ValueError: If ``image_dir`` does not exist.
    """

    def __init__(self, data, image_dir, transform=None):
        self.data = data
        self.image_dir = image_dir
        self.transform = transform
        self.bounds = []  # (folder name, first image number, last image number)

        if not os.path.exists(self.image_dir):
            raise ValueError(f"Image directory {self.image_dir} does not exist.")

        for entry in os.listdir(self.image_dir):
            if entry.startswith('HQa'):
                first = int(entry.split('HQa')[1].split('to')[0])
                last = int(entry.split('to')[1])
                self.bounds.append((entry, first, last))

    def __len__(self):
        """Number of rows in the label table."""
        return len(self.data)

    def dng_to_numpy(self, path):
        """Decode the .dng file at ``path`` with rawpy and return the processed image."""
        # Camera white balance and no auto-brightening, to stay close to the original image data.
        with rawpy.imread(path) as raw:
            return raw.postprocess(use_camera_wb=True, no_auto_bright=True)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        Raises:
            IndexError: If no ``HQa`` folder range contains the image number.
            ValueError: If the expected .dng file is missing on disk.
        """
        record = self.data.iloc[idx]
        name = record['img_name']
        # The image number sits between the first 'a' and the following '-'.
        number = int(name.split('a')[1].split('-')[0])

        # First folder whose inclusive range contains the image number.
        folder = next(
            (entry for entry, first, last in self.bounds if first <= number <= last),
            None,
        )
        if folder is None:
            raise IndexError(f"Image with index {idx} not found in the dataset.")

        path = f'{self.image_dir}/{folder}/photos/{name}.dng'
        if not os.path.exists(path):
            raise ValueError(f"Image file {path} does not exist.")

        image = self.dng_to_numpy(path)
        if self.transform:
            image = self.transform(image)

        # Every column after img_name is part of the label vector.
        label_values = self.data.iloc[idx, 1:].values.tolist()
        label = torch.tensor(label_values, dtype=torch.float32)

        return image, label

In [None]:
# Channel statistics expected by pre-trained models.
NORMALIZE_MEAN = [0.485, 0.456, 0.406]
NORMALIZE_STD = [0.229, 0.224, 0.225]

# Decoded photo -> PIL image -> 512x512 -> float tensor -> per-channel normalisation.
# NOTE(review): torchvision's v2 docs recommend ToImage() + ToDtype(float32, scale=True)
# over ToTensor(); kept as-is here to leave the pipeline output unchanged.
preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((512, 512)),
    transforms.ToTensor(),
    transforms.Normalize(mean=NORMALIZE_MEAN, std=NORMALIZE_STD),
])

dataset = MainDataset(data=df, image_dir='raw_photos', transform=preprocess)

In [None]:
# 60 / 20 / 20 train / validation / test split.
# The split is seeded so the same images land in the same subset on every run;
# without it the test set changes between runs and results are not comparable.
SPLIT_SEED = 42

val_size = int(0.2 * len(dataset))  # 20% for validation
test_size = int(0.2 * len(dataset))  # 20% for testing
train_size = len(dataset) - val_size - test_size  # Remaining for training
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

TODO: Add data augmentation if the model turns out to overfit.

In [None]:
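# For now, let's use the same transforms for all datasets; we can change this if we are overfitting (e.g. by using data augmentation).
# NOTE: train_dataset.dataset is the single MainDataset shared by the train/val/test subsets,
# so assigning to train_dataset.dataset.transform would change the transform for all three.
#train_dataset.dataset.transform =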

In [None]:
#Data loaders
BATCH_SIZE = 16

# Only the training data is shuffled; validation and test keep a fixed order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Pull one batch from the training loader to sanity-check the tensor shapes.
train_images, train_labels = next(iter(train_loader))
print(train_images.shape, train_labels.shape)

In [None]:
from torchvision.utils import make_grid

def imshow(img, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Display a normalised (C, H, W) image tensor with matplotlib.

    The tensor is un-normalised with the per-channel ``mean``/``std`` before
    plotting. The defaults match the ``Normalize`` transform used when the
    dataset is built, so colours are shown as they were before normalisation.

    Args:
        img: Image tensor of shape (C, H, W), e.g. the output of ``make_grid``.
        mean: Per-channel mean that was subtracted during normalisation.
        std: Per-channel standard deviation that was divided by.
    """
    img = img.detach().cpu()
    mean_t = torch.tensor(mean, dtype=img.dtype).view(-1, 1, 1)
    std_t = torch.tensor(std, dtype=img.dtype).view(-1, 1, 1)
    # Invert (x - mean) / std, then clamp to the [0, 1] range imshow expects for floats.
    img = (img * std_t + mean_t).clamp(0.0, 1.0)
    plt.imshow(img.permute(1, 2, 0).numpy())  # (C, H, W) -> (H, W, C)
    plt.show()

print(train_images.dtype)  # dtype (not shape) of the images tensor
print(train_labels.dtype)  # dtype (not shape) of the labels tensor

# Show the whole batch as a single image grid.
imshow(make_grid(train_images))

Look at flosst code for data augmentation and transforms

In [None]:
#n_samples / (n_classes * np.bincount(y))

#Build my own class weight function

def class_weights(data: DataFrame):
    """Compute one loss weight per one-hot label column.

    For every column after the first (``img_name``) the weight is
    ``n_samples / (n_distinct_values * n_positive)``, i.e. the "balanced"
    weight of the positive class of that column. Rarer classes therefore get
    larger weights.

    Positives are counted explicitly (``column == 1``) rather than looked up
    in ``value_counts()`` by key, so integer and boolean one-hot columns give
    the same result.

    Args:
        data: DataFrame whose first column is the image name and whose
            remaining columns are 0/1 (or False/True) indicators.

    Returns:
        1-D float32 tensor with one weight per label column, in column order.

    Raises:
        ValueError: If a label column contains no positive sample.
    """
    weights = []
    for col in data.columns[1:]:  # Skip the first column (img_name)
        column = data[col]
        total_samples = len(column)
        n_distinct = column.nunique(dropna=False)
        n_positive = int((column == 1).sum())
        if n_positive == 0:
            raise ValueError(f"Column {col!r} has no positive samples; cannot compute a class weight.")
        weights.append(total_samples / (n_distinct * n_positive))

    return torch.tensor(weights, dtype=torch.float32)

In [None]:
# One weight per one-hot label column of df, in column order.
weights = class_weights(df)
print(weights)

In [None]:
# Applying the multiclass classification loss function.
# The dataset yields one-hot float label vectors, so the targets passed to this
# loss are per-class vectors rather than integer class indices.
criterion = nn.CrossEntropyLoss(weight=weights).to(device) #Contains softmax and negative log likelihood loss

Look at the notes from the paper about the suggested depth and width of the model: https://arxiv.org/pdf/1512.03385

In [None]:
class Model(nn.Module):
    """Eight-layer CNN classifier that ends in global average pooling.

    Every convolution is followed by BatchNorm and ReLU:

        conv1-conv3 (7x7, 5x5, 5x5, size-preserving padding) -> 2x2 max-pool
        conv4-conv6 (3x3, no padding)                        -> 2x2 max-pool
        conv7-conv8 (3x3, no padding)                        -> global avg-pool -> linear

    Dropout (p=0.2) is applied after conv2, conv5 and conv7. The forward pass
    returns raw logits (no softmax).

    For a 512x512 input the spatial size evolves as
    512 -> 256 (pool) -> 254 -> 252 -> 250 -> 125 (pool) -> 123 -> 121 -> 1 (global pool).

    Args:
        num_classes: Size of the output layer. When omitted, falls back to the
            notebook-level ``df`` and uses ``len(df.columns) - 1`` (every
            column except ``img_name``).
    """

    def __init__(self, num_classes=None):
        super().__init__()
        if num_classes is None:
            # Legacy behaviour: infer the class count from the notebook's label frame.
            num_classes = len(df.columns) - 1

        self.dropout = nn.Dropout(0.2)
        self.max_pool = nn.MaxPool2d(2, 2)
        self.avg_pool = nn.AvgPool2d(2, 2)  # not used in forward(); kept so the attribute still exists

        # Stage 1: padded convolutions keep the spatial size unchanged.
        self.conv1 = nn.Conv2d(3, 32, kernel_size=7, stride=1, padding=3)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=5, stride=1, padding=2)
        self.bn3 = nn.BatchNorm2d(64)

        # Stage 2: unpadded 3x3 convolutions, each trims 2 pixels per dimension.
        self.conv4 = nn.Conv2d(64, 128, kernel_size=3, stride=1)
        self.bn4 = nn.BatchNorm2d(128)
        self.conv5 = nn.Conv2d(128, 128, kernel_size=3, stride=1)
        self.bn5 = nn.BatchNorm2d(128)
        self.conv6 = nn.Conv2d(128, 128, kernel_size=3, stride=1)
        self.bn6 = nn.BatchNorm2d(128)

        # Stage 3: unpadded 3x3 convolutions.
        self.conv7 = nn.Conv2d(128, 256, kernel_size=3, stride=1)
        self.bn7 = nn.BatchNorm2d(256)
        self.conv8 = nn.Conv2d(256, 256, kernel_size=3, stride=1)
        self.bn8 = nn.BatchNorm2d(256)

        # Global average pooling collapses each channel to one value, giving a
        # (batch, 256, 1, 1) tensor, so a single small linear layer can classify it.
        self.avg_global_pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(256, num_classes)  # Classification layer

    @staticmethod
    def _conv_bn_relu(conv, bn, x):
        """Apply one convolution -> batch-norm -> ReLU unit."""
        return torch.relu(bn(conv(x)))

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for image batch ``x``."""
        x = self._conv_bn_relu(self.conv1, self.bn1, x)
        x = self._conv_bn_relu(self.conv2, self.bn2, x)
        x = self.dropout(x)
        x = self._conv_bn_relu(self.conv3, self.bn3, x)
        x = self.max_pool(x)

        x = self._conv_bn_relu(self.conv4, self.bn4, x)
        x = self._conv_bn_relu(self.conv5, self.bn5, x)
        x = self.dropout(x)
        x = self._conv_bn_relu(self.conv6, self.bn6, x)
        x = self.max_pool(x)

        x = self._conv_bn_relu(self.conv7, self.bn7, x)
        x = self.dropout(x)
        x = self._conv_bn_relu(self.conv8, self.bn8, x)

        x = self.avg_global_pool(x)
        x = torch.flatten(x, 1)  # (batch, 256, 1, 1) -> (batch, 256)
        return self.fc1(x)

    def train_epoch(self, train_loader, loss_values, epoch, optim=None, loss_fn=None):
        """Run one optimisation pass over ``train_loader``.

        Args:
            train_loader: Iterable of ``(inputs, labels)`` batches; must support ``len()``.
            loss_values: List that the epoch's average loss is appended to.
            epoch: Zero-based epoch index (only used for the printed message).
            optim: Optimizer to step. Defaults to the notebook-level ``optimizer``.
            loss_fn: Loss callable. Defaults to the notebook-level ``criterion``.

        Returns:
            Tuple ``(avg_loss, loss_values)`` where ``avg_loss`` is the mean
            batch loss of this epoch.
        """
        optim = optimizer if optim is None else optim
        loss_fn = criterion if loss_fn is None else loss_fn
        # Use the device the model actually lives on instead of a notebook global.
        model_device = next(self.parameters()).device

        # An optimizer built from a different Model instance would step parameters
        # that never receive gradients, so training would silently do nothing.
        own_params = {id(p) for p in self.parameters()}
        managed = (id(p) for group in optim.param_groups for p in group["params"])
        if not any(pid in own_params for pid in managed):
            import warnings
            warnings.warn(
                "The optimizer does not hold any parameter of this model; "
                "optimizer.step() will not train it.",
                RuntimeWarning,
            )

        running_loss = 0.0
        total_train_data = len(train_loader)  # number of batches
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(model_device), labels.to(model_device)

            optim.zero_grad()  # gradients accumulate by default, so reset per batch
            outputs = self(inputs)  # forward pass
            loss = loss_fn(outputs, labels)
            loss.backward()  # backward pass
            optim.step()  # update the weights

            running_loss += loss.item()

        avg_loss = running_loss / total_train_data  # mean loss over all batches

        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")  # Print the average loss for this epoch
        loss_values.append(avg_loss)
        return avg_loss, loss_values

    #Maybe add train function (train_epoch) in here, makes a bit cleaner?

Look at flosst code and docs for the rest of the code (training loop etc)

In [None]:
# NOTE(review): Model() here creates a fresh, throwaway network. The `model` that is
# trained is created in a later cell, so this optimizer does not hold that model's
# parameters - the optimizer must be (re)built from `model.parameters()` once `model` exists.
optimizer = torch.optim.AdamW(Model().parameters(), lr=1e-3)  # Using adamW optimizer with weight decay, which is good for regularization, standard L2 regularization is 0.01

In [None]:
# Sliding windows over the per-epoch losses used by early_stopping():
# `recent_train_losses` holds the newest `patience` values and
# `previous_train_losses` the `patience` values immediately before those.
recent_train_losses = []
previous_train_losses = []


def early_stopping(avg_loss, patience=5):
    """Decide whether training should stop because the loss stopped improving.

    The newest loss is pushed into the "recent" window; values that fall out of
    it move into the "previous" window, and both windows are capped at
    ``patience`` entries. Once both are full (from the ``2 * patience``-th call
    onwards) the check runs on every call: training should stop when the mean
    of the previous window is lower than the mean of the recent window.

    Args:
        avg_loss: Average loss of the epoch that just finished.
        patience: Size of each comparison window; must be at least 1.

    Returns:
        Tuple ``(stop, recent_train_losses, previous_train_losses)``.

    Raises:
        ValueError: If ``patience`` is smaller than 1.
    """
    if patience < 1:
        raise ValueError("patience must be at least 1")

    recent_train_losses.append(avg_loss)
    # Move overflow from the recent window into the previous window ...
    while len(recent_train_losses) > patience:
        previous_train_losses.append(recent_train_losses.pop(0))
    # ... and drop the oldest values so the previous window stays bounded too.
    while len(previous_train_losses) > patience:
        previous_train_losses.pop(0)

    if len(recent_train_losses) == patience and len(previous_train_losses) == patience:
        avg_of_previous_losses = np.mean(previous_train_losses)
        avg_of_recent_previous_losses = np.mean(recent_train_losses)
        if avg_of_previous_losses < avg_of_recent_previous_losses:
            print(f"Early stopping condition met: {avg_of_previous_losses} < {avg_of_recent_previous_losses}")
            return True, recent_train_losses, previous_train_losses  # Early stopping condition met

        print(f"Early stopping condition not met: {avg_of_previous_losses} >= {avg_of_recent_previous_losses}")

    return False, recent_train_losses, previous_train_losses  # No early stopping condition met


In [None]:
model = Model().to(device)  # Initialize the model and move it to the device (GPU or CPU)

# Build the optimizer from THIS model's parameters. An optimizer created from a
# separate Model() instance would never update the network being trained.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

In [None]:
from datetime import datetime
from torch.utils.tensorboard import SummaryWriter

# TensorBoard logging: each run of this cell writes to its own timestamped
# directory under runs/.
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
writer = SummaryWriter('runs/classifier_trainer{}'.format(timestamp))

EPOCHS = 10

# Lowest validation loss so far; starts huge so the first epoch is always saved.
best_vloss = 1_000_000.

# Per-epoch average losses, kept for plotting later.
loss_values = []
val_loss_values = []

total_val_data = len(val_loader)  # number of validation batches
for epoch in range(EPOCHS):
    epoch_number = epoch + 1  # 1-based, for printing, logging and file names
    print('EPOCH {}:'.format(epoch_number))

    # ---- training pass: dropout active, batch-norm uses batch statistics ----
    model.train(True)
    avg_loss, loss_values = model.train_epoch(train_loader, loss_values, epoch)

    # The early-stopping decision is based on the training loss of this epoch;
    # it is only acted on at the end of the iteration, after validation and saving.
    early_stop, recent_train_losses, previous_train_losses = early_stopping(avg_loss, patience=5)

    # ---- validation pass: dropout off, batch-norm uses running statistics ----
    model.eval()
    running_vloss = 0.0
    with torch.no_grad():  # no gradients needed, saves memory
        for vinputs, vlabels in val_loader:
            vinputs = vinputs.to(device)
            vlabels = vlabels.to(device).float()
            batch_vloss = criterion(model(vinputs), vlabels)
            running_vloss += batch_vloss.item()

    # Mean validation loss over all validation batches.
    avg_vloss = running_vloss / total_val_data
    val_loss_values.append(avg_vloss)

    print('LOSS train {} valid {}'.format(avg_loss, avg_vloss))

    # Log both averages for this epoch.
    writer.add_scalars('Training vs. Validation Loss',
                       {'Training': avg_loss, 'Validation': avg_vloss},
                       epoch_number)
    writer.flush()

    # Checkpoint whenever the validation loss reaches a new minimum.
    improved = avg_vloss < best_vloss
    if improved:
        best_vloss = avg_vloss
        model_path = 'model_{}_{}'.format(timestamp, epoch_number)
        torch.save(model.state_dict(), model_path)

    if early_stop:
        print("Early stopping triggered, stopping training.")
        break

    #Onto the next epoch, which will also run through all the batches in the training set and validation set and give the average los s for the epoch

TODO: Finish the metrics. I also need Google Colab.

In [None]:
# Training vs. validation loss per epoch, drawn on an explicit figure/axes pair.
fig, ax = plt.subplots()
ax.plot(loss_values, label='Training Loss')
ax.plot(val_loss_values, label='Validation Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.set_title('Training and Validation Loss')
ax.legend()

In [None]:
#Training set evaluation
from torcheval.metrics import MulticlassAccuracy #This lib has so many of these - good to use i think
model.eval() #Turns off dropout layers
metric = MulticlassAccuracy()
with torch.no_grad():
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        preds = torch.argmax(outputs, dim=1)  # Predicted class indices, shape (batch,)
        # The dataset yields one-hot label vectors, but the metric expects class
        # indices of shape (batch,), so convert before updating.
        targets = torch.argmax(labels, dim=1)
        metric.update(preds.cpu(), targets.cpu())  # metric state is kept on the CPU

accuracy = metric.compute()  # Compute the accuracy
print(f'Training set accuracy: {accuracy:.4f}')  # Print the accuracy

In [None]:
#Test set evaluation
model.eval()
metric = MulticlassAccuracy()
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        preds = torch.argmax(outputs, dim=1)  # Predicted class indices, shape (batch,)
        # The dataset yields one-hot label vectors, but the metric expects class
        # indices of shape (batch,), so convert before updating.
        targets = torch.argmax(labels, dim=1)
        metric.update(preds.cpu(), targets.cpu())  # metric state is kept on the CPU

accuracy = metric.compute()  # Compute the accuracy
print(f'Test set accuracy: {accuracy:.4f}')  # Print the accuracy