# CS4248 Project Group 23

In [None]:
# If you wish to run this on Google Colab, mount the Google drive by running this cell or click the `files` icon on the left navbar
# and click mount Google Drive (it takes some time to load)
# from google.colab import drive
# drive.mount('/content/drive')

# %cd "/content/drive/My Drive/<The path to this notebook in your Google Drive>"
# !cd "/content/drive/My Drive/<The path to this notebook in your Google Drive>"

In [None]:
# To check if torch is working on m1
import torch
# Smoke test for the Apple-silicon Metal (MPS) backend: when it is available,
# allocate a one-element tensor on it and show it; otherwise just say so.
if not torch.backends.mps.is_available():
    print ("MPS device not found.")
else:
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print (x)

In [None]:
import pandas as pd
from sklearn.metrics import f1_score
import numpy as np

In [None]:
# Unzip raw_data.zip locally
import zipfile
# Extract every member of raw_data.zip into the current working directory.
# The context manager closes the archive handle once extraction finishes.
with zipfile.ZipFile('raw_data.zip') as archive:
    archive.extractall()

Feature Engineering: Capture various features of the text (e.g. punctuation, stopwords, statement length).
Test out different tokenizers and compare their performance.


In [None]:
import gensim.downloader as api

def load_glove_model():
    """Fetch the 'glove-wiki-gigaword-300' vectors through gensim's downloader.

    Returns:
        tuple: (embeddings, dimension) where `embeddings` is the object
        returned by `api.load` and `dimension` is its `vector_size`.
    """
    embeddings = api.load('glove-wiki-gigaword-300')
    vocabulary_size = len(embeddings)
    print("Done.", vocabulary_size, " words loaded!")
    return embeddings, embeddings.vector_size


# Module-level handles used by the preprocessing cell below.
glove_model, glove_dim = load_glove_model()

In [None]:
import re
import string
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# todo parallelize this in future


def preprocess_text(text):
    """Embed a document as the mean of the GloVe vectors of its words.

    The text is lower-cased, ASCII punctuation is removed, and the result is
    split on whitespace. Words missing from `glove_model` are ignored.

    Args:
        text: Raw document text. Non-string values (e.g. the float NaN that
            pandas produces for an empty CSV cell) are treated as empty text
            instead of raising AttributeError on `.lower()`.

    Returns:
        numpy.ndarray: 1-D vector of length `glove_model.vector_size`; all
        zeros when no word of the document is in the vocabulary.
    """
    if not isinstance(text, str):
        text = ""
    # Lowercase, then drop every character listed in string.punctuation.
    text = text.lower().translate(str.maketrans('', '', string.punctuation))
    # Whitespace tokenisation.
    words = text.split()
    # Look up the GloVe vectors of in-vocabulary words only.
    vectors = [glove_model[word] for word in words if word in glove_model]
    # Nothing to average: fall back to a zero vector so callers always get
    # a fixed-size feature row.
    if not vectors:
        print("No vectors found for the text: ", text)
        return np.zeros(glove_model.vector_size)
    return np.mean(vectors, axis=0)

In [None]:
def _load_binary_split(csv_path):
    """Read a (Verdict, Text) CSV and build features plus binary labels.

    The file is read with explicit column names, so its first row is treated
    as data. Every text is embedded with `preprocess_text`; a verdict of 4
    (trusted) maps to 1 and every other verdict to 0.

    Returns:
        tuple: (dataframe, feature matrix, flat label array).
    """
    frame = pd.read_csv(csv_path, names=['Verdict', 'Text'])
    features = np.array([preprocess_text(text) for text in frame['Text']])
    labels = frame['Verdict'].apply(lambda x: 1 if x == 4 else 0).values
    return frame, features, labels


train, x_train, y_train = _load_binary_split("./raw_data/fulltrain.csv")
print(y_train)
# Column-vector labels, matching the (N, 1) output of the network.
y_train = y_train.reshape((-1, 1))

test, X_test, y_test = _load_binary_split("./raw_data/balancedtest.csv")
y_test = y_test.reshape((-1, 1))

Test out different kinds of models and find the most effective architectures.

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset

class SimpleNN(nn.Module):
    """Feed-forward classifier: input -> 300 -> 200 -> 100 -> num_classes.

    Each hidden layer is followed by a ReLU and the final layer by an
    element-wise sigmoid, so every output lies in (0, 1).
    """

    def __init__(self, input_size, num_classes):
        """Create the layers.

        Args:
            input_size: Number of features per input row.
            num_classes: Number of output units.
        """
        super().__init__()
        # Layers are created in this order and under these attribute names so
        # that parameter initialisation and state_dict keys stay stable.
        self.fc1 = nn.Linear(input_size, 300)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(300, 200)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(200, 100)
        self.relu3 = nn.ReLU()
        self.fc4 = nn.Linear(100, num_classes)
        # Not used by forward(); kept as an alternative output activation.
        self.softmax = nn.Softmax(dim = 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a (batch, input_size) tensor to (batch, num_classes) sigmoid scores."""
        hidden = self.relu1(self.fc1(x))
        hidden = self.relu2(self.fc2(hidden))
        hidden = self.relu3(self.fc3(hidden))
        logits = self.fc4(hidden)
        return self.sigmoid(logits)

In [None]:
# Optional alternative to the GloVe features: TF-IDF over the raw article text.
# Off by default because x_train already holds the averaged GloVe vectors built
# in the data-loading cell; fitting a text vectorizer on that float matrix
# cannot work.
# If you switch this on, remember that X_test must be transformed with the same
# fitted `vectorizer`, and that the network's input_size must equal
# x_train.shape[1] rather than the GloVe dimension.
USE_TFIDF = False

if USE_TFIDF:
    # The class name is case-sensitive: TfidfVectorizer, not tfidfVectorizer.
    from sklearn.feature_extraction.text import TfidfVectorizer

    vectorizer = TfidfVectorizer()
    # Fit on the raw strings. NOTE: .toarray() densifies the matrix, which can
    # be very large for a full vocabulary.
    x_train = vectorizer.fit_transform(train['Text']).toarray()

In [None]:


def train_model(model, X_train, y_train, num_epochs=5, learning_rate=0.001,
                X_val=None, y_val=None):
    """Train `model` with Adam + binary cross-entropy and track a held-out F1.

    After every epoch the model is scored (macro F1, threshold 0.5) on the
    evaluation split. Per-epoch history is published through the module-level
    lists `epochs`, `loss_arr` and `f1_score_arr`, which the plotting cell reads.

    Args:
        model: torch module whose outputs are probabilities in (0, 1).
        X_train: numpy array of training features.
        y_train: numpy array of training labels, same leading dimension.
        num_epochs: Number of passes over the training data.
        learning_rate: Adam learning rate.
        X_val: Evaluation features (numpy array or tensor). Defaults to the
            notebook-level `X_test`.
        y_val: Evaluation labels. Defaults to the notebook-level `y_test`.

    Returns:
        float: Best macro F1 observed on the evaluation split.
    """
    global epochs
    global loss_arr
    global f1_score_arr
    epochs = []
    loss_arr = []
    f1_score_arr = []

    # Fall back to the arrays created by the data-loading cell. The previous
    # version read a global `X_test_tensor` that is only created in a later
    # cell, so it failed on a fresh kernel.
    if X_val is None:
        X_val = X_test
    if y_val is None:
        y_val = y_test
    X_val_tensor = torch.as_tensor(X_val).float()
    y_val_np = np.asarray(y_val)

    # Convert numpy arrays to PyTorch tensors
    X_train = torch.from_numpy(X_train).float()
    y_train = torch.from_numpy(y_train).float()
    print("X_train shape:", tuple(X_train.shape))
    print("y_train shape:", tuple(y_train.shape))

    # One batch holding the entire training set, i.e. full-batch gradient descent.
    train_data = TensorDataset(X_train, y_train)
    train_loader = DataLoader(train_data, batch_size=y_train.shape[0])

    # Define the loss function and optimizer
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    best_f1_score = 0.0
    was_training = model.training

    for epoch in range(num_epochs):
        epochs.append(epoch + 1)
        epoch_loss = 0
        model.train()
        for texts, labels in train_loader:
            outputs = model(texts)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
        print ("Epoch: {}, Loss: {}".format(epoch, epoch_loss))

        # Evaluation only: no autograd graph and no optimizer step. The old
        # trailing zero_grad()/step() pair carried no gradient information.
        model.eval()
        with torch.no_grad():
            val_output = model(X_val_tensor)
        result = (val_output > 0.5).long()
        test_f1_score = f1_score(y_val_np, result.numpy(), average='macro')
        if test_f1_score > best_f1_score:
            best_f1_score = test_f1_score
            print("new best:")
        print(test_f1_score)

        loss_arr.append(epoch_loss)
        f1_score_arr.append(test_f1_score)

    # Leave the module in whatever mode the caller handed it over in.
    model.train(was_training)
    return best_f1_score
        

In [None]:
# train your model

print(x_train)
print(y_train)

# Size the input layer from the feature matrix instead of hard-coding 300, so
# the cell keeps working if the feature extractor changes.
model = SimpleNN(input_size=x_train.shape[1], num_classes=1)
train_model(model, x_train, y_train, num_epochs=400, learning_rate=0.0005)

# Single forward pass over the training set, reused for display and for the
# hard 0/1 predictions. no_grad avoids building an autograd graph at inference.
x_train_tensor = torch.from_numpy(x_train).float()
with torch.no_grad():
    train_probabilities = model(x_train_tensor)
print(train_probabilities)
y_pred = train_probabilities.round().numpy()

In [None]:
from matplotlib import pyplot
import seaborn
# pyplot.plot(epochs, loss_arr, epochs, f1_score_arr)
# pyplot.title("Plot of F1-score and Training loss vs # of epochs")
# set_context() applies the "poster" scaling globally. Calling
# plotting_context("poster") as a bare statement only returns the parameter
# dict and leaves the figure unchanged.
seaborn.set_context("poster")

# Labelled curves so the legend identifies them regardless of colour cycle.
seaborn.lineplot(x=epochs, y=loss_arr, label="Training loss")
seaborn.lineplot(x=epochs, y=f1_score_arr, label="F1-score")
pyplot.title("Plot of F1-score and Training loss vs number of epochs")
pyplot.xlabel("Number of Epochs")
pyplot.ylabel("Training Loss (blue) or F1-Score (orange)")
pyplot.show()



In [None]:
# Alternatively, restore weights written by an earlier run.
# `model` must already exist with matching layer sizes, e.g.:
# model = SimpleNN(input_size=300, num_classes=1)
saved_state = torch.load('./trained_models/model.pth')
model.load_state_dict(saved_state)

Perform hyperparameter tuning on the best 3 models.

In [None]:
from sklearn.model_selection import GridSearchCV

# or perform hyperparameter tuning
# NOTE(review): this cell is an unfinished template and is not runnable as
# written — see the notes on the individual statements below.
# Create the hyperparameters grid
# NOTE(review): the grid is empty and is never handed to GridSearchCV.
param_grid = {

}

# NOTE(review): GridSearchCV is given a single empty tuple here; no estimator
# and no parameter grid are supplied. TODO: pass a scikit-learn compatible
# estimator together with `param_grid`.
grid_search = GridSearchCV(())

# Train
grid_search.fit(x_train, y_train)

print(grid_search.best_params_)

# Use best model
# NOTE(review): this rebinds `model`, which other cells call as a torch module
# (model(X_test_tensor)) and save via model.state_dict() — confirm that
# replacing it with a scikit-learn estimator is intended.
model = grid_search.best_estimator_

In [None]:
# Training-set diagnostics: show labels and predictions, then macro F1 and
# plain accuracy (matching entries divided by number of rows).
print(y_train)
print(y_pred)
train_f1 = f1_score(y_train, y_pred, average='macro')
print("Train Set: " + str(train_f1))
train_matches = np.count_nonzero(y_train == y_pred)
print("Train Set Accuracy: " + str(train_matches / y_train.shape[0]))

In [None]:
# get the prediction for the test set
# Read the file with explicit column names, exactly as the data-loading cell
# does. Without `names=` pandas takes the first row as a header, so that row
# would be lost as a sample.
test = pd.read_csv('./raw_data/balancedtest.csv', names=['Verdict', 'Text'])
X_test = np.array([preprocess_text(text) for text in test['Text']])
X_test_tensor = torch.from_numpy(X_test).float()
# Inference only: skip autograd bookkeeping.
with torch.no_grad():
    output = model(X_test_tensor)
result = (output > 0.5).long()

# Binary target: verdict 4 (trusted) -> 1, every other verdict -> 0.
y_test = test['Verdict'].apply(lambda x: 1 if x == 4 else 0).values
y_test = y_test.reshape((-1, 1))

predictions = result.numpy()
n_correct = np.count_nonzero(y_test == predictions)

# get the f1 score against the test set
print("Test Set F1: " + str(f1_score(y_test, predictions, average='macro')))
print(predictions)
print(y_test)
print(n_correct)
print(y_test.shape)
print("Test Set Accuracy: " + str(n_correct / y_test.shape[0]))

In [None]:
# save the model
import os

# torch.save does not create missing parent directories; make sure the target
# folder exists so a fresh checkout can run this cell.
os.makedirs('./trained_models', exist_ok=True)
torch.save(model.state_dict(), './trained_models/model.pth')

In [None]:
from sklearn.metrics import f1_score, accuracy_score, precision_score, confusion_matrix
from afinn import Afinn
import nltk
import pandas as pd
import numpy as np
from nltk.corpus import wordnet as wn
nltk.download('wordnet')

# Load the evaluation spreadsheet and give its label/text columns short names.
column_aliases = {'Satirical =1 Legitimate=0': 'Verdict', 'Full Text ': 'Text'}
test = pd.read_excel("./raw_data/test.xlsx").rename(columns=column_aliases) #raw dataframe
# The third column (by position) is what gets embedded — presumably the
# article text; TODO confirm against the spreadsheet layout.
X_test2 = np.array([preprocess_text(text) for text in test.iloc[:, 2]])
X_test_tensor2 = torch.from_numpy(X_test2).float()
output2 = model(X_test_tensor2)
print(X_test2)
print(output2)
# The model was trained with Satirical = 0 / Legitimate = 1, while this sheet
# uses Satirical = 1 / Legitimate = 0, so flip the thresholded predictions.
legitimate_flags = (output2.data > 0.5).long().detach().numpy()
result2 = 1 - legitimate_flags
y_actual = test['Verdict']
y_pred_test = result2  # model predictions in the spreadsheet's label convention

print(y_pred_test.shape)

#for i in range(360):
#    print(y_actual[i])
#    print(y_pred_test[i])

def get_superlatives(text):
    """Return the tokens of `text` that look like superlatives.

    A token is kept when its POS tag is 'JJS', or when it ends with 'est', or
    when it starts with 'most'. The two string heuristics are deliberately
    loose and will also match words such as 'test' or 'mostly'.

    Args:
        text: Document text to scan.

    Returns:
        list[str]: Matching tokens in document order (duplicates kept).
    """
    # `pos_tag` was never imported anywhere in the notebook, so the original
    # call raised NameError. Import locally to keep the function self-contained.
    # NOTE(review): the NLTK tokenizer and tagger need their data packages
    # downloaded beforehand — confirm the environment has them.
    from nltk import pos_tag
    from nltk.tokenize import word_tokenize

    tagged_words = pos_tag(word_tokenize(text))
    return [
        word
        for word, pos in tagged_words
        if pos == 'JJS' or word.endswith('est') or word.startswith('most')
    ]

# Headline scores on the whole spreadsheet, followed by the confusion matrix.
overall_metrics = (
    ("Test F1 score is:", f1_score),
    ("Accuracy:", accuracy_score),
    ("Precision:", precision_score),
)
for caption, metric in overall_metrics:
    print(caption, metric(y_actual, y_pred_test))
print("Confusion Matrix:")
print(confusion_matrix(y_actual, y_pred_test))

# Per y output type
# print ('Per y label-----')
# for i in range(1, 5):
#     y_cat_actual = y_actual_4outputs[y_actual_4outputs == i].apply(lambda x: 1 if x == 4 else 0) 
#     y_cat_pred = y_pred_test[y_actual_4outputs == i]
#     print("Accuracy:", accuracy_score(y_cat_actual, y_cat_pred))
#     print("Precision:", precision_score(y_cat_actual, y_cat_pred))
#     print("Confusion Matrix:")
#     print(confusion_matrix(y_cat_actual, y_cat_pred))

print()
print('By character lengths-----')
text_lengths = test['Text'].apply(lambda entry: len(entry.strip()))
# Bin edges: minimum, Q1, median, Q3, maximum.
text_char_limits = np.percentile(text_lengths, [0, 25, 50, 75, 100])
for i, ordinal in enumerate(('1st', '2nd', '3rd', '4th')):
    lower, upper = text_char_limits[i], text_char_limits[i + 1]
    # Only the first bin includes its lower edge. Later bins are (lower, upper],
    # so an article sitting exactly on a quartile boundary is counted once
    # instead of in two neighbouring bins.
    above_lower = (text_lengths >= lower) if i == 0 else (text_lengths > lower)
    in_bin = (above_lower & (text_lengths <= upper)).to_numpy()
    if not in_bin.any():
        # Tied edges can leave a bin empty; there is nothing to score.
        print(f'No entries in {ordinal} quartile')
        continue
    y_cat_actual = y_actual[in_bin]
    y_cat_pred = y_pred_test[in_bin]
    print(f'F1 score for {ordinal} quartile: {f1_score(y_cat_actual, y_cat_pred)}')
    print(f'Accuracy for {ordinal} quartile: {accuracy_score(y_cat_actual, y_cat_pred)}')
    print(f'Precision for {ordinal} quartile: {precision_score(y_cat_actual, y_cat_pred)}')


print()
print('By domain')
# One report per distinct value of the 'Domain' column.
domains = test['Domain'].unique()
domain_metrics = (('F1 score', f1_score), ('Accuracy', accuracy_score), ('Precision', precision_score))
for domain in domains:
    in_domain = test['Domain'] == domain
    y_cat_actual, y_cat_pred = y_actual[in_domain], y_pred_test[in_domain]
    print(f'Entries in {domain}: {len(y_cat_actual)}')
    for caption, metric in domain_metrics:
        print(f'{caption} for {domain}: {metric(y_cat_actual, y_cat_pred)}')

print()
print('By Subtopic')
# One report per distinct value of the 'Subtopic' column.
subtopics = test['Subtopic'].unique()
subtopic_metrics = (('F1 score', f1_score), ('Accuracy', accuracy_score), ('Precision', precision_score))
for subtopic in subtopics:
    in_subtopic = test['Subtopic'] == subtopic
    y_cat_actual, y_cat_pred = y_actual[in_subtopic], y_pred_test[in_subtopic]
    print(f'Entries in {subtopic}: {len(y_cat_actual)}')
    for caption, metric in subtopic_metrics:
        print(f'{caption} for {subtopic}: {metric(y_cat_actual, y_cat_pred)}')

print()
print('By superlatives per sentence')
# Despite the heading, the ratio is superlative tokens per character of the
# stripped text. max(..., 1) prevents a ZeroDivisionError on blank entries.
superlatives_per_length = test['Text'].apply(
    lambda entry: len(get_superlatives(entry)) / max(len(entry.strip()), 1)
)
print(superlatives_per_length)
# Bin edges: minimum, Q1, median, Q3, maximum.
superlative_limits = np.percentile(superlatives_per_length, [0, 25, 50, 75, 100])
for i, ordinal in enumerate(('1st', '2nd', '3rd', '4th')):
    lower, upper = superlative_limits[i], superlative_limits[i + 1]
    # Only the first bin includes its lower edge; later bins are (lower, upper]
    # so that no article is reported in two bins. Ties between edges (for
    # example many articles with a ratio of 0) can leave later bins empty.
    above_lower = (superlatives_per_length >= lower) if i == 0 else (superlatives_per_length > lower)
    in_bin = (above_lower & (superlatives_per_length <= upper)).to_numpy()
    if not in_bin.any():
        print(f'No entries in {ordinal} quartile')
        continue
    y_cat_actual = y_actual[in_bin]
    y_cat_pred = y_pred_test[in_bin]
    print(f'F1 score for {ordinal} quartile: {f1_score(y_cat_actual, y_cat_pred)}')
    print(f'Accuracy for {ordinal} quartile: {accuracy_score(y_cat_actual, y_cat_pred)}')
    print(f'Precision for {ordinal} quartile: {precision_score(y_cat_actual, y_cat_pred)}')

print()
print('By sentiment analysis (+ve = happy, -ve = sad)')
afn = Afinn()


def _sentiment_bucket(raw_score):
    """Collapse an AFINN score to 1 (above 3), -1 (below -3) or 0 (otherwise)."""
    if raw_score > 3:
        return 1
    if raw_score < -3:
        return -1
    return 0


sentiment_scores = test['Text'].apply(afn.score).apply(_sentiment_bucket)
all_scores = sentiment_scores.unique()
sentiment_metrics = (('F1 score', f1_score), ('Accuracy', accuracy_score), ('Precision', precision_score))
for score in all_scores:
    has_score = sentiment_scores == score
    y_cat_actual, y_cat_pred = y_actual[has_score], y_pred_test[has_score]
    print(f'Entries for score {score}: {len(y_cat_actual)}')
    for caption, metric in sentiment_metrics:
        print(f'{caption} for score {score}: {metric(y_cat_actual, y_cat_pred)}')
     