In [None]:
import torch
import pandas as pd
import math

# nlp library of Pytorch
from torchtext import data
#from torchtext.legacy import data

import warnings as wrn
wrn.filterwarnings('ignore')
# Fixed seed so that weight initialisation is repeatable between runs.
SEED = 2023

torch.manual_seed(SEED)
# The determinism switch lives on the cuDNN backend. `torch.backends.cuda`
# has no `deterministic` flag, so assigning to it silently did nothing.
torch.backends.cudnn.deterministic = True

In [None]:
data_ = pd.read_csv('.dataset/sms_spam.csv')
# Print the schema first and keep the preview as the final expression:
# a notebook cell only renders its last bare expression, so a `head()` call
# placed in the middle of the cell was never displayed.
data_.info()
data_.head()

In [None]:
# `Field` is used for an ordinary (text) column,
# `LabelField` for the label column.

import spacy

nlp = spacy.load("en_core_web_lg")


def tokenizer(text):
    """Split `text` into a list of token strings using the tokenizer of `nlp`."""
    tokens = nlp.tokenizer(text)
    return [token.text for token in tokens]


TEXT = data.Field(
    tokenize=tokenizer,
    batch_first=True,
    include_lengths=True,
)
LABEL = data.LabelField(
    dtype=torch.float,
    batch_first=True,
)

In [None]:
# (column name, field) pairs: the label column first, then the message text.
fields = [
    ("type", LABEL),
    ('text', TEXT),
]

In [None]:
# Read the CSV into a torchtext dataset, skipping the header row and mapping
# the columns through `fields`.
training_data = data.TabularDataset(
    path=".dataset/sms_spam.csv",
    format="csv",
    fields=fields,
    skip_header=True,
)

# Show the attributes of the first parsed example.
first_example = training_data.examples[0]
print(vars(first_example))

In [None]:
import random

# Train / validation split.
# `random.seed` returns None, so passing its result as `random_state` only
# worked through the side effect of seeding the global generator. Seed
# explicitly and hand over the generator state instead.
random.seed(SEED)
train_data, valid_data = training_data.split(split_ratio=0.75,
                                             random_state=random.getstate())

In [None]:
#Building vocabularies => (Token to integer)
# Build the vocabularies (token -> integer) from the training split.
text_field = fields[1][1]
label_field = fields[0][1]

text_field.build_vocab(train_data)
label_field.build_vocab(train_data)

In [None]:
text_vocab = fields[1][1].vocab
label_vocab = fields[0][1].vocab

print("Size of text vocab:", len(text_vocab))
print("Size of label vocab:", len(label_vocab))

# Ten most frequent tokens with their counts (rendered as the cell output).
text_vocab.freqs.most_common(10)

In [None]:
# Prefer the GPU, but fall back to the CPU so the notebook also runs on
# machines without CUDA instead of failing when tensors are moved.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

BATCH_SIZE = 64

# Iterators that hand out batches of data on demand.
# BucketIterator batches samples of similar length together, which reduces
# the number of padding tokens needed per batch.


def _example_length(example):
    """Sort key: number of tokens in the example's text."""
    return len(example.text)


train_iterator, validation_iterator = data.BucketIterator.splits(
    (train_data, valid_data),
    batch_size=BATCH_SIZE,
    sort_key=_example_length,
    sort_within_batch=True,
    device=device,
)

In [None]:
import torch.nn as nn

class LSTMNet(nn.Module):
    """Recurrent classifier: embedding -> gated recurrent cell -> dense -> sigmoid.

    Despite the class name, the hand-written recurrence uses an update gate
    (``zt``) and a reset gate (``rt``), i.e. GRU-style equations without
    bias terms.

    Args:
        vocab_size: number of rows of the embedding table.
        embedding_dim: size of each token embedding.
        hidden_dim: size of the recurrent hidden state.
        output_dim: number of units of the final dense layer.
        n_layers: how many times the cell update is applied per time step.
            Every repetition reuses the same weights and the same input, so
            this is not a stack of independently parameterised layers.
        bidirectional: accepted for interface compatibility but unused; the
            sequence is always read from the first position to the last.
        dropout: dropout probability applied to the embedded tokens.
    """

    def __init__(self,vocab_size,embedding_dim,hidden_dim,output_dim,n_layers,bidirectional,dropout):
        super().__init__()

        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.n_layers = n_layers
        self.dropout_layer = nn.Dropout(p=dropout)

        # 1. Embedding layer: integer sequences -> vector sequences.
        self.embedding_layer = nn.Embedding(self.vocab_size, self.embedding_dim)

        # 2. Recurrent cell weights. `*1` matrices project the input,
        #    `*2` matrices project the previous hidden state.
        # Update gate (zt).
        self.Wz1 = nn.Parameter(torch.empty(self.embedding_dim, self.hidden_dim))
        self.Wz2 = nn.Parameter(torch.empty(self.hidden_dim, self.hidden_dim))

        # Reset gate (rt).
        self.Wr1 = nn.Parameter(torch.empty(self.embedding_dim, self.hidden_dim))
        self.Wr2 = nn.Parameter(torch.empty(self.hidden_dim, self.hidden_dim))

        # Candidate hidden state.
        self.W1 = nn.Parameter(torch.empty(self.embedding_dim, self.hidden_dim))
        self.W2 = nn.Parameter(torch.empty(self.hidden_dim, self.hidden_dim))

        # Called before the dense layer exists, so it initialises the
        # embedding table and the six cell matrices only.
        self.init_weights()

        # 3. Dense layer that maps the final hidden state to the prediction.
        self.dense_layer = nn.Linear(self.hidden_dim, self.output_dim)

        # 4. Output activation.
        self.activation_layer = nn.Sigmoid()

    def init_weights(self):
        """Fill every currently registered parameter with U(-b, b), b = 1/sqrt(hidden_dim)."""
        bound = 1.0 / math.sqrt(self.hidden_dim)
        with torch.no_grad():
            for weight in self.parameters():
                weight.uniform_(-bound, bound)

    def forward(self,text,text_lengths):
        """Classify a batch of token-id sequences.

        Args:
            text: integer token ids indexed as (batch, position).
            text_lengths: number of real (non-padding) tokens in each row, or
                ``None`` to treat every position as a real token.

        Returns:
            Tensor of shape (batch, 1, output_dim) holding sigmoid activations.
        """
        embedded_output = self.dropout_layer(self.embedding_layer(text))
        batch_size = embedded_output.size(0)
        seq_size = embedded_output.size(1)

        ht = torch.zeros(batch_size, self.hidden_dim,
                         dtype=embedded_output.dtype,
                         device=embedded_output.device)
        if text_lengths is not None:
            text_lengths = text_lengths.to(embedded_output.device)

        for word_num in range(seq_size):
            xt = embedded_output[:, word_num, :]

            # The input projections do not depend on the hidden state, so
            # compute them once per time step rather than once per repetition.
            xz = xt @ self.Wz1
            xr = xt @ self.Wr1
            xh = xt @ self.W1

            new_ht = ht
            for _ in range(self.n_layers):
                zt = torch.sigmoid(xz + new_ht @ self.Wz2)
                rt = torch.sigmoid(xr + new_ht @ self.Wr2)
                candidate = torch.tanh(xh + (rt * new_ht) @ self.W2)
                new_ht = (1 - zt) * new_ht + zt * candidate

            if text_lengths is None:
                ht = new_ht
            else:
                # Rows that have already reached their true length keep their
                # state, so padding tokens cannot change the final summary.
                is_real_token = (text_lengths > word_num).unsqueeze(1)
                ht = torch.where(is_real_token, new_ht, ht)

        # Keep the (batch, 1, hidden) layout that callers squeeze afterwards.
        lstm_out = ht.unsqueeze(1)

        dense_output = self.dense_layer(lstm_out)

        return self.activation_layer(dense_output)
        

In [None]:
# Sizes dictated by the data and the task.
SIZE_OF_VOCAB = len(TEXT.vocab)
NUM_OUTPUT_NODES = 1

# Architecture.
EMBEDDING_DIM = 300
NUM_HIDDEN_NODES = 64
NUM_LAYERS = 4
BIDIRECTION = True

# Regularisation.
DROPOUT = 0.2

In [None]:
# Instantiate the network from the hyper-parameters defined above.
model = LSTMNet(
    vocab_size=SIZE_OF_VOCAB,
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=NUM_HIDDEN_NODES,
    output_dim=NUM_OUTPUT_NODES,
    n_layers=NUM_LAYERS,
    bidirectional=BIDIRECTION,
    dropout=DROPOUT,
)

In [None]:
import torch.optim as optim

# Move the network to the target device, then hand its parameters to Adam.
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Binary cross-entropy loss, placed on the same device.
criterion = nn.BCELoss().to(device)

In [None]:
def binary_accuracy(preds, y):
    """Return the fraction of predictions that match `y` after rounding.

    Args:
        preds: tensor of predicted values; each is rounded to the nearest
            integer before comparison.
        y: tensor of target values compared element-wise with the rounded
            predictions.

    Returns:
        Tensor holding (number of matches) / len(preds).
    """
    hits = torch.eq(torch.round(preds), y).float()
    return hits.sum() / len(hits)

In [None]:
def train(model,iterator,optimizer,criterion):
    """Run one optimisation pass over `iterator`.

    Args:
        model: network called as ``model(text, text_lengths)``; expected to
            emit one value per example.
        iterator: iterable of batches exposing ``batch.text`` (unpacked into
            text and lengths) and ``batch.type`` (the targets).
        optimizer: optimiser whose gradients are reset and stepped per batch.
        criterion: loss called as ``criterion(predictions, targets)``.

    Returns:
        Tuple ``(mean loss, mean accuracy)``, each averaged over the number
        of batches.
    """
    epoch_loss = 0.0
    epoch_acc = 0.0

    # Training mode (activates dropout).
    model.train()

    for batch in iterator:

        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        text, text_lengths = batch.text

        # Flatten to one prediction per example. A bare `squeeze()` would
        # also remove the batch dimension when a batch holds a single
        # example, leaving a 0-d tensor that no longer matches the targets.
        predictions = model(text, text_lengths).reshape(-1)

        # Loss and backward propagation.
        loss = criterion(predictions, batch.type)
        loss.backward()

        acc = binary_accuracy(predictions, batch.type)

        # Parameter update.
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    # Means of loss and accuracy over the batches.
    return epoch_loss / len(iterator), epoch_acc / len(iterator)
        

In [None]:
def evaluate(model,iterator,criterion):
    """Compute loss and accuracy over `iterator` without updating the model.

    Args:
        model: network called as ``model(text, text_lengths)``; expected to
            emit one value per example.
        iterator: iterable of batches exposing ``batch.text`` (unpacked into
            text and lengths) and ``batch.type`` (the targets).
        criterion: loss called as ``criterion(predictions, targets)``.

    Returns:
        Tuple ``(mean loss, mean accuracy)``, each averaged over the number
        of batches.
    """
    epoch_loss = 0.0
    epoch_acc = 0.0

    # Evaluation mode (deactivates dropout).
    model.eval()

    # No gradients are tracked inside this context.
    with torch.no_grad():
        for batch in iterator:
            text, text_lengths = batch.text

            # Flatten to one prediction per example. A bare `squeeze()`
            # would also remove the batch dimension for a one-example batch.
            predictions = model(text, text_lengths).reshape(-1)

            # Loss and accuracy for this batch.
            loss = criterion(predictions, batch.type)
            acc = binary_accuracy(predictions, batch.type)

            # Running totals.
            epoch_loss += loss.item()
            epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
EPOCH_NUMBER = 15

# One training pass followed by one validation pass per epoch.
for epoch in range(1, EPOCH_NUMBER + 1):
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, validation_iterator, criterion)

    # Epoch number, then the averaged metrics of both passes.
    print(epoch)
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')
    