<a href="https://colab.research.google.com/github/manasarthak/Emotion-classification-using-physiological-signal/blob/main/SAE_Bi_LSTM_Based_Classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from IPython.utils import io
import numpy as np
import collections

from sklearn.preprocessing import StandardScaler,MinMaxScaler
from sklearn.utils import shuffle

import scipy.io
from scipy import signal,integrate
import matplotlib.pyplot as plt

import keras
from keras.models import Model
from keras.layers import Input,Dense,LSTM,Dropout

import mne
import math

import pickle


n_seg=125
n_points=8064
bottleneck=128

In [None]:
# Load your data
pyt = np.load('DEAP/s01.dat', allow_pickle=True, encoding='latin1')
data = pyt['data'][:, 0:32, 3 * 128:]
labels = pyt['labels'][:, :3]
print(data.shape)
print(labels.shape)
print(np.amax(data))
print(np.amin(data))

In [None]:
def standardize(a,multiple):
    std=np.std(a)
    mean=np.mean(a)
    a=(a-mean)/std
    return multiple*a

In [None]:
def convertAllData():
    valence_labels_all, valence_data_all = [], []
    arousal_labels_all, arousal_data_all = [], []
    dominance_labels_all, dominance_data_all = [], []

    for i in range(32):
        file_number = str(i+1).zfill(2)  # Add leading zero if needed
        file_name = 'DEAP/s' + file_number + '.dat'
        print(file_name)

        valence_labels, valence_data, arousal_labels, arousal_data, dominance_labels, dominance_data = convertOneSubjectData(file_name)

        valence_labels_all.extend(valence_labels)
        valence_data_all.extend([standardize(d, 1) for d in valence_data])

        arousal_labels_all.extend(arousal_labels)
        arousal_data_all.extend([standardize(d, 1) for d in arousal_data])

        dominance_labels_all.extend(dominance_labels)
        dominance_data_all.extend([standardize(d, 1) for d in dominance_data])

    valence_labels_all = np.array(valence_labels_all)
    valence_data_all = np.array(valence_data_all)
    arousal_labels_all = np.array(arousal_labels_all)
    arousal_data_all = np.array(arousal_data_all)
    dominance_labels_all = np.array(dominance_labels_all)
    dominance_data_all = np.array(dominance_data_all)

    print("Valence trial data for all subjects:", valence_labels_all.shape, valence_data_all.shape)
    print("Arousal trial data for all subjects:", arousal_labels_all.shape, arousal_data_all.shape)
    print("Dominance trial data for all subjects:", dominance_labels_all.shape, dominance_data_all.shape)

    # Save numpy arrays of total data to files
    np.save('DEAP/valence/all_valence_labels.npy', valence_labels_all)
    np.save('DEAP/valence/all_valence_data.npy', valence_data_all)
    np.save('DEAP/arousal/all_arousal_labels.npy', arousal_labels_all)
    np.save('DEAP/arousal/all_arousal_data.npy', arousal_data_all)
    np.save('DEAP/dominance/all_dominance_labels.npy', dominance_labels_all)
    np.save('DEAP/dominance/all_dominance_data.npy', dominance_data_all)

In [None]:
convertAllData()

In [None]:
def load_data(dim):
    if dim=='valence':
        labels_all=np.load('DEAP/valence/' + 'all_valence_labels.npy',allow_pickle=True)
        data_all=np.load('DEAP/valence/' + 'all_valence_data.npy',allow_pickle=True)
        print("Valence :",labels_all.shape,data_all.shape)
    elif dim=='arousal':
        labels_all=np.load('DEAP/arousal/'+'all_arousal_labels.npy',allow_pickle=True)
        data_all=np.load('DEAP/arousal/'+'all_arousal_data.npy',allow_pickle=True)
        print("Arousal: ",labels_all.shape,data_all.shape)
    elif dim=='dominance':
        labels_all=np.load('DEAP/dominance/all_dominance_labels.npy',allow_pickle=True)
        data_all=np.load('DEAP/dominance/all_dominance_data.npy',allow_pickle=True)
        print("Dominance: ",labels_all.shape,data_all.shape)
    return labels_all,data_all

In [None]:
#Feature extraction using PSD
def PSD_extraction(data):
    info = mne.create_info(ch_names=['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12'], sfreq=128)
    raw = mne.io.RawArray(data, info, first_samp=0, copy='auto', verbose=None)
    # Changes for hanning window
    n_overlap = n_points // 2  # 50% overlap
    n_per_seg = n_points  # Window size is the same as the data segment size

    psd_origin, f = mne.time_frequency.psd_welch(raw, fmin=0, fmax=60, n_fft=n_points, n_overlap=n_overlap,
                                                 n_per_seg=n_per_seg, picks='all', window='hann', average=None)
    psd = np.moveaxis(psd_origin, -1, 0)
    band_power = []
    for segment in psd:
        segment_band_p = []  # band power for all channels in one segment
        for psd_channel in segment:
            y_int = integrate.cumtrapz(psd_channel, f, initial=0)  # integrate this to calculate bandpower
            one_band_p = np.array([y_int[7] - y_int[4], y_int[13] - y_int[8], y_int[30] - y_int[14], y_int[51] - y_int[31]])
            segment_band_p.append(one_band_p)
        band_power.append(segment_band_p)

    band_power = np.array(band_power)
    band_power = np.moveaxis(band_power, -1, 1)
    band_power = band_power.reshape((n_seg, bottleneck * 4))
    band_power = 10 * band_power
    return band_power

In [None]:
#code for transforming eeg data shape
# change dimension from (849, 32, 7680)to (849, 8064, 32) then to (6846336, 32) for input 32-dimension vector to autoencoder
def vector_transform(data):
    vectors = np.moveaxis(data, 1, -1)
    vectors = vectors.reshape((vectors.shape[0]*vectors.shape[1], vectors.shape[2]))
    return vectors

# change output of autoencoder dimension from (6846336, 12) to (849, 8064, 12) then to (849, 12, 8064)
def inverse_vector_transform(vectors):
    data = vectors.reshape((int(vectors.shape[0]/n_points), n_points, vectors.shape[1]))
    data = np.moveaxis(data, -1, 1)
    return data

In [None]:
#10 fold cross verification(dividing the data so that it beconmes a multiple of 10)
data_all, labels_all = shuffle(data_all_ar, labels_all_ar, random_state=42)
n = len(labels_all)
fold_n = math.floor(n / 10)
data_all, labels_all = data_all[:10 * fold_n], labels_all[:10 * fold_n]

In [None]:
def process(test_fold_number):
    train_data = np.concatenate((data_all[:test_fold_number * fold_n], data_all[fold_n + test_fold_number * fold_n:]), axis=0)
    train_labels = np.concatenate((labels_all[:test_fold_number * fold_n], labels_all[fold_n + test_fold_number * fold_n:]), axis=0)
    test_data = data_all[test_fold_number * fold_n:fold_n + test_fold_number * fold_n]
    test_labels = labels_all[test_fold_number * fold_n:fold_n + test_fold_number * fold_n]

    train_vectors = vector_transform(train_data)
    test_vectors = vector_transform(test_data)

    input_ = Input(shape=(32,))
    encoded = Dense(128, activation=None)(input_)
    bottleneck_layer = Dense(bottleneck, activation=None)(encoded)
    decoded = Dense(128, activation=None)(bottleneck_layer)
    decoded = Dense(32, activation=None)(decoded)
    autoencoder = Model(input_, decoded)

    encoder = Model(input_, bottleneck_layer)

    decoder_input_layer = Input(shape=(bottleneck,))
    decoder_layer = autoencoder.layers[-2](decoder_input_layer)
    decoder_layer = autoencoder.layers[-1](decoder_layer)
    decoder = Model(decoder_input_layer, decoder_layer)

    autoencoder.compile(optimizer='SGD', loss='mse', metrics=['accuracy'])
    autoencoder.fit(train_vectors, train_vectors, epochs=1, batch_size=64, shuffle=True, validation_data=(test_vectors, test_vectors))
    autoencoder.save("../Results/autoencoder_model/autoencoder_model_test_fold_" + str(test_fold_number))

    train_data_encoded = encoder.predict(train_vectors)
    train_data_encoded = inverse_vector_transform(train_data_encoded)
    test_data_encoded = encoder.predict(test_vectors)
    test_data_encoded = inverse_vector_transform(test_data_encoded)

    train_band_power = []
    for data in train_data_encoded:
        with io.capture_output() as captured:
            trial_band_power = PSD_extraction(data)
        train_band_power.append(trial_band_power)
    train_band_power = np.array(train_band_power)

    test_band_power = []
    for data in test_data_encoded:
        with io.capture_output() as captured:
            trial_band_power = PSD_extraction(data)
        test_band_power.append(trial_band_power)
    test_band_power = np.array(test_band_power)

    x = Input(shape=(n_seg, bottleneck * 4))
    x1 = Bidirectional(LSTM(n_seg))(x)
    x2 = Dense(n_seg)(x1)
    output = Dense(1, activation="sigmoid")(x2)
    model = Model(x, output)

    model.compile(optimizer='SGD', loss='mse', metrics=['accuracy'])
    history = model.fit(train_band_power, train_labels, epochs=30, validation_data=(test_band_power, test_labels))
    print("Highest accuracy: " + str(max(history.history['val_accuracy'])))
    model.save("../Results/LSTM_model/LSTM_model_test_fold_" + str(test_fold_number))

In [None]:
for i in range(10):
    print("********** Test Fold " + str(i) + " ************")
    process(i)

Barycenters are loaded
