In [None]:
from keras.models import Sequential
from keras.layers import LSTM, Dense, Activation, Reshape, Conv2D, MaxPooling2D, Flatten
from keras.preprocessing.sequence import pad_sequences
import numpy as np
from sklearn.metrics import confusion_matrix
import random
import timeit
from sklearn.utils import class_weight
from keras import optimizers
start = timeit.default_timer()



EMOTION = {'ang': 0, 'hap' : 1, 'sad' : 2, 'neu' : 3, 'fru' : 4, 'exc': 5,
           'fea' : 6,'sur' : 7,'dis' : 8, 'oth' : 9, 'xxx':10}



METHOD = {'audio_feature':0, 'LSTM':1, 'DRCNN' : 2}

#Method for classification
method = METHOD['DRCNN']

#If data is processed and saved into files, just reload, dont need to re-process
isRawDataProcessed = True

#Development mode. Only run with small data.
dev = False


LABLE = {'W':0, 'L':1, 'E':2, 'A':3, 'F':4, 'T':5, 'N': 6}
LABLE = {'W':0, 'A':1, 'F':2, 'T':3, 'N': 4}





In [None]:
import os
from os.path import basename
import cv2

##Load the array of input files and output lables. 
path_processed_data = "processed-data-Emo"

image_paths = []
labels = []

for f in os.listdir(path_processed_data):
    if not f.startswith(".") and os.path.splitext(f)[1] == ".png":
        
        file_name = f.split(".")[0]
        label_code = file_name[5]
        if (label_code == 'L' or label_code == 'E'):
            continue
            
        lable_int = LABLE[label_code]
        labels.append(lable_int)

        image_paths.append(path_processed_data  + "/" + f)
        
print("num data:", len(image_paths))
#Shuffle data and split into train and test set
c = list(zip(image_paths, labels))
random.shuffle(c)
image_paths, labels = zip(*c)

num_sample_train_data = int(0.6 * len(image_paths))
train_image_paths = image_paths[0:num_sample_train_data]
train_labels = labels[0:num_sample_train_data]




valid_test_image_paths = image_paths[num_sample_train_data:]
valid_test_labels = labels[num_sample_train_data:]
number_sample_valid_data = int(1 * len(valid_test_image_paths))
valid_image_paths = valid_test_image_paths[0:number_sample_valid_data]
valid_labels = valid_test_labels[0:number_sample_valid_data]



In [None]:
num_class = 5
batch_size = 20
width_image = 256
channels = 3 #RGB
num_epochs = 200


def generate_batch_data(data_in, data_out, batch_size):
    while True:
        for i in range(0, len(data_in), batch_size):
            # create Numpy arrays of input data
            # and labels, from each line in the file
            x, y = process_data(data_in[i:i+batch_size], data_out[i:i+batch_size])
            #Normalize data
            x = x / 255.0
            yield (x, y)


##Function to transform these above array into data used for training and testing
def process_data(image_paths, labels):
    input = []
    output = []
    for path,label in zip(image_paths, labels):
        img = cv2.imread(path)
        #img = cv2.resize(img, (256, 256))
        input.append(img)
        
        one_hot_coding_output = np.zeros((1,num_class), dtype=int)[0]
        #print("this is label: ",label )
        one_hot_coding_output[label] = 1
        output.append(one_hot_coding_output)
    
    input = np.array(input)
    output = np.array(output)
    return input, output


if method == METHOD['DRCNN']: 
    
       
    model = Sequential()
    
    #C1,P1
    model.add(Conv2D(8, kernel_size=(3, 3), strides=(1, 1),
                 activation='relu',
                 input_shape=(width_image, width_image, channels)))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    
    #C2,P2
    model.add(Conv2D(16, kernel_size=(3, 3), strides=(1, 1),
                 activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    
    #C3
    model.add(Conv2D(32, kernel_size=(3, 3), strides=(1, 1),
                 activation='relu'))
    
    #C4
    model.add(Conv2D(64, kernel_size=(3, 3), strides=(1, 1),
                 activation='relu'))
    
    #C5, P5
    model.add(Conv2D(128, kernel_size=(3, 3), strides=(1, 1),
                 activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    
    model.add(Flatten())
    
    #F6, F7, F8
    model.add(Dense(128, activation='relu'))
    model.add(Dense(128, activation='relu'))
    model.add(Dense(num_class, activation='softmax'))
    
    
    
    sgd = optimizers.SGD(lr=0.001, decay=1e-05, momentum=0.9, nesterov=True)
   
    model.compile(loss='categorical_crossentropy', optimizer=sgd, metrics=['accuracy'])
    

    
    model.fit_generator(generate_batch_data(train_image_paths, train_labels, batch_size),
                    steps_per_epoch = int(len(train_image_paths) / batch_size), epochs=num_epochs, 
                    validation_data = generate_batch_data(valid_image_paths, valid_labels, batch_size), 
                    validation_steps = int(len(valid_labels)/batch_size), shuffle=True)
    

    
        
        
    
  
    #Your statements here

    stop = timeit.default_timer()

    print ("Time for training and testing: ", stop - start, "(s)") 