In [1]:
# !pip3 install torchtext==0.17.0

In [2]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import os
import pandas as pd
from torch.hub import load_state_dict_from_url
import re
import string
from string import digits
import spacy
from nltk.corpus import stopwords
import nltk
import torch.nn.functional as F
nltk.download('stopwords')
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
import time


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.0.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/home/Team7/.venv/lib/python3.11/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "/home/Team7/.venv/lib/python3.11/site-packages/traitlets/config/application.py", line 1075, in launch_instance
    app.start()
  File "/home/Team7/.venv/lib/python3.11/site-packages/ipykernel/kernelapp.py", line 739, in start
    self.io_loop.start()
  File "/home/Team7/.venv/lib/pyth

#Read Data

In [3]:
# Liar constants
LIAR_LABELS_TO_INDEX =  {
                            "pants-fire": [1.0,0.0,0.0,0.0,0.0,0.0],
                            "false": [0.0,1.0,0.0,0.0,0.0,0.0],
                            "barely-true": [0.0,0.0,1.0,0.0,0.0,0.0],
                            "half-true": [0.0,0.0,0.0,1.0,0.0,0.0],
                            "mostly-true": [0.0,0.0,0.0,0.0,1.0,0.0],
                            "true": [0.0,0.0,0.0,0.0,0.0,1.0],
                        }

LIAR_HEADER = ["id", "label", "statement", "subject", "speaker", "job_title", "state_info", "party_affiliation"
, "barely_true_counts", "false_counts", "half_true_counts", "mostly_true_counts", "pants_on_fire_counts", "context_venue_or_location"]

In [4]:
root_path = "dataset/"

train_df = pd.read_csv(root_path + 'train.tsv', sep='\t', header=None, names=LIAR_HEADER)
valid_df = pd.read_csv(root_path + 'valid.tsv', sep='\t', header=None, names=LIAR_HEADER)
test_df = pd.read_csv(root_path + 'test.tsv', sep='\t', header=None, names=LIAR_HEADER)


#Preprocessing
Remove punctuation and stop words

In [5]:
class PreprocessingConfig():
  def __init__(self, remove_quotations: bool = False, remove_punctuation: bool= False, remove_stop_words: bool= False, to_lower_case: bool = False, add_sos_eos_tokens: bool = True):
    self.remove_quotations = remove_quotations
    self.remove_punctuation = remove_punctuation
    self.remove_stop_words = remove_stop_words
    self.to_lower_case = to_lower_case
    self.add_sos_eos_tokens = add_sos_eos_tokens

In [6]:
# TODO replace unknown and nans
def preprocess_text(df_series, config: PreprocessingConfig):
  if config.remove_quotations:
    df_series = df_series.apply(lambda x: re.sub("'", '',x))

  if config.remove_punctuation:
    punctuation = set(string.punctuation)
    df_series  = df_series.apply(lambda x: ''.join(ch for ch in x if ch not in punctuation ))

  if config.remove_stop_words:
    stop_words = stopwords.words('english')
    df_series - df_series.apply(lambda x:  ' '.join([word for word in x.split() if word not in (stop_words)]) )

  if config.to_lower_case:
    df_series = df_series.apply(lambda x: x.lower())

  if config.add_sos_eos_tokens:
    df_series  = df_series.apply(lambda x: '<sos> ' + x + " <eos>")

  return  df_series

In [7]:
def preprocess_liar_statements(df):
  config = PreprocessingConfig()
  df["processed_statement"] = preprocess_text(df["statement"], config)
  df["labels_index"] = df["label"].apply(lambda x: LIAR_LABELS_TO_INDEX[x])
  return df

#Create the vocabulary

In [8]:
from torchtext.vocab import build_vocab_from_iterator
def yield_tokens(series):
    for text in df["processed_statement"]:
        yield text.split()

In [9]:
df = preprocess_liar_statements(train_df)
liar_vocab = build_vocab_from_iterator(yield_tokens(df["processed_statement"]), specials=["<unk>", "<pad>"])
liar_vocab.set_default_index(liar_vocab['<unk>'])

#Create Dataset

In [10]:
# Load data
class LiarDataset(Dataset):

  def __init__(self, df, vocab, transform=None, target_transform=None):
    self.data = df["processed_statement"]
    self.labels = df["labels_index"]
    self.vocab = vocab

  def __len__(self):
    return len(self.data)

  def __getitem__(self, idx):
    example = self.data[idx]
    label = self.labels[idx]
    numerical_tokens = [self.vocab[token] for token in example.split()]

    return torch.tensor(numerical_tokens), label



In [11]:
from torch.nn.utils.rnn import pad_sequence
# We do the padding here because the sentences in each batch should have the same dimension.
class MyCollate:
    # pad_idx is the index for the <pad> token
    def __init__(self, pad_idx):
        self.pad_idx = pad_idx

    def __call__(self, batch):
        # Batch is a tuple of sentences and their respective labels, this can be changed depending on the input fields we are using for training
        # by changing the get item function in the dataset class.
        sentences = [x[0] for x in batch]
        labels = [x[1] for x in batch]
        paded_sentences = pad_sequence(sentences, batch_first=True, padding_value = self.pad_idx)
        return paded_sentences , torch.tensor(labels)

In [12]:
train_dataset = LiarDataset(preprocess_liar_statements(train_df), liar_vocab)
validation_dataset = LiarDataset(preprocess_liar_statements(valid_df), liar_vocab)
test_dataset = LiarDataset(preprocess_liar_statements(test_df), liar_vocab)

train_loader = DataLoader(train_dataset, batch_size=32, collate_fn = MyCollate(pad_idx=liar_vocab["<pad>"]))
validation_loader = DataLoader(validation_dataset, batch_size=32, collate_fn = MyCollate(pad_idx=liar_vocab["<pad>"]))
test_loader = DataLoader(test_dataset, batch_size=32, collate_fn = MyCollate(pad_idx=liar_vocab["<pad>"]))

#Create the models

In [13]:
# Create the baseline model
class BiLSTMTextClassifierModel(nn.Module):

  def __init__(self, vocab, embedding_dim, hidden_dim, number_of_labels):
    super(BiLSTMTextClassifierModel, self).__init__()
    self.number_of_labels = number_of_labels
    self.embedding = nn.Embedding(len(vocab), embedding_dim, vocab["<pad>"])
    self.rnn = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True, batch_first=True)
    self.top_layer = nn.Linear(2*hidden_dim, self.number_of_labels)
    self.relu = nn.ReLU()
    self.softmax = F.softmax

  def forward(self, x):
    embeddings = self.embedding(x)
    rnn_output, _ = self.rnn(embeddings)
    last_hidden = rnn_output[:, -1, :]
    top_layer_output = self.top_layer(self.relu(last_hidden))
    return self.softmax(self.relu(top_layer_output), dim=-1)



In [14]:
# trains one batch, returns total batch loss
def train_one_batch(model, inputs, targets, optimizer, loss_function):
        # Predict/Forward Pass
        predictions = model(inputs)
        # Compute loss
        loss = loss_function(predictions, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Multiply the cross entropy loss which is the average by the batch size so we get the total loss for the batch, we can divide this by all data set 
        # Size to get average loss for the epoch
        batch_size_train =  len(inputs)
        batch_loss = loss.item() * batch_size_train
        return batch_loss
    

In [15]:
# validates one batch, returns total batch loss, number of true positives
def validate_one_batch(model, inputs, targets, loss_function):
        predictions_val = model(inputs).detach()
        loss_validation = loss_function(predictions_val, targets)
        
        
        # calculate average loss
        batch_size_val = len(inputs)
        batch_loss = loss_validation.item() * batch_size_val
        
        # Calculate True positives
        predicted_class = predictions_val.argmax(axis=1)
        correct_class = targets.argmax(axis=1)
    
        true_positives_count = sum(predicted_class == correct_class).item()
        return batch_loss, true_positives_count

In [16]:
# Train model
model = BiLSTMTextClassifierModel(liar_vocab, 300, 128, 6)
# Training params
num_epochs = 50
# Hyper parameters
learning_rate = 0.001

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
device = torch.device("cuda:6")
model.to(device)
loss_fn.to(device)

train_loss = []
val_loss = []
val_accuracy_history = []
model.train()
for epoch in range(num_epochs):
    start_time = time.time()
    # Training
    model.train()
    total_train_loss = 0. 
    total_data_points_train = 0
    for train_batch in train_loader:
        inputs = train_batch[0].to(device)
        targets = train_batch[1].to(device)
        
        train_batch_loss = train_one_batch(model, inputs, targets, optimizer, loss_fn)
        
        total_train_loss += train_batch_loss
        total_data_points_train += len(inputs)
    
    
    average_epoch_loss_train = total_train_loss/total_data_points_train
    train_loss.append(average_epoch_loss_train)
    
    # Validation
    # TODO restructure
    model.eval()        
    total_val_loss = 0. 
    total_data_points_val = 0
    true_positives_val = 0
    for validation_batch in validation_loader:
        inputs_val = validation_batch[0].to(device)
        targets_val = validation_batch[1].to(device)
        
        # Returns loss and true positives count
        batch_loss_val, true_positives_count = validate_one_batch(model, inputs_val, targets_val, loss_fn)
            
        # calculate average loss and appends true positives count
        total_val_loss += batch_loss_val
        total_data_points_val += len(inputs_val)
        true_positives_val += true_positives_count
    
    average_epoch_loss_val = total_val_loss/total_data_points_val
    val_accuracy = true_positives_val/total_data_points_val
    val_accuracy_history.append(val_accuracy)    
    val_loss.append(average_epoch_loss_val)
    
        
    # Print every epoch's metrics
    elapsed_time = time.time() - start_time
    print(f"epoch {epoch + 1}, average train loss: {average_epoch_loss_train}, average val loss: {average_epoch_loss_val}, val accuracy: {val_accuracy},  training time : {elapsed_time}")
    
      

epoch 1, average train loss: 1.7836974520236253, average val loss: 1.7813379786838994, val accuracy: 0.21573208722741433,  training time : 2.9156482219696045
epoch 2, average train loss: 1.7834785602986813, average val loss: 1.789648190094303, val accuracy: 0.19470404984423675,  training time : 2.095219373703003
epoch 3, average train loss: 1.7815971709787846, average val loss: 1.7851098293084593, val accuracy: 0.205607476635514,  training time : 1.9737038612365723
epoch 4, average train loss: 1.7747184906154871, average val loss: 1.7953519204695276, val accuracy: 0.220404984423676,  training time : 1.861588716506958
epoch 5, average train loss: 1.7767615742981433, average val loss: 1.7919336234297707, val accuracy: 0.1923676012461059,  training time : 2.1257686614990234
epoch 6, average train loss: 1.762551800906658, average val loss: 1.785548113588232, val accuracy: 0.21261682242990654,  training time : 2.2622108459472656
epoch 7, average train loss: 1.7333800189197064, average val l

In [17]:
# Test the model
model.eval()        
total_test_loss = 0. 
total_data_points_test = 0
true_positives_test = 0
i=0
for test_batch in test_loader:
    inputs_test = test_batch[0].to(device)
    targets_test = test_batch[1].to(device)
    # Returns loss and true positives count
    batch_loss_test, true_positives_count = validate_one_batch(model, inputs_test, targets_test, loss_fn)
            
    # calculate average loss and appends true positives count
    total_test_loss += batch_loss_test
    total_data_points_test += len(inputs_test)
    true_positives_test += true_positives_count
    
average_epoch_loss_test = total_test_loss/total_data_points_test
accuracy_test = true_positives_test/total_data_points_test
print(f"average test loss: {average_epoch_loss_test}, accuracy: {accuracy_test}")
    


average test loss: 1.8184774796983438, accuracy: 0.1499605367008682
