In [None]:
# citation: https://www.datacamp.com/tutorial/autoencoder-classifier-python
# citation: https://drive.google.com/drive/folders/1idfa8y7esf7usGo7SSxsH4iKBECEPFNr?usp=share_link 

In [None]:
# from google.colab import drive
import os
import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import itertools
import torchvision
from PIL import Image



from torch.utils.data import Dataset, DataLoader, BatchSampler, random_split
from torchvision import transforms
from torch.utils.data import Subset
from PIL import Image

In [None]:
# # Mount Google Drive
# drive.mount('/content/drive')

In [None]:
# Create Dataset class for multilabel classification
class MultiClassImageDataset(Dataset):
    def __init__(self, ann_df, super_map_df, sub_map_df, img_dir, transform=None):
        self.ann_df = ann_df
        self.super_map_df = super_map_df
        self.sub_map_df = sub_map_df
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.ann_df)

    def upscale_image(self, image):
        # Upscale the image using bicubic interpolation
        width, height = image.size
        upscaled_image = image.resize((128,128), Image.BICUBIC)
        return upscaled_image

    def __getitem__(self, idx):
        img_name = self.ann_df['image'][idx]
        img_path = os.path.join(self.img_dir, img_name)
        image = Image.open(img_path).convert('RGB')

        # Upscale the image
        image = self.upscale_image(image)

        super_idx = self.ann_df['superclass_index'][idx]
        super_label = self.super_map_df['class'][super_idx]

        sub_idx = self.ann_df['subclass_index'][idx]
        sub_label = self.sub_map_df['class'][sub_idx]

        if self.transform:
            image = self.transform(image)

        return image, super_idx, super_label, sub_idx, sub_label

class MultiClassImageTestDataset(Dataset):
    def __init__(self, super_map_df, sub_map_df, img_dir, transform=None):
        self.super_map_df = super_map_df
        self.sub_map_df = sub_map_df
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self): # Count files in img_dir
        return len([fname for fname in os.listdir(self.img_dir)])

    def __getitem__(self, idx):
        img_name = str(idx) + '.jpg'
        img_path = os.path.join(self.img_dir, img_name)
        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, img_name

In [None]:
train_ann_df = pd.read_csv('train_data.csv')
super_map_df = pd.read_csv('superclass_mapping.csv')
sub_map_df = pd.read_csv('subclass_mapping.csv')

train_img_dir = 'train_shuffle'
test_img_dir = 'test_shuffle'

image_preprocessing = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0), std=(1)),
])

# Create train and val split
train_dataset = MultiClassImageDataset(train_ann_df, super_map_df, sub_map_df, train_img_dir, transform=image_preprocessing)
print(len(train_dataset))
train_dataset, val_dataset = random_split(train_dataset, [0.9, 0.1])

# Create test dataset
test_dataset = MultiClassImageTestDataset(super_map_df, sub_map_df, test_img_dir, transform=image_preprocessing)

# Create dataloaders
batch_size = 8
train_loader = DataLoader(train_dataset,
                          batch_size=batch_size,
                          shuffle=True)

val_loader = DataLoader(val_dataset,
                        batch_size=batch_size,
                        shuffle=True)

test_loader = DataLoader(test_dataset,
                         batch_size=1,
                         shuffle=False)

In [None]:
# CNN
class Autoencoder(nn.Module):
    def __init__(self):
        super().__init__()

        self.encoder = nn.Sequential(
            nn.Conv2d(3, 8, 3, padding='same'),
            nn.ReLU(True),
            nn.BatchNorm2d(8),
            nn.Conv2d(8, 16, 3, padding='same'),
            nn.ReLU(True),
            nn.BatchNorm2d(16),
            nn.Conv2d(16, 32, 3, padding='same'),
            nn.ReLU(True),
            nn.BatchNorm2d(32),
            nn.Conv2d(32, 64, 3, padding='same'),
            nn.ReLU(True),
            nn.BatchNorm2d(64),
            nn.Conv2d(64, 128, 3, padding='same'),
            nn.ReLU(True),
            nn.BatchNorm2d(128),
            nn.Flatten(start_dim=1),
            nn.Linear(2097152, 256),
            nn.ReLU(True),
            nn.Linear(256, 128)
        )



        self.decoder = nn.Sequential(
            nn.Linear(128, 256),
            nn.ReLU(True),
            nn.Linear(256, 3 * 3 * 128),
            nn.ReLU(True),
            nn.Unflatten(dim=1, unflattened_size=(128, 3, 3)),
            nn.ConvTranspose2d(128, 64, 3, stride=2, output_padding=0),
            nn.ReLU(True),
            nn.BatchNorm2d(64),
            nn.ConvTranspose2d(64, 32, 3,stride=2, output_padding=0),
            nn.ReLU(True),
            nn.BatchNorm2d(32),
            nn.ConvTranspose2d(32, 16, 3, stride=2, output_padding=0),
            nn.ReLU(True),
            nn.BatchNorm2d(16),
            nn.ConvTranspose2d(16, 8, 3, stride=2, output_padding=0),
            nn.ReLU(True),
            nn.BatchNorm2d(8),
            nn.ConvTranspose2d(8, 3, 3, stride=2, output_padding=0)
        )


        self.fc1 = nn.Linear(48387, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3a = nn.Linear(128, 4)
        self.fc3b = nn.Linear(128, 88)


    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        super_out = self.fc3a(x)
        sub_out = self.fc3b(x)
        return super_out, sub_out

class Trainer():
    def __init__(self, model, criterion, optimizer, train_loader, val_loader, test_loader=None, device='cpu'):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader
        self.device = device
        self.max_super_prob_all = []
        self.max_sub_prob_all = []

    def train_epoch(self):
        running_loss = 0.0
        for i, data in enumerate(self.train_loader):
            inputs, super_labels, sub_labels = data[0].to(device), data[1].to(device), data[3].to(device)

            self.optimizer.zero_grad()
            super_outputs, sub_outputs = self.model(inputs)
            loss = self.criterion(super_outputs, super_labels) + self.criterion(sub_outputs, sub_labels)
            loss.backward()
            self.optimizer.step()

            running_loss += loss.item()

        print(f'Training loss: {running_loss/i:.3f}')

    def validate_epoch(self):
        super_correct = 0
        sub_correct = 0
        total = 0
        running_loss = 0.0
        with torch.no_grad():
            for i, data in enumerate(self.val_loader):
                inputs, super_labels, sub_labels = data[0].to(device), data[1].to(device), data[3].to(device)

                super_outputs, sub_outputs = self.model(inputs)
                loss = self.criterion(super_outputs, super_labels) + self.criterion(sub_outputs, sub_labels)

                # Apply softmax to get probabilities
                super_probs = F.softmax(super_outputs, dim=1)
                sub_probs = F.softmax(sub_outputs, dim=1)

                # Get maximum probability values and corresponding predicted classes
                max_super_prob, super_predicted = torch.max(super_probs, 1)
                max_sub_prob, sub_predicted = torch.max(sub_probs, 1)

                # max_super_prob, super_predicted = torch.max(super_outputs.data, 1)
                # max_sub_prob, sub_predicted = torch.max(sub_outputs.data, 1)

                print('max_super_prob:', max_super_prob)
                print('max_sub_prob:', max_sub_prob)

                self.max_super_prob_all.append(max_super_prob)
                self.max_sub_prob_all.append(max_sub_prob)

                total += super_labels.size(0)
                super_correct += (super_predicted == super_labels).sum().item()
                sub_correct += (sub_predicted == sub_labels).sum().item()
                running_loss += loss.item()

        print(f'Validation loss: {running_loss/i:.3f}')
        print(f'Validation superclass acc: {100 * super_correct / total:.2f} %')
        print(f'Validation subclass acccc: {100 * sub_correct / total:.2f} %')

    def test(self, save_to_csv=False, return_predictions=False):
        if not self.test_loader:
            raise NotImplementedError('test_loader not specified')

        # Evaluate on test set, in this simple demo no special care is taken for novel/unseen classes
        test_predictions = {'image': [], 'superclass_index': [], 'subclass_index': []}
        with torch.no_grad():
            for i, data in enumerate(self.test_loader):
                inputs, img_name = data[0].to(device), data[1]

                super_outputs, sub_outputs = self.model(inputs)
                # Commented is the old method
                # _, super_predicted = torch.max(super_outputs.data, 1)
                # _, sub_predicted = torch.max(sub_outputs.data, 1)

                 # Apply softmax to get probabilities
                super_probs = F.softmax(super_outputs, dim=1)
                sub_probs = F.softmax(sub_outputs, dim=1)

                # is_novel_super = max_prob < threshold

                # Get maximum probability values and corresponding predicted classes
                max_super_prob, super_predicted = torch.max(super_probs, 1)
                max_sub_prob, sub_predicted = torch.max(sub_probs, 1)

                super_threshold = 0.3 # 0.4 gave 0.41053 --> need to drop this to 0.05 or 0.1 and retest
                sub_threshold = 0.4 # because the mean is around 0.6 and the mean is much smaller

                sub_predicted[max_sub_prob < sub_threshold] = 87
                super_predicted[max_super_prob < super_threshold] = 3

                test_predictions['image'].append(img_name[0])
                test_predictions['superclass_index'].append(super_predicted.item())
                test_predictions['subclass_index'].append(sub_predicted.item())

        test_predictions = pd.DataFrame(data=test_predictions)

        if save_to_csv:
            test_predictions.to_csv('example_test_predictions.csv', index=False)

        if return_predictions:
            return test_predictions

    def max_probs_all(self):
        return self.max_super_prob_all, self.max_sub_prob_all



In [None]:
# Init model and trainer
device = 'cuda'
model = Autoencoder().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
trainer = Trainer(model, criterion, optimizer, train_loader, val_loader, test_loader)

In [None]:
# Training loop
for epoch in range(10):
    print(f'Epoch {epoch+1}')
    trainer.train_epoch()
    trainer.validate_epoch()
    print('')

print('Finished Training')

In [None]:
trainer.test(save_to_csv=True, return_predictions=True)

'''
This simple baseline scores the following test accuracy

Superclass Accuracy
Overall: 43.83 %
Seen: 61.11 %
Unseen: 0.00 %

Subclass Accuracy
Overall: 2.03 %
Seen: 9.56 %
Unseen: 0.00 %
'''

In [None]:
# Read the CSV file into a DataFrame
file_path = 'example_test_predictions.csv'  # Replace with the actual file path
df = pd.read_csv(file_path)

# Display the original DataFrame
print("Original DataFrame:")
print(df)

# Select two columns
selected_columns = df[['image', 'subclass_index']]

# Rename one of the columns
selected_columns = selected_columns.rename(columns={'image': 'ID'})
selected_columns = selected_columns.rename(columns={'subclass_index': 'Target'})

# Save the modified DataFrame to a new CSV file
output_file_path = 'sub_test.csv'  # Replace with the desired output file path
selected_columns.to_csv(output_file_path, index=False)

# Select two columns
selected_column = df[['image', 'superclass_index']]

# Rename one of the columns
selected_column = selected_column.rename(columns={'image': 'ID'})
selected_column = selected_column.rename(columns={'superclass_index': 'Target'})
print(selected_columns)

# Save the modified DataFrame to a new CSV file
output_file_path = 'super_test.csv'  # Replace with the desired output file path
selected_column.to_csv(output_file_path, index=False)