# Begin

In [None]:
## installing some packages
!pip install mlxtend==0.17
!pip install -U keras-tuner

In [None]:
# save dir
import os
root = os.getcwd()
print(root)

import numpy as np
import scipy

from scipy import signal
from scipy.signal import find_peaks, convolve
import pandas as pd
from sklearn.preprocessing import LabelEncoder

import matplotlib.pyplot as plt
%matplotlib inline

# Class names of the eight-emotion task; the list index is the numeric label.
emotion_keys = [
    "No Emotion", "Anger", "Hate", "Grief",
    "P-Love", "R-Love", "Joy", "Reverence",
]
# Class names of the three-class task (0 = Neutral, 1 = Negative, 2 = Positive).
neu_keys = ["Neutral", "Negative", "Positive"]

# Result folder, resolved against the current working directory.
results_fold = "results/7s-overlapping"
fold_dir = os.path.join(root, results_fold)
print(fold_dir)

In [None]:
# function to plot confusion matrix
def plot_confusion_matrix(cm,
                          target_names,
                          title='Confusion matrix',
                          cmap=None,
                          normalize = False,
                          savefig = False,
                          figname = "confusion_matrix.png"):
    """
    Draw a confusion matrix as an annotated heat map.

    Arguments
    ---------
    cm:           confusion matrix from sklearn.metrics.confusion_matrix
                  (rows are the true classes, columns the predicted classes)

    target_names: class names used as tick labels,
                  for example: ['high', 'medium', 'low'];
                  pass None to keep the numeric ticks

    title:        the text to display at the top of the matrix

    cmap:         the gradient of the values displayed from matplotlib.pyplot.cm
                  see http://matplotlib.org/examples/color/colormaps_reference.html
                  plt.get_cmap('jet') or plt.cm.Blues (default: 'Blues')

    normalize:    If False, plot the raw numbers
                  If True, plot the row-wise proportions
                  Any other value (e.g. None): annotate every cell with the
                  raw number and the row-wise percentage
    savefig:      If False, do not save the figure
                  If True, save the figure under the name given by figname
    figname:      name of the figure to save

    Returns
    -------
    None; the figure is displayed with plt.show().

    Usage
    -----
    plot_confusion_matrix(cm           = cm,                  # confusion matrix created by
                                                              # sklearn.metrics.confusion_matrix
                          normalize    = True,                # show proportions
                          target_names = y_labels_vals,       # list of names of the classes
                          title        = best_estimator_name) # title of graph

    Citation
    --------
    http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html

    """
    import matplotlib.pyplot as plt
    import numpy as np
    import itertools

    cm = np.asarray(cm)
    total = float(np.sum(cm))
    accuracy = np.trace(cm) / total if total else float('nan')
    misclass = 1 - accuracy
    height, width = cm.shape
    offset = 0.5
    if cmap is None:
        cmap = plt.get_cmap('Blues')

    # Row-wise proportions. A class without any true sample keeps 0 in its row
    # instead of turning into NaN through a division by zero.
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    proportions = np.divide(cm.astype('float'), row_totals,
                            out=np.zeros(cm.shape, dtype=float),
                            where=row_totals != 0)
    cmn = 100 * proportions

    # With normalize=True both the colours and the annotations show the
    # proportions; in every other mode the heat map shows the raw counts.
    show_proportions = normalize == True
    shown = proportions if show_proportions else cm

    plt.figure(figsize=(8, 6))
    plt.imshow(shown, interpolation='nearest', cmap=cmap)
    # Grid lines on the cell borders.
    plt.hlines(y=np.arange(height+1)- offset, xmin=-offset, xmax=width-offset)
    plt.vlines(x=np.arange(width+1) - offset, ymin=-offset, ymax=height-offset)
    plt.title(title)
    plt.colorbar()

    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)

    # Cells darker than this threshold get white text, the others black text.
    thresh = shown.max() / 1.5 if normalize else shown.max() / 2
    for i, j in itertools.product(range(height), range(width)):
        if show_proportions:
            label = "{:0.2f}".format(proportions[i, j])
            valign = 'baseline'
        elif normalize == False:
            label = "{:,}".format(cm[i, j])
            valign = 'baseline'
        else:
            label = "{:,}\n {:0.2f} %".format(cm[i, j], cmn[i, j])
            valign = 'center'
        plt.text(j, i, label,
                 horizontalalignment="center",
                 verticalalignment=valign,
                 color="white" if shown[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.2f} %; misclass={:0.2f} %'.format(100*accuracy, 100*misclass))
    # Run the layout after the labels exist so that they are taken into account.
    plt.tight_layout()

    if savefig:
        plt.savefig(figname, bbox_inches = "tight")
        print("saved image to drive")
    plt.show()

In [None]:
def segmenting_overlapping(data, slice_length = 10, overlap = 7, sampling_rate = 20):
    """Cut every channel of every recording into fixed-length windows.

    Consecutive windows start `slice_length - overlap` seconds apart. Only
    windows of exactly `slice_length * sampling_rate` samples are kept, so a
    shorter remainder at the end of the signal is dropped.

    Arguments
    ---------
    data:          array of shape (N, channels, signal_length)
    slice_length:  length of one window in seconds
    overlap:       overlap between consecutive windows in seconds
    sampling_rate: sampling rate of the signal (samples per second)

    Returns
    -------
    numpy array of shape (N, channels, n_windows, slice_length * sampling_rate)

    Raises
    ------
    ValueError if overlap is not smaller than slice_length.
    """
    step = slice_length - overlap
    if step <= 0:
        raise ValueError("overlap must be smaller than slice_length")

    len_signal = data.shape[2]
    len_std = sampling_rate * slice_length

    # Window start times in seconds. The builtin `int` replaces the `np.int`
    # alias, which was removed in NumPy 1.24.
    slices = np.arange(0, int(len_signal/sampling_rate)+1, step, dtype=int)

    # The window bounds are the same for every channel, so compute them once.
    bounds = []
    for start, end in zip(slices[:-1], slices[1:]):
        start_segment = start * sampling_rate
        end_segment = min((end + overlap)*sampling_rate, len_signal)
        if end_segment - start_segment == len_std:
            bounds.append((start_segment, end_segment))

    segmented_data = [
        [[channel[first:last] for first, last in bounds] for channel in recording]
        for recording in data
    ]
    return np.array(segmented_data)

# Feature extraction

In [None]:
# Extract features from signals

class Function(object):
    """Wrapper class for functions.

    Stores a callable together with a description and a type tag, and provides
    the signal statistics helpers used for feature extraction.
    """

    def __init__(self, func, description="Simple function", fun_type='transform'):
        self.func = func
        self.description = description
        self.func_type = fun_type

    def __call__(self, data):
        """Apply the wrapped callable to `data`."""
        return self.func(data)

    def __repr__(self):
        return self.func_type + ":" + self.description

    ## statistical features
    def calculate_statistics(self, signal):
        """Return the descriptive statistics of `signal` as a list.

        Order: [mean, median, maximum, minimum, std, variance, range,
        skewness, kurtosis].

        The mean ignores NaN values and is taken over the whole array; the
        other statistics are taken along axis 0 and are not NaN-aware.
        """
        # Import the submodule explicitly: `import scipy` alone does not
        # guarantee that `scipy.stats` is loaded.
        from scipy import stats

        mean = np.nanmean(signal)
        median = np.median(signal, axis = 0)
        maximum = np.max(signal, axis = 0)
        minimum = np.min(signal, axis = 0)
        std = np.std(signal, axis = 0)
        variance = np.var(signal, axis = 0)
        _range = np.ptp(signal, axis = 0)
        skewness = stats.skew(signal, axis = 0)
        kurtosis = stats.kurtosis(signal, axis = 0)

        return [mean, median, maximum, minimum, std, variance, _range, skewness, kurtosis]

    def difference_absolute_values_mean(self, signal, different_num, round_number = 2):
        """Mean absolute difference between samples `different_num` apart.

        Computes mean(|signal[i + different_num] - signal[i]|) over all valid i
        and rounds the result to `round_number` decimals.

        Raises ValueError when the signal has no pair of samples that far apart.
        """
        signal = np.asarray(signal)
        n_pairs = len(signal) - different_num
        if n_pairs <= 0:
            raise ValueError("signal is too short for the requested difference")

        # Vectorised replacement for the per-sample Python loop.
        lagged_abs_diff = np.abs(signal[different_num:] - signal[:n_pairs])
        mean_value = round(lagged_abs_diff.sum(axis=0) / n_pairs, round_number)

        return mean_value


    def normalize_zeromean_variance(self, signal, mean, std):
        """Return (signal - mean) / std; eps is added to std to avoid division by zero."""
        normalized_signal = (signal - mean)/(std + np.finfo(float).eps)

        return normalized_signal

    def normalize_signal(self, signal):
        """Standardise `signal` with its own NaN-ignoring mean and standard deviation."""
        mean, std = np.nanmean(signal), np.nanstd(signal)
        normalized_signal = self.normalize_zeromean_variance(signal, mean, std)

        return normalized_signal
    
#     def BVP_features(signal):

class FeatureExtractor(Function):
    """Turn multi-channel signal segments into flat feature vectors.

    For every channel of a sample, 13 values are computed: the nine statistics
    of `calculate_statistics`, then the mean absolute first and second
    differences of the raw signal and of the standardised signal. The values
    of all channels are concatenated in channel order.
    """

    def __init__(self):
        # Initialise the base class so that repr() and calling the instance
        # work; calling the extractor is the same as calling fit().
        super().__init__(self.fit,
                         description="Statistical feature extractor",
                         fun_type='feature_extractor')
        self.features_list = []


    def fit(self, all_data):
        """Compute the feature matrix for `all_data`.

        all_data: array of shape (N, channels, signal_length)

        Returns a numpy array with one row of features per sample. The rows
        are also kept in `self.features_list`.
        """
        print(all_data.shape)
        # Start from an empty list so that a second call does not return the
        # rows of the previous call as well.
        self.features_list = []
        for signal_set in all_data:
            features = []
            for channel in signal_set:
                normalized_channel = self.normalize_signal(channel)

                features += self.calculate_statistics(channel)
                features.append(self.difference_absolute_values_mean(channel, 1))
                features.append(self.difference_absolute_values_mean(channel, 2))
                features.append(self.difference_absolute_values_mean(normalized_channel, 1))
                features.append(self.difference_absolute_values_mean(normalized_channel, 2))

            self.features_list.append(features)
        return np.array(self.features_list)

In [None]:
# load data
data = np.load("data.npy")
print(data.shape)
labels  = np.load("labels.npy")

# Numeric emotion code of every recording (index into emotion_keys).
# Start from NaN: a label that matches no key then stays visibly unset instead
# of keeping the uninitialised memory that np.ndarray(...) would leave behind.
labels_number = np.full(labels.shape, np.nan)

# EMG = data[:, :, 0]
# BVP = data[:, :, 1]
# GSR = data[:, :, 2]
# RES = data[:, :, 3]
for i,key in enumerate(emotion_keys):
    labels_number[labels == key] = i

# cut original data to segments using the segmenting_overlapping function
slice_length = 10 # length of the segment = 10s
sampling_rate = 20 # sampling rate of raw signal
overlap = 0 # overlap between consecutive segments, in seconds

# segmenting_overlapping expects (N, channels, signal_length)
data_cut = segmenting_overlapping(np.transpose(data, (0, 2, 1)), slice_length = slice_length, overlap = overlap, sampling_rate = sampling_rate)
print("Data_cut", data_cut.shape)

## transpose to get (N_samples, N_segments, N_channels, segment_length)
datax = np.transpose(data_cut, (0, 2, 1, 3))
print("Datax", datax.shape)

## generate labels
# NOTE(review): the segment labels are built from the position of a recording
# (8 consecutive groups of 20 recordings, in emotion_keys order), not from the
# loaded `labels` - confirm that data.npy is stored in that order.
y = np.zeros((20, datax.shape[1]))
labelx = y
for i in range(1,8):
    labelx = np.concatenate((labelx, y+i))

# reshape to get data in the expected form (N_samples * N_segments, N_channels, segment_length)
# each segment is now considered as a new sample.
dataxx = np.reshape(datax, (-1, datax.shape[2], datax.shape[3]))
print("Dataxx", dataxx.shape)
labelxx = np.reshape(labelx, (-1))
print("Labelxx", labelxx.shape)
print(np.count_nonzero(labelxx == 7))

# Extracting features
feature_list = FeatureExtractor().fit(dataxx)
print("Feature", feature_list.shape)

# feature_list_original = FeatureExtractor().fit(np.transpose(data, (0, 2, 1)))
# y_test_original = labels_number

# save features (create the output folders first so np.save cannot fail on a fresh checkout)
os.makedirs("Feature data", exist_ok=True)
os.makedirs("Label", exist_ok=True)
np.save("Feature data/features_" + str(overlap) + "s.npy", feature_list)
np.save("Label/" + str(overlap) + "sec_labels.npy", labelxx)

# Split data

In [None]:
from sklearn.svm import SVC
from sklearn import svm
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier, plot_tree

from sklearn.model_selection import GridSearchCV, train_test_split 
from sklearn.model_selection import LeaveOneOut,  KFold, StratifiedKFold
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score
from sklearn.preprocessing import StandardScaler
from mlxtend.feature_selection import SequentialFeatureSelector as sfs
from sklearn.feature_selection import RFE

from sklearn.pipeline import make_pipeline
from sklearn.pipeline import Pipeline

import pandas as pd

In [None]:
overlap = 0
feature_list = np.load("Feature data/features_" + str(overlap) + "s.npy")
labelxx = np.load("Label/" + str(overlap) + "sec_labels.npy")
# NOTE: `dataxx` comes from the feature-extraction cell, which must have been run.
len_data = dataxx.shape[2]
print(feature_list.shape)
print(len_data)

next_day = labelxx.shape[0] // 160   # number of segments per recording (160 recordings)
emo_step = labelxx.shape[0] // 8     # number of segments per emotion (8 emotions)
seg = next_day // 2                  # position of the test segment inside a recording
no_sample = 1                        # number of test segments per recording
# Number of segments on each side of the test segment that are left out of the
# training set; the 10 is the segment length in seconds used for the features.
no_exclude = (int)((len_data - 1)/(len_data - overlap*len_data/10))
print("No exclude ", no_exclude)

# Collect the pieces first and concatenate once at the end.
train_X_parts, test_X_parts = [], []
train_y_parts, test_y_parts = [], []
for i in range(0, 8):
    for j in range(0, 20):
        rec_start = emo_step*i + next_day*j
        rec_stop = rec_start + next_day
        test_start = rec_start + seg
        test_stop = test_start + no_sample

        # max() keeps the stop index from becoming negative for the very first
        # recording, where a negative stop would select almost the whole array.
        before_test = slice(rec_start, max(rec_start, test_start - no_exclude))
        held_out = slice(test_start, test_stop)
        after_test = slice(test_stop + no_exclude, rec_stop)

        train_X_parts += [feature_list[before_test], feature_list[after_test]]
        test_X_parts.append(feature_list[held_out])
        train_y_parts += [labelxx[before_test], labelxx[after_test]]
        test_y_parts.append(labelxx[held_out])

X_features_train = np.concatenate(train_X_parts)
X_features_test = np.concatenate(test_X_parts)
y_train = np.concatenate(train_y_parts)
y_test = np.concatenate(test_y_parts)

# standardize data (the scaler is fitted on the training split only)
sc = StandardScaler()
sc.fit(X_features_train)
X_features_train, X_features_test = sc.transform(X_features_train), sc.transform(X_features_test)

print("Number of train samples: ", X_features_train.shape[0])
print("Number of test samples: ", X_features_test.shape[0])

In [None]:
# check whether the data contains nan or infinity values
# np.where on a 2-D array returns (row indices, column indices); report both
# splits and list the affected feature columns.
for split_name, split_features in (("train", X_features_train), ("test", X_features_test)):
    inf_rows, inf_cols = np.where(np.isinf(split_features))
    nan_rows, nan_cols = np.where(np.isnan(split_features))
    print(split_name, "- inf values:", inf_rows.size,
          "in feature columns", np.unique(inf_cols).tolist())
    print(split_name, "- nan values:", nan_rows.size,
          "in feature columns", np.unique(nan_cols).tolist(),
          "of rows", np.unique(nan_rows).tolist())

In [None]:
# Emotion codes (indices into emotion_keys) that belong to each valence class.
neu = [0, 7]
pos = [4, 5, 6]
negative = [1, 2, 3]


def emotion_to_valence(emotion_labels):
    """Map emotion codes to valence codes: 0 = neutral, 2 = positive, 1 = negative.

    Every code that is neither in `neu` nor in `pos` becomes 1.
    Returns a new array with the dtype of the input.
    """
    emotion_labels = np.asarray(emotion_labels)
    valence = np.where(np.isin(emotion_labels, neu), 0,
                       np.where(np.isin(emotion_labels, pos), 2, 1))
    return valence.astype(emotion_labels.dtype)


# Mapping the labels twice would turn "positive" (2) into "negative" (1),
# because 2 is also the emotion code of "Hate". Only convert while the labels
# still contain emotion codes above 2, so that re-running this cell is safe.
if max(np.max(y_train), np.max(y_test)) > 2:
    y_train = emotion_to_valence(y_train)
    y_test = emotion_to_valence(y_test)
print(y_test)
print(y_test)

In [None]:
# Recursive feature elimination with a random forest, keeping 15 features.
# random_state is fixed so that the selection is reproducible between runs.
rfe_selector = RFE(estimator = RandomForestClassifier(random_state = 0), n_features_to_select = 15, step = 1)
# rfe_selector = RFE(estimator = RandomForestClassifier(random_state = 0, n_estimators = 500, max_depth = 20), n_features_to_select = 30, step = 1)
rfe_selector.fit(X_features_train, y_train)
rfe_support = rfe_selector.get_support()

# Column indices of the selected features.
feat_cols = np.flatnonzero(rfe_support).tolist()
# [0, 1, 4, 5, 10, 14, 15, 18, 19, 20, 22, 26, 27, 28, 29, 31, 39, 40, 42] 0%
# [0, 1, 4, 5, 6, 10, 14, 15, 17, 18, 19, 20, 26, 27, 28, 29, 39, 40, 42] 10%
print(feat_cols)

X_train = X_features_train[:, feat_cols]
X_test = X_features_test[:, feat_cols]

clf = RandomForestClassifier(random_state = 0)
clf.fit(X_train, y_train)
CM = confusion_matrix(y_test, clf.predict(X_test))

# Use the three valence names when the labels were reduced to three classes,
# otherwise the eight emotion names.
class_names = neu_keys if CM.shape[0] == len(neu_keys) else emotion_keys
plot_confusion_matrix(cm  = CM,
                      normalize    = None,
                      target_names = class_names,
                      title        = "Confusion Matrix - Overlapped data- RandomForest",
                      savefig = True,
                      figname = "Random Forest confusion matrix-both.png")

In [None]:
def _rfe_test_accuracy(make_estimator, n_selected):
    """Select `n_selected` features with RFE, refit on them and return the test accuracy.

    make_estimator: zero-argument callable returning a fresh, unfitted estimator;
                    it is used for the RFE ranking and for the final model.
    """
    selector = RFE(estimator = make_estimator(), n_features_to_select = n_selected, step = 1)
    selector.fit(X_features_train, y_train)
    selected_cols = np.flatnonzero(selector.get_support())

    model = make_estimator()
    model.fit(X_features_train[:, selected_cols], y_train)
    return model.score(X_features_test[:, selected_cols], y_test)


# Test accuracy as a function of the number of selected features.
# The loop variable has its own name: previously the inner loops reused `i`,
# so the LDA selector was always run with the same number of features.
resDT = []
resLDA = []
for n_selected in range(1, X_features_train.shape[1] + 1):
    resDT.append(_rfe_test_accuracy(lambda: DecisionTreeClassifier(random_state = 0), n_selected))
    resLDA.append(_rfe_test_accuracy(LDA, n_selected))

print("DT:", resDT)
print("LDA:", resLDA)

In [None]:
# Hand-picked feature subset: column indices into the standardised feature matrices.
# NOTE(review): the next code cell assigns feat_cols, X_train and X_test again,
# so in a top-to-bottom run this selection is replaced before any model uses it.
feat_cols = [0, 1, 2, 4, 11, 14, 15, 16, 20, 22, 24, 26, 28, 29, 32, 40, 41, 42, 51]
X_train = X_features_train[:, feat_cols]
X_test = X_features_test[:, feat_cols]

## Choosing only best features
Take only the best feature set and feed it to the models.

In [None]:
# Column indices of the feature subset used by the classifier cells that follow.
feat_cols = [0, 1, 3, 4, 5, 9, 14, 15, 16, 20, 26, 27, 28, 29, 39, 40, 41, 42, 48]
X_train, X_test = (split[:, feat_cols] for split in (X_features_train, X_features_test))

# Number of selected features.
n_features = X_train.shape[1]
print(n_features)

In [None]:
## grid search for SVM classifier
# RBF kernel only; gamma and C are searched with 10-fold cross-validation.
params_grid = dict(
    kernel=["rbf"],
    gamma=[1e-2, 1e-1, 3e-1, 5e-1, 7e-1],
    C=[10, 100, 1000],
)
clf = SVC()
gs = GridSearchCV(clf, param_grid = params_grid, scoring = "accuracy", cv = 10)
gs.fit(X_train, y_train)

# One line per candidate: its parameters and mean cross-validated accuracy.
for candidate, mean_acc in zip(gs.cv_results_['params'], gs.cv_results_['mean_test_score']):
    print(candidate, 'test acc.:', mean_acc)

print("Best parameters via GridSearch", gs.best_params_)
print(gs.cv_results_)

In [None]:
## save SVM grid search results
# NOTE(review): `mode` and `total_features` are not assigned in any cell visible
# in this notebook; unless they are defined elsewhere this cell stops with a
# NameError - confirm where they are supposed to come from.
df = pd.DataFrame.from_dict(gs.cv_results_, orient="index")
result_dir = os.path.join(root, results_fold)
selector_tag = 'SFFS' if mode == 'Forward' else 'SBFS'
csv_name = f'Classifier/stratify-split-{selector_tag}-{total_features}-{n_features}-features-SVM-gridsearchcv.csv'
print("SFAs mode: ", mode)

csv_path = os.path.join(result_dir, csv_name)
# Create the target folder first; to_csv does not create missing directories.
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
df.to_csv(csv_path)

In [None]:
print("overlap", overlap)
C_param = np.arange(100, 1100, 100)
b = [0.1, 1, 10, 10000]
C_param = np.append(C_param, b)
print(C_param)
gamma_param = np.arange(0.1, 2.6, 0.1)

# best_res = [test accuracy, C, gamma]. Starting below every possible score
# guarantees that valid parameters are stored even if all scores are 0.
# NOTE(review): C and gamma are chosen by their score on X_test, so the
# accuracy reported here is an optimistic estimate.
best_res = [-1.0, None, None]

for C in C_param:
    for gamma in gamma_param:
        clf = SVC(kernel = 'rbf', C = C, gamma = gamma)
        clf.fit(X_train, y_train)
        temp = clf.score(X_test, y_test)
        if temp > best_res[0]:
            best_res = [temp, C, gamma]
print("Best res:", best_res)

clf = SVC(kernel = 'rbf', C = best_res[1], gamma = best_res[2])
# clf = SVC(kernel = 'rbf', C = 500, gamma = 0.1)

clf.fit(X_train, y_train)
CM = confusion_matrix(y_test, clf.predict(X_test))

# Use the three valence names when the labels were reduced to three classes,
# otherwise the eight emotion names.
class_names = neu_keys if CM.shape[0] == len(neu_keys) else emotion_keys
plot_confusion_matrix(cm  = CM,
                      normalize    = None,
                      target_names = class_names,
                      title        = "Confusion Matrix - Overlapped data- SVM",
                      savefig = True,
                      figname = "SVM-overlapped-confusionmatrix - both.png")

In [None]:
# Softmax classifier
# NOTE(review): recent scikit-learn releases deprecate the `multi_class`
# argument - check it against the installed version.
clf = LogisticRegression(random_state=1, solver='lbfgs', multi_class='multinomial')

clf.fit(X_train, y_train)
print(clf.score(X_train, y_train))
print(clf.score(X_test, y_test))

# confusion_matrix expects (y_true, y_pred); with the arguments swapped the
# matrix is transposed compared with the other classifier cells.
print(confusion_matrix(y_test, clf.predict(X_test)))

In [None]:
# KNN classifier (nearest neighbour, k = 1)
clf = KNeighborsClassifier(n_neighbors= 1)

clf.fit(X_train, y_train)

print(clf.score(X_test, y_test))
# confusion_matrix expects (y_true, y_pred); with the arguments swapped the
# matrix is transposed compared with the other classifier cells.
print(confusion_matrix(y_test, clf.predict(X_test)))

In [None]:
# Decision tree
# random_state is fixed (as for the random forests in this notebook) so that
# the score is the same on every run.
clf = DecisionTreeClassifier(random_state = 0)
clf.fit(X_train, y_train)
CM = confusion_matrix(y_test, clf.predict(X_test))

print(clf.score(X_test, y_test))
print(CM)

# Random forest classifier


In [None]:
# Grid search over the random forest hyper-parameters.
# Candidate values; max_features and criterion keep their defaults.
param_grid = dict(
    n_estimators=[64, 100, 200, 300, 400, 500, 600],
    max_depth=[30, 40, 50, 100],
    random_state=[0],
)
# Base model whose parameters are varied by the search
rf = RandomForestClassifier()
# 5-fold cross-validated search using all available cores
gs = GridSearchCV(estimator = rf, param_grid = param_grid,
                  cv = 5, n_jobs = -1, verbose = 2)
gs.fit(X_train, y_train)

# One line per candidate: its parameters and mean cross-validated score.
for candidate, mean_acc in zip(gs.cv_results_['params'], gs.cv_results_['mean_test_score']):
    print(candidate, 'test acc.:', mean_acc)

print("Best parameters via GridSearch", gs.best_params_)

In [None]:
# save gridsearch result
# NOTE(review): `mode` and `total_features` are not assigned in any cell visible
# in this notebook; unless they are defined elsewhere this cell stops with a
# NameError - confirm where they are supposed to come from.
df = pd.DataFrame.from_dict(gs.cv_results_, orient="index")
result_dir = os.path.join(root, results_fold)
csv_name = f'Classifier/stratify-split-{mode}-{total_features}-{n_features}-features-RF-gridsearchcv.csv'

csv_path = os.path.join(result_dir, csv_name)
# Create the target folder first; to_csv does not create missing directories.
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
df.to_csv(csv_path)

In [None]:
print("overlap", overlap)
n_param = np.arange(100, 1000, 100)
depth_param = np.arange(20, 150, 10)

# best_res = [test accuracy, n_estimators, max_depth]. Starting below every
# possible score guarantees that valid parameters are stored.
# NOTE(review): the parameters are chosen by their score on X_test, so the
# accuracy reported here is an optimistic estimate.
best_res = [-1.0, None, None]
for n in n_param:
    for depth in depth_param:
        clf = RandomForestClassifier(random_state = 0, n_estimators = n, max_depth = depth)
        clf.fit(X_train, y_train)
        temp = clf.score(X_test, y_test)
        if temp > best_res[0]:
            best_res = [temp, n, depth]
print("Best res: ", best_res)

clf = RandomForestClassifier(random_state = 0, n_estimators = best_res[1], max_depth = best_res[2])
# clf = RandomForestClassifier(random_state = 0, n_estimators = 400, max_depth = 20)
clf.fit(X_train, y_train)
CM = confusion_matrix(y_test, clf.predict(X_test))

# Use the three valence names when the labels were reduced to three classes,
# otherwise the eight emotion names.
class_names = neu_keys if CM.shape[0] == len(neu_keys) else emotion_keys
plot_confusion_matrix(cm  = CM,
                      normalize    = None,
                      target_names = class_names,
                      title        = "Confusion Matrix - Overlapped data- RandomForest",
                      savefig = True,
                      figname = "Random Forest confusion matrix-both.png")