In [None]:
import torch
import pandas as pd
import numpy as np
import torch.nn as nn
import re
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from transformers import AutoModel, AutoTokenizer
import os

In [None]:
import os
# Force synchronous CUDA kernel launches so a device-side error is reported
# at the call that caused it (debugging aid; slows GPU execution).
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

# Every comment is padded / cut to exactly this many token embeddings.
MAX_TOKENS = 100

df = pd.read_csv("../CSVFiles/smallDomainDataNotEmbedded.csv")

# Use the GPU when one is available, otherwise fall back to the CPU instead
# of failing on a hard-coded 'cuda' device.
embed_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

bertweet = AutoModel.from_pretrained("vinai/bertweet-base")
tokenizer = AutoTokenizer.from_pretrained("vinai/bertweet-base", use_fast=False, normalization=True)
bertweet.eval()
bertweet.to(embed_device)

# Replace each raw comment with its (MAX_TOKENS, hidden_dim) embedding matrix.
# NOTE: this overwrites df['comment_text'] in place, so the cell is not
# idempotent -- re-running it without reloading the CSV would pass arrays
# (not strings) to the tokenizer.
with torch.no_grad():
    for idx in df.index:
        token_ids = tokenizer.encode(df['comment_text'][idx], add_special_tokens=True, truncation=True)
        # Element [0] of the model output is the per-token hidden-state tensor;
        # the batch dimension added by unsqueeze(0) is dropped again.
        embeds = bertweet(torch.tensor(token_ids).to(embed_device).unsqueeze(0))[0].squeeze(0)

        missing = MAX_TOKENS - embeds.shape[0]
        if missing > 0:
            # Zero-pad short sequences; new_zeros matches device and dtype.
            embeds = torch.cat((embeds, embeds.new_zeros((missing, embeds.shape[1]))))
        elif missing < 0:
            embeds = embeds[:MAX_TOKENS]

        df.at[idx, 'comment_text'] = embeds.detach().cpu().numpy()

In [None]:
# Partition the frame into train / test / validation subsets according to
# the value stored in its 'split' column.
split_column = df['split']

training_data = df[split_column == 'train']
test_data = df[split_column == 'test']
validation_data = df[split_column == 'val']

In [None]:
# Creating data loaders
X_train = np.array(training_data['comment_text'].values.tolist())
Y_train = np.array(training_data['toxicity'].values.tolist())

X_test = np.array(test_data['comment_text'].values.tolist())
Y_test = np.array(test_data['toxicity'].values.tolist())

X_val = np.array(validation_data['comment_text'].values.tolist())
Y_val = np.array(validation_data['toxicity'].values.tolist())

In [None]:
# Number of samples in the train, test and validation splits, in that order.
# (The training size used to be printed twice.)
print(len(X_train))
print(len(X_test))
print(len(X_val))

In [None]:
def CleanText(text):
    """Normalise a raw text string.

    The text is lower-cased and then a fixed sequence of regex substitutions
    is applied in order: URL-like spans become ``<URL>``, digit groups joined
    by ``/``, ``-`` or ``.`` become ``<DATE>``, e-mail addresses become
    ``<EMAIL>``, remaining digit runs become ``<NUM>``, a set of punctuation
    and control characters is deleted, leading/trailing whitespace is removed,
    and runs of spaces as well as underscores are turned into a single space.

    NOTE(review): the URL rule runs before the e-mail rule and already
    consumes the domain part of an address (``foo@bar.com`` ends up as
    ``foo@<URL>``); the punctuation class also contains a literal ``|``.
    Confirm whether both are intended.

    Parameters
    ----------
    text : str
        The raw text.

    Returns
    -------
    str
        The normalised text.
    """
    # (pattern, replacement) pairs; the order matters and mirrors the pipeline above.
    substitutions = (
        (r'''(https?:\/\/www\.|https?:\/\/)?[a-z0-9]+([\-\.]{1}[a-z0-9]+)*\.[a-z]{2,3}[-a-zA-Z0-9()@:%_\+.~#?&\//=<>]*''', "<URL>"),
        (r'''[0-9]+[/\-.]+[0-9]+[/\-.]+[0-9]+''', "<DATE>"),
        (r'''[a-z0-9._%+-]+\@[a-z0-9.-]+[a-z0-9]\.[a-z]{1,}''', "<EMAIL>"),
        (r'''[0-9]+''', "<NUM>"),
        (r'''[.|,|!|?|\'|\''|\"|\n|\t|\-|\(|\)]''', ''),
        (r'''^\s+|\s+$''', ''),
        (r'''[ ][ ]+|_''', " "),
    )

    cleaned = text.lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned

In [None]:
# Load the pretrained 'bert-base-uncased' tokenizer and encoder.
from transformers import BertTokenizer, BertModel

# NOTE: this rebinds the global `tokenizer`, replacing the BERTweet tokenizer
# created in the embedding cell above.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')
# Switch the encoder to evaluation mode and move it to the GPU.
# NOTE(review): in the visible notebook `model` is only referenced from a
# commented-out line in LSTM_Net.forward -- confirm it is still needed.
model.eval()
model.to('cuda')

In [None]:
# Build [token_id_tensor, label] pairs for each split, encoding every entry
# to exactly 100 token ids (truncated or padded to max_length).
# NOTE(review): X_train / X_test / X_val are built from df['comment_text'],
# which the embedding cell overwrote with embedding arrays, so `text` here is
# presumably an array rather than a string -- this cell looks stale; confirm.
# A later cell rebuilds prep_training / prep_test / prep_val from the arrays directly.
prep_training = []
prep_test = []
prep_val = []

for i, text in enumerate(X_train):
    prep_training.append([torch.tensor(tokenizer.encode(text, add_special_tokens=True, max_length=100, truncation=True, padding="max_length")), Y_train[i]])

for i, text in enumerate(X_test):
    prep_test.append([torch.tensor(tokenizer.encode(text, add_special_tokens=True, max_length=100, truncation=True, padding="max_length")), Y_test[i]])

for i, text in enumerate(X_val):
    prep_val.append([torch.tensor(tokenizer.encode(text, add_special_tokens=True, max_length=100, truncation=True, padding="max_length")), Y_val[i]])

In [None]:
# Batch the prepared [input, label] pairs; only the training data is shuffled.
train_loader = DataLoader(dataset=prep_training, batch_size=16, shuffle=True)
valid_loader = DataLoader(dataset=prep_val, batch_size=16, shuffle=False)
test_loader = DataLoader(dataset=prep_test, batch_size=16, shuffle=False)

In [None]:
def CheckAccuracy(predictions, labels):
    """Return the fraction of positions where prediction and label are equal.

    Parameters
    ----------
    predictions : sequence
        Predicted values; its length determines how many positions are compared.
    labels : sequence
        Reference values, indexed in parallel with ``predictions``.

    Returns
    -------
    float
        Number of matching positions divided by ``len(predictions)``;
        ``0.0`` when ``predictions`` is empty (previously a ZeroDivisionError).
    """
    total = len(predictions)
    if total == 0:
        # Nothing to score -- avoid dividing by zero.
        return 0.0
    matches = sum(1 for i in range(total) if predictions[i] == labels[i])
    return matches / total

In [None]:
# Training hyper-parameters: upper bound on epochs and optimiser learning rate.
epochs = 50
lr = 5e-5

# Request the GPU, but only use it when CUDA is actually available.
cuda = True # Set this if training on GPU
cuda = cuda and torch.cuda.is_available()
if cuda:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Pair every input array with its label, split by split, then wrap the pairs
# in DataLoaders (batches of 16; only the training loader shuffles).
prep_training = [[X_train[row], Y_train[row]] for row in range(len(X_train))]
prep_test = [[X_test[row], Y_test[row]] for row in range(len(X_test))]
prep_val = [[X_val[row], Y_val[row]] for row in range(len(X_val))]

train_loader = DataLoader(dataset=prep_training, batch_size=16, shuffle=True)
valid_loader = DataLoader(dataset=prep_val, batch_size=16, shuffle=False)
test_loader = DataLoader(dataset=prep_test, batch_size=16, shuffle=False)

In [None]:
class LSTM_Net(nn.Module):
    """Single-layer bidirectional LSTM followed by a sigmoid-activated linear head.

    Parameters
    ----------
    input_size : int, optional
        Feature size of each sequence element (default 768).
    hidden_size : int, optional
        Hidden size of the LSTM and input size of the linear head (default 768).
    """

    def __init__(self, input_size=768, hidden_size=768):
        super(LSTM_Net, self).__init__()
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=1,
                            batch_first=True, bidirectional=True)
        # fc1 is not used by forward(): its result used to be computed and then
        # immediately overwritten. The layer is kept so the module's parameter
        # set (and state_dict keys) stay unchanged.
        self.fc1 = nn.Sequential(nn.Linear(hidden_size, 256), nn.ReLU(), nn.Dropout(p=0.8))
        self.fc2 = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a batch of sequences to one value in (0, 1) per sequence.

        Parameters
        ----------
        x : torch.Tensor
            Batch-first input, i.e. (batch, seq_len, input_size).

        Returns
        -------
        torch.Tensor
            Tensor of shape (batch, 1) holding sigmoid outputs.
        """
        _, (ht, _) = self.lstm(x)
        # ht[-1] is the last entry of the final hidden states; for this
        # single-layer bidirectional LSTM that is the reverse direction's state.
        return self.sigmoid(self.fc2(ht[-1]))

In [None]:
# Setting up model parameters
lstm_model = LSTM_Net().to(device)
loss_function = nn.BCELoss()
optimizer = torch.optim.AdamW(lstm_model.parameters(), lr=lr)

# Early stopping: give up after this many consecutive epochs without a new
# best validation loss.
early_stopping = 5
notImproved = 0
bestLoss = None
bestState = None  # snapshot (copy) of the weights at the best epoch
bestEpoch = 0

# Per-epoch average losses, used for plotting afterwards.
trainArr = []
valArr = []

for epoch in range(1, epochs + 1):

    # ---- training pass ----
    train_loss = 0.0
    lstm_model.train()
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device).float()

        optimizer.zero_grad()
        outputs = lstm_model(inputs).squeeze(1)

        loss = loss_function(outputs, labels)
        loss.backward()
        optimizer.step()

        # BCELoss returns the batch mean; weight it by the batch size so that
        # dividing by the dataset size below yields a true per-sample average.
        train_loss += loss.item() * labels.size(0)

    train_loss /= len(train_loader.dataset)
    trainArr.append(train_loss)

    # ---- validation pass ----
    valid_loss = 0.0
    labs = []
    preds = []

    lstm_model.eval()
    with torch.no_grad():
        for inputs, labels in valid_loader:
            inputs = inputs.to(device)
            labels = labels.to(device).float()

            outputs = lstm_model(inputs).squeeze(1)

            # Store plain Python floats so the accuracy check does not compare
            # device tensors one element at a time.
            labs.extend(labels.cpu().tolist())
            preds.extend(torch.round(outputs).cpu().tolist())
            valid_loss += loss_function(outputs, labels).item() * labels.size(0)

    valid_loss /= len(valid_loader.dataset)
    valArr.append(valid_loss)
    print("Accuracy on validation set: ", CheckAccuracy(preds, labs))

    if bestLoss is None or valid_loss <= bestLoss:
        bestLoss = valid_loss
        # Copy the weights: keeping a reference to lstm_model would just alias
        # the model that continues to be trained in later epochs.
        bestState = {name: tensor.detach().clone()
                     for name, tensor in lstm_model.state_dict().items()}
        notImproved = 0
        bestEpoch = epoch
    else:
        notImproved += 1

    if notImproved >= early_stopping:
        break

print(bestEpoch)

# Restore the weights from the best validation epoch.
if bestState is not None:
    lstm_model.load_state_dict(bestState)
bestModel = lstm_model

In [None]:
# Scatter the per-epoch training (red) and validation (green) losses on one axis.
fig, ax = plt.subplots(figsize=(10, 5))
train_epochs = np.arange(0, len(trainArr))
val_epochs = np.arange(0, len(valArr))
ax.scatter(train_epochs, trainArr, color='r', label='Training loss')
ax.scatter(val_epochs, valArr, color='g', label='Validation loss')
ax.set_title("Training loss vs Validation loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.legend(loc="upper right")
plt.show()

In [None]:
def F1_Scores(preds, labels):
    """Count and print the confusion-matrix entries for binary predictions.

    Parameters
    ----------
    preds : sequence
        Predicted classes (0 or 1), indexed in parallel with ``labels``.
    labels : sequence
        True classes (0 or 1); its length determines how many positions are counted.

    Returns
    -------
    tuple
        ``(true_positives, true_negatives, false_positives, false_negatives)``.
        Positions whose values are not 0/1 are not counted in any entry.
    """
    true_positives = 0
    true_negatives = 0
    false_positives = 0
    false_negatives = 0
    # Iterate over the `labels` argument (previously the global `labs` was
    # used, which tied the function to whatever that global held).
    for i in range(len(labels)):
        label, pred = labels[i], preds[i]
        if label == 1 and pred == 1:
            true_positives += 1
        elif label == 0 and pred == 0:
            true_negatives += 1
        elif label == 0 and pred == 1:
            false_positives += 1
        elif label == 1 and pred == 0:
            false_negatives += 1
    print("true_positives", true_positives)
    print("true_negatives", true_negatives)
    print("false_positives", false_positives)
    print("false_negatives", false_negatives)

    return true_positives, true_negatives, false_positives, false_negatives

In [None]:
def _collect_predictions(model, loader):
    """Run ``model`` over ``loader`` without gradients.

    Returns two parallel lists of Python floats: the labels and the model
    outputs rounded to the nearest integer value.
    """
    collected_labels = []
    collected_preds = []
    model.eval()
    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.to(device).float()

            # Forward pass only -- nothing is optimised during evaluation.
            outputs = model(inputs).squeeze(1)

            collected_labels.extend(labels.cpu().tolist())
            collected_preds.extend(torch.round(outputs).cpu().tolist())
    return collected_labels, collected_preds


# Report accuracy on the test, validation and training sets in turn.
# `labs` / `preds` are left holding the results of the last (training) pass.
labs, preds = _collect_predictions(lstm_model, test_loader)
print("Accuracy on test set: ", CheckAccuracy(preds, labs))

labs, preds = _collect_predictions(lstm_model, valid_loader)
print("Accuracy on val set: ", CheckAccuracy(preds, labs))

labs, preds = _collect_predictions(lstm_model, train_loader)
print("Accuracy on train set: ", CheckAccuracy(preds, labs))