In [1]:
!pip install torch==2.3.0
!pip install torchtext==0.18.0



In [1]:
import torch
import torch.nn as nn
import sklearn as sk
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import matplotlib.pyplot as plt
import numpy as np
import math
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import re
import torchtext.vocab as tvc
import nltk
from sklearn.model_selection import train_test_split
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords



In [2]:
nltk.download('wordnet')
nltk.download('stopwords')

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\linhn\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\linhn\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [4]:
#load Training Data
data = pd.read_csv("sample_data/labeled_data.csv")

In [5]:
# Preprocess the data

wordnet = WordNetLemmatizer()
stop_words = stopwords.words('english')
def clean_data(text, stop_words, max_length):
  text = text.lower()
  text = text.replace("'","")
  text = re.sub("[^a-zA-Z]", " ", text)
  text = text.split()
  text = [word for word in text if word not in stop_words]
  text = [wordnet.lemmatize(word) for word in text]
  text = text[:max_length]
  return text

data["tweet"] = data["tweet"].apply(clean_data, stop_words = stop_words, max_length = 1000)
dataset_y = data["class"].tolist()
dataset_x = data["tweet"].tolist()

In [6]:
#Create training sets

X_train = dataset_x
Y_train = dataset_y

In [7]:
#Build/Load the Vocabulary

min_freq = 2
unk_token = "<unk>"
pad_token = "<pad>"
special_tokens = [unk_token, pad_token]
special_tokens_output = []

Y_train = np.array(Y_train).reshape(-1,1)
if torch.load('vocabulary_hate_speech.pt') == None:
    input_train_dataset = tvc.build_vocab_from_iterator(X_train, min_freq = min_freq, specials = special_tokens)
    torch.save(input_train_dataset, 'vocabulary_hate_speech.pt')
else:
    input_train_dataset = torch.load('vocabulary_hate_speech.pt')
input_train_dataset.set_default_index(input_train_dataset[unk_token])
Y_train = Y_train.tolist()

In [8]:
#Numerize Inputs and Outputs and Pad Inputs

for i in range(len(Y_train)):
  X_train[i] = torch.tensor(input_train_dataset.lookup_indices(X_train[i]))
X_train = nn.utils.rnn.pad_sequence(X_train, padding_value = 0, batch_first = True)

In [9]:
#Merge X, Y into a dataset and divide into batches with 32 samples in each batch
class CustomDataset(Dataset):
  def __init__(self, x, y):
    self.x = x
    self.y = y
  def __len__(self):
    return len(self.x)
  def __getitem__(self, idx):
    return self.x[idx], self.y[idx]
data = CustomDataset(X_train, Y_train)
dataloader = DataLoader(data, batch_size = 32, shuffle = True, num_workers = 2)

In [10]:
#Build the Encoder Model

class Encode(nn.Module):
  def __init__(self, input_size, hidden_size, output_size, num_layers,embedding_size, dropout):
    super(Encode, self).__init__()
    self.hidden_size = hidden_size
    self.num_layers = num_layers
    self.embedding = nn.Embedding(input_size, embedding_size)
    self.dropout = nn.Dropout(dropout)
    self.lstm = nn.LSTM(embedding_size, hidden_size, num_layers, batch_first = True, dropout = dropout)
    self.ReLU = nn.ReLU()
    self.fc = nn.Linear(hidden_size, output_size)
    self.fc1 = nn.Linear(31, 3)

  def forward(self, x):
    embedding = self.dropout(self.embedding(x))
    output, (hidden, cell) = self.lstm(embedding)
    output = self.fc(output)
    print(output.shape)
    out = self.fc1(output.reshape(-1,31))
    return out

In [11]:
#Initialize model, loss, and optimizer function

input_size = len(input_train_dataset)
hidden_size = 100
output_size = 1
num_layers = 2
embedding_size = 128
dropout = 0.5

model = Encode(input_size, hidden_size, output_size, num_layers, embedding_size, dropout)
loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)

In [12]:
#Load Model Checkpoint

check_point = torch.load("check-point-hate-speech.pt")
model.load_state_dict(check_point['model_state_dict'])
optimizer.load_state_dict(check_point['optimizer_state_dict'])

In [None]:
#Train model

num_epochs = 20
valid_loss = 0
count = 0
for epoch in range(num_epochs):
  for input, output in dataloader:
    output = output[0]
    y_pred = model(input)
    l = loss(y_pred, output)
    valid_loss += l.item()
    count += 1
    l.backward()
    optimizer.step()
    optimizer.zero_grad()
    print(f"Epoch: {epoch}, Loss: {valid_loss/float(count)}")
    if count % 100 == 0:
      torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': loss,
                }, "check-point.pt")


In [13]:
#Load, Clean, and Divide test file into X_test and Y_test

data_test = pd.read_csv("sample_data/testhatespeech.csv")
data_test["tweet"] = data_test["tweet"].apply(clean_data, stop_words = stop_words, max_length = 1000)
def minimize(text, max_length):
  text = text[:max_length]
  return text
data_test["tweet"] = data_test["tweet"].apply(minimize, max_length = 31)
X_test = data_test["tweet"].tolist()
Y_test = data_test["Toxicity"].tolist()

In [14]:
#Numberize and Pad

Y_test = np.array(Y_test).reshape(-1,1)
Y_test = Y_test.tolist()
for j in range(len(X_test)):
  X_test[j] = torch.tensor(input_train_dataset.lookup_indices(X_test[j]))
X_test = nn.utils.rnn.pad_sequence(X_test, padding_value = 0, batch_first = True)
Y_test = torch.tensor(Y_test, dtype = torch.float32)
Y_test = Y_test.reshape(-1).int()

In [15]:
new_data = []
for m in range(X_test.shape[0]):
    new_tensor = torch.cat((X_test[m], torch.zeros(31 - X_test[m].shape[0]))).int()
    new_data.append(new_tensor)
X_test = torch.stack(new_data)


In [16]:
#Conduct Model Testing

with torch.no_grad():
  y_pred = model(X_test)
  y_pred_cls = torch.max(y_pred,1)
  for i in range (y_pred_cls.indices.shape[0]):
    if y_pred_cls.indices[i] == 2:
      y_pred_cls.indices[i] = 0
  acc = y_pred_cls.indices.eq(Y_test).sum() / float(3555)
  print(f"The accuracy of the model through testing: {acc.item()*100}%")

torch.Size([3554, 31, 1])
The accuracy of the model through testing: 77.13080048561096%


In [None]:
#Model Testing by Inputs from Keyboard

nlp_input = "I love this movie guys"
nlp_input = clean_data(nlp_input, stop_words, 1000)
input = input_train_dataset.lookup_indices(nlp_input)
for m in range(31 - len(nlp_input)):
  input.append(0)
with torch.no_grad():
  input = torch.tensor(input)
  input = input.reshape(-1,31)
  print(torch.argmax(model(input)))


tensor(2)
