# EMG based GR - Analysis

# Connecting to Drive, basic definitions (always run)

In [None]:
# Switch the working directory to the Drive project folder (IPython %cd magic),
# then store and print the resulting path.
# NOTE(review): in this file the drive.mount cell comes after this one; the Drive
# folder presumably exists only once Drive is mounted — confirm the run order.
%cd '/content/drive/MyDrive/' # folder path
import os
path = os.getcwd() 
print('path: ' + path)

In [None]:
# Mount Google Drive into the Colab filesystem under /content/drive/.
from google.colab import drive
drive.mount('/content/drive/')

**Import packages**

In [None]:
# Install the third-party packages imported below (mne, hmmlearn) via shell escapes.
!pip install mne
!pip install hmmlearn

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter, iirnotch, welch, wiener
from math import floor
import mne
import time
import copy
from matplotlib.colors import LogNorm
import pandas as pd
import tensorflow as tf
from hmmlearn import hmm
import pickle
import os
from sklearn.neighbors import KNeighborsClassifier
from sklearn import svm

**File name**

In [None]:
# Which recording to analyse.
subject_num = '007'
position = '2'
session = '2'
# When True, the loading cells read filename1 and filename2 instead of filename.
two_files = False

# Common prefix of all EDF paths of this subject / position / session.
_recording_prefix = 'data_files/subject_{0}/session_{2}/GR_{0}_pos{1}_S{2}'.format(
    subject_num, position, session
)

# Single-file recording.
filename = _recording_prefix + '_Recording_00_SD.edf'

# Two-part recording.
filename1 = _recording_prefix + '_part1_Recording_00_SD_edited.edf'
filename2 = _recording_prefix + '_part2_Recording_00_SD.edf'

**Parameter definitions and helper functions**

In [None]:
# Annotation names that mark the start / release of a gesture.
# (The old recordings used just ['Start', 'Release'].)
sync_annotations = [
    'Start_TwoFingers', 'Release_TwoFingers',
    'Start_ThreeFingers', 'Release_ThreeFingers',
    'Start_Abduction', 'Release_Abduction',
    'Start_Fist', 'Release_Fist',
    'Start_Bet', 'Release_Bet',
    'Start_Gimel', 'Release_Gimel',
    'Start_Het', 'Release_Het',
    'Start_Tet', 'Release_Tet',
    'Start_Kaf', 'Release_Kaf',
    'Start_Nun', 'Release_Nun',
]  # new annotations
# The same names followed by ' 1' ... ' 10' are accepted as well.
sync_annotations_2 = [
    '{} {}'.format(name, number)
    for number in range(1, 11)
    for name in sync_annotations
]
sync_annotations += sync_annotations_2

# Segmentation parameters.
window_len = 800
gestures_num = 10
repetitions = 10

# Filter parameters:
fl = 20
fh = 400
fs = 4000
order = 4

# Decimation:
decimation_factor = 1

def bandpass_filter(low, high, fs, order):
    """Design a Butterworth band-pass filter.

    low, high: lower and upper band edges, in the same units as fs.
    fs: sampling frequency.
    order: filter order handed to scipy.signal.butter.

    Returns the (b, a) coefficient arrays produced by butter.
    """
    nyquist = fs / 2
    # butter takes the band edges as fractions of the Nyquist frequency.
    band_edges = [edge / nyquist for edge in (low, high)]
    return butter(order, band_edges, btype='band')


# Raw files processing

Loading the data

In [None]:
# Load the EDF recording(s), keep only the gesture sync annotations and cut the
# signal into one segment per 'Start_*' annotation (from that event to the next one).
actions = []
actions1 = []  # not used in this cell; kept so existing references still resolve
actions2 = []
files_num = 2 if two_files else 1
for file_i in range(files_num):
    if two_files:
        filename = filename1 if file_i == 0 else filename2
    # Read the raw data and the events derived from its annotations.
    rawEDF = mne.io.read_raw_edf(filename)
    rawEDF.load_data()
    # Sampling rate stored in the file; used to convert sample indices to seconds
    # (this used to be a hard-coded 4000).
    sampling_rate = rawEDF.info['sfreq']
    events = mne.events_from_annotations(rawEDF)
    annotations = mne.read_annotations(filename)
    event_annotations = annotations.description
    events_types = events[0][:, 2]
    events_indices = events[0][:, 0]
    if event_annotations.shape[0] != events_indices.shape[0]:
        # Align descriptions with events by dropping the leading surplus descriptions.
        # NOTE(review): assumes there are more descriptions than events — confirm.
        dif = event_annotations.shape[0] - events_indices.shape[0]
        event_annotations = event_annotations[dif:]
    # Keep only the events whose description is a known sync annotation.
    mask = [a in sync_annotations for a in event_annotations]
    events_types = events_types[mask]
    events_indices = events_indices[mask]
    relevant_annotations = sync_annotations * repetitions  # new annotations
    print('Number of events:')
    print(len(events_indices))
    files_annotations = event_annotations[mask]

    # Two identical consecutive annotations mean a Start/Release is missing.
    print('Wrong annotations in file:')
    flag = False
    for i in range(len(files_annotations) - 1):
        if files_annotations[i] == files_annotations[i + 1]:
            print('annotation number '+str(i)+', at '+str(events_indices[i]/sampling_rate)+' seconds')
            flag = True
    if not flag:
        print('None')

    # Group the segments by gesture name.
    data_pd = rawEDF.to_data_frame(picks='all')
    annotations_dict = {}
    count = 0
    for i in range(len(events_indices) - 1):
        annotation = files_annotations[i]
        if 'Start' not in annotation:
            continue
        count += 1
        # 'Start_<Gesture>' or 'Start_<Gesture> <n>'  ->  '<Gesture>'
        event_key = annotation[6:].split(' ')[0]
        # First 17 DataFrame columns, from this event up to the next one.
        segment = data_pd.iloc[events_indices[i]:events_indices[i + 1], :17]
        annotations_dict.setdefault(event_key, []).append(segment)
    if two_files and file_i == 0:
        annotations_dict1 = annotations_dict
    if two_files and file_i == 1:
        annotations_dict2 = annotations_dict

if two_files:
    # Merge the two parts gesture by gesture (part 1 first). Gestures that appear
    # only in part 2 are kept, and the per-part lists are left untouched.
    annotations_dict = {}
    for part_dict in (annotations_dict1, annotations_dict2):
        for key, segments in part_dict.items():
            annotations_dict.setdefault(key, []).extend(segments)

actions_order = ['TwoFingers', 'ThreeFingers', 'Abduction', 'Fist', 'Bet', 'Gimel', 'Het', 'Tet', 'Kaf', 'Nun']
for gesture in actions_order:
    actions.extend(annotations_dict[gesture])
for k in range(len(actions)):
    actions[k] = actions[k].to_numpy()
# "actions" now holds one array per realization, ordered by gesture as in actions_order.
# Each array has 17 columns; the preprocessing cell discards the first one.

# Save the (masked) event indices and annotations of the last file that was read.
os.makedirs('segmentation', exist_ok=True)
events_indices_seg_df = pd.DataFrame(data=events_indices)
events_indices_seg_df.to_pickle('segmentation/events_indices_'+subject_num+'_pos'+position+'_session'+session+'.pkl', protocol=4)
files_annotations_df = pd.DataFrame(data=files_annotations)
files_annotations_df.to_pickle('segmentation/files_annotations_'+subject_num+'_pos'+position+'_session'+session+'.pkl', protocol=4)

Preprocessing - filtering, calculating RMS values (for heat-maps), normalizing

In [None]:
# Filter every realization (two notches + band-pass), then compute the per-channel
# RMS, normalise it to [0, 1] and arrange it as a 4x4 heat-map.
preprocessed_actions = []
preprocessed_actions_rms = []
preprocessed_actions_heatmaps = []
b, a = bandpass_filter(fl, fh, fs, order)
bnotch, anotch = iirnotch(50, 30, fs)
bnotch2, anotch2 = iirnotch(100, 30, fs)
# Channel number (1-based) placed at each heat-map cell, row by row; converted to
# 0-based indices. Old setup: [3, 4, 11, 12, 2, 7, 8, 13, 1, 6, 9, 14, 0, 5, 10, 15].
real_locations = [9, 10, 7, 8, 16, 13, 4, 1, 15, 12, 5, 2, 14, 11, 6, 3] # new setup
real_locations = [location-1 for location in real_locations] # new setup
for ac in range(len(actions)):
    # Rows = columns of actions[ac] without the first one.
    channels = np.transpose(actions[ac])[1:, :]
    # lfilter returns new arrays, so actions[ac] is not modified. Writing the
    # result back through the transposed view (as before) altered actions[ac] and
    # made a second run of this cell filter already-filtered data.
    realization = lfilter(bnotch, anotch, channels, axis=1)
    realization = lfilter(bnotch2, anotch2, realization, axis=1)
    realization = lfilter(b, a, realization, axis=1)

    final_array = realization[:, ::decimation_factor] # decimation
    preprocessed_actions.append(final_array)

    rms = np.sqrt(np.mean(final_array**2, axis=1))
    max_val = np.max(np.abs(rms))
    min_val = np.min(np.abs(rms))
    rms = (rms - min_val) / (max_val - min_val) # normalization
    preprocessed_actions_rms.append(rms)
    preprocessed_actions_heatmaps.append(np.reshape(rms[real_locations], (4, 4)))

# Number of whole windows that fit in each realization.
windows_per_action = [int(action.shape[1] / window_len) for action in preprocessed_actions]
min_window = np.min(windows_per_action)
min_window_index = np.argmin(windows_per_action)
print('Minimum number of windows per realization:')
print(str(min_window)+', at realization number '+str(min_window_index))
print('Preprocessing completed!')

Visualize the heatmaps

In [None]:
# Draw one heat-map per realization: one row per gesture, one column per repetition,
# with column 0 reserved for the gesture name.
plt.rcParams["figure.figsize"] = (25,12)

fig, axs = plt.subplots(gestures_num, repetitions+1)
# Colour limits for an optional logarithmic scale, i.e.
# imshow(..., norm=LogNorm(vmin=heatmap_vmin, vmax=heatmap_vmax)).
# Deliberately not called min / max, which would shadow the builtins.
heatmap_vmin = 0.01
heatmap_vmax = 1
for ac in range(len(actions)):
    row, col = divmod(ac, repetitions)
    im = axs[row, col + 1].imshow(preprocessed_actions_heatmaps[ac], cmap='hot')
for ax in axs.ravel():
    ax.set_xticks([])
    ax.set_yticks([])
gnames = ['Two fingers', 'Three fingers', 'Abduction', 'Fist', 'Bet', 'Gimel', 'Het', 'Tet', 'Kaf', 'Nun']
for i in range(gestures_num):
    # The label column shows only text, so hide its frame.
    axs[i, 0].text(-0.5, 0.5, gnames[i], fontsize=18)
    for side in ('top', 'right', 'bottom', 'left'):
        axs[i, 0].spines[side].set_visible(False)

# The colour bar is taken from the last image drawn.
cbar = fig.colorbar(im, ax=axs.ravel().tolist())
cbar.ax.tick_params(labelsize=18)
os.makedirs('heatmaps', exist_ok=True)  # savefig does not create the folder
plt.savefig('heatmaps/heatmaps_'+subject_num+'_pos'+position+'_s'+session+'.png')
plt.show()

In [None]:
# Manually set the number of windows per realization, replacing the value the
# preprocessing cell computed from the data.
# NOTE(review): a realization with fewer than 11 whole windows would give empty
# windows in the next cell — confirm 11 fits this recording.
min_window = 11

Creating data files for the network

In [None]:

# Turn every realization into min_window RMS windows, split the realizations into
# train / validation / test sets and save the six resulting tables as pickles.
# NOTE: this cell replaces the entries of preprocessed_actions with their windowed
# version, so re-run the preprocessing cell before running this one again.

# Flattened heat-map values (row by row, realization by realization).
value_list = []
for ac in range(len(actions)):
    value_list.extend(preprocessed_actions_heatmaps[ac].ravel())

test_per_gesture = 2  # realizations per gesture drawn at random for the test set
val_per_gesture = 2   # first remaining realizations per gesture used for validation
test_lengths = []

# Gesture label of every realization; realizations are stored gesture by gesture,
# `repetitions` of them per gesture.
actions_dict = {act: act // repetitions for act in range(len(preprocessed_actions))}

# Dividing the data to time windows:
for ac in range(len(preprocessed_actions)):
    signal = preprocessed_actions[ac]
    signal = signal[:, :window_len * floor(signal.shape[1] / window_len)]
    windows_rms = np.zeros((signal.shape[0], min_window))
    for j in range(min_window):
        window = signal[:, window_len * j:window_len * (j + 1)]
        windows_rms[:, j] = np.sqrt(np.mean(window ** 2, axis=1))
    # Scale each realization by its largest window RMS.
    preprocessed_actions[ac] = windows_rms / np.max(np.abs(windows_rms))
print('The data was divided to time windows!')

# Train / test split: for every gesture, test_per_gesture distinct realizations
# go to the test set and the rest to the training pool.
train_chunks, train_label_chunks = [], []
test_chunks, test_label_chunks = [], []
for ac in range(len(preprocessed_actions)):
    if ac % repetitions == 0:
        index = ac + np.random.choice(repetitions, size=test_per_gesture, replace=False)
    windows = np.transpose(preprocessed_actions[ac])
    # Label every window with its own gesture (the first chunk used to be
    # hard-coded to 0, which was only right because gesture 0 comes first).
    window_labels = actions_dict[ac] * np.ones(windows.shape[0])
    if ac in index:
        test_chunks.append(windows)
        test_label_chunks.append(window_labels)
    else:
        train_chunks.append(windows)
        train_label_chunks.append(window_labels)
output_data = np.concatenate(train_chunks, axis=0)
output_labels = np.concatenate(train_label_chunks, axis=0)
test_data = np.concatenate(test_chunks, axis=0)
test_labels = np.concatenate(test_label_chunks, axis=0)

# Validation split: within each gesture's block of the training pool, the first
# val_per_gesture realizations become validation data and the rest stay in training.
kept_per_gesture = repetitions - test_per_gesture
val_rows = []
train_rows = []
for i in range(gestures_num):
    block_start = kept_per_gesture * i * min_window
    val_end = block_start + val_per_gesture * min_window
    block_end = block_start + kept_per_gesture * min_window
    val_rows.extend(range(block_start, val_end))
    train_rows.extend(range(val_end, block_end))
val_data = output_data[val_rows, :]
val_labels = output_labels[val_rows]
output_data = output_data[train_rows, :]
output_labels = output_labels[train_rows]

output_data_df = pd.DataFrame(data=output_data)
output_labels_df = pd.DataFrame(data=output_labels)
test_data_df = pd.DataFrame(data=test_data)
test_labels_df = pd.DataFrame(data=test_labels)
lengths_df = pd.DataFrame(data=np.array(test_lengths))
val_data_df = pd.DataFrame(data=val_data)
val_labels_df = pd.DataFrame(data=val_labels)

print('shapes of train data, train labels, validation data, validation labels, test data, test labels:')
print(output_data_df.shape)
print(output_labels_df.shape)
print(val_data.shape)
print(val_labels.shape)
print(test_data_df.shape)
print(test_labels_df.shape)

folder_path = 'segmented_files/subject_'+subject_num
os.makedirs(folder_path, exist_ok=True)
# name_start used to be defined only in a later cell, so a top-to-bottom run
# failed here with a NameError.
name_start = folder_path+'/sf_'+subject_num+'_pos'+position+'_s'+session
output_data_df.to_pickle(name_start+'_train_data.pkl', protocol=4)
output_labels_df.to_pickle(name_start+'_train_labels.pkl', protocol=4)
test_data_df.to_pickle(name_start+'_test_data.pkl', protocol=4)
test_labels_df.to_pickle(name_start+'_test_labels.pkl', protocol=4)
val_data_df.to_pickle(name_start+'_val_data.pkl', protocol=4)
val_labels_df.to_pickle(name_start+'_val_labels.pkl', protocol=4)
print('Files saved!')

Train HMM models:

In [None]:
# Train one Gaussian HMM per gesture on the saved training windows, store the models,
# and classify the saved validation sequences by the best-scoring model.
# This cell no longer rebinds the notebook-wide names `filename` (the EDF path) and
# `repetitions`; both are used by the loading / splitting cells further down.
start_time = time.time()
#subject_num = '004' ######
name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'+session
train_data = pd.read_pickle(name_start+'_train_data.pkl').to_numpy()
train_labels = pd.read_pickle(name_start+'_train_labels.pkl').to_numpy()
# The validation files play the role of the test set in this cell.
test_data = pd.read_pickle(name_start+'_val_data.pkl').to_numpy()
test_labels = pd.read_pickle(name_start+'_val_labels.pkl').to_numpy()

# Regroup the flat (windows, channels) tables into (sequences, windows, channels);
# rows that do not fill a whole sequence are dropped.
windows_num = min_window
train_seq_num = train_data.shape[0] // windows_num
test_seq_num = test_data.shape[0] // windows_num
train_data = train_data[:train_seq_num * windows_num].reshape(train_seq_num, windows_num, train_data.shape[1])
test_data = test_data[:test_seq_num * windows_num].reshape(test_seq_num, windows_num, test_data.shape[1])
# One label per sequence.
train_labels = train_labels[::windows_num].flatten()
test_labels = test_labels[::windows_num].flatten()
print('Original shapes:')
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)
print('test labels:')
print(test_labels.T)
print('train labels:')
print(train_labels.T)

# Training sequences per gesture in the saved training file (the data-file cell
# keeps rows (8*i+2)..(8*i+8) realizations per gesture, i.e. 6 of them).
hmm_train_reps = 6
val_num = 2  # validation sequences per gesture

folder_path = 'hmm_models/subject_'+subject_num+'/hmm_'+subject_num+'_pos'+position+'_s'+session
os.makedirs(folder_path, exist_ok=True)

# Other values tried: comp_num in [4,6,8,10,12], iterations_num in [1,5,10,50,100].
d = {}
for comp_num in [4]:
    for iterations_num in [10]:
        res = []
        tries_num = 1
        final_res = []
        # Every training sequence has the same number of windows.
        lengths = [train_data.shape[1]] * hmm_train_reps
        models = []

        for i in range(gestures_num):
            first = hmm_train_reps * i
            # Stack this gesture's sequences into one (reps * windows, channels) array.
            X = np.concatenate(train_data[first:first + hmm_train_reps])

            model = hmm.GaussianHMM(n_components=comp_num, covariance_type="tied", n_iter=iterations_num)
            model.fit(X, lengths)
            models.append(model)
            with open(folder_path+'/hmm_gest'+str(i)+'.pkl', "wb") as model_file:
                pickle.dump(model, model_file)

        # Predicted gesture = index of the model giving the highest score.
        test_res = []
        for s in range(gestures_num*val_num):
            scores = [gesture_model.score(test_data[s, :, :]) for gesture_model in models]
            test_res.append(np.argmax(np.array(scores)))

        print('test results:')
        print(test_res)
        print('real labels:')
        print(test_labels.T)
        acc = np.mean(np.array(test_res)==test_labels)
        print('HMM classification accuracy:')
        print(acc)

print('total run time: '+str(time.time()-start_time))

Generate artificial training data from the trained HMMs, and add it to the real training data:

In [None]:
# Number of artificial sequences sampled from each gesture's HMM in the next cell.
num_gen = 6

In [None]:
# Sample artificial sequences from the stored per-gesture HMMs, append them to the
# real training data and save the combined set.
# This cell no longer rebinds the notebook-wide name `filename` (the EDF path).
name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'+session
train_data = pd.read_pickle(name_start+'_train_data.pkl').to_numpy()
train_labels = pd.read_pickle(name_start+'_train_labels.pkl').to_numpy()
# The validation files are loaded under the test_* names, as in the training cell.
test_data = pd.read_pickle(name_start+'_val_data.pkl').to_numpy()
test_labels = pd.read_pickle(name_start+'_val_labels.pkl').to_numpy()

res = []
comp_num = 4
tries_num = 1
num_windows = min_window
final_res = []

folder_path = 'hmm_models/subject_'+subject_num+'/hmm_'+subject_num+'_pos'+position+'_s'+session
generated_chunks = []
generated_label_chunks = []
for t in range(tries_num):
    models = []
    for i in range(gestures_num):
        # The model files are the ones written by the HMM training cell; the
        # context manager closes each file (it used to be left open).
        # NOTE: pickle.load executes code from the file — only load trusted models.
        with open(folder_path+'/hmm_gest'+str(i)+'.pkl', "rb") as model_file:
            model = pickle.load(model_file)
        for j in range(num_gen):
            X, Z = model.sample(num_windows)
            generated_chunks.append(X)
        # Every generated window of this model gets gesture label i.
        generated_label_chunks.append(i*np.ones(num_gen*num_windows))
generated_data = np.concatenate(generated_chunks)
generated_labels = np.concatenate(generated_label_chunks)
print('generated shapes:')
print(generated_data.shape)
print(generated_labels.shape)

# Real training data first, generated data after it.
generated_data = np.concatenate((train_data, generated_data))
generated_labels = np.concatenate((train_labels.flatten(), generated_labels.flatten()))
print('final shapes:')
print(generated_data.shape)
print(generated_labels.shape)

output_data_df = pd.DataFrame(data=generated_data)
output_labels_df = pd.DataFrame(data=generated_labels)
folder_path = 'hmm_generated_files/subject_'+subject_num
os.makedirs(folder_path, exist_ok=True)
name_start_gen = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num+'_pos'+position+'_s'+session
output_data_df.to_pickle(name_start_gen+'_train_data.pkl', protocol=4)
output_labels_df.to_pickle(name_start_gen+'_train_labels.pkl', protocol=4)

# Raw files processing with automatic segmentation - new

Loading the data

In [None]:
# Load the EDF recording(s) as in the manual-segmentation section, but replace part
# of the annotated event indices with indices from the automatic segmentation.
actions = []
actions1 = []  # not used in this cell; kept so existing references still resolve
actions2 = []
files_num = 2 if two_files else 1
for file_i in range(files_num):
    if two_files:
        filename = filename1 if file_i == 0 else filename2
    # Read the raw data and the events derived from its annotations.
    rawEDF = mne.io.read_raw_edf(filename)
    rawEDF.load_data()
    # Sampling rate stored in the file; used to convert sample indices to seconds
    # (this used to be a hard-coded 4000).
    sampling_rate = rawEDF.info['sfreq']
    events = mne.events_from_annotations(rawEDF)
    annotations = mne.read_annotations(filename)
    event_annotations = annotations.description
    events_types = events[0][:, 2]
    events_indices = events[0][:, 0]
    if event_annotations.shape[0] != events_indices.shape[0]:
        # Align descriptions with events by dropping the leading surplus descriptions.
        # NOTE(review): assumes there are more descriptions than events — confirm.
        dif = event_annotations.shape[0] - events_indices.shape[0]
        event_annotations = event_annotations[dif:]
    # Keep only the events whose description is a known sync annotation.
    mask = [a in sync_annotations for a in event_annotations]
    events_types = events_types[mask]
    events_indices = events_indices[mask]

    # Read automatic segmentation:
    # The file name follows the selected subject / position / session (it used to be
    # fixed to 007 / pos2 / S2, whatever recording was loaded).
    # NOTE(review): assumes the segmentation files of other recordings use the same
    # naming pattern — confirm.
    seg_filename = 'segmentation/segmentation_final_indices_'+subject_num+'_pos'+position+'_S'+session+'.pkl'
    events_indices_auto = pd.read_pickle(seg_filename)
    events_indices_auto = events_indices_auto.to_numpy()
    # In each of the 10 blocks of 20 events, the last 4 events take their index from
    # the automatic segmentation (presumably the 9th and 10th realization of each
    # gesture, which the data-file cell below uses as the test set).
    for g in range(10):
        events_indices[g*20+16:g*20+20] = events_indices_auto[g*20+16:g*20+20, 0]

    relevant_annotations = sync_annotations * repetitions  # new annotations
    print('Number of events:')
    print(len(events_indices))
    files_annotations = event_annotations[mask]

    # Two identical consecutive annotations mean a Start/Release is missing.
    print('Wrong annotations in file:')
    flag = False
    for i in range(len(files_annotations) - 1):
        if files_annotations[i] == files_annotations[i + 1]:
            print('annotation number '+str(i)+', at '+str(events_indices[i]/sampling_rate)+' seconds')
            flag = True
    if not flag:
        print('None')

    # Group the segments by gesture name.
    data_pd = rawEDF.to_data_frame(picks='all')
    annotations_dict = {}
    count = 0
    for i in range(len(events_indices) - 1):
        annotation = files_annotations[i]
        if 'Start' not in annotation:
            continue
        count += 1
        # 'Start_<Gesture>' or 'Start_<Gesture> <n>'  ->  '<Gesture>'
        event_key = annotation[6:].split(' ')[0]
        # First 17 DataFrame columns, from this event up to the next one.
        segment = data_pd.iloc[events_indices[i]:events_indices[i + 1], :17]
        annotations_dict.setdefault(event_key, []).append(segment)
    if two_files and file_i == 0:
        annotations_dict1 = annotations_dict
    if two_files and file_i == 1:
        annotations_dict2 = annotations_dict

if two_files:
    # Merge the two parts gesture by gesture (part 1 first). Gestures that appear
    # only in part 2 are kept, and the per-part lists are left untouched.
    annotations_dict = {}
    for part_dict in (annotations_dict1, annotations_dict2):
        for key, segments in part_dict.items():
            annotations_dict.setdefault(key, []).extend(segments)

actions_order = ['TwoFingers', 'ThreeFingers', 'Abduction', 'Fist', 'Bet', 'Gimel', 'Het', 'Tet', 'Kaf', 'Nun']
for gesture in actions_order:
    actions.extend(annotations_dict[gesture])
for k in range(len(actions)):
    actions[k] = actions[k].to_numpy()
# "actions" now holds one array per realization, ordered by gesture as in actions_order.
# Each array has 17 columns; the preprocessing cell discards the first one.
# Unlike the manual-segmentation section, the event indices and annotations are not
# saved to 'segmentation/' here.

Preprocessing - filtering, calculating RMS values (for heat-maps), normalizing

In [None]:
# Filter every realization (two notches + band-pass), then compute the per-channel
# RMS, normalise it to [0, 1] and arrange it as a 4x4 heat-map.
preprocessed_actions = []
preprocessed_actions_rms = []
preprocessed_actions_heatmaps = []
b, a = bandpass_filter(fl, fh, fs, order)
bnotch, anotch = iirnotch(50, 30, fs)
bnotch2, anotch2 = iirnotch(100, 30, fs)
# Channel number (1-based) placed at each heat-map cell, row by row; converted to
# 0-based indices. Old setup: [3, 4, 11, 12, 2, 7, 8, 13, 1, 6, 9, 14, 0, 5, 10, 15].
real_locations = [9, 10, 7, 8, 16, 13, 4, 1, 15, 12, 5, 2, 14, 11, 6, 3] # new setup
real_locations = [location-1 for location in real_locations] # new setup
for ac in range(len(actions)):
    # Rows = columns of actions[ac] without the first one.
    channels = np.transpose(actions[ac])[1:, :]
    # lfilter returns new arrays, so actions[ac] is not modified. Writing the
    # result back through the transposed view (as before) altered actions[ac] and
    # made a second run of this cell filter already-filtered data.
    realization = lfilter(bnotch, anotch, channels, axis=1)
    realization = lfilter(bnotch2, anotch2, realization, axis=1)
    realization = lfilter(b, a, realization, axis=1)

    final_array = realization[:, ::decimation_factor] # decimation
    preprocessed_actions.append(final_array)

    rms = np.sqrt(np.mean(final_array**2, axis=1))
    max_val = np.max(np.abs(rms))
    min_val = np.min(np.abs(rms))
    rms = (rms - min_val) / (max_val - min_val) # normalization
    preprocessed_actions_rms.append(rms)
    preprocessed_actions_heatmaps.append(np.reshape(rms[real_locations], (4, 4)))

# Number of whole windows that fit in each realization.
windows_per_action = [int(action.shape[1] / window_len) for action in preprocessed_actions]
min_window = np.min(windows_per_action)
min_window_index = np.argmin(windows_per_action)
print('Minimum number of windows per realization:')
print(str(min_window)+', at realization number '+str(min_window_index))
print('Preprocessing completed!')

Visualize the heatmaps

In [None]:
# Draw one heat-map per realization: one row per gesture, one column per repetition,
# with column 0 reserved for the gesture name.
plt.rcParams["figure.figsize"] = (25,12)

fig, axs = plt.subplots(gestures_num, repetitions+1)
# Colour limits for an optional logarithmic scale, i.e.
# imshow(..., norm=LogNorm(vmin=heatmap_vmin, vmax=heatmap_vmax)).
# Deliberately not called min / max, which would shadow the builtins.
heatmap_vmin = 0.01
heatmap_vmax = 1
for ac in range(len(actions)):
    row, col = divmod(ac, repetitions)
    im = axs[row, col + 1].imshow(preprocessed_actions_heatmaps[ac], cmap='hot')
for ax in axs.ravel():
    ax.set_xticks([])
    ax.set_yticks([])
gnames = ['Two fingers', 'Three fingers', 'Abduction', 'Fist', 'Bet', 'Gimel', 'Het', 'Tet', 'Kaf', 'Nun']
for i in range(gestures_num):
    # The label column shows only text, so hide its frame.
    axs[i, 0].text(-0.5, 0.5, gnames[i], fontsize=18)
    for side in ('top', 'right', 'bottom', 'left'):
        axs[i, 0].spines[side].set_visible(False)

# The colour bar is taken from the last image drawn.
cbar = fig.colorbar(im, ax=axs.ravel().tolist())
cbar.ax.tick_params(labelsize=18)
os.makedirs('heatmaps', exist_ok=True)  # savefig does not create the folder
plt.savefig('heatmaps/heatmaps_auto_seg_'+subject_num+'_pos'+position+'_s'+session+'.png')
plt.show()

In [None]:
# Manually set the number of windows per realization, replacing the value the
# preprocessing cell computed from the data.
# NOTE(review): a realization with fewer than 11 whole windows would give empty
# windows in the next cell — confirm 11 fits this recording.
min_window = 11

Creating data files for the network

In [None]:

# Build the train / validation / test sets from the preprocessed realizations:
#   1. reduce every realization to `min_window` per-channel RMS windows,
#   2. hold out repetitions 8 and 9 of every gesture as the test set,
#   3. take the first two remaining repetitions of every gesture for validation,
#   4. pickle all six arrays under segmented_files/subject_<subject_num>/.

# Flattened copy of every heatmap value (not used further in this cell).
value_list = [preprocessed_actions_heatmaps[ac][i][j]
              for ac in range(len(actions))
              for i in range(4)
              for j in range(4)]

# Realization index -> gesture label. Realizations are ordered gesture by
# gesture, `repetitions` in a row (the same layout the split below relies on).
actions_dict = {act: act // repetitions for act in range(len(preprocessed_actions))}


# Dividing the data to time windows:
#print('Number of windows per realization:')
#print([int(action.shape[1] / window_len) for action in preprocessed_actions])

# NOTE: preprocessed_actions is overwritten in place with the windowed RMS
# values, so re-running this cell without re-running the preprocessing cell
# would operate on already-windowed data.
for ac in range(len(preprocessed_actions)):
    signal = preprocessed_actions[ac]
    # Drop the trailing samples that do not fill a whole window.
    signal = signal[:, :window_len * floor(signal.shape[1] / window_len)]
    windows_rms = np.zeros((signal.shape[0], min_window))
    for j in range(windows_rms.shape[1]):
        segment = signal[:, window_len * j:window_len * (j + 1)]
        windows_rms[:, j] = np.sqrt(np.mean(segment ** 2, axis=1))
    # Scale each realization by its own maximum.
    preprocessed_actions[ac] = windows_rms / np.max(np.abs(windows_rms))
print('The data was divided to time windows!')

test_lengths = []
# Repetition indices (within each gesture) that are held out for testing.
rand_nums = np.array([8, 9])
# rand_nums = np.random.randint(0,repetitions,2)
# while rand_nums[0]==rand_nums[1]:
#     rand_nums = np.random.randint(0, repetitions, 2)
train_chunks, train_label_chunks = [], []
test_chunks, test_label_chunks = [], []
for ac in range(len(preprocessed_actions)):
    windows = np.transpose(preprocessed_actions[ac])  # one row per time window
    # Every window carries the label of its realization (the first chunk is no
    # longer hard-coded to label 0).
    window_labels = actions_dict[ac] * np.ones(windows.shape[0])
    if (ac % repetitions) in rand_nums:
        test_chunks.append(windows)
        test_label_chunks.append(window_labels)
    else:
        train_chunks.append(windows)
        train_label_chunks.append(window_labels)
output_data = np.concatenate(train_chunks, axis=0)
output_labels = np.concatenate(train_label_chunks, axis=0)
test_data = np.concatenate(test_chunks, axis=0)
test_labels = np.concatenate(test_label_chunks, axis=0)

# Split the non-test realizations of every gesture: the first `val_reps` go to
# validation, the rest stay in the training set.
val_reps = 2
train_reps = repetitions - len(rand_nums)
rows_per_gesture = train_reps * min_window
val_rows = np.concatenate([
    np.arange(g * rows_per_gesture, g * rows_per_gesture + val_reps * min_window)
    for g in range(gestures_num)])
train_rows = np.concatenate([
    np.arange(g * rows_per_gesture + val_reps * min_window, (g + 1) * rows_per_gesture)
    for g in range(gestures_num)])
val_data = output_data[val_rows, :]
val_labels = output_labels[val_rows]
output_data = output_data[train_rows, :]
output_labels = output_labels[train_rows]
output_data_df = pd.DataFrame(data=output_data)
output_labels_df = pd.DataFrame(data=output_labels)
# miss_indices = [82]
# miss_samples = []
# for m in miss_indices:
#   m_rep = (m//2)%10
#   m_gest = (m//2)//10
#   if m_rep == 8 or m_rep == 9:
#     miss_samples.append(m_gest*2+m_rep-8)
#     #test_data[m_gest*2+m_rep-8] = -1*np.ones(16)
#     #test_labels[m_gest*2+m_rep-8] = -1
# relevant_samples = [item for item in list(range(20)) if item not in miss_samples]
# test_data = test_data[relevant_samples, :]
# test_labels = test_labels[relevant_samples]
print(test_data.shape)
print(test_labels.shape)
test_data_df = pd.DataFrame(data=test_data)
test_labels_df = pd.DataFrame(data=test_labels)
# test_lengths is never filled in this cell, so this frame is empty.
lengths_df = pd.DataFrame(data=np.array(test_lengths))
val_data_df = pd.DataFrame(data=val_data)
val_labels_df = pd.DataFrame(data=val_labels)


print('shapes of train data, train labels, validation data, validation labels, test data, test labels:')
print(output_data_df.shape)
print(output_labels_df.shape)
print(val_data.shape)
print(val_labels.shape)
print(test_data_df.shape)
print(test_labels_df.shape)
folder_path = 'segmented_files/subject_'+subject_num
os.makedirs(folder_path, exist_ok=True)
###########################
name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'+session+'_auto_seg'
###########################
output_data_df.to_pickle(name_start+'_train_data.pkl', protocol=4)
output_labels_df.to_pickle(name_start+'_train_labels.pkl', protocol=4)
test_data_df.to_pickle(name_start+'_test_data.pkl', protocol=4)
test_labels_df.to_pickle(name_start+'_test_labels.pkl', protocol=4)
val_data_df.to_pickle(name_start+'_val_data.pkl', protocol=4)
val_labels_df.to_pickle(name_start+'_val_labels.pkl', protocol=4)
print('Files saved!')

Train HMM models:

In [None]:
# Fit one Gaussian HMM per gesture on the windowed training set, pickle each
# model, then classify every validation sequence by the model that gives it
# the highest score.
start_time = time.time()
#subject_num = '004' ######
###########################
name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'+session+'_auto_seg'
###########################
#name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'+session
train_data, train_labels = None, None
test_data, test_labels = None, None
# The validation files play the role of the "test" set in this cell.
for filename in [(name_start+'_train_data.pkl', name_start+'_train_labels.pkl', name_start+'_val_data.pkl', name_start+'_val_labels.pkl')]:
    data = pd.read_pickle(filename[0]).to_numpy()
    labels = pd.read_pickle(filename[1]).to_numpy()
    test_data_part = pd.read_pickle(filename[2]).to_numpy()
    test_labels_part = pd.read_pickle(filename[3]).to_numpy()

    if train_data is not None:
        train_data = np.concatenate((train_data, data))
        train_labels = np.concatenate((train_labels, labels))
        test_data = np.concatenate((test_data, test_data_part))
        test_labels = np.concatenate((test_labels, test_labels_part))
    else:
        train_data = copy.deepcopy(data)
        train_labels = copy.deepcopy(labels)
        test_data = copy.deepcopy(test_data_part)
        test_labels = copy.deepcopy(test_labels_part)

# Regroup the flat (windows, 16) rows into (sequences, windows_num, 16).
windows_num = min_window
reshaped_train_data = np.zeros((int(train_data.shape[0]/windows_num), windows_num, 16))
reshaped_test_data = np.zeros((int(test_data.shape[0]/windows_num), windows_num, 16))
for seq in range(reshaped_train_data.shape[0]):
    first_row = windows_num*seq
    reshaped_train_data[seq,:,:] = train_data[first_row:first_row+windows_num]
for seq in range(reshaped_test_data.shape[0]):
    first_row = windows_num*seq
    reshaped_test_data[seq,:,:] = test_data[first_row:first_row+windows_num]
train_data = reshaped_train_data
test_data = reshaped_test_data
# One label per sequence: keep the label of its first window.
train_labels = train_labels[::windows_num].flatten()
test_labels = test_labels[::windows_num].flatten()
print('Original shapes:')
for arr in (train_data, train_labels, test_data, test_labels):
    print(arr.shape)
print('test labels:')
print(test_labels.T)
print('train labels:')
print(train_labels.T)

#for comp_num in [4,6,8,10,12]:
#    for iterations_num in [1,5,10,50,100]:
d = {}
for comp_num in [4]:
    for iterations_num in [10]:
        res = []
        tries_num = 1
        gestures_num = 10
        # NOTE: this rebinds the notebook-wide `repetitions` (training sequences
        # per gesture in this cell).
        repetitions = 6
        #repetitions = 5
        val_num = 2
        # Every training sequence has the same number of windows.
        lengths = [train_data.shape[1]] * repetitions
        final_res = []
        models = []

        for i in range(gestures_num):
            #X = np.concatenate([train_data[repetitions*i+j,:,:] for j in range(repetitions)])
            X = np.concatenate([train_data[repetitions*i+j,:,:] for j in range(6)])

            model = hmm.GaussianHMM(n_components=comp_num, covariance_type="tied", n_iter=iterations_num)
            model.fit(X, lengths)
            models.append(model)
            folder_path = 'hmm_models/subject_'+subject_num+'/hmm_'+subject_num+'_pos'+position+'_s'+session+'_auto_seg'
            if not os.path.exists(folder_path):
                os.makedirs(folder_path)
            with open(folder_path+'/hmm_gest'+str(i)+'.pkl', "wb") as file:
                pickle.dump(model, file)

        # Predicted gesture = index of the model with the highest score.
        test_res = []
        for s in range(gestures_num*val_num):
            scores = np.array([gesture_model.score(test_data[s, :, :]) for gesture_model in models])
            test_res.append(np.argmax(scores))

        print('test results:')
        print(test_res)
        print('real labels:')
        print(test_labels.T)
        acc = np.mean(np.array(test_res)==test_labels)
        print('HMM classification accuracy:')
        print(acc)

print('total run time: '+str(time.time()-start_time))

Generate artificial training data from the trained HMMs and add it to the real training data:

In [None]:
# Number of sequences sampled from each gesture's HMM when generating artificial data.
num_gen = 6

In [None]:
# Sample artificial sequences from the saved per-gesture HMMs, append them to
# the real training windows and pickle the combined set under
# hmm_generated_files/subject_<subject_num>/.
train_data = None
train_labels = None
test_data = None
test_labels = None
name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'+session+'_auto_seg'
for filename in [(name_start+'_train_data.pkl', name_start+'_train_labels.pkl', name_start+'_val_data.pkl', name_start+'_val_labels.pkl')]:
    data = pd.read_pickle(filename[0]).to_numpy()
    labels = pd.read_pickle(filename[1]).to_numpy()
    test_data_part = pd.read_pickle(filename[2]).to_numpy()
    test_labels_part = pd.read_pickle(filename[3]).to_numpy()

    if train_data is None:
        train_data = copy.deepcopy(data)
        train_labels = copy.deepcopy(labels)
        test_data = copy.deepcopy(test_data_part)
        test_labels = copy.deepcopy(test_labels_part)
    else:
        train_data = np.concatenate((train_data, data))
        train_labels = np.concatenate((train_labels, labels))
        test_data = np.concatenate((test_data, test_data_part))
        test_labels = np.concatenate((test_labels, test_labels_part))


comp_num = 4
tries_num = 1
gestures_num = 10
num_windows = min_window
folder_path = 'hmm_models/subject_'+subject_num+'/hmm_'+subject_num+'_pos'+position+'_s'+session+'_auto_seg'
generated_chunks = []
generated_label_chunks = []
for t in range(tries_num):
    for i in range(gestures_num):
        # Unpickling executes arbitrary code: only load model files written by this notebook.
        with open(folder_path+'/hmm_gest'+str(i)+'.pkl', "rb") as file:
            model = pickle.load(file)
        for j in range(num_gen):
            # Each sample is one sequence of num_windows observations; the
            # hidden-state path Z is not used.
            X, Z = model.sample(num_windows)
            generated_chunks.append(X)
        # One label per generated window, all belonging to gesture i.
        generated_label_chunks.append(i*np.ones(num_gen*num_windows))
generated_data = np.concatenate(generated_chunks)
generated_labels = np.concatenate(generated_label_chunks)
print('generated shapes:')
print(generated_data.shape)
print(generated_labels.shape)

# Real training windows first, generated windows after them.
generated_data = np.concatenate((train_data, generated_data))
generated_labels = np.concatenate((train_labels.flatten(), generated_labels.flatten()))
print('final shapes:')
print(generated_data.shape)
print(generated_labels.shape)

output_data_df = pd.DataFrame(data=generated_data)
output_labels_df = pd.DataFrame(data=generated_labels)
folder_path = 'hmm_generated_files/subject_'+subject_num
os.makedirs(folder_path, exist_ok=True)
name_start_gen = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num+'_pos'+position+'_s'+session+'_auto_seg'
output_data_df.to_pickle(name_start_gen+'_train_data.pkl', protocol=4)
output_labels_df.to_pickle(name_start_gen+'_train_labels.pkl', protocol=4)

# Classification task 1

**Task 1:** 4 classification algorithms

## CNN

**Task 1** - Training the CNN: okay

In [None]:
# Configuration for the Task 1 CNN cells.
use_hmm = True            # True: train on the HMM-augmented files; False: on the segmented files only
run = '1'                 # run identifier, appended to the weights and summary file names
position = '2'
train_sessions = ['2']    # one set of data files is read per session listed here
subject_num = '007'
pos = -30                 # NOTE(review): not referenced in the cells visible here — confirm it is still needed
min_window = 11           # number of consecutive window predictions grouped per event in majority voting

# Number of events scored by majority voting (20 per session).
test_events = 20*len(train_sessions)

In [None]:
# Train the Task 1 CNN on single 4x4 RMS frames, then evaluate it on the
# validation files, both per frame and after per-event majority voting.
import time
from sklearn.metrics import confusion_matrix

start_time = time.time()


def _read_array(path):
    """Load a pickled DataFrame as a NumPy array; a None path yields None."""
    if path is None:
        return None
    return pd.read_pickle(path).to_numpy()


# Read the files:
train_data = None
train_labels = None
test_data = None
test_labels = None
if use_hmm:
    name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num+'_pos'+position+'_s'
    name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
else:
    name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
    name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
for filename in [(name_start+session+'_auto_seg_train_data.pkl', name_start+session+'_auto_seg_train_labels.pkl', name_start_val+session+'_auto_seg_val_data.pkl', name_start_val+session+'_auto_seg_val_labels.pkl') for session in train_sessions]:
#for filename in [(pre+'_generated_train_data_10gest.pkl', pre+'_generated_train_labels_10gest.pkl', pre+'_val_data_10gest.pkl', pre+'_val_labels_10gest.pkl') for pre in ['grc1pos1', 'grc1pos2']]:
    data = _read_array(filename[0])
    print('train data read!')
    labels = _read_array(filename[1])
    print('train labels read!')
    test_data_part = _read_array(filename[2])
    print('test data read!')
    test_labels_part = _read_array(filename[3])
    print('test labels read!')

    # Append this session's arrays to what has been accumulated so far.
    if data is not None:
        if train_data is None:
            train_data = copy.deepcopy(data)
            train_labels = copy.deepcopy(labels)
        else:
            train_data = np.concatenate((train_data, data))
            train_labels = np.concatenate((train_labels, labels))
    if test_data_part is not None:
        if test_data is None:
            test_data = copy.deepcopy(test_data_part)
            test_labels = copy.deepcopy(test_labels_part)
        else:
            test_data = np.concatenate((test_data, test_data_part))
            test_labels = np.concatenate((test_labels, test_labels_part))

# Each 16-value row becomes a 4x4 frame.
train_data = np.reshape(train_data, (train_data.shape[0], 4, 4))
test_data = np.reshape(test_data, (test_data.shape[0], 4, 4))
print('shapes of train data, train labels, test data, test labels:')
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)
# Shuffle the training frames.
per = np.random.permutation(len(train_data))
train_data = train_data[per, :]
train_labels = train_labels[per]
# Data standardization:
# NOTE(review): the evaluation set is standardized with its own mean/std, not
# with the training statistics — confirm this is intended.
train_data = train_data - np.mean(train_data, axis=0)
train_data = train_data / np.std(train_data, axis=0)
test_data = test_data - np.mean(test_data, axis=0)
test_data = test_data / np.std(test_data, axis=0)

# Add the trailing channel axis expected by Conv2D.
train_data = tf.expand_dims(train_data, axis=-1)
test_data = tf.expand_dims(test_data, axis=-1)

# Hyper-parameters:
if use_hmm:
    algo_type='CNN+HMM'
else:
    algo_type='CNN'
reg_strength = 0.0001
lr = 5e-4
d1 = 0.3
epochs_num = 1000 #2000
batch_size = 600#1000 #3400
#min_window = real_min_window
print('min_window='+str(min_window))

model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(2, 2, padding='same', kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='1'),
    tf.keras.layers.ReLU(),
    tf.keras.layers.BatchNormalization(name='2'),
    tf.keras.layers.Conv2D(4, 2, padding='same', kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='3'),
    tf.keras.layers.ReLU(),
    tf.keras.layers.BatchNormalization(name='4'),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(40, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='5'),
    tf.keras.layers.ReLU(),
    tf.keras.layers.BatchNormalization(name='6'),
    tf.keras.layers.Dropout(d1),
    tf.keras.layers.Dense(20, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='7'),
    tf.keras.layers.ReLU(),
    tf.keras.layers.BatchNormalization(name='8'),
    tf.keras.layers.Dropout(d1),
    tf.keras.layers.Dense(10, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='new'),
    tf.keras.layers.Softmax()
])

def scheduler(epoch):
    """Learning-rate schedule; currently the same constant rate before and after epoch 600."""
    if epoch < 600:
        return lr
    else:
        return lr

callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# The network ends in a Softmax layer, so it outputs probabilities, not
# logits: the loss must be configured with from_logits=False.
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

#####model.load_weights('weights/CNN/weights_task1_initial_004_run1')
history = model.fit(train_data, train_labels, epochs=epochs_num,batch_size=batch_size, callbacks=[callback])
model.save_weights('weights/CNN/weights_task1_auto_seg_'+subject_num+'_pos'+position+'_run'+run)
#model.load_weights('weights/CNN/weights_task1_'+subject_num+'_pos'+position+'_run'+run)
test_loss, test_acc = model.evaluate(test_data,  test_labels)
print('\nSingle frame based validation accuracy:', test_acc)
predictions = np.argmax(model.predict(test_data), axis=1)
print('Network predictions:')
print(predictions)
print('Real labels:')
print(test_labels.T)

# majority voting: every event is min_window consecutive frame predictions;
# the event label is the most frequent prediction among them.
predictions_majority = np.zeros(test_events)
for i in range(test_events):
    relevant_pred = predictions[i*int(min_window):i*int(min_window)+min_window]
    predictions_majority[i] = np.argmax(np.bincount(relevant_pred))
print('Network predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Expected event labels: two events per gesture, repeated for every session.
real_labels = np.array([0,0,1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9]*len(train_sessions))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)


plt.rcParams["figure.figsize"] = (12,6)
fig, axs = plt.subplots(1, 2)
axs[0].plot(history.history['accuracy'])
axs[0].set(title='Training Accuracy', xlabel='epochs', ylabel='accuracy')
axs[1].plot(history.history['loss'])
axs[1].set(title='Training Loss', xlabel='epochs', ylabel='loss')
#model.summary()
plt.show()

print('Validation accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
plt.imshow(confusion_matrix(real_labels, predictions_majority))
plt.colorbar()
#plt.show()



**Task 1:** testing the CNN: okay

In [None]:
# Evaluate the trained Task 1 CNN on the held-out test files and write a run
# summary plus the confusion matrix. Relies on `model` and the hyper-parameter
# variables defined by the CNN training cell.

# Read the files:
train_data = None
train_labels = None
test_data = None
test_labels = None
name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
for filename in [(name_start+session+'_auto_seg_test_data.pkl', name_start+session+'_auto_seg_test_labels.pkl') for session in train_sessions]:
    test_data_part = pd.read_pickle(filename[0]).to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(filename[1]).to_numpy()
    print('test labels read!')

    # Only test files are read here; the leftover `data` / `labels` variables
    # of earlier cells are deliberately not touched.
    if test_data is None:
        test_data = copy.deepcopy(test_data_part)
        test_labels = copy.deepcopy(test_labels_part)
    else:
        test_data = np.concatenate((test_data, test_data_part))
        test_labels = np.concatenate((test_labels, test_labels_part))

# Each 16-value row becomes a 4x4 frame.
test_data = np.reshape(test_data, (test_data.shape[0], 4, 4))
print('shapes of test data, test labels:')
print(test_data.shape)
print(test_labels.shape)

# Data standardization:
test_data = test_data - np.mean(test_data, axis=0)
test_data = test_data / np.std(test_data, axis=0)

test_data = tf.expand_dims(test_data, axis=-1)

weights_name = 'weights_task1_auto_seg_'+subject_num+'_pos'+position+'_run'+run
model.load_weights('weights/CNN/'+weights_name)
test_loss, test_acc = model.evaluate(test_data,  test_labels)
print('\nSingle frame based test accuracy:', test_acc)
predictions = np.argmax(model.predict(test_data), axis=1)
print('Network predictions:')
print(predictions)
print('Real labels:')
print(test_labels.T)

# majority voting: every event is min_window consecutive frame predictions;
# the event label is the most frequent prediction among them.
test_events = 20*len(train_sessions)
predictions_majority = np.zeros(test_events)
for i in range(test_events):
    relevant_pred = predictions[i*int(min_window):i*int(min_window)+min_window]
    predictions_majority[i] = np.argmax(np.bincount(relevant_pred))
print('Network predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Expected event labels: two events per gesture, repeated for every session.
real_labels = np.array([0,0,1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9]*len(train_sessions))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)

print('Test accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
mat = confusion_matrix(real_labels, predictions_majority)
print(repr(mat))
plt.imshow(mat)
#plt.colorbar()
plt.show()

folder_path = 'summaries/subject_'+subject_num+'/task1_auto_seg'
os.makedirs(folder_path, exist_ok=True)
# The summary goes into the folder created above (it used to point at
# task1_1s/, which this cell never creates).
summary_name = folder_path+'/summary_task1_auto_seg_'+subject_num+'_pos'+position+'_type_'+algo_type+'_run'+run
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 1, subject number '+subject_num+', position '+position+', algorithm type: '+algo_type+', run '+run+'\n')
    f.write('sessions: '+', '.join(train_sessions)+'\n')
    f.write('position: '+position+'\n')
    f.write('regularization strength: '+str(reg_strength)+'\n')
    f.write('learning rate: '+str(lr)+'\n')
    f.write('dropout: '+str(d1)+'\n')
    f.write('number of epochs: '+str(epochs_num)+'\n')
    f.write('batch size: '+str(batch_size)+'\n')
    f.write('minimal window length (in samples): '+str(min_window)+'\n')
    f.write('Single frame based test accuracy: '+str(test_acc)+'\n')
    f.write('Test accuracy after majority voting: '+str(accuracy)+'\n')
    f.write('weights file: '+weights_name+'\n')

confusion_matrix_df = pd.DataFrame(data=mat)
confusion_matrix_df.to_pickle(folder_path+'/CNN_mat_run'+run+'.pkl', protocol=4)

## RNN

Training the RNN

In [None]:
# Configuration for the RNN cells.
use_hmm = True   # True: read the HMM-augmented training files; False: the segmented files only
run = '2'        # run identifier, appended to the RNN weights file name

In [None]:
# Train the Task 1 LSTM on whole sequences of windows_num frames and evaluate
# it on the validation files.
import time
from sklearn.metrics import confusion_matrix

start_time = time.time()


def _read_array(path):
    """Load a pickled DataFrame as a NumPy array; a None path yields None."""
    if path is None:
        return None
    return pd.read_pickle(path).to_numpy()


# Read the files:
train_data = None
train_labels = None
test_data = None
test_labels = None
if use_hmm:
    name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num+'_pos'+position+'_s'
    name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
else:
    name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
    name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
# NOTE(review): these file names carry no '_auto_seg' suffix, unlike the files
# read by the CNN cells — confirm the intended data set.
for filename in [(name_start+session+'_train_data.pkl', name_start+session+'_train_labels.pkl', name_start_val+session+'_val_data.pkl', name_start_val+session+'_val_labels.pkl') for session in train_sessions]:
#for filename in [(pre+'_generated_train_data_10gest.pkl', pre+'_generated_train_labels_10gest.pkl', pre+'_val_data_10gest.pkl', pre+'_val_labels_10gest.pkl') for pre in ['grc1pos1', 'grc1pos2']]:
    data = _read_array(filename[0])
    print('train data read!')
    labels = _read_array(filename[1])
    print('train labels read!')
    test_data_part = _read_array(filename[2])
    print('test data read!')
    test_labels_part = _read_array(filename[3])
    print('test labels read!')

    # Append this session's arrays to what has been accumulated so far.
    if data is not None:
        if train_data is None:
            train_data = copy.deepcopy(data)
            train_labels = copy.deepcopy(labels)
        else:
            train_data = np.concatenate((train_data, data))
            train_labels = np.concatenate((train_labels, labels))
    if test_data_part is not None:
        if test_data is None:
            test_data = copy.deepcopy(test_data_part)
            test_labels = copy.deepcopy(test_labels_part)
        else:
            test_data = np.concatenate((test_data, test_data_part))
            test_labels = np.concatenate((test_labels, test_labels_part))


# Hyper-parameters:
if use_hmm:
    algo_type='RNN+HMM'
else:
    algo_type='RNN'
reg_strength = 0.0001
lr = 5e-3
d1 = 0
#min_angle = 1/18
#max_shift_h = 0.05
#max_shift_w = 0.05
epochs_num = 1500 #1500
batch_size = 1000 # full batch
print('min_window='+str(min_window))


# Regroup the flat (windows, 16) rows into (sequences, windows_num, 16).
windows_num = min_window
reshaped_train_data = np.zeros((int(train_data.shape[0]/windows_num), windows_num, 16))
reshaped_test_data = np.zeros((int(test_data.shape[0]/windows_num), windows_num, 16))
for i in range(reshaped_train_data.shape[0]):
    reshaped_train_data[i,:,:] = train_data[windows_num*i:windows_num*(i+1)]
for i in range(reshaped_test_data.shape[0]):
    reshaped_test_data[i,:,:] = test_data[windows_num*i:windows_num*(i+1)]
train_data = reshaped_train_data
test_data = reshaped_test_data
# One label per sequence: keep the label of its first window.
train_labels = train_labels[::windows_num]
test_labels = test_labels[::windows_num]
print('shapes:')
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)


# Shuffle the training sequences.
per = np.random.permutation(len(train_data))
train_data = train_data[per, :]
train_labels = train_labels[per]


model = tf.keras.Sequential([
    tf.keras.layers.LSTM(12, dropout=d1, input_shape=(windows_num,16), name='lstm2'),
    tf.keras.layers.Dense(10, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='new'),
    tf.keras.layers.Softmax()
])

def scheduler(epoch):
    """Learning-rate schedule; currently the same constant rate before and after epoch 600."""
    if epoch < 600:
        return lr
    else:
        return lr

callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# The network ends in a Softmax layer, so it outputs probabilities, not
# logits: the loss must be configured with from_logits=False.
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

history = model.fit(train_data, train_labels, epochs=epochs_num,batch_size=batch_size, callbacks=[callback])
model.save_weights('weights/RNN/weights_task1_1s_'+subject_num+'_pos'+position+'_run'+run)
#model.load_weights('weights/RNN/weights_task1_'+subject_num+'_pos'+position+'_run'+run)
test_loss, test_acc = model.evaluate(test_data,  test_labels)#, verbose=2)
print('\nTest accuracy:', test_acc)

predictions = np.argmax(model.predict(test_data), axis=1)
cmat = confusion_matrix(test_labels, predictions)

plt.rcParams["figure.figsize"] = (12,6)
fig, axs = plt.subplots(1, 2)
axs[0].plot(history.history['accuracy'])
axs[0].set(title='Training Accuracy', xlabel='epochs', ylabel='accuracy')
axs[1].plot(history.history['loss'])
axs[1].set(title='Training Loss', xlabel='epochs', ylabel='loss')
#model.summary()
plt.figure()

print('Confusion matrix:')
plt.imshow(cmat)
plt.colorbar()
#plt.show()

Testing the RNN

In [None]:
# Rebuild the Task 1 LSTM, load the weights saved by the training cell,
# evaluate on the held-out test files and write a run summary plus the
# confusion matrix.
import time
from sklearn.metrics import confusion_matrix

start_time = time.time()


def _read_array(path):
    """Load a pickled DataFrame as a NumPy array; a None path yields None."""
    if path is None:
        return None
    return pd.read_pickle(path).to_numpy()


# Read the files:
train_data = None
train_labels = None
test_data = None
test_labels = None
if use_hmm:
    name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num+'_pos'+position+'_s'
    name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
else:
    name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
    name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
for filename in [(name_start+session+'_train_data.pkl', name_start+session+'_train_labels.pkl', name_start_val+session+'_test_data.pkl', name_start_val+session+'_test_labels.pkl') for session in train_sessions]:
#for filename in [(pre+'_generated_train_data_10gest.pkl', pre+'_generated_train_labels_10gest.pkl', pre+'_val_data_10gest.pkl', pre+'_val_labels_10gest.pkl') for pre in ['grc1pos1', 'grc1pos2']]:
    data = _read_array(filename[0])
    print('train data read!')
    labels = _read_array(filename[1])
    print('train labels read!')
    test_data_part = _read_array(filename[2])
    print('test data read!')
    test_labels_part = _read_array(filename[3])
    print('test labels read!')

    # Append this session's arrays to what has been accumulated so far.
    if data is not None:
        if train_data is None:
            train_data = copy.deepcopy(data)
            train_labels = copy.deepcopy(labels)
        else:
            train_data = np.concatenate((train_data, data))
            train_labels = np.concatenate((train_labels, labels))
    if test_data_part is not None:
        if test_data is None:
            test_data = copy.deepcopy(test_data_part)
            test_labels = copy.deepcopy(test_labels_part)
        else:
            test_data = np.concatenate((test_data, test_data_part))
            test_labels = np.concatenate((test_labels, test_labels_part))


# Hyper-parameters:
# NOTE(review): lr, d1, epochs_num and batch_size are redefined here with
# values that differ from the RNN training cell, and the summary below records
# these values — confirm which ones should be reported.
if use_hmm:
    algo_type='RNN+HMM'
else:
    algo_type='RNN'
reg_strength = 0.0001
lr = 1e-2
d1 = 0.1
#min_angle = 1/18
#max_shift_h = 0.05
#max_shift_w = 0.05
epochs_num = 2000 #1500
batch_size = 500
print('min_window='+str(min_window))


# Regroup the flat (windows, 16) rows into (sequences, windows_num, 16).
windows_num = min_window
reshaped_train_data = np.zeros((int(train_data.shape[0]/windows_num), windows_num, 16))
reshaped_test_data = np.zeros((int(test_data.shape[0]/windows_num), windows_num, 16))
for i in range(reshaped_train_data.shape[0]):
    reshaped_train_data[i,:,:] = train_data[windows_num*i:windows_num*(i+1)]
for i in range(reshaped_test_data.shape[0]):
    reshaped_test_data[i,:,:] = test_data[windows_num*i:windows_num*(i+1)]
train_data = reshaped_train_data
test_data = reshaped_test_data
# One label per sequence: keep the label of its first window.
train_labels = train_labels[::windows_num]
test_labels = test_labels[::windows_num]
print('shapes:')
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)


per = np.random.permutation(len(train_data))
train_data = train_data[per, :]
train_labels = train_labels[per]


model = tf.keras.Sequential([
    tf.keras.layers.LSTM(12, dropout=d1, input_shape=(windows_num,16), name='lstm2'),
    tf.keras.layers.Dense(10, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='new'),
    tf.keras.layers.Softmax()
])

def scheduler(epoch):
    """Learning-rate schedule; currently the same constant rate before and after epoch 600."""
    if epoch < 600:
        return lr
    else:
        return lr

callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# The network ends in a Softmax layer, so it outputs probabilities, not
# logits: the loss must be configured with from_logits=False.
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

#history = model.fit(train_data, train_labels, epochs=epochs_num,batch_size=batch_size, callbacks=[callback])
#model.save_weights('weights/RNN/weights_task1_'+subject_num+'_pos'+position+'_run'+run)
weights_name = 'weights_task1_1s_'+subject_num+'_pos'+position+'_run'+run
model.load_weights('weights/RNN/'+weights_name)
test_loss, test_acc = model.evaluate(test_data,  test_labels)#, verbose=2)
print('\nTest accuracy:', test_acc)

predictions = np.argmax(model.predict(test_data), axis=1)
cmat = confusion_matrix(test_labels, predictions)

folder_path = 'summaries/subject_'+subject_num+'/task1_1s'
os.makedirs(folder_path, exist_ok=True)
summary_name = 'summaries/subject_'+subject_num+'/task1_1s/summary_task1_1s_'+subject_num+'_pos'+position+'_type_'+algo_type+'_run'+run
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 1, subject number '+subject_num+', position '+position+', algorithm type: '+algo_type+', run '+run+'\n')
    f.write('sessions: '+', '.join(train_sessions)+'\n')
    f.write('position: '+position+'\n')
    f.write('regularization strength: '+str(reg_strength)+'\n')
    f.write('learning rate: '+str(lr)+'\n')
    f.write('dropout: '+str(d1)+'\n')
    f.write('number of epochs: '+str(epochs_num)+'\n')
    f.write('batch size: '+str(batch_size)+'\n')
    f.write('minimal window length (in samples): '+str(min_window)+'\n')
    f.write('Test accuracy after majority voting: '+str(test_acc)+'\n')
    f.write('weights file: '+weights_name+'\n')

# Save the RNN confusion matrix computed above (previously `mat`, the matrix
# left over from the CNN test cell, was saved here).
confusion_matrix_df = pd.DataFrame(data=cmat)
confusion_matrix_df.to_pickle(folder_path+'/RNN_mat.pkl', protocol=4)

## KNN

**Task 1** - KNN

In [None]:
# Run configuration: read training frames from the segmented files (no HMM-generated
# data) and tag the outputs of this run with the identifier '2'.
use_hmm, run = False, '2'

In [None]:
k_KNN = 1

# Read the files:
# Training frames are read from hmm_generated_files/ when use_hmm is set and from
# segmented_files/ otherwise; test frames are always read from segmented_files/.
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
if use_hmm:
    name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num+'_pos'+position+'_s'
else:
    name_start = name_start_val

train_data_parts, train_labels_parts = [], []
test_data_parts, test_labels_parts = [], []
# The loop name is `sess` so that the module-level `session` and `filename`
# settings are not overwritten by this cell.
for sess in train_sessions:
    data = pd.read_pickle(name_start+sess+'_train_data.pkl').to_numpy()
    print('train data read!')
    labels = pd.read_pickle(name_start+sess+'_train_labels.pkl').to_numpy()
    print('train labels read!')
    test_data_part = pd.read_pickle(name_start_val+sess+'_test_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(name_start_val+sess+'_test_labels.pkl').to_numpy()
    print('test labels read!')

    train_data_parts.append(data)
    train_labels_parts.append(labels)
    test_data_parts.append(test_data_part)
    test_labels_parts.append(test_labels_part)

# Join all sessions with one concatenation per array (None when no session is listed).
train_data = np.concatenate(train_data_parts) if train_data_parts else None
train_labels = np.concatenate(train_labels_parts) if train_labels_parts else None
test_data = np.concatenate(test_data_parts) if test_data_parts else None
test_labels = np.concatenate(test_labels_parts) if test_labels_parts else None

# Fit the k-NN classifier on single frames and score every test frame.
KNN = KNeighborsClassifier(n_neighbors=k_KNN)
KNN.fit(train_data, train_labels)
predictions = KNN.predict(test_data).astype('int')
frame_is_correct = predictions==test_labels.T
single_acc = np.mean(frame_is_correct)
print('KNN single frame accuracy: '+str(single_acc))

# majority voting:
# Each test event spans min_window consecutive frame predictions; the most frequent
# predicted class in that span becomes the event's prediction.
vote_step = int(min_window)
predictions_majority = np.array(
    [np.argmax(np.bincount(predictions[k*vote_step:k*vote_step+min_window]))
     for k in range(test_events)],
    dtype=float)
print('predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Expected event labels: two events for each of the classes 0..9, repeated per session.
labels_per_session = [0,0,1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9]
real_labels = np.array(labels_per_session*len(train_sessions))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)


plt.rcParams["figure.figsize"] = (12,6)
print('Test accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
event_confusion = confusion_matrix(real_labels, predictions_majority)
plt.imshow(event_confusion)
plt.colorbar()
#plt.show()

# Write the KNN run summary. The folder is created here because this cell may be
# run without the cell that normally creates it.
summary_dir = 'summaries/subject_'+subject_num+'/task1_1s'
os.makedirs(summary_dir, exist_ok=True)
summary_name = 'summaries/subject_'+subject_num+'/task1_1s/summary_task1_1s_'+subject_num+'_pos'+position+'_type_KNN_run'+run
# The context manager closes the file even if one of the writes fails.
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 1, subject number '+subject_num+', position '+position+', algorithm type: KNN, run '+run+'\n')
    f.write('sessions: '+', '.join(train_sessions)+'\n')
    f.write('position: '+position+'\n')
    f.write('k: '+str(k_KNN)+'\n')
    f.write('Single frame accuracy: '+str(single_acc)+'\n')
    f.write('Accuracy after majority voting: '+str(accuracy)+'\n')

## SVM

**Task 1** - SVM

In [None]:
# Run configuration: read training frames from the segmented files (no HMM-generated
# data) and tag the outputs of this run with the identifier '2'.
use_hmm, run = False, '2'

In [None]:
c = 100

# Read the files:
# Training frames are read from hmm_generated_files/ when use_hmm is set and from
# segmented_files/ otherwise; test frames are always read from segmented_files/.
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'
if use_hmm:
    name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num+'_pos'+position+'_s'
else:
    name_start = name_start_val

train_data_parts, train_labels_parts = [], []
test_data_parts, test_labels_parts = [], []
# The loop name is `sess` so that the module-level `session` and `filename`
# settings are not overwritten by this cell.
for sess in train_sessions:
    data = pd.read_pickle(name_start+sess+'_train_data.pkl').to_numpy()
    print('train data read!')
    labels = pd.read_pickle(name_start+sess+'_train_labels.pkl').to_numpy()
    print('train labels read!')
    test_data_part = pd.read_pickle(name_start_val+sess+'_test_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(name_start_val+sess+'_test_labels.pkl').to_numpy()
    print('test labels read!')

    train_data_parts.append(data)
    train_labels_parts.append(labels)
    test_data_parts.append(test_data_part)
    test_labels_parts.append(test_labels_part)

# Join all sessions with one concatenation per array (None when no session is listed).
train_data = np.concatenate(train_data_parts) if train_data_parts else None
train_labels = np.concatenate(train_labels_parts) if train_labels_parts else None
test_data = np.concatenate(test_data_parts) if test_data_parts else None
test_labels = np.concatenate(test_labels_parts) if test_labels_parts else None

# Fit an SVM on single frames and score every test frame.
#svm_classifier = pickle.load(open('svm_model.sav', 'rb'))
svm_classifier = svm.SVC(C=c)
svm_classifier.fit(train_data, train_labels)
predictions = svm_classifier.predict(test_data).astype('int')
single_acc = np.mean(predictions==test_labels.T)
print('SVM single frame accuracy:')
print(single_acc)
# Persist the fitted classifier. The context manager closes the file even if
# pickling fails, and the target folder is created when it does not exist yet.
SVM_model_name = 'weights/SVM/SVM_task1_1s_'+subject_num+'_pos'+position+'_run'+run
os.makedirs(os.path.dirname(SVM_model_name), exist_ok=True)
with open(SVM_model_name+'.sav', 'wb') as model_file:
    pickle.dump(svm_classifier, model_file)


# majority voting:
# Each test event spans min_window consecutive frame predictions; the most frequent
# predicted class in that span becomes the event's prediction.
predictions_majority = np.zeros(test_events)
prev = 0
for i in range(test_events):
    relevant_pred = predictions[prev:prev+min_window]
    predictions_majority[i] = np.argmax(np.bincount(relevant_pred))
    prev = prev+int(min_window)
print('predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Expected event labels: two events for each of the classes 0..9, repeated per session.
real_labels = np.array([0,0,1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9]*len(train_sessions))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)


plt.rcParams["figure.figsize"] = (12,6)
print('Test accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
plt.imshow(confusion_matrix(real_labels, predictions_majority))
plt.colorbar()
#plt.show()

# Write the SVM run summary. The folder is created here because this cell may be
# run without the cell that normally creates it.
summary_dir = 'summaries/subject_'+subject_num+'/task1_1s'
os.makedirs(summary_dir, exist_ok=True)
summary_name = 'summaries/subject_'+subject_num+'/task1_1s/summary_task1_1s_'+subject_num+'_pos'+position+'_type_SVM_run'+run
# The context manager closes the file even if one of the writes fails.
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 1, subject number '+subject_num+', position '+position+', algorithm type: SVM, run '+run+'\n')
    f.write('sessions: '+', '.join(train_sessions)+'\n')
    f.write('position: '+position+'\n')
    f.write('C: '+str(c)+'\n')
    f.write('model name: '+SVM_model_name+'.sav'+'\n')
    f.write('single frame accuracy: '+str(single_acc)+'\n')
    f.write('accuracy after majority voting: '+str(accuracy)+'\n')

# Classification task 2

4 classification algorithms

## CNN

Training the CNN:

In [None]:
# Run configuration for task 2 (several positions and sessions of one subject).
use_hmm = True
run = '1'
train_sessions = ['1', '2']
subject_num = '004'
train_positions = ['1', '2', '3']
min_window = 11

# Every (position, session) pair, ordered session by session.
pos_session_list = [(pos_id, sess_id) for sess_id in train_sessions for pos_id in train_positions]

# 20 test events per (position, session) pair, matching the 20-entry label pattern
# used when the majority-voted predictions are scored.
test_events = 20*len(pos_session_list)

In [None]:
import time
from sklearn.metrics import confusion_matrix

start_time = time.time()

# Read the files:
# Training frames are read from hmm_generated_files/ when use_hmm is set and from
# segmented_files/ otherwise; validation frames are always read from segmented_files/.
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
if use_hmm:
    name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num
else:
    name_start = name_start_val

train_data_parts, train_labels_parts = [], []
test_data_parts, test_labels_parts = [], []
# The loop names are `pos`/`sess` so that the module-level `position`, `session`
# and `filename` settings are not overwritten by this cell.
for pos, sess in pos_session_list:
    file_tag = '_pos'+pos+'_s'+sess
    data = pd.read_pickle(name_start+file_tag+'_train_data.pkl').to_numpy()
    print('train data read!')
    labels = pd.read_pickle(name_start+file_tag+'_train_labels.pkl').to_numpy()
    print('train labels read!')
    test_data_part = pd.read_pickle(name_start_val+file_tag+'_val_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(name_start_val+file_tag+'_val_labels.pkl').to_numpy()
    print('test labels read!')

    train_data_parts.append(data)
    train_labels_parts.append(labels)
    test_data_parts.append(test_data_part)
    test_labels_parts.append(test_labels_part)

# Join all recordings with one concatenation per array (None when the list is empty).
train_data = np.concatenate(train_data_parts) if train_data_parts else None
train_labels = np.concatenate(train_labels_parts) if train_labels_parts else None
test_data = np.concatenate(test_data_parts) if test_data_parts else None
test_labels = np.concatenate(test_labels_parts) if test_labels_parts else None

# Arrange every 16-value frame as a 4x4 grid for the convolutional layers.
train_data = train_data.reshape((train_data.shape[0], 4, 4))
test_data = test_data.reshape((test_data.shape[0], 4, 4))
print('shapes of train data, train labels, test data, test labels:')
for arr in (train_data, train_labels, test_data, test_labels):
    print(arr.shape)
# Shuffle training frames and labels with one shared permutation.
per = np.random.permutation(len(train_data))
train_data = train_data[per, :]
train_labels = train_labels[per]
# Data standartization:
# Each set is centred and scaled with its own per-position mean and std.
# NOTE(review): the validation set is normalised with its own statistics rather than
# the training statistics - confirm this is intended.
train_centered = train_data - np.mean(train_data, axis=0)
train_data = train_centered / np.std(train_centered, axis=0)
test_centered = test_data - np.mean(test_data, axis=0)
test_data = test_centered / np.std(test_centered, axis=0)

# Add a trailing channel axis.
train_data = tf.expand_dims(train_data, axis=-1)
test_data = tf.expand_dims(test_data, axis=-1)

# Hyper-parameters:
algo_type = 'CNN+HMM' if use_hmm else 'CNN'
reg_strength = 0.0001
lr = 5e-4
d1 = 0.3
epochs_num = 1500 #1500
batch_size = 10000
#min_window = real_min_window
print('min_window='+str(min_window))

# Two convolution stages, two dense stages with dropout, then a 10-way softmax output.
# Layers are created in the same order as they appear in the network.
cnn_layers = []
for n_filters in (2, 4):
    cnn_layers.append(tf.keras.layers.Conv2D(n_filters, 2, padding='same', kernel_regularizer=tf.keras.regularizers.l2(reg_strength)))
    cnn_layers.append(tf.keras.layers.ReLU())
    cnn_layers.append(tf.keras.layers.BatchNormalization())
cnn_layers.append(tf.keras.layers.Flatten())
for n_units in (40, 20):
    cnn_layers.append(tf.keras.layers.Dense(n_units, kernel_regularizer=tf.keras.regularizers.l2(reg_strength)))
    cnn_layers.append(tf.keras.layers.ReLU())
    cnn_layers.append(tf.keras.layers.BatchNormalization())
    cnn_layers.append(tf.keras.layers.Dropout(d1))
cnn_layers.append(tf.keras.layers.Dense(10, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='new'))
cnn_layers.append(tf.keras.layers.Softmax())
model = tf.keras.Sequential(cnn_layers)

def scheduler(epoch):
    """Return the learning rate to use at ``epoch``.

    The schedule is flat: the module-level ``lr`` is returned for every epoch,
    both before and after epoch 600.
    """
    return lr

callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# The model ends in a Softmax layer, so the loss receives probabilities, not logits.
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

# Train, store the weights, then score the validation frames one by one.
history = model.fit(train_data, train_labels, epochs=epochs_num,batch_size=batch_size, callbacks=[callback])
model.save_weights('weights/CNN/weights_task2_'+subject_num+'_run'+run)
#model.load_weights('weights/CNN/weights_task2_'+subject_num+'_run'+run)
test_loss, test_acc = model.evaluate(test_data,  test_labels)
print('\nSingle frame based validation accuracy:', test_acc)
predictions = np.argmax(model.predict(test_data), axis=1)
print('Network predictions:')
print(predictions)
print('Real labels:')
print(test_labels.T)

# majority voting:
# Each event spans min_window consecutive frame predictions; the most frequent
# predicted class in that span becomes the event's prediction.
predictions_majority = np.zeros(test_events)
prev = 0
for i in range(test_events):
    relevant_pred = predictions[prev:prev+min_window]
    predictions_majority[i] = np.argmax(np.bincount(relevant_pred))
    prev = prev+int(min_window)
print('Network predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Expected event labels: two events for each of the classes 0..9, repeated for
# every (position, session) pair.
real_labels = np.array([0,0,1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9]*len(pos_session_list))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)


# Training curves: accuracy on the left, loss on the right.
plt.rcParams["figure.figsize"] = (12,6)
fig, axs = plt.subplots(1, 2)
axs[0].plot(history.history['accuracy'])
axs[0].set(title='Training Accuracy', xlabel='epochs', ylabel='accuracy')
axs[1].plot(history.history['loss'])
axs[1].set(title='Training Loss', xlabel='epochs', ylabel='loss')
#model.summary()
plt.show()

print('Validation accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
plt.imshow(confusion_matrix(real_labels, predictions_majority))
plt.colorbar()
#plt.show()



**Task 2:** testing the CNN

In [None]:
# Read the files:
# Only the test files are read in this cell, so no training arrays are built.
# Nothing here depends on `data`/`labels` left over from another cell.
train_data = None
train_labels = None
name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num

test_data_parts, test_labels_parts = [], []
# The loop names are `pos`/`sess` so that the module-level `position`, `session`
# and `filename` settings are not overwritten by this cell.
for pos, sess in pos_session_list:
    file_tag = '_pos'+pos+'_s'+sess
    test_data_part = pd.read_pickle(name_start_val+file_tag+'_test_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(name_start_val+file_tag+'_test_labels.pkl').to_numpy()
    print('test labels read!')

    test_data_parts.append(test_data_part)
    test_labels_parts.append(test_labels_part)

# Join all recordings with one concatenation per array (None when the list is empty).
test_data = np.concatenate(test_data_parts) if test_data_parts else None
test_labels = np.concatenate(test_labels_parts) if test_labels_parts else None

# Arrange every 16-value frame as a 4x4 grid, as the network expects.
test_data = test_data.reshape((test_data.shape[0], 4, 4))
print('shapes of test data, test labels:')
for arr in (test_data, test_labels):
    print(arr.shape)

# Data standartization:
# NOTE(review): the test set is normalised with its own mean/std - confirm this is intended.
test_centered = test_data - np.mean(test_data, axis=0)
test_data = test_centered / np.std(test_centered, axis=0)

# Add a trailing channel axis.
test_data = tf.expand_dims(test_data, axis=-1)

# Evaluate the stored task-2 weights frame by frame.
weights_path = 'weights/CNN/weights_task2_'+subject_num+'_run'+run
model.load_weights(weights_path)
test_loss, test_acc = model.evaluate(test_data,  test_labels)
print('\nSingle frame based test accuracy:', test_acc)
class_scores = model.predict(test_data)
predictions = np.argmax(class_scores, axis=1)
print('Network predictions:')
print(predictions)
print('Real labels:')
print(test_labels.T)

# majority voting:
# Each event spans min_window consecutive frame predictions; the most frequent
# predicted class in that span becomes the event's prediction.
vote_step = int(min_window)
predictions_majority = np.array(
    [np.argmax(np.bincount(predictions[k*vote_step:k*vote_step+min_window]))
     for k in range(test_events)],
    dtype=float)
print('Network predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Expected event labels: two events for each of the classes 0..9, repeated for
# every (position, session) pair.
labels_per_recording = [0,0,1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9]
real_labels = np.array(labels_per_recording*len(pos_session_list))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)

print('Test accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
mat = confusion_matrix(real_labels, predictions_majority)
print(repr(mat))
plt.imshow(mat)
#plt.colorbar()
plt.show()

# Write the CNN task-2 run summary and store the confusion matrix next to it.
folder_path = 'summaries/subject_'+subject_num+'/task2'
os.makedirs(folder_path, exist_ok=True)
summary_name = 'summaries/subject_'+subject_num+'/task2/summary_task2_'+subject_num+'_type_'+algo_type+'_run'+run
# The context manager closes the file even if one of the writes fails.
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 2, subject number '+subject_num+', algorithm type: '+algo_type+', run '+run+'\n')
    f.write('sessions: '+', '.join(train_sessions)+'\n')
    f.write('positions: '+', '.join(train_positions)+'\n')
    f.write('regularization strength: '+str(reg_strength)+'\n')
    f.write('learning rate: '+str(lr)+'\n')
    f.write('dropout: '+str(d1)+'\n')
    f.write('number of epochs: '+str(epochs_num)+'\n')
    f.write('batch size: '+str(batch_size)+'\n')
    f.write('minimal window length (in samples): '+str(min_window)+'\n')
    # NOTE(review): test_acc is whatever the latest model.evaluate call returned; the
    # label says "validation" - confirm it matches the data that was evaluated.
    f.write('Single frame based validation accuracy: '+str(test_acc)+'\n')
    f.write('Test accuracy after majority voting: '+str(accuracy)+'\n')
    f.write('weights file: weights_task2_'+subject_num+'_run'+run+'\n')

confusion_matrix_df = pd.DataFrame(data=mat)
confusion_matrix_df.to_pickle(folder_path+'/CNN_mat.pkl', protocol=4)

## RNN

Training the RNN

In [None]:
# Run configuration: read training frames from the HMM-generated files and tag the
# outputs of this run with the identifier '1'.
use_hmm, run = True, '1'

In [None]:
import time
from sklearn.metrics import confusion_matrix

start_time = time.time()

# Read the files:
# Training frames are read from hmm_generated_files/ when use_hmm is set and from
# segmented_files/ otherwise; validation frames are always read from segmented_files/.
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
if use_hmm:
    name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num
else:
    name_start = name_start_val

train_data_parts, train_labels_parts = [], []
test_data_parts, test_labels_parts = [], []
# The loop names are `pos`/`sess` so that the module-level `position`, `session`
# and `filename` settings are not overwritten by this cell.
for pos, sess in pos_session_list:
    file_tag = '_pos'+pos+'_s'+sess
    data = pd.read_pickle(name_start+file_tag+'_train_data.pkl').to_numpy()
    print('train data read!')
    labels = pd.read_pickle(name_start+file_tag+'_train_labels.pkl').to_numpy()
    print('train labels read!')
    test_data_part = pd.read_pickle(name_start_val+file_tag+'_val_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(name_start_val+file_tag+'_val_labels.pkl').to_numpy()
    print('test labels read!')

    train_data_parts.append(data)
    train_labels_parts.append(labels)
    test_data_parts.append(test_data_part)
    test_labels_parts.append(test_labels_part)

# Join all recordings with one concatenation per array (None when the list is empty).
train_data = np.concatenate(train_data_parts) if train_data_parts else None
train_labels = np.concatenate(train_labels_parts) if train_labels_parts else None
test_data = np.concatenate(test_data_parts) if test_data_parts else None
test_labels = np.concatenate(test_labels_parts) if test_labels_parts else None


# Hyper-parameters:
algo_type = 'RNN+HMM' if use_hmm else 'RNN'
reg_strength = 0.0001
lr = 1e-2
d1 = 0.1
#min_angle = 1/18
#max_shift_h = 0.05
#max_shift_w = 0.05
epochs_num = 1000 #1500
batch_size = 150
print('min_window='+str(min_window))


# Group consecutive frames into windows of `windows_num` frames with 16 values each.
# Trailing frames that do not fill a complete window are dropped.
windows_num = min_window
train_windows = train_data.shape[0] // windows_num
test_windows = test_data.shape[0] // windows_num
reshaped_train_data = np.array(train_data[:train_windows*windows_num], dtype=float).reshape(train_windows, windows_num, 16)
reshaped_test_data = np.array(test_data[:test_windows*windows_num], dtype=float).reshape(test_windows, windows_num, 16)
train_data = reshaped_train_data
test_data = reshaped_test_data
# Keep the label of the first frame of every window. A strided slice yields one extra
# label when the frame count is not a multiple of windows_num, so truncate it to the
# number of windows to keep data and labels the same length.
train_labels = train_labels[::windows_num][:train_windows]
test_labels = test_labels[::windows_num][:test_windows]
print('shapes:')
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)


# Shuffle the training windows and their labels with one shared permutation.
per = np.random.permutation(len(train_data))
train_data = train_data[per, :]
train_labels = train_labels[per]


# One LSTM layer followed by a 10-unit dense layer and a softmax output.
model = tf.keras.Sequential([
    tf.keras.layers.LSTM(12, dropout=d1, input_shape=(windows_num,16), name='lstm2'),
    tf.keras.layers.Dense(10, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='new'),
    tf.keras.layers.Softmax()
])

def scheduler(epoch):
    """Return the learning rate to use at ``epoch``.

    The schedule is flat: the module-level ``lr`` is returned for every epoch,
    both before and after epoch 600.
    """
    return lr

callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# The model ends in a Softmax layer, so the loss receives probabilities, not logits.
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

# Train, store the weights, then score the held-out windows.
history = model.fit(train_data, train_labels, epochs=epochs_num,batch_size=batch_size, callbacks=[callback])
model.save_weights('weights/RNN/weights_task2_'+subject_num+'_run'+run)
#model.load_weights('weights/RNN/weights_task2_'+subject_num+'_pos'+position+'_run'+run)
test_loss, test_acc = model.evaluate(test_data,  test_labels)#, verbose=2)
print('\nTest accuracy:', test_acc)

predictions = np.argmax(model.predict(test_data), axis=1)
cmat = confusion_matrix(test_labels, predictions)

# Training curves: accuracy on the left, loss on the right.
plt.rcParams["figure.figsize"] = (12,6)
fig, axs = plt.subplots(1, 2)
axs[0].plot(history.history['accuracy'])
axs[0].set(title='Training Accuracy', xlabel='epochs', ylabel='accuracy')
axs[1].plot(history.history['loss'])
axs[1].set(title='Training Loss', xlabel='epochs', ylabel='loss')
#model.summary()
# The confusion matrix is drawn in a separate figure.
plt.figure()
print('Confusion matrix:')
plt.imshow(cmat)
plt.colorbar()
#plt.show()

Testing the RNN

In [None]:
import time
from sklearn.metrics import confusion_matrix

start_time = time.time()

# Read the files:
# Training frames are read from hmm_generated_files/ when use_hmm is set and from
# segmented_files/ otherwise; evaluation frames are always read from segmented_files/.
# NOTE(review): this "testing" cell reads the *_val_* files, whereas the CNN testing
# cell reads the *_test_* files - confirm which split is intended here.
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
if use_hmm:
    name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num
else:
    name_start = name_start_val

train_data_parts, train_labels_parts = [], []
test_data_parts, test_labels_parts = [], []
# The loop names are `pos`/`sess` so that the module-level `position`, `session`
# and `filename` settings are not overwritten by this cell.
for pos, sess in pos_session_list:
    file_tag = '_pos'+pos+'_s'+sess
    data = pd.read_pickle(name_start+file_tag+'_train_data.pkl').to_numpy()
    print('train data read!')
    labels = pd.read_pickle(name_start+file_tag+'_train_labels.pkl').to_numpy()
    print('train labels read!')
    test_data_part = pd.read_pickle(name_start_val+file_tag+'_val_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(name_start_val+file_tag+'_val_labels.pkl').to_numpy()
    print('test labels read!')

    train_data_parts.append(data)
    train_labels_parts.append(labels)
    test_data_parts.append(test_data_part)
    test_labels_parts.append(test_labels_part)

# Join all recordings with one concatenation per array (None when the list is empty).
train_data = np.concatenate(train_data_parts) if train_data_parts else None
train_labels = np.concatenate(train_labels_parts) if train_labels_parts else None
test_data = np.concatenate(test_data_parts) if test_data_parts else None
test_labels = np.concatenate(test_labels_parts) if test_labels_parts else None


# Hyper-parameters:
algo_type = 'RNN+HMM' if use_hmm else 'RNN'
reg_strength = 0.0001
lr = 1e-2
d1 = 0.1
#min_angle = 1/18
#max_shift_h = 0.05
#max_shift_w = 0.05
epochs_num = 2000 #1500
batch_size = 500
print('min_window='+str(min_window))

# Group consecutive frames into windows of `windows_num` frames with 16 values each.
# Trailing frames that do not fill a complete window are dropped.
windows_num = min_window
train_windows = train_data.shape[0] // windows_num
test_windows = test_data.shape[0] // windows_num
reshaped_train_data = np.array(train_data[:train_windows*windows_num], dtype=float).reshape(train_windows, windows_num, 16)
reshaped_test_data = np.array(test_data[:test_windows*windows_num], dtype=float).reshape(test_windows, windows_num, 16)
train_data = reshaped_train_data
test_data = reshaped_test_data
# Keep the label of the first frame of every window. A strided slice yields one extra
# label when the frame count is not a multiple of windows_num, so truncate it to the
# number of windows to keep data and labels the same length.
train_labels = train_labels[::windows_num][:train_windows]
test_labels = test_labels[::windows_num][:test_windows]
print('shapes:')
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)


# Shuffle the training windows and their labels with one shared permutation.
per = np.random.permutation(len(train_data))
train_data = train_data[per, :]
train_labels = train_labels[per]


# One LSTM layer followed by a 10-unit dense layer and a softmax output.
model = tf.keras.Sequential([
    tf.keras.layers.LSTM(12, dropout=d1, input_shape=(windows_num,16), name='lstm2'),
    tf.keras.layers.Dense(10, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='new'),
    tf.keras.layers.Softmax()
])

def scheduler(epoch):
    """Return the learning rate to use at ``epoch``.

    The schedule is flat: the module-level ``lr`` is returned for every epoch,
    both before and after epoch 600.
    """
    return lr

callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

#history = model.fit(train_data, train_labels, epochs=epochs_num,batch_size=batch_size, callbacks=[callback])
#model.save_weights('weights/RNN/weights_task1_'+subject_num+'_pos'+position+'_run'+run)
model.load_weights('weights/RNN/weights_task2_'+subject_num+'_run'+run)
test_loss, test_acc = model.evaluate(test_data,  test_labels)#, verbose=2)
print('\nTest accuracy:', test_acc)

predictions = np.argmax(model.predict(test_data), axis=1)
cmat = confusion_matrix(test_labels, predictions)

# Write a plain-text summary of this run and store the confusion matrix.
folder_path = 'summaries/subject_'+subject_num+'/task2'
os.makedirs(folder_path, exist_ok=True)
summary_name = 'summaries/subject_'+subject_num+'/task2/summary_task2_'+subject_num+'_type_'+algo_type+'_run'+run
# The context manager closes the file even if one of the writes fails.
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 2, subject number '+subject_num+', algorithm type: '+algo_type+', run '+run+'\n')
    f.write('sessions: '+', '.join(train_sessions)+'\n')
    f.write('positions: '+', '.join(train_positions)+'\n')
    f.write('regularization strength: '+str(reg_strength)+'\n')
    f.write('learning rate: '+str(lr)+'\n')
    f.write('dropout: '+str(d1)+'\n')
    f.write('number of epochs: '+str(epochs_num)+'\n')
    f.write('batch size: '+str(batch_size)+'\n')
    f.write('minimal window length (in samples): '+str(min_window)+'\n')
    # NOTE(review): test_acc is the accuracy reported by model.evaluate on the
    # test sequences; no majority voting happens in this cell - confirm label.
    f.write('Test accuracy after majority voting: '+str(test_acc)+'\n')
    f.write('weights file: weights_task2_'+subject_num+'_run'+run+'\n')

# Persist the confusion matrix computed for this run (`cmat`); the name `mat`
# is not defined by this cell.
confusion_matrix_df = pd.DataFrame(data=cmat)
confusion_matrix_df.to_pickle(folder_path+'/RNN_mat.pkl', protocol=4)

## KNN

**Task 2** - KNN

In [None]:
# Run configuration for the KNN cell.
# use_hmm selects the training files: HMM-generated ('hmm_generated_files')
# when True, segmented recordings ('segmented_files') when False.
use_hmm = False
# Run identifier (string); it is appended to the summary file name.
run = '1'

In [None]:
k_KNN = 1

# Read the files:
# Training frames come from the HMM-generated files when use_hmm is set and
# from the segmented recordings otherwise; validation frames always come from
# the segmented recordings.
if use_hmm:
  name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num
else:
  name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num

train_chunks, train_label_chunks = [], []
test_chunks, test_label_chunks = [], []
# The loop names are chosen so that the notebook-level `filename`, `position`
# and `session` variables are left untouched by this cell.
for pos_id, sess_id in pos_session_list:
    train_stem = name_start+'_pos'+pos_id+'_s'+sess_id
    val_stem = name_start_val+'_pos'+pos_id+'_s'+sess_id

    data = pd.read_pickle(train_stem+'_train_data.pkl').to_numpy()
    print('train data read!')
    labels = pd.read_pickle(train_stem+'_train_labels.pkl').to_numpy()
    print('train labels read!')
    test_data_part = pd.read_pickle(val_stem+'_val_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(val_stem+'_val_labels.pkl').to_numpy()
    print('test labels read!')

    train_chunks.append(data)
    train_label_chunks.append(labels)
    test_chunks.append(test_data_part)
    test_label_chunks.append(test_labels_part)

# Concatenate once after the loop instead of re-copying the growing arrays on
# every iteration; the results stay None when no (position, session) pair exists.
train_data = np.concatenate(train_chunks) if train_chunks else None
train_labels = np.concatenate(train_label_chunks) if train_label_chunks else None
test_data = np.concatenate(test_chunks) if test_chunks else None
test_labels = np.concatenate(test_label_chunks) if test_label_chunks else None

# Fit a k-nearest-neighbours classifier on single frames and score it on the
# validation frames.
KNN = KNeighborsClassifier(n_neighbors=k_KNN)
# ravel() passes a 1-D target vector, which is what scikit-learn expects.
KNN.fit(train_data, train_labels.ravel())
predictions = KNN.predict(test_data).astype('int')
single_acc = np.mean(predictions==test_labels.T)
print('KNN single frame accuracy: '+str(single_acc))

# majority voting:
# Every test event is represented by `min_window` consecutive frame
# predictions; the most frequent class among them becomes the event label.
predictions_majority = np.zeros(test_events)
prev = 0
for i in range(test_events):
    relevant_pred = predictions[prev:prev+min_window]
    predictions_majority[i] = np.argmax(np.bincount(relevant_pred))
    prev = prev+int(min_window)
print('predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Expected event order: two events per gesture, gestures 0..9, repeated for
# every (position, session) pair.
real_labels = np.array([0,0,1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9]*len(pos_session_list))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)


plt.rcParams["figure.figsize"] = (12,6)
print('Test accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
plt.imshow(confusion_matrix(real_labels, predictions_majority))
plt.colorbar()
#plt.show()

# Write the run summary; create the folder first so that this cell does not
# depend on another cell having created it.
summary_dir = 'summaries/subject_'+subject_num+'/task2'
os.makedirs(summary_dir, exist_ok=True)
summary_name = 'summaries/subject_'+subject_num+'/task2/summary_task2_'+subject_num+'_type_KNN_run'+run
# The context manager closes the file even if one of the writes fails.
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 2, subject number '+subject_num+', algorithm type: KNN, run '+run+'\n')
    f.write('sessions: '+', '.join(train_sessions)+'\n')
    f.write('positions: '+', '.join(train_positions)+'\n')
    f.write('k: '+str(k_KNN)+'\n')
    f.write('Single frame accuracy: '+str(single_acc)+'\n')
    f.write('Accuracy after majority voting: '+str(accuracy)+'\n')

## SVM

**Task 2** - SVM

In [None]:
# Run configuration for the SVM cell.
# use_hmm selects the training files: HMM-generated ('hmm_generated_files')
# when True, segmented recordings ('segmented_files') when False.
use_hmm = False
# Run identifier (string); it is part of the saved model name and of the
# summary file name.
run = '1'

In [None]:
c = 100

# Read the files:
# Training frames come from the HMM-generated files when use_hmm is set and
# from the segmented recordings otherwise; validation frames always come from
# the segmented recordings.
if use_hmm:
  name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num
else:
  name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num

train_chunks, train_label_chunks = [], []
test_chunks, test_label_chunks = [], []
# The loop names are chosen so that the notebook-level `filename`, `position`
# and `session` variables are left untouched by this cell.
for pos_id, sess_id in pos_session_list:
    train_stem = name_start+'_pos'+pos_id+'_s'+sess_id
    val_stem = name_start_val+'_pos'+pos_id+'_s'+sess_id

    data = pd.read_pickle(train_stem+'_train_data.pkl').to_numpy()
    print('train data read!')
    labels = pd.read_pickle(train_stem+'_train_labels.pkl').to_numpy()
    print('train labels read!')
    test_data_part = pd.read_pickle(val_stem+'_val_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(val_stem+'_val_labels.pkl').to_numpy()
    print('test labels read!')

    train_chunks.append(data)
    train_label_chunks.append(labels)
    test_chunks.append(test_data_part)
    test_label_chunks.append(test_labels_part)

# Concatenate once after the loop instead of re-copying the growing arrays on
# every iteration; the results stay None when no (position, session) pair exists.
train_data = np.concatenate(train_chunks) if train_chunks else None
train_labels = np.concatenate(train_label_chunks) if train_label_chunks else None
test_data = np.concatenate(test_chunks) if test_chunks else None
test_labels = np.concatenate(test_label_chunks) if test_label_chunks else None

# Fit a support-vector classifier on single frames, score it on the
# validation frames and store the fitted model.
#svm_classifier = pickle.load(open('svm_model.sav', 'rb'))
svm_classifier = svm.SVC(C=c)
# ravel() passes a 1-D target vector, which is what scikit-learn expects.
svm_classifier.fit(train_data, train_labels.ravel())
predictions = svm_classifier.predict(test_data).astype('int')
single_acc = np.mean(predictions==test_labels.T)
print('SVM single frame accuracy:')
print(single_acc)
SVM_model_name = 'weights/SVM/SVM_task2_'+subject_num+'_run'+run
os.makedirs('weights/SVM', exist_ok=True)
# Close the model file deterministically instead of leaving the handle open.
with open(SVM_model_name+'.sav', 'wb') as model_file:
    pickle.dump(svm_classifier, model_file)


# majority voting:
# Every test event is represented by `min_window` consecutive frame
# predictions; the most frequent class among them becomes the event label.
predictions_majority = np.zeros(test_events)
prev = 0
for i in range(test_events):
    relevant_pred = predictions[prev:prev+min_window]
    predictions_majority[i] = np.argmax(np.bincount(relevant_pred))
    prev = prev+int(min_window)
print('predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Expected event order: two events per gesture, gestures 0..9, repeated for
# every (position, session) pair.
real_labels = np.array([0,0,1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9]*len(pos_session_list))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)


plt.rcParams["figure.figsize"] = (12,6)
print('Test accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
plt.imshow(confusion_matrix(real_labels, predictions_majority))
plt.colorbar()
#plt.show()

# Write the run summary; create the folder first so that this cell does not
# depend on another cell having created it.
summary_dir = 'summaries/subject_'+subject_num+'/task2'
os.makedirs(summary_dir, exist_ok=True)
summary_name = 'summaries/subject_'+subject_num+'/task2/summary_task2_'+subject_num+'_type_SVM_run'+run
# The context manager closes the file even if one of the writes fails.
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 2, subject number '+subject_num+', algorithm type: SVM, run '+run+'\n')
    f.write('sessions: '+', '.join(train_sessions)+'\n')
    f.write('positions: '+', '.join(train_positions)+'\n')
    f.write('C: '+str(c)+'\n')
    f.write('model name: '+SVM_model_name+'.sav'+'\n')
    f.write('single frame accuracy: '+str(single_acc)+'\n')
    f.write('accuracy after majority voting: '+str(accuracy)+'\n')

# Classification task 3

4 classification algorithms

## CNN

Training the backbone CNN:

In [None]:
# Configuration of the task-3 backbone run: which subjects, sessions and
# electrode positions are pooled for training.
use_hmm = True
run = '3'
train_sessions = ['1']
subject_nums = ['001', '004', '007']
train_positions = ['1', '2', '3']
min_window = 11

# Every (subject, position, session) combination, ordered by subject, then by
# session, with the position varying fastest.
sub_pos_session_list = [
    (sub, p, s)
    for sub in subject_nums
    for s in train_sessions
    for p in train_positions
]

# 20 events are counted for each combination.
test_events = len(sub_pos_session_list) * 20

In [None]:
import time
from sklearn.metrics import confusion_matrix

start_time = time.time()

# Read the files:
# Training frames come from the HMM-generated files ('gf_' prefix) when
# use_hmm is set and from the segmented recordings ('sf_' prefix) otherwise;
# validation frames always come from the segmented recordings. The paths are
# built per subject, so both settings resolve to existing naming patterns.
if use_hmm:
    train_root, train_tag = 'hmm_generated_files', 'gf'
else:
    train_root, train_tag = 'segmented_files', 'sf'

train_chunks, train_label_chunks = [], []
test_chunks, test_label_chunks = [], []
# The loop names are chosen so that the notebook-level `filename`,
# `subject_num`, `position` and `session` variables are left untouched.
for sub_id, pos_id, sess_id in sub_pos_session_list:
    train_stem = train_root+'/subject_'+sub_id+'/'+train_tag+'_'+sub_id+'_pos'+pos_id+'_s'+sess_id
    val_stem = 'segmented_files/subject_'+sub_id+'/sf_'+sub_id+'_pos'+pos_id+'_s'+sess_id

    data = pd.read_pickle(train_stem+'_train_data.pkl').to_numpy()
    print('train data read!')
    labels = pd.read_pickle(train_stem+'_train_labels.pkl').to_numpy()
    print('train labels read!')
    test_data_part = pd.read_pickle(val_stem+'_val_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(val_stem+'_val_labels.pkl').to_numpy()
    print('test labels read!')

    train_chunks.append(data)
    train_label_chunks.append(labels)
    test_chunks.append(test_data_part)
    test_label_chunks.append(test_labels_part)

# Concatenate once after the loop instead of re-copying the growing arrays on
# every iteration; the results stay None when the combination list is empty.
train_data = np.concatenate(train_chunks) if train_chunks else None
train_labels = np.concatenate(train_label_chunks) if train_label_chunks else None
test_data = np.concatenate(test_chunks) if test_chunks else None
test_labels = np.concatenate(test_label_chunks) if test_label_chunks else None

# Every 16-value frame becomes a 4x4 map for the convolutional network.
train_data = train_data.reshape((train_data.shape[0], 4, 4))
test_data = test_data.reshape((test_data.shape[0], 4, 4))
print('shapes of train data, train labels, test data, test labels:')
for arr in (train_data, train_labels, test_data, test_labels):
    print(arr.shape)

# Shuffle the training frames; the labels follow the same permutation.
per = np.random.permutation(len(train_data))
train_data, train_labels = train_data[per], train_labels[per]

# Data standartization:
# each set is centred and scaled element-wise with its own statistics.
train_data = train_data - train_data.mean(axis=0)
train_data = train_data / train_data.std(axis=0)
test_data = test_data - test_data.mean(axis=0)
test_data = test_data / test_data.std(axis=0)

# Append a trailing channel axis for Conv2D.
train_data = tf.expand_dims(train_data, axis=-1)
test_data = tf.expand_dims(test_data, axis=-1)

# Hyper-parameters:
algo_type = 'CNN+HMM' if use_hmm else 'CNN'
reg_strength, lr, d1 = 0, 1e-2, 0.2  # L2 weight, Adam learning rate, dropout rate
epochs_num, batch_size = 500, 1000
print('min_window=' + str(min_window))

# Backbone CNN: two small convolution blocks followed by two dense blocks and
# a 10-way softmax classifier.
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(2, 2, padding='same', kernel_regularizer=tf.keras.regularizers.l2(reg_strength)),
    tf.keras.layers.ReLU(),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Conv2D(4, 2, padding='same', kernel_regularizer=tf.keras.regularizers.l2(reg_strength)),
    tf.keras.layers.ReLU(),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(40, kernel_regularizer=tf.keras.regularizers.l2(reg_strength)),
    tf.keras.layers.ReLU(),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dropout(d1),
    tf.keras.layers.Dense(20, kernel_regularizer=tf.keras.regularizers.l2(reg_strength)),
    tf.keras.layers.ReLU(),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Dropout(d1),
    tf.keras.layers.Dense(10, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='new'),
    tf.keras.layers.Softmax()
])

def scheduler(epoch):
  """Return the learning rate for `epoch`: `lr` up to epoch 100, `lr/2` after it."""
  return lr/2 if epoch > 100 else lr

callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# The network ends in a Softmax layer, so its outputs are probabilities and
# the loss must not apply a second softmax (from_logits=False).
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

# Train the backbone, store its weights and score single frames.
history = model.fit(train_data, train_labels, epochs=epochs_num,batch_size=batch_size, callbacks=[callback])
model.save_weights('weights/CNN/weights_task3_bb_full_sub_run'+run)
#model.load_weights('weights/CNN/weights_task3_bb_full_sub_run'+run)
test_loss, test_acc = model.evaluate(test_data,  test_labels)
print('\nSingle frame based validation accuracy:', test_acc)
predictions = np.argmax(model.predict(test_data), axis=1)
print('Network predictions:')
print(predictions)
print(len(predictions))
print('Real labels:')
print(test_labels.T)

# majority voting:
# consecutive runs of `min_window` frame predictions are collapsed into one
# label per event by taking the most frequent class.
window = int(min_window)
votes = [
    np.argmax(np.bincount(predictions[k*window:(k+1)*window]))
    for k in range(test_events)
]
predictions_majority = np.array(votes, dtype=float)
print('Network predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Expected order: two events per gesture (0..9) for every combination.
real_labels = np.array([g for g in range(10) for _ in range(2)]*len(sub_pos_session_list))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)


# Training curves: accuracy on the left, loss on the right.
plt.rcParams["figure.figsize"] = (12,6)
fig, axs = plt.subplots(1, 2)
for ax, metric, title in zip(axs, ('accuracy', 'loss'), ('Training Accuracy', 'Training Loss')):
    ax.plot(history.history[metric])
    ax.set(title=title, xlabel='epochs', ylabel=metric)
#model.summary()
plt.show()

print('Validation accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
plt.imshow(confusion_matrix(real_labels, predictions_majority))
plt.colorbar()
#plt.show()



### CNN fine tuning:

In [None]:
# Configuration of the fine-tuning run: `run` names the backbone whose
# weights are loaded, `ft_run` names this fine-tuning run.
use_hmm = False
run = '3'
ft_run = '1'
ft_train_sessions = ['2']
subject_num = '004'
ft_train_positions = ['1']

# Every (position, session) pair used for fine-tuning, with the position
# varying fastest.
ft_pos_session_list = [(p, s) for s in ft_train_sessions for p in ft_train_positions]

# 20 events are counted for each pair.
ft_test_events = len(ft_pos_session_list) * 20

In [None]:
import time
from sklearn.metrics import confusion_matrix

start_time = time.time()

# Read the files:
# Fine-tuning trains on the '*_test_*' files and validates on the '*_val_*'
# files of the selected positions/sessions. The training files come from the
# HMM-generated folder when use_hmm is set, otherwise from the segmented
# recordings; validation always uses the segmented recordings.
if use_hmm:
  name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num
else:
  name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num

train_chunks, train_label_chunks = [], []
test_chunks, test_label_chunks = [], []
# The loop names are chosen so that the notebook-level `filename`, `position`
# and `session` variables are left untouched by this cell.
for pos_id, sess_id in ft_pos_session_list:
    train_stem = name_start+'_pos'+pos_id+'_s'+sess_id
    val_stem = name_start_val+'_pos'+pos_id+'_s'+sess_id

    data = pd.read_pickle(train_stem+'_test_data.pkl').to_numpy()
    print('train data read!')
    labels = pd.read_pickle(train_stem+'_test_labels.pkl').to_numpy()
    print('train labels read!')
    test_data_part = pd.read_pickle(val_stem+'_val_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(val_stem+'_val_labels.pkl').to_numpy()
    print('test labels read!')

    train_chunks.append(data)
    train_label_chunks.append(labels)
    test_chunks.append(test_data_part)
    test_label_chunks.append(test_labels_part)

# Concatenate once after the loop instead of re-copying the growing arrays on
# every iteration; the results stay None when no (position, session) pair exists.
train_data = np.concatenate(train_chunks) if train_chunks else None
train_labels = np.concatenate(train_label_chunks) if train_label_chunks else None
test_data = np.concatenate(test_chunks) if test_chunks else None
test_labels = np.concatenate(test_label_chunks) if test_label_chunks else None

# Disabled "one shot" variant: it would keep alternating runs of min_window
# frames (keep one run, drop the next) from the training set.
# # One shot:
# train_mask1 = [True for i in range(min_window)]
# train_mask2 = [False for i in range(min_window)]
# train_mask = np.array((train_mask1+train_mask2)*10)
# train_data = train_data[train_mask,:]
# train_labels = train_labels[train_mask]

# Every 16-value frame becomes a 4x4 map.
train_data = np.reshape(train_data, (train_data.shape[0], 4, 4))
test_data = np.reshape(test_data, (test_data.shape[0], 4, 4))
# Add hmm data:
# NOTE(review): despite the heading, nothing computed below is appended to
# train_data / train_labels; the block fits one Gaussian HMM per gesture,
# prints a classification result and leaves the data untouched.
# NOTE(review): `d`, `res`, `tries_num` and `final_res` are not read anywhere
# in this block.
d = {}
for comp_num in [4]:
    for iterations_num in [10]:
        res = []
        tries_num = 1
        # NOTE(review): these two assignments overwrite the notebook-level
        # `gestures_num` and `repetitions` defined in the parameters cell.
        gestures_num = 10
        repetitions = 2
        val_num = 2
        # train_data.shape[1] is 4 after the reshape above, so every 4x4 frame
        # is handed to the HMM as a 4-step sequence of 4 features.
        lengths = [train_data.shape[1] for j in range(repetitions)]
        final_res = []
        models = []

        for i in range(gestures_num):
            # NOTE(review): this picks the single frames 2*i and 2*i+1; it only
            # corresponds to "gesture i" if the frames are laid out two per
            # gesture - confirm against the segmented files.
            X = np.concatenate([train_data[repetitions*i+j,:,:] for j in range(repetitions)])
            # `model` is rebound to the Keras network later in this cell.
            model = hmm.GaussianHMM(n_components=comp_num, covariance_type="diag", n_iter=iterations_num)
            model.fit(X, lengths)
            models.append(model)
            #folder_path = 'hmm_models/subject_'+subject_num+'/hmm_'+subject_num+'_pos'+position+'_s'+session
            #if not os.path.exists(folder_path):
            #    os.makedirs(folder_path)
            #with open(folder_path+'/hmm_gest'+str(i)+'.pkl', "wb") as file: pickle.dump(model, file)
        # Label each of the first gestures_num*val_num validation frames with
        # the index of the HMM that scores it highest.
        test_res = []
        for s in range(gestures_num*val_num):
            test_res.append(np.argmax(np.array([models[i].score(test_data[s, :, :]) for i in range(gestures_num)])))

        print('test results:')
        print(test_res)
        print('real labels:')
        print(test_labels.T)
        # NOTE(review): test_res holds gestures_num*val_num entries while
        # test_labels holds one label per validation frame; confirm that this
        # comparison broadcasts the way it is meant to before trusting the value.
        acc = np.mean(np.array(test_res)==test_labels)
        print('HMM classification accuracy:')
        print(np.mean(np.array(test_res)==test_labels))


print('shapes of train data, train labels, test data, test labels:')
for arr in (train_data, train_labels, test_data, test_labels):
    print(arr.shape)

# Shuffle the training frames; the labels follow the same permutation.
per = np.random.permutation(len(train_data))
train_data, train_labels = train_data[per], train_labels[per]

# Data standartization:
# each set is centred and scaled element-wise with its own statistics.
train_data = train_data - train_data.mean(axis=0)
train_data = train_data / train_data.std(axis=0)
test_data = test_data - test_data.mean(axis=0)
test_data = test_data / test_data.std(axis=0)

# Append a trailing channel axis for Conv2D.
train_data = tf.expand_dims(train_data, axis=-1)
test_data = tf.expand_dims(test_data, axis=-1)

# Hyper-parameters:
algo_type = 'CNN+HMM' if use_hmm else 'CNN'
reg_strength, lr, d1 = 0.001, 1e-3, 0.3  # L2 weight, Adam learning rate, dropout rate
epochs_num, batch_size = 400, 150
print('min_window=' + str(min_window))

# Rebuild the backbone layer by layer so that single layers can be frozen:
# both convolutions and the first batch normalization keep their loaded
# weights, every later layer is fine-tuned.
layer1 = tf.keras.layers.Conv2D(2, 2, padding='same', kernel_regularizer=tf.keras.regularizers.l2(reg_strength))
layer1.trainable = False
layer2 = tf.keras.layers.BatchNormalization()
layer2.trainable = False
layer3 =  tf.keras.layers.Conv2D(4, 2, padding='same', kernel_regularizer=tf.keras.regularizers.l2(reg_strength))
layer3.trainable = False
layer4 = tf.keras.layers.BatchNormalization()
layer4.trainable = True
layer5 = tf.keras.layers.Dense(40, kernel_regularizer=tf.keras.regularizers.l2(reg_strength))
layer5.trainable = True
layer6 = tf.keras.layers.BatchNormalization()
layer6.trainable = True
layer7 = tf.keras.layers.Dense(20, kernel_regularizer=tf.keras.regularizers.l2(reg_strength))
layer7.trainable = True
layer8 = tf.keras.layers.BatchNormalization()
layer8.trainable = True
layer9 = tf.keras.layers.Dense(10, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='new')
layer9.trainable = True
# Parameters of the (currently disabled) augmentation layers below.
min_angle = 1/18
max_shift_h = 0.05
max_shift_w = 0.05
model = tf.keras.Sequential([
    #tf.keras.layers.RandomTranslation(((-1)*max_shift_h, max_shift_h), ((-1)*max_shift_w, max_shift_w), fill_mode='nearest'),
    #tf.keras.layers.RandomRotation(((-1)*min_angle, min_angle)),
    layer1,
    tf.keras.layers.ReLU(),
    layer2,
    layer3,
    tf.keras.layers.ReLU(),
    layer4,
    tf.keras.layers.Flatten(),
    layer5,
    tf.keras.layers.ReLU(),
    layer6,
    ##tf.keras.layers.Dropout(d1),
    layer7,
    tf.keras.layers.ReLU(),
    layer8,
    tf.keras.layers.Dropout(d1),
    layer9,
    tf.keras.layers.Softmax()
])

def scheduler(epoch):
  """Return the learning rate for `epoch`; the schedule is constant at `lr`."""
  return lr

callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# The network ends in a Softmax layer, so its outputs are probabilities and
# the loss must not apply a second softmax (from_logits=False).
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])


# Start from the backbone weights, fine-tune, store the result and score
# single frames of the validation set.
model.load_weights('weights/CNN/weights_task3_bb_full_sub_run'+run)
history = model.fit(train_data, train_labels, epochs=epochs_num,batch_size=batch_size, callbacks=[callback])
model.save_weights('weights/CNN/weights_task3_ft_'+subject_num+'_pos'+ft_train_positions[0]+'_bb_run'+run+'_ft_run'+ft_run)
test_loss, test_acc = model.evaluate(test_data,  test_labels)
print('\nSingle frame based validation accuracy:', test_acc)
predictions = np.argmax(model.predict(test_data), axis=1)
print('Network predictions:')
print(predictions)
print('Real labels:')
print(test_labels.T)

# majority voting:
# consecutive runs of `min_window` frame predictions are collapsed into one
# label per event by taking the most frequent class.
window = int(min_window)
votes = [
    np.argmax(np.bincount(predictions[k*window:(k+1)*window]))
    for k in range(ft_test_events)
]
predictions_majority = np.array(votes, dtype=float)
print('Network predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Expected order: two events per gesture (0..9) for every (position, session) pair.
real_labels = np.array([g for g in range(10) for _ in range(2)]*len(ft_pos_session_list))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)


# Training curves: accuracy on the left, loss on the right.
plt.rcParams["figure.figsize"] = (12,6)
fig, axs = plt.subplots(1, 2)
for ax, metric, title in zip(axs, ('accuracy', 'loss'), ('Training Accuracy', 'Training Loss')):
    ax.plot(history.history[metric])
    ax.set(title=title, xlabel='epochs', ylabel=metric)
#model.summary()
plt.show()

print('Validation accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
plt.imshow(confusion_matrix(real_labels, predictions_majority))
plt.colorbar()
#plt.show()



Test the fine-tuned CNN:

In [None]:
# Read the files:
# The fine-tuned network is tested on the '*_train_data.pkl' /
# '*_train_labels.pkl' files of the fine-tuning positions/sessions. Only test
# data is loaded here, so the training arrays are reset and left empty; the
# cell no longer reads `data` / `labels` left over from a previously run cell.
train_data = None
train_labels = None
name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num

test_chunks, test_label_chunks = [], []
# The loop names are chosen so that the notebook-level `filename`, `position`
# and `session` variables are left untouched by this cell.
for pos_id, sess_id in ft_pos_session_list:
    test_stem = name_start_val+'_pos'+pos_id+'_s'+sess_id
    test_data_part = pd.read_pickle(test_stem+'_train_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(test_stem+'_train_labels.pkl').to_numpy()
    print('test labels read!')

    test_chunks.append(test_data_part)
    test_label_chunks.append(test_labels_part)

# Concatenate once after the loop; the results stay None when no
# (position, session) pair exists.
test_data = np.concatenate(test_chunks) if test_chunks else None
test_labels = np.concatenate(test_label_chunks) if test_label_chunks else None

# Every 16-value frame becomes a 4x4 map.
test_data = test_data.reshape((test_data.shape[0], 4, 4))
print('shapes of test data, test labels:')
print(test_data.shape)
print(test_labels.shape)

# Data standartization:
# the test set is centred and scaled element-wise with its own statistics.
test_data = test_data - test_data.mean(axis=0)
test_data = test_data / test_data.std(axis=0)

# Append a trailing channel axis for Conv2D.
test_data = tf.expand_dims(test_data, axis=-1)

# Restore the fine-tuned weights and score single frames.
model.load_weights('weights/CNN/weights_task3_ft_'+subject_num+'_pos'+ft_train_positions[0]+'_bb_run'+run+'_ft_run'+ft_run)
test_loss, test_acc = model.evaluate(test_data,  test_labels)
print('\nSingle frame based test accuracy:', test_acc)
predictions = np.argmax(model.predict(test_data), axis=1)
print('Network predictions:')
print(predictions)
print('Real labels:')
print(test_labels.T)

# majority voting:
# 60 events are counted per (position, session) pair; each event collapses
# `min_window` consecutive frame predictions into their most frequent class.
events_total = 60*len(ft_pos_session_list)
window = int(min_window)
votes = [
    np.argmax(np.bincount(predictions[k*window:(k+1)*window]))
    for k in range(events_total)
]
predictions_majority = np.array(votes, dtype=float)
print('Network predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Expected order: six events per gesture (0..9) for every pair.
real_labels = np.array([g for g in range(10) for _ in range(6)]*len(ft_pos_session_list))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)
print(len(real_labels))

print('Test accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
mat = confusion_matrix(real_labels, predictions_majority)
print(repr(mat))
plt.imshow(mat)
#plt.colorbar()
plt.show()

# Write a plain-text summary of the fine-tuning run and store the confusion
# matrix of the test set.
folder_path = 'summaries/subject_'+subject_num+'/task3'
os.makedirs(folder_path, exist_ok=True)
summary_name = 'summaries/subject_'+subject_num+'/task3/summary_task3_ft_'+subject_num+'_pos'+ft_train_positions[0]+'_type_'+algo_type+'_bb_run'+run+'_ft_run'+ft_run
# The context manager closes the file even if one of the writes fails.
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 3 ft, subject number '+subject_num+', algorithm type: '+algo_type+',backbone run '+run+', fine tuning run'+ft_run+'\n')
    # NOTE(review): these are the backbone's train_sessions / train_positions,
    # not ft_train_sessions / ft_train_positions - confirm that is intended.
    f.write('sessions: '+', '.join(train_sessions)+'\n')
    f.write('positions: '+', '.join(train_positions)+'\n')
    f.write('regularization strength: '+str(reg_strength)+'\n')
    f.write('learning rate: '+str(lr)+'\n')
    f.write('dropout: '+str(d1)+'\n')
    f.write('number of epochs: '+str(epochs_num)+'\n')
    f.write('batch size: '+str(batch_size)+'\n')
    f.write('minimal window length (in samples): '+str(min_window)+'\n')
    # NOTE(review): test_acc is whatever the most recent model.evaluate call
    # returned; confirm that the "validation" wording matches that data set.
    f.write('Single frame based validation accuracy: '+str(test_acc)+'\n')
    f.write('Test accuracy after majority voting: '+str(accuracy)+'\n')
    # Record the same path the weights were saved to and loaded from,
    # including the '_pos<k>' part.
    f.write('weights file: weights/CNN/weights_task3_ft_'+subject_num+'_pos'+ft_train_positions[0]+'_bb_run'+run+'_ft_run'+ft_run)

confusion_matrix_df = pd.DataFrame(data=mat)
confusion_matrix_df.to_pickle(folder_path+'/CNN_ft_mat_bb_run'+run+'_ft_run'+ft_run+'subject_'+subject_num+'_pos'+ft_train_positions[0]+'.pkl', protocol=4)

## RNN

Training the backbone RNN

In [None]:
# Run configuration for the backbone RNN cell.
# use_hmm selects the training files (HMM-generated when True) and the
# algorithm tag ('RNN+HMM' when True, 'RNN' when False).
use_hmm = True
# Run identifier (string) of this backbone run.
run = '1'

In [None]:
import time
from sklearn.metrics import confusion_matrix

start_time = time.time()

# Read the files:
# Training frames come from the HMM-generated files ('gf_' prefix) when
# use_hmm is set and from the segmented recordings ('sf_' prefix) otherwise;
# validation frames always come from the segmented recordings. The paths are
# built per subject, so both settings resolve to existing naming patterns.
if use_hmm:
    train_root, train_tag = 'hmm_generated_files', 'gf'
else:
    train_root, train_tag = 'segmented_files', 'sf'

train_chunks, train_label_chunks = [], []
test_chunks, test_label_chunks = [], []
# The loop names are chosen so that the notebook-level `filename`,
# `subject_num`, `position` and `session` variables are left untouched.
for sub_id, pos_id, sess_id in sub_pos_session_list:
    train_stem = train_root+'/subject_'+sub_id+'/'+train_tag+'_'+sub_id+'_pos'+pos_id+'_s'+sess_id
    val_stem = 'segmented_files/subject_'+sub_id+'/sf_'+sub_id+'_pos'+pos_id+'_s'+sess_id

    data = pd.read_pickle(train_stem+'_train_data.pkl').to_numpy()
    print('train data read!')
    labels = pd.read_pickle(train_stem+'_train_labels.pkl').to_numpy()
    print('train labels read!')
    test_data_part = pd.read_pickle(val_stem+'_val_data.pkl').to_numpy()
    print('test data read!')
    test_labels_part = pd.read_pickle(val_stem+'_val_labels.pkl').to_numpy()
    print('test labels read!')

    train_chunks.append(data)
    train_label_chunks.append(labels)
    test_chunks.append(test_data_part)
    test_label_chunks.append(test_labels_part)

# Concatenate once after the loop instead of re-copying the growing arrays on
# every iteration; the results stay None when the combination list is empty.
train_data = np.concatenate(train_chunks) if train_chunks else None
train_labels = np.concatenate(train_label_chunks) if train_label_chunks else None
test_data = np.concatenate(test_chunks) if test_chunks else None
test_labels = np.concatenate(test_label_chunks) if test_label_chunks else None


# Hyper-parameters:
if use_hmm:
  algo_type='RNN+HMM'
else:
  algo_type='RNN'
reg_strength = 0.0001
lr = 1e-2
d1 = 0.1
#min_angle = 1/18
#max_shift_h = 0.05
#max_shift_w = 0.05
epochs_num = 1000 #1500
batch_size = 800#400
#min_window = 11 #17
print('min_window='+str(min_window))

windows_num = min_window
reshaped_train_data = np.zeros((int(train_data.shape[0]/windows_num), windows_num, 16))
reshaped_test_data = np.zeros((int(test_data.shape[0]/windows_num), windows_num, 16))
for i in range(reshaped_train_data.shape[0]):
    reshaped_train_data[i,:,:] = train_data[windows_num*i:windows_num*(i+1)]
for i in range(reshaped_test_data.shape[0]):
    reshaped_test_data[i,:,:] = test_data[windows_num*i:windows_num*(i+1)]
train_data = reshaped_train_data
test_data = reshaped_test_data
train_labels = train_labels[::windows_num]
test_labels = test_labels[::windows_num]
print('shapes:')
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)


per = np.random.permutation(len(train_data))
train_data = train_data[per, :]
train_labels = train_labels[per]


model = tf.keras.Sequential([
    tf.keras.layers.LSTM(12, dropout=d1, input_shape=(windows_num,16), name='lstm2'),
    tf.keras.layers.Dense(10, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='new'),
    tf.keras.layers.Softmax()
])

def scheduler(epoch):
  if epoch < 600:
    return lr
  else:
    return lr

callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

history = model.fit(train_data, train_labels, epochs=epochs_num,batch_size=batch_size, callbacks=[callback])
model.save_weights('weights/RNN/weights_task3_bb_full_sub_run'+run)
#model.load_weights('weights/RNN/weights_task2_'+subject_num+'_pos'+position+'_run'+run)
test_loss, test_acc = model.evaluate(test_data,  test_labels)#, verbose=2)
print('\nTest accuracy:', test_acc)

predictions = np.argmax(model.predict(test_data), axis=1)
cmat = confusion_matrix(test_labels, predictions)

plt.rcParams["figure.figsize"] = (12,6)
fig, axs = plt.subplots(1, 2)
axs[0].plot(history.history['accuracy'])
axs[0].set(title='Training Accuracy', xlabel='epochs', ylabel='accuracy')
axs[1].plot(history.history['loss'])
axs[1].set(title='Training Loss', xlabel='epochs', ylabel='loss')
#model.summary()
plt.figure()
print('Confusion matrix:')
plt.imshow(cmat)
plt.colorbar()
#plt.show()

### fine tune RNN:

In [None]:
# Switches read by the fine-tuning and evaluation cells below.
# use_hmm: False -> fine-tuning files are taken from 'segmented_files/' and the
#          run is tagged algo_type='RNN'; True -> 'hmm_generated_files/' and 'RNN+HMM'.
use_hmm = False
# ft_run: identifier of this fine-tuning run; it is appended ('_ft_run' + ft_run)
#         to the fine-tuned weights file name and to the summary file name.
ft_run = '1'

In [None]:
import time
from sklearn.metrics import confusion_matrix

# Fine-tune the backbone RNN: load the backbone weights, freeze the LSTM and
# retrain only the Dense layer on the '*_test_*' files of ft_pos_session_list;
# the '*_val_*' files of the same recordings are used for evaluation.
start_time = time.time()


def _read_array(path):
    """Return the pickled pandas object at `path` as a NumPy array (None passes through)."""
    return None if path is None else pd.read_pickle(path).to_numpy()


def _load_splits(file_groups):
    """Load and stack all (train data, train labels, test data, test labels) groups.

    Every entry of a group is a pickle path, or None when there is nothing to
    load for that slot. Arrays are concatenated along axis 0 in list order.
    Labels are only kept when the matching data file was loaded. A slot for
    which no file was loaded at all is returned as None.
    """
    messages = ('train data read!', 'train labels read!', 'test data read!', 'test labels read!')
    train_parts, train_label_parts, test_parts, test_label_parts = [], [], [], []
    for group in file_groups:
        loaded = []
        for path, message in zip(group, messages):
            loaded.append(_read_array(path))
            print(message)
        train_part, train_labels_part, test_part, test_labels_part = loaded
        if train_part is not None:
            train_parts.append(train_part)
            train_label_parts.append(train_labels_part)
        if test_part is not None:
            test_parts.append(test_part)
            test_label_parts.append(test_labels_part)
    return tuple(np.concatenate(parts) if parts else None
                 for parts in (train_parts, train_label_parts, test_parts, test_label_parts))


def _to_sequences(samples, sample_labels, seq_len, n_features=16):
    """Group every `seq_len` consecutive rows into one sequence with a single label.

    Returns an array of shape (n_seq, seq_len, n_features) and the label of the
    first row of each sequence. Trailing rows that do not fill a whole sequence
    are dropped, and the labels are cut to the same length so data and labels
    always agree. Raises ValueError when the rows do not have `n_features` columns.
    """
    n_seq = samples.shape[0] // seq_len
    sequences = np.asarray(samples[:n_seq*seq_len], dtype=np.float64).reshape(n_seq, seq_len, n_features)
    return sequences, sample_labels[::seq_len][:n_seq]


# Read the files:
if use_hmm:
    name_start = 'hmm_generated_files/subject_'+subject_num+'/gf_'+subject_num
else:
    name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num

# The comprehension variables are local to it, so the notebook-level
# position / session / filename are left untouched.
file_groups = [
    (name_start+'_pos'+position+'_s'+session+'_test_data.pkl',
     name_start+'_pos'+position+'_s'+session+'_test_labels.pkl',
     name_start_val+'_pos'+position+'_s'+session+'_val_data.pkl',
     name_start_val+'_pos'+position+'_s'+session+'_val_labels.pkl')
    for position, session in ft_pos_session_list
]
train_data, train_labels, test_data, test_labels = _load_splits(file_groups)


# Hyper-parameters:
algo_type = 'RNN+HMM' if use_hmm else 'RNN'
reg_strength = 0.0001
lr = 1e-2
d1 = 0.1
epochs_num = 2000  # previously 1500
batch_size = 20
print('min_window='+str(min_window))

# Each LSTM input sequence is min_window consecutive rows of 16 features.
windows_num = min_window
train_data, train_labels = _to_sequences(train_data, train_labels, windows_num)
test_data, test_labels = _to_sequences(test_data, test_labels, windows_num)
print('shapes:')
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)


# Shuffle the training sequences (data and labels with the same permutation).
per = np.random.permutation(len(train_data))
train_data = train_data[per, :]
train_labels = train_labels[per]


# Same architecture and layer names as the backbone so its weights can be
# loaded; the LSTM is frozen, leaving only the Dense layer 'new' trainable.
layer1 = tf.keras.layers.LSTM(12, dropout=d1, input_shape=(windows_num,16), name='lstm2')
layer1.trainable = False

model = tf.keras.Sequential([
    layer1,
    tf.keras.layers.Dense(10, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='new'),
    tf.keras.layers.Softmax()
])


def scheduler(epoch):
    """Learning-rate schedule hook; currently a constant rate for every epoch."""
    return lr


callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# The model ends in a Softmax layer, i.e. it outputs probabilities, so the loss
# must not treat its output as logits (from_logits=True would apply softmax twice).
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

# Start from the backbone weights saved by the backbone-training cell.
model.load_weights('weights/RNN/weights_task3_bb_full_sub_run'+run)
history = model.fit(train_data, train_labels, epochs=epochs_num, batch_size=batch_size, callbacks=[callback])
model.save_weights('weights/RNN/weights_task3_ft_'+subject_num+'_pos'+ft_train_positions[0]+'_bb_run'+run+'_ft_run'+ft_run)
test_loss, test_acc = model.evaluate(test_data, test_labels)
print('\nTest accuracy:', test_acc)

predictions = np.argmax(model.predict(test_data), axis=1)
cmat = confusion_matrix(test_labels, predictions)

# Training curves (left: accuracy, right: loss) followed by the confusion matrix.
plt.rcParams["figure.figsize"] = (12,6)
fig, axs = plt.subplots(1, 2)
axs[0].plot(history.history['accuracy'])
axs[0].set(title='Training Accuracy', xlabel='epochs', ylabel='accuracy')
axs[1].plot(history.history['loss'])
axs[1].set(title='Training Loss', xlabel='epochs', ylabel='loss')
plt.figure()
print('Confusion matrix:')
plt.imshow(cmat)
plt.colorbar()

In [None]:
import time
from sklearn.metrics import confusion_matrix

# Evaluate the fine-tuned RNN: rebuild the network, load the fine-tuned
# weights, score them on the '*_train_*' files of ft_pos_session_list and write
# a text summary plus the pickled confusion matrix. Nothing is trained here.
start_time = time.time()


def _read_array(path):
    """Return the pickled pandas object at `path` as a NumPy array (None passes through)."""
    return None if path is None else pd.read_pickle(path).to_numpy()


def _load_splits(file_groups):
    """Load and stack all (train data, train labels, test data, test labels) groups.

    Every entry of a group is a pickle path, or None when there is nothing to
    load for that slot. Arrays are concatenated along axis 0 in list order.
    Labels are only kept when the matching data file was loaded. A slot for
    which no file was loaded at all is returned as None.
    """
    messages = ('train data read!', 'train labels read!', 'test data read!', 'test labels read!')
    train_parts, train_label_parts, test_parts, test_label_parts = [], [], [], []
    for group in file_groups:
        loaded = []
        for path, message in zip(group, messages):
            loaded.append(_read_array(path))
            print(message)
        train_part, train_labels_part, test_part, test_labels_part = loaded
        if train_part is not None:
            train_parts.append(train_part)
            train_label_parts.append(train_labels_part)
        if test_part is not None:
            test_parts.append(test_part)
            test_label_parts.append(test_labels_part)
    return tuple(np.concatenate(parts) if parts else None
                 for parts in (train_parts, train_label_parts, test_parts, test_label_parts))


def _to_sequences(samples, sample_labels, seq_len, n_features=16):
    """Group every `seq_len` consecutive rows into one sequence with a single label.

    Returns an array of shape (n_seq, seq_len, n_features) and the label of the
    first row of each sequence. Trailing rows that do not fill a whole sequence
    are dropped, and the labels are cut to the same length so data and labels
    always agree. Raises ValueError when the rows do not have `n_features` columns.
    """
    n_seq = samples.shape[0] // seq_len
    sequences = np.asarray(samples[:n_seq*seq_len], dtype=np.float64).reshape(n_seq, seq_len, n_features)
    return sequences, sample_labels[::seq_len][:n_seq]


# Read the files:
# Both the "train" and the "test" slots point at the same '*_train_*' files.
name_start = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
name_start_val = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num

# The comprehension variables are local to it, so the notebook-level
# position / session / filename are left untouched.
file_groups = [
    (name_start+'_pos'+position+'_s'+session+'_train_data.pkl',
     name_start+'_pos'+position+'_s'+session+'_train_labels.pkl',
     name_start_val+'_pos'+position+'_s'+session+'_train_data.pkl',
     name_start_val+'_pos'+position+'_s'+session+'_train_labels.pkl')
    for position, session in ft_pos_session_list
]
train_data, train_labels, test_data, test_labels = _load_splits(file_groups)


# Hyper-parameters (only reported in the summary; no training happens in this cell):
algo_type = 'RNN+HMM' if use_hmm else 'RNN'
reg_strength = 0.0001
lr = 1e-2
d1 = 0.1
epochs_num = 2000  # previously 1500
batch_size = 20
print('min_window='+str(min_window))


# Each LSTM input sequence is min_window consecutive rows of 16 features.
windows_num = min_window
train_data, train_labels = _to_sequences(train_data, train_labels, windows_num)
test_data, test_labels = _to_sequences(test_data, test_labels, windows_num)
print('shapes:')
print(train_data.shape)
print(train_labels.shape)
print(test_data.shape)
print(test_labels.shape)


# Kept for parity with the training cells; the shuffled training set is not
# used below because model.fit is not called in this cell.
per = np.random.permutation(len(train_data))
train_data = train_data[per, :]
train_labels = train_labels[per]


# Same architecture and layer names as the training cells so the saved
# weights can be loaded.
model = tf.keras.Sequential([
    tf.keras.layers.LSTM(12, dropout=d1, input_shape=(windows_num,16), name='lstm2'),
    tf.keras.layers.Dense(10, kernel_regularizer=tf.keras.regularizers.l2(reg_strength), name='new'),
    tf.keras.layers.Softmax()
])


def scheduler(epoch):
    """Learning-rate schedule hook; currently a constant rate for every epoch."""
    return lr


callback = tf.keras.callbacks.LearningRateScheduler(scheduler)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
# The model ends in a Softmax layer, i.e. it outputs probabilities, so the loss
# must not treat its output as logits (from_logits=True would apply softmax twice).
model.compile(optimizer=optimizer,
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

# Single source of truth for the weights path: used for loading and for the summary.
weights_path = 'weights/RNN/weights_task3_ft_'+subject_num+'_pos'+ft_train_positions[0]+'_bb_run'+run+'_ft_run'+ft_run
model.load_weights(weights_path)
test_loss, test_acc = model.evaluate(test_data, test_labels)
print('\nTest accuracy:', test_acc)

predictions = np.argmax(model.predict(test_data), axis=1)
cmat = confusion_matrix(test_labels, predictions)

folder_path = 'summaries/subject_'+subject_num+'/task3'
os.makedirs(folder_path, exist_ok=True)
summary_name = folder_path+'/summary_task3_ft_'+subject_num+'_type_'+algo_type+'_bb_run'+run+'_ft_run'+ft_run
# 'with' guarantees the summary file is closed even if one of the writes fails.
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 3 ft, subject number '+subject_num+', algorithm type: '+algo_type+',backbone run '+run+', fine tuning run'+ft_run+'\n')
    f.write('sessions: '+', '.join(train_sessions)+'\n')
    f.write('positions: '+', '.join(train_positions)+'\n')
    f.write('regularization strength: '+str(reg_strength)+'\n')
    f.write('learning rate: '+str(lr)+'\n')
    f.write('dropout: '+str(d1)+'\n')
    f.write('number of epochs: '+str(epochs_num)+'\n')
    f.write('batch size: '+str(batch_size)+'\n')
    f.write('minimal window length (in samples): '+str(min_window)+'\n')
    # NOTE(review): the value is the accuracy returned by model.evaluate (one
    # prediction per sequence); no majority voting is applied in this cell. The
    # label text is kept unchanged in case existing summaries are parsed by it.
    f.write('Test accuracy after majority voting: '+str(test_acc)+'\n')
    # Report the path that was actually loaded (it includes the '_pos...' part).
    f.write('weights file: '+weights_path)

confusion_matrix_df = pd.DataFrame(data=cmat)
confusion_matrix_df.to_pickle(folder_path+'/RNN_mat.pkl', protocol=4)

## KNN

**Task 3** - KNN

In [None]:
# Run identifier appended ('_run' + run) to the KNN summary file name written by the next cell.
run = '1'

In [None]:
k_KNN = 1

from sklearn.metrics import confusion_matrix

# KNN baseline: fit on the '*_train_*' files of sub_pos_session_list plus the
# '*_test_*' files of ft_pos_session_list, predict every row of the '*_train_*'
# files of ft_pos_session_list, then majority-vote over blocks of min_window
# consecutive rows and write a text summary.


def _read_array(path):
    """Return the pickled pandas object at `path` as a NumPy array (None passes through)."""
    return None if path is None else pd.read_pickle(path).to_numpy()


def _load_splits(file_groups):
    """Load and stack all (train data, train labels, test data, test labels) groups.

    Every entry of a group is a pickle path, or None when there is nothing to
    load for that slot. Arrays are concatenated along axis 0 in list order.
    Labels are only kept when the matching data file was loaded. A slot for
    which no file was loaded at all is returned as None.
    """
    messages = ('train data read!', 'train labels read!', 'test data read!', 'test labels read!')
    train_parts, train_label_parts, test_parts, test_label_parts = [], [], [], []
    for group in file_groups:
        loaded = []
        for path, message in zip(group, messages):
            loaded.append(_read_array(path))
            print(message)
        train_part, train_labels_part, test_part, test_labels_part = loaded
        if train_part is not None:
            train_parts.append(train_part)
            train_label_parts.append(train_labels_part)
        if test_part is not None:
            test_parts.append(test_part)
            test_label_parts.append(test_labels_part)
    return tuple(np.concatenate(parts) if parts else None
                 for parts in (train_parts, train_label_parts, test_parts, test_label_parts))


# Read the files:
name_start_old = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
name_start_val_old = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
name_start = 'segmented_files/subject_'
name_start_val = 'segmented_files/subject_'

# Training-only groups: one per (subject, position, session) of the backbone list.
# The comprehension variables are local to it, so the notebook-level
# subject_num / position / session / filename are left untouched.
backbone_groups = [
    (name_start+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'+session+'_train_data.pkl',
     name_start+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'+session+'_train_labels.pkl',
     None,
     None)
    for subject_num, position, session in sub_pos_session_list
]
# Groups of the current subject: '*_test_*' files are added to the training
# set, '*_train_*' files form the test set.
ft_groups = [
    (name_start_old+'_pos'+position+'_s'+session+'_test_data.pkl',
     name_start_old+'_pos'+position+'_s'+session+'_test_labels.pkl',
     name_start_val_old+'_pos'+position+'_s'+session+'_train_data.pkl',
     name_start_val_old+'_pos'+position+'_s'+session+'_train_labels.pkl')
    for position, session in ft_pos_session_list
]
train_data, train_labels, test_data, test_labels = _load_splits(backbone_groups + ft_groups)


print(train_data.shape)
print(test_data.shape)
KNN = KNeighborsClassifier(n_neighbors=k_KNN)
# Labels are flattened so scikit-learn receives a 1-D target vector.
KNN.fit(train_data, np.ravel(train_labels))
predictions = KNN.predict(test_data).astype('int')
single_acc = np.mean(predictions==np.ravel(test_labels))
print('KNN single frame accuracy: '+str(single_acc))

# majority voting:
# Each fine-tuning recording is expected to contribute 60 blocks of min_window
# consecutive rows (10 labels x 6 blocks each, matching real_labels below).
# One label per block is chosen as the most frequent row prediction.
n_votes = 60*len(ft_pos_session_list)
vote_len = int(min_window)
predictions_majority = np.zeros(n_votes)
for i in range(n_votes):
    relevant_pred = predictions[i*vote_len:(i+1)*vote_len]
    predictions_majority[i] = np.argmax(np.bincount(relevant_pred))
print('predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Ground truth per block: labels 0..9, six blocks each, repeated per recording.
real_labels = np.tile(np.repeat(np.arange(10), 6), len(ft_pos_session_list))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)


plt.rcParams["figure.figsize"] = (12,6)
print('Test accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
plt.imshow(confusion_matrix(real_labels, predictions_majority))
plt.colorbar()

# Make sure the summary folder exists; open() would otherwise raise
# FileNotFoundError on a fresh subject.
folder_path = 'summaries/subject_'+subject_num+'/task3'
os.makedirs(folder_path, exist_ok=True)
summary_name = folder_path+'/summary_task3_'+subject_num+'_type_KNN_run'+run
# 'with' guarantees the summary file is closed even if one of the writes fails.
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 3, subject number '+subject_num+', algorithm type: KNN, run '+run+'\n')
    f.write('fine tuning sessions: '+', '.join(ft_train_sessions)+'\n')
    f.write('fine tuning positions: '+', '.join(ft_train_positions)+'\n')
    f.write('k: '+str(k_KNN)+'\n')
    f.write('Single frame accuracy: '+str(single_acc)+'\n')
    f.write('Accuracy after majority voting: '+str(accuracy)+'\n')

## SVM

**Task 3** - SVM

In [None]:
# Run identifier appended ('_run' + run) to the SVM model file name and the SVM summary file name written by the next cell.
run = '1'

In [None]:
c = 100

# Imported here so the cell does not depend on another cell having run first.
from sklearn.metrics import confusion_matrix

# SVM baseline: fit on the '*_train_*' files of sub_pos_session_list plus the
# '*_test_*' files of ft_pos_session_list, predict every row of the '*_train_*'
# files of ft_pos_session_list, save the fitted model, then majority-vote over
# blocks of min_window consecutive rows and write a text summary.


def _read_array(path):
    """Return the pickled pandas object at `path` as a NumPy array (None passes through)."""
    return None if path is None else pd.read_pickle(path).to_numpy()


def _load_splits(file_groups):
    """Load and stack all (train data, train labels, test data, test labels) groups.

    Every entry of a group is a pickle path, or None when there is nothing to
    load for that slot. Arrays are concatenated along axis 0 in list order.
    Labels are only kept when the matching data file was loaded. A slot for
    which no file was loaded at all is returned as None.
    """
    messages = ('train data read!', 'train labels read!', 'test data read!', 'test labels read!')
    train_parts, train_label_parts, test_parts, test_label_parts = [], [], [], []
    for group in file_groups:
        loaded = []
        for path, message in zip(group, messages):
            loaded.append(_read_array(path))
            print(message)
        train_part, train_labels_part, test_part, test_labels_part = loaded
        if train_part is not None:
            train_parts.append(train_part)
            train_label_parts.append(train_labels_part)
        if test_part is not None:
            test_parts.append(test_part)
            test_label_parts.append(test_labels_part)
    return tuple(np.concatenate(parts) if parts else None
                 for parts in (train_parts, train_label_parts, test_parts, test_label_parts))


# Read the files:
name_start_old = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
name_start_val_old = 'segmented_files/subject_'+subject_num+'/sf_'+subject_num
name_start = 'segmented_files/subject_'
name_start_val = 'segmented_files/subject_'

# Training-only groups: one per (subject, position, session) of the backbone list.
# The comprehension variables are local to it, so the notebook-level
# subject_num / position / session / filename are left untouched.
backbone_groups = [
    (name_start+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'+session+'_train_data.pkl',
     name_start+subject_num+'/sf_'+subject_num+'_pos'+position+'_s'+session+'_train_labels.pkl',
     None,
     None)
    for subject_num, position, session in sub_pos_session_list
]
# Groups of the current subject: '*_test_*' files are added to the training
# set, '*_train_*' files form the test set.
ft_groups = [
    (name_start_old+'_pos'+position+'_s'+session+'_test_data.pkl',
     name_start_old+'_pos'+position+'_s'+session+'_test_labels.pkl',
     name_start_val_old+'_pos'+position+'_s'+session+'_train_data.pkl',
     name_start_val_old+'_pos'+position+'_s'+session+'_train_labels.pkl')
    for position, session in ft_pos_session_list
]
train_data, train_labels, test_data, test_labels = _load_splits(backbone_groups + ft_groups)

svm_classifier = svm.SVC(C=c)
# Labels are flattened so scikit-learn receives a 1-D target vector.
svm_classifier.fit(train_data, np.ravel(train_labels))
predictions = svm_classifier.predict(test_data).astype('int')
single_acc = np.mean(predictions==np.ravel(test_labels))
print('SVM single frame accuracy:')
print(single_acc)

# Persist the fitted classifier; the folder is created on demand and the file
# handle is closed deterministically.
SVM_model_name = 'weights/SVM/SVM_task3_'+subject_num+'_pos'+ft_train_positions[0]+'_run'+run
os.makedirs('weights/SVM', exist_ok=True)
with open(SVM_model_name+'.sav', 'wb') as model_file:
    pickle.dump(svm_classifier, model_file)


# majority voting:
# Each fine-tuning recording is expected to contribute 60 blocks of min_window
# consecutive rows (10 labels x 6 blocks each, matching real_labels below).
# One label per block is chosen as the most frequent row prediction.
n_votes = 60*len(ft_pos_session_list)
vote_len = int(min_window)
predictions_majority = np.zeros(n_votes)
for i in range(n_votes):
    relevant_pred = predictions[i*vote_len:(i+1)*vote_len]
    predictions_majority[i] = np.argmax(np.bincount(relevant_pred))
print('predictions after majority voting:')
print(predictions_majority)
predictions_majority = predictions_majority.astype(int)
# Ground truth per block: labels 0..9, six blocks each, repeated per recording.
real_labels = np.tile(np.repeat(np.arange(10), 6), len(ft_pos_session_list))
accuracy = np.mean(predictions_majority==real_labels)
print('Real labels:')
print(real_labels)


plt.rcParams["figure.figsize"] = (12,6)
print('Test accuracy after majority voting:')
print(accuracy)
print('Confusion matrix:')
plt.imshow(confusion_matrix(real_labels, predictions_majority))
plt.colorbar()

# Make sure the summary folder exists; open() would otherwise raise
# FileNotFoundError on a fresh subject.
folder_path = 'summaries/subject_'+subject_num+'/task3'
os.makedirs(folder_path, exist_ok=True)
# NOTE(review): the file name uses the notebook-level `position`, while the
# model name above uses ft_train_positions[0] - confirm which one is intended.
summary_name = folder_path+'/summary_task3_'+subject_num+'_pos'+position+'_type_SVM_run'+run
# 'with' guarantees the summary file is closed even if one of the writes fails.
with open(summary_name+'.txt', 'w') as f:
    f.write('Run summary: task 3, subject number '+subject_num+', algorithm type: SVM, run '+run+'\n')
    f.write('fine tuning sessions: '+', '.join(ft_train_sessions)+'\n')
    f.write('fine tuning positions: '+', '.join(ft_train_positions)+'\n')
    f.write('C: '+str(c)+'\n')
    f.write('model name: '+SVM_model_name+'.sav'+'\n')
    f.write('single frame accuracy: '+str(single_acc)+'\n')
    f.write('accuracy after majority voting: '+str(accuracy)+'\n')