In [None]:
# Colab-only: mount Google Drive so the files referenced under /content/drive
# (annotation sheet, image array, model checkpoints) are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset, random_split

import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.models.resnet import ResNet50_Weights

import numpy as np
import pandas as pd
from PIL import Image

import cv2
import os
import random

from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.utils.class_weight import compute_class_weight

In [None]:
# One seed shared by every random number generator this notebook touches.
rand_seed = 6

random.seed(rand_seed)         # Python's built-in RNG
np.random.seed(rand_seed)      # NumPy's global RNG (also used by scikit-learn when random_state is None)
torch.manual_seed(rand_seed)   # PyTorch default generator

if torch.cuda.is_available():
    # Disable cuDNN autotuning and request deterministic kernels so that
    # repeated GPU runs are comparable.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
    torch.cuda.manual_seed_all(rand_seed)

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Per-image preprocessing pipeline:
#   array/tensor -> PIL image -> random horizontal flip -> tensor -> per-channel normalization.
# No Resize step is applied here (the 224x224 resize is left disabled; the name of the
# .npy file loaded later suggests the images are already resized — TODO confirm).
resnet_transform = transforms.Compose(
    [
        transforms.ToPILImage(),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

In [None]:
class CustomDataset(Dataset):
  """Map-style dataset that pairs each feature with its label.

  The transform is applied lazily, each time an item is fetched, so any
  randomness inside it is re-drawn on every access.
  """

  def __init__(self, features, labels, transform):
    # Only references are stored; nothing is copied or transformed up front.
    self.features, self.labels, self.transform = features, labels, transform

  def __len__(self):
    """Number of samples, taken from the feature container."""
    n_samples = len(self.features)
    return n_samples

  def __getitem__(self, idx):
    """Return ``(transform(features[idx]), labels[idx])``."""
    sample = self.transform(self.features[idx])
    target = self.labels[idx]
    return sample, target

In [None]:
# Annotation spreadsheet; later cells read its 'Captions' and 'Label_Sentiment' columns.
data = pd.read_excel('/content/drive/Shareddrives/TFQ/MemeSEN/multi-sent.xlsx')

In [None]:
# Sentiment string -> integer class id used as the training target.
label_map = {'neutral': 0, 'positive': 1, 'negative': 2}

In [None]:
# Images are loaded from a pre-computed array rather than from individual files.
# NOTE(review): rows are presumably in the same order as the spreadsheet — verify.
Xi = np.load('/content/drive/Shareddrives/TFQ/Model_Checkpoints/Resized_224_Normalized.npy')
# Captions are collected here but are not used by any other visible cell.
Xc = list(data['Captions'])
# Sentiment strings -> class ids; a label missing from label_map raises KeyError.
Y = [label_map[sentiment] for sentiment in data['Label_Sentiment']]

In [None]:
def image_preprocess(image):
  """Run one already-loaded image through ``resnet_transform`` and return the result.

  Serves as the ``transform`` callable handed to ``CustomDataset``. Reading the
  file and colour conversion are not done here.
  """
  transformed = resnet_transform(image)
  return transformed

In [None]:
# Xi = np.load('/content/drive/MyDrive/MemeSEN/ResNet50_Processed_Xi.npy')
# Xi = [image_preprocess(i) for i in Xi]
# Xi_numpy = [i.numpy() for i in Xi]
# np.save('/content/drive/MyDrive/MemeSEN/ResNet50_Processed_Xi.npy', Xi_numpy)

In [None]:
# First split: 70 % of the samples for training, 30 % held back (stratified by label).
Xi_train, Xi_test, Y_train, Y_test = train_test_split(
    Xi,
    Y,
    test_size=0.3,
    random_state=6,
    stratify=Y,
)
# Second split: the held-back part becomes 2/3 test and 1/3 validation
# (20 % / 10 % of all samples). Xi_test / Y_test are rebound here, so the
# intermediate 30 % arrays are no longer referenced afterwards.
Xi_test, Xi_valid, Y_test, Y_valid = train_test_split(
    Xi_test,
    Y_test,
    test_size=1/3,
    random_state=6,
    stratify=Y_test,
)

In [None]:
class Sampler(object):
    """Minimal sampler interface: subclasses must implement ``__iter__`` and ``__len__``."""

    def __init__(self, data_source):
        pass

    def __iter__(self):
        raise NotImplementedError

    def __len__(self):
        raise NotImplementedError

class StratifiedSampler(Sampler):
    """Yield every dataset index exactly once per pass, in a label-aware random order.

    The indices are produced by one stratified 50/50 shuffle split whose two
    halves are concatenated. Each half therefore mirrors the overall class
    proportions; individual mini-batches are NOT guaranteed to be stratified.

    No ``random_state`` is passed to the splitter, so the order is drawn from
    NumPy's global RNG and is reproducible only if ``np.random.seed`` was set.
    """

    def __init__(self, class_vector, batch_size):
        """
        Args:
            class_vector: 1-D tensor (or anything ``torch.as_tensor`` accepts)
                holding one class id per sample.
            batch_size: loader batch size; only used to derive ``n_splits``.
        """
        class_vector = torch.as_tensor(class_vector)
        # Only the first split is ever consumed, but the splitter yields nothing
        # for n_splits == 0 (dataset smaller than one batch), so keep it >= 1.
        self.n_splits = max(1, class_vector.size(0) // batch_size)
        self.class_vector = class_vector

    def gen_sample_array(self):
        """Return a NumPy array containing a fresh permutation of all sample indices.

        Raises:
            ValueError: propagated from scikit-learn when a class has too few
                members to be split in two.
        """
        splitter = StratifiedShuffleSplit(n_splits=self.n_splits, test_size=0.5)
        labels = self.class_vector.cpu().numpy()
        # The splitter only needs the number of rows of X; a zero placeholder
        # avoids consuming the global torch RNG on every epoch.
        placeholder = np.zeros((labels.shape[0], 1))

        first_half, second_half = next(splitter.split(placeholder, labels))
        return np.hstack([first_half, second_half])

    def __iter__(self):
        # A new order is generated each time the loader starts an epoch.
        return iter(self.gen_sample_array())

    def __len__(self):
        return len(self.class_vector)

In [None]:
batch_size=48

# Evaluation must be repeatable: same steps as the training transform
# (resnet_transform) but WITHOUT the random horizontal flip.
# Keep mean/std in sync with resnet_transform.
eval_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Training batches: augmented images, order supplied by the stratified sampler.
sampler = StratifiedSampler(class_vector=torch.tensor(Y_train), batch_size=batch_size)
train_loader = DataLoader(CustomDataset(Xi_train, Y_train, image_preprocess), batch_size=batch_size, sampler=sampler)
# Validation / test batches: no augmentation and no shuffling (order does not affect the metrics).
valid_loader = DataLoader(CustomDataset(Xi_valid, Y_valid, eval_transform), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(CustomDataset(Xi_test, Y_test, eval_transform), batch_size=batch_size, shuffle=False)

In [None]:
# ImageNet-pretrained ResNet-50 with its final fully connected layer swapped for a
# 3-output head. Not referenced by the training call in the visible cells.
resnet50 = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)

num_features = resnet50.fc.in_features
resnet50.fc = nn.Linear(in_features=num_features, out_features=3)
resnet50.to(device)

In [None]:
# ImageNet-pretrained MobileNetV3-Large; the layer at classifier[3] is swapped for a
# 3-output head. Not referenced by the training call in the visible cells.
mobilenet_v3 = models.mobilenet_v3_large(weights=models.MobileNet_V3_Large_Weights.IMAGENET1K_V1)
num_features = mobilenet_v3.classifier[3].in_features
mobilenet_v3.classifier[3] = nn.Linear(in_features=num_features, out_features=3)
mobilenet_v3.to(device)

In [None]:
# ImageNet-pretrained DenseNet-161 with its classifier layer swapped for a 3-output
# head; this is the model the optimizer and the training call below operate on.
densenet161 = models.densenet161(weights=models.DenseNet161_Weights.IMAGENET1K_V1)
num_features = densenet161.classifier.in_features
densenet161.classifier = nn.Linear(in_features=num_features, out_features=3)
densenet161.to(device)

In [None]:
# Unweighted cross-entropy. A class-weighted alternative is kept commented out:
#   class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(np.array(Y)), y=Y)
#   criterion = torch.nn.CrossEntropyLoss(weight=torch.tensor(class_weights, dtype=torch.float32, device=device))
criterion = torch.nn.CrossEntropyLoss()

# AdamW over every DenseNet-161 parameter (nothing is frozen in this cell).
optimizer = torch.optim.AdamW(
    params=densenet161.parameters(),
    lr=1e-5,
    betas=(0.9, 0.9999),
    eps=1e-9,
    weight_decay=0.08,
)

In [None]:
path = '/content/drive/Shareddrives/TFQ/Model_Checkpoints/DenseNet/'
def train_model(model, start, end, name):
  """Train ``model`` for epochs ``start`` .. ``end - 1``, validating after each one.

  Relies on the notebook-level ``train_loader``, ``valid_loader``, ``criterion``,
  ``optimizer``, ``device`` and ``path``. ``optimizer`` must have been built from
  the parameters of the same ``model`` that is passed in; otherwise the weights
  receiving gradients are not the ones being stepped.

  Args:
    model: network trained in place.
    start: zero-based index of the first epoch (lets a run continue the numbering).
    end: one past the zero-based index of the last epoch; also the total shown in the log line.
    name: checkpoint prefix; files are written as ``{name}_{epoch + 1}.pkl`` under ``path``.

  Returns:
    None. One log line is printed and one state-dict checkpoint is saved per epoch.
  """
  # Make sure the checkpoint directory exists before the first save.
  os.makedirs(path, exist_ok=True)

  for epoch in range(start, end):
    # ---- training pass ----
    model.train()
    loss_sum = 0.0
    samples_seen = 0
    for inputs, labels in train_loader:
      inputs = inputs.to(device)
      labels = labels.to(device)
      optimizer.zero_grad()
      outputs = model(inputs)
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()
      # criterion returns the batch mean (CrossEntropyLoss default reduction),
      # so weight by batch size to get a correct epoch-level average even
      # when the last batch is smaller.
      batch_count = labels.size(0)
      loss_sum += loss.item() * batch_count
      samples_seen += batch_count
    # Mean training loss over the whole epoch (0.0 if the loader was empty).
    running_loss = loss_sum / samples_seen if samples_seen else 0.0

    # ---- validation pass ----
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
      for inputs, labels in valid_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # NaN (instead of a ZeroDivisionError) when there is nothing to validate on.
    val_accuracy = correct / total if total else float('nan')
    torch.save(model.state_dict(), os.path.join(path, f'{name}_{epoch + 1}.pkl'))
    print(f"Epoch {epoch + 1}/{end}, Loss: {running_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

In [None]:
# Fine-tune DenseNet-161 for 10 epochs; a checkpoint named densenet161_<epoch>.pkl is saved after each.
train_model(densenet161, 0, 10, 'densenet161')

Epoch 1/10, Loss: 0.8210, Validation Accuracy: 0.7162
Epoch 2/10, Loss: 0.7779, Validation Accuracy: 0.7140
Epoch 3/10, Loss: 0.6802, Validation Accuracy: 0.7094
Epoch 4/10, Loss: 0.5688, Validation Accuracy: 0.7094
Epoch 5/10, Loss: 0.5700, Validation Accuracy: 0.7162
Epoch 6/10, Loss: 0.4410, Validation Accuracy: 0.7025
Epoch 7/10, Loss: 0.4226, Validation Accuracy: 0.7025
Epoch 8/10, Loss: 0.3190, Validation Accuracy: 0.7025
Epoch 9/10, Loss: 0.1666, Validation Accuracy: 0.7117
Epoch 10/10, Loss: 0.1908, Validation Accuracy: 0.7048


In [None]:
def get_report(model, weight):
    """Load a checkpoint into ``model`` and summarise its predictions on ``test_loader``.

    Relies on the notebook-level ``test_loader`` and ``device``. The model's
    weights are overwritten in place by the checkpoint.

    Args:
        model: network whose architecture matches the saved state dict.
        weight: path to a state-dict file written with ``torch.save``.

    Returns:
        A string holding the confusion matrix followed by scikit-learn's
        classification report (rows/labels are the integer class ids).
    """
    # map_location lets a checkpoint saved on GPU load on a CPU-only runtime.
    # NOTE: torch.load unpickles the file — only load checkpoints you trust.
    state_dict = torch.load(weight, map_location=device)
    model.load_state_dict(state_dict)
    model.eval()
    y_true = []
    y_pred = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)

            # Labels are only turned into Python lists, so they can stay on the CPU.
            y_true.extend(labels.tolist())
            y_pred.extend(predicted.tolist())
    return f'{confusion_matrix(y_true, y_pred)}\n{classification_report(y_true, y_pred)}'

In [None]:
# Evaluate the checkpoint saved after epoch 9 on the test split and print the report.
print(get_report(densenet161, '/content/drive/Shareddrives/TFQ/Model_Checkpoints/DenseNet/densenet161_9.pkl'))

[[  0   3  55]
 [  1 153 116]
 [  1  67 478]]
              precision    recall  f1-score   support

           0       0.00      0.00      0.00        58
           1       0.69      0.57      0.62       270
           2       0.74      0.88      0.80       546

    accuracy                           0.72       874
   macro avg       0.47      0.48      0.47       874
weighted avg       0.67      0.72      0.69       874

