In [None]:
# Install the third-party packages this notebook imports below (wandb, fasttext, plotly).
# NOTE(review): versions are not pinned, so a fresh runtime may pull different releases.
!pip install wandb fasttext plotly

In [1]:
# Mount Google Drive under /content/drive (Colab-only helper).
from google.colab import drive
# `drive.mount` is a function call, not an importable module path.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import json
import numpy as np
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn
import fasttext.util
import plotly.express as px
import sys
import wandb
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from torch.nn.functional import sigmoid



In [None]:
# "neutrale", "odio" aaf831dabc88d936d4e6b439b798bb4cb42814ea
# NOTE(review): "neutrale"/"odio" are the two class names plotted by the dataset analysis.
# NOTE(review): the 40-character hex string above looks like it could be an API key or
# a commit hash — confirm which; if it is a credential, revoke it and remove it from the
# notebook (secrets should come from the environment or an interactive prompt).

# Root folder of the homework data on the mounted Google Drive.
ROOT_PATH = "/content/drive/MyDrive/uni/nlp/nlp2024-hw1-b"



In [None]:
# Authenticate with Weights & Biases before any run is started by Trainer.train.
wandb.login()

In [None]:


class HaSpeeDe_Dataset(Dataset):
    """Dataset of (sentence, label) pairs read from a JSON-lines file.

    Each line of the input file is a JSON object with a ``text`` and a
    ``label`` field. The text is split on whitespace; optionally stop words
    are removed and every remaining token is replaced by its fastText
    vector.
    """

    def __init__(self, data_path: str, data: list[tuple[list, int]]=None, use_embeddings: bool=False, stopwords_file_path: str="", device="cpu") -> None:
        """Build the dataset either from ready-made ``data`` or from ``data_path``.

        Args:
            data_path: path of the JSON-lines file (ignored when ``data`` is given).
            data: already prepared list of ``(sentence, label)`` pairs.
            use_embeddings: when True every token is mapped to its Italian
                fastText vector (the model is downloaded if missing).
            stopwords_file_path: optional file with one stop word per line;
                an empty string disables stop-word removal.
            device: device the collated batches are moved to.
        """
        self.device = device
        if data is not None:
            self.data = data
            return

        # One stop word per line. The trailing newline must be stripped,
        # otherwise no token ever matches; a set gives O(1) lookups.
        stopwords = set()
        if stopwords_file_path != "":
            with open(stopwords_file_path, 'r', encoding="UTF8") as f:
                stopwords = {line.strip() for line in f if line.strip()}

        embeddings = None
        if use_embeddings:
            fasttext.util.download_model('it', if_exists='ignore')
            embeddings = fasttext.load_model('cc.it.300.bin')

        self.data = []
        with open(data_path, 'r', encoding="UTF8") as f:
            for line in f:
                if not line.strip():
                    # Tolerate blank lines (e.g. a trailing newline at end of file).
                    continue
                item = json.loads(line)
                sentence = item['text'].split()
                if stopwords:
                    sentence = [word for word in sentence if word not in stopwords]
                if embeddings is not None:
                    sentence = [embeddings.get_word_vector(word) for word in sentence]
                self.data.append((sentence, item['label']))

    def __len__(self) -> int:
        """Number of stored samples."""
        return len(self.data)

    def __getitem__(self, idx: int) -> tuple[list, int]:
        """Return the ``(sentence, label)`` pair at position ``idx``."""
        return self.data[idx]

    def split(self, prc: float) -> list[tuple[list, int]]:
        """Randomly move a fraction ``prc`` of the samples out of this dataset.

        The removed samples are returned as a plain list (matching the
        annotation) and the remaining ones stay in ``self.data``, also as a
        plain list, so repeated splits do not nest ``Subset`` wrappers.
        """
        validation_size = int(prc * len(self.data))
        train_size = len(self.data) - validation_size
        validation_subset, train_subset = torch.utils.data.random_split(self.data, [validation_size, train_size])
        validation_data = [self.data[i] for i in validation_subset.indices]
        self.data = [self.data[i] for i in train_subset.indices]
        return validation_data

    @staticmethod
    def _as_tensor(text: list) -> torch.Tensor:
        """Convert one sentence to a tensor, stacking NumPy vectors in one step."""
        if len(text) > 0 and isinstance(text[0], np.ndarray):
            # torch.tensor on a list of ndarrays converts element by element and is very slow.
            return torch.from_numpy(np.stack(text))
        return torch.tensor(text)

    def collate(self, batch: list[tuple[list, int]]) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """Pad a batch of sentences and return ``(texts, labels, lengths)``.

        All three tensors are moved to ``self.device``.
        NOTE(review): sentences must already be numeric (``use_embeddings=True``)
        and non-empty; raw string tokens cannot be turned into tensors here.
        """
        texts, labels = zip(*batch)
        lens = [len(text) for text in texts]
        texts = pad_sequence([self._as_tensor(text) for text in texts], batch_first=True)
        return texts.to(self.device), torch.tensor(labels).to(self.device), torch.tensor(lens).to(self.device)

    def get_dataloader(self, batch_size: int, shuffle: bool) -> DataLoader:
        """Return a DataLoader whose batches are built by :meth:`collate`.

        Without ``collate_fn`` the default collation cannot batch
        variable-length sentences and does not yield the lengths tensor.
        """
        return DataLoader(self, batch_size=batch_size, shuffle=shuffle, collate_fn=self.collate)

    def print_data_analysis(self):
        """Show a bar chart with the number of samples per class."""
        odio = sum(1 for el in self.data if el[1] == 1)
        y = [len(self.data) - odio, odio]
        fig = px.bar(x=["neutrale", "odio"], y=y)
        fig.show()
        





In [None]:
# Function to print a progress bar
def print_progress_bar(percentuale: float, lunghezza_barra: int = 30) -> None:
    """Draw an in-place textual progress bar on stdout.

    Args:
        percentuale: completed fraction; values outside [0, 1] are clamped
            so the bar never overflows its brackets.
        lunghezza_barra: number of characters between the brackets.

    The bar always has exactly ``lunghezza_barra`` characters: at 0% it is
    empty instead of showing a lone ``>`` that made it one character wider
    than every other frame.
    """
    percentuale = min(max(percentuale, 0.0), 1.0)
    blocchi_compilati = int(lunghezza_barra * percentuale)
    # Filled part: "=" characters ending with the ">" head, or nothing at 0%.
    parte_piena = ("=" * (blocchi_compilati - 1) + ">") if blocchi_compilati > 0 else ""
    barra = "[" + parte_piena.ljust(lunghezza_barra) + "]"
    # "\r" moves the cursor back to the line start so the bar is redrawn in place.
    sys.stdout.write(f"\r{barra} {percentuale * 100:.2f}% complete")
    sys.stdout.flush()

In [None]:


class Trainer():
    """Minimal training/validation loop with optional Weights & Biases logging.

    Batches produced by both dataloaders must be ``(inputs, targets, lens)``
    triples; the model is called with the tuple ``(inputs, lens)``.
    """

    def __init__(self, model,train_dataloader, validation_dataloader, optimizer, loss_function, device):
        """Store the training components and move the model to ``device``."""
        self.model = model.to(device)
        self.train_dataloader = train_dataloader
        self.validation_dataloader = validation_dataloader
        self.optimizer = optimizer
        self.loss_function = loss_function
        self.device = device

    @staticmethod
    def evaluation_parameters(y_true, y_pred):
        """Return ``(confusion matrix flattened, precision, recall, f1, accuracy)``.

        ``y_true`` and ``y_pred`` are passed straight to scikit-learn, so
        they must be CPU array-likes.
        """
        cm = confusion_matrix(y_true, y_pred).ravel()
        precision = precision_score(y_true, y_pred)
        recall = recall_score(y_true, y_pred)
        f1 = f1_score(y_true, y_pred)
        accuracy = accuracy_score(y_true, y_pred)
        return cm, precision, recall, f1, accuracy

    def train(self, epochs: int, use_wandb: bool = False, config: dict = None):
        """Train for ``epochs`` epochs, validating after each one.

        Args:
            epochs: number of passes over the training dataloader.
            use_wandb: when True a wandb run is started and losses/metrics are logged.
            config: hyperparameters recorded in the wandb run. ``None`` (the
                default) means an empty config; a shared mutable default is avoided.
        """
        if use_wandb:
            wandb.init(
                # Set the project where this run will be logged
                project="nlp-hw-1b",
                # Track hyperparameters and run metadata
                config=config if config is not None else {}
            )
        num_batches = len(self.train_dataloader)
        for epoch in range(epochs):
            self.model.train()  # Set the model to training mode
            total_loss = 0

            for i, batch in enumerate(self.train_dataloader):
                print_progress_bar(i / num_batches)
                # Get the inputs and targets from the batch
                inputs, targets, lens = batch
                # Zero the gradients
                self.optimizer.zero_grad()
                # Forward pass
                outputs = self.model((inputs, lens))
                # Compute loss
                loss = self.loss_function(outputs, targets)
                # Backward pass and optimize
                loss.backward()
                self.optimizer.step()
                # Accumulate the total loss
                total_loss += loss.item()

            # Log the average loss for this epoch (guarding against an empty dataloader)
            if use_wandb:
                mean_loss = total_loss / num_batches if num_batches else float("nan")
                wandb.log({"train_loss": mean_loss})
            self.validate(use_wandb)

    def validate(self, use_wandb: bool = False):
        """Evaluate the model on the validation dataloader.

        Returns the mean validation loss (``nan`` when the dataloader is
        empty). When ``use_wandb`` is True the loss and, if any batch was
        seen, precision/recall/f1/accuracy are logged.
        """
        self.model.eval()  # Set the model to evaluation mode
        total_loss = 0
        batch_predictions = []
        batch_targets = []
        with torch.no_grad():  # Do not calculate gradients
            for i, batch in enumerate(self.validation_dataloader):
                print_progress_bar(i / len(self.validation_dataloader))
                # Get the inputs and targets from the batch
                inputs, targets, lens = batch
                # Forward pass
                outputs = self.model((inputs,lens))
                # Compute loss
                loss = self.loss_function(outputs, targets)
                # Accumulate the total loss
                total_loss += loss.item()
                # reshape(-1) keeps a 1-D tensor even for a batch of one sample
                # (squeeze() would give a 0-dim tensor that cannot be concatenated);
                # results are moved to the CPU because scikit-learn cannot read GPU tensors.
                batch_predictions.append(outputs.reshape(-1).round().float().cpu())
                batch_targets.append(targets.reshape(-1).float().cpu())

        num_batches = len(self.validation_dataloader)
        mean_loss = total_loss / num_batches if num_batches else float("nan")

        if use_wandb:
            metrics = {"validation_loss": mean_loss}
            if batch_predictions:
                y_pred = torch.cat(batch_predictions).numpy()
                y_true = torch.cat(batch_targets).numpy()
                _, precision, recall, f1, accuracy = self.evaluation_parameters(y_true, y_pred)
                metrics.update({"precision": precision,
                                "recall": recall,
                                "f1": f1,
                                "accuracy": accuracy})
            wandb.log(metrics)
        return mean_loss



In [None]:


class BaselineStratifiedModel(nn.Module):
    """Random baseline that predicts class 0 with the training-set frequency of class 0.

    Args:
        len0: number of samples of class 0.
        len1: number of samples of class 1.
    """

    def __init__(self, len0, len1):
        super(BaselineStratifiedModel, self).__init__()
        # Probability of predicting class 0.
        self.p = len0/(len0+len1)

    def forward(self, x):
        """Draw one random label per sample.

        ``x`` may be the batch tensor itself or the ``(inputs, lens)`` tuple
        that ``Trainer`` passes to every model; only the batch size (first
        dimension of the inputs) is used. The int64 result is placed on the
        same device as the inputs.
        """
        inputs = x[0] if isinstance(x, (tuple, list)) else x
        batch_size = inputs.shape[0]
        # One uniform draw per sample: below p -> class 0, otherwise class 1.
        draws = np.random.rand(batch_size)
        labels = np.where(draws < self.p, 0, 1)
        return torch.tensor(labels, dtype=torch.long, device=inputs.device)



class BaselineSimpleModel(nn.Module):
    """Baseline that scores every token with a linear layer + sigmoid and averages the scores.

    Args:
        input_size: size of each token vector.
        output_size: number of scores produced per token.
    """

    def __init__(self, input_size, output_size):
        super(BaselineSimpleModel, self).__init__()
        self.linear = nn.Linear(input_size, output_size)

    def forward(self, x):
        """Return the mean per-token score of every sentence.

        Args:
            x: tuple ``(seq, lens)`` where ``seq`` is the padded batch
                (batch first) and ``lens`` holds the true sentence lengths.

        Returns:
            Tensor with one value per sentence when ``output_size`` is 1
            (the trailing dimension is squeezed away).

        Padding positions are masked out and the sum is divided by the real
        length, so padding does not pull the mean towards zero. Everything
        stays inside the autograd graph, so the layer can be trained; the
        result is not rebuilt with ``torch.tensor``, which would detach it.
        """
        seq, lens = x
        lens = torch.as_tensor(lens, device=seq.device)
        scores = torch.sigmoid(self.linear(seq))
        # mask[b, t] is True for the real tokens of sentence b.
        positions = torch.arange(seq.shape[1], device=seq.device)
        mask = (positions.unsqueeze(0) < lens.unsqueeze(1)).unsqueeze(-1)
        summed = (scores * mask).sum(dim=1)
        # clamp avoids a division by zero for an (unexpected) empty sentence.
        counts = lens.clamp(min=1).unsqueeze(1).to(scores.dtype)
        return (summed / counts).squeeze(-1)
        
        

