# Hierarchical Emotion Distribution (ED) Generation
---
This notebook provides code to automatically generate the hierarchical emotion distribution (ED) from OpenSMILE's acoustic features. It first extracts the distance of each data point from the decision boundary of each emotion presence classifier. Then, it normalizes those values within the dataset to [0, 1]. We provide the pretrained SVM (`linearsvm_OpenSMILE.pkl`) and the scaler (`scaler_OpenSMILE.pkl`) in our repository; please download them and place them in `Summary-Hierarchical-ED/implementation/parameters/`. 

---

In [None]:
import warnings 
warnings.filterwarnings("ignore")

import pandas as pd
import glob
import numpy as np
import os
from IPython.display import clear_output
import pickle
from tqdm import tqdm

def GetIntensity(scaler, models, feature, emotions=None):
    """Return the signed SVM decision value of every segment for each emotion.

    Args:
        scaler: Fitted scaler exposing ``transform``; applied to ``feature``
            before classification.
        models: Mapping from emotion name to a classifier exposing
            ``decision_function``.
        feature: 2-D feature matrix with one row per segment.
        emotions: Emotion names to evaluate, in output row order. Defaults to
            the module-level ``emos`` list.

    Returns:
        Array with one row per emotion holding the decision values of all
        segments. Segments whose scaled features contain NaN are NaN in
        every row.
    """
    if emotions is None:
        emotions = emos
    feature = scaler.transform(feature)
    # The NaN mask must be taken once, BEFORE the NaN rows are zeroed.
    # Recomputing it inside the loop would find no NaN from the second
    # emotion on, so those rows would receive a bogus score instead of NaN.
    nan_rows = np.isnan(feature.astype(float)).any(axis=1)
    # Zero the invalid rows so decision_function does not reject the input.
    feature[nan_rows] = 0
    length_array = []
    for emotion in emotions:
        array = models[emotion].decision_function(feature)
        array[nan_rows] = np.nan
        length_array.append(array)
    return np.array(length_array)

def get_words_indices(word_dir):
    """Return, for every phone in ``word_dir``, the index of its parent word.

    ``word_dir`` maps each word key to the sequence of its phones; the word
    at iteration position ``k`` contributes ``k`` once per phone it holds.
    """
    flat = []
    for position, key in enumerate(word_dir):
        phone_count = len(word_dir[key])
        flat.extend(position for _ in range(phone_count))
    return np.array(flat)

def get_boollist_fastspeech2(words_dir):
    """Build a per-phone keep-mask that drops only edge silences.

    Every phone not listed in ``sil_phones`` is kept. A silence phone is
    kept as well when it has a kept phone somewhere before it and somewhere
    after it; the first and last positions always keep their own flag.
    """
    def _spread(flags):
        # Carry True onto following False entries, leaving both ends as-is.
        filled = list(flags)
        for pos in range(1, len(filled) - 1):
            if filled[pos - 1] and not filled[pos]:
                filled[pos] = True
        return filled

    voiced = [
        phone not in sil_phones
        for phones in words_dir.values()
        for phone in phones
    ]
    forward = _spread(voiced)
    backward = _spread(voiced[::-1])[::-1]
    return np.array(forward) * np.array(backward)

def GetMinMax_NoOutliers(outputs):
    """Return the min and max of ``outputs`` after discarding IQR outliers.

    Values outside ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]`` are treated as outliers.

    Returns:
        ``(min_, max_, bool_list)`` where ``bool_list`` flags the entries of
        ``outputs`` that were kept.
    """
    lower_q, upper_q = np.quantile(outputs, [0.25, 0.75])
    whisker = 1.5 * (upper_q - lower_q)
    above_floor = outputs >= lower_q - whisker
    below_ceiling = outputs <= upper_q + whisker
    inliers = above_floor * below_ceiling
    kept = outputs[inliers]
    return kept.min(), kept.max(), inliers

def normalize_svm(x, min_, max_):
    """Map signed SVM distances to a scale where the boundary sits at 0.5.

    Positive entries are divided by ``max_`` and negative entries by
    ``-min_``; the result is then shifted with ``(value + 1) / 2``. With
    ``min_ < 0 < max_`` this sends ``min_`` to 0, 0 to 0.5 and ``max_`` to 1.
    Values beyond ``min_``/``max_`` fall outside [0, 1]; NaN stays NaN.

    Args:
        x: Array of decision values. It is NOT modified; the computation
            runs on a floating-point copy.
        min_: Scale for negative values (expected to be negative).
        max_: Scale for positive values (expected to be positive).

    Returns:
        A new array with the normalized values.
    """
    values = np.asarray(x)
    # Copy so the caller's array is never left half-normalized, and promote
    # integer input so the divisions below are not truncated on assignment.
    if np.issubdtype(values.dtype, np.floating):
        values = values.copy()
    else:
        values = values.astype(float)
    # Take both sign masks before rescaling so a value is scaled only once.
    positive = values > 0
    negative = values < 0
    values[positive] = values[positive] / max_
    values[negative] = -values[negative] / min_
    return (values + 1) / 2

---
# Automatic Extraction of Distance from Decision Boundary
---

This example demonstrates how to extract the distance from the decision boundary for each emotion classifier, using individual SVM classifiers for each emotion.

- **`hed_extractor_path`**:  
  A string indicating the path to the pretrained SVM model.

- **`scaler_path`**:  
  A string that indicates the path to the pretrained scaler.
  
- **`dataset_dir`**:  
  A string that indicates the path to the dataset directory.

- **`feature_dir`**:  
  A string that specifies the directory where the processed features will be saved. This includes both the OpenSMILE features and the additional features generated later in the process.

- **`depth`**:  
  An integer that defines the directory depth of each wav file relative to `dataset_dir`. For instance, in ESD, `depth=3` because the file path follows the structure:  
  `[speaker]/[emotion]/[data split]/[speaker]_[filename].wav`.

- **`wav2tgt`**:  
  A dictionary that maps each wav file path (key) to its corresponding TextGrid file (value).

- **`reset`**:  
  A boolean value that indicates whether to reset the feature generation process. If set to `False`, the feature generation will be skipped if the feature path already exists.

The code generates one file:

- **`feature`** (shape: `(12, phoneme_length)`):  
   A matrix where each element represents the distance from the SVM decision boundary. The rows are organized as follows:
   - **First four rows:** Phoneme-level distances.
   - **Next four rows:** Word-level distances.
   - **Last four rows:** Utterance-level distances.
   
   Within each group of four rows, the rows correspond to the following emotions:
   - Row 4n: Angry
   - Row 4n+1: Happy
   - Row 4n+2: Sad
   - Row 4n+3: Surprise

   This file is saved in `[feature_dir]/HED/raw/`.
  
---

In [None]:
###########################################
########## Adjustable Parameters ##########
###########################################

hed_extractor_path = '../parameters/linearsvm_OpenSMILE.pkl'
scaler_path = '../parameters/scaler_OpenSMILE.pkl'
dataset_dir = "../Dataset/ESD/"
feature_dir = "../Features/ESD/"
depth = 3
wav2tgt = {path: ("../Dataset/ESD/textgrid_corpus_directory/"+"/".join(path.split("/")[-(depth+1):])).replace(".wav", ".TextGrid") for path in glob.glob(dataset_dir + "*/"*depth + "*")}
reset = False

###########################################
###########################################
###########################################

emos = ["Angry", "Happy", "Sad", "Surprise"]
emos.sort()
# NOTE: unpickling can execute arbitrary code; only load parameter files
# obtained from a source you trust.
# Context managers make sure the file handles are closed after loading.
with open(hed_extractor_path, 'rb') as model_file:
    models = pickle.load(model_file)
with open(scaler_path, 'rb') as scaler_file:
    scaler = pickle.load(scaler_file)
split_list = ["utt", "words", "phones"]
sil_phones = ["sil", "sp", "spn"]

# Utterances whose word/phone alignment could not be loaded.
nonexists = []
files = glob.glob(feature_dir+"opensmile/"+"*/"*depth+"*.npy")
files.sort()
for path in tqdm(files):
    # Mirror the sub-directory layout of the OpenSMILE features under HED/raw/.
    dn = "/".join(path.split("/")[-(depth+1):-1])+"/"
    bn = os.path.basename(path)[:-4]
    savepath = f"{feature_dir}HED/raw/{dn}{bn}.npy"
    if not(reset) and os.path.exists(savepath):
        continue

    # Each .npy stores a pickled dict; the keys read below are "words",
    # "phones" and "utterance".
    features = np.load(path, allow_pickle=True).item()
    try:
        words_dir = np.load(path.replace("opensmile", "words_phones_dir"), allow_pickle=True).item()
    except (EOFError, FileNotFoundError):
        # Empty or missing alignment file: record the utterance and move on.
        nonexists += [dn+bn]
        continue

    # Mask of phones to keep, and the parent word index of each kept phone.
    bl = get_boollist_fastspeech2(words_dir)
    words_indices = get_words_indices(words_dir)[bl]

    iw = GetIntensity(scaler, models, features["words"])
    ip = GetIntensity(scaler, models, features["phones"][:len(bl)])[:,bl]
    iu = GetIntensity(scaler, models, features["utterance"])

    # Broadcast word- and utterance-level scores to phone resolution.
    iw = iw[:, words_indices]
    iu = np.repeat(iu, ip.shape[1], axis=1)

    # Row order: phone-level, then word-level, then utterance-level.
    feature = np.concatenate([ip, iw, iu], axis=0)

    os.makedirs(os.path.dirname(savepath), exist_ok=True)
    np.save(savepath, feature)

---
# Automatic Extraction of Hierarchical Emotion Distribution (ED)
---

This example shows how to extract the hierarchical emotion distribution using a min-max normalization approach. In this process, outliers are removed to ensure a more robust calculation.


- **`training_files`**:  
  A list of files from the training dataset. These files are used to calculate the minimum and maximum distance values required for normalization.

  
- **`reset`**:  
  A boolean value that indicates whether to reset the feature generation process. If set to `False`, the feature generation will be skipped if the feature path already exists.

The code generates one file:

- **`feature`** (shape: `(12, phoneme_length)`):  
   A matrix where each element represents the intensity of an emotion for a specific phoneme. The rows are organized into three levels:
   - **First four rows:** Phoneme-level intensities.
   - **Next four rows:** Word-level intensities.
   - **Last four rows:** Utterance-level intensities.
   
   Within each group of four rows, the rows correspond to the following emotions:
   - Row 4n: Angry
   - Row 4n+1: Happy
   - Row 4n+2: Sad
   - Row 4n+3: Surprise

   This file is saved in `[feature_dir]/HED/normalized/`.

---

In [None]:
###########################################
########## Adjustable Parameters ##########
###########################################

training_files = glob.glob(feature_dir+"HED/raw/*/*/train/*")
reset = True

###########################################
###########################################
###########################################

print("####################################")
print("Compute Min and Max of Training Data")
print("####################################")
print()

training_files.sort()

# Rows are grouped per level (phones, words, utterance), one row per emotion.
n_emos = len(emos)
utt_rows = slice(2*n_emos, 3*n_emos)

arrays = []
for path in tqdm(training_files):
    feature = np.load(path)
    if feature.shape[1] == 0:
        # No phoneme column, hence no utterance-level value to read.
        continue
    # The utterance-level value is repeated over all columns; take the first.
    arrays += [feature[utt_rows, 0]]

if not arrays:
    raise RuntimeError(
        "No usable training features found under "
        f"{feature_dir}HED/raw/*/*/train/; cannot compute min/max."
    )

# Built once: keep only the utterances whose scores are all valid.
utt_scores = np.array(arrays)
utt_scores = utt_scores[~np.isnan(utt_scores).any(axis=1)]

print()
min_list = []
max_list = []
for e, emotion in enumerate(emos):
    min_, max_, _ = GetMinMax_NoOutliers(utt_scores[:, e])
    min_list.append(min_)
    max_list.append(max_)
    print(f"Emotion: {emotion}")
    print(f"    Minimum Value: {min_}")
    print(f"    Maximum Value: {max_}")
    
print()
print("##################################")
print("Compute Normalized Hierarchical ED")
print("##################################")
print()
    
files = glob.glob(feature_dir+"HED/raw/"+"*/"*depth+"*.npy")
files.sort()
for path in tqdm(files):
    dn = "/".join(path.split("/")[-(depth+1):-1])+"/"
    bn = os.path.basename(path)[:-4]

    savepath = f"{feature_dir}HED/normalized/{dn}{bn}.npy"
    if not(reset) and os.path.exists(savepath):
        continue
    try:
        a = np.load(path)
    except (FileNotFoundError, ValueError):
        # Best effort: skip raw files that vanished or cannot be parsed.
        continue

    # Phone- and word-level rows: normalize, clamp to [0, 1], then fill NaN
    # gaps by linear interpolation along the phoneme axis.
    for s, _segment in enumerate(["phones", "words"]):
        for e in range(n_emos):
            row = s*n_emos+e
            b = normalize_svm(a[row], min_list[e], max_list[e])
            b[b<0] = 0
            b[b>1] = 1
            ser = pd.Series(b).interpolate(method="linear", limit_direction="both")
            a[row] = ser.values

    # Utterance-level rows: normalize and clamp only.
    for e in range(n_emos):
        row = 2*n_emos+e
        iu = normalize_svm(a[row], min_list[e], max_list[e])
        iu[iu<0] = 0
        iu[iu>1] = 1
        a[row] = iu

    a[np.isnan(a)] = 0 # this happens when all features are nan
    os.makedirs(os.path.dirname(savepath), exist_ok=True)
    np.save(savepath, a)