In [1]:
%%capture
!pip install transformers

In [2]:
from transformers import AutoTokenizer, AutoModelForMaskedLM,BertForTokenClassification

from google.colab import drive

from torch.utils.data import Dataset, DataLoader
from torch.optim import SGD
from pandas import DataFrame

from tqdm import tqdm
import pandas as pd
import numpy as np
import os
import torch
import time

drive.mount('/content/drive')
folder = "drive/Othercomputers/Il mio Laptop/Universita/[IA] Artificial Intelligence/[HLT] Human Language Technologies/project/"
bert  = "dbmdz/bert-base-italian-xxl-cased"

# start_time = time.time()
# print("--- %s seconds ---" % (time.time() - start_time))

Mounted at /content/drive


In [3]:
tokenizer = AutoTokenizer.from_pretrained(bert)

dataset = pd.read_csv(folder+"Sources/dataset.csv")
length = len(dataset)

df_train, df_val, df_test = np.split(dataset.sample(frac=1, random_state=42),[int(.8 * length), int(.9 * length)])

Downloading:   0%|          | 0.00/59.0 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/433 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/235k [00:00<?, ?B/s]

In [4]:
labels_to_ids = {'B-ACTI': 0, 'B-BODY': 1, 'B-DISO': 2, 'B-DRUG': 3, 'B-SIGN': 4, 'B-TREA': 5, 'I-ACTI': 6,
                 'I-BODY': 7, 'I-DISO': 8, 'I-DRUG': 9, 'I-SIGN': 10, 'I-TREA': 11, 'O': 12}

class MyDataset(torch.utils.data.Dataset):

  def __init__(self, dataset: DataFrame):

    self.texts, self.labels = [], []

    for _,row in dataset.iterrows():
      token_text = tokenizer(row[0], padding='max_length', max_length = 512, truncation=True, return_tensors="pt")
      label_ids = self.align_label(token_text.word_ids(),row[1])

      self.texts.append(token_text)
      self.labels.append(label_ids)

  def __len__(self):
    return len(self.labels)
    
  def __getitem__(self, idx):
    return self.texts[idx], torch.LongTensor(self.labels[idx])

  def align_label(self, token: list, labels: str):
    
    labels = labels.split() # Trasforming a string of label into array

    # We take into cosideration the previous word to identify if the id is already seen
    previous_id = None
    label_ids = [] # Aligned labels

    # We can all ids in the token and we try to associate to a label  
    for word_idx in token:
  
      if word_idx is None: # typically when we encounter [CLS]
        label_ids.append(-100)
      else:
        try: # We try to associate a label
          label_ids.append(labels_to_ids[labels[word_idx]])
        except:
          label_ids.append(-100)
  
      previous_id = word_idx 

    return label_ids

In [5]:
class BertModel(torch.nn.Module):

    def __init__(self):
        super(BertModel, self).__init__()
        self.bert = AutoModelForMaskedLM.from_pretrained(bert,num_labels=13)
        
    def forward(self, input_id, mask, label):
        output = self.bert(input_ids=input_id, attention_mask=mask, labels=label, return_dict=False)
        return output

In [6]:
def train(model, df_train: DataFrame, df_val: DataFrame, batch_size: int, lr: float, epochs: int):

  tr = DataLoader(MyDataset(df_train), num_workers=2, batch_size=batch_size, shuffle=True)
  vl = DataLoader(MyDataset(df_val), num_workers=2, batch_size=batch_size)

  use_cuda = torch.cuda.is_available()
  device = torch.device("cuda" if use_cuda else "cpu")

  optimizer = SGD(model.parameters(), lr=lr)

  if use_cuda:
    model = model.cuda()

  for e in range(epochs):

    total_acc_train = 0
    total_loss_train = 0

    model.train()

    for tr_text, tr_label in tqdm(tr):
      
      tr_label = tr_label.to(device)
      mask = tr_text['attention_mask'].squeeze(1).to(device)
      input_id = tr_text['input_ids'].squeeze(1).to(device)

      optimizer.zero_grad()
      loss, logits = model(input_id, mask, tr_label)

      for i in range(logits.shape[0]):

        logits_clean = logits[i][tr_label[i] != -100]
        label_clean = tr_label[i][tr_label[i] != -100]

        predictions = logits_clean.argmax(dim=1)
        acc = (predictions == label_clean).float().mean()
        total_acc_train += acc
        total_loss_train += loss.item()

      loss.backward()
      optimizer.step()

    model.eval()

    total_acc_val = 0
    total_loss_val = 0

    for val_data, val_label in vl:

      val_label = val_label.to(device)
      mask = val_data['attention_mask'].squeeze(1).to(device)
      input_id = val_data['input_ids'].squeeze(1).to(device)

      loss, logits = model(input_id, mask, val_label)

      for i in range(logits.shape[0]):

        logits_clean = logits[i][val_label[i] != -100]
        label_clean = val_label[i][val_label[i] != -100]

        predictions = logits_clean.argmax(dim=1)
        acc = (predictions == label_clean).float().mean()
        total_acc_val += acc
        total_loss_val += loss.item()

    val_accuracy = total_acc_val / len(df_val)
    val_loss = total_loss_val / len(df_val)

    print(
          f'Epochs: {e + 1} | Loss: {total_loss_train / len(df_train): .3f} | Accuracy: {total_acc_train / len(df_train): .3f} | Val_Loss: {total_loss_val / len(df_val): .3f} | Accuracy: {total_acc_val / len(df_val): .3f}')


In [None]:
model = BertModel()

train(model,df_train, df_val,2,5e-3,1)

Downloading:   0%|          | 0.00/445M [00:00<?, ?B/s]

Some weights of the model checkpoint at dbmdz/bert-base-italian-xxl-cased were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
  cpuset_checked))
  0%|          | 139/27972 [00:34<1:44:00,  4.46it/s]