# Install required packages

In [None]:
#Installing required repositories
!pip install sentence-transformers
!sudo apt install tesseract-ocr
!pip3 install pytesseract

# Import necessary libraries

In [None]:
import torch
import torchvision.models as models
from torchvision import datasets, transforms as T
from torchvision.datasets import FakeData
from sentence_transformers import SentenceTransformer
from torch.utils.data import DataLoader, Dataset
from oauth2client.client import GoogleCredentials
from torchvision import datasets, transforms
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import os
import pandas as pd
import numpy as np
from torch import Tensor
from torch.nn import Linear
from torch.nn import ReLU
from torch.nn import Softmax
from torch.nn import Module
from torch.optim import SGD
from torch.nn import CrossEntropyLoss
from torch.nn.init import kaiming_uniform_
from torch.nn.init import xavier_uniform_
from sklearn.metrics import accuracy_score, classification_report


In [None]:
from PIL import ImageFile
# Ask Pillow to load images whose file data is truncated instead of raising an error.
ImageFile.LOAD_TRUNCATED_IMAGES = True


In [None]:
import os
import warnings

# Debugging aid: make CUDA kernel launches synchronous so an error is reported at
# the call that caused it. The previous `CUDA_LAUNCH_BLOCKING=1` only created an
# unused Python variable; CUDA reads this setting from the process environment, so
# it is exported here, before any CUDA work is started. Remove it for full-speed
# training, since synchronous launches slow GPU execution down.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

# NOTE: this silences every warning for the rest of the session.
warnings.filterwarnings("ignore")

# Interactive plotting mode.
plt.ion()

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data exploration

In [None]:
# Location of the dataset CSV, relative to the notebook's working directory.
path =  'data.csv'
# Load the whole file into a DataFrame for exploration.
df = pd.read_csv(path)

In [None]:
# Number of non-null entries in each column.
df.count()

In [None]:
# Show the distinct values that occur in each of the three label columns.
for label_column in ('2_way_label', '3_way_label', '6_way_label'):
    print(df[label_column].unique())

In [None]:
# Print how many rows carry each class id, for the 2-, 3- and 6-way labelling schemes.
# Every scheme after the first is preceded by two blank lines.
for scheme_position, n_way in enumerate((2, 3, 6)):
    label_values = df[str(n_way) + '_way_label']
    for class_id in range(n_way):
        lead = "\n\n" if scheme_position > 0 and class_id == 0 else ""
        row_count = sum(label_values == class_id)
        print(lead + "Class " + str(class_id) + " in " + str(n_way) + " way: " + str(row_count))

In [None]:
# True if at least one row has a missing `hasImage` value.
df['hasImage'].isnull().any()

In [None]:
# True if at least one row has a missing `clean_title`; the titles are the text that gets embedded later.
df['clean_title'].isnull().any()

# Sentence BERT Embeddings 

In [None]:
# Load the pretrained Sentence-BERT model used to embed the titles.
# SentenceTransformer is already imported in the imports cell; the repeated import is harmless.
from sentence_transformers import SentenceTransformer
sentence_model = SentenceTransformer('all-mpnet-base-v2')

In [None]:
# Embed every cleaned title with the sentence model and report the shape of the result.
titles = df['clean_title'].tolist()
sentence = sentence_model.encode(titles)
print(sentence.shape)

In [None]:
# The title embeddings are the only features used in the cells visible here; this is just a rename.
combined_embeddings = sentence

In [None]:
# Image preprocessing pipeline. It is not strictly necessary: the dataset class
# defined later accepts a `transform` argument, so one is supplied here.
preprocessing_steps = [
    T.Resize(256),
    T.CenterCrop(224),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform = T.Compose(preprocessing_steps)

In [None]:
# Number of embedded titles, followed by the length of a single embedding.
n_embeddings = len(combined_embeddings)
embedding_length = len(combined_embeddings[0])
print(n_embeddings, embedding_length)

In [None]:
# Convert the embeddings to a torch tensor with an extra leading axis of length 1.
# Later cells rely on that layout (`combined_embeddings[0]`, `combined_embeddings.shape[2]`).
# Re-running this cell adds another axis each time, so run it once per kernel session.
embedding_array = np.asarray(combined_embeddings)
combined_embeddings = torch.tensor(embedding_array[np.newaxis, ...])

In [None]:
# Inspect the embedding tensor: full shape, size of the last axis, and tensor type.
print(combined_embeddings.shape)
print(combined_embeddings.shape[2])
# `.type` without parentheses printed the bound method object; call it to get the type name.
print(combined_embeddings.type())

# Dataset and DataLoader using PyTorch

In [None]:
class ImageSentenceDataset(Dataset):
    """Pairs precomputed embeddings with labels read from a CSV file.

    Despite the name, no image is loaded: ``__getitem__`` returns only an
    embedding and its label, and the stored ``transform`` is never applied.
    """

    def __init__(self, csv_path, embedding, transform = transforms.ToTensor()):
        """Store the inputs and read the label column from ``csv_path``.

        Args:
            csv_path: path of the CSV file holding the labels.
            embedding: indexable collection of embeddings, looked up with the
                same index as the labels.
            transform: stored on the instance but unused by this class.
        """
        self.csv_path = csv_path
        self.transform = transform
        self.embeddings = embedding
        label_frame = pd.read_csv(csv_path)
        # Labels are taken from the column at position 15.
        # NOTE(review): presumably the 6-way label — confirm against the CSV header.
        self.labels = label_frame.iloc[:, 15]

    def __len__(self):
        """Return the number of label rows read from the CSV."""
        return len(self.labels)

    def __getitem__(self, index):
        """Return the ``(embedding, label)`` pair stored at ``index``."""
        target = self.labels[index]
        features = self.embeddings[index]
        return (features, target)

In [None]:
# Path of the dataset CSV handed to the dataset class (same file that was read into `df` earlier).
path =  'data.csv'

In [None]:
# Build the dataset from the CSV labels and the embedding matrix;
# `[0]` drops the leading length-1 axis of `combined_embeddings`.
custom_dataset_from_csv = ImageSentenceDataset(
    csv_path=path,
    embedding=combined_embeddings[0],
    transform=transform,
)

In [None]:
# Total number of samples available before splitting.
print(len(custom_dataset_from_csv))

In [None]:
#Test Train Val Split
# The split sizes are derived from the dataset instead of a hard-coded total
# (previously 17622), so the cell keeps working when the CSV changes. With
# 17622 samples the sizes are unchanged: 14622 / 1500 / 1500.
HOLDOUT_SIZE = 1500  # samples in each of the test and validation sets
SPLIT_SEED = 42      # fixed seed so the partition is the same on every run
n_total_samples = len(custom_dataset_from_csv)
if n_total_samples <= 2 * HOLDOUT_SIZE:
    raise ValueError(
        "dataset has {} samples; more than {} are needed to leave a training set".format(
            n_total_samples, 2 * HOLDOUT_SIZE))
train_set, test_set, val_set = torch.utils.data.random_split(
    custom_dataset_from_csv,
    [n_total_samples - 2 * HOLDOUT_SIZE, HOLDOUT_SIZE, HOLDOUT_SIZE],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

In [None]:
# Sizes of the train, test and validation subsets, in that order.
print(*(len(subset) for subset in (train_set, test_set, val_set)))

In [None]:
# Training loader. Shuffling is enabled so each epoch visits the samples in a new
# order; with shuffle=False every epoch replayed the exact same batch sequence.
custom_dataset_loader = torch.utils.data.DataLoader(dataset=train_set,
                                                    batch_size=28,
                                                    shuffle=True)

In [None]:
# Sanity check: show the first batch of embeddings from the training loader and its shape.
first_batch = next(iter(custom_dataset_loader), None)
if first_batch is not None:
    em, label = first_batch
    print(em, em.shape)

In [None]:
#Define MLP
class MLP(Module):
    """Feed-forward classifier: n_inputs -> 100 -> 30 -> 6 -> n_classes.

    ``forward`` returns raw, unnormalised class scores (logits). That is the
    input ``torch.nn.CrossEntropyLoss`` expects, because the loss applies
    log-softmax itself; applying Softmax in ``forward`` as before normalised
    the scores twice and flattened the gradients. The predicted class
    (argmax) is the same for logits and probabilities. Use ``predict_proba``
    when normalised probabilities are needed.
    """

    def __init__(self, n_inputs, n_classes=6):
        """Create and initialise the layers.

        Args:
            n_inputs: size of each input feature vector.
            n_classes: number of output classes (default 6, the previous
                fixed value).
        """
        super(MLP, self).__init__()
        # input to first hidden layer
        self.hidden1 = Linear(n_inputs, 100)
        kaiming_uniform_(self.hidden1.weight, nonlinearity='relu')
        self.act1 = ReLU()
        # second hidden layer
        self.hidden2 = Linear(100, 30)
        kaiming_uniform_(self.hidden2.weight, nonlinearity='relu')
        self.act2 = ReLU()
        # third hidden layer
        self.hidden3 = Linear(30, 6)
        kaiming_uniform_(self.hidden3.weight, nonlinearity='relu')
        self.act3 = ReLU()
        # output layer
        self.hidden4 = Linear(6, n_classes)
        xavier_uniform_(self.hidden4.weight)
        # Kept for predict_proba; deliberately not applied in forward().
        self.act4 = Softmax(dim=1)

    def forward(self, X):
        """Return logits of shape ``(batch, n_classes)`` for a batch ``X``."""
        X = self.act1(self.hidden1(X))
        X = self.act2(self.hidden2(X))
        X = self.act3(self.hidden3(X))
        return self.hidden4(X)

    def predict_proba(self, X):
        """Return class probabilities (each row sums to 1) for a batch ``X``."""
        return self.act4(self(X))

In [None]:
# Size the network's input from the last axis of the embedding tensor and place it on the chosen device.
embedding_dim = combined_embeddings.shape[2]
model = MLP(embedding_dim).to(device)
# Loss and optimiser used by the training loop: cross-entropy with Adam (AMSGrad variant).
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), amsgrad = True)

In [None]:
#Define a training function
def train(epoch, log_interval=200):
    """Train the notebook-level ``model`` on ``custom_dataset_loader``.

    Uses the notebook globals ``model``, ``custom_dataset_loader``,
    ``optimizer``, ``criterion`` and ``device``.

    Args:
        epoch: number of full passes over the training loader.
        log_interval: print a progress line every ``log_interval`` batches.
            A falsy value (0 or None) disables progress output. This argument
            was previously ignored, so a line was printed for every batch.
    """
    # Set model to training mode
    model.train()
    n_batches = len(custom_dataset_loader)
    n_samples = len(custom_dataset_loader.dataset)
    # A separate loop variable keeps the `epoch` argument from being overwritten.
    for epoch_idx in range(epoch):
        # Loop over each batch from the training set
        for batch_idx, (data, target) in enumerate(custom_dataset_loader):
            # Copy data to GPU if needed
            data = data.to(device)
            target = target.to(device)
            # Zero gradient buffers
            optimizer.zero_grad()
            # Pass data through the network
            output = model(data)
            # Calculate loss
            loss = criterion(output, target)
            # Backpropagate
            loss.backward()
            # Update weights
            optimizer.step()

            if log_interval and batch_idx % log_interval == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                        epoch_idx, batch_idx * len(data), n_samples,
                        100. * batch_idx / n_batches, loss.item()))



In [None]:
# Run 100 passes over the training loader.
train(epoch=100)

# Evaluation

In [None]:
#Evaluation of model
def evaluate_model(test_dl, model):
    """Score ``model`` on every batch of ``test_dl``.

    The model is switched to evaluation mode and run without gradient
    tracking; its previous training/eval mode is restored afterwards.
    Inputs are moved to the device the model's parameters live on.

    Args:
        test_dl: iterable of ``(inputs, targets)`` batches.
        model: network returning one score per class for each input row.

    Returns:
        ``(acc, report)``: the accuracy and the text classification report
        computed by scikit-learn over all batches.

    Raises:
        ValueError: if ``test_dl`` yields no batches.
    """
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    predictions, actuals = [], []
    was_training = model.training
    model.eval()
    try:
        # No gradients are needed for scoring; skipping them saves memory and time.
        with torch.no_grad():
            for inputs, targets in test_dl:
                if model_device is not None:
                    inputs = inputs.to(model_device)
                scores = model(inputs).cpu().numpy()
                # convert per-class scores to class labels
                predictions.append(np.argmax(scores, axis=1))
                actuals.append(targets.cpu().numpy())
    finally:
        model.train(was_training)

    if not predictions:
        raise ValueError("test_dl yielded no batches; nothing to evaluate")

    predictions = np.concatenate(predictions)
    actuals = np.concatenate(actuals)
    # calculate accuracy
    acc = accuracy_score(actuals, predictions)
    report = classification_report(actuals, predictions)
    return acc, report

In [None]:
# Loader over the held-out test subset; order is kept fixed for evaluation.
test_dataset_loader = DataLoader(test_set, batch_size=1024, shuffle=False)

In [None]:
# Score the model on the test subset and print the heading, accuracy and report, one per line.
acc, report = evaluate_model(test_dataset_loader, model)
print("Accuracy and classification report", acc, report, sep="\n")

In [None]:
# Loader over the validation subset; order is kept fixed for evaluation.
val_dataset_loader = DataLoader(val_set, batch_size=64, shuffle=False)


In [None]:
# Score the model on the validation subset. This rebinds `acc` and `report`,
# replacing the values from the test-set evaluation.
acc, report = evaluate_model(val_dataset_loader, model)
print("Accuracy and classification report", acc, report, sep="\n")

In [None]:
# New heading