# Tarea Individual Módulo 6 - Parte 2

El entorno virtual de Google Colab tiene instaladas la mayoría de las librerías
que se suelen utilizar en problemas de IA. Sin embargo, necesitamos instalar la
librería transformers de Huggingface y la librería tqdm que es una librería que nos permite mostrar una barra de progreso cuando utlizamos bucles for.

In [None]:
!pip install transformers
!pip install tqdm

# 0. Imports

En la sección `Imports` agrupamos todas las librerías y clases que debemos importar en esta tarea.

In [2]:
# import basic libraries for data science
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# import torch and transformers
import torch
from torch import cuda
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# import different metrics for evaluation
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report, accuracy_score, f1_score

# import tqdm to track progress
from tqdm import tqdm

# import some python modules required for some of the utility functions
import itertools
import re

# 1. Utils

En la sección `Utils` se encuentran varias funciones de utilidad con sus docstrings que se utilizarán a largo de la implementación para preprocesar y limpiar datos y también evaluar los resultados.

In [3]:
################################
# text processing and cleaning #
################################

def encode_sentiment(sentiment):
  """
  Label encode sentiment.

  Parameters
  ----------
  sentiment: str, sentiment {"positive", "sentiment"}

  Returns
  -------
  int, 1's for positive and 0's for negative

  """
  if sentiment == "Positive":
    return 1
  else:
    return 0

def is_ascii(w):
  """
  Check if character is ascii type.

  Parameters
  ----------
  w: str, character

  Returns
  -------
  bool, True if character is ascii. False otherwise.
  """
  try:
    w.encode("ascii")
    return True
  except UnicodeEncodeError:
    return False

def text_cleaning(text):
  """
  Clean text from symbols, punctuation, etc.

  Parameters
  ----------
  text: string, text data
  
  Returns
  -------
  cleaned_text: string, cleaned text data
  """
  # remove string formatting '\n' or '\t'
  tmp_text = re.sub(r'\n+', '. ', text)
  tmp_text = re.sub(r'\t+', '. ', text)
  # remove words with non-ascii characters
  tmp_text = " ".join([word for word in tmp_text.split() if is_ascii(word)])
  # remove email address
  tmp_text = " ".join([word for word in tmp_text.split() if not word.startswith("@")])
  # remove urls
  tmp_text = re.sub(r'http\S+', '', tmp_text, flags=re.MULTILINE)
  tmp_text = re.sub(r'www\S+', '', tmp_text, flags=re.MULTILINE)
  # remove punctuation but . (to split sentences)
  cleaned_text = re.sub('[^A-Za-z.,]+', ' ', tmp_text)
  # lowercase
  cleaned_text = cleaned_text.lower()

  return cleaned_text

##############
# Evaluation #
##############

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label',fontsize=15)
    plt.xlabel('Predicted label',fontsize=15)

# 2. Config

En la sección `Config` definimos variables variables y objetos que utilizaremos en el desarrollo y queremos tener agrupados en el mismo lugar.

In [None]:
# Defining some key variables that will be used later on
MAX_LEN = 256 # max number of tokens
BATCH_SIZE = 64
SEED = 42 # for reprodudible results

# Initialiaze model tokenizer
TOKENIZER = AutoTokenizer.from_pretrained('distilbert-base-uncased-finetuned-sst-2-english')

# Setting up the device for GPU usage if available
DEVICE = 'cuda' if cuda.is_available() else 'cpu'
print(DEVICE)

# 3. Data

En la sección `Data` cargamos el dataset en un dataframe de pandas y codificamos la columna sentimiento.

Además, procedemos a la limpieza del texto utlizando la función `text_cleaning()`.

In [None]:
# load dataset
raw_data = pd.read_csv(
    "/content/drive/MyDrive/Colab Notebooks/MASTERS/DEPORTE/DATA/football_tweets_train.csv", \
    encoding='utf-8'
    ).drop("id", axis=1)
raw_data.head()

In [None]:
print(f"Number of tweets: {raw_data.shape[0]}")

In [None]:
raw_data.polarity.value_counts()

Queremos comparar el desempeño del modelo utilizado en la parte 1 de la tarea. Este modelo estaba ajustado para una tarea de clasificación binaria. Sin embargo, vemos como este dataset esta preparado para un problema de clasificación multiclase.

Nos quedaremos únicamente con los ejemplos positivos y negativos para poder realizar la comparación.

In [None]:
# get rid of neutral examples
data_subset = raw_data.loc[raw_data.polarity != "Neutral"]
data_subset.head()

In [None]:
# encode sentiment
data_subset['polarity'] = data_subset['polarity'].map(encode_sentiment)
data_subset.head()

In [None]:
### clean tweets
cleaned_data = data_subset.copy().reset_index(drop=True)
cleaned_data["text"] = cleaned_data["text"].map(text_cleaning)
cleaned_data.head()

# 4. Dataset and Dataloader

En la sección `Dataset and Dataloader` vamos a crear un dataset de Pytorch del tipo map-iterable que utilizaremos para almacenar los datos y procesarlos de acuerdo a los requirimientos del modelo.

Después creamos un objeto `DataLoader` de Pytorch que toma el dataset para poder pasar ejemplos al modelo en lotes o batches.

__Clase DistilBertDataset__

La clase `DistilBertDataset` toma como argumentos un dataframe con los datos, el tokenizador del modelo y el número de máximo de tokens que definimos en la sección `Config`.

El método `__getitem__()` toma como parámetro un índice de los datos en el dataset, extrae el ejemplo correspondiente del dataframe y utiliza la función `encode_plus()` del tokenizador para procesar los inputs de acuerdo a los requerimientos de DistilBERT ( special tokens [CLS] y [SEP], truncation, padding, etc.). Este método devulce un diccionario de python con los tensores que continen los inputs ids, los attention masks y los el sentimiento codificado.

El método `__len__()` simplemente devuelve el número de ejemplos en el dataset.

__DataLoader__

El `DataLoader` viene a ser un generador que divide el dataset en lotes de tamaño `BATCH_SIZE` para alimentar el modelo.

En los parámetros podemos definir el tamaño de lote o batch size, si seleccionar de forma aleatoria los ejemplos a incluir en el batch o no (recomendable al realizar un entrenamiento) y también el número de workers (2 para Google Colab)

In [11]:
class DistilBertDataset(Dataset):
  """Custom pytorch map-iterable dataset for sentiment analysis with DistilBERT."""
  def __init__(self, dataframe, tokenizer, max_len):
      self.len = len(dataframe)
      self.data = dataframe
      self.tokenizer = tokenizer
      self.max_len = max_len

  def __getitem__(self, index):
      text = str(self.data['text'].loc[index])
      text = " ".join(text.split())
      inputs = self.tokenizer.encode_plus(
          text,
          None,
          add_special_tokens=True,
          max_length=self.max_len,
          padding='max_length',
          return_token_type_ids=False,
          truncation=True
      )
      ids = inputs['input_ids']
      mask = inputs['attention_mask']
      
      return {
          'ids': torch.tensor(ids, dtype=torch.long),
          'mask': torch.tensor(mask, dtype=torch.long),
          'targets': torch.tensor(self.data['polarity'].loc[index], dtype=torch.long)
      }

  def __len__(self):
      return self.len

In [12]:
# create dataset
dataset = DistilBertDataset(cleaned_data, TOKENIZER, MAX_LEN)

In [None]:
print("---- Visually inspecting 5th element ----")
print(f"Input ids: {dataset[6]['ids']}")
print(f"Attention masks: {dataset[6]['mask']}")
print(f"Target: {dataset[6]['targets']}")
print("------------------------------------------")

In [14]:
# define dataloader params
dataloader_params = {'batch_size': BATCH_SIZE,
                'shuffle': False,
                'num_workers': 2
                }

# create dataloader
data_loader = DataLoader(dataset, **dataloader_params)

# 5. Model

En la sección `Model` creamos la clase DistilBERTClass() con el modelo ya ajustado que se encuentra en el hub de modelos de Huggingface.

El método `forward()` de la clase toma los inputs ids y attention mask que devuelve el método `__getitem__()` de la clase `DistilBertDataset`.

In [15]:
class DistilBERTClass(torch.nn.Module):
  """Custom class for DilstilBERT model for Sequence Classification."""
  def __init__(self):
      super(DistilBERTClass, self).__init__()
      self.model = AutoModelForSequenceClassification \
        .from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")

  def forward(self, input_ids, attention_mask):
      output = self.model(input_ids=input_ids, attention_mask=attention_mask) 
      logits = output.logits
      
      return logits

# 6. Inference

En la sección `Inference` vamos a utilizar el modelo para realizar inferencia y obtener los sentimientos de los tweets.

Utilizamos la función `inference()` para obtener los outputs.


In [None]:
# Download and load trained DistilBERT model
model = DistilBERTClass()
model.to(DEVICE)

In [17]:
def inference(data_loader, model, device):
    """
    Binary classification using DistilBERT model.

    Parameters
    ----------
    data_loader: Pytorch DataLoader object
    model: DistilBERTClass Object
    device: str, device

    Returns
    -------
    lists, outputs logits and targets
    """
    # put model in evaluation mode
    model.eval()
    # create lists to be populated with predictions and corresponding targets
    fin_targets = []
    fin_outputs = []
    # do not calculate gradients as not required for inference
    with torch.no_grad():
        # loop over batches and get predictions
        for bi, d in tqdm(enumerate(data_loader), total=len(data_loader)):
            ids = d['ids']
            mask = d['mask']
            targets = d['targets']

            # send them to the cuda device we are using
            ids = ids.to(device, dtype=torch.long)
            mask = mask.to(device, dtype=torch.long)
            targets = targets.to(device, dtype=torch.long)
            # get outputs logits
            outputs = model(
                input_ids=ids,
                attention_mask=mask
            )
            # Normalize logits and store results and targets in lists
            fin_targets.extend(targets.cpu().detach().numpy().tolist())
            fin_outputs.extend(torch.sigmoid(outputs).cpu().detach().numpy().tolist())

    return fin_outputs, fin_targets

In [None]:
# run inference -> sentiment analysis
outputs, targets = inference(
    data_loader=data_loader,
    model=model,
    device=DEVICE
)

In [19]:
# convert normalized logits to sentiment -> Criteria: most probable class
outputs = np.argmax(outputs, axis=1)

# 7. Evaluation

En la sección `Evaluation` calculamos métricas típicas de un poblema de clasificación binario: Accuracy y F1 Score.

También graficamos la matriz de confusión utlizando la función de utilidad `plot_confusion_matrix()`.

Finalmente, hacemos una pequeña evaluación visual de los inputs, los targets y las predicciones.

In [None]:
accuracy = accuracy_score(targets, outputs)
f_score = f1_score(targets, outputs, average='macro')
print(f"Training Accuracy Score = {accuracy}")
print(f"Training F1-Score = {f_score}")

In [None]:
# classification report
print(classification_report(targets, outputs, target_names=["Negative", "Positive"]))

In [None]:
# print the confusion matrix
cnf_matrix = confusion_matrix(targets, outputs, labels=[0, 1])
plt.figure(figsize=(8,6))
plot_confusion_matrix(
    cnf_matrix,classes=['Negative','Positive'],
    normalize=True,
    title='Confusion matrix'
)

In [None]:
#visual evalution
for tweet, cleaned_tweet, target, output in zip(data_subset.text.values[:10], cleaned_data.text.values[:10], targets, outputs):
  print(f"Original text: {tweet}")
  print(f"Text: {cleaned_tweet}")
  print(f"Target: {target}\tOutput: {output}\n")

# Celdas para explorar resultados