Step 1. Install and import packages

In [1]:
import textwrap
import csv
import json
from sklearn.metrics import accuracy_score, confusion_matrix
import matplotlib.pyplot as plt
import time
import google.generativeai as genai
import re 

from IPython.display import display
from IPython.display import Markdown
import ast 
import pandas as pd
from collections import Counter
import numpy as np
from scipy.spatial import distance
import os
from pathlib import Path

def to_markdown(text):
  text = text.replace('•', '  *')
  return Markdown(textwrap.indent(text, '> ', predicate=lambda _: True))

# Or use `os.getenv('GOOGLE_API_KEY')` to fetch an environment variable.
GOOGLE_API_KEY= 'AIzaSyBXOwIFMOC_CJ_1EfgbwYtpWuAvTOvNx90'

genai.configure(api_key=GOOGLE_API_KEY)

model = genai.GenerativeModel('gemini-1.5-flash')

RuntimeError: module was compiled against NumPy C-API version 0x10 (NumPy 1.23) but the running NumPy has C-API version 0xf. Check the section C-API incompatibility at the Troubleshooting ImportError section at https://numpy.org/devdocs/user/troubleshooting-importerror.html#c-api-incompatibility for indications on how to solve this problem.

Step 2. Get the dialog order for all sessions

In [2]:
def get_dialog_order(tran_path):
    diaglog_ids = []
    for session in range(1,6):
        session = "Session" + str(session)
        session_path = os.path.join(tran_path, session)
        trans_path = os.path.join(session_path, 'dialog/transcriptions')
        trans_lists = sorted(os.listdir(trans_path))
        for i in range(len(trans_lists)):
            full_trans_path = os.path.join(trans_path, trans_lists[i])
            texts = open(full_trans_path, 'r').readlines()
            for text in texts:
                sentence_id = text.split()[0]
                diaglog_ids.append(sentence_id)
    return diaglog_ids
tran_path = './data/IEMOCAP_full_release/'
diaglog_id_orders = get_dialog_order(tran_path)
print("Number of sentence ids across all sessions: ", len(diaglog_id_orders))

Number of sentence ids across all sessions:  10238


Step 3. Load the data

In [3]:
file_path = './data/iemocap_ambiguous.json'
data = json.load(open(file_path))
for i in range(len(data)):
    data[i]['speaker'] = data[i]['id'][:5] + data[i]['id'].split("_")[2][0]

Transfer three ground true labels to distributions

In [4]:
def get_label_prob(data):

    emo_labels = ['neu', 'hap', 'ang', 'sad']
    emotion_code_dict = {"Neutral state":"neu", "Happiness":"hap", "Anger":"ang", "Sadness":"sad", "Frustration":"others", "Contempt":"others", "Excitement":"others", "Surprise":"others", "Disgust":"others", "Fear":"others", "Other": "others"}
    num_out_labels = 0
    for item in data:
        amb_labels = []
        if item['need_prediction'] == 'yes':
            for emo in item['emotion']:
                amb_labels.append(emotion_code_dict[emo])

            filtered_labels = [label for label in amb_labels if label in emo_labels]
            for label in amb_labels:
                if label not in emo_labels:
                    num_out_labels += 1


            item['amb_emotion'] = filtered_labels

            emotion_counts = Counter(filtered_labels)
            total_count = sum(emotion_counts.values())
            
            probs = {emo: round(emotion_counts[emo]/total_count,2) for emo in emo_labels}
            item['emotion_probs'] = [probs[emo] for emo in emo_labels]

    return data, num_out_labels

new_data, num_out_labels = get_label_prob(data)
print("Number of instances that their emotions are outside four labels: ", num_out_labels)

Number of instances that their emotions are outside four labels:  0


Rearrange data into sessions:

In [5]:
# rearrange the data into sessions
def rearrange_data(data):
    session_dict = {}
    for item in data:
        session_impro_id = item['id'][:6]
        if session_impro_id not in session_dict:
            session_dict[session_impro_id] = []
        session_dict[session_impro_id].append(item)
    return session_dict

session_data = rearrange_data(new_data)
print('session in the data')
print(list(session_data.keys()))
print("number of session_improvisations:",len(session_data))
print('--------\na sample entry:')
for key, value in session_data[list(session_data.keys())[0]][0].items():
    print(f"{key}: {value}")

session in the data
['Ses01F', 'Ses01M', 'Ses02F', 'Ses02M', 'Ses03F', 'Ses03M', 'Ses04F', 'Ses04M', 'Ses05F', 'Ses05M']
number of session_improvisations: 10
--------
a sample entry:
id: Ses01F_impro01_F000
emotion: ['Neutral state', 'Neutral state', 'Neutral state']
need_prediction: yes
speaker: Ses01F
groundtruth: Excuse me.
audio: IEMOCAP_full_release/Session/1/sentences/wav/Ses01F_impro01_F000/Ses01F_impro01_F000.wav
amb_emotion: ['neu', 'neu', 'neu']
emotion_probs: [1.0, 0.0, 0.0, 0.0]


Reorder data according to dialog ids

In [6]:
def order_sentences(diaglog_id_orders, new_data):
    order_sen = []
    for sentence_id in diaglog_id_orders:
        for item in new_data:
            if item['id'] == sentence_id:
                order_sen.append(item)
                break
    return order_sen
order_data = order_sentences(diaglog_id_orders, new_data)
print("Number of ordered sentences across all sessions: ", len(order_data))
num_pred = 0
for i in range(len(order_data)):
    if order_data[i]['need_prediction'] == 'yes':
        num_pred += 1
print("Number of sentences that need prediction: ", num_pred)
print("List of first 5 sentence ids in the first sessions: \n", [item['id'] for item in order_data[0:5]])

Number of ordered sentences across all sessions:  10039
Number of sentences that need prediction:  4370
List of first 5 sentence ids in the first sessions: 
 ['Ses01F_impro01_F000', 'Ses01F_impro01_M000', 'Ses01F_impro01_F001', 'Ses01F_impro01_M001', 'Ses01F_impro01_F002']


In [7]:
def match_features(index, order_audio_data):
    matched_features = {}
    if 'egemaps' not in order_audio_data[index]:
        print("Error occurred: No egemaps in the audio data, the index is ", index)
    else:
        egemaps = order_audio_data[index]['egemaps']
        column_names = egemaps.columns
        values = egemaps.values[0]
        for col_id in range(len(column_names)):
            matched_features[column_names[col_id]] = float(values[col_id])
        return matched_features

In [8]:
len(order_data)

10039

Step 3. Create an emotion predictor that takes context and spoken text as input and output an emotion label (what you need to modify).

In [9]:
def Gemini_emotion_predictor(cur_sentence):
    """
    Predicts the emotional state of a speaker based on the current input sentence and the conversational context.

    Parameters:
    context (list of dict): A list of dictionaries, each representing a previous conversational turn. Each dictionary
                            should contain at least the keys 'speaker' and 'sentence' indicating who the speaker was
                            and what they said, respectively.
    cur_input (dict): A dictionary representing the current sentence to be analyzed. It should contain at least the keys
                        'speaker' and 'sentence', similar to the dictionaries in `context`.
    number_of_contexts (int, optional): The number of contextual entries to consider for emotion prediction. Defaults to 3. The more context, the more expensive.

    Returns:
    str: The predicted emotion for the current sentence, from a set of predefined emotions such as 'happy', 'sad',
            'neutral', or 'angry'.
    """
    task = f"Predict the probability of the emotion of the sentence from the options [neutral, happy, angry, sad]. The sentence is '{cur_sentence['groundtruth']}'. Output statisfies the following rules.\n"
    task_req1 = "Rule 1: Generate a dictionary of emotion probabilities in format of {'neutral': 0.1, 'happy':0.0, 'angry':0.1, 'sad':0.8}. If you think there is only one emotion in the sentence, then give the probability to 1.\n "
    task_req2 = "Rule 2: Ensure the sum of probability equal to 1.\n"
    task_amb2 = "Rule 3: Do not explain, only the dictionary.\n"
    task_final = "Please check again whether your output follows the three rules."
    prompt = task + task_req1 + task_req2 + task_amb2 + task_final
    # print(prompt)
    response = model.generate_content(prompt)
    return response, prompt

In [11]:
r, p = Gemini_emotion_predictor(order_data[0])
r.text

I0000 00:00:1725759278.544829 16976474 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache
I0000 00:00:1725759278.561662 16976474 check_gcp_environment_no_op.cc:29] ALTS: Platforms other than Linux and Windows are not supported


"```json\n{'neutral': 1.0, 'happy': 0.0, 'angry': 0.0, 'sad': 0.0}\n```"

Step 4. Predict the entire session and output a sequence of emotion prediction for each sentence that requires a prediction.

In [12]:
# transfer response from ```json\n{'neutral': 0.0, 'happy': 0.0, 'angry': 0.0, 'sad': 1.0}\n```
def identify_format(text):
    match = re.search(r"\{.*\}", text)
    if match:
        text = match.group(0)
    result_dict = ast.literal_eval(text)
    return result_dict
def dictToList(dict, emo_labels):
    prob_list = [dict[emo] for emo in emo_labels]
    return prob_list

In [14]:
log = {}
def predict_sentence(index_sentence, data):
    error = False
    emo_labels = ['neutral', 'happy', 'angry', 'sad']
    cur_sentence = data[index_sentence]
    cur_label = cur_sentence['emotion_probs']

    try:
        time.sleep(0.2)
        response,prompt = Gemini_emotion_predictor(cur_sentence)
        response = response.text.strip()

        # input both context and the current sentence to the emotion predictor
        try:
            clear_response = identify_format(response)
            cur_pred = dictToList(clear_response, emo_labels)
            log[cur_sentence["id"]] = [prompt, response]
        except:
            # if there is an error, fill a neutral to keep the output of same dimension
            print('Gemini response is not in the right format: ', response, cur_sentence['id'])
            cur_pred = [1.0,0.0,0.0,0.0]
            error = True
            log[cur_sentence["id"]] = ["Response not in the right format", prompt, response]
    except:
        try: 
            time.sleep(30)
            response,prompt = Gemini_emotion_predictor(cur_sentence)
            response = response.text.strip()

            # input both context and the current sentence to the emotion predictor
            try:
                clear_response = identify_format(response)
                cur_pred = dictToList(clear_response, emo_labels)
                log[cur_sentence["id"]] = [prompt, response]
            except:
                # if there is an error, fill a neutral to keep the output of same dimension
                print('Gemini response is not in the right format: ', response, cur_sentence['id'])
                cur_pred = [1.0,0.0,0.0,0.0]
                error = True
                log[cur_sentence["id"]] = ["Response not in the right format", prompt, response]

        except:
            print('Gemini api has an error.: ', cur_sentence)
            cur_pred = [1.0,0.0,0.0,0.0]
            error = True
            log[cur_sentence["id"]] = ["Gemini api has an error."]

    return cur_label, cur_pred, error 

In [15]:
log = {}
def make_predictions(data):
    number_errors, number_success = 0, 0
    started_sessions = []
    all_ground_truth, all_pred = [], []

    for i, item in enumerate(data):
        
        if item['id'][:-4] not in started_sessions:
            started_sessions.append(item['id'][:-4])
            print("Session ", item['id'][:-4])
            
        if item["need_prediction"] == "yes":
            label, prediction, error = predict_sentence(i, order_data)
            all_ground_truth.append(label)
            all_pred.append(prediction)
            if error == True:
                number_errors += 1
            else: 
                number_success += 1

        if i == len(order_data)-1 or order_data[i+1]['id'][:-4] != order_data[i]['id'][:-4]:
            print('Number of error counts:', number_errors, "; Number of predictions:", number_success)
            number_errors, number_success = 0, 0
            print('------------------------')
        # for testing
        # if i > 20:
        #     break
    return all_pred, all_ground_truth

In [16]:
all_pred, all_ground_truth = make_predictions(order_data)

Session  Ses01F_impro01_
Number of error counts: 0 ; Number of predictions: 2
------------------------
Session  Ses01F_impro02_
Number of error counts: 0 ; Number of predictions: 21
------------------------
Session  Ses01F_impro03_
Gemini response is not in the right format:  Please provide me with the sentence so I can predict the emotion probabilities. Ses01F_impro03_F003
Gemini response is not in the right format:  Please provide the sentence you want me to analyze. I need the sentence to predict the emotion probabilities. Ses01F_impro03_F013
Gemini response is not in the right format:  Please provide the sentence you would like me to analyze. I need the sentence to predict the emotion probabilities. Ses01F_impro03_M013
Gemini response is not in the right format:  Please provide the sentence you would like me to analyze. I need the sentence to determine the emotion probabilities. Ses01F_impro03_F026
Number of error counts: 4 ; Number of predictions: 37
------------------------
Sessi

In [17]:
folder_path = "0907_ct0"
print("Total predictions: ", len(all_pred), "Total ground truth:", len(all_ground_truth))
# Write to a CSV file using a context manager
with open(f'./prediction/{folder_path}/pred.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(all_pred)

with open(f'./prediction/{folder_path}/truth.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(all_ground_truth)

json.dump(log, open(f'./prediction/{folder_path}/log.json', 'w'), indent=4)  

Total predictions:  4370 Total ground truth: 4370


In [19]:
log['Ses05M_script02_1_M008']

['Response not in the right format',
 "Predict the probability of the emotion of the sentence from the options [neutral, happy, angry, sad]. The sentence is ''. Output statisfies the following rules.\nRule 1: Generate a dictionary of emotion probabilities in format of {'neutral': 0.1, 'happy':0.0, 'angry':0.1, 'sad':0.8}. If you think there is only one emotion in the sentence, then give the probability to 1.\n Rule 2: Ensure the sum of probability equal to 1.\nRule 3: Do not explain, only the dictionary.\nPlease check again whether your output follows the three rules.",
 'Please provide the sentence so I can predict the emotion probabilities.']

In [21]:
for i in range(len(order_data)):
    if order_data[i]['id'] == 'Ses05M_script02_1_M008':
        print(order_data[i])

{'id': 'Ses05M_script02_1_M008', 'emotion': ['Happiness', 'Happiness', 'Happiness'], 'need_prediction': 'yes', 'speaker': 'Ses051', 'groundtruth': '', 'amb_emotion': ['hap', 'hap', 'hap'], 'emotion_probs': [0.0, 1.0, 0.0, 0.0]}


In [20]:
order_data[0]

{'id': 'Ses01F_impro01_F000',
 'emotion': ['Neutral state', 'Neutral state', 'Neutral state'],
 'need_prediction': 'yes',
 'speaker': 'Ses01F',
 'groundtruth': 'Excuse me.',
 'audio': 'IEMOCAP_full_release/Session/1/sentences/wav/Ses01F_impro01_F000/Ses01F_impro01_F000.wav',
 'amb_emotion': ['neu', 'neu', 'neu'],
 'emotion_probs': [1.0, 0.0, 0.0, 0.0]}