<a href="https://colab.research.google.com/github/izu-theintrepid/Calculator/blob/main/Copy_of_Sentiment_Analysis_Task.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# RNNs for Text Classification

We will use an RNN-based model to classify SMS messages as spam or not spam.

In [None]:
# Imports
from IPython.display import clear_output
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import spacy
import re
import string
from collections import Counter
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import tqdm

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Downloading the Spam SMS Dataset
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip

!unzip /content/smsspamcollection.zip
!rm /content/readme
!rm !rm /content/smsspamcollection.zip

clear_output()

In [None]:
# Downloading the GloVe embeddings database

!wget https://nlp.stanford.edu/data/glove.6B.zip

!unzip /content/glove.6B.zip

!rm -rf /content/glove.6B.zip
!rm /content/glove.6B.100d.txt
!rm /content/glove.6B.200d.txt
!rm /content/glove.6B.300d.txt

clear_output()

In [None]:
# Parallel lists filled while reading the corpus: text[i] is the i-th message
# and label[i] its class (1 = spam, 0 = anything else).
text = []
label = []

# Every line is split once on the first tab into "<label>\t<message>".
# The encoding is given explicitly so decoding does not depend on the
# platform's default locale.
with open("/content/SMSSpamCollection", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            # A blank line has no tab-separated fields; skip it instead of
            # failing on the tuple unpacking below.
            continue

        label1, message = line.split('\t', 1)
        label.append(1 if label1 == 'spam' else 0)
        text.append(message)

In [None]:
# One row per message: the raw text and its 0/1 spam label.
sms = pd.DataFrame(zip(text, label), columns=["Text", "Label"])

# Character count of every message. len() replaces the original manual
# per-character counting loop; the resulting values are the same.
text_lengths = [len(message) for message in sms["Text"]]
sms['Text_Length'] = text_lengths


In [None]:
# spaCy pipeline loaded from the 'en_core_web_sm' package; tokenize() calls it
# to split cleaned text into tokens.
spacy_tokenizer = spacy.load('en_core_web_sm')
def tokenize(text):
    """Return the lower-cased, punctuation-free, ASCII-only tokens of ``text``.

    Steps:
      1. Delete every character that is neither a word character (``\\w``)
         nor whitespace. Characters are removed, not replaced by a space,
         so ``"don't"`` becomes ``"dont"`` before tokenisation.
      2. Tokenise the remainder with the module-level ``spacy_tokenizer``.
      3. Drop whitespace-only tokens and every token that contains a
         non-ASCII character; lower-case the rest.

    Args:
        text: the raw message string.

    Returns:
        list[str]: the surviving tokens, in their original order.
    """
    cleaned = re.sub(r'[^\w\s]', '', text).strip()

    # Only the token boundaries are used below, so run just the tokenizer
    # (make_doc) rather than the whole pipeline that calling the object
    # directly would execute for every message.
    doc = spacy_tokenizer.make_doc(cleaned)

    tokens = []
    for token in doc:
        raw = token.text
        word = raw.strip()
        # Keep the token only if it is non-empty and every character is ASCII.
        if word and all(ord(char) < 128 for char in raw):
            tokens.append(word.lower())

    return tokens

In [None]:
# Tokenise every message. The iteration variable is deliberately not called
# `text`: a plain `for text in ...` loop rebinds the global `text` list built
# when the corpus was read, so re-running the DataFrame cell afterwards would
# zip the characters of a single message instead of the list of messages.
tokenized_text = [tokenize(message) for message in sms["Text"]]

sms["Tokenized_Text"] = tokenized_text

In [None]:
# Sanity check: show the token lists of the first few messages
print(sms["Tokenized_Text"].head())

0    [go, until, jurong, point, crazy, available, o...
1                       [ok, lar, joking, wif, u, oni]
2    [free, entry, in, 2, a, wkly, comp, to, win, f...
3    [u, dun, say, so, early, hor, u, c, already, t...
4    [nah, i, do, nt, think, he, goes, to, usf, he,...
Name: Tokenized_Text, dtype: object


In [None]:
def load_GloVe_embeddings(glove_file):
    """Load word vectors from a GloVe text file into a ``{word: vector}`` dict.

    Every non-blank line is split on whitespace: the first field is the word
    and the remaining fields are parsed as the float32 vector components.
    If a word occurs more than once, the last occurrence wins.

    Side effect: prints the number of lines whose word is not purely
    alphabetic according to ``str.isalpha``.

    Args:
        glove_file: path of the embeddings file; it is read as UTF-8.

    Returns:
        dict: maps each word (str) to a 1-D ``numpy.ndarray`` of dtype float32.
    """
    embeddings_dict = {}
    non_alpha_count = 0

    with open(glove_file, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            if not values:
                # Blank line (e.g. a trailing newline at the end of the file)
                continue

            word = values[0]
            embeddings_dict[word] = np.array(values[1:], dtype='float32')

            if not word.isalpha():
                non_alpha_count += 1

    print(non_alpha_count)
    return embeddings_dict


# Path of the 50-dimensional GloVe file kept by the download cell
glove_file = "/content/glove.6B.50d.txt"
# dict1 maps word -> embedding vector; the embedding cell further down reads it
dict1=load_GloVe_embeddings(glove_file)



72909


In [None]:
def embed_text(tokenized_text, word_embeddings):
    """Map a token sequence to the stacked embeddings of its known tokens.

    Tokens for which ``word_embeddings.get`` returns ``None`` are skipped,
    so the result can have fewer rows than there are tokens.

    Args:
        tokenized_text: iterable of tokens used as lookup keys.
        word_embeddings: mapping from token to embedding vector.

    Returns:
        numpy.ndarray of dtype float with one row per token that was found.
        When no token is found the array is empty with shape ``(0,)``.
    """
    lookups = (word_embeddings.get(tok) for tok in tokenized_text)
    known_vectors = [vec for vec in lookups if vec is not None]
    return np.array(known_vectors, dtype=float)

In [None]:
# Embed every tokenised message with the GloVe lookup table (dict1).
all_word_vector_sequences = []
for message_tokens in sms["Tokenized_Text"]:
    vectors = embed_text(message_tokens, dict1)
    if vectors.shape[0] == 0:
        # None of the message's tokens had an embedding: stand in a single
        # all-zero 50-dimensional row so the message still has a sequence.
        vectors = np.zeros(shape=(1, 50))
    all_word_vector_sequences.append(vectors)

sms["Embedded_Text"] = all_word_vector_sequences

# Longest embedded sequence, measured in rows (embedded tokens)
print(sms["Embedded_Text"].apply(len).max())

173


In [None]:
from copy import deepcopy
def padding(X, max_length=174):
    """Zero-pad every sequence in ``X`` to ``max_length`` rows and stack them.

    The padding rows take their width from each sequence's own trailing
    dimensions rather than from a hard-coded 50, so embeddings of any size
    work. The input sequences are not modified.

    Args:
        X: iterable of arrays shaped ``(seq_len, emb_dim)``.
        max_length: number of rows every padded sequence will have.

    Returns:
        numpy.ndarray of dtype float. For non-empty ``X`` with sequences of
        equal width the shape is ``(len(X), max_length, emb_dim)``.

    Raises:
        ValueError: if a sequence has more than ``max_length`` rows.
    """
    padded = []
    for position, sequence in enumerate(X):
        sequence = np.asarray(sequence)
        seq_len = sequence.shape[0]
        if seq_len > max_length:
            raise ValueError(
                f"sequence {position} has {seq_len} rows, more than max_length={max_length}"
            )
        # Zero rows appended after the real embeddings
        filler = np.zeros(shape=(max_length - seq_len,) + sequence.shape[1:])
        padded.append(np.concatenate([sequence, filler]))
    return np.array(padded).astype(float)

# Pad each message individually: padding() takes a collection of sequences and
# returns them stacked, so the single sequence is wrapped in a one-element
# list and element [0] of the result is stored in the new column.
sms["Padded_Embedded_Text"] = sms["Embedded_Text"].apply(lambda x: padding([x])[0])
print(sms["Padded_Embedded_Text"])

# Store the labels back as integers (round-trip through a NumPy array)
sms['Label']=sms['Label'].to_numpy().astype(int)


0       [[0.14827999472618103, 0.17760999500751495, 0....
1       [[-0.5364599823951721, -0.07243199646472931, 0...
2       [[-0.41183000802993774, 0.4528000056743622, 0....
3       [[-0.25676000118255615, 0.8549000024795532, 1....
4       [[0.5095900297164917, 1.2706999778747559, -0.0...
                              ...                        
5569    [[0.5307400226593018, 0.4011699855327606, -0.4...
5570    [[0.8154399991035461, 0.30171000957489014, 0.5...
5571    [[-0.05248900130391121, 0.3052400052547455, -0...
5572    [[0.4180000126361847, 0.24967999756336212, -0....
5573    [[0.7671899795532227, 0.12389999628067017, -0....
Name: Padded_Embedded_Text, Length: 5574, dtype: object


In [None]:
from torch.utils.data import Dataset, DataLoader

class load_dataset(Dataset):
    """Dataset pairing each padded message with its label.

    Indexing returns a dict with two float32 tensors:
      * ``"msg"``    - built from ``padded_texts[idx]``
      * ``"target"`` - built from ``labels[idx]``
    """

    def __init__(self, padded_texts, labels):
        # Both containers are indexed with the same position in __getitem__
        self.padded_texts, self.labels = padded_texts, labels

    def __len__(self):
        # Dataset size is the number of stored messages
        return len(self.padded_texts)

    def __getitem__(self, idx):
        message = torch.tensor(self.padded_texts[idx], dtype=torch.float32)
        target = torch.tensor(self.labels[idx], dtype=torch.float32)
        return {"msg": message, "target": target}


In [None]:
import torch
import torch.nn as nn

class RNN(nn.Module):
    """Bidirectional RNN classifier with mean- and max-pooling over time.

    Args:
        input_size: number of features per time step (the embedding size).
        hidden_size: number of features in each direction's hidden state.
        num_layers: number of stacked recurrent layers.
        output_size: number of output logits.

    ``forward`` takes a batch-first tensor ``(batch, seq_len, input_size)``
    and returns raw logits of shape ``(batch, output_size)``.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(RNN, self).__init__()

        # batch_first=True: tensors are (batch, seq_len, features)
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)

        # The classifier sees mean-pool and max-pool concatenated, each of
        # width 2 * hidden_size (forward + backward) -> 4 * hidden_size.
        self.fc = nn.Linear(hidden_size * 4, output_size)

    def forward(self, x):
        # Build the initial hidden state from the layer's own configuration
        # and on the input's device/dtype. The former hard-coded
        # torch.zeros(2, batch, 128) only matched hidden_size=128 with
        # num_layers=1 and always lived on the CPU, so it failed for any
        # other configuration and for inputs on a GPU.
        num_directions = 2 if self.rnn.bidirectional else 1
        h_0 = torch.zeros(
            self.rnn.num_layers * num_directions,
            x.size(0),
            self.rnn.hidden_size,
            device=x.device,
            dtype=x.dtype,
        )

        x, _ = self.rnn(x, h_0)  # (batch, seq_len, 2 * hidden_size)

        # Pool over the time dimension, then concatenate both summaries
        avg_pool = torch.mean(x, 1)
        max_pool, _ = torch.max(x, 1)
        out = torch.cat((avg_pool, max_pool), 1)

        return self.fc(out)


In [None]:
import torch.optim as optim

def train_model(data_loader, model, optimizer, device, num_epochs):
    """Train ``model`` with binary cross-entropy on logits.

    Each batch must be a dict with a ``"msg"`` tensor (model input) and a
    ``"target"`` tensor (labels); both are moved to ``device`` as float32.
    After every epoch the mean of the per-batch losses is printed.

    Args:
        data_loader: iterable of batches that also supports ``len()``.
        model: module mapping a message batch to one logit per sample.
        optimizer: optimizer already bound to ``model``'s parameters.
        device: device the batches are moved to.
        num_epochs: number of passes over ``data_loader``.

    Returns:
        The same ``model`` object, trained in place.
    """
    criterion = nn.BCEWithLogitsLoss()

    for epoch in range(num_epochs):
        model.train()
        batch_losses = []

        for batch in data_loader:
            messages = batch["msg"].to(device, dtype=torch.float32)
            targets = batch["target"].to(device, dtype=torch.float32)

            optimizer.zero_grad()  # drop gradients from the previous step
            logits = model(messages)
            # Targets are reshaped to a column to line up with the logits
            loss = criterion(logits, targets.view(-1, 1))
            loss.backward()
            optimizer.step()

            batch_losses.append(loss.item())

        # Mean of the per-batch losses for this epoch
        avg_epoch_loss = sum(batch_losses) / len(data_loader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_epoch_loss:.4f}")

    return model

In [None]:
def evaluate_model(data_loader, model, device):
    """Compute mean batch loss and accuracy of ``model`` on ``data_loader``.

    Each batch must be a dict with a ``"msg"`` tensor and a ``"target"``
    tensor. Both are moved to ``device`` as float32, the same way
    ``train_model`` prepares its batches, so integer labels or float64
    features do not break the forward pass or the loss.

    Args:
        data_loader: iterable of batches that also supports ``len()``.
        model: module producing one logit per sample; put into eval mode.
        device: device the batches are moved to.

    Returns:
        tuple ``(average_loss, accuracy)``: the mean of the per-batch
        BCE-with-logits losses, and the fraction of samples whose rounded
        sigmoid output equals the target.
    """
    model.eval()
    criterion = nn.BCEWithLogitsLoss()
    correct_predictions = 0
    total_samples = 0
    total_loss = 0

    with torch.no_grad():
        for batch in data_loader:
            messages = batch["msg"].to(device, dtype=torch.float32)
            # Column shape to line up with the (batch, 1) logits
            targets = batch["target"].to(device, dtype=torch.float32).view(-1, 1)

            predictions = model(messages)
            total_loss += criterion(predictions, targets).item()

            # A sigmoid output rounded to 0/1 is the predicted class
            predicted_labels = torch.round(torch.sigmoid(predictions))
            correct_predictions += (predicted_labels == targets).sum().item()
            total_samples += targets.size(0)

    accuracy = correct_predictions / total_samples
    average_loss = total_loss / len(data_loader)
    return average_loss, accuracy

In [None]:

# Stack the per-message padded matrices into one feature array; labels as a 1-D array
X = np.stack(sms["Padded_Embedded_Text"].values)
Y = sms["Label"].values

# 70 % train; the held-out 30 % is then halved into validation and test
X_train, X_temp, Y_train, Y_temp = train_test_split(X, Y, test_size=0.3, random_state=42)
X_val, X_test, Y_val, Y_test = train_test_split(X_temp, Y_temp, test_size=0.5, random_state=42)

# One dataset object per split
train_dataset, val_dataset, test_dataset = (
    load_dataset(features, targets)
    for features, targets in ((X_train, Y_train), (X_val, Y_val), (X_test, Y_test))
)

batch_size = 32

# Only the training loader shuffles; validation and test keep a fixed order
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader, test_loader = (
    DataLoader(split, batch_size=batch_size, shuffle=False)
    for split in (val_dataset, test_dataset)
)




In [None]:
import torch.optim as optim
input_size = 50  # width of the GloVe vectors loaded above (glove.6B.50d)
hidden_size = 128
num_layers = 1
output_size = 1  # one logit: spam vs. not spam
num_epochs = 10

# Seed PyTorch's global RNG before the model is created, so the weight
# initialisation and the training loader's shuffle order repeat from run
# to run (GPU kernels may still introduce small non-determinism).
torch.manual_seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move the model to its device first and only then build the optimizer, as
# the torch.optim documentation recommends.
model = RNN(input_size, hidden_size, num_layers, output_size)
model.to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# train_model trains in place and returns the same object, so `model` and
# `trained_model` refer to the same network.
trained_model = train_model(train_loader, model, optimizer, device, num_epochs)


Epoch [1/10], Loss: 0.2579
Epoch [2/10], Loss: 0.1386
Epoch [3/10], Loss: 0.1131
Epoch [4/10], Loss: 0.0934
Epoch [5/10], Loss: 0.0756
Epoch [6/10], Loss: 0.0690
Epoch [7/10], Loss: 0.0645
Epoch [8/10], Loss: 0.0511
Epoch [9/10], Loss: 0.0418
Epoch [10/10], Loss: 0.0335


In [None]:
# Mean batch loss and accuracy on the validation split
val_loss, val_accuracy = evaluate_model(val_loader, model, device)
print(f"Validation loss: {val_loss}, Validation accuracy: {val_accuracy}")

# Same metrics on the held-out test split
test_loss, test_accuracy = evaluate_model(test_loader, model, device)
print(f"Test loss: {test_loss}, Test accuracy: {test_accuracy}")

Validation loss: 0.06398799667065894, Validation accuracy: 0.9760765550239234
Test loss: 0.07510418437973217, Test accuracy: 0.982078853046595
