In [None]:
### import the packages
import numpy as np 
import pandas as pd
from sentence_transformers import SentenceTransformer
import matplotlib.pyplot as plt
from googletrans import Translator
import expansion_utils as utils
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForMaskedLM, AutoTokenizer, AutoModelForSequenceClassification
import fasttext
import fasttext.util
import torch
import csv
from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances
from scipy.stats import spearmanr, kendalltau, pearsonr, ttest_ind
from transformers import BertTokenizer, BertModel
import importlib
importlib.reload(utils)

In [None]:
# Run utils.translate_emotions once per target language code, always starting
# from the English emotion file (the "en" pass produces english_context.csv).
for context_path, language_code in (
    ("All_emotions//english_context.csv", "en"),
    ("All_emotions//spanish_context.csv", "es"),
    ("All_emotions//chinese_context.csv", "zh-CN"),
    ("All_emotions//japanese_context.csv", "ja"),
    ("All_emotions//hindi_context.csv", "hi"),
):
    utils.translate_emotions("All_emotions//english.csv", context_path, language_code)

In [None]:
def get_embedding(embedding_type, model_name, words):
    '''
    Embed a list of texts with either a SentenceTransformer or a Hugging Face model.

    Args:
        embedding_type (str): "sentence_transformers" or "roberta".
        model_name (str): Identifier handed to the model/tokenizer loaders.
        words (list[str]): Texts to embed; one embedding is produced per entry.

    Returns:
        For "sentence_transformers", whatever ``SentenceTransformer.encode`` returns
        for ``words``. For "roberta", a NumPy array obtained by concatenating the
        per-text tensors from ``roberta_encoding`` along dimension 0.

    Raises:
        ValueError: If ``embedding_type`` is not one of the supported values
            (previously this case silently returned ``None``).
    '''
    if embedding_type == "sentence_transformers":
        model = SentenceTransformer(model_name)
        return model.encode(words)

    if embedding_type == "roberta":
        # Try the loaders in order and fall back when one cannot load the
        # checkpoint. ``except Exception`` (not a bare ``except``) keeps
        # KeyboardInterrupt / SystemExit from being swallowed mid-download.
        try:
            roberta_model = AutoModelForMaskedLM.from_pretrained(model_name)
            roberta_tokenizer = AutoTokenizer.from_pretrained(model_name)
        except Exception:
            try:
                roberta_model = AutoModelForSequenceClassification.from_pretrained(model_name)
                roberta_tokenizer = AutoTokenizer.from_pretrained(model_name)
            except Exception:
                roberta_model = BertModel.from_pretrained(model_name)
                roberta_tokenizer = BertTokenizer.from_pretrained(model_name)

        embeddings = [
            roberta_encoding(word, roberta_model, roberta_tokenizer)
            for word in words
        ]
        return torch.concat(embeddings, dim=0).numpy()

    raise ValueError(
        "Unsupported embedding_type %r; expected 'sentence_transformers' or 'roberta'"
        % (embedding_type,)
    )
  
def roberta_encoding(sentence, roberta_model, roberta_tokenizer):
    '''
    Embed one sentence as the attention-masked mean of the model's last hidden layer.

    Args:
        sentence (str): The input sentence to be embedded.
        roberta_model: Model that is called with the tokenizer output plus
            ``output_hidden_states=True`` and whose result exposes ``hidden_states``.
        roberta_tokenizer: Tokenizer called with ``return_tensors="pt"``; its
            output must contain an ``'attention_mask'`` entry.
    Returns:
        torch.Tensor: The mean-pooled embedding (token axis reduced away).
    '''
    encoded = roberta_tokenizer(sentence, return_tensors="pt", padding=True, truncation=True)

    # Inference only: no gradients are needed for embedding extraction.
    with torch.no_grad():
        model_output = roberta_model(**encoded, output_hidden_states=True)

    last_layer = model_output.hidden_states[-1]
    attention_mask = encoded['attention_mask']

    # Zero out masked positions, then sum over the token axis (dim 1).
    masked_sum = (last_layer * attention_mask.unsqueeze(-1)).sum(dim=1)
    # Clamp the token count so an all-zero mask cannot cause division by zero.
    token_count = attention_mask.sum(1).clamp(min=1e-9)
    return masked_sum / token_count.unsqueeze(-1)

In [None]:
# Writes one line per emotion in the form "emotion<TAB>[v0, v1, ...]".
def encode_emotions(input_file, embedding_type, model_name, output_file):
    '''
    Embed every emotion listed in a CSV file and save the vectors as text.

    Args:
        input_file (str): File name inside ``All_emotions/``. Only the first
            column of each row is used, lower-cased and stripped.
        embedding_type (str): Forwarded to ``get_embedding``.
        model_name (str): Forwarded to ``get_embedding``.
        output_file (str): File name inside ``Encoded_Emotions/``; overwritten.
    '''
    emotion_list = []
    # Explicit UTF-8: the emotion files hold non-ASCII text, and the platform
    # default encoding is not UTF-8 everywhere.
    with open(("All_emotions//" + input_file), newline='', encoding='utf-8') as csvfile:
        for row in csv.reader(csvfile, delimiter=','):
            if not row:
                # csv.reader yields [] for blank lines; row[0] would raise.
                continue
            emotion_list.append(row[0].lower().strip())

    embeddings = get_embedding(embedding_type, model_name, emotion_list)
    with open(("Encoded_Emotions/" + output_file), 'w', encoding='utf-8') as f:
        for i, emotion in enumerate(emotion_list):
            # Format each value with str() rather than relying on the list repr:
            # NumPy >= 2 reprs scalars as "np.float32(0.1)", which
            # ast.literal_eval (used when the file is read back) cannot parse.
            vector_text = "[" + ", ".join(str(value) for value in embeddings[i]) + "]"
            f.write(emotion + "\t" + vector_text + "\n")

In [None]:
import ast

# input: tab-separated file with rows of the form emotion<TAB>embedding list
# output: comma-separated file with rows "emotion,neighbour: distance,..." sorted
#         from nearest to farthest
def get_nearest_emotions(input_file, output_file):
    '''
    Rank, for every emotion, all other emotions by Euclidean distance.

    Args:
        input_file (str): File name inside ``Encoded_Emotions/``; each row is
            ``emotion<TAB>python-list-literal`` and is parsed with
            ``ast.literal_eval``.
        output_file (str): File name inside ``Nearest_Emotion_Results/``;
            overwritten.

    Prints "DONE" once the output file has been written.
    '''
    emotion_list = []
    vectors = []
    # Explicit UTF-8 on both files: emotion names may be non-ASCII and the
    # platform default encoding is not UTF-8 everywhere.
    with open(("Encoded_Emotions/" + input_file), newline='', encoding='utf-8') as csvfile:
        for row in csv.reader(csvfile, delimiter='\t'):
            emotion_list.append(row[0])
            vectors.append(ast.literal_eval(row[1]))
    embeddings = np.array(vectors)

    with open(("Nearest_Emotion_Results/" + output_file), 'w', encoding='utf-8') as f:
        for i, curr_emotion in enumerate(emotion_list):
            f.write(curr_emotion)
            # Keyed by emotion text, so repeated emotion strings collapse to the
            # distance of their last occurrence.
            distances = {}
            for j, other_emotion in enumerate(emotion_list):
                if i != j:
                    distances[other_emotion] = euclidean_distances([embeddings[i]], [embeddings[j]])[0][0]
            # Ascending distance: nearest neighbour first.
            for name, distance in sorted(distances.items(), key=lambda item: item[1]):
                f.write("," + name + ": " + str(distance))
            f.write("\n")
    print("DONE")

In [None]:
# Create a dictionary mapping English phrases --> phrases of another language.

def make_translation_dict(english_file, other_file):
    '''
    Pair the first column of two emotion files row by row.

    Args:
        english_file (str): File name inside ``All_emotions/`` providing the keys.
        other_file (str): File name inside ``All_emotions/`` providing the values.

    Returns:
        dict[str, str]: Lower-cased, stripped English phrase -> lower-cased,
        stripped phrase from the same row of ``other_file``. If an English phrase
        occurs more than once, the last row wins.

    Raises:
        AssertionError: If the two files do not have the same number of rows.
    '''
    def _first_column(file_name):
        # Explicit UTF-8: these files contain non-ASCII text and the platform
        # default encoding is not UTF-8 everywhere.
        with open("All_emotions//" + file_name, newline='', encoding='utf-8') as csvfile:
            return [row[0].lower().strip() for row in csv.reader(csvfile, delimiter=',')]

    english_words = _first_column(english_file)
    other_words = _first_column(other_file)
    assert len(english_words) == len(other_words), (
        "row count mismatch: %d rows in %s vs %d rows in %s"
        % (len(english_words), english_file, len(other_words), other_file)
    )

    return dict(zip(english_words, other_words))

# Lookup tables keyed by the phrases in english_context.csv, one per target language.
es_trans = make_translation_dict(
    english_file="english_context.csv",
    other_file="spanish_context.csv",
)
zh_trans = make_translation_dict(
    english_file="english_context.csv",
    other_file="chinese_context.csv",
)
ja_trans = make_translation_dict(
    english_file="english_context.csv",
    other_file="japanese_context.csv",
)

In [None]:

# Compare two nearest-emotion files (rows of "emotion,neighbour: distance,...").
def compare_nearest_emotions(nearest_file1, nearest_file2, top_k=50):
    '''
    Print the mean percentage overlap between the top neighbours of each emotion.

    Args:
        nearest_file1 (str): File name inside ``Nearest_Emotion_Results/``.
        nearest_file2 (str): File name inside ``Nearest_Emotion_Results/``; must
            list the same emotions in the same order as ``nearest_file1``.
        top_k (int): Number of distinct leading neighbours compared per emotion
            (default 50, the previously hard-coded value).

    Prints the mean overlap percentage rounded to two decimals; returns nothing.

    Raises:
        AssertionError: If the two files do not list identical emotions.
    '''
    def _load(file_name):
        names, neighbour_rows = [], []
        # Explicit UTF-8: emotion names may be non-ASCII.
        with open(("Nearest_Emotion_Results/" + file_name), newline='', encoding='utf-8') as csvfile:
            for row in csv.reader(csvfile, delimiter=','):
                names.append(row[0])
                neighbour_rows.append(row[1:])
        return names, neighbour_rows

    def _top_names(entries):
        # Keep the text before the first ":" and drop repeats, preserving order.
        return list(dict.fromkeys(entry.split(":")[0] for entry in entries))[:top_k]

    emotion_list1, nearest_list1 = _load(nearest_file1)
    emotion_list2, nearest_list2 = _load(nearest_file2)
    assert(emotion_list1 == emotion_list2)

    # Row of the first occurrence of each emotion: the same row list.index()
    # would return, without rescanning the list for every emotion.
    first_row = {}
    for row_idx, emotion in enumerate(emotion_list1):
        first_row.setdefault(emotion, row_idx)

    # The Kendall tau results are collected but are not part of the printed output.
    correlation_scores = []
    p_vals = []
    percent_overlap = []
    for emotion in emotion_list1:
        row_idx = first_row[emotion]
        nearest1_unique = _top_names(nearest_list1[row_idx])
        nearest2_unique = _top_names(nearest_list2[row_idx])

        # Computed once per emotion (previously evaluated twice).
        tau_result = kendalltau(nearest1_unique, nearest2_unique)
        correlation_scores.append(tau_result[0])
        p_vals.append(tau_result[1])
        shared = set(nearest1_unique).intersection(nearest2_unique)
        percent_overlap.append(len(shared) / len(nearest1_unique) * 100)

    print(np.round(np.mean(percent_overlap), 2))

# The six "i feel ..." phrases used as the Ekman subset, followed by their
# counterparts looked up in each translation dictionary.
en_ekman = [
    "i feel joy",
    "i feel sadness",
    "i feel anger",
    "i feel fear",
    "i feel disgust",
    "i feel surprise",
]
es_ekman = list(map(es_trans.__getitem__, en_ekman))
zh_ekman = list(map(zh_trans.__getitem__, en_ekman))
ja_ekman = list(map(ja_trans.__getitem__, en_ekman))
# Concatenation order: English, Spanish, Japanese, Chinese.
ekman_all = [*en_ekman, *es_ekman, *ja_ekman, *zh_ekman]

def _read_nearest_rows(file_name):
    '''Read a nearest-emotion file into (emotion names, lists of "name: distance" entries).'''
    emotions, neighbour_rows = [], []
    # Explicit UTF-8: emotion names may be non-ASCII.
    with open(("Nearest_Emotion_Results/" + file_name), newline='', encoding='utf-8') as csvfile:
        for row in csv.reader(csvfile, delimiter=','):
            emotions.append(row[0])
            neighbour_rows.append(row[1:])
    return emotions, neighbour_rows


def _split_entry(entry):
    '''Split one "name: distance" entry into (name, float(distance)).'''
    parts = entry.split(":")
    return parts[0], float(parts[1])


def get_emotion_correlation(nearest_file1, nearest_file2, mapping=False, trans_dict=None):
    '''
    Correlate, per emotion, the neighbour distances stored in two result files.

    For every row the neighbour distances of both files are put in the same
    (alphabetical by neighbour name) order and compared with Pearson and
    Spearman correlation. Only the Spearman coefficients are reported.

    Args:
        nearest_file1 (str): File name inside ``Nearest_Emotion_Results/``.
        nearest_file2 (str): File name inside ``Nearest_Emotion_Results/``.
        mapping (bool): If False, both files must list identical emotions. If
            True, neighbour names of file 1 are translated through ``trans_dict``
            before being matched against the neighbour names of file 2.
        trans_dict (dict | None): Translation of file-1 names into file-2 names;
            required when ``mapping`` is True.

    Returns:
        list: One Spearman coefficient per distinct emotion of file 1, in order
        of first occurrence. The rounded mean is also printed.

    Raises:
        ValueError: If ``mapping`` is True but ``trans_dict`` is None.
        AssertionError: If the emotion lists differ (``mapping=False``) or a
            translated neighbour set does not line up with file 2.
    '''
    if mapping and trans_dict is None:
        raise ValueError("trans_dict is required when mapping=True")

    emotion_list1, nearest_list1 = _read_nearest_rows(nearest_file1)
    emotion_list2, nearest_list2 = _read_nearest_rows(nearest_file2)

    alphabetized_nearest_list1 = []
    alphabetized_nearest_list2 = []
    if mapping:
        for row_idx in range(len(nearest_list1)):
            entries_en = [_split_entry(entry) for entry in nearest_list1[row_idx]]
            entries_other = [_split_entry(entry) for entry in nearest_list2[row_idx]]
            other_names = {name for name, _ in entries_other}

            # Keep the first distance for every translated name that also
            # appears among the other file's neighbours.
            overlap = []
            seen = set()
            for name_en, distance_en in entries_en:
                translated = trans_dict[name_en]
                if translated in other_names and translated not in seen:
                    overlap.append((translated, distance_en))
                    seen.add(translated)
            assert len(overlap) == len(entries_other), (
                "row %d: %d translated neighbours vs %d neighbours in %s"
                % (row_idx, len(overlap), len(entries_other), nearest_file2)
            )

            # Sorting the (name, distance) pairs aligns both rows by name.
            alphabetized_nearest_list1.append([distance for _, distance in sorted(overlap)])
            alphabetized_nearest_list2.append([distance for _, distance in sorted(entries_other)])
    else:
        assert(emotion_list1 == emotion_list2)
        # Sorting the raw "name: distance" strings orders each row alphabetically.
        for entries in nearest_list1:
            alphabetized_nearest_list1.append([_split_entry(entry)[1] for entry in sorted(entries)])
        for entries in nearest_list2:
            alphabetized_nearest_list2.append([_split_entry(entry)[1] for entry in sorted(entries)])

    assert(len(alphabetized_nearest_list1) == len(alphabetized_nearest_list2))
    # Diagnostic only: report rows whose neighbour counts differ.
    for distances1, distances2 in zip(alphabetized_nearest_list1, alphabetized_nearest_list2):
        if len(distances1) != len(distances2):
            print(len(distances1), len(distances2))

    pearson_correlation_scores = []
    spearman_correlation_scores = []
    visited_emotions = set()
    for i, emotion in enumerate(emotion_list1):
        # Score each distinct emotion once, using its first row.
        if emotion in visited_emotions:
            continue
        visited_emotions.add(emotion)

        pearson_corr = pearsonr(alphabetized_nearest_list1[i], alphabetized_nearest_list2[i])
        spearman_corr = spearmanr(alphabetized_nearest_list1[i], alphabetized_nearest_list2[i])
        pearson_correlation_scores.append(pearson_corr[0])
        spearman_correlation_scores.append(spearman_corr[0])

    print("Spearman Avg: %f" % ((np.round(np.mean(spearman_correlation_scores), 3))))
    return spearman_correlation_scores

In [None]:

# Monolingual vs. multilingual: for each language, correlate the "roberta"
# result file with the "roberta-xlm" result file.
e = get_emotion_correlation(
    nearest_file1="english_roberta_nearest.csv",
    nearest_file2="english_roberta-xlm_nearest.csv",
)
s = get_emotion_correlation(
    nearest_file1="spanish_roberta_nearest.csv",
    nearest_file2="spanish_roberta-xlm_nearest.csv",
)
c = get_emotion_correlation(
    nearest_file1="chinese_roberta_nearest.csv",
    nearest_file2="chinese_roberta-xlm_nearest.csv",
)
j = get_emotion_correlation(
    nearest_file1="japanese_roberta_nearest.csv",
    nearest_file2="japanese_roberta-xlm_nearest.csv",
)
print('\n')

In [None]:
# p-values of unequal-variance t-tests: English scores (e) against the Spanish,
# Chinese and Japanese scores currently held in s, c and j.
print(ttest_ind(e, s, equal_var=False).pvalue)
print(ttest_ind(e, c, equal_var=False).pvalue)
print(ttest_ind(e, j, equal_var=False).pvalue)

In [None]:
# English vs. other languages, with neighbour names mapped through the
# translation dictionaries.

# "roberta" result files.
sm = get_emotion_correlation(
    nearest_file1="english_roberta_real_nearest.csv",
    nearest_file2="spanish_roberta_nearest.csv",
    mapping=True,
    trans_dict=es_trans,
)
cm = get_emotion_correlation(
    nearest_file1="english_roberta_real_nearest.csv",
    nearest_file2="chinese_roberta_nearest.csv",
    mapping=True,
    trans_dict=zh_trans,
)
jm = get_emotion_correlation(
    nearest_file1="english_roberta_real_nearest.csv",
    nearest_file2="japanese_roberta_nearest.csv",
    mapping=True,
    trans_dict=ja_trans,
)

print('\n')

# "roberta-xlm-sent" result files.
sx = get_emotion_correlation(
    nearest_file1="english_roberta-xlm-sent_nearest.csv",
    nearest_file2="spanish_roberta-xlm-sent_nearest.csv",
    mapping=True,
    trans_dict=es_trans,
)
cx = get_emotion_correlation(
    nearest_file1="english_roberta-xlm-sent_nearest.csv",
    nearest_file2="chinese_roberta-xlm-sent_nearest.csv",
    mapping=True,
    trans_dict=zh_trans,
)
jx = get_emotion_correlation(
    nearest_file1="english_roberta-xlm-sent_nearest.csv",
    nearest_file2="japanese_roberta-xlm-sent_nearest.csv",
    mapping=True,
    trans_dict=ja_trans,
)

print('\n')

# Per language: p-value of an unequal-variance t-test between the two score lists.
print(ttest_ind(sm, sx, equal_var=False).pvalue)
print(ttest_ind(cm, cx, equal_var=False).pvalue)
print(ttest_ind(jm, jx, equal_var=False).pvalue)


In [None]:

# Unaligned vs. aligned: for each language, correlate the "roberta-xlm" result
# file with the "paraphrase-mpnet" result file. Note that this rebinds e, s, c, j.
e = get_emotion_correlation(
    nearest_file1="english_roberta-xlm_nearest.csv",
    nearest_file2="english_paraphrase-mpnet_nearest.csv",
)
s = get_emotion_correlation(
    nearest_file1="spanish_roberta-xlm_nearest.csv",
    nearest_file2="spanish_paraphrase-mpnet_nearest.csv",
)
c = get_emotion_correlation(
    nearest_file1="chinese_roberta-xlm_nearest.csv",
    nearest_file2="chinese_paraphrase-mpnet_nearest.csv",
)
j = get_emotion_correlation(
    nearest_file1="japanese_roberta-xlm_nearest.csv",
    nearest_file2="japanese_paraphrase-mpnet_nearest.csv",
)
print('\n')

# p-values of unequal-variance t-tests: English scores against each other language.
print(ttest_ind(e, s, equal_var=False).pvalue)
print(ttest_ind(e, c, equal_var=False).pvalue)
print(ttest_ind(e, j, equal_var=False).pvalue)
