# RNNs for Text Classification

We will use an RNN-based model to classify SMS messages as spam or not spam. The notebook splits the entire process into several parts for your convenience. To appreciate preprocessing in music, it is important to understand preprocessing in other domains. You may need to read up on tokenization, embeddings (GloVe), and DataLoaders, as well as how to build an end-to-end AI pipeline.

**Resources:** \
https://www.geeksforgeeks.org/pre-trained-word-embedding-using-glove-in-nlp-models/

We do not expect you to finish the code entirely on your own. Take help whenever required, but understand the code you have written and do not blindly copy it. If you need help at any time, feel free to contact the project leads.

| Name | Phone Number |
| :-- | :-- |
| Pranay Mathur | 7032832559|
| Swathi Narashiman | 6379869509 |

In [None]:
# Imports
from IPython.display import clear_output
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import spacy
import re
import string
from collections import Counter
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import tqdm

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Downloading the Spam SMS Dataset
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip

!unzip /content/smsspamcollection.zip
!rm /content/readme
!rm /content/smsspamcollection.zip

clear_output()

In [None]:
# Downloading the GloVe embeddings database

!wget https://nlp.stanford.edu/data/glove.6B.zip

!unzip /content/glove.6B.zip

!rm -rf /content/glove.6B.zip
!rm /content/glove.6B.100d.txt
!rm /content/glove.6B.200d.txt
!rm /content/glove.6B.300d.txt

clear_output()

In [None]:
# Getting Texts and Labels
text = []
label = []

with open("/content/SMSSpamCollection") as f:

    # Each line has the form "<label>\t<message>".
    # Label spam messages as 1 and legitimate (ham) messages as 0.
    file_content = f.read()
    msgs = file_content.split("\n")
    for msg in msgs[:-1]:  # Skip the last entry, which is empty
        split_list = msg.split("\t")
        text.append(split_list[1])
        label.append(0 if split_list[0] == "ham" else 1)

len(text), len(label)

(5574, 5574)
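
The same tab-separated layout can also be parsed with pandas. The sketch below is an alternative to the loop above, not a replacement for it: the two sample rows are made up for illustration, and it assumes every line is `label<TAB>message`.

```python
import csv
import io

import pandas as pd

# Two made-up rows in the same "label<TAB>message" layout as SMSSpamCollection.
sample = "ham\tOk lar... Joking wif u oni...\nspam\tFree entry in 2 a wkly comp\n"

# quoting=csv.QUOTE_NONE keeps stray quote characters inside messages intact.
df = pd.read_csv(
    io.StringIO(sample),
    sep="\t",
    header=None,
    names=["Label", "Text"],
    quoting=csv.QUOTE_NONE,
)

# Map ham -> 0 and spam -> 1, matching the labels used in this notebook.
df["Label"] = (df["Label"] == "spam").astype(int)
print(df)
```

To try this on the real file, the `io.StringIO(sample)` argument would be swapped for the file path.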

In [None]:
# Creating a Pandas DataFrame
sms = pd.DataFrame(zip(text, label), columns = ["Text", "Label"])

def count_words(text):
    return len(text.split())

sms['Text_Length'] = sms['Text'].apply(count_words)

In [None]:
sms.head()

| | Text | Label | Text_Length |
| :-- | :-- | :-- | :-- |
| 0 | Go until jurong point, crazy.. Available only ... | 0 | 20 |
| 1 | Ok lar... Joking wif u oni... | 0 | 6 |
| 2 | Free entry in 2 a wkly comp to win FA Cup fina... | 1 | 28 |
| 3 | U dun say so early hor... U c already then say... | 0 | 11 |
| 4 | Nah I don't think he goes to usf, he lives aro... | 0 | 13 |


In [None]:
spacy_tokenizer = spacy.load('en_core_web_sm')

def remove_non_ascii(text):
    cleaned_text = re.sub(r'[^\x00-\x7F]+', '', text)
    return cleaned_text

def remove_punctuation(text):
    punctuation = string.punctuation
    cleaned_text = ''.join(char for char in text if char not in punctuation)
    return cleaned_text

def tokenize(text):
    """remove any non-ascii characters
       remove punctuations
       tokenize the text
       return the tokenized text
    """
    tokenized_text = []

    for te in text:
      te = remove_non_ascii(te)
      te = remove_punctuation(te)
      tokenized_te = spacy_tokenizer(te)
      tokenized_text.append(tokenized_te)

    return tokenized_text
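
To see what the two cleaning steps do before spaCy gets involved, here is a small self-contained sketch. The helper name `clean` and the sample sentence are made up for illustration, and `str.split()` is only a stand-in for the spaCy tokenizer.

```python
import re
import string


def clean(text):
    # Drop non-ASCII characters, then drop ASCII punctuation.
    text = re.sub(r"[^\x00-\x7F]+", "", text)
    return "".join(ch for ch in text if ch not in string.punctuation)


sample = "I don't think so\u2026 ok!"  # \u2026 is a non-ASCII ellipsis
cleaned = clean(sample)

print(cleaned)
print(cleaned.split())  # whitespace split as a stand-in tokenizer
```

Note that the apostrophe counts as punctuation, so `don't` becomes `dont` before tokenization.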

In [None]:
# Tokenize the text sms in the Pandas Dataframe
tokenized_text = tokenize(text)
sms["Tokenized_Text"] = tokenized_text

In [None]:
sms.head()

| | Text | Label | Text_Length | Tokenized_Text |
| :-- | :-- | :-- | :-- | :-- |
| 0 | Go until jurong point, crazy.. Available only ... | 0 | 20 | (Go, until, jurong, point, crazy, Available, o... |
| 1 | Ok lar... Joking wif u oni... | 0 | 6 | (Ok, lar, Joking, wif, u, oni) |
| 2 | Free entry in 2 a wkly comp to win FA Cup fina... | 1 | 28 | (Free, entry, in, 2, a, wkly, comp, to, win, F... |
| 3 | U dun say so early hor... U c already then say... | 0 | 11 | (U, dun, say, so, early, hor, U, c, already, t... |
| 4 | Nah I don't think he goes to usf, he lives aro... | 0 | 13 | (Nah, I, do, nt, think, he, goes, to, usf, he,... |


In [None]:
def load_GloVe_embeddings(glove_file):

    """
        load the GloVe embeddings from the files downloaded
        create a dictionary of the form {word : word embedding}
    """

    embeddings_dict = {}

    with open(glove_file, 'r', encoding='utf-8') as file:
        for line in file:
            values = line.split()
            word = values[0]
            embedding = [float(value) for value in values[1:]]
            embeddings_dict[word] = embedding

    return embeddings_dict

In [None]:
# Loading GloVe embeddings
embeddings_dict = load_GloVe_embeddings("glove.6B.50d.txt")
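
Each line of a GloVe text file is a word followed by its space-separated vector components. The sketch below parses that layout from an in-memory string so it runs without the download; the two 3-dimensional vectors are made up for illustration (the real file used here has 50 dimensions).

```python
import io

# Two made-up 3-dimensional vectors in GloVe's text layout:
# a word followed by space-separated floats, one word per line.
fake_glove = io.StringIO("free 0.1 -0.2 0.3\nwin 0.5 0.0 -1.0\n")

embeddings = {}
for line in fake_glove:
    word, *values = line.split()
    embeddings[word] = [float(v) for v in values]

print(embeddings["win"])
print(len(embeddings))
```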

In [None]:
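def embed_text(tokenized_text, word_embeddings, max_text_length=20, embedding_size=50):
    """
        Convert a sequence of tokens to a fixed-length list of word embeddings.
        Unknown words map to a zero vector; short texts are zero-padded and
        long texts are truncated to max_text_length.
    """
    zero_vector = [0.] * embedding_size

    # str() on a spaCy Doc returns its text, which is split on spaces here
    token_list = str(tokenized_text).split(" ")[:max_text_length]

    # GloVe 6B keys are lowercase, so lowercase each token before the lookup
    embedding_text = [word_embeddings.get(token.lower(), zero_vector) for token in token_list]

    # Pad with zero vectors up to max_text_length
    embedding_text.extend([zero_vector] * (max_text_length - len(token_list)))
    return embedding_text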
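
The fixed-length idea (truncate long messages, zero-pad short ones) can be checked in isolation. This is a minimal NumPy sketch with a hypothetical helper `pad_or_truncate` and tiny 2-dimensional vectors, not the function used in this notebook.

```python
import numpy as np


def pad_or_truncate(vectors, max_len, dim):
    """Return a (max_len, dim) array: truncate long inputs, zero-pad short ones."""
    out = np.zeros((max_len, dim), dtype=np.float32)
    kept = vectors[:max_len]
    if kept:
        out[:len(kept)] = np.asarray(kept, dtype=np.float32)
    return out


short = [[1.0, 1.0], [2.0, 2.0]]                   # 2 tokens, needs padding
long = [[float(i), float(i)] for i in range(6)]    # 6 tokens, needs truncation

padded = pad_or_truncate(short, 4, 2)
truncated = pad_or_truncate(long, 4, 2)
print(padded)
print(truncated.shape)
```

Every message ends up with the same shape, which is what lets the DataLoader stack them into a single batch tensor.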

In [None]:
# Creating Embeddings
embedding_text = []
for i in tokenized_text:
  embedding_text.append(embed_text(i, embeddings_dict))
sms["Embedded_Text"] = embedding_text

In [None]:
"""Complete the below code for the Dataset class"""
class load_dataset(Dataset):
    def __init__(self, X, Y):
        """
            X: the embeddings of each message
            Y: ground truth label of each message (0 - ham, 1 - spam)
        """
        self.X = X
        self.y = Y

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        embedding = torch.tensor(self.X[idx])
        label = self.y[idx]
        return embedding, label
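
A quick way to check a `Dataset` is to wrap it in a `DataLoader` and inspect one batch. The sketch below uses a hypothetical `ToyDataset` with all-zero embeddings and made-up labels, assuming 20 tokens of 50 dimensions per message as in this notebook.

```python
import torch
from torch.utils.data import Dataset, DataLoader


class ToyDataset(Dataset):
    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return torch.tensor(self.X[idx]), self.y[idx]


# 5 fake messages, each 20 tokens x 50 embedding dimensions.
X = [[[0.0] * 50 for _ in range(20)] for _ in range(5)]
y = [0, 1, 0, 0, 1]

loader = DataLoader(ToyDataset(X, y), batch_size=2, shuffle=False)
batch_X, batch_y = next(iter(loader))

print(batch_X.shape)  # (batch, seq_len, embedding_size)
print(batch_y)
```

The batch dimension comes first, which matters when choosing how the recurrent layer should read its input.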

In [None]:
import torch.nn as nn

class RNN(nn.Module):
    def __init__(self, num_layers):
        super(RNN, self).__init__()
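        # batch_first=True: the DataLoader yields (batch, seq_len, features),
        # which also matches the output[:, -1, :] indexing in forward()
        self.rnn = nn.LSTM(50, 64, num_layers, batch_first=True)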
        self.fc1 = nn.Linear(64, 32)
        self.fc2 = nn.Linear(32, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        output, _ = self.rnn(x)
        output = self.fc1(output[:, -1, :])
        output = self.relu(output)
        output = self.fc2(output)
        output = self.sigmoid(output)
        return output
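
The tensor shapes flowing through an LSTM are easy to get wrong, so a small shape check can help. This sketch assumes the layer is built with `batch_first=True` and fed a dummy all-zero batch; the sizes mirror the ones used in this notebook.

```python
import torch
import torch.nn as nn

batch, seq_len, emb, hidden = 4, 20, 50, 64
x = torch.zeros(batch, seq_len, emb)  # dummy batch of embedded messages

lstm = nn.LSTM(emb, hidden, num_layers=1, batch_first=True)
output, (h_n, c_n) = lstm(x)

print(output.shape)  # (batch, seq_len, hidden)
print(h_n.shape)     # (num_layers, batch, hidden)

# For a unidirectional LSTM, the last time step of the output is the
# final hidden state of the top layer.
last_step = output[:, -1, :]
print(torch.allclose(last_step, h_n[-1]))
```

Without `batch_first=True`, `nn.LSTM` reads the first dimension as time, so a `(batch, seq_len, features)` tensor would be unrolled across the messages in the batch rather than across the tokens of each message.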

In [None]:
def train_model(num_epochs, train_loader, model, criterion, optimizer):
   """
   Write a trainer loop for the model.
   It must follow the below pattern:
   1. Pass the input to the model and perform forward propagation
   2. Obtain the loss
   3. Backpropagate to find the gradients
   4. Update the weights with the optimizer
   Make sure to check the accuracy of the model at regular intervals
   """

   # Moving the model to the available device
   device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
   model = model.to(device)

   for epoch in range(num_epochs):

       # Training Mode
       model.train()
       running_loss = 0.0
       correct = 0
       total = 0

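       for embedding, labels in train_loader:

           # Moving the batch to the same device as the model
           embedding = embedding.to(device)
           labels = labels.to(device)

           # Forward Propagation
           outputs = model(embedding)
           # squeeze(1) keeps the batch dimension even for a batch of size 1
           outputs = outputs.squeeze(1)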

           # Loss Calculation
           loss = criterion(outputs, labels.float())

           # Back Propagation
           optimizer.zero_grad()
           loss.backward()
           optimizer.step()

           running_loss += loss.item()

           # Getting predictions
           predicted = (outputs > 0.5).long()
           total += labels.size(0)
           correct += (predicted == labels).sum().item()

       # Loss for each epoch
       epoch_loss = running_loss / len(train_loader)
       epoch_acc = 100 * correct / total

       print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%')

   return model
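
To make the loss and accuracy computations in the loop concrete, the sketch below reproduces binary cross-entropy by hand on three made-up predictions and compares it with `nn.BCELoss`. The probabilities and labels are illustrative values only.

```python
import torch
import torch.nn as nn

probs = torch.tensor([0.9, 0.2, 0.6])    # made-up sigmoid outputs
labels = torch.tensor([1.0, 0.0, 0.0])   # made-up ground truth

# Binary cross-entropy: -mean(y * log(p) + (1 - y) * log(1 - p))
manual = -(labels * torch.log(probs) + (1 - labels) * torch.log(1 - probs)).mean()
builtin = nn.BCELoss()(probs, labels)
print(manual.item(), builtin.item())

# Thresholding at 0.5 turns probabilities into class predictions.
predicted = (probs > 0.5).long()
accuracy = (predicted == labels.long()).float().mean().item()
print(predicted.tolist(), accuracy)
```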

In [None]:
"""
1. Write code to split your available data into training and testing splits
2. Define the model
3. Set up hyper-parameters such as learning rate, number of epochs, batch size
4. Train the model by using the function you defined above
5. Check the model accuracy by running the model on the testing split
6. Save the model as a .pth file
"""

# Creating a dataset
dataset = load_dataset(sms["Embedded_Text"], sms["Label"])

# Train Test Split
train_ratio = 0.8
num_samples = len(dataset)
num_train = int(train_ratio * num_samples)
num_test = num_samples - num_train

# Getting the datasets
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [num_train, num_test])

# Loaders initialisation
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Parameters initialisation
learning_rate = 0.01
num_epochs = 10
num_layers = 1

# Model
model = RNN(num_layers)

# Loss
criterion = nn.BCELoss()

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Training
train_model(num_epochs, train_loader, model, criterion, optimizer)

Epoch [1/10], Loss: 0.3969, Accuracy: 84.77%
Epoch [2/10], Loss: 0.3300, Accuracy: 87.96%
Epoch [3/10], Loss: 0.3157, Accuracy: 88.63%
Epoch [4/10], Loss: 0.3048, Accuracy: 89.30%
Epoch [5/10], Loss: 0.2989, Accuracy: 89.55%
Epoch [6/10], Loss: 0.2951, Accuracy: 89.80%
Epoch [7/10], Loss: 0.2861, Accuracy: 90.15%
Epoch [8/10], Loss: 0.2773, Accuracy: 90.94%
Epoch [9/10], Loss: 0.2729, Accuracy: 90.94%
Epoch [10/10], Loss: 0.2663, Accuracy: 91.03%


RNN(
  (rnn): LSTM(50, 64)
  (fc1): Linear(in_features=64, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=1, bias=True)
  (relu): ReLU()
  (sigmoid): Sigmoid()
)

In [None]:
# Testing Loop

# Setting the model in evaluation mode
model.eval()

with torch.no_grad():
    correct = 0
    total = 0

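    # Using the device the model currently lives on
    device = next(model.parameters()).device

    # Iterating through the test loader
    for batch in test_loader:
        embeddings = batch[0].to(device)
        labels = batch[1].to(device)

        # Calculating the outputs
        outputs = model(embeddings)
        outputs = outputs.squeeze(1)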

        # Getting the predictions
        predicted = (outputs > 0.5).long()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # Accuracy calculations
    test_accuracy = 100 * correct / total
    print(f'Test Accuracy: {test_accuracy}%')

Test Accuracy: 90.04484304932735%
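
Accuracy alone can be misleading when one class is much more frequent than the other, which is often the case for spam data. The sketch below uses a hypothetical helper `precision_recall` and ten made-up labels to show a classifier that never predicts spam yet still scores 80% accuracy.

```python
def precision_recall(predicted, actual, positive=1):
    """Precision and recall for the positive class, from two label lists."""
    pairs = list(zip(predicted, actual))
    tp = sum(p == positive and a == positive for p, a in pairs)
    fp = sum(p == positive and a != positive for p, a in pairs)
    fn = sum(p != positive and a == positive for p, a in pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall


# Made-up example: 8 ham (0), 2 spam (1), and a model that always says ham.
actual = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
predicted = [0] * 10

accuracy = sum(p == a for p, a in zip(predicted, actual)) / len(actual)
precision, recall = precision_recall(predicted, actual)
print(accuracy, precision, recall)
```

Reporting recall on the spam class alongside accuracy gives a clearer picture of how many spam messages slip through.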


In [None]:
# Saving the model
torch.save(model, 'sentiment_analysis.pth')
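
An alternative to saving the whole model object is to save only its `state_dict` and load it into a freshly constructed model. The sketch below shows the round trip with a stand-in `nn.Linear` layer and a temporary file; the file name `toy_model.pth` is made up for illustration.

```python
import os
import tempfile

import torch
import torch.nn as nn

model = nn.Linear(4, 1)  # stand-in for the trained classifier

# Save only the learned parameters.
path = os.path.join(tempfile.mkdtemp(), "toy_model.pth")
torch.save(model.state_dict(), path)

# Rebuild the same architecture, then load the parameters into it.
restored = nn.Linear(4, 1)
restored.load_state_dict(torch.load(path))
restored.eval()

x = torch.ones(1, 4)
print(torch.equal(model(x), restored(x)))
```

With this approach the class definition must be available at load time, but the saved file does not depend on how the original model object was pickled.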