In [6]:
import warnings
warnings.filterwarnings("ignore")

import json
import numpy as np
import pandas as pd
import glob
import scipy.stats as st

import sys
sys.path.append("/home/sho/Git/cuhksz-phd/sho_util/pyfiles/")
from basic import get_bool_base_on_conditions

def get_json(path):
    with open(path, 'r') as j:
         contents = json.loads(j.read())
    return contents

experiments = {
    "main": ["OpenSMILE-OpenSMILE-WavLM_EPR-globaloutput-GRL_esd_fcn", "OpenSMILE-OpenSMILE-WavLM_SER-globaloutput-GRL_esd_fcn", "relative-attributes_OpenSMILE_esd", "msemotts_OpenSMILE_esd"],
    "features": ["OpenSMILE-OpenSMILE-WavLM_EPR-globaloutput-GRL_esd_fcn", "WavLM_EPR-globaloutput-GRL_esd_fcn", "OpenSMILE_EPR-globaloutput-GRL_esd_fcn"],
    "methods": ["OpenSMILE_EPR-globaloutput-GRL_esd_fcn", "OpenSMILE_SER-globaloutput-GRL_esd_fcn", "relative-attributes_OpenSMILE_esd"],
    "GRL": ["OpenSMILE-OpenSMILE-WavLM_EPR-globaloutput-GRL_esd_fcn", "OpenSMILE-OpenSMILE-WavLM_SER-globaloutput-GRL_esd_fcn", "OpenSMILE-OpenSMILE-WavLM_EPR-globaloutput_esd_fcn", "OpenSMILE-OpenSMILE-WavLM_SER-globaloutput_esd_fcn"],
    "SSL": [
        "OpenSMILE-OpenSMILE-WavLM_EPR-globaloutput-GRL_esd_fcn", "WavLM_EPR-globaloutput-GRL_esd_fcn",
        "OpenSMILE-OpenSMILE-cHubert_EPR-globaloutput-GRL_esd_fcn", "cHubert_EPR-globaloutput-GRL_esd_fcn",
        "OpenSMILE-OpenSMILE-emotion2vec_EPR-globaloutput-GRL_esd_fcn", "emotion2vec_EPR-globaloutput-GRL_esd_fcn",
           ],
}

intensitylabels = ["L", "M", "H"]
emos = ["Angry", "Happy", "Sad", "Surprise"]

mainmodels = list(set([b for a in [experiments[exid] for exid in ["main", "features", "methods"]] for b in a]))
mainmodels.sort()
mainmodels.remove("msemotts_OpenSMILE_esd")
modelnames = list(set([b for a in list(experiments.values()) for b in a]))
modelnames.sort()

In [7]:
def get_control_result(contents, name, sampleid, segment):
    arrays = []
    for idx, a in enumerate(contents):
        if idx>=48:
            continue
        mn, fileid, emotion = a["TestID"].split("---")[-3:]
        allocation = {item[0]: item[2] for item in a["PresentationOrder"].split(", ")}
        res = [allocation[key] for key in a["Preference"]]
        array = [mn, fileid, emotion, *res]
        arrays += [array]

    columns = ["model name", "file id", "emotion", "Lowest Prediction", "Highest Prediction"]
    sampledf = pd.DataFrame(np.array(arrays), columns=columns)
    samplearrays = []
    for mn in modelnames:
        for fileid in sampledf["file id"].unique():
            params = {"model name": [mn], "file id": [fileid]}
            modeldf = sampledf[get_bool_base_on_conditions(sampledf, params)]
            if len(modeldf)>0:
                int_array = []
                for emotion in emos:
                    params = {"emotion": [emotion]}
                    emotiondf = modeldf[get_bool_base_on_conditions(modeldf, params)]
                    a = [(emotiondf["Lowest Prediction"]==label).sum() for label in intensitylabels]
                    b = [(emotiondf["Highest Prediction"]==label).sum() for label in intensitylabels]
                    int_array += [a + b]
                array = list(np.array(int_array).reshape(-1)) + list(np.sum(np.array(int_array), axis=0))
            else:
                array = [None]*30
            samplearrays += [[name, sampleid, fileid, mn, *array]]

    columns = []
    for cl in ["Test Name", "Test ID", "File ID", "model name"]:
        columns += [("basic", "", cl)]
    for emotion in emos + ["total"]:
        for key in ["LP", "HP"]:
            for label in intensitylabels:
                columns += [(f"{segment}-level control", emotion, f"{key}-{label}")]
    result = pd.DataFrame(np.array(samplearrays), columns=pd.MultiIndex.from_tuples(columns))
    return result

def get_mushra_result(contents, name, sampleid, metric):
    arrays = []
    for idx, a in enumerate(contents):
        if idx>=100:
            continue
        if len(a)==1:
            continue
        _, fileid = a["TestID"].split("---")
        ratings = a["rating"]
        arrays += [[name, sampleid, fileid, key, ratings[key]]for key in ratings]
    columns = []
    for cl in ["Test Name", "Test ID", "File ID", "model name"]:
        columns += [("basic",  cl)]
    for label in [f"{metric}"]:
        columns += [(metric, "score")]
    result = pd.DataFrame(np.array(arrays), columns=pd.MultiIndex.from_tuples(columns))
    result.loc[:, [metric]] = result.loc[:, [metric]].values.astype(float)
    return result

def get_contents(path):
    contents = get_json(path)
    name = contents[-1]["UserName"]
    sampleid = path[:-5].split("_")[-1]
    return contents, name, sampleid

In [8]:
resultfiles = glob.glob(f"./web_service/results/*.json")
resultfiles.sort()
resultfiles = resultfiles[11:]

In [20]:
not_found = []
results = {key: [] for key in ["WC", "UC", "SIM", "NAT"]}
for path in resultfiles:
    contents, name, sampleid = get_contents(path)
    if len(contents)==2:
        continue
    if len(contents[0]["TestID"].split("---"))==3:
        testtype = "WC"
        result = get_control_result(contents, name, sampleid, "word")
    else:
        testtype = contents[0]["TestID"].split("---")[0]
        if testtype=="WC":
            result = get_control_result(contents, name, sampleid, "word")
        elif testtype=="UC":
            try:
                result = get_control_result(contents, name, sampleid, "utterance")
            except TypeError:
                not_found += [path]
        elif testtype=="SIM":
            result = get_mushra_result(contents, name, sampleid, "SIM")
        elif testtype=="NAT":
            result = get_mushra_result(contents, name, sampleid, "NAT")
    results[testtype] += [result]

# Check Individually

In [28]:
idx = 2
segment = "word"
label = "WC" if segment=="word" else "UC"

result = results[label][idx]
print(result[("basic", "", "Test Name")][0])
df = result.groupby([("basic","", "Test ID"), ("basic", "", "model name")]).sum()[(f"{segment}-level control", "total")]
df.index = [key[1] for key in df.index]
df[df.sum(axis=1)>0]

flora


Unnamed: 0,LP-L,LP-M,LP-H,HP-L,HP-M,HP-H
OpenSMILE-OpenSMILE-WavLM_EPR-globaloutput-GRL_esd_fcn,7,0,1,0,1,7
OpenSMILE-OpenSMILE-WavLM_SER-globaloutput-GRL_esd_fcn,6,1,1,1,3,4
OpenSMILE_EPR-globaloutput-GRL_esd_fcn,7,0,1,0,3,5
OpenSMILE_SER-globaloutput-GRL_esd_fcn,4,2,2,1,3,4
WavLM_EPR-globaloutput-GRL_esd_fcn,7,0,1,1,1,6
relative-attributes_OpenSMILE_esd,8,0,0,0,2,6


In [37]:
idx = 5
metric = "NAT"

result = results[metric][idx]
print(result[("basic", "Test Name")][0])
result.groupby([("basic","Test Name"), ("basic","model name")]).mean()[metric]

flora


Unnamed: 0_level_0,Unnamed: 1_level_0,score
"(basic, Test Name)","(basic, model name)",Unnamed: 2_level_1
flora,OpenSMILE-OpenSMILE-WavLM_EPR-globaloutput-GRL_esd_fcn,31.5
flora,OpenSMILE-OpenSMILE-WavLM_SER-globaloutput-GRL_esd_fcn,27.7
flora,ground_truth,38.7
flora,msemotts_OpenSMILE_esd,29.1
flora,reconstruct,35.7
flora,relative-attributes_OpenSMILE_esd,27.1


# Summary

In [327]:
def color_gradient(val, min_val=-1, max_val=1, color='red'):
    if val < min_val: val = min_val
    if val > max_val: val = max_val
    normalized_val = (val - min_val) / (max_val - min_val)
    
    if color == 'red':
        red_intensity = int(255 * normalized_val)
        return f'background-color: rgb({red_intensity}, 0, 0)'
    elif color == 'green':
        green_intensity = int(255 * normalized_val)
        return f'background-color: rgb(0, {green_intensity}, 0)'
    elif color == 'blue':
        blue_intensity = int(255 * normalized_val)
        return f'background-color: rgb(0, 0, {blue_intensity})'
    else:
        return 'background-color: none'

def apply_color_gradient(df, min_val=0, max_val=1.0):
    """
    df.style.apply(lambda x:apply_color_gradient(x, min_val=0, max_val=100), axis=None)
    """
    styles = df.copy()
    for col in styles.columns:
        color = 'red' if "LP" in col else "green"
        color = "blue" if col=="mean" else color
        styles[col] = df[col].apply(color_gradient, min_val=min_val, max_val=max_val, color=color)
    return styles

- Emotion Control

In [334]:
excluded_list = []

models = list(set([l for key in ["main", "features", "methods"] for l in experiments[key]]))
models.remove("msemotts_OpenSMILE_esd")
models.sort()
dfdir = {}
for segment in ["word", "utterance"]:
    label = "WC" if segment=="word" else "UC"
    df_list = []
    for idx in range(len(results[label])):
        result = results[label][idx]
        if result[("basic", "", "Test Name")][0] in excluded_list:
            continue
        df = result.groupby([("basic","", "Test ID"), ("basic", "", "model name")]).sum()[(f"{segment}-level control", "total")]
        df.index = [key[1] for key in df.index]
        df = df[df.sum(axis=1)>0].loc[models]
        df_list += [df]

    dfdir[segment] = df_list[0]
    for df in df_list[1:]:
        dfdir[segment] = dfdir[segment] + df
    total = dfdir[segment].sum(axis=1).values[0]/2
    dfdir[segment] = np.round(dfdir[segment]/total*100).astype(int)

In [335]:
dfdir["word"].style.apply(lambda x:apply_color_gradient(x, min_val=0, max_val=100), axis=None)

Unnamed: 0,LP-L,LP-M,LP-H,HP-L,HP-M,HP-H
OpenSMILE-OpenSMILE-WavLM_EPR-globaloutput-GRL_esd_fcn,100,0,0,0,6,94
OpenSMILE-OpenSMILE-WavLM_SER-globaloutput-GRL_esd_fcn,81,6,12,19,6,75
OpenSMILE_EPR-globaloutput-GRL_esd_fcn,100,0,0,0,12,88
OpenSMILE_SER-globaloutput-GRL_esd_fcn,88,6,6,0,25,75
WavLM_EPR-globaloutput-GRL_esd_fcn,88,6,6,0,38,62
relative-attributes_OpenSMILE_esd,75,6,19,12,19,69


- MUSHRA

In [337]:
standardization = False
excluded_list = []

models = list(set([l for key in ["main"] for l in experiments[key]]))
models.sort()
dfdir = {}
for metric in ["NAT", "SIM"]:
    add = ["ground_truth", "reconstruct"] if metric=="NAT" else []
    df_list = []
        
    df = pd.concat([results[metric][idx] for idx in range(len(results[metric]))], axis=0)
    params = {("basic", "Test Name"): list(set(list(df[("basic", "Test Name")].unique())) - set(excluded_list))}
    df = df[get_bool_base_on_conditions(df, params, True)]
    if standardization:
        for testid in df[("basic", "Test ID")].unique():
            a = df[get_bool_base_on_conditions(df, {("basic", "Test ID"): [testid]}, True)]
            col = (metric, "score")
            df.loc[get_bool_base_on_conditions(df, {("basic", "Test ID"): [testid]}, True), col]  = (a[col] - a[col].mean()) / a[col].std() 
    arrays = []
    for mn in add+models:
        params = {("basic", "model name"): [mn]}
        d = df[get_bool_base_on_conditions(df, params, True)]
        values = d[(metric, "score")].values
        mean = np.mean(values)
        ivl = st.t.interval(alpha=0.95, df=len(values)-1, loc=np.mean(values), scale=st.sem(values)) 
        ivl = (ivl[1]-ivl[0])/2
        arrays += [[mean, ivl]]
    df = pd.DataFrame(arrays, index=add+models, columns=["mean", "interval"])
    dfdir[metric] = df

In [338]:
df = dfdir["NAT"]
df.style.apply(lambda x:apply_color_gradient(x, min_val=df["mean"].min(), max_val=df["mean"].max()), axis=None, subset=["mean"])

Unnamed: 0,mean,interval
ground_truth,64.1,11.007271
reconstruct,57.1,12.783689
OpenSMILE-OpenSMILE-WavLM_EPR-globaloutput-GRL_esd_fcn,41.55,8.034072
OpenSMILE-OpenSMILE-WavLM_SER-globaloutput-GRL_esd_fcn,48.3,9.605974
msemotts_OpenSMILE_esd,43.15,10.848369
relative-attributes_OpenSMILE_esd,39.55,8.353452
