In [1]:
import pandas as pd
import numpy as np
import os
from sklearn import preprocessing
from transformers import BertTokenizer, BertModel, BertPreTrainedModel, TrainingArguments, Trainer
from transformers import RobertaTokenizer, RobertaPreTrainedModel, RobertaModel, AutoTokenizer, AutoModel, PreTrainedModel
from transformers import TrainerCallback
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch
import torch.nn.functional as F
from torch import optim
from transformers import TrainingArguments, AutoModel
from transformers import TrainerCallback, TrainerState, TrainerControl
import random


In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
df = pd.read_pickle('AnnoMI-full-with-audio-cleaned-text.pkl')

In [4]:
df_client = df[df['interlocutor'] == 'client']
#df_therapist = df[df['interlocutor'] == 'therapist']

In [5]:
X = df_client['utterance_text']
y_text = df_client['client_talk_type']

# X = df_therapist['utterance_text']
# y_text = df_therapist['main_therapist_behaviour']

le = preprocessing.LabelEncoder()
le.fit(y_text)
print(f'Original classes {le.classes_}')
print(f'Corresponding numeric classes {le.transform(le.classes_)}')
y =le.transform(y_text)
print(f"X: {X.shape}")
print(f"y: {y.shape} {np.unique(y)}")

Original classes ['change' 'neutral' 'sustain']
Corresponding numeric classes [0 1 2]
X: (6338,)
y: (6338,) [0 1 2]


In [6]:
# Splitting
train_texts, val_texts, train_labels, val_labels = train_test_split(X, y, test_size=0.2, random_state=42)

In [7]:
tokenizer_twitter_sentiment = AutoTokenizer.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment-latest')
tokenizer_distilbert = AutoTokenizer.from_pretrained('distilbert-base-uncased')
tokenizer_goemotion = AutoTokenizer.from_pretrained('SamLowe/roberta-base-go_emotions')
tokenizer_roberta_large = AutoTokenizer.from_pretrained('roberta-large')

In [8]:
tokenizer = tokenizer_goemotion
train_encodings = tokenizer(train_texts.tolist(), truncation=True, padding=True)
val_encodings = tokenizer(val_texts.tolist(), truncation=True, padding=True)

In [9]:
class ClassificationDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = np.array(labels).astype('int')

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

class StopOnZeroLossCallback(TrainerCallback):
    def on_log(self, args, state: TrainerState, control: TrainerControl, logs=None, **kwargs):
        # Check if the training loss is exactly zero
        if logs.get("loss", 1) == 0: 
            print("Training loss reached zero, stopping training!")
            control.should_training_stop = True
    
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        log_prob = F.log_softmax(inputs, dim=-1)
        prob = torch.exp(log_prob)
        return F.nll_loss(
            ((1 - prob) ** self.gamma) * log_prob,
            targets,
            reduction=self.reduction
        )
    
# Function to compute f1_macro
def f1_macro(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=1)
    return {'f1_macro': f1_score(labels, predictions, average='macro')}

class ThresholdEarlyStoppingCallback(TrainerCallback):
    def on_evaluate(self, args, state, control, metrics, **kwargs):
        f1 = metrics['eval_f1_macro']
        if f1 > 0.6:
            control.should_training_stop = True
        return control
    
# A custom class that would stop the training if the validation loss rises for 5 consecutive epochs
class StopOnRisingValidationLossCallback(TrainerCallback):
    def __init__(self):
        self.counter = 0
        self.best_loss = np.inf
        
    def on_evaluate(self, args, state, control, metrics, **kwargs):
        val_loss = metrics['eval_loss']
        if val_loss < self.best_loss:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= 5:
                control.should_training_stop = True
        return control
# A custom class that would stop the training if the validation f1_macro go down for 5 consecutive epochs
class StopOnF1MacroCallback(TrainerCallback):
    def __init__(self):
        self.counter = 0
        self.best_f1 = 0
        
    def on_evaluate(self, args, state, control, metrics, **kwargs):
        f1 = metrics['eval_f1_macro']
        if f1 > self.best_f1:
            self.best_f1 = f1
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= 5:
                control.should_training_stop = True
        return control

class RobertaClassificationTwitter(nn.Module):
    def __init__(self, labels):
        super(RobertaClassificationTwitter, self).__init__()
        self.roberta = AutoModel.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment-latest')
        self.dropout = nn.Dropout(0.1)
        hidden_size = self.roberta.config.hidden_size

        # Adding an additional hidden layer
        self.hidden_layer = nn.Linear(hidden_size, hidden_size//2)
        
        # Adding L2 regularization (weight decay) to the hidden layer
        self.regularization = nn.LayerNorm(hidden_size//2)
        
        # Final classification layer with 3 classes
        self.classifier = nn.Linear(hidden_size//2, labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.roberta(input_ids, attention_mask=attention_mask)
        pooled_output = outputs[1]
        pooled_output = self.dropout(pooled_output)
        
        # Passing through the hidden layer with ReLU activation
        hidden_output = self.hidden_layer(pooled_output)
        hidden_output = F.relu(hidden_output)
        
        # Applying Layer Normalization (regularization)
        hidden_output = self.regularization(hidden_output)
        
        logits = self.classifier(hidden_output)
        
        loss = None
        if labels is not None:
            loss_fn = FocalLoss(alpha=0.25, gamma=2)
            loss = loss_fn(logits, labels)
        
        return {"loss": loss, "logits": logits} if loss is not None else {"logits": logits}
    
# Same like RobertaClassificationTwitter_3 but with distilbert
class DistilbertClassification(nn.Module):
    def __init__(self, labels):
        super(DistilbertClassification, self).__init__()
        self.distilbert = AutoModel.from_pretrained('distilbert-base-uncased')
        self.dropout = nn.Dropout(0.1)
        hidden_size = self.distilbert.config.hidden_size

        # Adding an additional hidden layer
        self.hidden_layer = nn.Linear(hidden_size, hidden_size//2)
        
        # Adding L2 regularization (weight decay) to the hidden layer
        self.regularization = nn.LayerNorm(hidden_size//2)
        
        # Final classification layer with 3 classes
        self.classifier = nn.Linear(hidden_size//2, labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.distilbert(input_ids, attention_mask=attention_mask)
        pooled_output = outputs[0][:, 0, :]
        pooled_output = self.dropout(pooled_output)
        
        # Passing through the hidden layer with ReLU activation
        hidden_output = self.hidden_layer(pooled_output)
        hidden_output = F.relu(hidden_output)
        
        # Applying Layer Normalization (regularization)
        hidden_output = self.regularization(hidden_output)
        
        logits = self.classifier(hidden_output)
        
        loss = None
        if labels is not None:
            loss_fn = FocalLoss(alpha=0.25, gamma=2)
            loss = loss_fn(logits, labels)
        
        return {"loss": loss, "logits": logits} if loss is not None else {"logits": logits}

class RobertaClassificationGoEmotions(nn.Module):
    def __init__(self, labels):
        super(RobertaClassificationGoEmotions, self).__init__()
        self.roberta = AutoModel.from_pretrained('SamLowe/roberta-base-go_emotions')
        self.dropout = nn.Dropout(0.1)
        hidden_size = self.roberta.config.hidden_size

        # Adding an additional hidden layer
        self.hidden_layer = nn.Linear(hidden_size, hidden_size//2)
        
        # Adding L2 regularization (weight decay) to the hidden layer
        self.regularization = nn.LayerNorm(hidden_size//2)
        
        # Final classification layer with 3 classes
        self.classifier = nn.Linear(hidden_size//2, labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.roberta(input_ids, attention_mask=attention_mask)
        pooled_output = outputs[0][:, 0, :]
        pooled_output = self.dropout(pooled_output)
        
        # Passing through the hidden layer with ReLU activation
        hidden_output = self.hidden_layer(pooled_output)
        hidden_output = F.relu(hidden_output)
        
        # Applying Layer Normalization (regularization)
        hidden_output = self.regularization(hidden_output)
        
        logits = self.classifier(hidden_output)
        
        loss = None
        if labels is not None:
            loss_fn = FocalLoss(alpha=0.25, gamma=2)
            loss = loss_fn(logits, labels)
        
        return {"loss": loss, "logits": logits} if loss is not None else {"logits": logits}

In [10]:
labels = len(le.classes_)
model = RobertaClassificationGoEmotions(labels=labels)

Some weights of RobertaModel were not initialized from the model checkpoint at SamLowe/roberta-base-go_emotions and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [11]:
train_dataset = ClassificationDataset(train_encodings, train_labels)
val_dataset = ClassificationDataset(val_encodings, val_labels)

In [12]:
model = model.to(device)

In [13]:
# Define training arguments and trainer

training_args = TrainingArguments(
    output_dir='./output_text_pretrained',
    per_device_train_batch_size=128,
    per_device_eval_batch_size=128,
    learning_rate=0.00001,
    num_train_epochs=1000,
    logging_dir='./logs',
    evaluation_strategy='steps',
    load_best_model_at_end=True,
    logging_steps=100,
    weight_decay=0.0001,
    lr_scheduler_type='cosine',  # Using a cosine scheduler
    warmup_steps=100  # Number of warmup steps
)


trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=f1_macro,
    callbacks=[ThresholdEarlyStoppingCallback(), StopOnZeroLossCallback(), StopOnF1MacroCallback()],
)


# Train the model
trainer.train()
eval_results = trainer.evaluate()
print(eval_results)

Step,Training Loss,Validation Loss,F1 Macro
100,0.3728,0.313802,0.527283
200,0.2839,0.306421,0.578116
300,0.233,0.320606,0.585921
400,0.2,0.333208,0.586581
500,0.1687,0.350294,0.592778
600,0.145,0.379153,0.586471
700,0.1273,0.405334,0.589913
800,0.1172,0.425291,0.588822
900,0.11,0.438313,0.590596
1000,0.1026,0.446955,0.598399


{'eval_loss': 0.350293904542923, 'eval_f1_macro': 0.5927784645762696, 'eval_runtime': 2.2885, 'eval_samples_per_second': 554.079, 'eval_steps_per_second': 4.37, 'epoch': 37.5}


In [14]:
from sklearn.metrics import classification_report

# Get predictions
predictions, labels, _ = trainer.predict(val_dataset)
predictions = np.argmax(predictions, axis=1)

# Print classification report
print(classification_report(labels, predictions, target_names=le.classes_))

              precision    recall  f1-score   support

      change       0.59      0.49      0.54       332
     neutral       0.76      0.81      0.79       786
     sustain       0.45      0.46      0.45       150

    accuracy                           0.69      1268
   macro avg       0.60      0.59      0.59      1268
weighted avg       0.68      0.69      0.68      1268

