In [None]:
from torch import nn
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import joblib
from torch.autograd import Variable
import gensim, logging
import gensim.downloader as api
from gensim.models import Word2Vec
from gensim.models import KeyedVectors
import pandas as pd
from pathlib import Path
from sklearn.metrics import classification_report

# Load the pickled DataLoader objects prepared offline.
# SECURITY NOTE: joblib.load unpickles arbitrary Python objects and can execute code
# while doing so -- only point these paths at files you produced or fully trust.
# joblib.load() has no `weights_only` parameter (that option belongs to torch.load);
# passing it raised a TypeError before any data was read, so it is omitted here.
trainloader = joblib.load('/kaggle/input/final-dl/train_data_loader.pkl')
valloader = joblib.load('/kaggle/input/validation/val_data_loader.pkl')
testloader = joblib.load('/kaggle/input/final-dl/test_data_loader.pkl')

# Re-wrap the underlying datasets so every split uses the same batch size.
# Only the training split is shuffled; validation/test keep their stored order.
BATCH_SIZE = 128
train_dataloader = torch.utils.data.DataLoader(trainloader.dataset, batch_size = BATCH_SIZE, shuffle = True)
test_dataloader = torch.utils.data.DataLoader(testloader.dataset, batch_size = BATCH_SIZE)
val_dataloader = torch.utils.data.DataLoader(valloader.dataset, batch_size = BATCH_SIZE)


In [None]:
from torch import nn
from tqdm import tqdm
import torch.nn.functional as F
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix

def F1_tensor(y_true, y_pred):
    """Return sklearn's F1 score for two label tensors.

    Both tensors are moved to the CPU and converted to NumPy arrays before
    being handed to ``f1_score`` (default arguments).
    """
    labels_np, preds_np = (t.cpu().numpy() for t in (y_true, y_pred))
    return f1_score(labels_np, preds_np)

def Confusion_matrix_tensor(y_true, y_pred):
    """Return sklearn's confusion matrix for two label tensors.

    Args:
        y_true: tensor of ground-truth labels.
        y_pred: tensor of predicted labels.

    Returns:
        The array produced by ``sklearn.metrics.confusion_matrix(y_true, y_pred)``.
    """
    y_true = y_true.to('cpu').numpy()
    y_pred = y_pred.to('cpu').numpy()
    # The previous body returned f1_score here (copy-paste from F1_tensor),
    # so the function never produced a confusion matrix.
    return confusion_matrix(y_true, y_pred)

def convert_from_tensor(y):
    """Turn a tensor into a flat (1-D) NumPy array.

    The tensor is copied to the CPU, detached from the autograd graph,
    converted to NumPy and reshaped to a single dimension.
    """
    on_host = y.cpu()
    as_array = on_host.detach().numpy()
    return as_array.reshape(-1)

def take_all_elem(container, target):
    """Append every element of ``target`` to ``container`` (in place).

    Values equal to 0 or 1 are appended unchanged; any other value is
    replaced by the integer 1 before being appended.
    """
    container.extend(1 if (x != 0 and x != 1) else x for x in target)

def save_model(model, model_dir = '/kaggle/working/', model_name = 'best_RNNmodel.pth'):
    """Write ``model.state_dict()`` to ``model_dir / model_name``.

    The defaults reproduce the original hard-coded location, so existing
    ``save_model(model)`` calls behave as before. Passing a different
    ``model_name`` lets several architectures keep separate checkpoints
    instead of overwriting one shared file.

    Args:
        model: module whose ``state_dict()`` is saved.
        model_dir: directory for the checkpoint; created (with parents) if missing.
        model_name: file name of the checkpoint inside ``model_dir``.

    Returns:
        pathlib.Path of the file that was written.
    """
    MODEL_PATH = Path(model_dir)
    MODEL_PATH.mkdir(parents = True, exist_ok = True)
    MODEL_SAVE_PATH = MODEL_PATH / model_name
    print(f'Update new best model to : {MODEL_SAVE_PATH}')
    torch.save(obj = model.state_dict(), f = MODEL_SAVE_PATH)
    return MODEL_SAVE_PATH

def train_step(model : nn.Module,
               data_loader : torch.utils.data.DataLoader,
               loss_function : nn.Module,
               optimizer,
               device = 'cuda'):
    """Run one training epoch over ``data_loader`` and print epoch metrics.

    For every batch the model is run, the loss is back-propagated and the
    optimizer takes a step. Hard 0/1 predictions are obtained by rounding the
    sigmoid of the model output. After the epoch the mean batch loss, F1 score,
    confusion matrix and classification report are printed.

    Args:
        model: network to train; its output is treated as logits.
        data_loader: yields ``(inputs, labels)`` batches.
        loss_function: called as ``loss_function(logits, labels)`` on float tensors.
        optimizer: optimizer that updates ``model``'s parameters.
        device: device the batches are moved to.

    Returns:
        None. Results are printed only.
    """
    model.train()
    # Accumulate a plain Python float. Adding the loss tensor itself kept every
    # batch's autograd graph reachable until the end of the epoch and made the
    # printed loss a tensor repr.
    running_loss = 0.0

    all_y_true = []
    all_y_pred = []

    for (X_train,y_train) in data_loader:
        X_train = X_train.to(device)
        # assumes labels arrive as (batch,) and the model emits (batch, 1);
        # the unsqueeze makes the two shapes agree -- TODO confirm against the dataset
        y_train = y_train.unsqueeze(1).to(device)

        y_pred = model(X_train)
        batch_loss = loss_function(y_pred.float(),y_train.float())

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

        # Threshold at 0.5; detach because these values are only used for metrics.
        y_pred01 = torch.round(torch.sigmoid(y_pred.detach()))

        # take_all_elem maps any value other than 0/1 to 1 before storing it.
        take_all_elem(all_y_true,convert_from_tensor(y_train))
        take_all_elem(all_y_pred,convert_from_tensor(y_pred01))

    loss = running_loss / len(data_loader)

    all_y_true = np.array(all_y_true)
    all_y_pred = np.array(all_y_pred)

    print('------------------Train Result----------------------------')
    print(f'Training loss : {loss} | F1_score : {f1_score(all_y_true,all_y_pred)}')
    print(f'Confusion matrix :')
    print(confusion_matrix(all_y_true,all_y_pred))
    print(f'Classification report :')
    print(classification_report(all_y_true, all_y_pred, digits=4))
    
def test_step(model : nn.Module,
              data_loader : torch.utils.data.DataLoader,
              loss_function : nn.Module,
              optimizer,
              device = 'cuda'):
    """Evaluate ``model`` on ``data_loader``, print metrics and checkpoint on a new best F1.

    Side effects: reads and updates the module-level ``best_f1_score``; when the
    F1 score of this pass is strictly greater, ``save_model(model)`` is called.
    ``best_f1_score`` must therefore be defined before the first call.

    Args:
        model: network to evaluate; its output is treated as logits.
        data_loader: yields ``(inputs, labels)`` batches.
        loss_function: called as ``loss_function(logits, labels)`` on float tensors.
        optimizer: unused; kept so the signature mirrors ``train_step``.
        device: device the batches are moved to.

    Returns:
        None. Results are printed only.
    """
    global best_f1_score

    model.eval()
    running_loss = 0.0

    all_y_true = []
    all_y_pred = []

    with torch.inference_mode():
        for (X_test,y_test) in data_loader:
            X_test = X_test.to(device)
            y_test = y_test.to(device)

            # Drop only the trailing singleton dimension. A bare squeeze() also
            # removes the batch dimension when the final batch holds one sample,
            # producing a 0-d tensor that the loss rejects against a (1,) target.
            # assumes the model emits (batch, 1) and labels are (batch,) -- TODO confirm
            test_logits = model(X_test).squeeze(-1)
            test_01 = torch.round(torch.sigmoid(test_logits))

            batch_loss = loss_function(test_logits.float(),y_test.float())
            running_loss += batch_loss.item()

            # take_all_elem maps any value other than 0/1 to 1 before storing it.
            take_all_elem(all_y_true,convert_from_tensor(y_test))
            take_all_elem(all_y_pred,convert_from_tensor(test_01))

    loss = running_loss / len(data_loader)
    current_f1_score = f1_score(all_y_true,all_y_pred)
    print('------------------Test Result----------------------------')
    print(f'Testing loss : {loss} | F1_score : {current_f1_score}')
    if (current_f1_score > best_f1_score):
        best_f1_score = current_f1_score
        save_model(model)
    print(f'Confusion matrix :')
    print(confusion_matrix(all_y_true,all_y_pred))
    print(f'Classification report :')
    print(classification_report(all_y_true, all_y_pred, digits=4))
    print('---------------------------------------------------------')

# Size pair for the embedding table; the models below use index 1 as the
# embedding width (nn.Embedding dimension and nn.RNN input_size), and index 0
# equals the 64001 rows they pass to nn.Embedding.
# NOTE(review): presumably mirrors a pretrained vocabulary/embedding matrix -- confirm.
matrix_size = (64001,768)
class Attention(nn.Module):
    """Pools a sequence of hidden vectors into one vector with learned weights.

    A single linear layer scores each time step, the scores are soft-maxed
    across the sequence axis, and the weighted sum of the inputs is returned.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        # Maps each hidden vector to one scalar score.
        self.attention = nn.Linear(hidden_dim, 1)

    def forward(self, lstm_output):
        """Return ``(context, attn_weights)`` for ``lstm_output``.

        lstm_output: tensor of shape (batch_size, seq_len, hidden_dim)
        context:      (batch_size, hidden_dim)
        attn_weights: (batch_size, seq_len), each row sums to 1
        """
        # One score per time step: (batch_size, seq_len, 1) -> (batch_size, seq_len)
        raw_scores = self.attention(lstm_output)
        raw_scores = raw_scores.squeeze(-1)

        # Normalise over the sequence axis.
        attn_weights = torch.softmax(raw_scores, dim=1)

        # (batch_size, 1, seq_len) @ (batch_size, seq_len, hidden_dim) -> (batch_size, 1, hidden_dim)
        weights_as_row = attn_weights.unsqueeze(1)
        pooled = torch.bmm(weights_as_row, lstm_output)

        return pooled.squeeze(1), attn_weights

class attRNNmodel(nn.Module):
    """Embedding -> bidirectional multi-layer RNN -> attention pooling -> 1 logit.

    Constructor arguments are optional; the defaults reproduce the original
    fixed architecture (``matrix_size`` rows/width, hidden size 32, 3 layers).

    Args:
        vocab_size: number of embedding rows; defaults to ``matrix_size[0]``.
        embed_dim: embedding width; defaults to ``matrix_size[1]``.
        hidden_size: RNN hidden size per direction.
        num_layers: number of stacked RNN layers.
    """
    def __init__(self, vocab_size = None, embed_dim = None, hidden_size = 32, num_layers = 3):
        super().__init__()
        # Resolve defaults at call time so the class can be built without the
        # module-level constant when explicit sizes are supplied.
        if vocab_size is None:
            vocab_size = matrix_size[0]
        if embed_dim is None:
            embed_dim = matrix_size[1]

        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.rnn = nn.RNN(input_size = embed_dim, hidden_size = hidden_size,
            num_layers = num_layers, batch_first = True, bidirectional = True)
        # Bidirectional output concatenates both directions, hence 2 * hidden_size.
        self.attention = Attention(hidden_size * 2)
        # The input width is known, so use a regular Linear instead of LazyLinear:
        # every parameter exists before an optimizer is created, and the
        # state_dict keys (fc.weight / fc.bias) stay the same.
        self.fc = nn.Linear(in_features = hidden_size * 2, out_features = 1)

    def forward(self, X, state=None):
        """Return one logit per sequence, shape (batch, 1).

        X: integer token ids -- assumes shape (batch, seq_len); TODO confirm.
        state: accepted for interface compatibility but not used.
        """
        embedding = self.embedding(X)
        rnn_outputs, _ = self.rnn(embedding)
        # All time steps are pooled by attention; the weights are not returned.
        context, attn_weights = self.attention(rnn_outputs)
        return self.fc(context)
        
class RNNmodel(nn.Module):
    """Embedding -> bidirectional multi-layer RNN -> linear head on the last time step.

    Constructor arguments are optional; the defaults reproduce the original
    fixed architecture (``matrix_size`` rows/width, hidden size 32, 3 layers).

    Args:
        vocab_size: number of embedding rows; defaults to ``matrix_size[0]``.
        embed_dim: embedding width; defaults to ``matrix_size[1]``.
        hidden_size: RNN hidden size per direction.
        num_layers: number of stacked RNN layers.
    """
    def __init__(self, vocab_size = None, embed_dim = None, hidden_size = 32, num_layers = 3):
        super().__init__()
        # Resolve defaults at call time so the class can be built without the
        # module-level constant when explicit sizes are supplied.
        if vocab_size is None:
            vocab_size = matrix_size[0]
        if embed_dim is None:
            embed_dim = matrix_size[1]

        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.rnn = nn.RNN(input_size = embed_dim, hidden_size = hidden_size,
            num_layers = num_layers, batch_first = True, bidirectional = True)
        # The input width is known (both directions concatenated), so use a regular
        # Linear instead of LazyLinear: every parameter exists before an optimizer
        # is created, and the state_dict keys (fc.weight / fc.bias) stay the same.
        self.fc = nn.Linear(in_features = hidden_size * 2, out_features = 1)

    def forward(self, X, state=None):
        """Return one logit per sequence, shape (batch, 1).

        X: integer token ids -- assumes shape (batch, seq_len); TODO confirm.
        state: accepted for interface compatibility but not used.
        """
        embedding = self.embedding(X)
        # rnn_outputs holds one vector per time step: (batch, seq_len, 2 * hidden_size)
        rnn_outputs, _ = self.rnn(embedding)
        # Select the last time step first, then project: same result as projecting
        # every step and slicing afterwards, without the wasted per-step work.
        # NOTE(review): at the last position the backward direction has only seen
        # the final token, and trailing padding (if any) would be what is scored -- confirm
        # this is intended.
        return self.fc(rnn_outputs[:, -1, :])

# Use the GPU when one is visible, otherwise fall back to the CPU so the
# notebook still runs end to end on a CPU-only runtime.
device = 'cuda' if torch.cuda.is_available() else 'cpu'


In [None]:
# Train the attention-pooled RNN, evaluating on the validation split after each epoch.
model = attRNNmodel().to(device)
BCE_loss = nn.BCEWithLogitsLoss()
Adam_optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 10
# Starts below any real F1 score, so the first validation pass stores a checkpoint.
best_f1_score = -1

for epoch in range(epochs):
    print(f'Epoch {epoch}=======================================')
    train_step(model, train_dataloader, BCE_loss, Adam_optimizer, device=device)
    test_step(model, val_dataloader, BCE_loss, Adam_optimizer, device=device)

In [None]:
# Train the plain (no attention) RNN, evaluating on the validation split after each epoch.
model = RNNmodel().to(device)
BCE_loss = nn.BCEWithLogitsLoss()
Adam_optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 10
# Reset the tracker so this model's checkpoints are judged on their own.
best_f1_score = -1

for epoch in range(epochs):
    print(f'Epoch {epoch}=======================================')
    train_step(model, train_dataloader, BCE_loss, Adam_optimizer, device=device)
    test_step(model, val_dataloader, BCE_loss, Adam_optimizer, device=device)

In [None]:
# Evaluate the best validation checkpoint of the plain RNN on the test split.
model = RNNmodel().to(device)
# Load from the exact location save_model() writes to, instead of relying on the
# current working directory, and remap tensors onto the active device.
best_checkpoint_path = Path('/kaggle/working/') / 'best_RNNmodel.pth'
model.load_state_dict(torch.load(best_checkpoint_path, map_location = device))
# NOTE(review): test_step also compares against best_f1_score and may call
# save_model() again here; the weights written would be the ones just loaded.
test_step(model,test_dataloader,BCE_loss,Adam_optimizer,device = device)