In [1]:
# !pip install levenshtein

In [2]:
from Levenshtein import distance 

def find_best_subsequence(answer, context):
    """Find the contiguous run of words in ``context`` closest to ``answer``.

    ``context`` is split on whitespace and every contiguous word span is
    re-joined with single spaces and compared to ``answer`` using the
    Levenshtein edit distance.

    Args:
        answer: The string to locate.
        context: The text that is searched, as a single string.

    Returns:
        A ``(start, end)`` tuple of word indices into ``context.split()``;
        ``end`` is exclusive. When nothing is strictly closer than the whole
        context (or the context has no words) ``(0, len(words))`` is returned.
    """
    words = context.split()
    n_words = len(words)

    # Baseline: the whole context, compared string-to-string (comparing the
    # word *list* against the answer string would measure something else).
    best_ij = 0, n_words
    min_dist = distance(" ".join(words), answer)

    for i in range(n_words):
        # ``j`` is an exclusive end index, so it has to reach ``n_words``;
        # otherwise spans that finish on the last word are never considered.
        for j in range(i + 1, n_words + 1):
            dist = distance(" ".join(words[i:j]), answer)
            if dist < min_dist:
                min_dist = dist
                best_ij = i, j

    return best_ij

# Sanity check: "BBB CCC" matches words 1..2 of the context exactly, i.e. the half-open span (1, 3).
find_best_subsequence("BBB CCC", "AAA BBB CCC DDD EEEE")

(1, 3)

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import warnings
import os
import copy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import peft
import string

# Suppress every Python warning from here on (this also hides deprecation notices from the libraries below).
warnings.filterwarnings('ignore')

from torch.utils.data import DataLoader
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from tqdm import tqdm
from peft import LoraConfig, TaskType, get_peft_model, PeftModel

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

<hr>

In [4]:
# Level-0 model: tokenizer and sequence-classification model from the "tae898/emoberta-large" hub checkpoint.
tokenizer = AutoTokenizer.from_pretrained("tae898/emoberta-large")
model_l0 = AutoModelForSequenceClassification.from_pretrained("tae898/emoberta-large")

# Replace the hub weights with the locally saved state dict (loaded onto the CPU first) and switch to eval mode.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted source.
model_l0.load_state_dict(torch.load("models/model_l0_v07.pth", map_location="cpu"))
_ = model_l0.eval()

In [5]:
class EncoderLayer(nn.Module):
    """Single encoder block: LayerNorm -> feed-forward -> dropout -> self-attention.

    The attention module is built with ``batch_first=True``, so inputs are
    laid out as ``(batch, sequence, d_model)``. There are no residual
    connections; the block returns the raw attention output together with the
    attention weights.
    """

    def __init__(self, d_model, n_heads, ff_dim, dropout=0.1):
        super().__init__()
        # Attribute names are the state-dict keys of saved checkpoints; keep them stable.
        self.self_attn = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, d_model),
        )
        self.normalizer = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run the block on ``x`` and return ``(attention_output, attention_weights)``.

        ``mask`` is forwarded unchanged as the ``attn_mask`` of the attention call.
        """
        hidden = self.normalizer(x)
        hidden = self.dropout(self.feed_forward(hidden))
        # Queries, keys and values are all the same transformed sequence.
        attended, attn_weights = self.self_attn(hidden, hidden, hidden, attn_mask=mask)
        return attended, attn_weights

class TransformerEncoder(nn.Module):
    """Sequence encoder over per-item embeddings concatenated with class scores.

    The two inputs are concatenated on the last axis, projected down to
    ``model_dim`` by a linear layer, given sinusoidal positional encodings and
    passed through ``n_layers`` ``EncoderLayer`` blocks.

    Args:
        embed_dim: Size of the last axis of ``x`` in ``forward``.
        logit_dim: Size of the last axis of ``y`` in ``forward``.
        model_dim: Width used inside the encoder layers.
        n_layers: Number of stacked ``EncoderLayer`` blocks.
        n_heads: Attention heads per layer.
        ff_dim: Hidden width of each layer's feed-forward network.
        max_len: Longest sequence (axis 1) the positional encoding covers.
        dropout: Dropout probability after the projection and inside the layers.
    """

    def __init__(self, embed_dim, logit_dim, model_dim, n_layers, n_heads, ff_dim, max_len, dropout=0.1):
        super().__init__()

        self.compressor = nn.Linear(embed_dim + logit_dim, model_dim)
        self.dropout = nn.Dropout(dropout)
        # Registered as a buffer so ``.to(device)`` moves it together with the
        # parameters; ``persistent=False`` keeps it out of the state dict, so
        # checkpoints saved before this change still load with strict keys.
        self.register_buffer(
            "positional_encoding",
            self.get_positional_encoding(max_len, model_dim).permute(1, 0, 2),
            persistent=False,
        )
        self.layers = nn.ModuleList([EncoderLayer(model_dim, n_heads, ff_dim, dropout) for _ in range(n_layers)])

    def get_positional_encoding(self, max_len, d_model):
        """Build a ``(max_len, 1, d_model)`` sinusoidal table (sin on even, cos on odd channels)."""
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float32) * -(np.log(10000.0) / d_model))
        pos_enc = torch.zeros((max_len, 1, d_model))
        pos_enc[:, 0, 0::2] = torch.sin(position * div_term)
        pos_enc[:, 0, 1::2] = torch.cos(position * div_term)
        return pos_enc

    def forward(self, x, y, mask=None):
        """Encode the sequence.

        Args:
            x: Tensor whose last axis has size ``embed_dim``; axis 1 is the sequence.
            y: Tensor with the same leading axes as ``x`` and last axis ``logit_dim``.
            mask: Optional attention mask handed to every layer.

        Returns:
            ``(encoded, weights)`` where ``weights`` are the attention weights of
            the last layer, or ``None`` when the encoder has no layers.

        Raises:
            ValueError: If the sequence is longer than ``max_len``.
        """
        x = torch.cat((x, y), dim=-1)
        x = self.compressor(x)
        x = self.dropout(x)

        seq_len = x.shape[1]
        max_len = self.positional_encoding.shape[1]
        if seq_len > max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds the positional encoding size {max_len}")

        x = x + self.positional_encoding[:, :seq_len, :].to(x.device)

        # Stays None only when there are no layers to produce attention weights.
        weights = None
        for layer in self.layers:
            x, weights = layer(x, mask)

        return x, weights
    
# Level-1 model: one single-head encoder layer over the concatenation of a 1024-wide embedding
# and a 7-wide score vector; max_len=40 is the longest sequence the positional encoding covers.
# These sizes have to match the shapes stored in the checkpoint loaded below.
model_l1 = TransformerEncoder(
    embed_dim=1024, model_dim=256, logit_dim=7, n_layers=1, n_heads=1, ff_dim=1024, max_len=40)

# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted source.
model_l1.load_state_dict(torch.load("models/model_l1_v09.pth", map_location="cpu"))
model_l1.eval()

def count_parameters(model):
    """Return the total number of elements in the parameters of ``model`` that require gradients."""
    trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable += param.numel()
    return trainable

# Trainable parameter counts of the level-0 and level-1 models, in that order.
count_parameters(model_l0), count_parameters(model_l1)

(355366919, 1053440)

In [6]:
from transformers import AutoModelForQuestionAnswering, AutoTokenizer

# Local directory holding the PEFT adapter (and the tokenizer files) for the span-extraction model.
checkpoint = "models/qa/checkpoint-10500"
# Base question-answering model the adapter is applied to.
model_name = "deepset/deberta-v3-base-squad2"

# Load the base QA model, wrap it with the adapter weights from `checkpoint`,
# and load the matching tokenizer from the same checkpoint directory.
qa_model = AutoModelForQuestionAnswering.from_pretrained(model_name)
qa_model = PeftModel.from_pretrained(qa_model, checkpoint)
tokenizer_qa = AutoTokenizer.from_pretrained(checkpoint)

In [7]:
@torch.no_grad()
def generate_emotion_causes(model_l0, model_l1, tokenizer, all_texts, all_speaker, threshold=0.2):
    """Predict one emotion per utterance and the (emotion, cause) utterance index pairs.

    Each utterance is formatted as ``"speaker: <speaker> dialog: <text>"`` and
    classified by ``model_l0``. The first-token hidden state of its last layer
    and the softmax class probabilities are then fed, as one sequence covering
    the whole conversation, to ``model_l1``, whose attention weights are
    thresholded to obtain the pairs.

    Args:
        model_l0: Sequence-classification model; must expose ``config.id2label``.
        model_l1: Model returning ``(encoded, attention_weights)``.
        tokenizer: Tokenizer matching ``model_l0``.
        all_texts: Utterance texts of one conversation, in order.
        all_speaker: Speaker of each utterance, aligned with ``all_texts``.
        threshold: Minimum attention weight for a pair to be kept.

    Returns:
        ``(emotions, cpairs)``: ``emotions`` is the list of predicted labels,
        one per utterance; ``cpairs`` is a list of ``(row, column)`` index
        tuples of the attention matrix whose weight is ``>= threshold``.
        Both lists are empty for an empty conversation.
    """
    texts = [f"speaker: {s} dialog: {d}" for s, d in zip(all_speaker, all_texts)]
    if not texts:
        # Nothing to classify; the tokenizer cannot batch an empty list.
        return [], []

    model_l0.to(device)
    model_l1.to(device)

    inputs = tokenizer(texts, return_tensors='pt', padding=True, truncation=True).to(device)

    # Gradients are already disabled by the decorator (called with
    # parentheses, which also works on PyTorch versions that reject the bare form).
    output = model_l0(
        input_ids=inputs['input_ids'],
        attention_mask=inputs['attention_mask'],
        output_hidden_states=True,
    )

    # One vector per utterance: the last layer's hidden state at token position 0.
    embeds = output.hidden_states[-1][:, 0, :]
    probs = F.softmax(output.logits, dim=-1)

    # The conversation becomes a single batch entry: (1, n_utterances, dim).
    weights = model_l1(embeds.unsqueeze(0), probs.unsqueeze(0))[1].squeeze(0)

    cause_org, cause_dst = torch.where(weights >= threshold)
    cpairs = list(zip(cause_org.tolist(), cause_dst.tolist()))

    label_ids = probs.max(dim=-1)[1].tolist()
    emotions = [model_l0.config.id2label[x] for x in label_ids]

    return emotions, cpairs

<hr>

In [10]:
def _strip_edge_punctuation(text):
    """Drop ASCII punctuation from both ends of ``text`` (never raises on empty input)."""
    return text.strip(string.punctuation)


def func(data):
    """Annotate every conversation in ``data`` with predicted emotion-cause pairs.

    For each conversation the emotion of every utterance and the candidate
    (emotion utterance, cause utterance) pairs come from
    ``generate_emotion_causes``. Pairs whose emotion utterance is "neutral"
    are dropped, and every non-neutral utterance without a predicted cause is
    paired with the preceding utterance. The QA model then extracts the cause
    span from each cause utterance, and ``find_best_subsequence`` maps the
    extracted text back to word indices.

    Args:
        data: Iterable of dicts, each with a ``"conversation"`` list whose items
            provide ``"text"``, ``"speaker"`` and ``"utterance_ID"``.

    Returns:
        A list of deep copies of the input conversations, each extended with an
        ``"emotion-cause_pairs"`` list of
        ``["<utterance_ID>_<emotion>", "<utterance_ID>_<start>_<end>"]`` entries.
    """
    prompt = "Which part of the text ’{}’ is the reason for ’ {} ’'s feeling of ’ {} ’ when ’ {} ’ is said?"

    final_results = []
    for conversation in tqdm(data):

        sample = conversation["conversation"]
        all_texts = [d["text"] for d in sample]
        all_speaker = [d["speaker"] for d in sample]

        emotions, cpairs = generate_emotion_causes(model_l0, model_l1, tokenizer, all_texts, all_speaker)

        for i, e in enumerate(emotions):
            if e == "neutral":
                cpairs = [x for x in cpairs if x[0] != i]
            elif not any(x[0] == i for x in cpairs):
                # Fallback cause: the previous utterance. The first utterance has
                # no predecessor, so it is paired with itself; index -1 would
                # silently wrap around to the last utterance of the conversation.
                cpairs.append((i, max(i - 1, 0)))

        results = []
        if cpairs:
            all_questions = []
            all_contexts = []

            for org, dst in cpairs:
                d1 = sample[org]
                d2 = sample[dst]

                all_questions.append(prompt.format(d2["text"], d1["speaker"], emotions[org], d1["text"]))
                all_contexts.append(d2["text"])

            inputs = tokenizer_qa(all_questions, all_contexts, padding=True, truncation=True, return_tensors="pt")

            with torch.no_grad():
                outputs = qa_model(**inputs)

            answer_start_index = outputs.start_logits.argmax(dim=-1).tolist()
            answer_end_index = outputs.end_logits.argmax(dim=-1).tolist()

            answers = []
            for row, (start, end, context) in enumerate(zip(answer_start_index, answer_end_index, all_contexts)):
                context = _strip_edge_punctuation(context)

                # NOTE(review): the slice excludes the token at `end`; confirm this
                # matches the span convention the QA checkpoint was trained with.
                answer = tokenizer_qa.decode(inputs.input_ids[row, start:end])
                answer = _strip_edge_punctuation(answer).strip()

                if answer:
                    answers.append(find_best_subsequence(answer, context))
                else:
                    # No usable span was extracted: fall back to the whole cause utterance.
                    answers.append((0, len(context.split())))

            for (org, dst), (span_start, span_end) in zip(cpairs, answers):
                d1 = sample[org]
                d2 = sample[dst]

                results.append([
                    "{}_{}".format(d1["utterance_ID"], emotions[org]),
                    "{}_{}_{}".format(d2["utterance_ID"], span_start, span_end),
                ])

        # Work on a copy so the caller's input data is left untouched.
        conversation = copy.deepcopy(conversation)
        conversation.update({"emotion-cause_pairs": results})

        final_results.append(conversation)
    return final_results

In [11]:
import json

# Subtask 1: load the test conversations, run the full emotion/cause pipeline,
# and write the annotated conversations to Subtask_1_pred.json.
with open("Subtask_1_test.json") as f:
    data = json.load(f)
    
final_results = func(data)
with open("Subtask_1_pred.json", "w") as f:
    json.dump(final_results, f, indent=4)

100%|██████████| 665/665 [03:02<00:00,  3.65it/s]


In [12]:
import json

# Subtask 2: same pipeline as for Subtask 1, applied to the Subtask 2 test file.
# The predictions still carry span indices here; the next cell strips them.
with open("Subtask_2_test.json") as f:
    data = json.load(f)
    
final_results = func(data)
with open("Subtask_2_pred.json", "w") as f:
    json.dump(final_results, f, indent=4)

100%|██████████| 665/665 [02:55<00:00,  3.78it/s]


In [None]:
import json

# Post-process the Subtask 2 predictions in place: reduce every cause entry
# from "<utterance_ID>_<start>_<end>" to just "<utterance_ID>".
with open("Subtask_2_pred.json") as f:
    data = json.load(f)

# Pretty-printed snapshot of the predictions as loaded (before the rewrite below).
json_formatted_str = json.dumps(data, indent=4)

for conversation in data:
    for pair in conversation["emotion-cause_pairs"]:
        # partition() returns the whole string when there is no "_", so running
        # this cell again on an already-processed file leaves the ids intact
        # (slicing with find() == -1 would chop off their last character).
        pair[1] = pair[1].partition("_")[0]

with open('Subtask_2_pred.json', 'w') as f:
    json.dump(data, f, indent=4)