In [None]:
#Basic imports
import numpy as np
import pandas as pd

from dataframe.csv_utils import (
    load_data_from_csv,
    get_labels_from_result,
    get_features_from_result,
)

from labels import get_tranformed_labels, binary_label, print_label_count, get_categorical_labels

# For machine learning modeling
from sklearn.model_selection import GroupKFold
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import shap

In [None]:
"""
    load features from csv
"""

# Directory name handed to load_data_from_csv.
dir_name = "extracted_features_v1"
result = load_data_from_csv(dir_name)

all_label_array, label_list = get_labels_from_result(result)

# The feature names returned by the helper are replaced right below, once the
# "index" column has been removed from the feature table.
features_with_index, feature_names = get_features_from_result(result, False)
all_feature_array = features_with_index.drop(columns=["index"])
feature_names = all_feature_array.columns

print(all_feature_array.shape, len(feature_names), len(label_list))

In [None]:
# One cut-off shared by the categorical labels and the binary valence labels,
# so the two label sets cannot drift apart.
VALENCE_THRESHOLD = 0.6

transformed = get_tranformed_labels(all_label_array)
label_list = get_categorical_labels(
    all_label_array, valence_threshold=VALENCE_THRESHOLD
)
valence_lables = binary_label(all_label_array['valence'], VALENCE_THRESHOLD)

# Read by the SHAP aggregation cell to pick the multi-class or binary branch.
is_multi = True

# Alternative label set (binary valence); kept disabled as in the original.
#label_list = valence_lables
print_label_count(label_list)

In [None]:
colors = ["c", "y", "m", "r"]

# Number of consecutive samples that belong to one subject.
# NOTE(review): assumes label_list is ordered subject by subject with exactly
# this many rows each - confirm against the feature-extraction step.
NUM_LABEL_PER_SUBJECT = 130
accuracy = []
# GroupKFold() uses its default number of splits, so at least that many
# distinct subjects (groups) must be present.
gkf = GroupKFold()
label_array = np.array(label_list)

# One integer subject id per sample. Floor division is essential here: true
# division (i / NUM_LABEL_PER_SUBJECT) produces a different float for every
# sample, which turns every sample into its own group and lets a subject's
# data end up in both the training and the validation fold.
groups_list = [[i // NUM_LABEL_PER_SUBJECT] for i in range(len(label_array))]
group_array = np.hstack(groups_list)

# normalize
# NOTE(review): the scaler is fitted on every row before the folds are split,
# so validation rows contribute to the mean/std used for scaling.
scaler = StandardScaler()
normalized_all_feature_array = pd.DataFrame(
    scaler.fit_transform(all_feature_array), columns=all_feature_array.columns
)

# Per-fold SHAP values and the row positions they belong to; both are merged
# by the SHAP aggregation cell further down.
list_shap_values = []
list_test_sets = []
for train_index, val_index in gkf.split(
    normalized_all_feature_array, label_array, groups=group_array
):
    train_features = normalized_all_feature_array.iloc[train_index]
    train_labels = label_array[train_index]
    val_features = normalized_all_feature_array.iloc[val_index]
    val_labels = label_array[val_index]

    # A fresh model per fold so no state leaks between folds.
    model = XGBClassifier(n_estimators=2)
    model.fit(train_features, train_labels)

    # Validation accuracy of this fold.
    acc = model.score(val_features, val_labels)
    print("Accuracy: %.2f%%" % (acc * 100.0))
    accuracy.append(acc)

    # For each fold keep the SHAP values together with the validation row
    # positions, so the rows can be re-aligned with the features later.
    shap_values = shap.TreeExplainer(model).shap_values(val_features)
    list_shap_values.append(shap_values)
    list_test_sets.append(val_index)

In [None]:
# Per-fold validation accuracies first, then their average.
print(accuracy)
mean_accuracy = np.mean(accuracy)
print(mean_accuracy)

In [None]:
# Merge the per-fold results so that one summary plot covers every validation row.
test_set = np.concatenate(list_test_sets, axis=0)

# In the multi-class case the first dimension of each fold's SHAP values is the
# class, so folds are joined along axis 1; in the binary case along axis 0.
# NOTE(review): this relies on shap returning one array per class for
# multi-class models - confirm for the installed shap version.
fold_axis = 1 if is_multi else 0
shap_values = np.concatenate(
    [np.array(fold_values) for fold_values in list_shap_values],
    axis=fold_axis,
)

# Bring back the feature names, with rows in the same order as the SHAP values.
X_test = pd.DataFrame(
    normalized_all_feature_array.iloc[test_set],
    columns=all_feature_array.columns,
)

# Explanation plot for the whole experiment.
if is_multi:
    # The first dimension selects the class to explain; class 1 is plotted.
    class_index = 1
    shap.summary_plot(shap_values[class_index], X_test)
else:
    shap.summary_plot(shap_values, X_test)  # for binary

In [None]:
from data_utils import (
    load_data_from_dir,
    concatenate_all_data,
)

def get_raw_signal(dir_names, markers):
    """Load the recordings in ``dir_names`` and stack the requested markers.

    Args:
        dir_names: Sequence of directories; each one is passed to
            ``load_data_from_dir``.
        markers: Marker names (the notebook passes ``['EEG']``; the original
            comment also hints at EOG, EMG and EGG). Each one is handed to
            ``concatenate_all_data``.

    Returns:
        A ``(data, condition_to_labels)`` tuple. ``data`` is the per-marker
        arrays, each with its first and last axis swapped, joined along
        axis 2. ``condition_to_labels`` is the value returned by
        ``concatenate_all_data`` for the last marker.

    Raises:
        IndexError: If ``dir_names`` is empty.
        ValueError: If ``markers`` is empty.
    """
    # Validate before touching the disk so that bad arguments fail fast. The
    # exception types match what the previous implementation ended up raising.
    markers = list(markers)
    if len(dir_names) == 0:
        raise IndexError("dir_names must contain at least one directory")
    if not markers:
        raise ValueError("markers must contain at least one marker name")

    dir_to_data = {dir_name: load_data_from_dir(dir_name) for dir_name in dir_names}

    # concatenate raw signal (x)
    data_list = []
    condition_to_labels = None
    for m in markers:
        all_participants_data, condition_to_labels = concatenate_all_data(dir_to_data, m)
        # (num_channels, num_data_points, num_epochs)
        #   => (num_epochs, num_data_points, num_channels)
        all_epoch_data = np.swapaxes(all_participants_data, 0, -1)
        data_list.append(all_epoch_data)

    # After the swap, axis 2 is the channel axis, so markers are stacked as
    # additional channels.
    return np.concatenate(data_list, axis=2), condition_to_labels



# Directories of cleaned recordings (presumably one per recording year).
ALL_DIRS = ["../CleandDataV1/2017", "../CleandDataV1/2018"]

# Only the EEG marker is loaded for the raw-signal experiments.
data_array, condition_to_labels = get_raw_signal(
    dir_names=ALL_DIRS,
    markers=["EEG"],
)

In [None]:
from tensorflow.keras import utils as np_utils
from models import train_with_cnn

""" 
    prepare labels (y)
"""
# to one-hot encoding vector
# label_array = np_utils.to_categorical(
#     label_list, num_classes=4
# )  # nvla, nvha, hvla, hvha
# NOTE(review): binary_label is called without an explicit threshold here,
# whereas the feature-based cell passes 0.6 - confirm this is intended.
label_list = binary_label(condition_to_labels['valence'])
label_array = np.array(label_list)

# One integer subject id per sample. Floor division is required: true division
# (i / NUM_LABEL_PER_SUBJECT) gives every sample a unique float, so each
# sample would become its own group and subjects would be split across folds.
# NOTE(review): assumes samples are ordered subject by subject with
# NUM_LABEL_PER_SUBJECT rows each - confirm.
groups_list = [[i // NUM_LABEL_PER_SUBJECT] for i in range(len(label_array))]
group_array = np.hstack(groups_list)

print(data_array.shape, label_array.shape, group_array.shape)

In [None]:
""" 
    with CNN
"""
# data_array is laid out as (num_epochs, num_data_points, num_channels).
num_channel = data_array.shape[2]

# NOTE(review): 12288 is presumably the number of data points per epoch
# (data_array.shape[1]) - confirm against train_with_cnn before changing it.
accuracy = train_with_cnn(12288, num_channel, data_array, label_array, group_array)

print(accuracy)
mean_accuracy = np.mean(accuracy)
print(mean_accuracy)

In [None]:
""" 
    with logistic regression
"""
# from models import train_with_logistic

# label_name = "attention"
# best_model = train_with_logistic(all_features, label_name, condition_to_labels, group_array)
# name_to_transformed = get_tranformed_labels(condition_to_labels)

# # assume bigger coefficients contribute more to the model,
# # but make sure that the features have THE SAME SCALE, otherwise this assumption is not correct.
# importance = best_model["classifier"].coef_[0]

# feat_importances = pd.Series(importance, index=get_feature_names(importance))
# feat_importances.nlargest(10).plot(
#     kind="barh", title=f"{label_name} Feature Importance"
# )
