# Imports and Drive Access

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive (the annotation, archive and log paths used in this notebook live under it)
drive.mount('/content/drive')

In [None]:
import os
import torch
import pandas as pd
import numpy as np
import torch.nn as nn
from tqdm import tqdm
from torch import Tensor
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision.io import read_image
from torchvision.transforms import v2, Lambda

# Data

In [None]:
# Extract the image tar files
# (notebook shell magics: unpack the train/val/test archives stored on Drive into /content/)
!tar -xf "/content/drive/MyDrive/Graduation Project/AffectNet/train_images.tar" -C "/content/"
!tar -xf "/content/drive/MyDrive/Graduation Project/AffectNet/val_images.tar" -C "/content/"
!tar -xf "/content/drive/MyDrive/Graduation Project/AffectNet/test_images.tar" -C "/content/"

In [None]:
# Define train data paths (annotations CSV on Drive, extracted images on local disk)
train_annotations = "/content/drive/MyDrive/Graduation Project/AffectNet/train_annotations.csv"
train_images = "/content/train_images"

# Define validation data paths
val_annotations = "/content/drive/MyDrive/Graduation Project/AffectNet/val_annotations.csv"
val_images = "/content/val_images"

# Define test data paths
test_annotations = "/content/drive/MyDrive/Graduation Project/AffectNet/test_annotations.csv"
test_images = "/content/test_images"

In [None]:
class AffectNet(Dataset):
    """
    A Dataset subclass for handling the AffectNet dataset.

    Attributes:
        annotations (DataFrame): The annotations for the images.
        root_dir (str): The root directory where the images are stored.
        transform (callable, optional): Optional transform to be applied on an image.
    """

    def __init__(self, annotations_file, img_root_dir, transform=None):
        """
        Initializes the AffectNet dataset.

        Args:
            annotations_file (str): The path to the CSV file containing the annotations.
            img_root_dir (str): The root directory where the images are stored.
            transform (callable, optional): Optional transform to be applied on an image.

        Raises:
            ValueError: If the number of annotation rows differs from the
                number of directory entries in ``img_root_dir``.
        """

        self.annotations = pd.read_csv(annotations_file)
        self.root_dir = img_root_dir
        self.transform = transform

        # Check if number of images and annotations match
        # (list the directory once; it can hold a very large number of files)
        num_images = len(os.listdir(self.root_dir))
        if len(self.annotations) != num_images:
            raise ValueError(
                "Number of images and annotations do not match: "
                f"{len(self.annotations)} != {num_images}"
            )

    def __len__(self):
        """
        Returns the length of the dataset.

        Returns:
            int: The length of the dataset.
        """

        return len(self.annotations)

    def _inverse_class_frequency(self):
        """
        Maps every label found in the ``expression`` column to 1 / (its count).

        Returns:
            dict: ``{label: 1 / number_of_rows_with_that_label}``.
        """

        counts = self.annotations.expression.value_counts().to_dict()
        return {label: 1 / count for label, count in counts.items()}

    def sample_dist(self):
        """
        Returns the inverse-frequency weight of every class.

        Returns:
            list: One weight per distinct label, ordered by ascending label value.
        """

        inv_freq = self._inverse_class_frequency()
        return [inv_freq[label] for label in sorted(inv_freq)]

    def sample_weights(self):
        """
        Returns one sampling weight per annotation row (e.g. for a weighted sampler).

        The weight is looked up by label value rather than by position in
        ``sample_dist()``, so labels that are not the contiguous range
        0..k-1 still receive the weight of their own class.

        Returns:
            list: ``1 / class_count`` of the row's label, in row order.
        """

        inv_freq = self._inverse_class_frequency()
        return [inv_freq[label] for label in self.annotations.expression.tolist()]

    def __getitem__(self, idx):
        """
        Returns the image and its labels at the given index.

        Args:
            idx (int): The index of the image.

        Returns:
            tuple: A tuple containing the image, and its labels.
        """

        # Get image name (first CSV column + ".jpg") and create path
        img_name = f"{self.annotations.iloc[idx, 0]}.jpg"
        img_path = os.path.join(self.root_dir, img_name)

        # Read image
        image = read_image(img_path)

        # Get labels (last CSV column) and convert to tensor
        # NOTE(review): the sampling weights read the `expression` column, so this
        # presumably assumes `expression` is the last column - confirm against the CSV.
        labels = self.annotations.iloc[idx, -1]
        labels = torch.tensor(labels)

        # Apply input transforms
        if self.transform:
            image = self.transform(image)

        # Return image and labels
        return image, labels

# Model

## Tokenizer

In [None]:
class Tokenizer(nn.Module):
    """
    Convolutional tokenizer.

    Stacks ``n_conv_layers`` stages of Conv2d (no bias) -> ReLU -> MaxPool2d,
    then flattens dimensions 2 and 3 of the result and swaps the last two
    axes, so each spatial position becomes one token.
    """

    def __init__(self,
                 kernel_size=3, stride=2, padding=0,
                 pooling_kernel_size=3, pooling_stride=2, pooling_padding=1,
                 n_conv_layers=2,
                 n_input_channels=3,
                 n_output_channels=64,
                 in_planes=64,
                 ):
        super().__init__()

        # Channel widths: input, (n_conv_layers - 1) hidden widths, output
        widths = [n_input_channels]
        widths.extend(in_planes for _ in range(n_conv_layers - 1))
        widths.append(n_output_channels)

        # One (in, out) pair per stage; never more than n_conv_layers stages
        channel_pairs = list(zip(widths[:-1], widths[1:]))[:max(n_conv_layers, 0)]

        stages = []
        for c_in, c_out in channel_pairs:
            conv = nn.Conv2d(c_in, c_out,
                             kernel_size=kernel_size,
                             stride=stride,
                             padding=padding,
                             bias=False)
            pool = nn.MaxPool2d(kernel_size=pooling_kernel_size,
                                stride=pooling_stride,
                                padding=pooling_padding)
            stages.append(nn.Sequential(conv, nn.ReLU(), pool))

        self.conv_layers = nn.Sequential(*stages)
        self.flattener = nn.Flatten(2, 3)

    def forward(self, x):
        """Return the token sequence for ``x`` (assumed to be a (B, C, H, W) image batch)."""
        feature_map = self.conv_layers(x)
        flat = self.flattener(feature_map)
        return flat.transpose(-2, -1)

## Encoder

In [None]:
class MultiheadedSelfAttention(nn.Module):
    """
    Multi-head scaled dot-product self-attention over a (B, N, C) token tensor.

    Args:
        embed_dim (int): Token embedding size C; must be divisible by ``num_heads``.
        num_heads (int): Number of attention heads.
        attn_dropout (float): Dropout probability applied to the attention weights.
        proj_dropout (float): Dropout probability applied after the output projection.
    """

    def __init__(self,
                 embed_dim,
                 num_heads=8,
                 attn_dropout=0.5,
                 proj_dropout=0.5,
                 ):
        super().__init__()
        self.num_heads = num_heads
        assert embed_dim % num_heads == 0, "Embedding dim must be divisible by number of heads."
        head_dim = embed_dim // num_heads
        self.scale = head_dim ** -0.5

        self.qkv = nn.Linear(embed_dim, embed_dim * 3)
        self.attn_dropout = nn.Dropout(attn_dropout)
        self.projection = nn.Linear(embed_dim, embed_dim)
        self.proj_dropout = nn.Dropout(proj_dropout)

    def forward(self, x):
        """
        Apply self-attention.

        Args:
            x (Tensor): Tokens of shape (B, N, C).

        Returns:
            Tensor: Attended tokens of shape (B, N, C).
        """
        B, N, C = x.shape
        qkv = (
            self.qkv(x) # B, N, (3*C)
            .reshape(B, N, 3, self.num_heads, C // self.num_heads) # B, N, 3(qkv), H(eads), head_dim
            .permute(2, 0, 3, 1, 4) # 3, B, H(eads), N, head_dim
        )
        # Index the leading axis so q, k, v are each (B, H, N, head_dim).
        # torch.chunk(qkv, 3) would keep a size-1 leading axis, which makes the
        # transpose(1, 2) below swap batch and heads and mixes data across samples.
        q, k, v = qkv[0], qkv[1], qkv[2]
        # B,H,N,dim x B,H,dim,N -> B,H,N,N
        attn = torch.matmul(q, k.transpose(-2, -1)) * self.scale # <q,k> / sqrt(d)
        attn = attn.softmax(dim=-1) # Softmax over the key positions
        attn = self.attn_dropout(attn)

        x = (
            torch.matmul(attn, v) # B,H,N,N x B,H,N,dim -> B, H, N, dim
            .transpose(1, 2) # B, N, H, dim
            .reshape(B, N, C) # B, N, (H*dim)
        )
        x = self.projection(x)
        x = self.proj_dropout(x)

        return x

In [None]:
class EncoderLayer(nn.Module):
    """
    One transformer encoder layer: self-attention followed by a feed-forward MLP.

    The attention sub-block is pre-normalised (``x + MHA(norm_1(x))``). The
    result is then passed through ``norm_2`` and the MLP output is added to
    that normalised tensor.
    """

    def __init__(self,
                 embed_dim=192,
                 num_heads=8,
                 attn_dropout=0.5,
                 proj_dropout=0.5,
                 mlp_dropout=0.1,
                 feedforward_dim=768,
            ):
        super().__init__()
        self.norm_1 = nn.LayerNorm(embed_dim)
        self.norm_2 = nn.LayerNorm(embed_dim)
        self.MHA = MultiheadedSelfAttention(
            embed_dim, num_heads, attn_dropout, proj_dropout,
        )

        # MLP: expand to feedforward_dim, GELU, project back, dropout after each linear stage
        mlp_stages = [
            nn.Linear(embed_dim, feedforward_dim),
            nn.GELU(),
            nn.Dropout(mlp_dropout),
            nn.Linear(feedforward_dim, embed_dim),
            nn.Dropout(mlp_dropout),
        ]
        self.ff = nn.Sequential(*mlp_stages)

    def forward(self, x):
        """Run the attention and feed-forward sub-blocks on ``x`` and return the result."""
        # Attention sub-block with residual connection
        attended = x + self.MHA(self.norm_1(x))

        # Feed-forward sub-block; the residual is taken from the normalised tensor
        normed = self.norm_2(attended)
        return normed + self.ff(normed)

## CCT

In [None]:
class CCT(nn.Module):
    """
    Compact convolutional transformer classifier.

    Pipeline: convolutional ``Tokenizer`` -> linear projection to ``embed_dim``
    -> stack of ``EncoderLayer`` -> attention-weighted pooling over tokens
    -> linear classification head.

    Args:
        num_encoders (int): Number of encoder layers.
        num_classes (int): Size of the output logit vector.
        embed_dim (int): Token embedding size used by the encoders.
        num_heads (int): Attention heads per encoder layer.
        attn_dropout (float): Dropout on attention weights.
        proj_dropout (float): Dropout after the attention output projection.
        mlp_dropout (float): Dropout inside the feed-forward MLP.
        feedforward_dim (int): Hidden width of the feed-forward MLP.
    """

    def __init__(self,
                 num_encoders=7,
                 num_classes=8,
                 embed_dim=64,
                 num_heads=8,
                 attn_dropout=0.5,
                 proj_dropout=0.5,
                 mlp_dropout=0.1,
                 feedforward_dim=768,
            ):
        super(CCT, self).__init__()
        self.tokenizer = Tokenizer(kernel_size=3, stride=2, padding=0, n_conv_layers=2)
        # NOTE: `gap` is not applied in forward(); it is kept so the module's
        # attribute set is unchanged.
        self.gap = nn.AdaptiveAvgPool2d(1)
        # 64 matches the Tokenizer's default number of output channels
        self.fc1 = nn.Linear(64, embed_dim)
        self.transformer = self.create_encoders(embed_dim, num_heads,
                                                attn_dropout, proj_dropout,
                                                mlp_dropout, feedforward_dim,
                                                num_encoders)

        self.attention_pool = nn.Linear(embed_dim, 1)
        # NOTE: `norm` is not applied in forward(); it is kept so existing
        # checkpoints containing its parameters still match the state_dict.
        self.norm = nn.LayerNorm(embed_dim)
        self.fc = nn.Linear(embed_dim, num_classes)

    def create_encoders(self, embed_dim=64,
                        num_heads=8,
                        attn_dropout=0.5,
                        proj_dropout=0.5,
                        mlp_dropout=0.1,
                        feedforward_dim=768,
                        num_layers=2,
                       ):
        """Build an ``nn.Sequential`` of ``num_layers`` identically configured EncoderLayer modules."""
        return nn.Sequential(*[EncoderLayer(embed_dim, num_heads, attn_dropout, proj_dropout, mlp_dropout, feedforward_dim) for _ in range(num_layers)])

    def forward(self, x):
        """
        Compute class logits for a batch of images.

        Args:
            x (Tensor): Image batch accepted by the tokenizer.

        Returns:
            Tensor: Logits of shape (B, num_classes), including when B == 1.
        """
        x = self.tokenizer(x)      # B, N, 64
        x = self.fc1(x)            # B, N, embed_dim
        x = self.transformer(x)    # B, N, embed_dim

        # Attention pooling: softmax over the token axis gives one weight per token
        pool_weights = F.softmax(self.attention_pool(x), dim=1)  # B, N, 1
        # (B, 1, N) x (B, N, embed_dim) -> (B, 1, embed_dim). Squeeze only the
        # singleton token axis: a bare squeeze() would also drop the batch axis
        # for a batch of one and break the loss / argmax(dim=1) downstream.
        x = torch.matmul(pool_weights.transpose(-1, -2), x).squeeze(dim=1)
        x = self.fc(x)
        return x

# Train

In [None]:
# Hyperparameters
learning_rate = 0.0001  # passed as `lr` to the AdamW optimizers
batch_size = 128  # batch size of the train and validation DataLoaders
device = "cuda" if torch.cuda.is_available() else "cpu"  # use the GPU when one is available
epochs = 10  # exclusive upper bound of the epoch range in the training loop

In [None]:
# Seed PyTorch's global random number generator
torch.manual_seed(42)

# Training pipeline: RandAugment, cast to float32, per-channel normalisation.
# NOTE(review): v2.ToDtype is called without scale=True, so - assuming read_image
# yields uint8 images in [0, 255] - pixel values are NOT rescaled to [0, 1] before
# Normalize, while the mean/std below look like ImageNet statistics on the [0, 1]
# scale. Confirm; if this is changed, every train/val/test pipeline must change
# together and previously trained checkpoints will no longer match.
train_transforms = v2.Compose([
    v2.RandAugment(num_ops=5),
    v2.ToDtype(torch.float32),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation pipeline: same cast + normalisation, no augmentation
val_transforms = v2.Compose([
    v2.ToDtype(torch.float32),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

train_data = AffectNet(train_annotations, train_images, train_transforms)
val_data = AffectNet(val_annotations, val_images, val_transforms)

# Per-row weights, inversely proportional to the frequency of each row's class
sample_weights = train_data.sample_weights()

# Draw 3 * len(train_data) samples per epoch, with replacement, according to the weights above
train_loader = DataLoader(train_data,
                          batch_size=batch_size,
                          num_workers=2,
                          sampler=WeightedRandomSampler(weights=sample_weights, num_samples=3*len(train_data), replacement=True)
                          )
# NOTE(review): shuffling does not change the validation totals computed by the training loop
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=True, num_workers=2)

In [None]:
# To resume from a checkpoint, uncomment the `path` and `load_state_dict` lines.
#path = "/content/drive/MyDrive/Graduation Project/Logs/MCCT-6_OS/MCCT-6_OS_ckpt_3.pt"
model = CCT(num_encoders=7)
#model.load_state_dict(torch.load(path), strict=False)

# The training loop moves every batch to `device`, so the model must live there
# too; otherwise the forward pass fails with a device mismatch on GPU.
model.to(device)

# Report the parameter counts
total_params = sum(p.numel() for p in model.parameters())
total_trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total params: {total_params}, Total trainable params: {total_trainable_params}")

In [None]:
# Loss function and optimizer
criterion = nn.CrossEntropyLoss()  # applied to the model outputs and integer class targets
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
# Disabled: restore a saved optimizer state.
# NOTE(review): this path points at the VGGT-1 log directory, while the training
# loop saves its optimizer state under MCCT-6_OS - confirm before re-enabling.
#optimizer.load_state_dict(torch.load("/content/drive/MyDrive/Graduation Project/Logs/VGGT-1/optimizer_state_dict.pt"))

In [None]:
# Set up logging list
logs = []

# Directory on Drive that receives checkpoints, per-epoch logs and the optimizer
# state; create it up front so the first save cannot fail on a missing folder.
log_dir = "/content/drive/MyDrive/Graduation Project/Logs/MCCT-6_OS"
os.makedirs(log_dir, exist_ok=True)

# NOTE(review): training starts at epoch index 3, which presumably continues a
# run resumed from the epoch-3 checkpoint - set to 0 when training from scratch.
start_epoch = 3

for epoch in range(start_epoch, epochs):
    # Training phase
    total_loss = 0.0
    correct = 0
    total = 0
    model.train()
    for (inputs, targets) in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{epochs}', leave=False, unit="batch"):
        inputs, targets = inputs.to(device), targets.to(device)

        outputs = model(inputs)

        loss = criterion(outputs, targets)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # `loss` is the mean over the batch; weight it by the batch size so that
        # dividing by the number of samples below gives a true per-sample average.
        batch_count = targets.shape[0]
        total_loss += loss.item() * batch_count
        predicted = outputs.argmax(dim=1)
        correct += (predicted == targets).sum().item()
        total += batch_count

    # Validation phase
    model.eval()
    val_total_loss = 0.0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            batch_count = targets.shape[0]
            val_total_loss += loss.item() * batch_count
            val_predicted = outputs.argmax(dim=1)
            val_correct += (val_predicted == targets).sum().item()
            val_total += batch_count

    # Per-sample averages over the epoch
    train_loss = total_loss / total
    train_acc = correct / total

    val_loss = val_total_loss / val_total
    val_acc = val_correct / val_total

    logs.append({'Epoch': epoch+1,
                 'Loss': train_loss,
                 'Accuracy' : train_acc,
                 'VAL_Loss': val_loss,
                 'VAL_Accuracy' : val_acc,
            })

    print(f'Epoch {epoch + 1}/{epochs} - Train loss: {train_loss:.4f} - Train acc: {train_acc:.4f} - Val loss: {val_loss:.4f} - Val acc: {val_acc:.4f}')

    # Persist after every epoch: model weights, the log accumulated so far,
    # and the optimizer state (overwritten each epoch).
    ckpt_path = f"{log_dir}/MCCT-6_OS_ckpt_{epoch+1}.pt"
    torch.save(model.state_dict(), ckpt_path)
    log_df = pd.DataFrame(logs)
    log_df.to_csv(f'{log_dir}/MCCT-6_OS_log_{epoch+1}.csv', index=False)
    torch.save(optimizer.state_dict(), f"{log_dir}/optimizer_state_dict.pt")


print('Finished Training')

# Test

In [None]:
path = "/content/drive/MyDrive/Graduation Project/Logs/VGGT-1/VGGT-1_ckpt_77.pt"
model = CCT()

# map_location lets a checkpoint saved on GPU load on a CPU-only runtime.
state_dict = torch.load(path, map_location=device)

# strict=False silently skips keys that do not match, so report what was
# skipped; otherwise a mismatched checkpoint evaluates with random weights
# and nothing indicates it.
load_result = model.load_state_dict(state_dict, strict=False)
if load_result.missing_keys or load_result.unexpected_keys:
    print(f"Warning: checkpoint does not fully match the model - "
          f"missing keys: {load_result.missing_keys}, "
          f"unexpected keys: {load_result.unexpected_keys}")
model.to(device)

# Report the parameter counts
total_params = sum(p.numel() for p in model.parameters())
total_trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total params: {total_params}, Total trainable params: {total_trainable_params}")

In [None]:
# Loss function and optimizer
# NOTE(review): the evaluation code visible in this file only uses `criterion`;
# `optimizer` is not referenced by it.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

In [None]:
# Seed PyTorch's global random number generator
torch.manual_seed(42)

# Test pipeline: cast to float32 and normalise per channel, no augmentation.
# NOTE(review): v2.ToDtype is called without scale=True, so - assuming read_image
# yields uint8 images in [0, 255] - values are not rescaled to [0, 1] before
# Normalize. Keep this identical to the preprocessing the loaded checkpoint was
# trained with; change it only together with the training/validation pipelines.
test_transforms = v2.Compose([
    v2.ToDtype(torch.float32),
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_data = AffectNet(test_annotations, test_images, test_transforms)

# NOTE(review): shuffling does not change the aggregate test metrics computed from this loader
test_loader = DataLoader(test_data,
                          batch_size=32,
                          shuffle=True
                          )

In [None]:
# Set up logging list
test_logs = []

# Test phase
test_total_loss = 0.0
test_correct = 0
test_total = 0
model.eval()
# Predictions and ground truth for every test sample (used for the confusion matrix)
y_pred = []
y_true = []
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # `loss` is the mean over the batch; weight it by the batch size so that
        # dividing by the number of samples below gives a true per-sample average.
        batch_count = targets.shape[0]
        test_total_loss += loss.item() * batch_count
        test_predicted = outputs.argmax(dim=1)
        y_pred.extend(test_predicted.cpu().numpy())
        y_true.extend(targets.cpu().numpy())
        test_correct += (test_predicted == targets).sum().item()
        test_total += batch_count


test_loss = test_total_loss / test_total
test_acc = test_correct / test_total

print(f'Test loss: {test_loss:.4f} - Test acc: {test_acc:.4f}')

print('Finished Evaluating')

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import seaborn as sn

# Display names, listed in class-index order (index 0 first)
labels = ["Neutral", "Happiness", "Sadness", "Surprise", "Fear", "Disgust", "Anger",
"Contempt"]

# Pass the full class-index list so the matrix always has one row/column per
# name, even if a class never occurs in y_true or y_pred. normalize="true"
# divides each row by its true-class count and yields 0 instead of NaN for
# classes with no samples.
cm = confusion_matrix(y_true, y_pred,
                      labels=list(range(len(labels))),
                      normalize="true")
df_cm = pd.DataFrame(cm, index=labels, columns=labels)
plt.figure(figsize = (12,7))
sn.heatmap(df_cm, annot=True)
plt.savefig('output.png')