In [1]:
import sys
import torch
import pickle
import argparse
import pandas as pd

# sys.path.append('../')

from pathlib import Path
from tqdm.notebook import tqdm
from sklearn.metrics import classification_report
from transformers import AutoTokenizer, AutoModel, AutoModelForCausalLM

In [2]:
model_id = "unsloth/llama-3-8b-Instruct-bnb-4bit"
k = 20

In [3]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

embedding_tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased")
embedding_model = AutoModel.from_pretrained("google-bert/bert-base-uncased").to(device)



In [4]:
inference_tokenizer = AutoTokenizer.from_pretrained(model_id, padding='left', padding_side='left')
#inference_tokenizer = AutoTokenizer.from_pretrained(model_id, padding_side='left')
inference_tokenizer.pad_token = inference_tokenizer.eos_token
terminators = [inference_tokenizer.eos_token_id, inference_tokenizer.convert_tokens_to_ids("<|eot_id|>")]

generation_model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

Unused kwargs: ['_load_in_4bit', '_load_in_8bit', 'quant_method']. These kwargs are not used in <class 'transformers.utils.quantization_config.BitsAndBytesConfig'>.


In [5]:
def get_utterance_embeddings(df):    
    
    
    utterance_embed_d = {}

    for utterance in tqdm(df.utterance):
        # print(utterance)
        while True:
            try:
                inputs = embedding_tokenizer(utterance, return_tensors="pt").to(device)
                output = embedding_model(**inputs)
                embedding = output[1][0].squeeze().cpu()
                utterance_embed_d[utterance] = embedding.detach().numpy()
                break
            except Exception as e:
                print(e)
                
    return utterance_embed_d

In [6]:
emotion_map = {
    'AN': 'anger',
    'DI': 'disgust',
    'FE': 'fear',
    'SA': 'sadness',
    'SU': 'surprise',
    'JO': 'joy'
}

def extract_emotions(row):

    emotion_str = row.emotion

    if emotion_str == 'Neutral':
        return ['neutral']

    emotions = emotion_str.split('-')
    tags = []

    for emotion in emotions:
        abbrev = emotion[:2]  # Get the abbreviation
        value_part = emotion[2:]  # Get the value part
        
        # Ensure that the value part is a valid integer and abbrev is in the emotion_map
        if abbrev in emotion_map and value_part.isdigit():
            value = int(value_part)
            if value > 0:
                tags.append(emotion_map[abbrev].lower())
        else:
            print(f"Warning: Skipping invalid emotion entry: '{emotion}'")
    return tags  

In [39]:
CURRENT_DIR = Path.cwd()
EAC_DIR = Path(CURRENT_DIR) / "emotion_analysis_comics"
ICL_DIR = Path(CURRENT_DIR) / "incontext_learning"
DATASET_DIR = Path(EAC_DIR) / "dataset_files"
OUTPUT_DIR = Path(EAC_DIR) / "incontext_learning" / "results" / f"comics35_icl_{model_id.split('/')[1]}"

In [40]:
OUTPUT_DIR

PosixPath('/Utilisateurs/umushtaq/emotion_analysis_comics/incontext_learning/results/comics35_icl_llama-3-8b-Instruct-bnb-4bit')

In [8]:
df = pd.read_csv(DATASET_DIR / "comics_dataset.csv")
df['emotions_list'] = df.apply(lambda row: extract_emotions(row), axis=1)

In [9]:
utterance_embed_d = get_utterance_embeddings(df)
df['utterance_embedding'] = df.utterance.apply(lambda x: utterance_embed_d[x])

train_df = df[df.split == "TRAIN"].reset_index(drop=True)
test_df = df[df.split == "TEST"].reset_index(drop=True)

  0%|          | 0/7129 [00:00<?, ?it/s]

In [10]:
import torch
import random
from operator import itemgetter
import torch.nn.functional as F

device = 'cuda' if torch.cuda.is_available() else 'cpu'

def get_k_neighbours(k, utterance, train_df, test_df):
    
    
    test_utterance_embedding = test_df[test_df.utterance == utterance]["utterance_embedding"].values[0]
    #test_utterance_embedding = torch.tensor(test_utterance_embedding)#.to(device)

    utterance_embed_d = {}
    for e in train_df.iterrows():
        if e[1].utterance not in utterance_embed_d:
            #utterance_embed_d[e[1].utterance] = e[1].utterance_embedding
            utterance_embed_d[e[1].utterance] = e[1].utterance_embedding#.to(device)

    # train_titles = set(df[df.split == 'TRAIN'].title.unique())
    train_utterances = set(train_df.utterance)

    dist_l = []
    for t, v in utterance_embed_d.items():
        if t in train_utterances:
            # d = cos_sim(title_embed_d[title], v)
            d = F.cosine_similarity(torch.tensor(test_utterance_embedding), torch.tensor(v), dim=0)
            dist_l.append((t, d.item()))

    sorted_dist_l = sorted(dist_l, key=itemgetter(1), reverse=True)
    
    return sorted_dist_l[0: k]

def prepare_similar_example_prompts(utterance, k, train_df, test_df, seed=33):
    """
    Create a part of prompt made of k examples in the train set, whose topic is most similar to a given title.
    """

    random.seed(seed)

    neighbours_l = get_k_neighbours(2*k, utterance, train_df=train_df, test_df=test_df) # Fetch the 2*k closest neighbors
    # print(neighbours_l)
    sampled_neighbours_l = random.sample(neighbours_l, k) # Only keep k of them
    # bprint(sampled_neighbours_l)

    prompt = ''
    cnt = 0
    for i, (utterance, dist) in enumerate(sampled_neighbours_l):
        prompt += f'EXAMPLE {i+1}\n'

        example_df = train_df[train_df.utterance == utterance]
        # example_df = example_df[example_df.aty != 'none'].reset_index()
        
        class_l = []
        for k in example_df.iterrows():
            
            # if k[0] == 0:

            #     prompt += f'# Abstract:\n{example_df.iloc[0].utterance}\n\n# Arguments:\n'
            #     cnt = 0
                
            # prompt += f'Argument {cnt + 1}={k[1].text} - Class={k[1].aty}\n'
            prompt += f'Input: {k[1].utterance}'
            class_l.append(k[1].emotions_list)
            cnt += 1
            
        prompt += '\nOutput: '
        prompt += '{' + ', '.join([f'"list_emotion_classes": "{class_l[i]}"' for i in range(len(class_l))]) + '}'
        prompt += '\n\n'

    return prompt

In [11]:
def build_instruction():
    
    emotion_classes = ["anger", "disgust", "fear", "sadness", "surprise", "joy", "neutral"]
    formatted_classes = ", ".join([f'"{emotion}"' for emotion in emotion_classes])
    
    instruction = f"""### Emotion Analysis Expert Role

You are an advanced emotion analysis expert specializing in comic book dialogue interpretation. Your task is to analyze utterances and identify their emotional content.

INPUT:
- You will receive a single utterance from a comic book
- The utterance may express one or multiple emotions
- You will receive {k} example utterances and their emotion classifications
- Given example utterances and their emotion classifications, analyze the new utterance following the same pattern

TASK:
1. Carefully analyze the emotional context and tone of the utterance
2. Identify applicable emotions from the following classes:
   {formatted_classes}

OUTPUT REQUIREMENTS:
- Format: JSON object with a single key "list_emotion_classes"
- Value: Array of one or more emotion classes as strings
- Example: {{"list_emotion_classes": ["anger", "fear"]}}

IMPORTANT NOTES:
- Do not include any explanations in the output, only the JSON object

"""
    return instruction

In [12]:
instruction = build_instruction()

In [13]:
sys_msg_l = []
task_msg_l = []

In [14]:
# test_df = test_df[:10]

In [15]:
test_df.shape

(1326, 13)

In [16]:
for row in tqdm(test_df.iterrows(), total=len(test_df)):
    
    sys_msg = {"role": "user", "content": instruction + "EXAMPLES:\n\n" + prepare_similar_example_prompts(row[1].utterance, k, train_df=train_df, test_df=test_df)}
    #sys_msg = {"role":"system", "content": "### Task description: You are an expert biomedical assistant that takes 1) an abstract text, 2) the list of all arguments from this abstract text, and must classify all arguments into one of two classes: Claim or Premise. " + proportion_desc + " You must absolutely not generate any text or explanation other than the following JSON format {\"Argument 1\": <predicted class for Argument 1 (str)>, ..., \"Argument n\": <predicted class for Argument n (str)>}\n\n### Class definitions:" + " Claim = " + claim_fulldesc + " Premise = " + premise_fulldesc + "\n\n### Examples:\n\n" + prepare_similar_example_prompts(title_l[i], experiment_df, k=3, seed=seed)}  # Sample by similar title
    task_msg = {"role": "assistant", "content": f"Now classify this utternace:\nInput: {row[1].utterance}\nOutput: "}
    
    sys_msg_l.append(sys_msg)
    task_msg_l.append(task_msg)

  0%|          | 0/1326 [00:00<?, ?it/s]

In [17]:
print(sys_msg_l[0]['content'])

### Emotion Analysis Expert Role

You are an advanced emotion analysis expert specializing in comic book dialogue interpretation. Your task is to analyze utterances and identify their emotional content.

INPUT:
- You will receive a single utterance from a comic book
- The utterance may express one or multiple emotions
- You will receive 20 example utterances and their emotion classifications
- Given example utterances and their emotion classifications, analyze the new utterance following the same pattern

TASK:
1. Carefully analyze the emotional context and tone of the utterance
2. Identify applicable emotions from the following classes:
   "anger", "disgust", "fear", "sadness", "surprise", "joy", "neutral"

OUTPUT REQUIREMENTS:
- Format: JSON object with a single key "list_emotion_classes"
- Value: Array of one or more emotion classes as strings
- Example: {"list_emotion_classes": ["anger", "fear"]}

IMPORTANT NOTES:
- Do not include any explanations in the output, only the JSON objec

In [18]:
print(task_msg_l[0]['content'])

Now classify this utternace:
Input: TIME TO FACE OUR FEARS, PEOPLE…
Output: 


In [19]:
prepared_sys_task_msg_l = []

for i in range(len(sys_msg_l)):
    prepared_sys_task_msg_l.append([sys_msg_l[i], task_msg_l[i]])

In [20]:
def batch_tensor(tensor, batch_size):
    return [tensor[i:i+batch_size] for i in range(0, tensor.size(0), batch_size)]

In [21]:
inputs = inference_tokenizer.apply_chat_template(
            prepared_sys_task_msg_l,
            #pad_token = inference_tokenizer.eos_token,
            padding=True,
            truncation=True,
            add_generation_prompt=True,
            return_dict=True,
            return_tensors="pt",
)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


In [22]:
BATCH_SIZE = 16

input_ids_batches = batch_tensor(inputs['input_ids'], BATCH_SIZE) # type: ignore
attention_mask_batches = batch_tensor(inputs['attention_mask'], BATCH_SIZE) # type: ignore

In [23]:
generated_outputs = []

for i, (input_ids_batch, attention_mask_batch) in tqdm(enumerate(zip(input_ids_batches, attention_mask_batches)), total=len(input_ids_batches)):
    
    print(f"\n\n ***** Processing batch {i + 1} *****\n\n")
    
    inputs = {
        'input_ids': input_ids_batch.to(generation_model.device), # type: ignore
        'attention_mask': attention_mask_batch.to(generation_model.device) # type: ignore
    }

    outputs = generation_model.generate(
    **inputs,
    max_new_tokens=64,
    pad_token_id=inference_tokenizer.eos_token_id,
    eos_token_id=terminators,
    do_sample=True,
    temperature=0.1,
    top_p=0.9,
    )
    
    generated_outputs.append(outputs)

  0%|          | 0/83 [00:00<?, ?it/s]



 ***** Processing batch 1 *****




 ***** Processing batch 2 *****




 ***** Processing batch 3 *****




 ***** Processing batch 4 *****




 ***** Processing batch 5 *****




 ***** Processing batch 6 *****




 ***** Processing batch 7 *****




 ***** Processing batch 8 *****




 ***** Processing batch 9 *****




 ***** Processing batch 10 *****




 ***** Processing batch 11 *****




 ***** Processing batch 12 *****




 ***** Processing batch 13 *****




 ***** Processing batch 14 *****




 ***** Processing batch 15 *****




 ***** Processing batch 16 *****




 ***** Processing batch 17 *****




 ***** Processing batch 18 *****




 ***** Processing batch 19 *****




 ***** Processing batch 20 *****




 ***** Processing batch 21 *****




 ***** Processing batch 22 *****




 ***** Processing batch 23 *****




 ***** Processing batch 24 *****




 ***** Processing batch 25 *****




 ***** Processing batch 26 *****




 ***** Processing batch 27 *****




 ***** P

In [24]:
decoded_outputs = []

for batch in generated_outputs:

    for prediction in batch:

        decoded_outputs.append(inference_tokenizer.decode(prediction[inputs['input_ids'].shape[1]:], skip_special_tokens=True)) # type: ignore

In [25]:
decoded_outputs

['{"list_emotion_classes": ["fear", "anger"]}',
 '{"list_emotion_classes": "[\'joy\']"}',
 '{"list_emotion_classes": ["anger", "surprise"]}',
 '{"list_emotion_classes": ["fear", "surprise"]}',
 '{"list_emotion_classes": ["fear", "surprise"]}',
 '{"list_emotion_classes": ["neutral", "fear"]}',
 '{"list_emotion_classes": ["anger", "fear"]}',
 '{"list_emotion_classes": ["fear", "surprise"]}',
 '{"list_emotion_classes": ["fear", "sadness"]}',
 '{"list_emotion_classes": ["sadness", "disgust"]}',
 '{"list_emotion_classes": "[\'sadness\']"}',
 '{"list_emotion_classes": ["anger", "fear", "surprise"]}',
 '{"list_emotion_classes": ["surprise", "joy"]}',
 '{"list_emotion_classes": ["joy", "surprise"]}',
 '{"list_emotion_classes": ["neutral"]}',
 '{"list_emotion_classes": ["neutral"]}',
 '{"list_emotion_classes": ["surprise", "anger"]}',
 '{"list_emotion_classes": ["sadness", "fear"]}',
 '{"list_emotion_classes": "[\'anger\']"}',
 '{"list_emotion_classes": ["neutral"]}',
 '{"list_emotion_classes":

In [41]:
grounds = test_df.emotions_list.tolist()   

In [42]:
results_file = Path(OUTPUT_DIR) / f"results_{k}.pickle"
results_file.parent.mkdir(parents=True, exist_ok=True)

In [43]:
results_d = {"grounds": grounds,
            "predictions": decoded_outputs    
        
    }

In [44]:
with results_file.open('wb') as fh:
  
    pickle.dump(results_d, fh)

In [45]:
## Post process

In [46]:
import json

In [52]:
predictions_l = []

for i, prediction in enumerate(decoded_outputs):
        try:
            # Use json.loads to safely parse the JSON-like string
            parsed_prediction = json.loads(prediction)
            # Append the values of the parsed prediction to preds
            predictions_l.append(parsed_prediction["list_emotion_classes"])
            
        except json.JSONDecodeError as e:
            print(f"Error decoding prediction: {i}")

In [53]:
grounds

[['neutral'],
 ['joy'],
 ['surprise', 'joy'],
 ['surprise', 'joy'],
 ['fear', 'surprise'],
 ['anger', 'surprise'],
 ['anger', 'surprise'],
 ['fear', 'surprise'],
 ['fear', 'sadness', 'surprise'],
 ['joy'],
 ['disgust', 'fear', 'sadness'],
 ['fear', 'sadness', 'surprise'],
 ['sadness'],
 ['anger', 'sadness', 'surprise'],
 ['sadness'],
 ['neutral'],
 ['surprise'],
 ['neutral'],
 ['anger'],
 ['neutral'],
 ['surprise'],
 ['fear'],
 ['disgust', 'surprise'],
 ['fear', 'surprise'],
 ['neutral'],
 ['surprise'],
 ['neutral'],
 ['sadness'],
 ['fear', 'sadness'],
 ['fear'],
 ['disgust', 'surprise'],
 ['surprise'],
 ['disgust', 'sadness'],
 ['sadness'],
 ['sadness'],
 ['sadness'],
 ['sadness', 'joy'],
 ['sadness', 'joy'],
 ['joy'],
 ['anger'],
 ['fear', 'surprise'],
 ['anger'],
 ['sadness', 'surprise'],
 ['fear', 'surprise'],
 ['fear', 'surprise'],
 ['surprise'],
 ['fear', 'surprise'],
 ['fear'],
 ['surprise'],
 ['surprise'],
 ['fear', 'surprise'],
 ['fear', 'surprise'],
 ['fear', 'surprise'],
 ['

In [54]:
predictions_l

[['fear', 'anger'],
 "['joy']",
 ['anger', 'surprise'],
 ['fear', 'surprise'],
 ['fear', 'surprise'],
 ['neutral', 'fear'],
 ['anger', 'fear'],
 ['fear', 'surprise'],
 ['fear', 'sadness'],
 ['sadness', 'disgust'],
 "['sadness']",
 ['anger', 'fear', 'surprise'],
 ['surprise', 'joy'],
 ['joy', 'surprise'],
 ['neutral'],
 ['neutral'],
 ['surprise', 'anger'],
 ['sadness', 'fear'],
 "['anger']",
 ['neutral'],
 "['anger','surprise']",
 ['fear', 'neutral'],
 "['anger', 'disgust']",
 "['fear','surprise']",
 ['disgust', 'fear'],
 ['surprise', 'joy'],
 "['fear', 'neutral']",
 ['sadness', 'disgust'],
 ['sadness', 'joy'],
 ['joy'],
 ['anger', 'disgust'],
 "['surprise', 'fear']",
 ['joy'],
 ['joy', 'sadness'],
 ['anger', 'fear'],
 ['surprise', 'joy'],
 ['joy'],
 ['surprise', 'joy'],
 ['joy'],
 ['anger', 'fear'],
 ['fear', 'surprise'],
 ['neutral'],
 ['anger', 'surprise'],
 ['anger', 'surprise', 'fear'],
 "['surprise', 'joy']",
 ['surprise', 'fear'],
 "['surprise']",
 "['surprise', 'fear']",
 ['surp

In [55]:
len(grounds), len(predictions_l)

(1326, 1326)

In [56]:
import ast

In [57]:
predictions = []

for item in predictions_l:
    if isinstance(item, str):
        # Convert the string to a list using ast.literal_eval
        predictions.append(ast.literal_eval(item))
    else:
        # If the item is already a list, append as is
        predictions.append(item)

In [58]:
len(grounds), len(predictions_l)

(1326, 1326)

In [60]:
from sklearn.preprocessing import MultiLabelBinarizer

In [61]:
def get_mlb(grounds, predictions):
    
    mlb = MultiLabelBinarizer()
    grounds_mhot = mlb.fit_transform(grounds)
    predictions_mhot = mlb.transform(predictions)
    
    return grounds_mhot, predictions_mhot, mlb.classes_

In [62]:
grounds_matrix, predictions_matrix, classes = get_mlb(grounds, predictions)



In [63]:
print(classification_report(grounds_matrix, predictions_matrix, target_names=classes, digits=3))

              precision    recall  f1-score   support

       anger      0.520     0.566     0.542       454
     disgust      0.155     0.400     0.223        50
        fear      0.371     0.569     0.449       299
         joy      0.503     0.589     0.543       297
     neutral      0.261     0.440     0.328       109
     sadness      0.497     0.433     0.463       344
    surprise      0.510     0.648     0.571       355

   micro avg      0.444     0.550     0.491      1908
   macro avg      0.402     0.521     0.446      1908
weighted avg      0.464     0.550     0.498      1908
 samples avg      0.461     0.555     0.482      1908



In [64]:
classification_file = Path(OUTPUT_DIR) / f"classification_report_{k}.pickle"

with classification_file.open('wb') as fh:
    
    pickle.dump(classification_report(grounds_matrix, predictions_matrix, target_names=classes, output_dict=True), fh)