In [1]:
#Set up the environment
!pip install -q transformers datasets

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m510.5/510.5 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m13.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m12.6 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
#Download the accelarator
!pip install transformers[torch] -U

Collecting accelerate>=0.21.0 (from transformers[torch])
  Downloading accelerate-0.27.2-py3-none-any.whl (279 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m280.0/280.0 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: accelerate
Successfully installed accelerate-0.27.2


In [None]:
#Import the library
import pandas as pd
from sklearn.model_selection import train_test_split
from datasets import Dataset, DatasetDict

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Import pandas
import pandas as pd

# Read the CSV file into a DataFrame
file_path = '/content/drive/My Drive/Sentiment Analysis/data.csv'
df = pd.read_csv(file_path)

In [None]:
# Check the shape of data
df.shape

(10000, 14)

In [None]:
# Split the dataset into training and validation sets
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)

# Create Dataset objects
train_dataset = Dataset.from_pandas(train_df)
val_dataset = Dataset.from_pandas(val_df)

# Create DatasetDict object
dataset = DatasetDict({
    'train': train_dataset,
    'validation': val_dataset
})

dataset = dataset.remove_columns("__index_level_0__")
dataset = dataset.remove_columns("Unnamed: 0")
# Display the dataset
dataset

DatasetDict({
    train: Dataset({
        features: ['ID', 'Tweet', 'Optimistic', 'Thankful', 'Empathetic', 'Pessimistic', 'Anxious', 'Sad', 'Annoyed', 'Denial', 'Official report', 'Surprise', 'Joking'],
        num_rows: 8000
    })
    validation: Dataset({
        features: ['ID', 'Tweet', 'Optimistic', 'Thankful', 'Empathetic', 'Pessimistic', 'Anxious', 'Sad', 'Annoyed', 'Denial', 'Official report', 'Surprise', 'Joking'],
        num_rows: 2000
    })
})

In [None]:
# Check the sample data
example = dataset['train'][0]
example

{'ID': 1245204293461966849,
 'Tweet': 'This quarantine got me watching everyone’s snap story without skipping any 😂😭 #COVID19 #QuaratineLife #StayAtHomePlease #StayAtHome',
 'Optimistic': 1,
 'Thankful': 0,
 'Empathetic': 0,
 'Pessimistic': 0,
 'Anxious': 0,
 'Sad': 1,
 'Annoyed': 0,
 'Denial': 0,
 'Official report': 0,
 'Surprise': 0,
 'Joking': 0}

In [None]:
#It create a mapping relationship betweeen label and its index
labels = [label for label in dataset['train'].features.keys() if label not in ['ID', 'Tweet']]
id2label = {idx:label for idx, label in enumerate(labels)}
label2id = {label:idx for idx, label in enumerate(labels)}
labels

['Optimistic',
 'Thankful',
 'Empathetic',
 'Pessimistic',
 'Anxious',
 'Sad',
 'Annoyed',
 'Denial',
 'Official report',
 'Surprise',
 'Joking']

In [None]:
#Use the AutoTokenizer API to tokenize the data, define the function to process the data including making sure all the data having the same length
#create a array to store the key number for each lable
from transformers import AutoTokenizer
import numpy as np

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

def preprocess_data(examples):
  # take a batch of texts
  text = examples["Tweet"]
  # encode them
  encoding = tokenizer(text, padding="max_length", truncation=True, max_length=128)
  # add labels
  labels_batch = {k: examples[k] for k in examples.keys() if k in labels}
  # create numpy array of shape (batch_size, num_labels)
  labels_matrix = np.zeros((len(text), len(labels)))
  # fill numpy array
  for idx, label in enumerate(labels):
    labels_matrix[:, idx] = labels_batch[label]

  encoding["labels"] = labels_matrix.tolist()

  return encoding

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
#Process all the data with the given function
encoded_dataset = dataset.map(preprocess_data, batched=True, remove_columns=dataset['train'].column_names)

Map:   0%|          | 0/8000 [00:00<?, ? examples/s]

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

In [None]:
#Check the features
example = encoded_dataset['train'][0]
print(example.keys())

dict_keys(['input_ids', 'token_type_ids', 'attention_mask', 'labels'])


In [None]:
#Check the example
tokenizer.decode(example['input_ids'])

'[CLS] this quarantine got me watching everyone ’ s snap story without skipping any [UNK] # covid19 # quaratinelife # stayathomeplease # stayathome [SEP] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD]'

In [None]:
#Check the example's labels
example['labels']

[1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]

In [None]:
#Return the label
[id2label[idx] for idx, label in enumerate(example['labels']) if label == 1.0]

['Optimistic', 'Sad']

In [None]:
#We set the format of out data into standard PyTorch datasets
encoded_dataset.set_format("torch")

In [None]:
#Load the model for training
from transformers import AutoModelForSequenceClassification

model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased",
                                                           problem_type="multi_label_classification",
                                                           num_labels=len(labels),
                                                           id2label=id2label,
                                                           label2id=label2id)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
#We define the batch size and the metrics to evaluate the model
batch_size = 8
metric_name = "f1"

In [None]:
#Define the training hyperparamerters, for example, we want to evaluate after every epoch of training and save the results every epoch, leaning rate and the batch size
# and the number of epochs to train for
from transformers import TrainingArguments, Trainer

args = TrainingArguments(
    f"bert-finetuned-sem_eval-english",
    evaluation_strategy = "epoch",
    save_strategy = "epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=5,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model=metric_name,
    #push_to_hub=True,
)

In [None]:
#Define the function to evaluate the results
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
from transformers import EvalPrediction
import torch

def multi_label_metrics(predictions, labels, threshold=0.5):
    # first, use sigmoid on predictions which are of shape (batch_size, num_labels)
    sigmoid = torch.nn.Sigmoid()
    probs = sigmoid(torch.Tensor(predictions))
    # next, use threshold to turn them into integer predictions
    y_pred = np.zeros(probs.shape)
    y_pred[np.where(probs >= threshold)] = 1
    # finally, compute metrics
    y_true = labels
    f1_micro_average = f1_score(y_true=y_true, y_pred=y_pred, average='micro')
    roc_auc = roc_auc_score(y_true, y_pred, average = 'micro')
    accuracy = accuracy_score(y_true, y_pred)
    # return as dictionary
    metrics = {'f1': f1_micro_average,
               'roc_auc': roc_auc,
               'accuracy': accuracy}
    return metrics

def compute_metrics(p: EvalPrediction):
    preds = p.predictions[0] if isinstance(p.predictions,
            tuple) else p.predictions
    result = multi_label_metrics(
        predictions=preds,
        labels=p.label_ids)
    return result

In [None]:
#Check the label type
encoded_dataset['train'][0]['labels'].type()

'torch.FloatTensor'

In [None]:
#Check the encoding example
encoded_dataset['train']['input_ids'][0]

tensor([  101,  2023, 24209, 20486, 10196,  2288,  2033,  3666,  3071,  1521,
         1055, 10245,  2466,  2302, 25978,  2151,   100,  1001,  2522, 17258,
        16147,  1001, 24209, 25879,  3170, 15509,  1001,  2994,  8988,  8462,
        10814, 11022,  1001,  2994,  8988,  8462,   102,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0])

In [None]:
#Performa a forward pass to model
outputs = model(input_ids=encoded_dataset['train']['input_ids'][0].unsqueeze(0), labels=encoded_dataset['train'][0]['labels'].unsqueeze(0))
outputs

We strongly recommend passing in an `attention_mask` since your input_ids may be padded. See https://huggingface.co/docs/transformers/troubleshooting#incorrect-output-when-padding-tokens-arent-masked.


SequenceClassifierOutput(loss=tensor(0.7206, grad_fn=<BinaryCrossEntropyWithLogitsBackward0>), logits=tensor([[-0.2891, -0.1942,  0.1110, -0.1462,  0.2849, -0.2215, -0.2619,  0.5243,
         -0.1114, -0.4090,  0.0934]], grad_fn=<AddmmBackward0>), hidden_states=None, attentions=None)

In [None]:
#Start to train the model
trainer = Trainer(
    model,
    args,
    train_dataset=encoded_dataset["train"],
    eval_dataset=encoded_dataset["validation"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [None]:
trainer.train()

Epoch,Training Loss,Validation Loss,F1,Roc Auc,Accuracy
1,0.3515,0.335522,0.484526,0.666281,0.2


Epoch,Training Loss,Validation Loss,F1,Roc Auc,Accuracy
1,0.3515,0.335522,0.484526,0.666281,0.2


In [None]:
#We evaluate the model
trainer.evaluate()

***** Running Evaluation *****
  Num examples = 886
  Batch size = 8


{'epoch': 5.0,
 'eval_accuracy': 0.2799097065462754,
 'eval_f1': 0.7096149188665537,
 'eval_loss': 0.31915703415870667,
 'eval_roc_auc': 0.805805895058838,
 'eval_runtime': 4.7187,
 'eval_samples_per_second': 187.766,
 'eval_steps_per_second': 23.524}

In [None]:
#Test a model on a new sentence
text = "I'm happy I can finally train a model for multi-label classification"

#Tokenize the text
encoding = tokenizer(text, return_tensors="pt")

#Move the tensor to the device where the model is loaded
encoding = {k: v.to(trainer.model.device) for k,v in encoding.items()}

#Pass the device-adapted input to the pre-trained model
outputs = trainer.model(**encoding)

In [None]:
#The logits is a tensor that contains the scores for every individual label.
logits = outputs.logits
logits.shape

torch.Size([1, 11])

In [None]:
# apply a sigmoid function independently to every score, such that every score is turned into a number between 0 and 1,
#that can be interpreted as a "probability" for how certain the model is that a given class belongs to the input text.
sigmoid = torch.nn.Sigmoid()
probs = sigmoid(logits.squeeze().cpu())
predictions = np.zeros(probs.shape)
#we use a threshold (typically, 0.5) to turn every probability into either a 1 or a 0
predictions[np.where(probs >= 0.5)] = 1
# turn predicted id's into actual label names
predicted_labels = [id2label[idx] for idx, label in enumerate(predictions) if label == 1.0]
print(predicted_labels)

['joy', 'optimism']


In [None]:
#Define the place to save the trained model
output_model_dir = '/content/drive/My Drive/Colab Notebooks/Fine Tuned BERT Model'

In [None]:
#Save the model
trainer.model.save_pretrained(output_model_dir)

In [None]:
# Load the trained BERT model
from transformers import BertForSequenceClassification

loaded_model = BertForSequenceClassification.from_pretrained(output_model_dir)

In [None]:
#Use the trained model to make prediction
text = "I'm happy and thanks!"

encoding = tokenizer(text, return_tensors="pt")
encoding = {k: v.to(trainer.model.device) for k,v in encoding.items()}

outputs = trainer.model(**encoding)

In [None]:
#The logits is a tensor that contains the scores for every individual label.
logits = outputs.logits
logits.shape

torch.Size([1, 10])

In [None]:
# apply sigmoid + threshold
sigmoid = torch.nn.Sigmoid()
probs = sigmoid(logits.squeeze().cpu())
predictions = np.zeros(probs.shape)
predictions[np.where(probs >= 0.5)] = 1
# turn predicted id's into actual label names
predicted_labels = [id2label[idx] for idx, label in enumerate(predictions) if label == 1.0]
print(predicted_labels)

['Optimistic', 'Thankful']
