<a href="https://colab.research.google.com/github/thesalmonification/DSCI400_Revamp/blob/master/MLP_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Imports

In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf

Add Google Shared Drive

In [0]:
#Written for Google Colab: mount Google Drive into the runtime at /content/drive
#so that files on the drive can be opened through ordinary file paths
from google.colab import drive
drive.mount('/content/drive')

Load Pos/Neg Emotion Sessions

In [0]:
#Session identifiers are stored in the first column of two header-less CSV files
pos_frame = pd.read_csv('/content/drive/Shared drives/DSCI400_Revamp/PosEmoSessions.csv', header=None)
neg_frame = pd.read_csv('/content/drive/Shared drives/DSCI400_Revamp/NegEmoSessions.csv', header=None)

#Only the first 200 positive sessions are kept on purpose; this equalizes the
#pos/neg amounts
PosEmoSessions = list(pos_frame[0])[:200]
NegEmoSessions = list(neg_frame[0])

#Single list of every session: positive ones first, negative ones after
all_sessions = [*PosEmoSessions, *NegEmoSessions]

Centering Helper Function

In [0]:
#Helper function for centering a data matrix
def center_matrix(pandas_matrix):
  """
  Transposes a data matrix and removes the mean of every resulting row.

  Parameters:
    pandas_matrix: pandas DataFrame of numeric values. It is transposed before
      centering, so every column of the input becomes one row of the output.

  Returns:
    2-D numpy array of shape (input columns, input rows) in which each row has
    had its own mean subtracted.
  """
  data_array = pandas_matrix.T.to_numpy()

  #keepdims leaves the means as a column vector, so numpy broadcasting
  #subtracts each mean across its whole row regardless of the row length
  #(tiling the means to a fixed 6080 columns only worked for that one length)
  row_means = np.mean(data_array, axis=1, keepdims=True)

  return data_array - row_means

Process Waveforms using Centering and FFT

In [0]:
#List of channels and conversion dictionary (pos = 0, neg = 1)
chs = ['Fp1','Fp2','AF3','AF4','F7','F3','Fz','F4','F8','FC5','FC1','FC2','FC6','T7','C3','Cz','C4','T8','CP5','CP1','CP2','CP6','P7','P3','Pz','P4','P8','PO3','PO4','O1','Oz','O2']
to_binary_dict = {'0':0,'4':0,'6':0,'11':0,'1':1,'2':1,'3':1,'5':1,'12':1}

#HDF5 stores: waveforms are keyed by session, labels by session + '_labels'
WAVEFORM_H5 = '/content/drive/Shared drives/DSCI400_Revamp/Waveform_Vocoded_Equalized_Downsampled_Data.h5'
LABEL_H5 = '/content/drive/Shared drives/DSCI400_Revamp/Label_Data.h5'


def _load_session(session):
  """
  Loads one session and returns its FFT rows together with matching labels.

  Parameters:
    session: key of the session inside the waveform HDF5 store.

  Returns:
    (session_array, session_labels): the centered, row-wise FFT of the session
    (complex 2-D numpy array) and a list holding the session's binary label
    once per row of that array.
  """
  #Center each row and take its FFT (np.fft.fft works along the last axis)
  data_pd = pd.read_hdf(WAVEFORM_H5, key=session)
  session_array = np.fft.fft(center_matrix(data_pd))

  #The first 'feltEmo' entry of the session is mapped to 0 (pos) or 1 (neg)
  label_data = pd.read_hdf(LABEL_H5, key=session+'_labels').T
  label = to_binary_dict[label_data['feltEmo'].to_numpy()[0]]

  #Rows are treated as separate samples, so the label is repeated once per row
  #(taken from the array itself so labels and rows can never get out of step;
  #expected to be 32, one per entry of chs - TODO confirm)
  return session_array, [label] * session_array.shape[0]


###############################################################################
#Iterate over all sessions, collecting the pieces in plain lists

session_arrays = []
label_list = []
for session in all_sessions:
  session_array, session_labels = _load_session(session)
  session_arrays.append(session_array)
  label_list.extend(session_labels)

#Stack once at the end; stacking inside the loop would copy the whole
#accumulated array again for every session
data_array = np.vstack(session_arrays)



Truncate FFT Coefficients

In [0]:
#Keep only the first 50 FFT coefficients of every row (approx 0-3 Hz)
#NOTE(review): the values kept here are still complex; only the plot below
#takes their magnitude - confirm what the model cells are meant to receive
data_array = data_array[:, 0:50]

#Sanity check: overlay the FFT magnitudes of rows 467 through 477
for row_idx in range(467, 478):
  plt.plot(np.abs(data_array[row_idx]))

plt.xlabel('FFT Coefficient')
plt.ylabel('Magnitude')
plt.title('FFT Magnitude')
plt.show() #Shows successfully truncated waveforms

Define Keras Model

In [0]:
#Make a Keras Model
def KerasModel(lr, n_features=50):
  """
  Creates and compiles a Keras MLP binary classifier for FFT input.

  Architecture: Dense(320, relu) -> Dropout(0.6) -> Dense(80, relu) ->
  Dropout(0.5) -> Dense(30) -> Dropout(0.6) -> Dense(1, sigmoid).
  The model summary is printed as a side effect.

  Parameters:
    lr: initial learning rate of the Adam optimizer.
    n_features: number of input values per sample (default 50).

  Returns:
    The compiled tf.keras.Sequential model (binary cross-entropy loss,
    accuracy metric).
  """
  model = tf.keras.Sequential(name='MLP')

  #NOTE(review): this layer has 320 units although it is named '50_Nodes';
  #the name is left as it was
  model.add(tf.keras.layers.Dense(320,input_shape=(n_features,),activation='relu',name='50_Nodes'))

  model.add(tf.keras.layers.Dropout(0.6))

  model.add(tf.keras.layers.Dense(80,activation='relu',name='80_Nodes'))

  model.add(tf.keras.layers.Dropout(0.5,name='0.5_Dropout'))

  #NOTE(review): no activation is given, so this layer is linear - confirm
  #that this is intended
  model.add(tf.keras.layers.Dense(30,name='30_Nodes'))

  model.add(tf.keras.layers.Dropout(0.6,name='0.6_Dropout2'))

  #Single sigmoid unit: output is the predicted probability of class 1
  model.add(tf.keras.layers.Dense(1, activation='sigmoid',name='1_Node'))

  #The `lr` and `decay` keyword arguments of Adam are deprecated and rejected
  #by newer Keras optimizers. The schedule below yields the same rate as
  #decay=1e-5 did: lr / (1 + 1e-5 * step).
  lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
    initial_learning_rate=lr, decay_steps=1, decay_rate=1e-5)
  optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

  model.compile(loss='binary_crossentropy',optimizer=optimizer,metrics=['accuracy'])
  model.summary()

  return model


Make Test Train Split

In [0]:
from sklearn.model_selection import train_test_split

#Fixed seed so the same rows land in the train and test sets on every run
SPLIT_SEED = 42

#Hold out 33% of the rows for testing
#NOTE(review): rows are split individually, so rows that come from the same
#session can end up on both sides of the split - confirm this is acceptable
#or switch to a session-wise (grouped) split
X_train, X_test, y_train, y_test = train_test_split(data_array, label_list, test_size=0.33, random_state=SPLIT_SEED)

Train and Validate Model

In [0]:
## Builds the FFT-input MLP, trains it for 100 epochs and draws the train vs
## validation curves for accuracy and loss
lr = 0.001

#Build the model
model2 = KerasModel(lr)
model2.summary()

#Fit on the training split; the held-out split is also used as validation data
train_labels = np.array(y_train)
test_labels = np.array(y_test)
history = model2.fit(X_train, train_labels, epochs=100, batch_size=32, validation_data=(X_test, test_labels), verbose=1)
test_loss2, test_acc2 = model2.evaluate(X_test, test_labels)

#One panel per metric:
#(train key, validation key, panel title, y-axis label, legend position)
panels = [
  ('accuracy', 'val_accuracy', 'Time Model Accuracy', 'Accuracy', 'upper left'),
  ('loss', 'val_loss', 'Time Model Loss', 'Loss', 'lower left'),
]

plt.figure(figsize=(12,10))
for panel_no, (train_key, val_key, panel_title, y_label, legend_loc) in enumerate(panels, start=1):
  plt.subplot(2, 1, panel_no)
  plt.plot(history.history[train_key])
  plt.plot(history.history[val_key])
  plt.title(panel_title)
  plt.ylabel(y_label)
  plt.xlabel('Epoch')
  plt.legend(['Train', 'Validation'], loc=legend_loc)

#Optionally save the figure before showing it
#plt.savefig('/content/drive/Shared drives/DSCI400_Revamp/Code/Modeling/MLP_TrainVal_Curves.png')
plt.show()

#Release the Keras session so the finished model does not keep overloading the GPU
tf.keras.backend.clear_session()

Plot ROC Curve

In [0]:
## Retrains the model with the epoch count determined from the cells above and
## draws its ROC curve
from sklearn.metrics import roc_curve, auc

#Fit a fresh model for 25 epochs
model_auc = KerasModel(lr)
model_auc.fit(
  X_train,
  np.array(y_train),
  epochs=25,
  batch_size=32,
  validation_data=(X_test, np.array(y_test)),
  verbose=1,
)

#Model outputs for the held-out rows, flattened to one dimension
y_pred_auc = model_auc.predict(X_test).ravel()

#False/true positive rates over the thresholds, and the area under that curve
fpr_auc, tpr_auc, threshold_auc = roc_curve(np.array(y_test), y_pred_auc)
auc_forimg = auc(fpr_auc, tpr_auc)

plt.plot(fpr_auc, tpr_auc, label='Model AUC = {:.3f}'.format(auc_forimg))
plt.title('ROC Curve')
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.legend(loc='best')
#plt.savefig('/content/drive/Shared drives/DSCI400_Revamp/Code/Modeling/MLP_ROC.png')
plt.show()

Perform and Plot 10-Fold Cross Validation

In [0]:
#Perform n-fold cross validation
from sklearn.model_selection import KFold


label_array = np.array(label_list)
fold_accuracies = []
n_split=10
#Mini-batch size for every fold; 32 matches the other training cells
#(BATCH_SIZE was used here without being defined in the visible notebook)
BATCH_SIZE = 32
#Fixed seed so the folds contain the same rows on every run
CV_SEED = 42

#For each fold, generate model and test it
for train_index,test_index in KFold(n_split,shuffle=True,random_state=CV_SEED).split(data_array):
  #Fold-specific names, so the global X_train/X_test/y_train/y_test produced by
  #the train/test split cell are not overwritten
  fold_x_train,fold_x_test=data_array[train_index],data_array[test_index]
  fold_y_train,fold_y_test=label_array[train_index],label_array[test_index]

  model = KerasModel(0.001)

  history = model.fit(fold_x_train,fold_y_train,epochs=25,batch_size=BATCH_SIZE, verbose=1)
  test_loss, test_acc = model.evaluate(fold_x_test,fold_y_test)
  fold_accuracies.append(test_acc)

  #Print the number of 0's/1's produced to verify actual classification.
  #Sequential.predict_classes no longer exists in current Keras; thresholding
  #the sigmoid output at 0.5 is its documented replacement. The predictions
  #are made once, on this fold's own held-out rows.
  fold_pred = (model.predict(fold_x_test) > 0.5).astype('int32')
  n_ones = int(np.count_nonzero(fold_pred))
  print("The Number of 1's is:" + str(n_ones))
  print("The Number of 0's is:" + str(fold_pred.size - n_ones))
  print('-------------------------------------------')

  #Destroy the old model to prevent overloading GPU; done last, once the model
  #is no longer needed
  tf.keras.backend.clear_session()

#Display histogram of 10-fold values
plt.hist(np.array(fold_accuracies) * 100)
plt.title('N-fold Cross Validation: ' + str(int(np.mean(fold_accuracies) * 100)) + '% Avg. Accuracy')
plt.xlabel('Accuracy (%)')
plt.ylabel('Number of Folds')
plt.show()