In [None]:
import os
import glob
import subprocess
import scipy
from collections import defaultdict
import numpy as np
import sklearn
from multiprocessing import Pool
from functools import partial
import matlab
import matlab.engine
import librosa
import matplotlib.pyplot as plt

# Data preprocessing

In [None]:
# Emotion labels kept when building the sample list.
# (A wider candidate set was: 'exc', 'ang', 'sad', 'sur', 'fru', 'hap', 'neu'.)
emotions = ['ang', 'exc', 'hap', 'sad', 'neu']

# Class index for each kept label. 'hap' and 'exc' share index 1, so the
# mapping cannot simply be generated with enumerate(emotions).
emotion2idx = {
    'neu': 0,
    'hap': 1,
    'exc': 1,
    'ang': 2,
    'sad': 3,
}

In [None]:
# Directory and file that receive the generated sample list.
dataset_dir = "./data/IEMOCAP"
if not os.path.exists(dataset_dir):
    os.makedirs(dataset_dir)
dataset_list = os.path.join(dataset_dir, "IEMOCAP.txt")

# Corpus root. The trailing slash is required: session paths are built from it
# by plain string concatenation.
dataset_path = '/datasets/IEMOCAP_full_release/'
# os.listdir() returns entries in arbitrary order; sorting keeps the generated
# sample list identical from run to run.
sesstion_list = sorted(os.listdir(dataset_path))
speaker_dict = defaultdict(list)
emotion_dict = defaultdict(list)

In [None]:
# Write one "<wav_name> <wav_path> <emotion>" line per utterance whose label is
# in `emotions`, using only the evaluation files whose name contains "impro".
with open(dataset_list, "w") as f:
    for session in sesstion_list:
        if not session.startswith("Session"):
            continue
        path_to_wav = dataset_path + session + '/sentences/wav/'
        path_to_emotions = dataset_path + session + '/dialog/EmoEvaluation/'
        path_to_transcriptions = dataset_path + session + '/dialog/transcriptions/'
        # glob() gives no ordering guarantee; sort for a reproducible list.
        impro_evals = sorted(glob.glob(os.path.join(path_to_emotions, "*impro*")))
        emotion_sta = {}
        for impro_eval in impro_evals:
            # Context manager so each evaluation file is closed after reading.
            with open(impro_eval) as eval_file:
                for line in eval_file:
                    # Only lines starting with "[" are parsed as utterance records.
                    if not line.startswith("["):
                        continue
                    # Tab-separated record: field 1 is the utterance id,
                    # field 2 is the emotion label.
                    fields = line.strip().split("\t")
                    utterance = fields[1]
                    cur_emo = fields[2]
                    if cur_emo not in emotions:
                        continue
                    emotion_sta[cur_emo] = emotion_sta.get(cur_emo, 0) + 1
                    # The wav sub-directory is the utterance id without its
                    # last "_"-separated component.
                    dialog = utterance[:utterance.rfind("_")]
                    cur_wav = path_to_wav + dialog + "/" + utterance + ".wav"
                    wav_name = os.path.basename(cur_wav).split(".")[0]
                    sample = "{} {} {}\n" . format(wav_name, cur_wav, cur_emo)
                    f.write(sample)
                    emotion_dict[cur_emo].append(sample)

        print("{} has done, statistics is {}" . format(session, emotion_sta))

# Feature extraction (TEO, glottal, openSMILE)

In [None]:
# Load the sample list; the second space-separated field of every line is the
# wav path. The context manager closes the file once it has been read.
with open(dataset_list) as list_file:
    sample_list = list_file.readlines()
file_list = [line.strip().split(" ")[1] for line in sample_list]

# Band edges; each consecutive pair (critical_bands[i-1], critical_bands[i]) is
# passed to teo_cb_auto_env as one band. Presumably frequencies in Hz — confirm.
critical_bands = [100, 200, 300, 400, 510, 630, 770, 920, 1080, 1270, 1480,
                  1720, 2000, 2320, 2700, 3150, 3700]

In [None]:
def extract_TEO_feature(wav_file, output_dir, voiced_region=0):
    """Extract per-band TEO features for one wav file and save them as text.

    For every consecutive pair of edges in the global ``critical_bands`` the
    MATLAB function ``teo_cb_auto_env`` is called through the global engine
    ``eng``. The first row of each result becomes one space-separated line of
    ``<output_dir>/<wav_name>.fea``. The file is written only after every band
    has been computed.

    Args:
        wav_file: Path of the wav file to process.
        output_dir: Existing directory that receives the ``.fea`` file.
        voiced_region: Forwarded unchanged to ``teo_cb_auto_env``.

    Returns:
        None. A file whose output already exists is skipped. Failures are
        reported on stdout instead of being raised.
    """
    try:
        wav_name = os.path.basename(wav_file).split(".")[0]
        output_path = os.path.join(output_dir, wav_name + ".fea")
        if os.path.exists(output_path):
            return
        lines = []
        for low_edge, high_edge in zip(critical_bands[:-1], critical_bands[1:]):
            band_range = [low_edge, high_edge]
            cur_feature = eng.teo_cb_auto_env(wav_file, matlab.double(band_range), voiced_region)
            lines.append(" ".join(str(fea) for fea in cur_feature[0]) + "\n")
        with open(output_path, "w") as f:
            f.writelines(lines)
        print("{} has done." . format(wav_file))
    except Exception as err:
        # Catch Exception rather than everything so Ctrl-C / SystemExit can
        # still stop a long batch, and report the cause, not just the file.
        print("{} has some problem: {!r}" . format(wav_file, err))

In [None]:
def extract_TEO_feature_thread(file_list, output_dir, voiced_region=0):
    """Extract per-band TEO features for many wav files with a private engine.

    Starts its own MATLAB engine, so it does not depend on a global ``eng``.
    For each file, ``teo_cb_auto_env`` is called for every consecutive pair of
    edges in the global ``critical_bands``. The first row of each result is
    written as one space-separated line of ``<output_dir>/<wav_name>.fea``.

    Args:
        file_list: Iterable of wav file paths.
        output_dir: Existing directory that receives the ``.fea`` files.
        voiced_region: Forwarded unchanged to ``teo_cb_auto_env``.

    Returns:
        None. Files whose output already exists are skipped. A failing file is
        reported on stdout and processing continues with the next one.
    """
    eng = matlab.engine.start_matlab()
    try:
        for wav_file in file_list:
            try:
                wav_name = os.path.basename(wav_file).split(".")[0]
                output_path = os.path.join(output_dir, wav_name + ".fea")
                if os.path.exists(output_path):
                    continue
                lines = []
                for low_edge, high_edge in zip(critical_bands[:-1], critical_bands[1:]):
                    band_range = [low_edge, high_edge]
                    cur_feature = eng.teo_cb_auto_env(wav_file, matlab.double(band_range), voiced_region)
                    lines.append(" ".join(str(fea) for fea in cur_feature[0]) + "\n")
                with open(output_path, "w") as f:
                    f.writelines(lines)
            except Exception as err:
                # Exception (not a bare except) keeps Ctrl-C / SystemExit
                # working; the cause is reported alongside the file name.
                print("{} has some problem: {!r}" . format(wav_file, err))
    finally:
        # Shut the engine down even if the loop is interrupted.
        eng.quit()

In [None]:
def extract_glottal_feature(file_list, output_dir):
    """Extract glottal features for many wav files with a private MATLAB engine.

    For each file the MATLAB function ``glottal_feature`` is called with four
    outputs ``(t_feature, dh12, psp, hrf)``. ``<output_dir>/<wav_name>.fea`` is
    then written with one space-separated line per row of ``t_feature``,
    followed by one line each for the first row of ``dh12``, ``psp`` and
    ``hrf``. Existing output files are always overwritten.

    Args:
        file_list: Iterable of wav file paths.
        output_dir: Existing directory that receives the ``.fea`` files.

    Returns:
        None.

    Raises:
        Any error from the MATLAB call or from writing the file. The first
        failure aborts the remaining files, but the engine is still shut down.
    """
    eng = matlab.engine.start_matlab()
    try:
        for wav_file in file_list:
            wav_name = os.path.basename(wav_file).split(".")[0]
            output_path = os.path.join(output_dir, wav_name + ".fea")
            t_feature, dh12, psp, hrf = eng.glottal_feature(wav_file, nargout=4)
            # All rows of t_feature first, then the first row of each of the
            # three remaining outputs, in this fixed order.
            rows = list(t_feature)
            rows.extend([dh12[0], psp[0], hrf[0]])
            lines = [" ".join(str(fea) for fea in row) + "\n" for row in rows]
            with open(output_path, "w") as f:
                f.writelines(lines)
    finally:
        # Without this, an exception on any file would leak the MATLAB engine.
        eng.quit()

In [None]:
# Glottal extraction on the wav files listed in testwav.txt.
glottal_dir = '/home/cqm/codes/paper/data/glottal_feature_test'
# extract_glottal_feature() writes into this directory without creating it.
if not os.path.exists(glottal_dir):
    os.makedirs(glottal_dir)
# Context manager so the list file is closed after reading.
with open('/home/cqm/codes/paper/data/IEMOCAP/testwav.txt') as list_file:
    sample_list = list_file.readlines()
file_list = [line.strip().split(" ")[1] for line in sample_list]
extract_glottal_feature(file_list, glottal_dir)

In [None]:
import math
import threading
# Rebuild the file list from the generated sample list (an earlier cell rebinds
# file_list to the testwav.txt subset). The context manager closes the file.
with open(dataset_list) as list_file:
    sample_list = list_file.readlines()
file_list = [line.strip().split(" ")[1] for line in sample_list]
def multi_thread_glottal(file_list, output_dir, num_thread):
    """Run ``extract_glottal_feature`` over ``file_list`` in background threads.

    The list is cut into consecutive chunks of ``ceil(len / num_thread)`` files
    and one thread is started per chunk. This can be fewer than ``num_thread``
    threads when the list is short. ``output_dir`` is created if missing.

    Args:
        file_list: Sequence of wav file paths.
        output_dir: Directory that receives the feature files.
        num_thread: Maximum number of worker threads; must be at least 1.

    Returns:
        The list of started ``threading.Thread`` objects, empty when
        ``file_list`` is empty. The threads are not joined here; call
        ``join()`` on them to wait for completion.

    Raises:
        ValueError: If ``num_thread`` is smaller than 1.
    """
    if num_thread < 1:
        raise ValueError("num_thread must be at least 1, got {}".format(num_thread))
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    if not file_list:
        # Nothing to do; also avoids a zero step in range() below.
        return []
    num_per_thread = int(math.ceil(len(file_list) / float(num_thread)))
    file_split = [file_list[i:i + num_per_thread] for i in range(0, len(file_list), num_per_thread)]
    threads = []
    # Iterate over the chunks that exist: indexing by range(num_thread) fails
    # when rounding yields fewer chunks than threads.
    for cur_file_list in file_split:
        # Each worker gets its own chunk, not the whole list.
        t = threading.Thread(target=extract_glottal_feature, args=(cur_file_list, output_dir))
        t.start()
        threads.append(t)
    return threads

# Glottal features for the whole file list, spread over 16 worker threads.
glottal_dir = "/home/cqm/codes/paper/data/glottal_feature"
multi_thread_glottal(file_list, output_dir=glottal_dir, num_thread=16)

In [None]:
def extract_opensmile_feature(file_list, output_dir,
                              smile_bin="/home/cqm/opensmile-2.3.0/SMILExtract",
                              config_file="/home/cqm/opensmile-2.3.0/config/gemaps/eGeMAPSv01a.conf"):
    """Run SMILExtract on every wav file and wait for each run to finish.

    For each file the command
    ``<smile_bin> -instname <wav_name> -C <config_file> -I <wav_file>
    -csvoutput <output_dir>/<wav_name>.fea -D <output_dir>/<wav_name>.lld.fea``
    is executed. Existing output files are not checked for. Files are handled
    one after another, so the number of concurrent SMILExtract processes is
    bounded by the number of threads calling this function.

    Args:
        file_list: Iterable of wav file paths.
        output_dir: Directory passed to SMILExtract for both output files.
        smile_bin: Path of the SMILExtract executable.
        config_file: openSMILE configuration file passed with ``-C``.

    Returns:
        None. A file that cannot be processed (spawn failure or non-zero exit
        code) is reported on stdout and processing continues.
    """
    for wav_file in file_list:
        try:
            wav_name = os.path.basename(wav_file).split(".")[0]
            fea_path = os.path.join(output_dir, wav_name + ".fea")
            lld_path = os.path.join(output_dir, wav_name + ".lld.fea")
            command = [smile_bin, "-instname", wav_name, "-C", config_file, "-I", wav_file,
                       "-csvoutput", fea_path, "-D", lld_path]
            # call() blocks until the process exits. Popen() without wait()
            # launched every file at once and never looked at the exit code.
            exit_code = subprocess.call(command)
            if exit_code != 0:
                print("{} has some problem: SMILExtract exited with code {}." . format(wav_file, exit_code))
        except Exception as err:
            print("{} has some problem: {!r}" . format(wav_file, err))

In [None]:
import threading
import math
# Destination directory for the openSMILE feature files.
feature_dir = "/home/cqm/codes/paper/data/opensmile_org/"
def multi_thread(file_list, output_dir, num_thread=8):
    """Run ``extract_opensmile_feature`` over ``file_list`` in background threads.

    The list is cut into consecutive chunks of ``ceil(len / num_thread)`` files
    and one thread is started per chunk. This can be fewer than ``num_thread``
    threads when the list is short. ``output_dir`` is created if missing.

    Args:
        file_list: Sequence of wav file paths.
        output_dir: Directory that receives the feature files.
        num_thread: Maximum number of worker threads; must be at least 1.

    Returns:
        The list of started ``threading.Thread`` objects, empty when
        ``file_list`` is empty. The threads are not joined here; call
        ``join()`` on them to wait for completion.

    Raises:
        ValueError: If ``num_thread`` is smaller than 1.
    """
    if num_thread < 1:
        raise ValueError("num_thread must be at least 1, got {}".format(num_thread))
    # Honour the output_dir argument rather than the notebook-level feature_dir.
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    if not file_list:
        # Nothing to do; also avoids a zero step in range() below.
        return []
    num_per_thread = int(math.ceil(len(file_list) / float(num_thread)))
    file_split = [file_list[i:i + num_per_thread] for i in range(0, len(file_list), num_per_thread)]
    threads = []
    # Iterate over the chunks that exist: indexing by range(num_thread) fails
    # when rounding yields fewer chunks than threads.
    for cur_file_list in file_split:
        t = threading.Thread(target=extract_opensmile_feature, args=(cur_file_list, output_dir))
        t.start()
        threads.append(t)
    return threads
# openSMILE extraction for the whole file list, spread over 16 worker threads.
multi_thread(file_list, output_dir=feature_dir, num_thread=16)

In [None]:
# TEO features with voiced_region=0.
TEO_dir = '/home/cqm/codes/paper/data/TEO_features'
if not os.path.exists(TEO_dir):
    os.makedirs(TEO_dir)
# extract_TEO_feature() looks up the engine through the global name `eng`.
eng = matlab.engine.start_matlab()
try:
    for wav_file in file_list:
        extract_TEO_feature(wav_file, TEO_dir, 0)
finally:
    # Shut the engine down even if the loop is interrupted.
    eng.quit()

In [None]:
# Execute the external script extractFeature.py through IPython's %run magic.
# The script itself is not part of this notebook.
# NOTE(review): %run presumably leaves the script's top-level names in this
# namespace afterwards, which could rebind notebook globals such as file_list
# or eng — confirm against the script.
%run extractFeature.py

In [None]:
# TEO features with voiced_region=1, written to a separate directory.
TEO_dir = '/home/cqm/codes/paper/data/TEO_features_voiced'
if not os.path.exists(TEO_dir):
    os.makedirs(TEO_dir)
# extract_TEO_feature() looks up the engine through the global name `eng`.
eng = matlab.engine.start_matlab()
try:
    for wav_file in file_list:
        extract_TEO_feature(wav_file, TEO_dir, 1)
finally:
    # Shut the engine down even if the loop is interrupted.
    eng.quit()