# Imports

In [1]:
# Mount Google Drive (Colab only); the dataset loaded later is read from
# a path under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Import libraries 
import math
import scipy
import json
import copy as cp
import seaborn as sns
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px

# Import sklearn
from sklearn.model_selection import train_test_split
from sklearn import feature_selection as fs
from sklearn import preprocessing
from sklearn import metrics
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score

from sklearn.decomposition import PCA
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.cluster import KMeans
from sklearn import linear_model

from sklearn.model_selection import LeaveOneOut
from sklearn.model_selection import cross_val_score

from imblearn.over_sampling import ADASYN 
from collections import Counter

# Functions

In [3]:
## Feature reduction via filtering approaches
def anova_reduction(X, y, n, p):
  """Keep the n columns of X with the largest ANOVA F-scores against y.

  X is accessed through `.columns` and `.iloc`. NaN scores are ranked as 0.
  When p is truthy the names of the chosen columns are printed.
  Returns the selected columns of X, ordered from highest to lowest score.
  """
  selector = fs.SelectKBest(fs.f_classif, k=n)
  selector.fit_transform(X, y)
  f_scores = np.nan_to_num(selector.scores_)
  # Rank every feature by score (descending) and keep the first n positions.
  top_idx = np.argsort(f_scores)[::-1][:n]
  if p:
    print("Selected: ", X.columns[top_idx].values)
  return X.iloc[:, top_idx]

def variance_reduction(X, cond):
  """Drop every column of X whose variance is below cond.

  Parameters:
  - X: DataFrame of features.
  - cond: variance threshold; columns with variance strictly below it are removed.

  Prints how many columns were removed and returns a new DataFrame without
  them (X itself is not modified).
  """
  variances = X.var(axis=0)
  # Select by label through a boolean mask instead of positional `series[i]`,
  # which is ambiguous (and deprecated) on a label-indexed Series.
  low_variance = variances.index[variances < cond]
  print("Removed: ", len(low_variance))
  return X.drop(columns=low_variance)

def mutual_reduction(X, y, n, p):
  """Keep the n columns of X with the highest mutual information with y.

  X is accessed through `.columns` and `.iloc`. When p is truthy the names of
  the chosen columns are printed. Returns the selected columns of X, ordered
  from highest to lowest score.
  """
  selector = fs.SelectKBest(fs.mutual_info_classif, k=n)
  selector.fit_transform(X, y)
  # Rank every feature by score (descending) and keep the first n positions.
  top_idx = np.argsort(selector.scores_)[::-1][:n]
  if p:
    print("Selected: ", X.columns[top_idx].values)
  return X.iloc[:, top_idx]

def kendall_reduction(X, y, n, p):
  """Keep the n columns of X with the strongest Kendall-tau correlation to y.

  Parameters:
  - X: DataFrame of features.
  - y: target values, one per row of X.
  - n: number of features to keep.
  - p: when truthy, print the names of the selected features.

  Returns the selected columns of X, ordered from strongest to weakest
  absolute correlation.
  """
  # Absolute value: a strong negative correlation is as informative as a positive one.
  scores = [abs(scipy.stats.kendalltau(X[feat], y)[0]) for feat in X.columns]
  # A constant feature yields a NaN tau; rank it as 0, as anova_reduction does
  # for NaN F-scores (argsort would otherwise place NaN at the "best" end).
  scores = np.nan_to_num(scores)
  # Descending order, first n entries. The previous `[0::n]` slice took every
  # n-th feature starting from the weakest one rather than the top n.
  relevant_kfeatures_i = np.argsort(scores)[::-1][:n]
  if p:
    print("Selected: ", X.columns[relevant_kfeatures_i].values)
  return X.iloc[:, relevant_kfeatures_i]
  
## Additional Preprocessing
# Merging Classes that are less than or equal to the inputted score
def mergeLowerClasses(df, score, columns=('score_Gr', 'score_Al', 'score_Fl')):
  """Raise every value <= score in the score columns up to score.

  Parameters:
  - df: DataFrame containing the score columns; it is modified IN PLACE.
  - score: threshold; values less than or equal to it are replaced by it.
  - columns: the score columns to process.

  Returns the same (mutated) DataFrame object.
  """
  for col in columns:
    # Vectorised mask assignment; unlike a positional `df.at[i, col]` loop it
    # does not require the index to be exactly 0..n-1.
    df.loc[df[col] <= score, col] = score
  return df

# Round every score down to the nearest integer
def floorYs(y):
  """Return a new list holding math.floor of each element of y."""
  return list(map(math.floor, y))

In [4]:
## Scoring 
def get_scores(yactual, yhat, met):
  """Print and return accuracy, F1, recall and precision for a set of predictions.

  Parameters:
  - yactual: true labels.
  - yhat: predicted labels.
  - met: value passed as `average=` to the sklearn F1/recall/precision scorers.
    With None, one value per class is computed and printed; otherwise a single
    aggregated value per metric is computed and printed.

  Returns (acc, f1, re, pre). Printed values are percentages; returned values
  are the raw scores.
  """
  acc = accuracy_score(yactual, yhat)
  print("Accuracy: ", acc * 100)

  if met is None:
    # Use the labels that actually occur instead of assuming exactly three
    # classes named 3, 4, 5; passing them explicitly keeps the per-class arrays
    # aligned with the printed class names.
    labels = sorted(set(yactual) | set(yhat))
    f1 = f1_score(yactual, yhat, average=None, labels=labels)
    re = recall_score(yactual, yhat, average=None, labels=labels)
    pre = precision_score(yactual, yhat, average=None, labels=labels)
    for label, f, r, p in zip(labels, f1, re, pre):
      print("Class " + str(label))
      print("F1: " + str(f * 100) + ", Recall: " + str(r * 100) + ", Precision: " + str(p * 100))
  else:
    # Aggregated averages are scalars and cannot be indexed per class.
    f1 = f1_score(yactual, yhat, average=met)
    re = recall_score(yactual, yhat, average=met)
    pre = precision_score(yactual, yhat, average=met)
    print("F1: " + str(f1 * 100) + ", Recall: " + str(re * 100) + ", Precision: " + str(pre * 100))

  return acc, f1, re, pre

In [5]:
# LOSO Prediction kNN
def loso_predict_knn(model, X, y, r, s, ada, n):
    """Leave-one-out prediction for a kNN-style model with per-fold preprocessing.

    For every fold, feature reduction, optional ADASYN oversampling and min-max
    scaling are fitted on the training rows only, then applied to the held-out row.

    Parameters:
    - model: unfitted estimator; a deep copy is fitted, the argument is left untouched.
    - X: DataFrame of features (rows are selected with `.iloc`).
    - y: labels, one per row of X.
    - r: feature-reduction method: 'anova', 'mutual' or 'kendall'; any other
      value skips reduction.
    - s: number of features kept by the reduction.
    - ada: when truthy, oversample each training fold with ADASYN.
    - n: `n_neighbors` passed to ADASYN.

    Returns (y_true, y_pred): two lists with one entry per held-out row.

    NOTE(review): LeaveOneOut holds out one ROW at a time; this is only
    leave-one-subject-out if each subject contributes a single row - confirm.
    """
    model_ = cp.deepcopy(model)
    y_arr = np.asarray(y)  # converted once rather than twice per fold
    cv = LeaveOneOut()

    y_true, y_pred = [], []
    for train_ix, test_ix in cv.split(X):
        # split data - X is a DataFrame
        X_train, X_test = X.iloc[train_ix, :], X.iloc[test_ix, :]
        y_train, y_test = y_arr[train_ix], y_arr[test_ix]

        if r == 'anova':
            X_train = anova_reduction(X_train, y_train, s, False)
        elif r == 'mutual':
            X_train = mutual_reduction(X_train, y_train, s, False)
        elif r == 'kendall':
            X_train = kendall_reduction(X_train, y_train, s, False)

        if ada:
            # Separate name: the sampler must not overwrite the `ada` flag.
            sampler = ADASYN(random_state=42, n_neighbors=n)
            X_train, y_train = sampler.fit_resample(X_train, y_train)

        scaler = preprocessing.MinMaxScaler()
        X_trainS = scaler.fit_transform(X_train)

        # fit model
        model_.fit(X_trainS, y_train)

        # evaluate model on the held-out row, restricted to the selected features
        X_testR = X_test[X_train.columns]
        X_testS = scaler.transform(X_testR)
        yhat = model_.predict(X_testS)

        # store
        y_true.append(y_test[0])
        y_pred.append(yhat[0])

    return y_true, y_pred

# LOSO Prediction Random Forest
def loso_predict_rf(model, X, y, ada, n):
    """Leave-one-out prediction for a model that needs no scaling or feature reduction.

    Parameters:
    - model: unfitted estimator; a deep copy is fitted, the argument is left untouched.
    - X: DataFrame of features (rows are selected with `.iloc`).
    - y: labels, one per row of X.
    - ada: when truthy, oversample each training fold with ADASYN.
    - n: `n_neighbors` passed to ADASYN.

    Returns (y_true, y_pred): two lists with one entry per held-out row.

    NOTE(review): LeaveOneOut holds out one ROW at a time; this is only
    leave-one-subject-out if each subject contributes a single row - confirm.
    """
    model_ = cp.deepcopy(model)
    y_arr = np.asarray(y)  # converted once rather than twice per fold
    cv = LeaveOneOut()

    y_true, y_pred = [], []
    for train_ix, test_ix in cv.split(X):
        # split data - X is a DataFrame
        X_train, X_test = X.iloc[train_ix, :], X.iloc[test_ix, :]
        y_train, y_test = y_arr[train_ix], y_arr[test_ix]

        if ada:
            # Separate name: the sampler must not overwrite the `ada` flag.
            sampler = ADASYN(random_state=42, n_neighbors=n)
            X_train, y_train = sampler.fit_resample(X_train, y_train)

        # fit model
        model_.fit(X_train, y_train)

        # evaluate model
        yhat = model_.predict(X_test)

        # store
        y_true.append(y_test[0])
        y_pred.append(yhat[0])

    return y_true, y_pred

# generic loso
def losoEval2(model, X, y, metric):
  """Leave-one-out cross-validation score of model on (X, y).

  Parameters:
  - model: estimator handed to `cross_val_score`.
  - X: features.
  - y: labels, one per row of X.
  - metric: value passed as `scoring=` to `cross_val_score`.

  Returns (mean, standard deviation) of the per-fold scores.
  """
  cv = LeaveOneOut()
  # Score against the `y` argument; the previous code silently used the
  # notebook-level `yGr` regardless of what the caller passed.
  scores = cross_val_score(model, X, y, scoring=metric, cv=cv, n_jobs=-1)
  return np.mean(scores), np.std(scores)

# Load Data

In [None]:
# Load in the data
drtp = pd.read_csv('/content/drive/MyDrive/Thesis/Data/RTP_FeaturesV3.csv', index_col=[0])

# Drop rows that have NaN
drtp = drtp.dropna()
# Reset to a 0..n-1 index: the loop below and mergeLowerClasses both address
# rows by the integers 0..n-1.
drtp = drtp.reset_index(drop=True)

# Generate combined: a 3/4/5 class derived from the sum of the three scores.
# This is computed from the scores as loaded, BEFORE mergeLowerClasses changes them.
comb = []
for i in range(len(drtp['score_Gr'])):
  tot = drtp['score_Al'][i] + drtp['score_Gr'][i] + drtp['score_Fl'][i]
  if tot > 14:
    comb.append(5)
  elif  tot > 10.5:
    comb.append(4)
  else:
    comb.append(3)
  
# Separate X, y
# NOTE: mergeLowerClasses modifies drtp in place and returns it, so the X
# assigned on the next line is the same frame as drtp; it is replaced by the
# feature-only frame at the end of this cell.
X = mergeLowerClasses(drtp, 3.5)  
# Per-score integer classes, taken from the merged (in-place modified) scores.
yGr = floorYs(drtp['score_Gr'])
yAl	= floorYs(drtp['score_Al']) 
yFl = floorYs(drtp['score_Fl'])
subjects = drtp['subject']
# Feature matrix: everything except identifiers, scores and the leftover index column.
X = drtp.drop(['subject', 'move', 'score_Gr', 'score_Al', 'score_Fl', 'Unnamed: 0.1'], axis = 1)
X.head()

# Feature Summary Statistics 

In [None]:
# Per-feature summary statistics (the 'count' row is dropped), with the feature
# columns ordered from highest to lowest standard deviation.
# sort_values returns a new frame, so the result has to be assigned; previously
# it was discarded and the unsorted table was displayed.
Xstats = X.describe().drop('count').sort_values(by = 'std', axis = 1, ascending = False)
Xstats

In [None]:
# Describe the summary statistics themselves: after the transpose each column
# is one statistic (as produced by describe), summarised across all features.
Xstats_stats = Xstats.transpose().describe()
Xstats_stats

## Random Forest

In [18]:
# Target labels used by the Random Forest cells below: the combined 3/4/5 class.
metric = comb

In [None]:
# Grid search over the hyperparameters of interest for the Random Forest
params = { 
    'n_estimators': [10, 25, 50, 100],
    'max_features': ['sqrt'],
    'max_depth': [4,5,6],
    'criterion': ['gini', 'entropy'],
    'class_weight': [None, 'balanced', 'balanced_subsample']
}

# Fixed random_state: an unseeded forest makes the selected parameters change
# from run to run (the other Random Forest cells already use random_state=42).
grid_search = GridSearchCV(estimator=RandomForestClassifier(random_state=42),
                           param_grid=params,
                           cv = LeaveOneOut(),
                           n_jobs=-1, verbose=1, scoring="accuracy") #f1_macro

grid_search.fit(X, metric)

rf_best = grid_search.best_params_
rf_best

Fitting 26 folds for each of 72 candidates, totalling 1872 fits


{'class_weight': 'balanced',
 'criterion': 'gini',
 'max_depth': 4,
 'max_features': 'sqrt',
 'n_estimators': 10}

In [None]:
# Training Evaluation: fit on every sample and score on those same samples
rf_model_ = RandomForestClassifier(random_state=42, n_jobs=-1, max_depth=4, n_estimators=10, max_features='sqrt', criterion='gini', class_weight='balanced')
rf_model_.fit(X, metric) 
ypreds = rf_model_.predict(X)
# One sorted label list drives both the matrix rows/columns and the tick labels;
# list(set(...)) has no guaranteed order and ignores classes that only appear
# in the predictions.
cm_labels = sorted(set(metric) | set(ypreds))
confusion_matrix = metrics.confusion_matrix(metric, ypreds, labels=cm_labels)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = cm_labels)
cm_display.plot()
plt.show()

ac, f1, re, pre = get_scores(metric, ypreds, None)
print("Average F1: ", np.mean(f1))
print("Average Recall: ", np.mean(re))
print("Average Precision: ", np.mean(pre))

In [None]:
# Testing Evaluation: leave-one-out predictions from loso_predict_rf
rf_model1 = RandomForestClassifier(random_state=42, n_jobs=-1, max_depth=5, n_estimators=25, max_features='sqrt', criterion='gini', class_weight='balanced_subsample')
ytrues, ypreds = loso_predict_rf(rf_model1, X, metric, False, 0) 
# One sorted label list drives both the matrix rows/columns and the tick labels;
# list(set(...)) has no guaranteed order and ignores classes that only appear
# in the predictions.
cm_labels = sorted(set(ytrues) | set(ypreds))
confusion_matrix = metrics.confusion_matrix(ytrues, ypreds, labels=cm_labels)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = cm_labels)
cm_display.plot()
plt.show()

ac, f1, re, pre = get_scores(ytrues, ypreds, None)
print("Average F1: ", np.mean(f1))
print("Average Recall: ", np.mean(re))
print("Average Precision: ", np.mean(pre))

In [None]:
# Random Forest w/Adasyn: leave-one-out predictions with ADASYN oversampling of each training fold
rf_model1 = RandomForestClassifier(random_state=42, n_jobs=-1, max_depth=5, n_estimators=10, max_features='sqrt', criterion='entropy', class_weight=None)
ytrues, ypreds = loso_predict_rf(rf_model1, X, metric, True, 2) 
# One sorted label list drives both the matrix rows/columns and the tick labels;
# list(set(...)) has no guaranteed order and ignores classes that only appear
# in the predictions.
cm_labels = sorted(set(ytrues) | set(ypreds))
confusion_matrix = metrics.confusion_matrix(ytrues, ypreds, labels=cm_labels)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = cm_labels)
cm_display.plot()
plt.show()

ac, f1, re, pre = get_scores(ytrues, ypreds, None)
print("Average F1: ", np.mean(f1))
print("Average Recall: ", np.mean(re))
print("Average Precision: ", np.mean(pre))

# Random Forest Wrapper

In [23]:
## Random Forest Model Eval Wrapper 

def rf_interp_wrapper(model, x, Xtrain, Ytrain, metric, classes=(5, 4, 3)):
  '''
  Fit a copy of a tree-ensemble model and explain its prediction for one point.

  Inputs:
  - model: unfitted estimator exposing get_params, fit, predict and
    feature_importances_; a deep copy is fitted, the argument is left untouched
  - x: the data point to be predicted (only the first prediction is reported)
  - Xtrain, Ytrain: training features (DataFrame) and labels (one per row)
  - metric: free-form name of the target, stored in the result as-is
  - classes: class labels for which per-feature mean/std are reported

  Output:
  - dictionary with the model parameters ("model"), the target name ("metric"),
    the predicted class as a string ("score") and, for each of the (at most) 20
    features with the highest feature_importances_, an entry keyed by feature
    name holding "importance" plus "<class>mean" / "<class>std" of that feature
    over the training rows of each class
  '''
  top_k = 20

  # Deep copy so the caller's estimator stays unfitted
  model_ = cp.deepcopy(model)
  params = model_.get_params()

  model_.fit(Xtrain, Ytrain)
  yhat = model_.predict(x)

  # Pair every importance with its feature name and sort once, descending, so
  # the reported importance always belongs to the feature it is stored under.
  ranked = sorted(zip(model_.feature_importances_, Xtrain.columns), reverse=True)[:top_k]

  # Row positions of each class, computed once instead of six times per feature
  labels = np.asarray(Ytrain)
  rows_by_class = {c: np.flatnonzero(labels == c) for c in classes}

  results = dict()
  results["model"] = params
  results["metric"] = metric
  results["score"] = str(yhat[0])
  for weight, feat in ranked:
    column = Xtrain[feat]
    # Plain floats keep the dictionary serialisable with json.dump
    info = {"importance": float(weight)}
    for c in classes:
      members = column.iloc[rows_by_class[c]]
      info[str(c) + "mean"] = float(members.mean())
      info[str(c) + "std"] = float(members.std())
    results[feat] = info

  return results

In [None]:
# Run the Random Forest explanation wrapper for the first row: it is held out
# as the point to explain while the remaining rows (and their combined-class
# labels) are used for training. The result is written to rf_explanation.json.
ans = rf_interp_wrapper(rf_model1, X.iloc[:1], X.iloc[1:], comb[1:], "comb")

with open("rf_explanation.json", "w") as fp:
    json.dump(ans,fp) 

## kNN

In [7]:
# Seed NumPy's global random number generator so that code drawing from it in
# the cells below is repeatable from a fresh run.
np.random.seed(42)

In [None]:
# Exhaustive search over the kNN settings of interest
def knn_gridsearch(X, y):
  """Try every kNN configuration with loso_predict_knn and print the most accurate one.

  The printed list is
  [accuracy, per-class F1, n_neighbors, distance metric, reduction method,
   number of features, weighting, ADASYN flag];
  it stays all zeros if no configuration scores above 0 accuracy.
  A configuration only replaces the current best when it is strictly more accurate.
  """
  best = [0] * 8
  for s in range(5, 13):
    for r in ('anova', 'mutual', 'kendall'):
      for n in (2, 3, 4, 5):
        for d in ('euclidean', 'minkowski', 'chebyshev', 'manhattan'):
          for w in ('uniform', 'distance'):
            # Create classifier
            knn_model = KNeighborsClassifier(n_neighbors = n, metric=d, weights=w)
            # ADASYN is currently switched off for the whole search
            for val in (False,):
              ytrues, ypreds = loso_predict_knn(knn_model, X, y, r, s, val, 3)
              acc = accuracy_score(ytrues, ypreds)
              f1 = f1_score(ytrues, ypreds, average=None)
              if acc > best[0]:
                best = [acc, f1, n, d, r, s, w, val]

  print(best)

# Target labels for the kNN grid search: the floored score_Fl classes.
metric2 = yFl
knn_gridsearch(X, metric2)

In [None]:
# Training kNN Evaluation: fit on every sample and score on those same samples
knn_model_ = KNeighborsClassifier(n_neighbors = 2, metric='chebyshev', weights='uniform')
knn_model_.fit(X, metric2)
ypreds = knn_model_.predict(X)
# One sorted label list drives both the matrix rows/columns and the tick labels;
# list(set(...)) has no guaranteed order and ignores classes that only appear
# in the predictions.
cm_labels = sorted(set(metric2) | set(ypreds))
confusion_matrix = metrics.confusion_matrix(metric2, ypreds, labels=cm_labels)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = cm_labels)
cm_display.plot()
plt.show()

ac, f1, re, pre = get_scores(metric2, ypreds, None)
print("Average F1: ", np.mean(f1))
print("Average Recall: ", np.mean(re))
print("Average Precision: ", np.mean(pre))

In [None]:
# Testing kNN: leave-one-out predictions on the combined class
metric2 = comb
knn_model = KNeighborsClassifier(n_neighbors = 3, metric='chebyshev', weights='uniform')
ytrues, ypreds = loso_predict_knn(knn_model, X, metric2, 'kendall', 8, False, 3) 
# One sorted label list drives both the matrix rows/columns and the tick labels;
# list(set(...)) has no guaranteed order and ignores classes that only appear
# in the predictions.
cm_labels = sorted(set(ytrues) | set(ypreds))
confusion_matrix = metrics.confusion_matrix(ytrues, ypreds, labels=cm_labels)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = cm_labels)
cm_display.plot()
plt.show()

ac, f1, re, pre = get_scores(ytrues, ypreds, None)
print("Average F1: ", np.mean(f1))
print("Average Recall: ", np.mean(re))
print("Average Precision: ", np.mean(pre))

In [None]:
# Testing kNN w/Adasyn: leave-one-out predictions with ADASYN oversampling of each training fold
knn_model2 = KNeighborsClassifier(n_neighbors = 5, metric='chebyshev', weights='uniform')
# Use metric2, the kNN section's target, instead of the Random Forest section's
# `metric`; both currently hold the combined class, so results are unchanged.
ytrues, ypreds = loso_predict_knn(knn_model2, X, metric2,'kendall', 6, True, 2) 
# One sorted label list drives both the matrix rows/columns and the tick labels;
# list(set(...)) has no guaranteed order and ignores classes that only appear
# in the predictions.
cm_labels = sorted(set(ytrues) | set(ypreds))
confusion_matrix = metrics.confusion_matrix(ytrues, ypreds, labels=cm_labels)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = cm_labels)
cm_display.plot()
plt.show()

ac, f1, re, pre = get_scores(ytrues, ypreds, None)
print("Average F1: ", np.mean(f1))
print("Average Recall: ", np.mean(re))
print("Average Precision: ", np.mean(pre))

# kNN Wrapper

In [30]:
## kNN Model Eval Wrapper 
def knn_interp_wrapper(model, x, Xtrain, Ytrain, metric, r, s):
  '''
  Fit a copy of a kNN model and explain its prediction for one point.

  Inputs:
  - model: unfitted estimator exposing get_params, fit, predict and kneighbors;
    a deep copy is fitted, the argument is left untouched
  - x: single-row DataFrame holding the data point to be predicted
  - Xtrain, Ytrain: training features (DataFrame) and labels (one per row)
  - metric: free-form name of the target, stored in the result as-is
  - r: feature-reduction method: 'anova', 'mutual' or 'kendall'; any other
    value keeps every feature
  - s: number of features kept by the reduction

  Output:
  - dictionary with the model parameters ("model"), the target name ("metric"),
    the predicted class as a string ("score") and, under "results", one entry
    per neighbour used (keyed by its row position in the training data) with
    its distance, its training label, and the selected features ranked from the
    smallest to the largest per-feature gap (in min-max scaled units) between
    the point and that neighbour
  '''
  # Deep copy so the caller's estimator stays unfitted
  model_ = cp.deepcopy(model)
  params = model_.get_params()

  # Feature reduction, fitted on the training data only
  if r == 'anova':
    X_train = anova_reduction(Xtrain, Ytrain, s, False)
  elif r == 'mutual':
    X_train = mutual_reduction(Xtrain, Ytrain, s, False)
  elif r == 'kendall':
    X_train = kendall_reduction(Xtrain, Ytrain, s, False)
  else:
    # No reduction requested; previously this path hit an undefined X_train.
    X_train = Xtrain

  scaler = preprocessing.MinMaxScaler()
  XX = scaler.fit_transform(X_train)
  model_.fit(XX, Ytrain)

  # Prediction on the same features and scaling as the training data
  xx = scaler.transform(x[X_train.columns])
  yhat = model_.predict(xx)

  # Distances to and row positions of the k closest training rows
  dist, neighs = model_.kneighbors(xx)

  # kneighbors returns row POSITIONS, so look labels up positionally; tolist()
  # also turns numpy scalars into plain Python values that json.dump accepts.
  labels = np.asarray(Ytrain).tolist()
  features = list(X_train.columns)

  results = dict()
  results["model"] = params
  results["metric"] = metric
  results["score"] = str(yhat[0])
  results["results"] = dict()
  for d, neigh in zip(dist[0], neighs[0]):
    # Per-feature gap between the point and this neighbour, closest feature first
    gaps = np.abs(xx[0] - XX[neigh])
    ordered = sorted(zip(gaps, features))
    results["results"][str(neigh)] = {
        "dist": float(d),
        "feat_rank": [name for _, name in ordered],
        "feat_list": [float(gap) for gap, _ in ordered],
        "label": labels[neigh]
    }

  return results

In [31]:
# Run the kNN explanation wrapper for the first row: it is held out as the
# point to explain while the remaining rows (and their combined-class labels)
# are used for training. The result is written to knn_explanation.json.
ans = knn_interp_wrapper(knn_model, X.iloc[:1], X.iloc[1:], comb[1:], "comb", "kendall", 8)

with open("knn_explanation.json", "w") as fp:
    json.dump(ans,fp) 

In [32]:
# Combined class of the first row, i.e. the row passed to the wrapper above as
# the point to explain.
comb[0]

4