# Project: Speech Emotion Recognition

## Requirements
1. openSMILE

## 1. Preprocessing input data

### 1.1. Copying input wav files to data_path/input

Import the required modules.

In [5]:
import os
import shutil
from sklearn.model_selection import train_test_split
import csv
import numpy as np

Assuming all required files are within the "data" directory

In [6]:
# Root directory that holds the IEMOCAP corpus and every file generated below.
# NOTE: feature.py (section 2) defines its own data_path with the same value;
# the two must point at the same directory.
data_path = "/work/peiyun/data"
# Flat staging directory the wav files are copied into before being split.
input_path = os.path.join(data_path, "input")

Create the input directory (data_path/input) if it does not exist.

In [17]:
# Make sure the staging directory is there before any wav file is copied into it.
input_dir_exists = os.path.exists(input_path)
if not input_dir_exists:
    os.makedirs(input_path)

Copying all input files to input_path. (10,039 utterances in total)

In [18]:
# Show where the wav files are going to be staged.
print(input_path)

/work/peiyun/data/input


In [19]:
# Walk Session1..Session5 of IEMOCAP and copy every utterance wav into input_path.
for session in range(1,6):
    path = os.path.join(data_path, "IEMOCAP", "Session" + str(session), "sentences", "wav")

    # Each sub-directory of .../sentences/wav holds the utterances of one dialog.
    for dialog in os.listdir(path):
        dialog_path = os.path.join(path, dialog)

        # Skip stray non-directory entries (e.g. hidden files): calling
        # os.listdir() on them would raise and abort the whole copy.
        if not os.path.isdir(dialog_path):
            continue

        # Copy every utterance (wav file) of the dialog into the flat input directory.
        for filename in os.listdir(dialog_path):
            if filename.endswith(".wav"):
                shutil.copy(os.path.join(dialog_path, filename), os.path.join(input_path, filename))

### 1.2. Getting a dictionary of utterance labels

In [20]:
# Maps utterance name -> emotion label, collected from the EmoEvaluation files.
label_dict = {}

# for each session
for session in range(1,6):
    path = os.path.join(data_path, "IEMOCAP/Session" + str(session), "dialog", "EmoEvaluation")

    # for file in the session
    for filename in os.listdir(path):

        # only interested in "summary" txt files
        if not filename.endswith(".txt"):
            continue

        # `with` guarantees the file is closed even if a malformed line makes
        # the unpacking below raise.
        with open(os.path.join(path, filename), "r") as f:
            for line in f:
                # Lines starting with "[" are tab-separated records; the 2nd
                # and 3rd fields are taken as utterance name and label.
                if line.startswith("["):
                    name, label = line.split("\t")[1:3]
                    label_dict[name] = label

### 1.3. Split files into training, test, and dev sets (70% training, 15% test, 15% dev, seed = 100)
(seed = 100 for reproducibility)

In [21]:
# Utterance names (wav filenames without the ".wav" extension).
# os.listdir() returns entries in arbitrary order, so the names are sorted:
# without a fixed input order the fixed random_state below would not give the
# same split on every machine / run.
filename_ls = sorted(filename[:-4] for filename in os.listdir(input_path)
                     if filename.endswith(".wav"))

# Ground-truth label of every utterance, aligned index-by-index with filename_ls.
label_ls = [label_dict[filename] for filename in filename_ls]

# First split: 70% training, the remaining 30% is held out.
filename_train, filename_remain, label_train, label_remain = train_test_split(filename_ls, label_ls,
                                                                          train_size=0.7, random_state=100)

# Second split: the held-out part is halved into dev and test (15% each overall).
filename_dev, filename_test, label_dev, label_test = train_test_split(filename_remain, label_remain,
                                                                          test_size=0.5, random_state=100)

### 1.4. Writing labels into csv file (7027 train instances, 1506 test instances, 1506 dev instances)

In [22]:
# Write one "<utterance>,<split>,<label>" row per instance to data_path/label.csv.
with open(os.path.join(data_path, "label.csv") , mode='w') as label_file:
    writer = csv.writer(label_file, delimiter=",")

    # Rows are grouped by split, in the order train, test, dev.
    for data_type, filenames in [("train", filename_train),
                                 ("test", filename_test),
                                 ("dev", filename_dev)]:
        for filename in filenames:
            writer.writerow([filename, data_type, label_dict[filename]])
# The with-block closes label_file on exit, so no explicit close() is needed.

### 1.5. Move files into test, dev, and train directories

Create the train, test, and dev directories for input instances if they do not exist.

In [23]:
# One sub-directory of input_path per split; create whichever ones are missing.
split_dirs = [os.path.join(input_path, data_type) for data_type in ["train", "test", "dev"]]
for path in split_dirs:
    if not os.path.exists(path):
        os.makedirs(path)

Move each file into the directory of its split.

In [24]:
# Move every staged wav file from input_path into the sub-directory of its split
# (same order as before: train, then test, then dev).
for data_type, filenames in (("train", filename_train),
                             ("test", filename_test),
                             ("dev", filename_dev)):
    for filename in filenames:
        wav_name = filename + ".wav"
        shutil.move(os.path.join(input_path, wav_name), os.path.join(input_path, data_type, wav_name))

## 2. Feature Extraction with openSMILE

Script (feature.py) for feature extraction.

In [82]:
%%file feature.py

# Import the required modules
import argparse
import os
from subprocess import call
import csv
import sys
import numpy as np
from time import gmtime, strftime, time

# Global variables
# Root data directory: the input directory, output directory, label file and
# the "config" directory holding the openSMILE configuration are all resolved
# relative to it.
data_path = "/work/peiyun/data"

# Get the ground_truth label number of the file
def get_label(label_file, filename):
    """Return the label recorded for `filename` in the csv file `label_file`.

    Every row of the file is expected to hold exactly three fields:
    utterance name, data type (split) and label.  The label of the first row
    whose name equals `filename` is returned; None is returned when no row
    matches.
    """
    with open(label_file, "r") as handle:
        for name, _data_type, label in csv.reader(handle):
            if name == filename:
                return label
    return None

# Check input and output directories
def check_dirs(args):
    """Validate the input directory tree and prepare the output one.

    Both trees live under the global `data_path` and need one sub-directory
    per data type ("train", "test" and "dev"), because extract_features()
    walks all three of them.

    Args:
        args: parsed command-line arguments; `args.input_dir` and
            `args.output_dir` are read.

    Returns:
        False (after printing an error) when the input directory or one of
        its per-data-type sub-directories is missing, True otherwise.
        Missing output directories are created instead of being reported.
    """
    data_types = ["train", "test", "dev"]

    # Check input directory (if not exist -> error)
    input_root = os.path.join(data_path, args.input_dir)
    if not os.path.exists(input_root):
        print("Error: input directory not exist")
        return False
    for data_type in data_types:
        if not os.path.exists(os.path.join(input_root, data_type)):
            print("Error: input directory missing train, test or dev directories")
            return False

    # Check output directory (if not exist -> create one)
    output_root = os.path.join(data_path, args.output_dir)
    for data_type in data_types:
        path = os.path.join(output_root, data_type)
        if not os.path.exists(path):
            os.makedirs(path)  # also creates output_root itself when needed

    return True

# Function for extracting features with openSMILE (Return whether successed)
def extract_features(args):
    """Run openSMILE's SMILExtract on every wav file of every data type.

    For each "<name>.wav" found in <data_path>/<input_dir>/<data_type>, with
    data_type in "train", "test" and "dev", SMILExtract is invoked with the
    configuration <data_path>/config/<config> and asked to write
    "<name>_<config without extension>.csv" and ".arff" files into
    <data_path>/<output_dir>/<data_type>.

    Args:
        args: parsed command-line arguments; `input_dir`, `output_dir`,
            `config` and `label` are read.

    Returns:
        False when check_dirs() rejects the directory layout, True otherwise.
        Files without a label and SMILExtract failures are reported with a
        warning and do not change the return value.
    """
    if not check_dirs(args):
        return False   # failed to read inputs

    # Loop-invariant values, resolved once.
    config = os.path.join(data_path, "config", args.config)
    config_name = os.path.splitext(args.config)[0]   # drop the ".conf" extension
    label_file = os.path.join(data_path, args.label)

    # Iterate over wav audio files in input directory
    for data_type in ["train", "test", "dev"]:
        path_in = os.path.join(data_path, args.input_dir, data_type)
        path_out = os.path.join(data_path, args.output_dir, data_type)

        for wav_name in os.listdir(path_in):

            # Only interested in wav files
            if not wav_name.endswith(".wav"):
                continue

            filename = os.path.splitext(wav_name)[0]   # drop the ".wav" extension

            # A missing label would put None into the argument list below and
            # make call() raise, so skip the file with a visible warning.
            label = get_label(label_file, filename)
            if label is None:
                print("Warning: no label found for " + filename + ", skipping")
                continue

            # in
            file_in = os.path.join(path_in, wav_name)

            # out
            csv_out = os.path.join(path_out, filename + "_" + config_name + ".csv")
            arff_out = os.path.join(path_out, filename + "_" + config_name + ".arff")

            # use openSMILE
            status = call(["SMILExtract", "-l", "0", "-noconsoleoutput", "-I", file_in,
                           "-C", config, "-D", csv_out, "-O", arff_out, "-instname", filename, "-label", label])

            # Console output is disabled above, so the exit status is the only
            # sign of a failed extraction.
            if status != 0:
                print("Warning: SMILExtract exited with status " + str(status) + " for " + file_in)

    return True

# Obtaining args from terminal
def get_args():
    """Parse the command-line options of the feature extraction script.

    All four options are mandatory strings:
        -i / --input_dir   directory of the input wav files
        -o / --output_dir  directory the results are written to
        -c / --config      openSMILE configuration filename
        -l / --label       label filename

    Returns:
        The argparse namespace with attributes `input_dir`, `output_dir`,
        `config` and `label`.
    """
    parser = argparse.ArgumentParser(description='Extract features for files in the directory using openSMILE')

    # (short flag, long flag, help text); the long flag doubles as the
    # attribute name (dest) of the parsed value.
    option_specs = [
        ("-i", "--input_dir", "The directory of input audio files (wav)"),
        ("-o", "--output_dir", "The directory of results"),
        ("-c", "--config", "Configuration filename"),
        ("-l", "--label", "Label filename"),
    ]
    for short_flag, long_flag, help_text in option_specs:
        parser.add_argument(short_flag, long_flag, action = "store", type = str,
                            required = True, help = help_text)

    return parser.parse_args()

def main():
    """Parse the command line, run the feature extraction and report the outcome.

    Prints "Failed to extract features" when extract_features() returns a
    false value; otherwise prints the elapsed wall-clock time as HH:MM:SS
    followed by a success message.
    """
    # Obtaining terminal args
    args = get_args()

    start_time = time()

    # Extracting features according to args
    if not extract_features(args):
        print("Failed to extract features")
    else:
        elapsed = time() - start_time
        # Build a single string: under Python 2, print("text", value) prints
        # the repr of a tuple instead of the two values.
        print("Time taken for extracting features: " + strftime("%H:%M:%S", gmtime(elapsed)))
        print("Successfully extracted features")

# Run the extraction only when this file is executed as a script
# (python feature.py ...), not when it is imported as a module.
if __name__ == "__main__":
    main()

Overwriting feature.py


Running script for extracting features. 

In [None]:
%%!
python feature.py -i "input" -o "output" -c "IS09_emotion.conf" -l "label.csv"

In [83]:
%%!
python feature.py -i "input" -o "output" -c "IS10_paraling.conf" -l "label.csv"

["('Time taken for extracting features:', '00:35:29')",
 'Successfully extracted features']

## 3. Gated Convolutional Network for emotion recognition

Import the required modules.

In [107]:
from keras.layers import Activation, BatchNormalization, Dense, Dropout, Flatten, Input
from keras.models import Model, Sequential
from keras.optimizers import SGD
from keras.preprocessing import sequence
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

### 3.1. Set parameters for GCN

In [120]:
# Parameters
# MAX_LEN is the `maxlen` handed to pad_sequences in extract_Xy, i.e. the
# length every utterance matrix is padded to.
# NOTE(review): 22499 is presumably the longest utterance observed - confirm.
MAX_LEN = 22500 # 22499 (for each utterance)
DEF_BATCH_SIZE = 4  # passed to training() in the last section
DEF_NB_EPOCH = 32  # passed to training() in the last section
# NOTE(review): the three settings below are not referenced by any cell shown
# in this notebook - confirm they are still needed.
DEF_VALIDATION_SPLIT = 0
SHUFFLE = True
DEFAULT_THRESHOLD = 0.5

### 3.2. Extracting the matrices and labels for all data types

Function for obtaining label according to the label.csv file.

In [8]:
# Get the ground_truth label number of the file
def get_label(filename):
    """Return the label recorded for `filename` in <data_path>/label.csv.

    Every row of the file is expected to hold exactly three fields:
    utterance name, data type (split) and label.  The label of the first row
    whose name equals `filename` is returned; None is returned when no row
    matches.
    """
    with open(os.path.join(data_path, "label.csv"), "r") as handle:
        for name, _data_type, label in csv.reader(handle):
            if name == filename:
                return label
    return None

Function for extracting the corresponding normalised input and the ground truth output of a file.

In [177]:
def extract_Xy(file_path):
    """Build the (input matrix, label) pair for one openSMILE csv file.

    The file is read with ";" as separator and must contain the columns
    "name" and "frameTime"; every other column is treated as a feature.

    Args:
        file_path: path of the csv file of a single utterance.

    Returns:
        (x_data, y_data): x_data holds the features, each column scaled
        independently with MinMaxScaler over this file only, padded at the
        end to MAX_LEN rows; y_data is whatever get_label() returns for the
        utterance name (None when it is not listed in label.csv).
    """
    # Obtain dataframe for the csv file (one utterance/instance)
    df = pd.read_csv(file_path, sep = ";")

    # Get filename
    # NOTE(review): openSMILE's csv sink appears to write the instance name
    # wrapped in single quotes ('name'); the quotes are stripped so that the
    # lookup in label.csv can match - confirm against a real output file.
    filename = str(df["name"][0]).strip("'")

    # Clean unnecessary columns
    df = df.drop(columns = ["name", "frameTime"])

    # Normalise data: scale each feature (each column) to range (0,1)
    x_scaled = MinMaxScaler().fit_transform(df.values)

    # Obtain matrix X and label y.  pad_sequences pads along the second axis,
    # hence the transposes around the call.
    x_data = sequence.pad_sequences(x_scaled.T, padding='post', dtype='float64', maxlen=MAX_LEN).T
    y_data = get_label(filename)

    return x_data, y_data

For tests.

In [181]:
# file_path = os.path.join("/Users/peiyuns/Desktop/mydata", "Ses01F_impro06_F021_IS10_paraling.csv")
# print extract_Xy(file_path)

Extract the required matrix and label pairs for each data type and store as numpy files. 
<br>
NOTE: np.load(filename.npy) to load data

In [None]:
# Build one feature array (X) and one label array (Y) per data type and store
# them as "<data_type>_X.npy" / "<data_type>_Y.npy" in the working directory.
for data_type in ["train", "test", "dev"]:
    X = []
    Y = []
    path = os.path.join(data_path, "output", data_type)

    # Sorted so that the row order of the saved arrays is reproducible.
    for filename in sorted(os.listdir(path)):
        # feature.py writes an .arff file next to every .csv file; only the
        # csv files can be parsed by extract_Xy.
        # NOTE(review): if feature.py was run with several configurations, the
        # csv files of all of them end up here - confirm only one is present.
        if not filename.endswith(".csv"):
            continue

        x_data, y_data = extract_Xy(os.path.join(path, filename))
        X.append(x_data)
        Y.append(y_data)

    # save as numpy file
    np.save(data_type + "_X.npy", X)
    np.save(data_type + "_Y.npy", Y)

Obtain the required data for each data type.

In [None]:
# Reload the feature matrices (X) and labels (y) saved above, one pair per data type.
X_train, y_train = np.load("train_X.npy"), np.load("train_Y.npy")
X_test, y_test = np.load("test_X.npy"), np.load("test_Y.npy")
X_dev, y_dev = np.load("dev_X.npy"), np.load("dev_Y.npy")

Generate the model for Gated Convolutional Network. (1-dimensional)

In [None]:
def generate_model():
    """Build and compile the gated convolutional network.

    The network is a stack of `n_stack` gated blocks followed by Flatten, a
    256-unit dense layer with batch normalisation, ReLU and dropout, and a
    single sigmoid output unit.  It is compiled with binary cross-entropy and
    the SGD optimizer configured below.

    Relies on names that are not defined in this cell: `input_shape`,
    `nb_filter`, `n_stack`, `is_dilated`, `dilation_rate`, `gated_block` and
    the metric functions `precision`, `recall` and `fscore`.

    Returns:
        The compiled Keras model.
    """
    input_ = Input(shape=(input_shape[1], input_shape[2]))
    gated_dilcnn = gated_block(1, nb_filter, 1)(input_)

    # Remaining n_stack - 1 gated blocks; `block` is the identifier handed to
    # gated_block and `dil` the dilation rate of the block.
    for i in range(0, n_stack - 1):
        block = int((i+1)/(n_stack+1)) + 1
        if not is_dilated:
            block = str(block) + "_" + str(i+1)
        else:
            block = str(block)
        dil = dilation_rate**((i+1)%(n_stack+1))

        print(block)
        gated_dilcnn = gated_block(block, nb_filter, dil)(gated_dilcnn)

    gated_dilcnn = Flatten()(gated_dilcnn)
    gated_dilcnn = Dense(256, kernel_initializer='random_uniform')(gated_dilcnn)
    gated_dilcnn = BatchNormalization()(gated_dilcnn)
    gated_dilcnn = Activation('relu')(gated_dilcnn)

    gated_dilcnn = Dropout(0.5)(gated_dilcnn)
    gated_dilcnn = Dense(1, activation='sigmoid')(gated_dilcnn)

    all_model = Model(input_, gated_dilcnn)

    print('\nCompile gated_dilcnn...')
    sgd = SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
    # Pass the configured instance: the string 'sgd' would make Keras build a
    # default SGD optimizer and silently ignore the settings above.
    all_model.compile(loss='binary_crossentropy',
            optimizer=sgd,
            metrics=['accuracy', precision, recall, fscore])
    all_model.summary()
    return all_model

Function for training a GCN model.

In [None]:
def training(*args, **kwargs):
    """Train a GCN model (placeholder - the body has not been written yet).

    The original cell held only an unfinished `def` line, which is a syntax
    error.  Until the training loop is implemented, any call fails loudly.
    The signature accepts arbitrary arguments because the intended parameter
    list is not defined anywhere in this notebook.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError("training() has not been implemented yet")

Begin training model by calling the training function.

In [None]:
# The data arrays are bound as X_train / X_dev / X_test (capital X) by the
# loading cell; the lowercase x_* names used here before were never defined.
# NOTE(review): `args` is not defined anywhere in this notebook - model_name,
# split and params still have to be supplied before this cell can run.
training(args.model_name, 0,
         X_train, y_train,
         X_dev, y_dev,
         X_test, y_test,
         DEF_BATCH_SIZE,
         DEF_NB_EPOCH,
         args.split,
         args.params)

In [94]:
# NOTE(review): this empty Sequential model is not used by any cell shown in
# the notebook - it looks like leftover scratch work; confirm before removing.
model = Sequential()