#### Omid Davar @2023

In [None]:
import pandas as pd
import spacy
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import lightning as pl
from torch.utils.data import Dataset, DataLoader
from spacy.lang.en import English
from tqdm import tqdm
import torchmetrics

# Load spaCy's "en_core_web_lg" pipeline once, at import time.
# MulticlassTextDataset calls this object on every review text (and reads the
# resulting ``doc.vector``) when it is built with ``load_embeddings=False``.
nlp = spacy.load("en_core_web_lg")

class MulticlassTextDataset(Dataset):
    """Review dataset pairing one embedding vector with one polarity label.

    Four CSV files (``train_sm.csv``, ``test_sm.csv``, ``train_sm2.csv`` and
    ``test_sm2.csv``) are read from ``data_dir``. Each file must have exactly
    three columns, which are renamed to ``Polarity``, ``Title`` and ``Review``;
    title and review text are then joined into a single ``Review`` column. The
    four frames are concatenated, in that order, into ``self.data``.

    ``self.labels`` is a 1-D float32 CPU tensor holding 0 where
    ``Polarity == 1`` and 1 everywhere else.
    """

    # Directory used when ``data_dir`` is not given; it holds the CSV files and
    # the cached ``embeddings.pt``.
    DEFAULT_DATA_DIR = r'E:\Darsi\Payan Name Arshad\Second Work\ColorIntelligence2\ColorIntelligence\data\DigiKala'

    def __init__(self, load_embeddings=True, data_dir=None):
        """Read the CSV files and load or compute the embeddings.

        Args:
            load_embeddings: When true, read the pre-computed tensor from
                ``embeddings.pt`` inside ``data_dir``. When false, run the
                module-level spaCy pipeline ``nlp`` over every review and use
                ``doc.vector`` as its embedding.
            data_dir: Directory containing the CSV files (and ``embeddings.pt``).
                Defaults to ``DEFAULT_DATA_DIR``.
        """
        # Local import keeps this class independent of the file's import block.
        import os

        if data_dir is None:
            data_dir = self.DEFAULT_DATA_DIR

        self.load_embeddings = load_embeddings
        self.train_df = self._read_reviews(os.path.join(data_dir, 'train_sm.csv'))
        self.test_df = self._read_reviews(os.path.join(data_dir, 'test_sm.csv'))
        self.train_df2 = self._read_reviews(os.path.join(data_dir, 'train_sm2.csv'))
        self.test_df2 = self._read_reviews(os.path.join(data_dir, 'test_sm2.csv'))

        # ignore_index renumbers rows 0..N-1 for whatever N the files contain,
        # instead of assuming a fixed total row count.
        self.data = pd.concat(
            [self.train_df, self.test_df, self.train_df2, self.test_df2],
            ignore_index=True,
        )

        # Polarity 1 -> class 0, anything else -> class 1.
        is_second_class = (self.data['Polarity'] != 1).to_numpy()
        self.labels = torch.tensor(is_second_class, dtype=torch.float32)

        if load_embeddings:
            # map_location lets a tensor that was saved from a CUDA device be
            # loaded on a machine without CUDA.
            self.embeddings_tensor = torch.load(
                os.path.join(data_dir, 'embeddings.pt'), map_location='cpu'
            )
        else:
            embeddings = [
                nlp(text).vector
                for text in tqdm(self.data['Review'], 'Creating Embeddings ...')
            ]
            self.data['embedding'] = embeddings
            # Go through a single ndarray: building a tensor straight from a
            # list of ndarrays is very slow.
            self.embeddings_tensor = torch.tensor(np.asarray(embeddings))

    @staticmethod
    def _read_reviews(csv_path):
        """Read one CSV and return a frame with ``Polarity`` and merged ``Review``."""
        frame = pd.read_csv(csv_path)
        frame.columns = ['Polarity', 'Title', 'Review']
        frame['Review'] = frame['Title'].astype(str) + ' ' + frame['Review'].astype(str)
        return frame[['Polarity', 'Review']]

    def __len__(self):
        """Return the number of rows in the concatenated data frame."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``[embedding, label]`` for row ``idx``."""
        return [self.embeddings_tensor[idx], self.labels[idx]]


class MulticlassSpacyClassifierModel(nn.Module):
    """Two-layer feed-forward network: Linear -> ReLU -> Linear.

    The forward pass returns the raw output of the second linear layer; no
    softmax or other activation is applied to it.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        """Create the two linear layers.

        Args:
            input_dim: Size of each input vector.
            hidden_dim: Width of the hidden layer.
            output_dim: Number of output units.
        """
        super().__init__()
        self.fc1 = nn.Linear(in_features=input_dim, out_features=hidden_dim)
        self.fc2 = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, x):
        """Map ``x`` through the hidden layer and return the output layer's values."""
        hidden = nn.functional.relu(self.fc1(x))
        return self.fc2(hidden)


class MulticlassSpacyTextClassifier(pl.LightningModule):
    """Lightning wrapper that trains ``MulticlassSpacyClassifierModel``.

    Optimises cross-entropy with Adam and tracks multiclass accuracy for the
    training and validation loops. Logged metrics:

    * ``train_loss`` / ``val_loss`` -- epoch-level mean loss.
    * ``train_acc`` / ``val_acc`` -- accuracy, logged both per step and per epoch.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, batch_size=256, lr=1e-3):
        """Build the network, loss and metrics.

        Args:
            input_dim: Size of each input embedding.
            hidden_dim: Width of the hidden layer.
            output_dim: Number of classes.
            batch_size: Stored on ``self.batch_size``; the logging calls use
                the actual size of each batch instead.
            lr: Learning rate handed to Adam.
        """
        super().__init__()
        self.model = MulticlassSpacyClassifierModel(input_dim, hidden_dim, output_dim)
        self.loss_fn = nn.CrossEntropyLoss()
        self.batch_size = batch_size
        self.train_acc = torchmetrics.Accuracy(task="multiclass", num_classes=output_dim)
        self.val_acc = torchmetrics.Accuracy(task="multiclass", num_classes=output_dim)
        self.lr = lr

    def forward(self, x):
        """Return the wrapped model's output for ``x``."""
        return self.model(x)

    def _shared_step(self, batch):
        """Run the model on one batch and return ``(loss, predictions, targets)``."""
        features, target = batch[0], batch[1]
        # CrossEntropyLoss needs floating-point scores and integer class indices.
        logits = self.model(features).float()
        target = target.long()
        loss = self.loss_fn(logits, target)
        # argmax over dim=1 already yields one index per sample. An extra
        # squeeze would turn a batch of one into a 0-d tensor that no longer
        # matches the shape of ``target``.
        predictions = torch.argmax(logits, dim=1)
        return loss, predictions, target

    def training_step(self, batch, batch_idx):
        """Compute the training loss for one batch and update ``train_acc``."""
        loss, predictions, target = self._shared_step(batch)
        self.train_acc(predictions, target)
        # Use the real batch size: the last batch of an epoch can be smaller
        # than ``self.batch_size``.
        current_batch_size = target.size(0)
        self.log('train_loss', loss, on_step=False, on_epoch=True, batch_size=current_batch_size)
        self.log('train_acc', self.train_acc, prog_bar=True, on_epoch=True, on_step=True, batch_size=current_batch_size)
        return loss

    def validation_step(self, batch, *args, **kwargs):
        """Compute the validation loss for one batch and update ``val_acc``."""
        loss, predictions, target = self._shared_step(batch)
        self.val_acc(predictions, target)
        current_batch_size = target.size(0)
        self.log('val_loss', loss, on_step=False, on_epoch=True, batch_size=current_batch_size)
        self.log('val_acc', self.val_acc, prog_bar=True, on_epoch=True, on_step=True, batch_size=current_batch_size)

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters with learning rate ``self.lr``."""
        return optim.Adam(self.parameters(), lr=self.lr)


input_dim = 300  # SpaCy embeddings dimension
hidden_dim = 100
output_dim = 2  # number of classes produced by the dataset's labels (0 and 1)
batch_size = 256  # shared by the three data loaders and the Lightning module
trainer = pl.Trainer(max_epochs=70)


multiclass_dataset = MulticlassTextDataset()


# 80 % train, 5 % validation, the remainder (about 15 %) test.
train_size = int(0.8 * len(multiclass_dataset))
val_size = int(0.05 * len(multiclass_dataset))
test_size = len(multiclass_dataset) - (train_size + val_size)
# A seeded generator keeps the split identical across runs. Without it, a run
# that resumes from a checkpoint would draw a new split and could evaluate on
# samples the checkpoint was trained on.
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    multiclass_dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(42),
)


# Only the training loader reshuffles every epoch; validation and test keep a
# fixed order.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


multiclass_model = MulticlassSpacyTextClassifier(input_dim, hidden_dim, output_dim, batch_size=batch_size)



In [None]:
import torch
from Scripts.Models.LightningModels.LightningModels import BaseLightningModel
from Scripts.Models.ModelsManager.ModelManager import ModelManager
import pandas as pd
import matplotlib.pyplot as plt
from typing import List
from torch_geometric.nn import summary
from lightning.pytorch.callbacks import Callback, ModelCheckpoint, EarlyStopping
from os import path

from sklearn.metrics import classification_report, f1_score, accuracy_score, precision_score, recall_score, confusion_matrix, hinge_loss

class ClassifierModelManager(ModelManager):
    """ModelManager specialisation for classification models.

    Adds checkpointing on ``val_acc``, metric plots built from the CSV log, and
    scikit-learn based evaluation reports.

    The methods read ``self.torch_model``, ``self.lightning_model``,
    ``self.device``, ``self.log_dir``, ``self.log_name`` and ``self.logger``.
    NOTE(review): these are presumably set by ``ModelManager.__init__``, which
    is not visible in this file -- confirm.
    """

    def __init__(self,
                 torch_model: torch.nn.Module,
                 lightning_model: BaseLightningModel,
                 model_save_dir: str = '~/Desktop',
                 log_dir: str = 'logs/',
                 log_name: str = 'model_logs',
                 device='cpu',
                 num_train_epoch = 100):
        """Forward every argument, positionally and unchanged, to ``ModelManager``."""
        super().__init__(torch_model, lightning_model, model_save_dir, log_dir, log_name, device, num_train_epoch)

    def _create_callbacks(self) -> List[Callback]:
        """Return the callbacks: keep the two best ``val_acc`` checkpoints plus the last."""
        return [
            ModelCheckpoint(save_top_k=2, mode='max', monitor='val_acc', save_last=True),
            # EarlyStopping(patience=50, mode='max', monitor='val_acc')
        ]

    def draw_summary(self, dataloader):
        """Print a model summary computed on the first batch of ``dataloader``."""
        X, _ = next(iter(dataloader))
        print(summary(self.torch_model, X.to(self.device)))

    def _version_dir(self) -> str:
        """Return ``<log_dir>/<log_name>/version_<logger.version>``."""
        return path.join(self.log_dir, self.log_name, f'version_{self.logger.version}')

    def _load_epoch_metrics(self) -> pd.DataFrame:
        """Read ``metrics.csv`` and return one row per epoch with column means."""
        metrics = pd.read_csv(path.join(self._version_dir(), 'metrics.csv'))
        # The per-column mean ignores NaN, so step-level and epoch-level rows
        # of the same epoch collapse into one row.
        return metrics.groupby('epoch').mean(numeric_only=True).reset_index()

    def plot_csv_logger(self, loss_names=None, eval_names=None):
        """Show per-epoch loss and accuracy curves read from the CSV log.

        Args:
            loss_names: Columns for the loss figure; defaults to
                ``['train_loss', 'val_loss']``.
            eval_names: Columns for the accuracy figure; defaults to
                ``['train_acc', 'val_acc']``.

        Raises:
            KeyError: If a requested column is missing from ``metrics.csv``.
        """
        if loss_names is None:
            loss_names = ['train_loss', 'val_loss']
        if eval_names is None:
            eval_names = ['train_acc', 'val_acc']

        df_metrics = self._load_epoch_metrics()
        df_metrics[loss_names].plot(grid=True, legend=True, xlabel='Epoch', ylabel='loss')
        df_metrics[eval_names].plot(grid=True, legend=True, xlabel='Epoch', ylabel='accuracy')
        plt.show()

    def save_plot_csv_logger(self, loss_names=None, eval_names=None, name_prepend: str=""):
        """Save the per-epoch accuracy curves as a PNG next to the CSV log.

        The image is written to ``<name_prepend>_acc_metric.png`` inside the
        logger's version directory.

        Args:
            loss_names: Accepted for parity with ``plot_csv_logger``; no loss
                figure is saved by this method.
            eval_names: Columns to plot; defaults to ``['train_acc', 'val_acc']``.
            name_prepend: Prefix of the output file name.

        Raises:
            KeyError: If a requested column is missing from ``metrics.csv``.
        """
        if eval_names is None:
            eval_names = ['train_acc', 'val_acc']

        df_metrics = self._load_epoch_metrics()
        axes = df_metrics[eval_names].plot(grid=True, legend=True, xlabel='Epoch', ylabel='accuracy')

        acc_png = path.join(self._version_dir(), f'{name_prepend}_acc_metric.png')
        # Save and close the figure that was just drawn, rather than whichever
        # figure pyplot currently treats as active.
        figure = axes.get_figure()
        figure.savefig(acc_png)
        plt.close(figure)

    def _collect_predictions(self, eval_dataloader):
        """Run the model over ``eval_dataloader``.

        Returns:
            ``(y_true, y_pred)`` as NumPy int32 arrays, where ``y_pred`` is the
            argmax over dim 1 of the model output.
        """
        self.lightning_model.eval()
        # Also switch the bare module, in case it is not a sub-module of the
        # Lightning model.
        self.torch_model.eval()

        y_true = []
        y_pred = []
        # Inference only: do not build autograd graphs.
        with torch.no_grad():
            for X, y in eval_dataloader:
                output = self.torch_model(X.to(self.device))
                # Unwrap a tuple output before argmax; argmax cannot take a tuple.
                if isinstance(output, tuple):
                    output = output[0]
                # No squeeze: a final batch of one sample must stay 1-D so it
                # can be concatenated. The class index is kept as-is, so
                # predictions stay comparable with y_true for any number of
                # classes.
                predicted = torch.argmax(output, dim=1)
                y_pred.append(predicted.to(torch.int32).cpu())
                y_true.append(y.to(torch.int32).cpu())
        return torch.concat(y_true).numpy(), torch.concat(y_pred).numpy()

    @staticmethod
    def _report_lines(y_true, y_pred,
                      give_confusion_matrix, give_report, give_f1_score,
                      give_accuracy_score, give_precision_score,
                      give_recall_score, give_hinge_loss):
        """Yield one formatted text block per requested metric, computed lazily."""
        if give_confusion_matrix:
            yield f'confusion_matrix: \n{confusion_matrix(y_true, y_pred)}'
        if give_report:
            yield classification_report(y_true, y_pred)
        if give_f1_score:
            yield f'f1_score: {f1_score(y_true, y_pred)}'
        if give_accuracy_score:
            yield f'accuracy_score: {accuracy_score(y_true, y_pred)}'
        if give_precision_score:
            yield f'precision_score: {precision_score(y_true, y_pred)}'
        if give_recall_score:
            yield f'recall_score: {recall_score(y_true, y_pred)}'
        if give_hinge_loss:
            # NOTE(review): hinge_loss receives hard class predictions here,
            # not decision scores -- confirm this is the intended measure.
            yield f'hinge_loss: {hinge_loss(y_true, y_pred)}'

    def evaluate(self, eval_dataloader,
                 give_confusion_matrix: bool=True,
                 give_report: bool=True,
                 give_f1_score: bool=False,
                 give_accuracy_score: bool=False,
                 give_precision_score: bool=False,
                 give_recall_score: bool=False,
                 give_hinge_loss: bool=False):
        """Print the selected scikit-learn metrics for ``eval_dataloader``.

        Each ``give_*`` flag enables the metric of the same name.
        """
        y_true, y_pred = self._collect_predictions(eval_dataloader)
        for line in self._report_lines(
                y_true, y_pred,
                give_confusion_matrix, give_report, give_f1_score,
                give_accuracy_score, give_precision_score,
                give_recall_score, give_hinge_loss):
            print(line)

    def save_evaluation(self, eval_dataloader, name_prepend: str='',
                 give_confusion_matrix: bool=True,
                 give_report: bool=True,
                 give_f1_score: bool=False,
                 give_accuracy_score: bool=False,
                 give_precision_score: bool=False,
                 give_recall_score: bool=False,
                 give_hinge_loss: bool=False):
        """Append the selected metrics to ``<name_prepend>_test_metrics.txt``.

        The file lives in the logger's version directory and is opened in
        append mode, so repeated calls accumulate reports. Each ``give_*`` flag
        enables the metric of the same name.
        """
        test_metrics_path = path.join(self._version_dir(), f'{name_prepend}_test_metrics.txt')

        y_true, y_pred = self._collect_predictions(eval_dataloader)
        with open(test_metrics_path, 'at+') as f:
            for line in self._report_lines(
                    y_true, y_pred,
                    give_confusion_matrix, give_report, give_f1_score,
                    give_accuracy_score, give_precision_score,
                    give_recall_score, give_hinge_loss):
                print(line, file=f)

In [None]:
# Hand both the bare torch module and its Lightning wrapper to the manager.
model_manager = ClassifierModelManager(
    multiclass_model.model,
    multiclass_model,
    log_name='hetero_model_11',
    model_save_dir=r'E:\Darsi\Payan Name Arshad\Second Work\ColorIntelligence2\ColorIntelligence\Practices\Tasks\HeterogeneousGraphs\hetero_model_7',
    device='cpu',
    num_train_epoch=56,
)
# Alternative call that resumes from a saved checkpoint:
# model_manager.fit(train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, ckpt_path="E:\\Darsi\\Payan Name Arshad\\Second Work\\ColorIntelligence2\\ColorIntelligence\\logs\\hetero_model_11\\version_1\\checkpoints\\epoch=56-step=19608.ckpt")
model_manager.fit(
    train_dataloaders=train_dataloader,
    val_dataloaders=val_dataloader,
)

In [None]:
# Save the per-epoch accuracy curves next to the CSV log.
model_manager.save_plot_csv_logger(
    loss_names=['train_loss', 'val_loss'],
    eval_names=['train_acc_epoch', 'val_acc_epoch'],
    name_prepend='tests_baseline',
)
# Move the model to the CPU, then write every available metric for the test
# split to the metrics text file.
model_manager.torch_model = model_manager.torch_model.to('cpu')
model_manager.save_evaluation(
    test_dataloader,
    name_prepend='baseline',
    give_confusion_matrix=True,
    give_report=True,
    give_f1_score=True,
    give_accuracy_score=True,
    give_precision_score=True,
    give_recall_score=True,
    give_hinge_loss=True,
)