<a href="https://colab.research.google.com/github/nheumann/nlp-sentiment-analysis/blob/main/emotions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#!pip install datasets transformers

In [2]:
from datasets import load_dataset
import pandas as pd
from transformers import Trainer, TrainingArguments, BertForSequenceClassification, DistilBertForSequenceClassification, AdamW, AutoTokenizer, PretrainedConfig, file_utils, TextClassificationPipeline, Pipeline
from torch.utils.data import DataLoader
import torch
import numpy as np



In [3]:
# Hugging Face checkpoint name; used below to load both the tokenizer and the DistilBERT model.
model_describ = "distilbert-base-cased"

In [4]:
# Load the "simplified" configuration of the go_emotions dataset.
data = load_dataset("go_emotions", "simplified")

train_raw = data['train']
valid_raw = data['validation']

# Convert every split to a pandas DataFrame for label encoding and tokenization.
train_df = train_raw.to_pandas()
valid_df = valid_raw.to_pandas()
test_df = data['test'].to_pandas()

Reusing dataset go_emotions (C:\Users\Niklasi5\.cache\huggingface\datasets\go_emotions\simplified\0.0.0\2637cfdd4e64d30249c3ed2150fa2b9d279766bfcd6a809b9f085c61a90d776d)


In [5]:
def one_hot_labels(df, n_labels):
    """Build a multi-hot label matrix from the ``labels`` column of ``df``.

    Args:
        df: DataFrame with a ``labels`` column; each entry is a sequence
            (list or array) of integer class ids for that row.
        n_labels: Total number of classes, i.e. the width of the result.

    Returns:
        Integer ``np.ndarray`` of shape ``(len(df), n_labels)`` holding 1 at
        every ``(row, class id)`` position and 0 elsewhere.
    """
    # `np.int` was removed in NumPy 1.24; the builtin `int` is the same dtype it aliased.
    one_hot = np.zeros((len(df), n_labels), dtype=int)
    # Iterate positionally over the column values: `Series.iteritems()` no longer
    # exists in pandas 2.x, and the index labels were never used anyway.
    for i, label_ids in enumerate(df["labels"]):
        # Explicit integer index array so that an empty label list is handled too.
        one_hot[i, np.asarray(label_ids, dtype=np.intp)] = 1
    return one_hot

# Number of classes; matches the 28 entries of the `labels` name list defined further down.
n_labels = 28

# Multi-hot label matrices, one per split.
train_oh_labels = one_hot_labels(train_df, n_labels)
valid_oh_labels = one_hot_labels(valid_df, n_labels)
test_oh_labels = one_hot_labels(test_df, n_labels)

In [6]:
tokenizer = AutoTokenizer.from_pretrained(model_describ)
# Each split is tokenized in one call. padding=True presumably pads to the longest sequence
# of that call, so the three splits may end up with different padded lengths -- confirm.
# we can still pass max length here
train_encodings = tokenizer(train_df['text'].values.tolist(), padding=True, truncation=True)  # training split
val_encodings = tokenizer(valid_df['text'].values.tolist(), padding=True, truncation=True)  # validation split
test_encodings = tokenizer(test_df['text'].values.tolist(), padding=True, truncation=True)  # test split

In [7]:
class IMDbDataset(torch.utils.data.Dataset):
    """Map-style dataset that pairs tokenizer encodings with per-sample labels.

    ``encodings`` is a mapping from field name to an indexable collection of
    per-sample values; ``labels`` is an indexable collection with one entry per
    sample. Nothing in the class is specific to IMDb.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Number of samples, taken from the label collection."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of tensors, with its label under ``'labels'``."""
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# Pair each split's encodings with its multi-hot label matrix.
train_dataset = IMDbDataset(train_encodings, train_oh_labels)
val_dataset = IMDbDataset(val_encodings, valid_oh_labels)
test_dataset = IMDbDataset(test_encodings, test_oh_labels)

In [8]:
class MultilabelTrainer(Trainer):
    """Trainer whose loss treats every class as an independent binary decision."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute binary cross-entropy (with logits) against multi-hot labels.

        Args:
            model: Model used for the forward pass.
            inputs: Batch dict; its ``"labels"`` entry is removed before the
                remaining entries are passed to ``model``.
            return_outputs: When true, return ``(loss, outputs)`` instead of
                just the loss.
            num_items_in_batch: Unused. Accepted because newer ``Trainer``
                versions pass this keyword to ``compute_loss``; without it the
                override raises ``TypeError`` on those versions.

        Returns:
            The loss tensor, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        num_labels = self.model.config.num_labels
        loss_fct = torch.nn.BCEWithLogitsLoss()
        # BCEWithLogitsLoss needs float targets with the same shape as the logits.
        loss = loss_fct(logits.view(-1, num_labels),
                        labels.float().view(-1, num_labels))
        return (loss, outputs) if return_outputs else loss

In [9]:
# Emotion names indexed by class id (list position == id).
# NOTE(review): assumed to follow the dataset's own class-id order -- confirm.
labels = [
    "admiration", "amusement", "anger", "annoyance",
    "approval", "caring", "confusion", "curiosity",
    "desire", "disappointment", "disapproval", "disgust",
    "embarrassment", "excitement", "fear", "gratitude",
    "grief", "joy", "love", "nervousness",
    "optimism", "pride", "realization", "relief",
    "remorse", "sadness", "surprise", "neutral",
]
# Class id -> emotion name.
id2label = dict(enumerate(labels))

# Training

In [10]:
# output_dir is handed to TrainingArguments; the final model is saved separately under "<output_dir>/best".
output_dir = './results/models'
best_model_path = output_dir + "/best"

In [12]:
# Fine-tune DistilBERT on the training split with the multi-label trainer, then save the result.
training_args = TrainingArguments(
    output_dir=output_dir,          # output directory
    num_train_epochs=1,              # total number of training epochs
    per_device_train_batch_size=16,  # batch size per device during training
    per_device_eval_batch_size=64,   # batch size for evaluation
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    logging_dir='./logs',            # directory for storing logs
    logging_steps=100,               # log the training loss every 100 steps
    save_total_limit=10,             # presumably caps the number of checkpoints kept -- confirm
    evaluation_strategy="steps",     # evaluate on a step schedule instead of once per epoch
    load_best_model_at_end=True,     # Defaults to loss as criterion

)
# id2label ends up in the model config, which the inference pipeline reads to name its predictions.
model = DistilBertForSequenceClassification.from_pretrained(model_describ, num_labels = n_labels, id2label=id2label)

trainer = MultilabelTrainer(
    model=model,                         # the instantiated 🤗 Transformers model to be trained
    args=training_args,                  # training arguments, defined above
    train_dataset=train_dataset,         # training dataset
    eval_dataset=val_dataset             # evaluation dataset
)
trainer.train()
# NOTE(review): with load_best_model_at_end=True this presumably saves the best checkpoint, not the last one -- confirm.
trainer.save_model(best_model_path)

Some weights of the model checkpoint at distilbert-base-cased were not used when initializing DistilBertForSequenceClassification: ['vocab_transform.weight', 'vocab_layer_norm.weight', 'vocab_transform.bias', 'vocab_projector.bias', 'vocab_layer_norm.bias', 'vocab_projector.weight']
- This IS expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-cased and are newly initialized: ['pre_classifier.bias', 'classifier.bias', 'classifier.weigh

Step,Training Loss,Validation Loss
100,0.6052,0.367197
200,0.2457,0.167335
300,0.1609,0.150261
400,0.1506,0.144212
500,0.138,0.127688
600,0.1285,0.114903
700,0.1172,0.10982
800,0.1096,0.104713
900,0.1097,0.103508
1000,0.1086,0.102078


# Prediction

In [None]:
# Reload the fine-tuned model from the directory written by trainer.save_model(best_model_path).
model2 = DistilBertForSequenceClassification.from_pretrained(best_model_path)

In [None]:
# This is essentially the source code of TextClassificationPipeline. Subclassing it did not
# work correctly (not sure why; I didn't try again at the end), so I copied the whole code.
# We can't use TextClassificationPipeline itself, since it only outputs the highest-scoring label.
if file_utils.is_tf_available():
    from transformers.models.auto.modeling_tf_auto import TF_MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING

if file_utils.is_torch_available():
    from transformers.models.auto.modeling_auto import MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING

class MultiLabelTextClassification(Pipeline):
    """
    Multi-label text classification pipeline using any :obj:`ModelForSequenceClassification`.

    The code is adapted from :class:`~transformers.TextClassificationPipeline`. The difference is that the model
    outputs are passed through an element-wise sigmoid, so every label receives its own independent score in
    ``[0, 1]`` and the scores of one input do not sum to 1.

    The models that this pipeline can use are models that have been fine-tuned on a sequence classification task
    with a multi-label (sigmoid / binary cross-entropy) objective.

    NOTE(review): this relies on ``Pipeline.__call__`` returning the raw logits as an array with one row per input.
    That is presumably true only for the transformers release the notebook was written against -- confirm before
    upgrading the library.
    """

    def __init__(self, return_all_scores: bool = False, **kwargs):
        """
        Args:
            return_all_scores (:obj:`bool`): If true, ``__call__`` returns one score per label for every input;
                otherwise only the highest-scoring label is returned.
            **kwargs: Forwarded unchanged to :class:`~transformers.Pipeline` (model, tokenizer, device, ...).
        """
        super().__init__(**kwargs)

        self.check_model_type(
            TF_MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING
            if self.framework == "tf"
            else MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING
        )

        self.return_all_scores = return_all_scores

    def __call__(self, *args, **kwargs):
        """
        Classify the text(s) given as inputs.

        Args:
            args (:obj:`str` or :obj:`List[str]`):
                One or several texts (or one list of prompts) to classify.

        Return:
            A list or a list of list of :obj:`dict`: Each result comes as list of dictionaries with the following keys:

            - **label** (:obj:`str`) -- The label name taken from ``model.config.id2label``.
            - **score** (:obj:`float`) -- The sigmoid of that label's model output.

            If ``self.return_all_scores=True``, one such dictionary is returned per label; otherwise a single
            dictionary for the highest-scoring label is returned per input.
        """
        outputs = super().__call__(*args, **kwargs)

        # Numerically stable sigmoid: 1 / (1 + exp(-x)) == exp(-logaddexp(0, -x)).
        # The naive exp(x) / (1 + exp(x)) overflows to inf / inf = nan for large logits.
        scores = np.exp(-np.logaddexp(0, -np.asarray(outputs)))
        if self.return_all_scores:
            return [
                [{"label": self.model.config.id2label[i], "score": score.item()} for i, score in enumerate(item)]
                for item in scores
            ]
        else:
            return [
                {"label": self.model.config.id2label[item.argmax()], "score": item.max().item()} for item in scores
            ]

In [None]:
# Pipeline options: return a score for every label, and run on the first GPU when one is
# available, otherwise on the CPU (device=-1) so the cell also works on CPU-only machines.
pipeline_config = {
    "return_all_scores": True,
    "device": 0 if torch.cuda.is_available() else -1,
}
inference_pipeline = MultiLabelTextClassification(model=model2, tokenizer=tokenizer, **pipeline_config)

In [None]:
def analyze_result(result, threshold = 0.5):
    """Keep, for each sample, only the labels scoring above ``threshold``, best first.

    Args:
        result: Iterable of samples; each sample is a sequence of dicts that
            carry at least a ``'score'`` entry.
        threshold: Exclusive lower bound a score must exceed to be kept.

    Returns:
        A list with one list per sample, holding the surviving dicts ordered
        by descending score.
    """
    def by_score(entry):
        return entry['score']

    filtered = []
    for predictions in result:
        kept = [entry for entry in predictions if entry['score'] > threshold]
        kept.sort(key=by_score, reverse=True)
        filtered.append(kept)
    return filtered

In [None]:
# Smoke test: score one sentence and keep every label whose score exceeds 0.2.
x = inference_pipeline(["O M G"])
analyze_result(x, .2)