## BudsID

In [4]:
import math
import glob
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.signal import find_peaks


# Deep Learning
import tensorflow as tf
import keras
from keras import Sequential, optimizers, initializers, regularizers, metrics
from keras.layers import Conv1D, Conv2D, MaxPooling1D, MaxPooling2D, Input, Dense, Flatten, concatenate, BatchNormalization, Activation, Dropout, LSTM, Reshape, GlobalAveragePooling1D
from keras.models import Model
from sklearn.model_selection import train_test_split, KFold, cross_val_score, StratifiedKFold, ShuffleSplit
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Set path for saved data.
# NOTE: individual_data_load() joins this string directly with
# 'P<participant>/...' (plain string concatenation), so a non-empty path
# must end with a path separator.
dataFolderPath = ''

#### Data Load

In [None]:
# Handling for self-reported errors.
def errorhandling(df, e):
    """Mark every row of the self-reported error trials as incorrect.

    Args:
        df: DataFrame with a ``trial`` column and an ``isCorrect`` column.
            It is modified in place.
        e: Iterable of trial numbers that were self-reported as errors.

    Returns:
        None. ``isCorrect`` is set to ``False`` on every row whose ``trial``
        value is contained in ``e``; all other rows are left untouched.
    """
    # A single vectorised .loc assignment replaces the per-row chained
    # ``df.isCorrect.iloc[i] = False``. Chained assignment raises
    # SettingWithCopyWarning and is not guaranteed to write back into ``df``
    # (it does not under pandas copy-on-write).
    df.loc[df.trial.isin(list(e)), 'isCorrect'] = False

def individual_data_load(participant_num, errors=None):
    """Load and concatenate all CSV recordings of one participant.

    Files are looked up with the glob pattern
    ``dataFolderPath + 'P{n}/{n}_*.csv'`` and read in lexicographically
    sorted path order.

    Args:
        participant_num: Participant number used in the folder and file name.
        errors: Optional iterable of trial numbers that were self-reported
            as errors; their rows get ``isCorrect = False``. ``None`` (the
            default) means no self-reported errors.

    Returns:
        One DataFrame holding the rows of every matching file, re-indexed
        from 0.

    Raises:
        ValueError: If no file matches the pattern.
    """
    # ``None`` instead of a mutable ``[]`` default, which would be shared
    # between calls.
    error_trials = [] if errors is None else errors

    pattern = dataFolderPath + f'P{participant_num}/{participant_num}_*.csv'
    csv_paths = sorted(glob.glob(pattern))
    if not csv_paths:
        # pd.concat([]) would also raise ValueError, but without saying
        # which participant or path was missing.
        raise ValueError(
            f'No CSV files found for participant {participant_num} '
            f'(pattern: {pattern!r})'
        )

    participant_data_df = pd.concat(
        [pd.read_csv(csv_path) for csv_path in csv_paths],
        ignore_index=True,
    )
    errorhandling(participant_data_df, error_trials)
    return participant_data_df

In [None]:
# Participant data load examples. Only P1 and P24 are loaded here; the
# participants_list cell further down also expects p2_df ... p23_df to have
# been loaded the same way.
p1_df = individual_data_load(1, [])
p24_df = individual_data_load(24, [62, 109]) # e.g. when trials 62 and 109 were self-reported errors

In [None]:
# Notebook display of the P1 frame (the rendered table follows).
p1_df

Unnamed: 0,timestamp,subject,ear,block,condition,trial,finger,magX,magY,magZ,accX,accY,accZ,gyrX,gyrY,gyrZ,capacitance,time,isCorrect
0,33383.0,1,right,0,sitting,0,0,-4622,-2849,-7287,-536,-853,-208,-213,48,1226,27,605,True
1,33398.8,1,right,0,sitting,0,0,-4622,-2849,-7287,-536,-853,-208,-213,48,1226,27,605,True
2,33414.0,1,right,0,sitting,0,0,-4998,-1354,-6585,-545,-861,-214,-152,323,933,27,605,True
3,33433.7,1,right,0,sitting,0,0,-4998,-1354,-6585,-502,-896,-182,-30,42,1434,27,637,True
4,33447.8,1,right,0,sitting,0,0,-5539,1363,-4326,-525,-894,-190,-335,-42,988,28,692,True
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
66945,2930447.9,1,left,5,walking,359,1,319,-2821,-1629,-408,884,-55,677,-2258,-622,83,6796,True
66946,2930464.2,1,left,5,walking,359,1,341,-2801,-1613,-408,884,-55,537,-1379,-268,83,6809,True
66947,2930481.4,1,left,5,walking,359,1,341,-2801,-1613,-408,884,-55,537,-1379,-268,83,6809,True
66948,2930498.7,1,left,5,walking,359,1,321,-2838,-1514,-408,884,-55,146,-1629,-579,83,6858,True


In [None]:
# One DataFrame per participant, ordered P1 ... P24.
participants_list = [
    p1_df, p2_df, p3_df, p4_df, p5_df, p6_df,
    p7_df, p8_df, p9_df, p10_df, p11_df, p12_df,
    p13_df, p14_df, p15_df, p16_df, p17_df, p18_df,
    p19_df, p20_df, p21_df, p22_df, p23_df, p24_df,
]

#### Preprocessing

In [None]:
# Get touch start and end index
def getTouchIndex(input_data):
    """Return the index labels of the first and last touch sample.

    A sample counts as a touch when its ``capacitance`` is more than 20
    above the median capacitance of ``input_data``. When no sample
    qualifies, "no touch" is printed and the first index label of
    ``input_data`` is returned twice.
    """
    touch_threshold = 20 + input_data.capacitance.median()
    is_touch = (input_data.capacitance > touch_threshold).to_numpy()
    touch_labels = input_data.index[is_touch]

    if len(touch_labels) > 0:
        return touch_labels[0], touch_labels[-1]

    print("no touch")
    first_label = input_data.index[0]
    return first_label, first_label

def sensor_preprocessing(df, before_padding=40, after_padding=40, mag=True, acc=False, gyro=False):
    """Cut a fixed-length, normalised sensor window around the touch of every trial.

    For each trial number between the smallest and the largest value of
    ``df.trial`` (both inclusive) the touch interval is located with
    ``getTouchIndex``. A window of ``before_padding`` samples before and
    ``after_padding`` samples after the midpoint of that interval is then
    extracted.

    Normalisation:
        * magnetometer: min-max scaled with the 1st / 99th percentile of the
          magnetometer readings of the whole ``df``; ``magY`` is mirrored
          (``1 - magY``) for the right ear.
        * accelerometer / gyroscope: min-max scaled per window, using one
          common minimum and maximum over the three axes.

    A trial is skipped when
        * it has no rows,
        * no touch interval longer than one sample was found,
        * the window does not fit completely inside the trial, or
        * the trial is flagged ``isCorrect == False``.

    Args:
        df: Recording of one participant with the columns ``trial``, ``ear``,
            ``finger``, ``isCorrect``, ``capacitance`` and the nine sensor
            columns (``magX`` ... ``gyrZ``).
        before_padding: Number of samples taken before the touch midpoint.
        after_padding: Number of samples taken from the touch midpoint on.
        mag: Include the magnetometer channels.
        acc: Include the accelerometer channels.
        gyro: Include the gyroscope channels.

    Returns:
        ``(sensors, fingers)`` where ``sensors`` is a list of
        ``[window, ear_loc]`` pairs. ``window`` is a float array of shape
        ``(before_padding + after_padding, 3 * number_of_enabled_sensors)``
        with the channels ordered mag, acc, gyro. ``ear_loc`` is a ``(1, 1)``
        float array holding 0 for the left and 1 for the right ear.
        ``fingers`` is the list of the matching ``finger`` labels. Both
        lists are empty when no sensor is enabled.
    """
    mag_cols = ['magX', 'magY', 'magZ']
    acc_cols = ['accX', 'accY', 'accZ']
    gyr_cols = ['gyrX', 'gyrY', 'gyrZ']

    sensors = []
    fingers = []

    if not (mag or acc or gyro):
        # Nothing selected: keep returning empty results instead of failing.
        return sensors, fingers

    mag_max = np.percentile(df[mag_cols], 99, axis=0).max()
    mag_min = np.percentile(df[mag_cols], 1, axis=0).min()

    # "+ 1" because range() excludes its stop value; without it the last
    # trial of every participant was silently dropped.
    for trial_index in range(int(df.trial.min()), int(df.trial.max()) + 1):
        temp_data = df[df['trial'] == trial_index]
        if temp_data.empty:
            # Missing trial number: getTouchIndex would fail on an empty frame.
            continue

        # getTouchIndex returns index labels; convert them to offsets inside
        # the trial explicitly rather than relying on ``tuple - np.int64``
        # broadcasting.
        # NOTE(review): this assumes the rows of a trial carry consecutive
        # index labels - confirm for data not built by individual_data_load.
        first_label = temp_data.index[0]
        down_label, up_label = getTouchIndex(temp_data)
        down = down_label - first_label
        up = up_label - first_label
        if down == up:
            # No touch (or a single-sample touch). Such trials used to skip
            # normalisation but could still be appended with raw values when
            # the window length happened to match.
            continue

        touch_index = int((down + up) / 2)
        start = touch_index - before_padding
        stop = touch_index + after_padding
        if start < 0 or stop > len(temp_data):
            # Window does not fit; a negative start would wrap around.
            continue

        if not temp_data.isCorrect.iloc[0]:
            continue

        window = temp_data.iloc[start:stop]
        channel_blocks = []

        if mag:
            mag_parts = (window[mag_cols].to_numpy(dtype=float) - mag_min) / (mag_max - mag_min)
            # flip magY for right earbud.
            if temp_data.ear.iloc[0] == 'right':
                mag_parts[:, 1] = 1 - mag_parts[:, 1]
            channel_blocks.append(mag_parts)
        if acc:
            channel_blocks.append(_min_max_scale_window(window[acc_cols].to_numpy(dtype=float)))
        if gyro:
            channel_blocks.append(_min_max_scale_window(window[gyr_cols].to_numpy(dtype=float)))

        sensor_parts = np.hstack(channel_blocks)

        ear_value = 0 if temp_data.ear.iloc[0] == 'left' else 1
        ear_loc = np.reshape([ear_value], (1, 1)).astype(float)

        sensors.append([sensor_parts, ear_loc])
        fingers.append(temp_data.finger.iloc[0])

    return sensors, fingers


def _min_max_scale_window(values):
    """Scale a 2-D float array to [0, 1] with one min/max over all entries."""
    if values.size == 0:
        return values
    low = values.min()
    span = values.max() - low
    if span == 0:
        # Constant window: avoid 0/0 producing NaNs in the training data.
        return np.zeros_like(values)
    return (values - low) / span

def extend_participants_list(p_list, before_padding=40, after_padding=40, mag=True, acc=False, gyro=False):
    """Run ``sensor_preprocessing`` on every participant frame in ``p_list``.

    Returns:
        ``(sensors, fingers, sensors_list, fingers_list)``.
        ``sensors_list`` / ``fingers_list`` hold one entry per participant
        (the two values returned by ``sensor_preprocessing``), while
        ``sensors`` / ``fingers`` are the same items flattened into single
        lists in participant order.
    """
    sensors_list = []
    fingers_list = []
    for participant_df in p_list:
        windows, labels = sensor_preprocessing(
            participant_df, before_padding, after_padding, mag, acc, gyro
        )
        sensors_list.append(windows)
        fingers_list.append(labels)

    # Flatten the per-participant results.
    sensors = [window for windows in sensors_list for window in windows]
    fingers = [label for labels in fingers_list for label in labels]

    return sensors, fingers, sensors_list, fingers_list

In [None]:
def load_global_dataset(sensor_data, labels, n_split=10, random_state=1):
    """Build stratified k-fold train/test splits with one-hot encoded labels.

    Args:
        sensor_data: Sequence of ``[sensor_window, ear_loc]`` pairs; only the
            first ``len(labels)`` entries are used.
        labels: Sequence of integer class labels, one per sample.
        n_split: Number of folds (default 10, the previously hard-coded
            value).
        random_state: Seed of the shuffled ``StratifiedKFold`` (default 1,
            the previously hard-coded value).

    Returns:
        ``(sensor_train, ear_train, finger_train, sensor_test, ear_test,
        finger_test)``: six lists with one entry per fold. The ``finger_*``
        entries are one-hot encoded.
    """
    n_samples = len(labels)
    sensors = np.array([sensor_data[i][0] for i in range(n_samples)])
    ears = np.array([sensor_data[i][1] for i in range(n_samples)])
    fingers = np.array(labels)

    # Fix the one-hot width from the complete label set. Letting
    # to_categorical infer it per fold would yield arrays of different width
    # whenever a fold happens to miss the highest class.
    num_classes = int(fingers.max()) + 1

    skf = StratifiedKFold(n_splits=n_split, shuffle=True, random_state=random_state)

    sensor_train = []
    sensor_test = []
    ear_train = []
    ear_test = []
    finger_train = []
    finger_test = []

    for train_index, test_index in skf.split(sensors, fingers):
        sensor_train.append(sensors[train_index])
        sensor_test.append(sensors[test_index])
        ear_train.append(ears[train_index])
        ear_test.append(ears[test_index])
        finger_train.append(
            tf.keras.utils.to_categorical(fingers[train_index], num_classes=num_classes)
        )
        finger_test.append(
            tf.keras.utils.to_categorical(fingers[test_index], num_classes=num_classes)
        )

    return sensor_train, ear_train, finger_train, sensor_test, ear_test, finger_test

### Deep Learning

In [None]:
def prepare_model(before_padding=40, after_padding=40, mag=True, acc=False, gyro=False):
    """Build and compile the two-input 1-D CNN finger classifier.

    The sensor input has shape ``(before_padding + after_padding, 3 * k)``
    where ``k`` is the number of enabled sensors (``mag``, ``acc``,
    ``gyro``). The ear input has shape ``(1, 1)``.

    Args:
        before_padding: Samples before the touch midpoint in each window.
        after_padding: Samples after the touch midpoint in each window.
        mag: Window contains the three magnetometer channels.
        acc: Window contains the three accelerometer channels.
        gyro: Window contains the three gyroscope channels.

    Returns:
        A compiled ``keras`` ``Model`` taking ``[sensorInput, earInput]`` and
        producing a 3-way softmax.

    Raises:
        ValueError: If no sensor is enabled.
    """
    # Three axes per enabled sensor. Deriving the channel count this way
    # also covers flag combinations that the former if/elif chain did not
    # list (they left ``sensorInput`` undefined).
    n_channels = 3 * (bool(mag) + bool(acc) + bool(gyro))
    if n_channels == 0:
        raise ValueError('At least one of mag, acc, gyro must be enabled.')

    sensorInput = keras.Input(shape=(before_padding + after_padding, n_channels))
    earInput = keras.Input(shape=(1, 1), name="earInput")

    s = Conv1D(32, 3, padding="same", activation='relu')(sensorInput)
    s = MaxPooling1D(pool_size=2)(s)
    s = Flatten()(s)
    e = Flatten()(earInput)
    # The ear location is included as a classifier feature. To use sensor
    # data only, replace the next line with ``concat = s``. (The former dead
    # ``concatenate([s])`` call was overwritten immediately.)
    concat = keras.layers.concatenate([s, e])
    concat = Dense(128, activation="relu")(concat)
    concat = Dense(32, activation="relu")(concat)

    out = Dense(3, activation="softmax")(concat)

    model = Model(inputs=[sensorInput, earInput], outputs=out)
    model.compile(loss='categorical_crossentropy', optimizer="adam", metrics=['accuracy'])
    return model

def run_general_model(sensors, fingers, before_padding=40, after_padding=40, mag=True, acc=False, gyro=False):
    """Train and evaluate a fresh model on every fold of ``load_global_dataset``.

    Args:
        sensors: Sequence of ``[sensor_window, ear_loc]`` pairs.
        fingers: Sequence of integer finger labels, one per sample.
        before_padding, after_padding, mag, acc, gyro: Forwarded to
            ``prepare_model``; they must describe how ``sensors`` was built.

    Returns:
        ``(mean_accuracy, mean_matrix)``: the test accuracy averaged over the
        folds and the element-wise mean of the per-fold confusion matrices,
        rounded to two decimals.
    """
    # load_global_dataset returns six lists (sensor / ear / label for train
    # and test); the former eight-name unpacking always raised ValueError.
    (sensor_train, ear_train, finger_train,
     sensor_test, ear_test, finger_test) = load_global_dataset(sensors, fingers)

    batch_size = 32
    num_classes = 3
    epochs = 60
    # Fixed label set so every fold yields a num_classes x num_classes
    # matrix, even when a class is absent from a fold. Otherwise the mean
    # over folds fails on mismatched shapes.
    class_labels = list(range(num_classes))

    accuracy_list = []
    matrix_list = []
    folds = zip(sensor_train, ear_train, finger_train, sensor_test, ear_test, finger_test)
    for x_train, e_train, y_train, x_test, e_test, y_test in folds:
        train_inputs = [np.array(x_train), np.array(e_train)]
        test_inputs = [np.array(x_test), np.array(e_test)]

        model = prepare_model(before_padding, after_padding, mag, acc, gyro)
        model.fit(train_inputs, y_train, batch_size=batch_size, epochs=epochs, verbose=0, validation_split=0.2)
        score = model.evaluate(test_inputs, y_test, verbose=0)
        y_pred = model.predict(test_inputs)
        matrix = confusion_matrix(y_test.argmax(axis=1), y_pred.argmax(axis=1), labels=class_labels)
        print('Test Result:', score)
        accuracy_list.append(score[1])
        matrix_list.append(matrix)

    mean_matrix = np.round(np.mean(matrix_list, axis=0), 2)

    return np.mean(accuracy_list), mean_matrix

#### General Model

In [None]:
# Magnetometer-only windows (40 samples before / 40 after the touch midpoint)
# for all participants: flattened lists plus the per-participant lists.
sensors, fingers, sensors_list, fingers_list = extend_participants_list(participants_list, 40, 40, mag=True, acc=False, gyro=False)
# Sample count, label count, and number of participants (twice).
print(len(sensors), len(fingers), len(sensors_list), len(fingers_list))

In [None]:
# Cross-validated accuracy and mean confusion matrix over the pooled data of
# all participants.
general_score, general_matrix = run_general_model(sensors, fingers, 40, 40, mag=True, acc=False, gyro=False)

#### Individual Model

In [None]:
# Cross-validate one model per participant on that participant's data only.
individual_result_list, individual_matrix_list = [], []
for i, (participant_sensors, participant_fingers) in enumerate(zip(sensors_list, fingers_list)):
    result, matrix = run_general_model(
        participant_sensors, participant_fingers, 40, 40,
        mag=True, acc=False, gyro=False,
    )
    print(f'participants {i+1}: {result}')
    individual_result_list.append(result)
    individual_matrix_list.append(matrix)

#### LOOCV Model

In [None]:
def run_loocv_model(sensors_list, fingers_list, test_num, before_padding=40, after_padding=40, mag=True, acc=False, gyro=False):
    """Train on all participants but one and test on the held-out participant.

    Args:
        sensors_list: Per-participant lists of ``[sensor_window, ear_loc]``
            pairs.
        fingers_list: Per-participant lists of integer finger labels.
        test_num: Zero-based position of the held-out participant.
        before_padding, after_padding: Window layout, forwarded to
            ``prepare_model``.
        mag, acc, gyro: Sensor selection forwarded to ``prepare_model``. The
            defaults reproduce the former magnetometer-only behaviour; the
            signature now parallels ``run_general_model``.

    Returns:
        ``(accuracy, matrix)``: the test accuracy and the 3 x 3 confusion
        matrix of the held-out participant.

    Raises:
        ValueError: If ``test_num`` does not address a participant.
    """
    if not 0 <= test_num < len(sensors_list):
        raise ValueError(
            f'test_num must be in [0, {len(sensors_list)}), got {test_num}'
        )

    batch_size = 32
    num_classes = 3
    epochs = 50

    sensor_train, ear_train, y_train = [], [], []
    sensor_test, ear_test, y_test = [], [], []
    for i, (samples, labels) in enumerate(zip(sensors_list, fingers_list)):
        if i == test_num:
            sensor_target, ear_target, label_target = sensor_test, ear_test, y_test
        else:
            sensor_target, ear_target, label_target = sensor_train, ear_train, y_train
        for sample in samples:
            sensor_target.append(sample[0])
            ear_target.append(sample[1])
        label_target.extend(labels)

    train_inputs = [np.array(sensor_train), np.array(ear_train)]
    test_inputs = [np.array(sensor_test), np.array(ear_test)]

    # Fixed one-hot width: the held-out participant may not have produced
    # every class, and an inferred width would then not match the 3-unit
    # softmax output of the model.
    y_train = tf.keras.utils.to_categorical(np.array(y_train), num_classes=num_classes)
    y_test = tf.keras.utils.to_categorical(np.array(y_test), num_classes=num_classes)

    model = prepare_model(before_padding, after_padding, mag, acc, gyro)
    model.fit(train_inputs, y_train, batch_size=batch_size, epochs=epochs, verbose=0, validation_split=0.2)
    score = model.evaluate(test_inputs, y_test, verbose=0)
    y_pred = model.predict(test_inputs)
    # Fixed label set keeps the matrix shape identical for all participants.
    matrix = confusion_matrix(
        y_test.argmax(axis=1), y_pred.argmax(axis=1), labels=list(range(num_classes))
    )
    print(f'**** P{test_num+1} ****')
    print('Test Result:', score)
    return score[1], matrix

In [None]:
# Leave-one-participant-out evaluation: hold out each participant in turn.
loocv_result_list, loocv_matrix_list = [], []
for i, _ in enumerate(sensors_list):
    result, matrix = run_loocv_model(sensors_list, fingers_list, i)
    loocv_result_list += [result]
    loocv_matrix_list += [matrix]

#### Input feature variation


In [None]:
# Datasets for the sensor-combination comparison (40/40 window each).
# NOTE: the variable prefixes do not all match the flags that are passed:
#   mag_*           -> magnetometer only
#   acc_*           -> magnetometer + accelerometer
#   gyro_*          -> magnetometer + gyroscope
#   acc_gyro_*      -> accelerometer + gyroscope (no magnetometer)
#   mag_acc_gyro_*  -> all three sensors
mag_sensors, mag_fingers, mag_sensors_list, mag_fingers_list = extend_participants_list(participants_list, 40, 40, mag=True, acc=False, gyro=False)
acc_sensors, acc_fingers, acc_sensors_list, acc_fingers_list = extend_participants_list(participants_list, 40, 40, mag=True, acc=True, gyro=False)
gyro_sensors, gyro_fingers, gyro_sensors_list, gyro_fingers_list = extend_participants_list(participants_list, 40, 40, mag=True, acc=False, gyro=True)
acc_gyro_sensors, acc_gyro_fingers, acc_gyro_sensors_list, acc_gyro_fingers_list = extend_participants_list(participants_list, 40, 40, mag=False, acc=True, gyro=True)
mag_acc_gyro_sensors, mag_acc_gyro_fingers, mag_acc_gyro_sensors_list, mag_acc_gyro_fingers_list = extend_participants_list(participants_list, 40, 40, mag=True, acc=True, gyro=True)

#### Input duration variation

In [None]:
# Datasets for the window-length comparison (magnetometer only).
# Each label reads "<before_padding>/<after_padding>" in samples, followed by
# the corresponding durations before / after the touch midpoint.
# Group 1: samples before the touch midpoint only.
#30/0 (0.5s/0s)
sensors_30_0, fingers_30_0, sensors_30_0_list, fingers_30_0_list = extend_participants_list(participants_list, 30, 0, mag=True, acc=False, gyro=False)
#40/0 (0.66s/0s)
sensors_40_0, fingers_40_0, sensors_40_0_list, fingers_40_0_list = extend_participants_list(participants_list, 40, 0, mag=True, acc=False, gyro=False)
#50/0 (0.83s/0s)
sensors_50_0, fingers_50_0, sensors_50_0_list, fingers_50_0_list = extend_participants_list(participants_list, 50, 0, mag=True, acc=False, gyro=False)

# Group 2: symmetric window around the touch midpoint.
#30/30 (0.5s/0.5s)
sensors_30_30, fingers_30_30, sensors_30_30_list, fingers_30_30_list = extend_participants_list(participants_list, 30, 30, mag=True, acc=False, gyro=False)
#40/40 (0.66s/0.66s)
sensors_40_40, fingers_40_40, sensors_40_40_list, fingers_40_40_list = extend_participants_list(participants_list, 40, 40, mag=True, acc=False, gyro=False)
#50/50 (0.83s/0.83s)
sensors_50_50, fingers_50_50, sensors_50_50_list, fingers_50_50_list = extend_participants_list(participants_list, 50, 50, mag=True, acc=False, gyro=False)

# Group 3: samples from the touch midpoint onwards only.
#0/30 (0s/0.5s)
sensors_0_30, fingers_0_30, sensors_0_30_list, fingers_0_30_list = extend_participants_list(participants_list, 0, 30, mag=True, acc=False, gyro=False)
#0/40 (0s/0.66s)
sensors_0_40, fingers_0_40, sensors_0_40_list, fingers_0_40_list = extend_participants_list(participants_list, 0, 40, mag=True, acc=False, gyro=False)
#0/50 (0s/0.83s)
sensors_0_50, fingers_0_50, sensors_0_50_list, fingers_0_50_list = extend_participants_list(participants_list, 0, 50, mag=True, acc=False, gyro=False)