In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, f1_score, balanced_accuracy_score
# %matplotlib ipympl
# %matplotlib notebook
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split, GridSearchCV, PredefinedSplit
from sklearn.preprocessing import StandardScaler
subject_id_test = 'S9'
subject_id_val = 'S10'

def Tuning_Params(model, tune_parameters, X_train, y_train, X_val=None, y_val=None, 
                  scoring_func='balanced_accuracy', cv=5, verbose=1):
    if X_val is not None and y_val is not None: # specific validate set
        val_fold = [1 for x in range(len(y_train))] + [0 for y in range(len(y_val))]
        cv = PredefinedSplit(val_fold)
        X_train_all = np.concatenate((X_train, X_val), axis=0)
        y_train_all = np.concatenate((y_train, y_val), axis=0)
    else:
        X_train_all = X_train
        y_train_all = y_train
    # Cross validate to find best hyper parameters
    clf = GridSearchCV(estimator=model, 
                       param_grid=tune_parameters, 
                       cv=cv, verbose=verbose, scoring=scoring_func,
                       refit=False)
    clf.fit(X_train_all, y_train_all)
    return clf

In [2]:
DATA_DIR = '/home/nvtu/PhD_Work/StressDetection/DATA/MyDataset/WESAD'
NAME_DATASET = 'WESAD'
data_group = np.load(f'{DATA_DIR}/{NAME_DATASET}_WRIST_groups_1.npy')
data_gt = np.load(f'{DATA_DIR}/{NAME_DATASET}_WRIST_ground_truth_1.npy')
data_ft = np.load(f'{DATA_DIR}/{NAME_DATASET}_WRIST_stats_feats_1.npy')
# data_ft_con = np.load(f'EmbededFt/{NAME_DATASET}/{NAME_DATASET}_WRIST_contrastive_embed_{subject_id_test}.npy')
data_ft_con = np.load('Output/WESAD/EmbedFt/EmbedFt_Combine_Euclid_sample_cross_internal_S9.npy')

# Create dataframe for dataset
column_values = [f'f{x}' for x in range(data_ft.shape[1])]
data_full_ori = pd.DataFrame(data = data_ft,  
                         columns = column_values)
data_full_ori['subject_id'] = data_group
data_full_ori['label'] = data_gt

data_full_con = pd.DataFrame(data = data_ft_con,  
                         columns = column_values)
data_full_con['subject_id'] = data_group
data_full_con['label'] = data_gt

list_subject_id = np.unique(data_full_ori['subject_id']).tolist()

# data_train_val = data_full_ori[data_full_ori.subject_id != subject_id_test]
# data_test = data_full_ori[data_full_ori.subject_id == subject_id_test]
# # subject_id_validate = random.Random(1509).choices(list(set(data_train_val.subject_id)),k=1)[0]
# # subject_id_validate = 'RY2'
# # data_train = data_train_val[data_train_val.subject_id != subject_id_validate]
# # data_validate = data_train_val[data_train_val.subject_id == subject_id_validate]
# ft_names = data_full_ori.columns.tolist()

# # Scaler Data
# X_train_val = data_train_val.iloc[:,:-1].to_numpy()
# y_train_val = data_train_val.iloc[:,-1].to_numpy()
# X_test = data_test.iloc[:,:-1].to_numpy()
# y_test = data_test.iloc[:,-1].to_numpy()

# # scaler = RobustScaler()
# # X_train[:,:-1] = scaler.fit_transform(X_train[:,:-1])
# # X_validate[:,:-1] = scaler.transform(X_validate[:,:-1])
# # X_test[:,:-1] = scaler.transform(X_test[:,:-1])
# # joblib.dump(scaler, f'{SAVE_MODEL_DIR}/{NAME_DATASET}/Model/Scaler_LH_{subject_id_test}.joblib')

# # Create Dataframe
# df_train_ori = pd.DataFrame(data = X_train_val, columns = ft_names[:-1])
# df_train_ori['label'] = y_train_val

# df_test_ori = pd.DataFrame(data = X_test, columns = ft_names[:-1])
# df_test_ori['label'] = y_test




# data_train_val = data_full_con[data_full_con.subject_id != subject_id_test]
# data_test = data_full_con[data_full_con.subject_id == subject_id_test]
# # subject_id_validate = random.Random(1509).choices(list(set(data_train_val.subject_id)),k=1)[0]
# # subject_id_validate = 'RY2'
# # data_train = data_train_val[data_train_val.subject_id != subject_id_validate]
# # data_validate = data_train_val[data_train_val.subject_id == subject_id_validate]
# ft_names = data_full_con.columns.tolist()

# # Scaler Data
# X_train_val = data_train_val.iloc[:,:-1].to_numpy()
# y_train_val = data_train_val.iloc[:,-1].to_numpy()
# X_test = data_test.iloc[:,:-1].to_numpy()
# y_test = data_test.iloc[:,-1].to_numpy()

# # scaler = RobustScaler()
# # X_train[:,:-1] = scaler.fit_transform(X_train[:,:-1])
# # X_validate[:,:-1] = scaler.transform(X_validate[:,:-1])
# # X_test[:,:-1] = scaler.transform(X_test[:,:-1])
# # joblib.dump(scaler, f'{SAVE_MODEL_DIR}/{NAME_DATASET}/Model/Scaler_LH_{subject_id_test}.joblib')

# # Create Dataframe
# df_train_con = pd.DataFrame(data = X_train_val, columns = ft_names[:-1])
# df_train_con['label'] = y_train_val

# df_test_con = pd.DataFrame(data = X_test, columns = ft_names[:-1])
# df_test_con['label'] = y_test

In [9]:
# Set the parameters by cross-validation for RF
tuned_parameters = {'n_neighbors': [2, 5, 10, 15 ,20, 25, 50]}
model = KNeighborsClassifier(n_jobs=None)

list_acc_train = []
list_acc_test = []
list_f1_train = []
list_f1_test = []
list_bacc_train = []
list_bacc_test = []

data_test = data_full_con[data_full_con.subject_id == subject_id_test]
data_validate = data_full_con[data_full_con.subject_id == subject_id_val]
data_train = data_full_con[(data_full_con.subject_id != subject_id_test) & (data_full_ori.subject_id != subject_id_val)]

# split test sets
X_test = data_test.iloc[:,:-2].to_numpy()
y_test = data_test.iloc[:,-1].to_numpy()

# split into train - validate
X_train = data_train.iloc[:,:-2].to_numpy()
y_train = data_train.iloc[:,-1].to_numpy()

X_validate = data_validate.iloc[:,:-2].to_numpy()
y_validate = data_validate.iloc[:,-1].to_numpy()

# validate_portion = 0.2
# X_train, X_validate, y_train, y_validate = train_test_split(X_train_val, y_train_val, 
#                                                           test_size=validate_portion, 
#                                                           random_state=1509, stratify=y_train_val)

# Scaler Data
# scaler = StandardScaler()
# X_train = scaler.fit_transform(X_train)
# # X_train = scaler.transform(X_train)
# X_validate = scaler.transform(X_validate)
# X_test = scaler.transform(X_test)
X_train_val = np.concatenate((X_train, X_validate))
y_train_val = np.concatenate((y_train, y_validate))

# GridSearch
gs_model = Tuning_Params(model=model, tune_parameters=tuned_parameters, X_train=X_train, y_train=y_train,
                      X_val=X_validate, y_val=y_validate, scoring_func='balanced_accuracy', verbose=0)

# Train again with best hyperparameter
model_final = KNeighborsClassifier(n_jobs=None, n_neighbors=gs_model.best_params_['n_neighbors'])
model_final.fit(X_train, y_train)

# Prediction
Y_pred_test = model_final.predict(X_test)
Y_pred_train = model_final.predict(X_train_val)
acc_test = accuracy_score(y_test, Y_pred_test)
acc_train = accuracy_score(y_train_val, Y_pred_train)

f1_test = f1_score(y_test, Y_pred_test)
f1_train = f1_score(y_train_val, Y_pred_train)

bacc_test = balanced_accuracy_score(y_test, Y_pred_test)
bacc_train = balanced_accuracy_score(y_train_val, Y_pred_train)

# append to list
list_acc_train.append(acc_train)
list_acc_test.append(acc_test)
list_bacc_train.append(bacc_train)
list_bacc_test.append(bacc_test)
list_f1_train.append(f1_train)
list_f1_test.append(f1_test)

In [10]:
gs_model.best_params_['n_neighbors']

15

In [5]:
acc_train, acc_test

(0.9997914680648237, 0.9451882845188284)

In [6]:
bacc_train, bacc_test

(0.9996342478633407, 0.8752380952380953)

In [7]:
f1_train, f1_test

(0.9995418548334315, 0.8574537540805224)

In [11]:
acc_train, acc_test, bacc_train, bacc_test, f1_train, f1_test

(0.9941313155386082,
 0.9451882845188284,
 0.992324141826189,
 0.8752380952380953,
 0.9871334334791979,
 0.8574537540805224)

In [None]:
# # Create Dataframe
# result = pd.DataFrame(data = list_subject_id,  
#                       columns = ['subject_id'])
# result['acc_train'] = list_acc_train
# result['acc_test'] = list_acc_test
# result['bacc_train'] = list_bacc_train
# result['bacc_test'] = list_bacc_test
# result['f1_train'] = list_f1_train
# result['f1_test'] = list_f1_test

# mean_result = result.iloc[:,1:].mean()
# mean_result = mean_result.to_frame().T
# mean_result['subject_id'] = 'Average'
# result = pd.concat([result, mean_result], axis=0)

# # Save to file
# result.to_csv(f'Result/GENERIC_{NAME_DATASET}_SVM_balance_weight_contrastive_ft.csv', index=False)

In [3]:
# cos_func = ContrastiveLoss_CosineSimilarity(margin=0.1, max_violation=True)
# test_dataset = ContrastiveDataset(df=df_test_con, numb_samples=100000, k=1.5)
# test_dataset.shuffle(seed=1509)
# test_dataloader = make_ContrastiveDataLoader(test_dataset, batch_size=2048)
# loss_total_test = calculate_contrastive_loss(test_dataloader, loss_func_con=cos_func)
# loss_total_test

In [4]:
# full_ori: 0.27 - full_con: 0.21
# test_ori: 0.33 - test_con: 0.22
# train_ori: 0.27 - train_con: 0.21

In [5]:
# Apply PCA to visualize
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

X_train_ori = df_train_ori.iloc[:,:-2].to_numpy()
X_test_ori = df_test_ori.iloc[:,:-2].to_numpy()
X_ori = np.concatenate((X_train_ori, X_test_ori))
scaler = StandardScaler()
X_ori = scaler.fit_transform(X_ori)

pca = PCA(n_components=2)
pca.fit(X_ori)
X_ori_transform = pca.transform(X_ori)
X_ori_train = X_ori_transform[:X_train_ori.shape[0],:]
X_ori_test = X_ori_transform[X_train_ori.shape[0]:,:]
y_train = df_train_ori.iloc[:,-1].to_numpy()
y_test = df_test_ori.iloc[:,-1].to_numpy()

In [6]:
plt.scatter(X_ori_train[:,0], X_ori_train[:,1], marker='o', s=1);

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [7]:
X_train_con = df_train_con.iloc[:,:-2].to_numpy()
X_test_con = df_test_con.iloc[:,:-2].to_numpy()
X_con = np.concatenate((X_train_con, X_test_con))
scaler = StandardScaler()
X_con = scaler.fit_transform(X_con)

pca = PCA(n_components=2)
pca.fit(X_con)
X_con_transform = pca.transform(X_ori)
X_con_train = X_con_transform[:X_train_con.shape[0],:]
X_con_test = X_con_transform[X_train_con.shape[0]:,:]
y_train = df_train_ori.iloc[:,-1].to_numpy()
y_test = df_test_ori.iloc[:,-1].to_numpy()

In [8]:
plt.scatter(X_con_train[:,0], X_con_train[:,1], marker='o', s=1);

In [11]:
fig = plt.figure()
ax = fig.add_subplot(projection='3d')
ax.scatter(X_con_train[:,0], X_con_train[:,1], X_con_train[:,2], marker='o', s=1)
ax.scatter(X_con_test[:,0], X_con_test[:,1], X_con_test[:,2], marker='^', s=1)
ax.set_xlim([-5, 5])
ax.set_ylim([-5, 5])
ax.set_zlim([-10, 20])

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

(-10.0, 20.0)