In [1]:
!pip install hmmlearn

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting hmmlearn
  Downloading hmmlearn-0.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (160 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m160.5/160.5 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: hmmlearn
Successfully installed hmmlearn-0.3.0


In [2]:
# import libraries
import numpy as np
import pickle
import os
import pandas as pd
import pathlib
import matplotlib.pyplot as plt
%matplotlib inline
import random
import librosa
from hmmlearn import hmm
from scipy.stats import kurtosis, skew
import warnings
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score,f1_score,confusion_matrix,roc_auc_score,ConfusionMatrixDisplay,precision_score,recall_score
from preprocess import preprocess
warnings.filterwarnings('ignore')

In [3]:
# get file list for 1 patient
def get_file_list(path):
  file_list = []
  label = []
  for i in os.listdir(path):
    if i[0] == 'E':
      file_list.append(i)
      l = int(i.split("_")[-1].split(".")[0])-1
      label.append(l)
  return label,file_list

In [4]:
# get feature from 1 file and preprocess
def get_feature(path):
    f = []
    a = pd.read_csv(path,names=["vertical","horizontal"])
    a = np.array(a)
    #print(a.shape)
    a = preprocess(a)
    #print(a.shape)
    for j in a[:,0]:
        f.append(j)        
    for j in a[:,1]:
        f.append(j)
    return f

In [5]:
# self identified test_split
def my_train_test_split_user_dependent(path,test_split,val_split,file_list,label):
  X_test = dict({})
  X_train = dict({})
  X_val = dict({})
  for i in range(12):
    X_test[i] = []
    X_train[i] = []
    X_val[i] = []
  for f in range(len(file_list)):
    file = file_list[f]
    file_label = label[f]
    feature = get_feature(str(path+file))
    if file.split('_')[2] in test_split:
      X_test[file_label].append(feature)
    elif file.split('_')[2] == val_split:
      X_val[file_label].append(feature)
    else:
      X_train[file_label].append(feature)
  
  return X_train,X_test,X_val

In [6]:
def evaluate(y_true, y_pred):
    f1_micro = f1_score(y_true, y_pred,average = 'micro')
    f1_macro = f1_score(y_true, y_pred,average = 'macro')
    precision_micro = precision_score(y_true, y_pred, average='micro')
    precision_macro = precision_score(y_true, y_pred, average='macro')
    recall_micro = recall_score(y_true, y_pred, average='micro')
    recall_macro = recall_score(y_true, y_pred, average='macro')
    acc = accuracy_score(y_true, y_pred)

    return f1_micro,f1_macro,precision_micro,precision_macro,recall_micro,recall_macro,acc

## User Dependent

In [7]:
patient = ["001","002","003","004","005","006"]
test_split = [["01","02"],["03","04"],["05","06"],["07","08"],["09","10"]]
val_split = ['03','01',"04","05","06"]

In [8]:
def grid_search(path, file_list, label):
  n_components = [2,3]
  grid_search_output = []
  for ncp in n_components:
    output = dict()
    output["n_components"] = ncp
    
    clf_list = []
    val_pred = []
    val_label = []
    test_pred = []
    test_label = []
    for t in range(len(val_split)):
      X_train,X_test,X_val= my_train_test_split_user_dependent(path,test_split[t],val_split[t],file_list,label)
      for k in range(12):
        X_train_stroke = np.array(X_train[k])
        clf = hmm.GaussianHMM(n_components=ncp,random_state=42)
        clf.fit(X_train_stroke)
        clf_list.append(clf)
      pred = np.zeros((3,12))
      for i in range(12):
        score = np.zeros((3,12))
        for j in range(12):
          X_val_stroke = np.array(X_val[i])
          X_test_stroke = np.array(X_test[i])
          clf = clf_list[j]
          score[0,j] = clf.score(X_val_stroke)
          score[1,j] = clf.score(X_test_stroke[0].reshape(1, -1))
          score[2,j] = clf.score(X_test_stroke[1].reshape(1, -1))
        pred[:,i] = np.argmax(score,axis=1)
      val_pred.extend(list(pred[0,:]))
      val_label.extend(list(np.arange(12)))
      test_pred.extend(list(pred[1,:]))
      test_pred.extend(list(pred[2,:]))
      test_label.extend(list(np.arange(12)))
      test_label.extend(list(np.arange(12)))
    #test_label = np.array(test_label)
    #test_pred = np.array(test_pred)
    val_f1_micro,val_f1_macro,val_precision_micro,val_precision_macro,val_recall_micro,val_recall_macro,val_acc = evaluate(val_label,val_pred)
    test_f1_micro,test_f1_macro,test_precision_micro,test_precision_macro,test_recall_micro,test_recall_macro,test_acc = evaluate(test_label,test_pred)
    output["val_Accuracy"] = val_acc
    output["val_f1_micro"] = val_f1_micro
    output["val_f1_macro"] = val_f1_macro
    output["val_precision_micro"]= val_precision_micro
    output["val_precision_macro"]= val_precision_macro
    output["val_recall_micro"]= val_recall_micro
    output["val_recall_macro"]= val_recall_macro
    
    output["test_Accuracy"] = test_acc
    output["test_f1_micro"] = test_f1_micro
    output["test_f1_macro"] = test_f1_macro
    output["test_precision_micro"]= test_precision_micro
    output["test_precision_macro"]= test_precision_macro
    output["test_recall_micro"]= test_recall_micro
    output["test_recall_macro"]= test_recall_macro
    grid_search_output.append(output)
  return grid_search_output

In [9]:
for p in patient:
    print(p)
    path = str("drive/MyDrive/Colab_Notebooks/EOG_data/isolated/"+p+"/isolated_strokes/")
    for t in range(len(test_split)):
        print(test_split[t])
        label,file_list = get_file_list(path)
        X_train,X_test,X_val = my_train_test_split_user_dependent(path,test_split[t],val_split[t],file_list,label)

001
['01', '02']
['03', '04']
['05', '06']
['07', '08']
['09', '10']
002
['01', '02']
['03', '04']
['05', '06']
['07', '08']
['09', '10']
003
['01', '02']
['03', '04']
['05', '06']
['07', '08']
['09', '10']
004
['01', '02']
['03', '04']
['05', '06']
['07', '08']
['09', '10']
005
['01', '02']
['03', '04']
['05', '06']
['07', '08']
['09', '10']
006
['01', '02']
['03', '04']
['05', '06']
['07', '08']
['09', '10']


In [10]:
p = "006"
path = str("drive/MyDrive/Colab_Notebooks/EOG_data/isolated/"+p+"/isolated_strokes/")
label,file_list = get_file_list(path)
val_pred = []
val_label = []
test_pred = []
test_label = []
for t in range(len(val_split)):
  X_train,X_test,X_val= my_train_test_split_user_dependent(path,test_split[t],val_split[t],file_list,label)
  clf_list = []
  for k in range(12):
    X_train_stroke = np.array(X_train[k])
    clf = hmm.GaussianHMM(n_components=2, random_state=42)
    clf.fit(X_train_stroke)
    clf_list.append(clf)
  pred = np.zeros((3,12))
  for i in range(12):
    score = np.zeros((3,12))
    for j in range(12):
      X_val_stroke = np.array(X_val[i])
      X_test_stroke = np.array(X_test[i])
      clf = clf_list[j]
      score[0,j] = clf.score(X_val_stroke)
      score[1,j] = clf.score(X_test_stroke[0].reshape(1, -1))
      score[2,j] = clf.score(X_test_stroke[1].reshape(1, -1))
    pred[:,i] = np.argmax(score,axis=1)
  val_pred.extend(list(pred[0,:]))
  val_label.extend(list(np.arange(12)))
  test_pred.extend(list(pred[1,:]))
  test_pred.extend(list(pred[2,:]))
  test_label.extend(list(np.arange(12)))
  test_label.extend(list(np.arange(12)))
val_f1_micro,val_f1_macro,val_precision_micro,val_precision_macro,val_recall_micro,val_recall_macro,val_acc = evaluate(val_label,val_pred)
test_f1_micro,test_f1_macro,test_precision_micro,test_precision_macro,test_recall_micro,test_recall_macro,test_acc = evaluate(test_label,test_pred)
print("validation f1 micro:", val_f1_micro)
print("validation f1 macro:", val_f1_macro)
print("validation precision micro", val_precision_micro)
print("validation precision macro", val_precision_macro)
print("validation recall micro", val_recall_micro)
print("validation recall macro", val_recall_macro)
print("validation accuracy", val_acc)
print("test f1 micro:", test_f1_micro)
print("test f1 macro:", test_f1_macro)
print("test precision micro", test_precision_micro)
print("test precision macro", test_precision_macro)
print("test recall micro", test_recall_micro)
print("test recall macro", test_recall_macro)
print("test accuracy", test_acc)



validation f1 micro: 0.8666666666666667
validation f1 macro: 0.8688552188552188
validation precision micro 0.8666666666666667
validation precision macro 0.8791666666666668
validation recall micro 0.8666666666666667
validation recall macro 0.8666666666666667
validation accuracy 0.8666666666666667
test f1 micro: 0.8583333333333333
test f1 macro: 0.8642874342719544
test precision micro 0.8583333333333333
test precision macro 0.8802188552188553
test recall micro 0.8583333333333333
test recall macro 0.8583333333333333
test accuracy 0.8583333333333333


In [11]:
p = "002"
path = str("drive/MyDrive/Colab_Notebooks/EOG_data/isolated/"+p+"/isolated_strokes/")
label,file_list = get_file_list(path)
grid_search_output = grid_search(path, file_list, label)



In [12]:
grid_search_output[0]

{'n_components': 2,
 'val_Accuracy': 0.85,
 'val_f1_micro': 0.85,
 'val_f1_macro': 0.8269149831649832,
 'val_precision_micro': 0.85,
 'val_precision_macro': 0.8335137085137085,
 'val_recall_micro': 0.85,
 'val_recall_macro': 0.85,
 'test_Accuracy': 0.875,
 'test_f1_micro': 0.875,
 'test_f1_macro': 0.8650575610273394,
 'test_precision_micro': 0.875,
 'test_precision_macro': 0.917810698073856,
 'test_recall_micro': 0.875,
 'test_recall_macro': 0.875}

In [13]:
grid_search_output[1]

{'n_components': 3,
 'val_Accuracy': 0.8333333333333334,
 'val_f1_micro': 0.8333333333333334,
 'val_f1_macro': 0.805453342953343,
 'val_precision_micro': 0.8333333333333334,
 'val_precision_macro': 0.812037037037037,
 'val_recall_micro': 0.8333333333333334,
 'val_recall_macro': 0.8333333333333334,
 'test_Accuracy': 0.8583333333333333,
 'test_f1_micro': 0.8583333333333333,
 'test_f1_macro': 0.8483744082428294,
 'test_precision_micro': 0.8583333333333333,
 'test_precision_macro': 0.9004569504569505,
 'test_recall_micro': 0.8583333333333333,
 'test_recall_macro': 0.8583333333333334}

In [14]:
X_test = dict({})
X_train = dict({})
X_val = dict({})
for i in range(12):
  X_test[i] = []
  X_train[i] = []
  X_val[i] = []
for f in range(len(file_list)):
  file = file_list[f]
  file_label = label[f]
  feature = get_feature(str(path+file))
  if file.split('_')[2] in ["01", "02","03","04","05","06","07","08","09","10"]:
    X_test[file_label].append(feature)
  elif file.split('_')[2] == ["01", "02","03","04","05","06","07","08","09","10"]:
    X_val[file_label].append(feature)
  else:
    X_train[file_label].append(feature)

In [143]:
# self identified test_split
def my_train_test_split_user_independent(test_patient,val_patient,train_patient):
  X_test = dict({})
  X_train = dict({})
  X_val = dict({})
  for i in range(12):
    X_test[i] = []
    X_train[i] = []
    X_val[i] = []
  for p in train_patient:
    path = str("drive/MyDrive/Colab_Notebooks/EOG_data/isolated/"+p+"/isolated_strokes/")
    label,file_list = get_file_list(path)
    for i in range(len(file_list)):
      file_p = file_list[i]
      file_label = label[i]
      feature = get_feature(str(path+file_p))
      if file_p.split('_')[2] in ["01", "02","03","04","05","06","07","08","09","10"]:
        X_train[file_label].append(feature)
        
  path = str("drive/MyDrive/Colab_Notebooks/EOG_data/isolated/"+test_patient+"/isolated_strokes/")
  label,file_list = get_file_list(path)
  for i in range(len(file_list)):
    file_p = file_list[i]
    file_label = label[i]
    feature = get_feature(str(path+file_p))
    if file_p.split('_')[2] in ["01", "02","03","04","05","06","07","08","09","10"]:
      X_test[file_label].append(feature)
    
  path = str("drive/MyDrive/Colab_Notebooks/EOG_data/isolated/"+val_patient+"/isolated_strokes/")
  label,file_list = get_file_list(path)
  for i in range(len(file_list)):
    file_p = file_list[i]
    file_label = label[i]
    feature = get_feature(str(path+file_p))
    if file_p.split('_')[2] in ["01", "02","03","04","05","06","07","08","09","10"]:
      X_val[file_label].append(feature)

  return X_train,X_test, X_val

In [None]:
val_pred = []
val_label = []
test_pred = []
test_label = []
for t in range(len(val_split)):
  X_train,X_test,X_val= my_train_test_split_user_dependent(path,test_split[t],val_split[t],file_list,label)
  clf_list = []
  for k in range(12):
    X_train_stroke = np.array(X_train[k])
    clf = hmm.GaussianHMM(n_components=3, random_state=42)
    clf.fit(X_train_stroke)
    clf_list.append(clf)
  pred = np.zeros((3,12))
  for i in range(12):
    score = np.zeros((3,12))
    for j in range(12):
      X_val_stroke = np.array(X_val[i])
      X_test_stroke = np.array(X_test[i])
      clf = clf_list[j]
      score[0,j] = clf.score(X_val_stroke)
      score[1,j] = clf.score(X_test_stroke[0].reshape(1, -1))
      score[2,j] = clf.score(X_test_stroke[1].reshape(1, -1))
    pred[:,i] = np.argmax(score,axis=1)
  val_pred.extend(list(pred[0,:]))
  val_label.extend(list(np.arange(12)))
  test_pred.extend(list(pred[1,:]))
  test_pred.extend(list(pred[2,:]))
  test_label.extend(list(np.arange(12)))
  test_label.extend(list(np.arange(12)))
val_f1_micro,val_f1_macro,val_precision_micro,val_precision_macro,val_recall_micro,val_recall_macro,val_acc = evaluate(val_label,val_pred)
test_f1_micro,test_f1_macro,test_precision_micro,test_precision_macro,test_recall_micro,test_recall_macro,test_acc = evaluate(test_label,test_pred)
print("validation f1 micro:", val_f1_micro)
print("validation f1 macro:", val_f1_macro)
print("validation precision micro", val_precision_micro)
print("validation precision macro", val_precision_macro)
print("validation recall micro", val_recall_micro)
print("validation recall macro", val_recall_macro)
print("validation accuracy", val_acc)
print("test f1 micro:", test_f1_micro)
print("test f1 macro:", test_f1_macro)
print("test precision micro", test_precision_micro)
print("test precision macro", test_precision_macro)
print("test recall micro", test_recall_micro)
print("test recall macro", test_recall_macro)
print("test accuracy", test_acc)

In [156]:
test_patien_list = ["001","002","003","004","005","006"]
val_patient_list = ["002","003","004","005","006","001"]
#test_patien_list = ["001","002"]
#val_patient_list = ["002","003"]

val_pred = []
val_label = []
test_pred = []
test_label = []
for t in range(len(test_patien_list)):
  train_patient= ["001","002","003","004","005","006"]
  test_patient = test_patien_list[t]
  val_patient = val_patient_list[t]
  train_patient.remove(test_patient)
  train_patient.remove(val_patient)
  X_train,X_test, X_val= my_train_test_split_user_independent(test_patient,val_patient,train_patient)
  clf_list = []
  for k in range(12):
    X_train_stroke = np.array(X_train[k])
    clf = hmm.GaussianHMM(n_components=2, covariance_type="diag", random_state=42)
    clf.fit(X_train_stroke)
    clf_list.append(clf)
  pred_val = np.zeros((10,12))
  pred_test = np.zeros((10,12))
  for i in range(12):
    score_val = np.zeros((10,12))
    score_test = np.zeros((10,12))
    for j in range(12):
      X_val_stroke = np.array(X_val[i])
      X_test_stroke = np.array(X_test[i])
      clf = clf_list[j]
      for d in range(X_val_stroke.shape[0]):
        score_val[d,j] = clf.score(X_val_stroke[d,:].reshape(1, -1))
      for d in range(X_test_stroke.shape[0]):
        score_test[d,j] = clf.score(X_test_stroke[d,:].reshape(1, -1))
    pred_val[:,i] = np.argmax(score_val,axis=1)
    pred_test[:,i] = np.argmax(score_test,axis=1)
  for pred in range(pred_val.shape[0]):
    val_pred.extend(list(pred_val[pred,:]))
    val_label.extend(list(np.arange(12)))
  for pred in range(pred_test.shape[0]):
    test_pred.extend(list(pred_test[pred,:]))
    test_label.extend(list(np.arange(12)))
val_f1_micro,val_f1_macro,val_precision_micro,val_precision_macro,val_recall_micro,val_recall_macro,val_acc = evaluate(val_label,val_pred)
test_f1_micro,test_f1_macro,test_precision_micro,test_precision_macro,test_recall_micro,test_recall_macro,test_acc = evaluate(test_label,test_pred)
print("validation f1 micro:", val_f1_micro)
print("validation f1 macro:", val_f1_macro)
print("validation precision micro", val_precision_micro)
print("validation precision macro", val_precision_macro)
print("validation recall micro", val_recall_micro)
print("validation recall macro", val_recall_macro)
print("validation accuracy", val_acc)
print("test f1 micro:", test_f1_micro)
print("test f1 macro:", test_f1_macro)
print("test precision micro", test_precision_micro)
print("test precision macro", test_precision_macro)
print("test recall micro", test_recall_micro)
print("test recall macro", test_recall_macro)
print("test accuracy", test_acc)

validation f1 micro: 0.45416666666666666
validation f1 macro: 0.4596125878628175
validation precision micro 0.45416666666666666
validation precision macro 0.48979554156904165
validation recall micro 0.45416666666666666
validation recall macro 0.45416666666666666
validation accuracy 0.45416666666666666
test f1 micro: 0.5055555555555555
test f1 macro: 0.5236143113608338
test precision micro 0.5055555555555555
test precision macro 0.5751105235832565
test recall micro 0.5055555555555555
test recall macro 0.5055555555555556
test accuracy 0.5055555555555555


In [153]:
test_patien_list = ["001","002","003","004","005","006"]
val_patient_list = ["002","003","004","005","006","001"]
#test_patien_list = ["001","002"]
#val_patient_list = ["002","003"]

val_pred = []
val_label = []
test_pred = []
test_label = []
for t in range(len(test_patien_list)):
  train_patient= ["001","002","003","004","005","006"]
  test_patient = test_patien_list[t]
  val_patient = val_patient_list[t]
  train_patient.remove(test_patient)
  train_patient.remove(val_patient)
  X_train,X_test, X_val= my_train_test_split_user_independent(test_patient,val_patient,train_patient)
  clf_list = []
  for k in range(12):
    X_train_stroke = np.array(X_train[k])
    clf = hmm.GaussianHMM(n_components=2, covariance_type="spherical", random_state=42)
    clf.fit(X_train_stroke)
    clf_list.append(clf)
  pred_val = np.zeros((10,12))
  pred_test = np.zeros((10,12))
  for i in range(12):
    score_val = np.zeros((10,12))
    score_test = np.zeros((10,12))
    for j in range(12):
      X_val_stroke = np.array(X_val[i])
      X_test_stroke = np.array(X_test[i])
      clf = clf_list[j]
      for d in range(X_val_stroke.shape[0]):
        score_val[d,j] = clf.score(X_val_stroke[d,:].reshape(1, -1))
      for d in range(X_test_stroke.shape[0]):
        score_test[d,j] = clf.score(X_test_stroke[d,:].reshape(1, -1))
    pred_val[:,i] = np.argmax(score_val,axis=1)
    pred_test[:,i] = np.argmax(score_test,axis=1)
  for pred in range(pred_val.shape[0]):
    val_pred.extend(list(pred_val[pred,:]))
    val_label.extend(list(np.arange(12)))
  for pred in range(pred_test.shape[0]):
    test_pred.extend(list(pred_test[pred,:]))
    test_label.extend(list(np.arange(12)))
val_f1_micro,val_f1_macro,val_precision_micro,val_precision_macro,val_recall_micro,val_recall_macro,val_acc = evaluate(val_label,val_pred)
test_f1_micro,test_f1_macro,test_precision_micro,test_precision_macro,test_recall_micro,test_recall_macro,test_acc = evaluate(test_label,test_pred)
print("validation f1 micro:", val_f1_micro)
print("validation f1 macro:", val_f1_macro)
print("validation precision micro", val_precision_micro)
print("validation precision macro", val_precision_macro)
print("validation recall micro", val_recall_micro)
print("validation recall macro", val_recall_macro)
print("validation accuracy", val_acc)
print("test f1 micro:", test_f1_micro)
print("test f1 macro:", test_f1_macro)
print("test precision micro", test_precision_micro)
print("test precision macro", test_precision_macro)
print("test recall micro", test_recall_micro)
print("test recall macro", test_recall_macro)
print("test accuracy", test_acc)

validation f1 micro: 0.5111111111111111
validation f1 macro: 0.5197631658732306
validation precision micro 0.5111111111111111
validation precision macro 0.5639324267590504
validation recall micro 0.5111111111111111
validation recall macro 0.5111111111111111
validation accuracy 0.5111111111111111
test f1 micro: 0.5194444444444445
test f1 macro: 0.5360116176840183
test precision micro 0.5194444444444445
test precision macro 0.6098945369301004
test recall micro 0.5194444444444445
test recall macro 0.5194444444444445
test accuracy 0.5194444444444445
