<a href="https://colab.research.google.com/github/s-ravi18/HAR/blob/main/Verifying_Results_for_HAR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
##Basics
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import regex as re
import os
import string
import time
import warnings
import json
from os import path
warnings.filterwarnings("ignore")
from contextlib import redirect_stdout

##ML scikit learn classes for data preprocessing:
from sklearn.preprocessing import StandardScaler,LabelEncoder
from sklearn.model_selection import train_test_split

##ML scikit learn classes for model selection:  
from xgboost import XGBClassifier
!pip install catboost
from catboost import CatBoostClassifier
from lightgbm import LGBMClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC

##ML scikit learn classes for evaluating model:
from sklearn.model_selection import cross_val_score
from sklearn.metrics import classification_report,accuracy_score,make_scorer,confusion_matrix,precision_recall_fscore_support,roc_curve
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import learning_curve

## Deep Learning Libraries:
import tensorflow as tf
from tensorflow.keras.models import load_model,Model
from tensorflow.keras.layers import concatenate,Input,Dense,ReLU,BatchNormalization,Concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [8]:
## Setting the seed to allow reproducibility
np.random.seed(31415)
tf.random.set_seed(2)

In [9]:
### Label Encoding, Removing Zero variance Features and Scaling the test data::
def initial(df):
    
    #### Label Encoding the Target Variable

    X=df.drop(["label"],axis=1)
    y=df["label"]
    if df.label.dtype==str:   ### Will apply label encoding if needed
        le=LabelEncoder()
        y=le.fit_transform(y)
        y=pd.Series(y)

    #### Removing Features having zero variance.

    Var=X[X.columns].std()
    col=Var[Var==0].index
    X=X.drop(col,axis=1)
    
    return X,y

In [10]:
### For the best model using EMG and NEMG Features (EMG+NEMG Combined)
# df_emg=pd.read_csv("/content/drive/MyDrive/HAR Datasets/EMG_ANOVA_200_features.csv").rename({'0':'label'},axis=1)
df_nemg=pd.read_csv("/content/drive/MyDrive/HAR Datasets/NEMG_ANOVA_200_features.csv").rename({"0":"label"},axis=1)

# temp=pd.DataFrame()
# for i in range(26):
#     n=df_nemg[df_nemg["label"]==i].reset_index(drop=True)
#     e=df_emg[df_emg["label"]==i].reset_index(drop=True).drop(["label"],axis=1)[0:n.shape[0]]
#     k=pd.concat([e,n],axis=1)
#     temp=pd.concat([temp,k],axis=0)   

X,y=initial(df_nemg) 

In [11]:
X.shape,y.shape

((23680, 200), (23680,))

In [12]:
l1=list(df_nemg.columns[:-1])
#l2=list(df_nemg.columns[:-1])
#l1.extend(l2)

In [13]:
len(l1)

200

In [14]:
## Splitting the data:
X_train, X_test, y_train, y_test = train_test_split(X, y,stratify=y,test_size=0.15, random_state=2)
sc=StandardScaler()
X_train=sc.fit_transform(X_train)
X_train=pd.DataFrame(X_train,columns=l1)
X_test=sc.transform(X_test)
X_test=pd.DataFrame(X_test,columns=l1)

In [15]:
class Model:
    def __init__(self,number,l1,l2,l3,l4):
        self.number=number
        self.model = tf.keras.Sequential()
        self.model.add(tf.keras.layers.InputLayer(input_shape=(200)))
        self.model.add(tf.keras.layers.Normalization(axis=-1))
        self.model.add(tf.keras.layers.Dense(units=l1,activation=None))   ###
        self.model.add(tf.keras.layers.BatchNormalization())
        self.model.add(tf.keras.layers.ReLU())
        self.model.add(tf.keras.layers.Dropout(0.2))
        self.model.add(tf.keras.layers.Dense(units=l2,activation=None))  ###
        self.model.add(tf.keras.layers.BatchNormalization())
        self.model.add(tf.keras.layers.ReLU())
        self.model.add(tf.keras.layers.Dropout(0.2))
        self.model.add(tf.keras.layers.Dense(units=l3,activation=None))  ###
        self.model.add(tf.keras.layers.BatchNormalization())
        self.model.add(tf.keras.layers.ReLU())
        self.model.add(tf.keras.layers.Dropout(0.2))
        self.model.add(tf.keras.layers.Dense(units=l4,activation=None))  ###
        self.model.add(tf.keras.layers.BatchNormalization())
        self.model.add(tf.keras.layers.ReLU())
        self.model.add(tf.keras.layers.Dropout(0.2))
        #self.model.add(tf.keras.layers.Dropout(0.3))
        self.model.add(tf.keras.layers.Dense(units=26,activation="softmax"))
        self.model.compile(optimizer="adam",loss=tf.keras.losses.SparseCategoricalCrossentropy(),metrics="accuracy")
    
    def funct_fit(self,path,X_train,X_test,y_train,y_test,count,n_split):
        self.model.fit(X_train,y_train,validation_data=(X_test,y_test),epochs=50,batch_size=50)
        if count==n_split:
            self.model.save(f"{path}/model_{self.number}_deep.h5")
        return self.model.history.history["accuracy"],self.model.history.history["val_accuracy"],self.model.history.history["loss"],self.model.history.history["val_loss"] 

def funct_avg(d):
    temp={}
    for i,j in d.items():
        temp[i]=np.mean(j)
        
    return temp

def lc(path,m,acc,val_acc,loss,val_loss):
  if os.path.exists(f"{path}/LC")==False:
    os.mkdir(f"{path}/LC")
  fig,ax=plt.subplots(1,2,figsize=(15,7))
  ax[0].plot(acc,c= 'b',label="train_accuracy")
  ax[0].plot(val_acc,c= 'r',label="val_accuracy")
  ax[0].set_xlabel("Number of Epochs")
  ax[0].set_ylabel("Accuracy")
  ax[0].legend()

  ax[1].plot(loss,c= 'b',label="train_loss")
  ax[1].plot(val_loss,c= 'r',label="val_loss")
  ax[1].set_xlabel("Number of Epochs")
  ax[1].set_ylabel("Loss")
  ax[1].legend()

  fig.savefig(f'{path}/LC/Learning Curve_{m}.png')

### cross validation:
def funct_cv(path,m,n_split,X,y,l1,l2,l3,l4,final_result):
    dic_results={"accuracy":[],"val_accuracy":[]}
    n_split=n_split
    count=0
    for index,(train_index,test_index) in enumerate(StratifiedKFold(n_split).split(X,y)):
        X_train,X_test=X[train_index],X[test_index]
        y_train,y_test=y[train_index],y[test_index]

        model=Model(m,l1,l2,l3,l4)   ### creating the model object 
        count=count+1
        acc,val_acc,loss,val_loss=model.funct_fit(path,X_train,X_test, y_train,y_test,count,n_split) 
        if index==4:
          lc(path,m,acc,val_acc,loss,val_loss)

        dic_results["accuracy"].append(np.mean(acc))
        dic_results["val_accuracy"].append(np.mean(val_acc))

    dic_results=funct_avg(dic_results)

    final_result[m]=dic_results
    return final_result

# final_result={}
# funct_cv(1,5,X_train.values,y_train.values,140,120,80,70)
# funct_cv(2,5,X_train.values,y_train.values,180,160,100,90)
# funct_cv(3,5,X_train.values,y_train.values,320,250,200,150)

# with open('/content/drive/MyDrive/HAR Results/IMU/accuracy_results_StratifiedCV_deep.json', 'w') as fp:
#     json.dump(final_result, fp,  indent=4) 

In [29]:
#### Loading other models:
def funct_load(path,number):
    allmodels=[]
    for i in range(1,number+1):
        # load model from file
        model = load_model(f'{path}/model_{i}_deep.h5')
        with open(f'{path}/base_model_{i}_summary.txt', 'w') as f:  ### Getting the output of the model configuration.
          with redirect_stdout(f):
            model.summary()
        # add to list of members
        allmodels.append(model)
    return allmodels
            
#funct_load(3)    

# create stacked model input dataset as outputs from the individual ensemble models
def stacked_dataset(allmodels, X_test):
    stackX = None
    for model in allmodels:
        # make prediction
        yhat = model.predict(X_test, verbose=0)
        # stack predictions into [rows, members, class]
        if stackX is None:
            stackX = yhat
        else:
            stackX = np.dstack((stackX, yhat))
    print(stackX)
    # flatten predictions to [rows, members x class]
    stackX = stackX.reshape((stackX.shape[0], stackX.shape[1]*stackX.shape[2]))
    return stackX

#stackedX = stacked_dataset(allmodels, X_test)

# results={}

# evaluate standalone models on test dataset
for model in allmodels:
    y_hat=model.predict(X_test)
    y_hat=np.argmax(y_hat,axis=1)
    acc = accuracy_score(y_test, y_hat)
    print('Model Accuracy: %.6f' % acc)


def funct_report_csv(y,y_hat):
    clf_rep = precision_recall_fscore_support(y, y_hat)
    out_dict = {
                 "precision" :clf_rep[0].round(2)
                ,"recall" : clf_rep[1].round(2)
                ,"f1-score" : clf_rep[2].round(2)
                ,"support" : clf_rep[3]
                }
    out_df = pd.DataFrame(out_dict)
    avg_tot = (out_df.apply(lambda x: round(x.mean(), 2) if x.name!="support" else  round(x.sum(), 2)).to_frame().T)
    avg_tot.index = ["avg/total"]
    out_df = out_df.append(avg_tot)
    return out_df

def classification_report_with_accuracy_score(y, y_hat,model_name,path):
    if os.path.exists(f"{path}/CR")==False:
      os.mkdir(f"{path}/CR")
    if os.path.exists(f"{path}/CM")==False:
      os.mkdir(f"{path}/CM")
    #report=classification_report(y, y_hat,output_dict=True) # print classification report
    report=funct_report_csv(y, y_hat) # print classification report
    report.to_csv(f"{path}/CR/Classification_Report_{model_name}.csv",index=False)

    cm=confusion_matrix(y,y_hat)
    plt.figure(figsize=(20,10))
    plt.rc("font",size=10)
    sns.heatmap(cm,annot=True,fmt=".2f",cmap="viridis")
    plt.savefig(f"{path}/CM/Confusion_Matrix_{model_name}_simple.png")

    cm_1 = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] # For normalising the Matrix for better visualisation.
    plt.figure(figsize=(20,10))
    plt.rc("font",size=10)
    sns.heatmap(cm_1,annot=True,fmt=".2f",cmap="viridis")
    plt.savefig(f"{path}/CM/Confusion_Matrix_{model_name}_axis=1.png")
    
    return accuracy_score(y, y_hat) # return accuracy score

    
def stacked_model_test(path,allmodels,X,y):
    results={}
    # dictionary of all models
    sample={"Naive Bayes":GaussianNB(),"XGB":XGBClassifier(),"LGBM":LGBMClassifier(),"RandomForest":RandomForestClassifier(),"LR":LogisticRegression(),"CatBoost":CatBoostClassifier(),"Naive Bayes":GaussianNB(),"SVC":SVC(),"KNN_3":KNeighborsClassifier(n_neighbors=3),"KNN_5":KNeighborsClassifier(n_neighbors=5)}
    # create dataset using ensemble
    stackedX = stacked_dataset(allmodels, X)
    
    for i,j in sample.items():
        arg={"model_name":i,"path":path}
        sc=make_scorer(classification_report_with_accuracy_score,**arg)
        score_=cross_val_score(j,stackedX,y,cv=5,scoring=sc)
        acc = score_.mean()
       
        results[i]=round(acc,5)
    return results           

# stacked_model_test(allmodels,X_test,y_test)
# #### To store the results in the form of json that is prettified:
# with open('/content/drive/MyDrive/HAR Results/IMU/accuracy_results_final_stacked_random-sampling.json', 'w') as fp:
#     json.dump(results, fp,  indent=4)     

Model Accuracy: 0.117399
Model Accuracy: 0.093750
Model Accuracy: 0.087556


In [17]:
## Function for ROC and Shap FI:
def roc(path,X_train,X_test,y_train,y_test,model_name,model):
  if os.path.exists(f"{path}/ROC")==False:
    os.mkdir(f"{path}/ROC")
  plt.figure(figsize=(10,10))
  plt.title("ROC Curve and AUC", fontsize=18)
  plt.xlabel("False Positive Rate", fontsize=16)
  plt.ylabel("True Positive Rate", fontsize=16)
  visualizer = RadViz(size=(700,700))
  model = wrap(model)
  visualizer = ROCAUC(model)

  visualizer.fit(X_train, y_train)        # Fit the training data to the visualizer
  visualizer.score(X_test, y_test)        # Evaluate the model on the test data
  visualizer.show(outpath=f"{path}/ROC/ROC_{model_name}.png")

def stacked_model_test(path,allmodels,X,y):
    # dictionary of all models
    sample={"Naive Bayes":GaussianNB(),"XGB":XGBClassifier(),"LGBM":LGBMClassifier(),"RandomForest":RandomForestClassifier(),"LR":LogisticRegression(),"CatBoost":CatBoostClassifier(),"Naive Bayes":GaussianNB(),"SVC":SVC(),"KNN_3":KNeighborsClassifier(n_neighbors=3),"KNN_5":KNeighborsClassifier(n_neighbors=5)}
    # create dataset using ensemble
    stackedX = stacked_dataset(allmodels, X)
    
    X_train,X_test,y_train,y_test=train_test_split(stackedX,y,stratify=y)
    for i,j in sample.items():      
      arg={"model_name":i,"model":j}
      try:
        roc(path,X_train,X_test,y_train,y_test,**arg)  
      except Exception as e:
        print(e)
        continue

#stacked_model_test(allmodels,X_test,y_test)

In [18]:
## For Shap Visualisation and Top Features

def shap_fe(path,df,X,y,model_name,model):
  # compute SHAP values
  model.fit(X,y)

  if os.path.exists(f"{path}/Shap")==False:
    os.mkdir(f"{path}/Shap")

  explainer = shap.TreeExplainer(model)
  shap_values = explainer.shap_values(X)

  shap.summary_plot(shap_values, X, class_names= [i for i in range(26)], feature_names = df.columns,show=False)
  plt.savefig(f"{path}/Shap/Shap_Feature_Plot_{model_name}.png")

  vals= np.abs(shap_values[1]).mean(0)
  df_feature_importance=pd.DataFrame(np.concatenate([np.array(df.columns).reshape(-1,1),vals.reshape(-1,1)],axis=1),columns=["Feature","Shap_Scores"])
  df_feature_importance = df_feature_importance.sort_values('Shap_Scores',ascending=False)
  df_feature_importance.reset_index(drop=True,inplace=True)
  df_feature_importance.to_csv(f"{path}/Shap/Feature_scores_{model_name}.csv")

# sample={"XGB":XGBClassifier()} #,"LGBM":LGBMClassifier(),"RandomForest":RandomForestClassifier(),"LR":LogisticRegression(),"CatBoost":CatBoostClassifier(),"Naive Bayes":GaussianNB(),"SVC":SVC(),"KNN_3":KNeighborsClassifier(n_neighbors=3),"KNN_5":KNeighborsClassifier(n_neighbors=5)}  
# for i,j in sample.items():
#   try:
#     shap_fe(df_nemg.iloc[:,:-1],X_train,y_train,i,j)
#   except Exception as e:
#     print(e)
#     continue

In [30]:
def master_fn(data,path):
  if os.path.exists(f"{path}")==False:
    os.mkdir(f"{path}")
  ## Reading the data::
  if data=="EMG":
    df=pd.read_csv("/content/drive/MyDrive/HAR Datasets/EMG_ANOVA_200_features.csv").rename({'0':'label'},axis=1)
  elif data=="IMU":
    df=pd.read_csv("/content/drive/MyDrive/HAR Datasets/Combined Results/NEMG_ANOVA_200_features.csv").rename({"0":"label"},axis=1)
  elif data=="Combined":
    df_emg=pd.read_csv("/content/drive/MyDrive/HAR Datasets/EMG_ANOVA_200_features.csv").rename({'0':'label'},axis=1)
    df_nemg=pd.read_csv("/content/drive/MyDrive/HAR Datasets/Combined Results/NEMG_ANOVA_200_features.csv").rename({"0":"label"},axis=1)
    df=pd.DataFrame()
    for i in range(26):
        n=df_nemg[df_nemg["label"]==i].reset_index(drop=True)
        e=df_emg[df_emg["label"]==i].reset_index(drop=True).drop(["label"],axis=1)[0:n.shape[0]]
        k=pd.concat([e,n],axis=1)
        df=pd.concat([df,k],axis=0)   

  ## Label Encoding->Removing Zero Variance Features->Scaling the test data::
  X,y=initial(df)

  ## Splitting the data:
  X_train, X_test, y_train, y_test = train_test_split(X, y,stratify=y,test_size=0.15, random_state=2)
  sc=StandardScaler()
  X_train=sc.fit_transform(X_train)
  X_train=pd.DataFrame(X_train,columns=list(df.columns[:-1]))
  X_test=sc.transform(X_test)
  X_test=pd.DataFrame(X_test,columns=list(df.columns[:-1]))

  ## Creating and training the models:
  final_result={}
  final_result=funct_cv(path,1,5,X_train.values,y_train.values,140,120,80,70,final_result)
  final_result=funct_cv(path,2,5,X_train.values,y_train.values,180,160,100,90,final_result)
  final_result=funct_cv(path,3,5,X_train.values,y_train.values,320,250,200,150,final_result)

  with open(f'{path}/accuracy_results_StratifiedCV_deep.json', 'w') as fp:
      json.dump(final_result, fp,  indent=4) 

  ## Loading the models:
  allmodels=funct_load(path,3)

  ## 
  results=stacked_model_test(path,allmodels,X_test,y_test)
  ## To store the results in the form of json that is prettified:
  with open(f'{path}/accuracy_results_final_stacked_random-sampling.json', 'w') as fp:
      json.dump(results, fp,  indent=4)

  ## ROC and Shap:
  stacked_model_test(path,allmodels,X_test,y_test)    

  sample={"XGB":XGBClassifier()} #,"LGBM":LGBMClassifier(),"RandomForest":RandomForestClassifier(),"LR":LogisticRegression(),"CatBoost":CatBoostClassifier(),"Naive Bayes":GaussianNB(),"SVC":SVC(),"KNN_3":KNeighborsClassifier(n_neighbors=3),"KNN_5":KNeighborsClassifier(n_neighbors=5)}  
  for i,j in sample.items():
    try:
      shap_fe(path,df_nemg.iloc[:,:-1],X_train,y_train,i,j)
    except Exception as e:
      print(e)
      continue

In [None]:
master_fn("EMG","/content/drive/MyDrive/sample_emg")

[[[6.38044151e-09 1.60730151e-10 2.52900253e-11]
  [5.08783899e-07 7.18212095e-06 1.83324516e-07]
  [4.57483196e-07 3.77108393e-07 1.16107287e-07]
  ...
  [1.81605742e-06 1.29816456e-07 7.60002123e-08]
  [1.13593260e-05 7.93476090e-07 9.03390264e-06]
  [2.50931237e-07 3.61808858e-08 2.53839838e-10]]

 [[1.35215515e-12 8.03434576e-12 2.80434313e-13]
  [2.47548093e-11 3.07072319e-11 8.11577971e-10]
  [1.88357945e-12 1.02127820e-10 3.72104847e-11]
  ...
  [6.18643027e-12 1.85771589e-11 1.27061209e-11]
  [9.68054366e-08 7.90411505e-08 3.06754044e-09]
  [5.43410206e-13 2.61286288e-11 2.46139979e-12]]

 [[3.84785342e-10 9.59314295e-11 3.73715989e-12]
  [9.99997735e-01 9.99999166e-01 9.99976516e-01]
  [1.40165230e-06 5.58949750e-07 1.52421126e-05]
  ...
  [2.68460559e-10 8.72680834e-12 8.90380894e-11]
  [3.60979406e-08 3.95739032e-11 1.05032434e-10]
  [2.31408279e-08 8.89096885e-09 3.52632732e-08]]

 ...

 [[6.83713433e-11 1.96332395e-11 2.20559289e-13]
  [9.99999642e-01 1.00000000e+00 9.9999