<h1>Emotion Classification</h1>
<p>This notebook consists simple code used for emotion classification in the form of a simple sequence classification task using RoBERTa-base from HuggingFace</p>
<p>The notebook has both the code for training followed by inference on the evaluation set. Please look out for the comments prior to each cell</p>

In [None]:
# !pip install transformers[torch]
# !pip install datasets
# !pip install accelerate -U
# !pip install ipywidgets

In [1]:
with open('/workspace/SpanBERT/code/redundant/MultiModalEmotionCauseAnalysis/v2/data/text/Subtask_1_train.json', "r") as f:
  import json
  data_ = json.load(f)
print(len(data_))
from transformers import RobertaTokenizer, RobertaForSequenceClassification
from transformers import Trainer, TrainingArguments
import torch
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, random_split

batch_size =34
print(batch_size)

1374


<h1>Re-implementing the RoBERTa for Sequence classification class</h1>
<p>We Re-implement the Huggingface RobertaForSequenceClassification class to account for the class imbalance amongst the emotional utterances. We do so by passing the scaling factor to the CrossEntropyLoss function in the forward method, the idea being to balance the prediction across less representative classes as well</p>

In [3]:
_CHECKPOINT_FOR_DOC = "roberta-base"
_CONFIG_FOR_DOC = "RobertaConfig"
import random
seed = 12654 
random.seed(seed)
random.shuffle(data_)
sep = '</s>'
from transformers.models.roberta.modeling_roberta import RobertaClassificationHead
from typing import List, Optional, Tuple, Union
from transformers import RobertaPreTrainedModel, RobertaModel
from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from transformers.utils import (
    add_code_sample_docstrings,
    add_start_docstrings,
    add_start_docstrings_to_model_forward,
    logging,
    replace_return_docstrings,
)
from transformers.models.roberta.modeling_roberta import (
    ROBERTA_INPUTS_DOCSTRING,
    ROBERTA_START_DOCSTRING,
    RobertaEmbeddings
)
from transformers.modeling_outputs import SequenceClassifierOutput
@add_start_docstrings(
    """
    RoBERTa Model transformer with a sequence classification/regression head on top (a linear layer on top of the
    pooled output) e.g. for GLUE tasks.
    """,
    ROBERTA_START_DOCSTRING,
)
class RobertaForSequenceClassificationCustom(RobertaPreTrainedModel):
    def __init__(self, config, pos_weight):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.roberta = RobertaModel(config, add_pooling_layer=False)
        self.classifier = RobertaClassificationHead(config)
        self.pos_weight = pos_weight
        print(pos_weight)
        # Initialize weights and apply final processing
        self.post_init()

    @add_start_docstrings_to_model_forward(ROBERTA_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    @add_code_sample_docstrings(
        checkpoint="cardiffnlp/twitter-roberta-base-emotion",
        output_type=SequenceClassifierOutput,
        config_class=_CONFIG_FOR_DOC,
        expected_output="'optimism'",
        expected_loss=0.08,
    )
    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            # move labels to correct device to enable model parallelism
            labels = labels.to(logits.device)
            weights = self.pos_weight.to(logits.device) if self.pos_weight else -1
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    print("single_label_classification")
                    self.config.problem_type = "single_label_classification"
                else:
                    print("multi_label_classification")
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                if weights!=-1:
                    loss_fct = CrossEntropyLoss(weight=weights)
                else:
                    loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss(pos_weight=self.pos_weight)
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

<h1>Data set class</h1>

In [4]:
class EmotionDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels=None):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        if self.labels:
            item['labels'] = torch.tensor(self.labels[idx])
        else:
            item['labels'] = -1
        return item

    def __len__(self):
        return len(self.labels) if self.labels is not None else self.encodings['input_ids'].shape[0]

<h1> Methods that supplement the training and testing </h1>
<h2>get_data() method</h2>
<p>get_data(data_trial, train = False) is used to create the input prompt for the classifier</p>
<h2>f1_calc() method</h2>
<p>f1_calc(y_true, y_pred, average = 'weighted') is used to calculate runtime metrics during training</p>
<h2>compute_metrics() method</h2>
<p>compute_metrics(p) is the primary hook to calculate runtime metrics during training</p>


In [None]:
def get_data(data_trial, train = False):
    ds = []
    if not train:
        print(f"get data - train? {train}")
        ds = get_data_test(data_trial)
        return ds, -1
    
    counter = {'anger': 0, 'disgust': 0, 'fear': 0, 'joy': 0, 'sadness': 0, 'surprise': 0, 'neutral': 0}
    for x in data_trial:
        utt_all = ' '.join(conv['text'] for conv in x['conversation'])
#         utt_all =  x['conversation']
        for utt_i in x['conversation']:
            conversation_ID = x['conversation_ID']
            utt_id = utt_i['utterance_ID']
            counter[utt_i['emotion']]+=1
            u_i = utt_i['text']
            utt_all_causal = " "
            
#             ll = []
#             for idx,cnv in enumerate(utt_all):
# #                 if (idx+1) <=utt_id:
# #                     ll.append(cnv['speaker']+" : "+cnv['text'])
# #                 ll.append(cnv['speaker']+" : "+cnv['text'])
#                 ll.append(cnv['text'])
#             utt_all_causal = ".".join(ll)   
#             text = f'{u_i} {sep} {utt_all_causal}'
            text = f'{u_i} {sep} {utt_all}'
            ds.append({'id':f'{conversation_ID}_{utt_id}', 'label':utt_i['emotion'], 'text':text})
    return ds, counter

def get_data_test(data_trial):
    ds = []
    for x in data_trial:
        utt_all =  x['conversation']
        for utt_i in x['conversation']:
            conversation_ID = x['conversation_ID']
            utt_id = utt_i['utterance_ID']
            u_i = utt_i['text']
            utt_all_causal = " "
            ll = []
            for idx,cnv in enumerate(utt_all):
                ll.append(cnv['text'])
            utt_all_causal = ".".join(ll)   
            text = f'{u_i} {sep} {utt_all_causal}'
            ds.append({'id':f'{conversation_ID}_{utt_id}', 'label':None, 'text':text})
    return ds

def get_datan2(data_trial):
    ds = []
    counter = {'anger': 0, 'disgust': 0, 'fear': 0, 'joy': 0, 'sadness': 0, 'surprise': 0, 'neutral': 0}
    for x in data_trial:
        utt_all = ' '.join(conv['text'] for conv in x['conversation'])
#         utt_all =  x['conversation']
        conversation_ID = x['conversation_ID']
        
        
        
        for utt_i in x['conversation']:
            
            utt_i_id = utt_i['utterance_ID']
            u_i = utt_i['text']
#             counter[utt_i['emotion']]+=1
            for utt_j in x['conversation']:
                utt_j_id = utt_j['utterance_ID']
                u_j = utt_j['text']
                text = f'{u_i} {sep} {u_j} {sep} {utt_all}'
#             text = f'{u_i} {sep} {utt_all}'
                ds.append({'id':f'{conversation_ID}_{utt_i_id}_{utt_j_id}', 'label': utt_i.get('emotion', -1), 'text':text})
    return ds, counter

from sklearn.metrics import f1_score
def f1_calc(y_true, y_pred, average = 'weighted'):
        
        emotion_idx = dict(zip(["anger", "disgust", "fear", "joy", "sadness", "surprise", "neutral"], range(7)))
        inv_map = {v: k for k, v in emotion_idx.items()}

#         datapy = np.array([ (emotion_idx[x['predicted_emotion']] ,emotion_idx[x['gold_emotion']]) for x in output[key]])
#         y_pred,y_true  = datapy[:,0], datapy[:,1]
        f1_scores = f1_score(y_true, y_pred, average=average)
#         print(f'{file}, f1: {f1_scores}')
        f1_scores_n = f1_score(y_true, y_pred, average=None, labels=[0, 1, 2, 3, 4, 5, 6])
        
        score_map = {}
        for label, score in zip(range(7), f1_scores_n):
            l = inv_map[label]
            score_map[f'f1_{l}'] = score
        score_map['weighted_f1']=f1_scores
        print(score_map)
        return score_map


def compute_metrics(p):
    """
    Customize the `compute_metrics` of `transformers`
    Args:
        - p (tuple):      2 numpy arrays: predictions and true_labels
    Returns:
        - metrics (dict): f1 score on
    """
    metrics = {}
    predictions, true_labels = p
    f1_scores_n = f1_calc(true_labels, predictions.argmax(-1))
    metrics= f1_scores_n
    return metrics

def _add_whitespace_after_punctuations(txt):
    n = len(txt)
    punctuations = [
        ',',
        '!',
        '?',
        '.',
        ';',
        '$',
        '&',
        '"',
        '...'
    ]

    if n < 3:
        return txt

    if txt[-1] in punctuations:
        txt = f'{txt[:-1]} {txt[-1]}'
    if txt[0] in punctuations:
        txt = f'{txt[0]} {txt[1:]}'

    inner = txt[1: -1]

    for char in punctuations:
        inner = inner.replace(char, f' {char} ')

    txt = f'{txt[0]}{inner}{txt[-1]}'
    # removing extra whitespaces.
    txt = txt.replace('  ', ' ').upper()

    return txt

In [4]:
# sep = tokenizer.special_tokens_map['sep_token']
# Example dataset
labels = []
total_size = len(data_)
train_size = int(total_size * 0.7) # 80% for training
eval_size = int(total_size * 0.2) # remaining 10% for eval
trial_size = total_size - train_size - eval_size # remaining 10% for testing
print(f'{train_size} {eval_size} {trial_size}')
data_train = data_[:train_size]
data_eval = data_[train_size:train_size+eval_size]
data_trial = data_[train_size+eval_size: train_size+eval_size+trial_size+1]


# vals = {}
# for x in coversations:
#     vals[x['text']] = x['emotion'] 
# print(len(conversations))
# print(len(labels))
# texts = conversations
# labels = labels  # Your actual dataset will be much larger

# Initialize tokenizer and model


tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

train_ds,counter =  get_data(data_train)
train_label_encoder = LabelEncoder()
train_label_encoder.fit(["anger", "disgust", "fear", "joy", "sadness", "surprise", "neutral"])
train_texts, train_labels  =  zip(*[(x['text'], x['label']) for x in train_ds])
train_encoded_labels = train_label_encoder.transform(train_labels)
train_encodings = tokenizer(train_texts, padding=True, truncation=True, return_tensors="pt")

eval_ds,_ = get_data(data_eval)
eval_label_encoder = LabelEncoder()
eval_label_encoder.fit(["anger", "disgust", "fear", "joy", "sadness", "surprise", "neutral"])
eval_texts, eval_labels  =  zip(*[(x['text'], x['label']) for x in eval_ds])
eval_encoded_labels = eval_label_encoder.transform(eval_labels)
eval_encodings = tokenizer(eval_texts, padding=True, truncation=True, return_tensors="pt")

trial_ds,cc = get_data(data_trial)
trial_label_encoder = LabelEncoder()
trial_label_encoder.fit(["anger", "disgust", "fear", "joy", "sadness", "surprise", "neutral"])
trial_texts, trial_labels  =  zip(*[(x['text'], x['label']) for x in trial_ds])
trial_encoded_labels = trial_label_encoder.transform(trial_labels)
trial_encodings = tokenizer(trial_texts, padding=True, truncation=True, return_tensors="pt")

print(f'{len(train_ds)} {len(eval_ds)} {len(trial_ds)}')
# print(f'{train_ds}')
# Convert to Dataset object
# class EmotionDataset(torch.utils.data.Dataset):
#     def __init__(self, encodings, labels):
#         self.encodings = encodings
#         self.labels = labels

#     def __getitem__(self, idx):
#         item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
#         item['labels'] = torch.tensor(self.labels[idx])
#         return item

#     def __len__(self):
#         return len(self.labels)

train_dataset = EmotionDataset(train_encodings, train_encoded_labels)
trial_dataset = EmotionDataset(trial_encodings, trial_encoded_labels)
eval_dataset = EmotionDataset(eval_encodings, eval_encoded_labels)


total_samples = sum(counter.values())
pos_weight = torch.tensor([total_samples / counter[class_name] for class_name in counter]).to('cuda')

# tensors_on_gpus = [pos_weight.to(f'cuda:{i}') for i in range(torch.cuda.device_count())]

model = RobertaForSequenceClassificationCustom.from_pretrained('roberta-base', num_labels=7, pos_weight = pos_weight)

961 274 139


Some weights of RobertaForSequenceClassificationCustom were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


9408 2743 1468


In [5]:
len(trial_ds)

1468

In [6]:
print(f'{len(trial_encoded_labels)}')

1468


In [7]:
#sample input to the model
train_ds[1]

{'id': '1281_2',
 'label': 'neutral',
 'text': 'Okay . </s> We will do a quick check ..Okay ..So , eight days late huh ?.Yeah ..You must be a little uncomfortable ..Eh , just a tad ..You are about 80 percent effaced , so you are on your way ..It still could last a little while longer ..If you are anxious there are a few ways to help things along ..Do them ! !.Actually , they are things you can do . Just some home remedies , but in my experience I have found that some of them are quite effective ..Well , we are ready to try anything ..Okay , there is an herbal tea you can drink ..Okay .'}

In [8]:
#class distribution across classes in the train set
counter

{'anger': 1101,
 'disgust': 280,
 'fear': 259,
 'joy': 1654,
 'sadness': 790,
 'surprise': 1244,
 'neutral': 4080}

In [9]:
# label weights
pos_weight

tensor([ 8.5450, 33.6000, 36.3243,  5.6880, 11.9089,  7.5627,  2.3059],
       device='cuda:0')

In [11]:
# Split the dataset
# Create DataLoaders for training and testing
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
eval_loader = DataLoader(eval_dataset, batch_size=batch_size, shuffle=False)
trial_loader = DataLoader(trial_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Training arguments
training_args = TrainingArguments(
    output_dir='./results_30_epochs_data_leak_corrected_shuffled_ui_uall_custom_roberta_base_weighted_1',
    num_train_epochs=20,
    per_device_train_batch_size=batch_size,
    warmup_steps=500,
    weight_decay=0.01,
    learning_rate=5e-5,
    logging_dir='./logs',
    logging_steps=10,
    save_strategy='epoch',
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model='weighted_f1',
    log_level='critical',
    evaluation_strategy="epoch",
    seed=12345
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_loader.dataset,
    eval_dataset=eval_loader.dataset,
    compute_metrics=compute_metrics
)

# Train the model
trainer.train()

  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


single_label_classification




Epoch,Training Loss,Validation Loss,F1 Anger,F1 Disgust,F1 Fear,F1 Joy,F1 Sadness,F1 Surprise,F1 Neutral,Weighted F1
1,1.5042,1.381559,0.449275,0.0,0.0,0.099792,0.656131,0.0,0.0,0.360411
2,1.2556,1.210467,0.412541,0.169014,0.122249,0.457711,0.752589,0.291262,0.014963,0.487236
3,1.0267,1.123763,0.501951,0.161074,0.18845,0.488746,0.759673,0.382514,0.479574,0.582117
4,0.7793,1.147696,0.507463,0.172727,0.206349,0.540965,0.753591,0.385542,0.424812,0.581105
5,0.7278,1.243983,0.488318,0.218182,0.227848,0.496894,0.733149,0.433915,0.507614,0.581043
6,0.4326,1.412905,0.504854,0.22093,0.24,0.555294,0.76198,0.472727,0.599144,0.62147
7,0.3551,1.5024,0.499295,0.251309,0.242038,0.551412,0.747279,0.436548,0.562033,0.606395
8,0.2798,1.679076,0.496434,0.150943,0.186047,0.55814,0.764291,0.450199,0.532051,0.606644
9,0.2024,1.929004,0.461087,0.200873,0.188406,0.541906,0.714099,0.428,0.613402,0.589309


{'f1_anger': 0.4492753623188406, 'f1_disgust': 0.0, 'f1_fear': 0.0, 'f1_joy': 0.0997920997920998, 'f1_sadness': 0.6561307901907357, 'f1_surprise': 0.0, 'f1_neutral': 0.0, 'weighted_f1': 0.36041077095337065}


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


{'f1_anger': 0.41254125412541254, 'f1_disgust': 0.16901408450704225, 'f1_fear': 0.12224938875305623, 'f1_joy': 0.4577114427860697, 'f1_sadness': 0.7525891829689298, 'f1_surprise': 0.2912621359223301, 'f1_neutral': 0.014962593516209476, 'weighted_f1': 0.48723635692321604}


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


{'f1_anger': 0.5019505851755527, 'f1_disgust': 0.1610738255033557, 'f1_fear': 0.1884498480243161, 'f1_joy': 0.4887459807073955, 'f1_sadness': 0.7596726190476191, 'f1_surprise': 0.3825136612021858, 'f1_neutral': 0.47957371225577267, 'weighted_f1': 0.5821173242308864}


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


{'f1_anger': 0.5074626865671642, 'f1_disgust': 0.17272727272727273, 'f1_fear': 0.20634920634920634, 'f1_joy': 0.5409652076318743, 'f1_sadness': 0.7535911602209945, 'f1_surprise': 0.3855421686746988, 'f1_neutral': 0.424812030075188, 'weighted_f1': 0.5811046064622154}


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


{'f1_anger': 0.4883177570093458, 'f1_disgust': 0.21818181818181817, 'f1_fear': 0.22784810126582278, 'f1_joy': 0.4968944099378882, 'f1_sadness': 0.7331493890421757, 'f1_surprise': 0.4339152119700748, 'f1_neutral': 0.5076142131979695, 'weighted_f1': 0.5810430402619057}


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


{'f1_anger': 0.5048543689320388, 'f1_disgust': 0.22093023255813954, 'f1_fear': 0.24, 'f1_joy': 0.5552941176470588, 'f1_sadness': 0.761980198019802, 'f1_surprise': 0.4727272727272727, 'f1_neutral': 0.5991440798858774, 'weighted_f1': 0.6214698366315993}


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


{'f1_anger': 0.4992947813822285, 'f1_disgust': 0.2513089005235602, 'f1_fear': 0.24203821656050956, 'f1_joy': 0.5514124293785311, 'f1_sadness': 0.747279322853688, 'f1_surprise': 0.4365482233502538, 'f1_neutral': 0.5620328849028401, 'weighted_f1': 0.6063949298633993}


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


{'f1_anger': 0.4964336661911555, 'f1_disgust': 0.1509433962264151, 'f1_fear': 0.18604651162790697, 'f1_joy': 0.5581395348837209, 'f1_sadness': 0.7642913077525451, 'f1_surprise': 0.450199203187251, 'f1_neutral': 0.532051282051282, 'weighted_f1': 0.6066438302416444}


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


{'f1_anger': 0.461086637298091, 'f1_disgust': 0.20087336244541484, 'f1_fear': 0.18840579710144928, 'f1_joy': 0.5419058553386912, 'f1_sadness': 0.7140986468790921, 'f1_surprise': 0.428, 'f1_neutral': 0.6134020618556701, 'weighted_f1': 0.5893093619418969}


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


In [153]:
with open('/workspace/SpanBERT/code/redundant/MultiModalEmotionCauseAnalysis/v2/data/text/Subtask_1_test.json', "r") as f:
# with open('dataset_1701871028.json', "r") as f:
  import json
  data_test_ = json.load(f)
print(len(data_test_))

batch_size =34
print(batch_size)

665
34


In [154]:

# print(f'{train_ds}')
# Convert to Dataset object
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
test_final_ds,test_final_counter =  get_datan2(data_test_)
# test_final_ds,test_final_counter =  get_data(data_test_)

# test_final_label_encoder = LabelEncoder()
# test_final_label_encoder.fit(["anger", "disgust", "fear", "joy", "sadness", "surprise", "neutral"])
# test_final_texts, test_final_labels  =  zip(*[(x['text'], x['label']) for x in test_final_ds])
test_final_texts  =  [x['text'] for x in test_final_ds]
# test_final_encoded_labels = test_final_label_encoder.transform(test_final_labels)
test_final_encodings = tokenizer(test_final_texts, padding=True, truncation=True, return_tensors="pt")

test_final_dataset = EmotionDataset(test_final_encodings, None)
test_final_loader = DataLoader(test_final_dataset, batch_size=batch_size, shuffle=False)
print(f'{len(test_final_ds)}')

81885


In [121]:
test_enc = LabelEncoder()
test_enc.fit(["anger", "disgust", "fear", "joy", "sadness", "surprise", "neutral"])

<h1> Creating enriched dataset </h1>
<p> At this step we enrich the original dataset with emotions at utterance level from the trained model to pass to the span extractor </p>

In [155]:
from copy import deepcopy
enriched_data = deepcopy(data_test_)

In [156]:
conv_id_mapping = {data['conversation_ID']: idx for idx, data in enumerate(enriched_data)}

In [157]:
for data in enriched_data:
    data['emotion-cause_pairs'] = []

In [158]:
# saved_path = "/workspace/SpanBERT/code/redundant/MultiModalEmotionCauseAnalysis/v2/results_30_epochs_data_leak_corrected_1_n_square_with_speaker_custom_roberta_base/checkpoint-7868/"
# saved_path = "/workspace/SpanBERT/code/redundant/MultiModalEmotionCauseAnalysis/v2/results_30_epochs_data_leak_corrected_ui_uall_custom_roberta_base_weighted/checkpoint-2679/"
# saved_path = "/workspace/SpanBERT/code/redundant/MultiModalEmotionCauseAnalysis/v2/results_30_epochs_data_leak_corrected_shuffled_ui_uall_custom_roberta_base_weighted/checkpoint-834"
# Initialize tokenizer and modelcode/redundant/MultiModalEmotionCauseAnalysis/v2/results_30_epochs_data_leak_corrected/checkpoint-3990/config.json
# tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# saved_path = "/workspace/SpanBERT/code/redundant/MultiModalEmotionCauseAnalysis/v2/results_30_epochs_data_leak_corrected_shuffled_ui_uall_custom_roberta_base_weighted/checkpoint-834"

# saved_path = "/workspace/SpanBERT/code/redundant/MultiModalEmotionCauseAnalysis/v2/results_30_epochs_data_leak_corrected_ui_uall_custom_roberta_base_weighted/checkpoint-2820"
# saved_path = "/workspace/SpanBERT/code/redundant/MultiModalEmotionCauseAnalysis/v2/results_30_epochs_data_leak_corrected/checkpoint-665"
saved_path = "/workspace/SpanBERT/code/redundant/MultiModalEmotionCauseAnalysis/v2/results_30_epochs_data_leak_corrected_1_n_square_with_speaker_custom_roberta_base/checkpoint-7868"
base_path = os.path.join(*os.path.split(saved_path)[:-1])

model = RobertaForSequenceClassificationCustom.from_pretrained(saved_path, num_labels=7, pos_weight=None).cuda()

# if torch.cuda.device_count() > 1:
#     print(f"Using {torch.cuda.device_count()} GPUs!")
#     model = torch.nn.DataParallel(model)

# Move your model to GPU
# model.cuda()

dev = model.device


print(len(test_final_loader.dataset))

n = test_final_loader.batch_size

with torch.no_grad():
    fp = []
    cpunter = 0
    
    for idx, x in enumerate(test_final_loader):
        input_ids = x['input_ids'].to(dev)
        attention_mask = x['attention_mask'].to(dev)
        logits = model(input_ids=input_ids, attention_mask=attention_mask, labels=None).logits.cpu()
        labels = test_enc.inverse_transform(logits.argmax(-1).tolist())
        
        for i, label in enumerate(labels):
            idx_offset = idx * n + i
            conv_utt_id = test_final_ds[idx_offset]['id']
            conv_id, utt_id = conv_utt_id.split('_')[:2]
            data_idx = conv_id_mapping[int(conv_id)]
            
            enriched_data[data_idx]['emotion-cause_pairs'].append([f'{utt_id}_{label}'])
    print("done inferring")
    
with open(os.path.join(base_path, 'enriched_data.json'), 'w') as f:
    json.dump(enriched_data, f, indent=4)

None
81885


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
  item = {key: torch.tensor(val[idx]) for key, val in self.encod

done inferring


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}


In [22]:
import numpy as np
y_true = np.array([x['label'] for x in fp])
y_pred = np.array([x['pred_label'] for x in fp])

In [20]:
f1_scores_n = f1_score(y_true, y_pred, average=None, labels=[0, 1, 2, 3, 4, 5, 6])
emotion_idx = dict(zip(["anger", "disgust", "fear", "joy", "sadness", "surprise", "neutral"], range(7)))
inv_map = {v: k for k, v in emotion_idx.items()}
for i,x in enumerate(f1_scores_n):
    print(f'{inv_map[i]} - {x}')

anger - 0.48021108179419525
disgust - 0.22580645161290322
fear - 0.21875
joy - 0.5583333333333333
sadness - 0.7091327705295472
surprise - 0.38235294117647056
neutral - 0.6018957345971564


In [21]:
y_pred.shape

(20631,)

In [22]:
torch.ones(7)

tensor([1., 1., 1., 1., 1., 1., 1.])

In [23]:
f1_scores_n = f1_score(y_true, y_pred, average=None, labels=[0, 1, 2, 3, 4, 5, 6])
emotion_idx = dict(zip(["anger", "disgust", "fear", "joy", "sadness", "surprise", "neutral"], range(7)))
inv_map = {v: k for k, v in emotion_idx.items()}
for i,x in enumerate(f1_scores_n):
    print(f'{inv_map[i]} - {x}')

anger - 0.48253968253968255
disgust - 0.20454545454545456
fear - 0.19047619047619047
joy - 0.5949367088607594
sadness - 0.7406318883174137
surprise - 0.43902439024390244
neutral - 0.5862068965517241


In [51]:
for idx, x in enumerate(trial_loader):
    break
print(idx)

0


  item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
