<a href="https://colab.research.google.com/github/mvassche/colab/blob/main/sentiment_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os

import torch
from transformers import TrainingArguments

## parameters ##

# Select which GPU(s) CUDA may use (comma-separated indices, e.g. "0,1,2,3"),
# then pick the CPU when no CUDA device is available.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")

# Set the mode: "train" or "inference" (or "trial" if you want to check some things without training/doing inference)
mode = "inference"


def default_pred_path(model_path, pred_dir="predictions"):
  """Build the default prediction file path for a saved model.

  The file name is made of the last two components of model_path joined by an
  underscore, e.g. "my_model2/checkpoint-128" gives
  "predictions/my_model2_checkpoint-128.txt".

  Args:
    model_path: path to the saved model, with "/" as separator.
    pred_dir: folder the prediction file is placed in.

  Returns:
    The prediction file path as a string.
  """
  # Empty components (leading/trailing/double slashes) are ignored, so absolute
  # paths and paths without any slash both produce a usable file name.
  parts = [part for part in model_path.split("/") if part]
  stem = "_".join(parts[-2:]) if parts else "model"
  return pred_dir + "/" + stem + ".txt"


# If inference, set path to saved model, select inference mode (sentence or dataset) and give inference input (sentence or path to dataset)
if mode == "inference":
  inference_modelpath = "my_model2/checkpoint-128"
  inference_mode = "dataset" # options: "sentence", "dataset"
  inference_input = "Wow.. Ik ben echt heel bang. Nooit gedacht dat dit nog zou gebeuren."
  if inference_mode == "dataset":
    inference_input = "/home/luna/Demo/multi_cat_inference.txt"
    # Make sure to create a folder called 'predictions'
    #pred_path = "predictions/my_model2_checkpoint-128.txt" # enter prediction path manually
    pred_path = default_pred_path(inference_modelpath) # or create file name based on model name

# Data files are tab-separated with three columns: id, text and label.
# The first line should hold the column names: id\ttext\tlabel
# Label format per task:
#   - multi-class single-label: a number from 0 to n
#   - multi-class multi-label: numbers from 0 to n separated by commas (e.g. 0,3)
#   - regression: a value (an integer, or a float with a point as decimal separator)

# NOTE(review): "eval" currently points at the same file as "train" - confirm this is intended.
data_paths = {
    "train": "/home/luna/transformers3_8/Demo/multi_cat_train.txt",
    "eval": "/home/luna/transformers3_8/Demo/multi_cat_train.txt",
}
# The inference split is only registered when predicting for a whole dataset
if mode == "inference":
  if inference_mode == "dataset":
    data_paths["inference"] = inference_input

# Task to solve: "single_label_classification", "multi_label_classification" or "regression"
task = "single_label_classification"

# Mapping between label numbers and label names (left empty for regression)
id2label = None
label2id = None
if task != "regression":
  id2label = {0: "neutral", 1: "anger", 2: "fear", 3: "joy", 4: "love", 5: "sadness"}
  # label2id is the inverse of id2label
  label2id = {name: number for number, name in id2label.items()}


# Model settings shared by tokenization and model loading
model_config = dict(
    model_weights="pdelobelle/robbert-v2-dutch-base", # check huggingface
    num_labels=6, # for single-label and multi-label classification: num_labels = n; for regression: num_labels = 1
    max_length=128,
    device=device,
)


training_args = TrainingArguments(
    # Output folder for the run
    output_dir="my_model2",
    # Optimisation settings
    num_train_epochs=4,
    learning_rate=5e-5,
    weight_decay=0.01,
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    # Evaluate and save once per epoch
    evaluation_strategy="epoch", # set to "no" if you don't want to evaluate during training.
    save_strategy="epoch",
    # Only keeps last checkpoint, deletes the older checkpoints. Comment out if you want to keep all checkpoints.
    save_total_limit=1,
    #push_to_hub=False,
)

In [None]:
import torch
import numpy as np
import pandas as pd

from tqdm import tqdm


# huggingface libraries
from transformers import AutoTokenizer, AutoConfig, AutoModel, AutoModelForSequenceClassification
from transformers import TrainingArguments, Trainer

from datasets import load_dataset

import evaluate
from sklearn.metrics import f1_score, mean_squared_error

# import options
from options import *


## Load and preprocess dataset ##

# Tokenizer that belongs to the chosen weights (parameter: model_weights)
tokenizer = AutoTokenizer.from_pretrained(model_config["model_weights"])

# Read the tab-separated files with load_dataset() from huggingface datasets.
# The first line of each file is skipped and the column names are given explicitly.
dataset = load_dataset(
    "csv",
    data_files=data_paths,
    delimiter="\t",
    skiprows=1,
    column_names=["id", "text", "label"],
)


# Convert labels to one-hot encoding when task is multi-label classification
def one_hot(example, num_labels=None):
  """Replace example["label"] with a multi-hot list of floats.

  Args:
    example: a dataset row whose "label" entry holds the class numbers of the
      row: comma-separated in a string (e.g. "0,3"), a single number, or None
      when the row has no labels.
    num_labels: length of the resulting vector. When omitted,
      model_config["num_labels"] is looked up at call time.

  Returns:
    The same example, with "label" replaced by a list of num_labels floats:
    1.0 at every position whose number is listed, 0.0 elsewhere.
  """
  if num_labels is None:
    num_labels = model_config["num_labels"]

  raw_label = example["label"]
  if raw_label is None:
    active = set()
  else:
    # str() because the loader may hand over a number instead of a string
    # (e.g. when every row carries a single label); strip() tolerates "0, 3".
    active = {part.strip() for part in str(raw_label).split(",")}

  example["label"] = [1.0 if str(i) in active else 0.0 for i in range(num_labels)]
  return example

# Only multi-label classification needs the label vectors built by one_hot
dataset = dataset.map(one_hot) if task == "multi_label_classification" else dataset


# Function for encoding (tokenizing) data
def encode_data(data):
  """Tokenize the texts of a batch and attach the labels.

  Args:
    data: a batch of rows with a "text" and a "label" column, as handed over
      by dataset.map(..., batched=True).

  Returns:
    The tokenizer output for the texts, padded/truncated to
    model_config["max_length"], with the batch labels stored under "labels".
  """
  # return_overflowing_tokens is deliberately not requested: with a fast
  # tokenizer it emits extra rows for the cut-off part of long texts, so the
  # number of encodings would no longer match the number of labels.
  encoded_input = tokenizer(
      data["text"],
      add_special_tokens=True,
      max_length=model_config["max_length"],
      padding="max_length",
      truncation=True,
  )
  encoded_input["labels"] = data["label"]
  return encoded_input

# Tokenize every split in batches using the map() function
encoded_dataset = dataset.map(function=encode_data, batched=True)

# Have the dataset return PyTorch tensors
encoded_dataset.set_format(type="torch")


## Choose an evaluation metric ##
accuracy = evaluate.load("accuracy")

def compute_metrics(eval_pred):
  """Compute the evaluation metric that belongs to the configured task.

  Args:
    eval_pred: a (predictions, labels) pair; predictions are the raw model
      outputs (one row per example), labels the gold values.

  Returns:
    A dict of metric values: the result of the accuracy metric for
    single-label classification, {"f1": micro-F1} for multi-label
    classification, {"mse": ..., "rmse": ...} for regression.

  Raises:
    ValueError: if task is not one of the three supported values.
  """
  predictions, labels = eval_pred

  if task == "single_label_classification": # single-label classification
    class_ids = np.argmax(predictions, axis=1)
    return accuracy.compute(predictions=class_ids, references=labels)

  if task == "multi_label_classification": # multi-label classification
    # sigmoid + threshold of 0.5 decides per label whether it is predicted
    probs = torch.sigmoid(torch.Tensor(predictions))
    y_pred = (probs >= 0.5).numpy().astype(float)
    return {'f1': f1_score(y_true=labels, y_pred=y_pred, average='micro')}

  if task == "regression":
    # "mse" used to hold the root of the error (squared=False); both values are
    # now reported under their own name. The root is taken with numpy because
    # the squared= argument is no longer accepted by recent scikit-learn.
    mse = float(mean_squared_error(labels, predictions))
    return {'mse': mse, 'rmse': float(np.sqrt(mse))}

  raise ValueError(
      "Unknown task '" + str(task) + "': expected 'single_label_classification', "
      "'multi_label_classification' or 'regression'"
  )



## Choose model ##

# The label-name mappings are only handed to the model for the classification tasks
_label_maps = {} if task == "regression" else {"id2label": id2label, "label2id": label2id}
model = AutoModelForSequenceClassification.from_pretrained(
    model_config["model_weights"],
    problem_type=task,
    num_labels=model_config["num_labels"],
    **_label_maps,
)



## Train and evaluate model ##

# The Trainer is built in every mode, but only run when mode is "train"
trainer = Trainer(
    model=model,
    args=training_args,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    train_dataset=encoded_dataset["train"],
    eval_dataset=encoded_dataset["eval"],
)

# Train on the "train" split, then evaluate on the "eval" split
if mode == "train":
  trainer.train()
  trainer.evaluate()



## Inference ##

if mode == "inference":
  # Load tokenizer and model from the saved checkpoint
  tokenizer = AutoTokenizer.from_pretrained(inference_modelpath)
  model = AutoModelForSequenceClassification.from_pretrained(inference_modelpath)

  if inference_mode == "sentence":
    # Truncate like the dataset encoding does, so an over-long sentence cannot
    # exceed the configured maximum length.
    inputs = tokenizer(
        inference_input,
        truncation=True,
        max_length=model_config["max_length"],
        return_tensors="pt",
    )

    with torch.no_grad(): # run model
      logits = model(**inputs).logits

    if task == "single_label_classification":
      predicted_class_id = logits.argmax().item()
      print(model.config.id2label[predicted_class_id])

    elif task == "multi_label_classification":
      # apply sigmoid + threshold of 0.5 on the scores of the (single) sentence
      probs = torch.sigmoid(logits[0].cpu())
      predictions = (probs >= 0.5).numpy()
      # turn predicted id's into actual label names, using the mapping stored
      # with the model (same source as the single-label branch)
      predicted_labels = [model.config.id2label[idx] for idx, active in enumerate(predictions) if active]
      print(predicted_labels)

    elif task == "regression":
      print(logits.squeeze().detach().numpy())


  if inference_mode == "dataset":
    import os  # local import: only needed to create the prediction folder

    # test arguments for Trainer
    test_args = TrainingArguments(
        output_dir = training_args.output_dir,
        do_train = False,
        do_predict = True,
        per_device_eval_batch_size = training_args.per_device_eval_batch_size,
        dataloader_drop_last = False
    )

    trainer = Trainer(
                  model = model,
                  args = test_args,
                  compute_metrics = compute_metrics)

    # Run trainer in prediction mode.
    # Both the raw "label" column and the "labels" column added during encoding
    # are dropped, so predicting does not depend on the inference file having
    # usable labels.
    inference_split = encoded_dataset["inference"]
    label_columns = [col for col in ("label", "labels") if col in inference_split.column_names]
    encoded_dataset["inference"] = inference_split.remove_columns(label_columns)
    prediction_output = trainer.predict(encoded_dataset["inference"])
    predictions = prediction_output[0]

    ids = dataset["inference"]["id"]
    texts = dataset["inference"]["text"]

    # Turn the raw model outputs into one printable prediction per row
    if task == "single_label_classification":
      preds = [model.config.id2label[int(pred)] for pred in np.argmax(predictions, axis=1)]
    elif task == "multi_label_classification":
      probs = torch.sigmoid(torch.Tensor(predictions))
      flags = (probs >= 0.5).numpy()
      # predicted label names of a row are joined with commas
      preds = [
          ','.join(model.config.id2label[idx] for idx, active in enumerate(row) if active)
          for row in flags
      ]
    elif task == "regression":
      preds = [float(pred[0]) for pred in predictions]

    # write predictions to file (the folder is created when it does not exist yet)
    pred_dir = os.path.dirname(pred_path)
    if pred_dir:
      os.makedirs(pred_dir, exist_ok=True)
    # utf-8 so that non-ASCII characters in the texts can always be written
    with open(pred_path, 'w', encoding='utf-8') as f:
      f.write("id\ttext\tprediction\n")
      for row_id, row_text, pred in zip(ids, texts, preds):
        f.write(str(row_id) + '\t' + str(row_text) + '\t' + str(pred) + '\n')
    print("\nPredictions saved in " + pred_path)


## Trial ##
# Quick check that everything works: show the labels of the train split
if mode == "trial":
  train_labels = dataset["train"]["label"]
  print(train_labels)
