## Inizializzazione

Installiamo la libreria Transformers, libreria open source sviluppata da Hugging Face. Questa libreria è utile per l'implementazione di modelli di trasformatori pre-addestrati

In [1]:
!pip install transformers



Configuro qui l'ambiente di lavoro su google Colab installando le principali librerie che andremo ad utilizzare.
Carichiamo inoltre il dataset subtaskB_train.csv. Solo successivamente, andremo ad attuare la data augumentation

In [2]:
import pandas as pd
import sklearn
from google.colab import drive
import nltk
import torch
from torch import nn
import numpy as np
from tqdm import tqdm
from torch.optim import Adam
from sklearn.model_selection import train_test_split
from transformers import AutoConfig, AutoModel, AutoTokenizer

nltk.download('punkt')
nltk.download('stopwords')

drive.mount('/content/gdrive')
root = "/content/gdrive/MyDrive/Colab Notebooks/torch/"
df = pd.read_csv(root+"data/subtaskB_train.csv")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


Mounted at /content/gdrive


Instaziamo gli iperparametri. Molti degli iperparametri utilizzati sono stati
scelti come vedremo successivamente, grazie all'utilizzo della grid search.
Il language model utilizzato è invece xlm-roberta-base. Il motivo di questa scelta è legato alla struttura del dataset. Di fatti, se pur il dataset è un dataset in lingua italiana, contiene alcune parole o frasi in inglese il che ci ha portato ad una scelta di Roberta. Roberta è un modello multilingue, che riconosce anche le emoji, presenti nel nostro dataset

In [3]:
hyperparameters = {
  "epochs": 1000,
  "learning_rate": 1e-3,
  "batch_size": 64,
  "dropout": 0.4,
  "stopwords": True,
  "h_dim": 512,
  "patience": 300,
  "min_delta": 0.01,
  "max_seq_length": 512,
  "language_model": "xlm-roberta-base"
}

# Data preprocessing


La parte che vedremo fa parte del data pre processing. In questa cella in particolare andremo ad installare le principali librerie e a definire delle funzioni quali ad esempio "extract_most_common_emoji" che estrae le emoji più comuni , le conta e le ordina . Inoltre viene definito un pattern regex (regex_pattern) per identificare diversi elementi, tra cui le emoji.
Viene creato un nuovo pattern regex (nuovo_regex_pattern) che aggiunge un'ulteriore parte per mantenere solo le emoji desiderate (quelle presenti in top_3_emoji).

##Mantenimento Emoji più frequenti

In [4]:
!pip install emoji
from itertools import chain
from collections import Counter
import re

Collecting emoji
  Downloading emoji-2.9.0-py2.py3-none-any.whl (397 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m397.5/397.5 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: emoji
Successfully installed emoji-2.9.0


In [5]:
last_df = df


In [6]:
import emoji
nltk.download("punkt")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [7]:
def extract_most_common_emoji(text):
  emojis = [element['emoji'] for element in emoji.emoji_list(text)]
  return emojis

# Funzione di sostituzione per mantenere solo le emoji desiderate
def emoji_selezionata(match):
  return match.group(0) if match.group(0) in top_3_emoji else ''

# Esempio di utilizzo
emojis_list = [extract_most_common_emoji(commento) for commento in df.comment_text]

# Appiattimento della lista utilizzando itertools.chain
emojis_list = list(chain(*emojis_list))

# Creo dizionario contenente l'emoji con la propria frequenza
emojis_dict = Counter(emojis_list)

# Ordino il dizionario con le emoji
emojis_dict = dict(sorted(emojis_dict.items(), key=lambda x: x[1], reverse=True))

# Stampa le prime 3 emoji più frequenti
top_3_emoji = [emoji[0] for emoji in list(emojis_dict.items())[:3]]

for id, commento, conspiracy in zip(df.index, df['comment_text'], df['conspiracy']):
  # espressione regolare originale
  regex_pattern = r'\b\d{2}/\d{2}/\d{4}\b|\S+@\S+|\b\d{4}-\d{2}-\d{2}\b|https?://\S+|[^\w\s🇮🇹]'

  # Aggiungi la parte per mantenere le emoji in top_3_emoji, incluso "IT"
  nuovo_regex_pattern = f'{regex_pattern}|(?<=\s)({"|".join(re.escape(emoji) for emoji in top_3_emoji)})(?=\s)'

  #print("\n----------------------\n Commento_vecchio: ", commento)

  # unisco le 2
  commento = re.sub(nuovo_regex_pattern, emoji_selezionata, commento)

  df.loc[id, 'comment_text'] = commento

  #print("\n Commento_nuovo: ", df.loc[id, 'comment_text'])

print("Top 3 Emojis:", top_3_emoji)
print("Num Tot. Emoji: ", len(emojis_list))

Top 3 Emojis: ['🇮🇹', '❗', '⚠️']
Num Tot. Emoji:  694


In questo blocco viene implementata la lemmatizzazione che è un processo linguistico che consiste nel ricondurre una parola al suo lemma ovvero alla sua forma base.
Il codice importa il modello "SpaCy" in italiano per poi iterare su ogni "commento" del dataset e creare un oggetto "doc" che contiene un elenco di token cioè segmenti di testo che rappresentano le parole, le frasi o le parti del testo che verranno processati dal modello per poi essere uniti in un unica frase che andrà ad aggiornare il dataset con il commeto lemmatizzato

##Lemmatizzazione

In [8]:
!python -m spacy download it_core_news_sm

2024-01-05 22:36:53.552351: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-01-05 22:36:53.552415: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-01-05 22:36:53.554860: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
Collecting it-core-news-sm==3.6.0
  Downloading https://github.com/explosion/spacy-models/releases/download/it_core_news_sm-3.6.0/it_core_news_sm-3.6.0-py3-none-any.whl (13.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.0/13.0 MB[0m [31m53.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: it-core-news-sm
Successfully insta

In [9]:
import spacy

# Carica il modello italiano di SpaCy
nlp = spacy.load("it_core_news_sm")

for id, commento, conspiracy in zip(df.index, df['comment_text'], df['conspiracy']):
  # Processa il testo con SpaCy
  doc = nlp(commento)

  #Unisci i vari token elaborati di una frase in un unica stringa
  comment_lem = ' '.join([token.lemma_ for token in doc])

  nuovo_record = {'ID': id, 'Comment text modificato': comment_lem, 'conspiracy': conspiracy}

  df.loc[id, 'comment_text'] = comment_lem

##Conteggio parole piu frequenti

In [16]:
import nltk
from collections import Counter
import string
from nltk.stem import WordNetLemmatizer
nltk.download('stopwords')

covid_comments = df[df['conspiracy'] == 0]['comment_text']
qanon_comments = df[df['conspiracy'] == 1]['comment_text']
terrapiatta_comments=df[df['conspiracy']==2]['comment_text']
prorussia_comments=df[df['conspiracy']==3]['comment_text']

def remove_stopwords(text):
  tokens_list = nltk.word_tokenize(text, language='italian')
  return ' '.join([word for word in tokens_list if not word.lower() in nltk.corpus.stopwords.words("italian")])


# Applico la rimozione delle stopwords ai commenti
covid_comments = covid_comments.apply(remove_stopwords)
qanon_comments = qanon_comments.apply(remove_stopwords)
terrapiatta_comments = terrapiatta_comments.apply(remove_stopwords)
prorussia_comments = prorussia_comments.apply(remove_stopwords)


# Conto le parole più frequenti dopo la rimozione delle stopwords
covid_comments_word_counts = Counter(" ".join(covid_comments).split())
top_covid_words = covid_comments_word_counts.most_common(10)

qanon_word_counts = Counter(" ".join(qanon_comments).split())
top_qanon_words = qanon_word_counts.most_common(10)

terrapiatta_word_counts = Counter(" ".join(terrapiatta_comments).split())
top_terrapiatta_words = terrapiatta_word_counts.most_common(10)

prorussia_word_counts = Counter(" ".join(prorussia_comments).split())
top_prorussia_words = prorussia_word_counts.most_common(10)


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Le top parole, a parte alcuni molto evidenti come "vaccino" nel caso ad esempio del covid, dopo la lemmatizzazione diventano comuni per ogni label motivo per il quale abbiamo scelto di non inserirli nel nostro dataset finale di train

Funzione che viene utilizzata per rimuovere le stopword italiane

In [17]:
def remove_stopwords(text):
  tokens_list = nltk.word_tokenize(text, language='italian')
  return ' '.join([word for word in tokens_list if not word.lower() in nltk.corpus.stopwords.words("italian")])

Funzione che conta le parole che sono del tutto in maiuscolo. Si evince che le parole scritte del tutto in maiuscolo vengono scritte per enfatizzare un concetto con tono "duro", ad esempio in frasi cospirazioniste.

In [18]:
import re

def count_uppercase_words(sentence):
    words = re.findall(r'\b[A-Z]+\b', sentence)
    return len(words)

def count_words(sentence):
    # Rimuove la punteggiatura dalla frase
    sentence_without_punctuation = re.sub(r'[^\w\s]', '', sentence)

    #Rimuove date, link, email dalla frase
    sentence_without_punctuation = re.sub(r'\b\d{2}/\d{2}/\d{4}\b|\S+@\S+|\b\d{4}-\d{2}-\d{2}\b|https?://\S+', '', sentence)

    # Divide la frase in parole
    words = sentence_without_punctuation.split()

    # Restituisce il numero totale di parole
    return len(words)

Funzione che genera gli embeddings. Dopo aver eseguito l'inferenza con il modello, la funzione estrae l'ultimo stato nascosto (last_hidden_states) dal tensore risultante

##Data agumentation sul dataset di train

Definiamo in questa parte di codice le librerie necessarie e inizializziamo il modulo per l'unmaskin con il modello xlm-roberta-base. Questo sarà utile per la data augumentation da effettuare sul dataset di train per aumentare , nel nostro caso, raddoppiare il dataset.

In [19]:
(x_train, x_val, y_train, y_val) = train_test_split( df['comment_text'], df['conspiracy'], test_size=0.2, stratify=df['conspiracy'] , random_state=17)
(x_train,x_test,y_train, y_test) = train_test_split( x_train, y_train, test_size=0.2, random_state=17)

In [20]:
from transformers import pipeline

# Impostazione del percorso del nuovo file CSV
percorso_file_dataAug = root + "data/train_dataset_Aug.csv"

# Creazione di un nuovo DataFrame vuoto
train_dataset_Aug = pd.DataFrame()

# Inizializzazione del modulo per l'unmasking
unmasker = pipeline('fill-mask', model='xlm-roberta-base')

# Limita il numero di token da considerare
limite_massimo = 512
numero_minimo_token = 3


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


model.safetensors:   0%|          | 0.00/1.12G [00:00<?, ?B/s]

Some weights of the model checkpoint at xlm-roberta-base were not used when initializing XLMRobertaForMaskedLM: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
- This IS expected if you are initializing XLMRobertaForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing XLMRobertaForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [21]:
def remove_stopwords(text): # text = 'testo scritto tra virgolette'
  tokens_list = nltk.word_tokenize(text, language='italian')
  return ' '.join([word for word in tokens_list if not word.lower() in nltk.corpus.stopwords.words("italian")])

Mantenimento emoji nel nuovo dataset di train, con data augumentation. Il lavoro è uguale a quello fatto precedentemente.

In [22]:
from nltk.tokenize import TweetTokenizer

# Create a reference variable for Class TweetTokenizer
tk = TweetTokenizer()

def gen_mask_sentence(sentence):

  tokenized_result = tk.tokenize(sentence)

  _tokenized_result = tokenized_result[:512]

  #genero la maschera, il while mi permette di sostituire solo elementi che siano parole
  rand_idx = random.randint(0, len(_tokenized_result) - 1)

  flag = 0

  while not _tokenized_result[rand_idx].isalpha() and flag < 100:
    rand_idx = random.randint(0, len(_tokenized_result) - 1)
    flag += 1

  _tokenized_result[rand_idx] = '<mask>'

  return ' '.join(_tokenized_result)

In [23]:
x_train = x_train.sort_index()
y_train = y_train.sort_index()

x_train.index

Int64Index([  0,   1,   2,   3,   4,   6,   8,  12,  13,  14,
            ...
            796, 798, 799, 800, 802, 803, 804, 805, 807, 808],
           dtype='int64', length=518)

Viene eseguita quindi la fase di "Data Augmentation" sul set di addestramento x_train, splittato precedentemente e differenziato quindi dai dataset di validation/test.  In particolare, viene verificata la lunghezza minima dei token , se la lunghezza dei token è minore a 3, allora il commento non verrà mascherato , altrimenti, viene utilizzato l'unmasker con xlm-roberta-base e verrà mascherato un solo token. Il nuovo commento viene aggiunto in coda al commento originale e verrà memorizzato nel dataframe prima inizializzato "train_dataset" che verrà usato successivamente per la fase di train.

In [24]:
import re
import random

for commento, conspiracy in zip(x_train, y_train):

  #rimuovo ritorni a capo
  commento = commento.replace('\n\n', '').replace('\n', '').replace('\r', '')

  # Aggiungo uno spazio dopo ogni apice, poichè si è notato che altrimenti viene tokenizzato male
  commento = commento.replace("'", "' ").replace('"', '" ')

  #Rimuovo le stopwords
  commento = remove_stopwords(commento)

  # Masking di una word nella frase
  commento_with_mask = gen_mask_sentence(commento)

  # Verifica la lunghezza minima dei token
  if len(tk.tokenize(commento)) < numero_minimo_token:
    record = {'comment_text': commento, 'conspiracy': conspiracy}
    train_dataset_Aug = pd.concat([train_dataset_Aug, pd.DataFrame([record])], ignore_index=True)

  else:
    try:
      new_commento = unmasker(commento_with_mask, tokenizer_kwargs={"truncation": True})

      record = {'comment_text': commento, 'conspiracy': conspiracy}
      train_dataset_Aug = pd.concat([train_dataset_Aug, pd.DataFrame([record])], ignore_index=True)

      record = {'comment_text': new_commento[0]['sequence'], 'conspiracy': conspiracy}
      train_dataset_Aug = pd.concat([train_dataset_Aug, pd.DataFrame([record])], ignore_index=True)

    except Exception as e:

      print(f"\nErrore: {e}")
      record = {'comment_text': commento, 'conspiracy': conspiracy}
      train_dataset_Aug = pd.concat([train_dataset_Aug, pd.DataFrame([record])], ignore_index=True)


train_dataset_Aug.to_csv(percorso_file_dataAug, index=False)



Errore: No mask_token (<mask>) found on the input


In [25]:
x_val = x_val.sort_index()
y_val = y_val.sort_index()

percorso_file_validation = root + "data/validation_dataset.csv"
validation_dataset = pd.DataFrame()

validation_dataset = pd.concat([validation_dataset, pd.DataFrame({'comment_text': x_val, 'conspiracy': y_val})], ignore_index=True)

validation_dataset.to_csv(percorso_file_validation, index=False)

In [26]:
x_test = x_test.sort_index()
y_test = y_test.sort_index()

percorso_file_test = root + "data/test_dataset.csv"
test_dataset = pd.DataFrame()

test_dataset = pd.concat([test_dataset, pd.DataFrame({'comment_text': x_test, 'conspiracy': y_test})], ignore_index=True)

test_dataset.to_csv(percorso_file_test, index=False)

# Definizione classe Dataset

In [27]:
def remove_stopwords(text): # text = 'testo scritto tra virgolette'
  tokens_list = nltk.word_tokenize(text, language='italian')
  return ' '.join([word for word in tokens_list if not word.lower() in nltk.corpus.stopwords.words("italian")])

In [28]:
def gen_embeddings(input_id_text, attention_mask):
  with torch.no_grad():
    last_hidden_states = lm_model(input_id_text, attention_mask=attention_mask).last_hidden_state
    last_hidden_states = last_hidden_states[:,0,:].squeeze(0)

  return last_hidden_states

Classe My Dataset personalizzata. La classe accetta in input x che di base è la colonna "comment_text" e le labels "conspiracy". per ogni commento, nella colonna comment_text, vengono rimosse le stopword, ed eliminati i ritorno a capo.

In [29]:
class MyDataset(torch.utils.data.Dataset):

  def __init__(self, x, y, stopwords):
    #rimuovo ritorni a capo
    x = [text.replace('\n\n', '').replace('\n', '').replace('\r', '') for text in tqdm(x, desc='Remove ritorno a capo ...')]
    # rimuovo le stopwords se sono presenti
    if stopwords:
      text_clean = []
      for sentence in tqdm(x, desc='Remove StopWord ... '):
        sentence = remove_stopwords(sentence)
        text_clean.append(sentence)
    else:
      text_clean = x

    # genero gli embeddings
    embeddings_list = []
    for text in tqdm(text_clean, desc='Generation Embeddings ...'):
      tokens = tokenizer([text], add_special_tokens=True, return_tensors='pt', padding='max_length', max_length = 512, truncation=True)
      input_id_texts = tokens['input_ids'].squeeze(1).to(device)
      mask_texts = tokens['attention_mask'].squeeze(1).to(device)
      embeddings_list.append(gen_embeddings(input_id_texts, mask_texts))


    self.embeddings = embeddings_list
    self.labels = [torch.tensor(label) for label in y]

  def classes(self):
    return self.labels

  def __len__(self):
    return len(self.labels)

  def __getitem__(self, idx):
    return self.embeddings[idx], np.array(self.labels[idx])

##EarlyStopping

Funzione base con EarlyStopping. Verrà utilizzato per terminare l'addestramento quando il modello non migliora più

In [30]:
class EarlyStopping:
  def __init__(self, patience=5, min_delta=0.0):

    self.patience = patience
    self.min_delta = min_delta
    self.counter = 0
    self.early_stop = False
    self.min_validation_loss = torch.inf

  def __call__(self, validation_loss):
    if (validation_loss + self.min_delta) >= self.min_validation_loss:
      self.counter += 1
      if self.counter >= self.patience:
        self.early_stop = True
        print("Early stop!")
    else:
      self.min_validation_loss = validation_loss

      self.counter = 0

##Definizione Classificatore

Questo codice definisce una classe MyClassifier che estende nn.Module,un modello PyTorch. Il classificatore scelto è un classificatore di tipo feed-forward . Questa tipologia è largamente usata per l'analisi testuale e la categorizzazione dei testi per la sua flessibilità e per la capacità di apprendere rappresentazioni complesse nei dati. Tra i layer vengono utilizzati tecniche di regolarizzazione come dropout per prevenire l'overfitting , fondamentale in casi come questo in cui il dataset è di ridotte dimensioni e la batch normalization che viene utilizzata per migliorare la stabilità dell'addestramento

In [31]:
class MyClassifier(nn.Module):

  def __init__(self, num_inputs, num_hidden, num_outputs,dropout):
    super().__init__()
    self.linear1 = nn.Linear(num_inputs, num_hidden)
    self.linear2 = nn.Linear(num_hidden, num_hidden//2)
    self.linear3 = nn.Linear(num_hidden//2, num_hidden//4)
    self.linear4 = nn.Linear(num_hidden//4, num_outputs)
    self.act_fn = nn.LeakyReLU(0.2)
    self.batch_norm = nn.BatchNorm1d(num_hidden)
    self.batch_norm2 = nn.BatchNorm1d(num_hidden//2)
    self.batch_norm3 = nn.BatchNorm1d(num_hidden//4)
    self.dropout  = nn.Dropout(dropout)


  def forward(self, input_texts):
    x = input_texts
    x = self.linear1(x)
    x = self.batch_norm(x)
    x = self.dropout(x)
    x = self.act_fn(x)
    x = self.linear2(x)
    x = self.batch_norm2(x)
    x = self.dropout(x)
    x = self.act_fn(x)
    x = self.linear3(x)
    x = self.batch_norm3(x)
    x = self.dropout(x)
    x = self.act_fn(x)
    x = self.linear4(x)
    return x

##Definizione delle varie funzioni per l'addestramento e il test

Definiamo la funzione f1_score_macro_sklearn che calcola la F1 Score macro utilizzando la libreria scikit-learn, come previsto dalla competizione EVALITA 2023

In [32]:
## F1 Score Macro

from sklearn.metrics import f1_score

def f1_score_macro_sklearn(y_true, y_pred):
  y_pred = torch.round(y_pred.cpu()).numpy()
  y_true = y_true.cpu().numpy()
  return f1_score(y_true, y_pred, average='macro')

Definiamo ora un ciclo di addestramento personalizzato. Viene impostato il modello in modalità train e vengono inizializzati i valori di *train_loss,true_output,all_predictions e all_labels* utili per il calcolo delle performance. Il ciclo for attraversa il dataloader di addestramento che restituisce minibatch di dati ad ogni iterazione. Il modello viene eseguito sui dati di input(data_inputs, data_uppercase) per restituire l'output .
Per il calcolo dell'accuracy viene utilizzata la softmax poichè il problema è un problema multiclasse

In [33]:
def my_train_loop(model, optimizer, train_dataloader, len_train_dataset, loss):
  #imposto il modello in modalità allenamento
  model.train()

  train_loss = 0.0
  true_output = 0.0
  all_predictions = []
  all_labels = []

  for data_inputs, data_labels in train_dataloader:

    # 1. Sposto i dati di input sul dispositivo
    data_inputs = data_inputs.to(device)
    data_labels = data_labels.to(device)

    # 2. Eseguo il modello sui dati di input
    output = model(data_inputs)

    # 3. Calcolo la perdita
    batch_loss = loss(output, data_labels) # il valore calcolato viene valutato per blocco

    # 4. Eseguo la BackPropagation

    optimizer.zero_grad()
    batch_loss.backward()

    # 5. Aggiorno i parametri (cioè i pesi)
    optimizer.step()

    # 6: Sommo il batch loss
    train_loss += batch_loss.item()

    # 7: Calcolo l'accuracy
    softmax = nn.LogSoftmax(dim=1)
    predictions= softmax(output).argmax(dim=1)
    true_output += (predictions == data_labels).sum().item()

    #8: Salvo predizioni e etichette per calcolare la F1-score macro
    all_predictions.append(predictions)
    all_labels.append(data_labels)

  #Calcolo Loss e accuracy
  train_loss = train_loss/len(train_dataloader)
  train_acc = true_output/len_train_dataset

  all_labels = torch.cat(all_labels, dim=0)
  all_predictions = torch.cat(all_predictions, dim=0)
  f1_macro = f1_score_macro_sklearn(all_labels, all_predictions)

  return train_loss, train_acc, f1_macro

Questa pate del codice definisce invece una funzione, my_test_loop , utile per valutare le performance di un modello su un set di dati di test. Il modello viene impostato in modalità di valutazione con model.eval(), per cui non verranno attivate funzioni come batchNorm e dropout (a differenza di quando vengono attivate con model.train()). Vengono inizializzati i valori per il calcolo delle performance .
La funzione restituisce l'accuratezza media, la perdita media e la F1-score macro ottenute durante la valutazione del modello su dati di test.

In [51]:
def my_test_loop(model, dataset, len_dataset, loss):
  #imposto il modello in modalità di valutazione
  model.eval()

  true_output, test_loss = 0., 0.
  all_predictions = []
  all_labels = []

  with torch.no_grad():
    for data_inputs, data_labels in dataset:

      # 1. Sposto i dati di input sul dispositivo
      data_inputs = data_inputs.unsqueeze(0).to(device)
      data_labels = torch.from_numpy(data_labels)
      data_labels = data_labels.unsqueeze(0).to(device)


      # 2. Eseguo il modello sui dati di input
      output = model(data_inputs)
      output = output

      # 3. Calcolo la perdita
      batch_loss = loss(output, data_labels)
      test_loss += batch_loss.item()

      #4: Calcolo l'accuracy

      softmax = nn.LogSoftmax(dim=1)
      predictions= softmax(output).argmax(dim=1)
      true_output += (predictions == data_labels).sum().item()

      #5: Salvo predizioni e etichette per calcolare la F1-score macro alla fine
      all_predictions.append(predictions)
      all_labels.append(data_labels)


    test_loss = test_loss / len_dataset
    test_acc = true_output / len_dataset

    all_labels = torch.cat(all_labels, dim=0)
    all_predictions = torch.cat(all_predictions, dim=0)
    f1_macro = f1_score_macro_sklearn(all_labels, all_predictions)

  return test_acc, test_loss, f1_macro

La funzione my_train_test esegue invece il processo di addestramento e valutazione di un modello su set di dati di addestramento, validazione e test.
Vengono inizializzati i dataloader per il set di dati di addestramento (train_dataloader) e le liste per monitorare i valori di perdita, accuratezza e F1-score durante l'addestramento e la validazione.

In [35]:
def my_train_test(model, optimizer, train_dataset, val_dataset, test_dataset, loss, num_epochs, early_stopping=None):

  len_train_dataset = len(train_dataset)
  len_val_dataset = len(val_dataset)
  len_test_dataset = len(test_dataset)

  train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size = hyperparameters['batch_size'], shuffle=True)

  # liste dei valori di loss e accuracy epoca per epoca per il plot
  list_train_loss, list_train_acc, list_val_loss, list_val_acc = [], [], [], []
  list_train_f1, list_val_f1 = [], []

  ## A. FASE DI ADDESTRAMENTO
  for epoch in tqdm(range(num_epochs),  desc='training epch ...'):

    # 1. ADDESTRAMENTO del Modello
    train_loss, train_acc , train_f1= my_train_loop(model, optimizer, train_dataloader, len_train_dataset, loss)

    # 2. Fase di VALIDAZIONE se è presente la callback di early stopping
    if early_stopping != None:
      val_acc, val_loss,val_f1 = my_test_loop(model, val_dataset, len_val_dataset, loss)
      list_val_loss.append(val_loss)
      list_val_acc.append(val_acc)
      list_val_f1.append(val_f1)

      # EARLY STOPPING
      early_stopping(val_loss)
      if early_stopping.early_stop:
        break

    # Aggiornamento delle lista che contengono le varie valutazioni del modello
    list_train_loss.append(train_loss)
    list_train_acc.append(train_acc)
    list_train_f1.append(train_f1)

  ## B. FASE DI TESTING
  test_acc, test_loss, test_f1 = my_test_loop(model, test_dataset, len_test_dataset, loss)

  return list_train_loss, list_train_acc, list_train_f1, list_val_loss,list_val_acc,list_val_f1, test_acc,test_f1

In questa parte del codice dopo aver importato le librerie necessarie, acquisiamo il device su cui effettueremo il training. Carichiamo le configurazioni (config), il tokenizzatore (tokenizer), e il modello di linguaggio (lm_model) utilizzando la libreria Transformers di Hugging Face.
Definiamo inoltre il criterion, con la funzione di perdita CrossEntropyLoss e utilizziamo l'ottimizzatore Adam. Utilizziamo la CrossEntropyLoss perchè il problema è un problema multiclasse, viene utilizzata inoltre come funzione di attivazione del modello in combinazione con l'uso della softmax.

## Generazione dei dataset di train,validation e test

##Avvio dell'addestramento

In [46]:
from torch.optim import Adam
from transformers import AutoConfig, AutoModel, AutoTokenizer

# Acquisiamo il device su cui effettueremo il training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

config = AutoConfig.from_pretrained(hyperparameters["language_model"])
tokenizer = AutoTokenizer.from_pretrained(hyperparameters["language_model"])
lm_model = AutoModel.from_pretrained(hyperparameters["language_model"], config=config).to(device)
modelClassifier = MyClassifier(num_inputs=768,num_hidden=hyperparameters["h_dim"], num_outputs=4, dropout = hyperparameters["dropout"]).to(device)


print(modelClassifier)
#numero totale dei parametri del modello
total_params = sum(p.numel() for p in modelClassifier.parameters())
print(f"Numbero totale dei parametri: {total_params}")

criterion = nn.CrossEntropyLoss()
optimizer = Adam(modelClassifier.parameters(), lr=hyperparameters["learning_rate"])

#callback di early stopping da passare al nostro metodo di addestramento
early_stopping = EarlyStopping(patience=hyperparameters['patience'], min_delta=hyperparameters['min_delta'])

Using cuda device


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


MyClassifier(
  (linear1): Linear(in_features=768, out_features=512, bias=True)
  (linear2): Linear(in_features=512, out_features=256, bias=True)
  (linear3): Linear(in_features=256, out_features=128, bias=True)
  (linear4): Linear(in_features=128, out_features=4, bias=True)
  (act_fn): LeakyReLU(negative_slope=0.2)
  (batch_norm): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batch_norm2): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batch_norm3): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (dropout): Dropout(p=0.4, inplace=False)
)
Numbero totale dei parametri: 560260


In questa parte del codice passiamo alla classe MyDataset i valori x_train,y_train,x_val,y_val,x_test,y_test precedentemente trovati e allo stesso modo, passiamo alla classe MyDataset i valori di comment_text e labels con data augumentation. Questo train_dataset diventa ora il dataset su cui effettuare il training

In [47]:
df_train_dataAug = pd.read_csv(root+ "data/train_dataset_Aug.csv")
df_val = pd.read_csv(root+ "data/validation_dataset.csv")
df_test = pd.read_csv(root+ "data/test_dataset.csv")

In [48]:
x_train_aug, y_train_aug = df_train_dataAug['comment_text'], df_train_dataAug['conspiracy']
x_val, y_val = df_val['comment_text'], df_val['conspiracy']
x_test, y_test = df_test['comment_text'], df_test['conspiracy']

In [49]:
train_dataset = MyDataset(x_train_aug.values, y_train_aug.values, hyperparameters["stopwords"])
val_dataset = MyDataset(x_val.values, y_val.values, hyperparameters["stopwords"])
test_dataset = MyDataset(x_test.values, y_test.values, hyperparameters["stopwords"])

Remove ritorno a capo ...: 100%|██████████| 1033/1033 [00:00<00:00, 330212.33it/s]
Remove StopWord ... : 100%|██████████| 1033/1033 [00:15<00:00, 65.74it/s] 
Generation Embeddings ...: 100%|██████████| 1033/1033 [00:30<00:00, 33.91it/s]
Remove ritorno a capo ...: 100%|██████████| 162/162 [00:00<00:00, 241463.13it/s]
Remove StopWord ... : 100%|██████████| 162/162 [00:01<00:00, 107.73it/s]
Generation Embeddings ...: 100%|██████████| 162/162 [00:04<00:00, 34.78it/s]
Remove ritorno a capo ...: 100%|██████████| 130/130 [00:00<00:00, 235228.44it/s]
Remove StopWord ... : 100%|██████████| 130/130 [00:01<00:00, 106.84it/s]
Generation Embeddings ...: 100%|██████████| 130/130 [00:03<00:00, 34.79it/s]


Avvio la fase di addestramento attraverso la funzione my_train_test e visualizzando i risultati attraverso i plot. Vengono stampati a schermo i risultati di accuracy e di f1

In [None]:
train_loss, train_acc,train_f1, validation_loss, validation_acc, val_f1, test_acc, test_f1 = my_train_test(modelClassifier,
                                                                                            optimizer,
                                                                                            train_dataset,
                                                                                            val_dataset,
                                                                                            test_dataset,
                                                                                            criterion,
                                                                                            hyperparameters["epochs"],
                                                                                            early_stopping)


print("\n---------------\n")
print(f"Test Accuracy: {test_acc}\n")
print(f"Test f1: {test_f1}")
print("\n---------------\n")

training epch ...:  12%|█▏        | 119/1000 [00:21<02:26,  6.02it/s]

In [None]:
import matplotlib.pyplot as plt

plt.plot(train_loss, label='training loss')
plt.plot(validation_loss, label='validation loss')
#plt.plot(test_loss, label='test loss')
plt.legend(loc='upper right')
plt.ylim(0,1)
plt.show()

In [None]:
plt.plot(train_acc, label='training accuracy')
plt.plot(validation_acc, label='validation accuracy')
#plt.plot(test_acc, label='test accuracy')
plt.legend(loc='lower right')
plt.ylim(0,1)
plt.show()

##Grid Search

Grid Search con relativi plot. La Grid Search è stata implementata grazie alla libreria sklearn e ci ha permesso di individuare alcuni dei parametri migliori

In [None]:
from sklearn.model_selection import ParameterGrid
import matplotlib.pyplot as plt

# Definizione del set di iperparametri da testare
param_grid = {
    'learning_rate': [1e-3, 1e-5, 1e-6],
    'batch_size': [32, 64, 128],
    'dropout': [0.3, 0.4],
    'h_dim': [512, 768],
}

# Creazione di tutte le combinazioni di iperparametri
param_combinations = list(ParameterGrid(param_grid))

# Lista per salvare i risultati della grid search
results = []

# Ciclo attraverso tutte le combinazioni di iperparametri
for params in param_combinations:
    print("\nTesting with hyperparameters:", params)

    # Acquisiamo il device su cui effettueremo il training
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using {device} device")

    config = AutoConfig.from_pretrained(hyperparameters["language_model"])
    tokenizer = AutoTokenizer.from_pretrained(hyperparameters["language_model"])
    lm_model = AutoModel.from_pretrained(hyperparameters["language_model"], config=config).to(device)
    modelClassifier = MyClassifier(num_inputs=768, num_hidden=params["h_dim"], num_outputs=4, dropout=params["dropout"]).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(modelClassifier.parameters(), lr=params["learning_rate"])

    # Creiamo la callback di early stopping da passare al nostro metodo di addestramento
    early_stopping = EarlyStopping(patience=hyperparameters['patience'], min_delta=hyperparameters['min_delta'])

    # Addestramento del modello con i parametri correnti
    train_loss, train_acc, train_f1, val_loss, val_acc,val_f1, test_acc, test_f1 = my_train_test(
        modelClassifier, optimizer, train_dataset, val_dataset, test_dataset, criterion, 100, early_stopping
    )


    print("\n---------------\n")
    print(f"Test Accuracy: {test_acc}\n")
    print(f"Test f1: {test_f1}")
    print("\n---------------\n")

    #Plot LOSS
    plt.plot(train_loss, label='training loss')
    plt.plot(val_loss, label='validation loss')
    plt.legend(loc='upper right')
    plt.ylim(0,1)
    plt.show()

    #Plot Acc
    plt.plot(train_acc, label='training accuracy')
    plt.plot(val_acc, label='validation accuracy')
    plt.legend(loc='lower right')
    plt.ylim(0,1)
    plt.show()

    # Salvataggio dei risultati nella lista
    results.append({
        'params': params,
        'train_loss': train_loss,
        'train_acc': train_acc,
        'val_loss': val_loss[-1],  # Prendi l'ultimo valore della lista di loss di validazione
        'val_acc': val_acc[-1],    # Prendi l'ultimo valore della lista di accuracy di validazione
        'test_f1': test_f1,
        'test_acc': test_acc,
    })

# Trova la combinazione di iperparametri che ha ottenuto il miglior risultato sulla validazione
best_result = max(results, key=lambda x: x['val_acc'])

print("\nBest hyperparameters:", best_result['params'])
print("Validation Accuracy:", best_result['val_acc'])
print("Test Accuracy:", best_result['test_acc'])

Carico dataset di test per avviare la submission

In [None]:
df_test = pd.read_csv(root+"data/subtaskB_test.csv")

#subtask B : fase di test per la challenge

In [40]:
df_test = pd.read_csv(root+"data/subtaskB_test.csv")

In [None]:
emojis_list = [extract_most_common_emoji(commento) for commento in df_test.comment_text]


emojis_list = list(chain(*emojis_list))
emojis_list = list(set(emojis_list))

top_3_emoji = emojis_list

for id, commento in zip(df_test.index, df_test['comment_text']):
  # espressione regolare originale
  regex_pattern = r'\b\d{2}/\d{2}/\d{4}\b|\S+@\S+|\b\d{4}-\d{2}-\d{2}\b|https?://\S+|[^\w\s🇮🇹]'

  # Aggiungi la parte per mantenere le emoji in top_3_emoji, incluso "IT"
  nuovo_regex_pattern = f'{regex_pattern}|(?<=\s)({"|".join(re.escape(emoji) for emoji in top_3_emoji)})(?=\s)'


  # unisco le 2
  commento = re.sub(nuovo_regex_pattern, emoji_selezionata, commento)
  df_test.loc[id, 'comment_text'] = commento


print("Top 3 Emojis:", top_3_emoji)
print("Num Tot. Emoji: ", len(emojis_list))

Top 3 Emojis: ['⚡', '🇮🇹', '🤦\u200d♂', '😳', '🔻', '🤣', '💪', '🧠', '📚', '😃', '😎', '😂', '😱', '▶️', '✔️', '🚨', '🔽', '🗂', '📃', '🔗', '😬', '🌺', '↪️', '💥', '‼️', '😁', '👏', '☝️', '😨', '🫁', '📊', '✝️', '❤️', '⁉️', '🥳', '👊🏾', '👤', '👉', '😉', '🫀', '🇮🇱', '⚠️', '👀', '🎯', '❗', '🙏🏻', '❓', '🔴', '😊', '📝', '📄', '🇬🇧', '😛', '🙏', '🗞', '🙄']
Num Tot. Emoji:  56


In [42]:
import spacy

# Carica il modello italiano di SpaCy
nlp = spacy.load("it_core_news_sm")

for id, commento in zip(df_test.index, df_test['comment_text']):
  # Processa il testo con SpaCy
  doc = nlp(commento)

  #Unisci i vari token elaborati di una frase in un unica stringa
  comment_lem = ' '.join([token.lemma_ for token in doc])

  df_test.loc[id, 'comment_text'] = comment_lem

Definiamo quindi una classe MyDataset_test diversa da quella precedentemente definita. In questa classe infatti, l'unico input sarà il comment_text e non verranno calcolate le performance.

In [43]:
class MyDataset_test(torch.utils.data.Dataset):

  def __init__(self, x, stopwords):
    #rimuovo ritorni a capo
    x = [text.replace('\n\n', '').replace('\n', '').replace('\r', '') for text in tqdm(x, desc='Remove ritorno a capo ...')]

    # rimuovo le stopwords se sono presenti
    if stopwords:
      text_clean = []
      for sentence in tqdm(x, desc='Remove StopWord ... '):
        sentence = remove_stopwords(sentence)
        text_clean.append(sentence)
    else:
      text_clean = x

    # Conto il numero di uppercase e normalizzo il valore per la lunghezza della frase
    num_uppercase = []
    for sentence in tqdm(text_clean, desc="Count UpperCase ... "):
      count_words_ = count_words(sentence)
      count_uppercase_words_ = count_uppercase_words(sentence)

      if(count_words_ != 0 and count_words_ >= count_uppercase_words_):
        num_uppercase.append(count_uppercase_words_ / count_words_)
      else:
        num_uppercase.append(0)

    # genero gli embeddings
    embeddings_list = []
    for text in tqdm(text_clean, desc='Generation Embeddings ...'):
      tokens = tokenizer([text], add_special_tokens=True, return_tensors='pt', padding='max_length', max_length = 512, truncation=True)
      input_id_texts = tokens['input_ids'].squeeze(1).to(device)
      mask_texts = tokens['attention_mask'].squeeze(1).to(device)
      embeddings_list.append(gen_embeddings(input_id_texts, mask_texts))


    self.embeddings = embeddings_list

  def __len__(self):
    return len(self.labels)

  def __getitem__(self, idx):
    return self.embeddings[idx]

In [44]:
test = df_test['comment_text']

In [45]:
dataset = MyDataset_test(test.values, hyperparameters["stopwords"])

Remove ritorno a capo ...: 100%|██████████| 300/300 [00:00<00:00, 108604.45it/s]
Remove StopWord ... : 100%|██████████| 300/300 [00:03<00:00, 93.96it/s]
Count UpperCase ... : 100%|██████████| 300/300 [00:00<00:00, 9046.86it/s]
Generation Embeddings ...:   0%|          | 0/300 [00:00<?, ?it/s]


NameError: name 'device' is not defined

Attraverso la funzione my prediction_test definiamo come fatto precedentemente le performance del modello sul nuovo dataset di test.

In [None]:
def prediction_test(model, dataset):

  model.eval()

  all_predictions = []


  with torch.no_grad():
    for data_inputs, data_uppercase in dataset:

      # 1. Sposto i dati di input sul dispositivo
      data_inputs = data_inputs.unsqueeze(0).to(device)

      # 2. Eseguo il modello sui dati di input
      output = model(data_inputs)
      output = output.squeeze(0)


      # 4. Calcolo accuracy
      softmax = nn.LogSoftmax(dim=1)
      predictions= softmax(output).argmax(dim=1)

      # 5: Salva predizioni e etichette per calcolare la F1-score macro alla fine
      all_predictions.append(predictions)


  return all_predictions

In [None]:
list_prediction_test = prediction_test(modelClassifier, dataset)

In [None]:
list_prediction_test

In [None]:
lista_interi = [int(tensor.item()) for tensor in list_prediction_test]

In [None]:
lista_interi

In [None]:
#Impostazione del percorso del nuovo file CSV
percorso_file_output = root + "data/output_testB.csv"

Salviamo infine i risultati ottenuti su un file csv

In [None]:
import csv

# Salva la lista come file CSV
with open(percorso_file_output, 'w', newline='') as file:
  writer = csv.writer(file)
  writer.writerow(["id", "Expected"])
  writer.writerows([[i, valore] for i, valore in enumerate(lista_interi)])