In [1]:
# Uncomment the lines below when running on Google Colab
# from google.colab import drive
# drive.mount('/content/gdrive')
# !pip install scikeras --quiet

In [2]:
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from matplotlib import pyplot

import warnings
warnings.filterwarnings('ignore')

from keras.utils.np_utils import to_categorical 
from tensorflow.keras.callbacks import EarlyStopping
from keras.models import Sequential
from keras.layers import Dense, Dropout, BatchNormalization,\
Flatten, LSTM
# from scikeras.wrappers import KerasClassifier
from keras.wrappers.scikit_learn import KerasClassifier
from keras.models import load_model

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
# from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.preprocessing import OneHotEncoder

# Remote CSV locations: the unseen data used for the final prediction check,
# and the dataset the model is trained and validated on
TEST_DATA_LINK = "https://raw.githubusercontent.com/ehandywhyy/ict3204-security-analytics/main/dataset/overall_test.csv"
DATASET_LINK = "https://raw.githubusercontent.com/ehandywhyy/ict3204-security-analytics/main/dataset/overall.csv"

# Shape of one sample fed to the LSTM: TIMESTEPS steps of NUM_FEATURES values
TIMESTEPS = 1
NUM_FEATURES = 31

##### Initialise Seed

In [3]:
# Random seed for reproducibility. Seeding NumPy alone does not touch
# TensorFlow's own random generator, so TensorFlow is seeded explicitly as well.
seed = 10
np.random.seed(seed)
tf.random.set_seed(seed)

# Load the training dataset from its remote CSV
df = pd.read_csv(DATASET_LINK)

# Drop any rows that contain missing values, then preview the first rows
df = df.dropna()
df.head()

Unnamed: 0,subject,sessionIndex,rep,H.period,DD.period.t,UD.period.t,H.t,DD.t.i,UD.t.i,H.i,...,H.a,DD.a.n,UD.a.n,H.n,DD.n.l,UD.n.l,H.l,DD.l.Return,UD.l.Return,H.Return
0,Andy,1,1,0.142176,0.15688,0.014704,0.127937,0.142299,0.014361,0.155919,...,0.128263,0.085967,-0.042296,0.120442,0.209372,0.08893,0.116911,0.165437,0.048526,0.160258
1,Andy,1,2,0.141793,0.164635,0.022842,0.121625,0.152146,0.03052,0.137039,...,0.14449,0.082397,-0.062093,0.101188,0.126066,0.024878,0.098424,0.130661,0.032238,0.173819
2,Andy,1,3,0.157152,0.14722,-0.009932,0.134796,0.14579,0.010994,0.124892,...,0.135452,0.082423,-0.053029,0.108598,0.155843,0.047244,0.118759,0.14655,0.027791,0.173512
3,Andy,1,4,0.158673,0.142487,-0.016186,0.118908,0.159179,0.040271,0.133175,...,0.114266,0.072556,-0.041711,0.101937,0.145692,0.043755,0.0963,0.12258,0.02628,0.190808
4,Andy,1,5,0.166583,0.157102,-0.00948,0.115094,0.141103,0.026009,0.135121,...,0.125474,0.089247,-0.036227,0.114211,0.149136,0.034925,0.121197,0.230113,0.108917,0.143299


##### Divide dataset into X and Y
##### Normalise features within range 0 (minimum) and 1 (maximum)


In [4]:
# Raw cell values of the cleaned frame as a single array
dataset = df.values

# Target Y is the class label in column 0; features X are every column
# from index 3 onwards, cast to float
Y = dataset[:, 0]
X = dataset[:, 3:].astype(float)

# Number of samples per class, to check for class imbalance
class_sizes = df.groupby(Y).size()
print(class_sizes)

Andy       100
Azfar      100
Chris      100
Qikai      100
Safaraz    100
dtype: int64


In [5]:
# The encoder is fitted on a 2-D column of labels, so reshape Y to (n, 1) first
Y = Y.reshape(-1, 1)
encoder = OneHotEncoder()
encoder.fit(Y)

# Names of the encoded class columns
encoded_names = encoder.get_feature_names_out()
print(encoded_names)

# Shapes of the feature matrix and of the label column
for caption, array in (("X dataset shape: ", X), ("Y dataset shape: ", Y)):
    print(caption + str(array.shape))

['x0_Andy' 'x0_Azfar' 'x0_Chris' 'x0_Qikai' 'x0_Safaraz']
X dataset shape: (500, 31)
Y dataset shape: (500, 1)


##### Preparing dataset

In [6]:
def _as_lstm_input(features):
    """Cast a 2-D (samples, features) array to float32 and insert the TIMESTEPS axis."""
    features = np.asarray(features, dtype=np.float32)
    n_samples, n_columns = features.shape
    return np.reshape(features, (n_samples, TIMESTEPS, n_columns))


# 80/20 train/test split driven by the fixed seed
X_train, X_test, y_train, y_test = train_test_split(
    X, Y, random_state=seed, test_size=0.2)

# Min-max scaling to [0, 1]; the scaler is fitted on the training split only
# and then re-used, unchanged, on the test split
scaler = MinMaxScaler(feature_range=(0, 1))
X_train = _as_lstm_input(scaler.fit_transform(X_train))
X_test = _as_lstm_input(scaler.transform(X_test))

# One-hot encode the labels of both splits with the already-fitted encoder
y_train, y_test = (encoder.transform(labels).toarray()
                   for labels in (y_train, y_test))

# One output unit per one-hot column
num_classes = y_train.shape[1]

for caption, array in (("X train shape: ", X_train),
                       ("Y train shape: ", y_train),
                       ("X test shape: ", X_test),
                       ("Y test shape: ", y_test)):
    print(caption + str(array.shape))

X train shape: (400, 1, 31)
Y train shape: (400, 5)
X test shape: (100, 1, 31)
Y test shape: (100, 5)


### Create Model

In [7]:
def create_model():
    """Build and compile the stacked-LSTM classifier.

    Three LSTM layers (128, 128 and 64 units, all returning sequences) are each
    followed by Dropout(0.2) and BatchNormalization; the result is flattened
    and fed to a softmax layer with ``num_classes`` units. TIMESTEPS,
    NUM_FEATURES and num_classes are read from the notebook's global scope.

    Returns:
        The Sequential model, compiled with the Adam optimizer, categorical
        cross-entropy loss and the accuracy metric.
    """
    network = Sequential()

    for position, width in enumerate((128, 128, 64)):
        # Only the first layer declares the input shape
        extra = {'input_shape': (TIMESTEPS, NUM_FEATURES)} if position == 0 else {}
        network.add(LSTM(units=width, return_sequences=True, **extra))
        network.add(Dropout(0.2))
        network.add(BatchNormalization())

    # Flatten the sequence output, then softmax for multi-class classification
    network.add(Flatten())
    network.add(Dense(num_classes, activation='softmax'))

    network.compile(optimizer='adam', loss='categorical_crossentropy',
                    metrics=['accuracy'])
    return network

##### Wrap Model in KerasClassifier

In [8]:
# Training settings handed to the scikit-learn wrapper around create_model
fit_settings = {"epochs": 100, "batch_size": 10}
model = KerasClassifier(build_fn=create_model, **fit_settings)

### Perform KFold Validation

##### Evaluate a suitable number of folds for k-fold validation

In [9]:
# # kfold = KFold(n_splits=num_folds, 
# #               shuffle=True,
# #               random_state=seed)

from sklearn.model_selection import LeaveOneOut
def _to_class_indices(labels):
    """Return labels as a 1-D vector of class indices.

    Rows of shape (n, k) with k > 1 (one-hot labels) are collapsed with argmax;
    anything else is flattened unchanged.
    """
    labels = np.asarray(labels)
    if labels.ndim == 2 and labels.shape[1] > 1:
        return labels.argmax(axis=1)
    return labels.ravel()


def _class_index_accuracy(estimator, X, y):
    """Accuracy scorer for cross_val_score that tolerates one-hot targets.

    The built-in 'accuracy' scorer rejects one-hot targets paired with
    class-index predictions, so both sides are reduced to class indices
    before they are compared.
    """
    predicted = _to_class_indices(estimator.predict(X))
    expected = _to_class_indices(y)
    return float(np.mean(predicted == expected))


def evaluate_kfold(model, X_train, y_train, seed, folds=range(2, 31)):
    """Compare k-fold accuracy for several values of k against leave-one-out.

    Leave-one-out cross-validation is run once as the reference ("ideal")
    score. Each k in ``folds`` is then evaluated with a shuffled KFold, its
    mean/min/max accuracy is printed, and the means are plotted with min/max
    error bars alongside a red line marking the reference score.

    Args:
        model: scikit-learn compatible classifier to cross-validate.
        X_train: training features.
        y_train: training targets, either one-hot rows or class indices.
        seed: random_state used to shuffle every KFold split.
        folds: iterable of k values to evaluate (default 2..30).

    Returns:
        None. Results are printed and plotted.
    """
    fold_counts = list(folds)

    # Leave-one-out gives the reference score each k is compared against
    loo_scores = cross_val_score(model, X_train, y_train,
                                 scoring=_class_index_accuracy,
                                 cv=LeaveOneOut(), n_jobs=-1)
    ideal = np.mean(loo_scores)
    print('Ideal: %.3f' % ideal)

    means, mins, maxs = [], [], []

    for k in fold_counts:
        # The splitter built for this k must be the one actually passed as cv
        kfold = KFold(n_splits=k, shuffle=True, random_state=seed)
        scores = cross_val_score(model, X_train, y_train,
                                 scoring=_class_index_accuracy,
                                 cv=kfold, n_jobs=-1)
        k_mean, k_min, k_max = np.mean(scores), np.min(scores), np.max(scores)

        # report performance
        print('> folds=%d, accuracy=%.3f%% (min=%.3f%%, max=%.3f%%)' %
              (k, k_mean*100, k_min*100, k_max*100))

        # Error bars are drawn relative to the mean, so store the distances
        means.append(k_mean)
        mins.append(k_mean - k_min)
        maxs.append(k_max - k_mean)

    # Mean accuracy per k with min/max error bars
    pyplot.errorbar(fold_counts, means, yerr=[mins, maxs], fmt='o')
    # Reference (leave-one-out) score as a horizontal red line
    pyplot.plot(fold_counts, [ideal] * len(fold_counts), color='r')
    pyplot.show()
    
# Run the fold sweep on the training split (long-running: the model is refitted
# for every cross-validation fold)
evaluate_kfold(model=model, X_train=X_train, y_train=y_train, seed=seed)

KeyboardInterrupt: 

##### Get Accuracy from KFold Validation

In [None]:
# K-fold splitter for the validation run. No earlier cell binds `kfold` at
# notebook scope (the one inside evaluate_kfold is local to that function),
# so it is built here.
NUM_FOLDS = 10  # assumed default; adjust after inspecting the fold sweep
kfold = KFold(n_splits=NUM_FOLDS, shuffle=True, random_state=seed)

# No `scoring` argument is given, so the estimator's own score method is used;
# error_score="raise" surfaces any failing fold
results = cross_val_score(model, X_train, y_train,
                          cv=kfold, error_score="raise", verbose=1)

##### Get validation accuracy

In [None]:
# Mean and spread of the per-fold scores, expressed as percentages
mean_pct = results.mean()*100
std_pct = results.std()*100
print("Validation Accuracy of %.2f%% (with standard deviation of %.2f%%)" %
      (mean_pct, std_pct))

##### Fit the Model

In [None]:
# Stop early once the training loss has failed to improve by at least 0.001
# for 50 consecutive epochs
es = EarlyStopping(monitor='loss', mode='min', min_delta=0.001, patience=50,
                   verbose=0)

# Fit on the full training split; `callbacks` is documented as a list of
# callback instances, so the single callback is wrapped in one
history = model.fit(X_train, y_train, callbacks=[es])

In [None]:
# Training curves. The history is indexed only by the training metrics
# 'accuracy' and 'loss', so each figure holds one series and the legend
# carries a single 'train' entry.
for metric in ('accuracy', 'loss'):
    fig, ax = plt.subplots()
    ax.plot(history.history[metric])
    ax.set_title('model ' + metric)
    ax.set_ylabel(metric)
    ax.set_xlabel('epoch')
    ax.legend(['train'], loc='upper left')
    plt.show()

# view model summary
# model.model.summary()

# save model
# model.model.save(F"/content/gdrive/My Drive/Colab Notebooks/overall_key_classifier.h5")
model.model.save("model/key_classifier.h5")

In [None]:
# load model for Google Drive
# model.model = load_model("/content/gdrive/My Drive/Colab Notebooks/overall_key_classifier.h5")
model.model = load_model("model/key_classifier.h5")

# The predictions are one-hot encoded before being compared with y_test.
# Fix the width to the full class count: left to infer it, to_categorical
# would emit too few columns whenever the highest class index is never
# predicted, and the comparison with y_test would fail.
y_pred = model.predict(X_test)
y_pred = to_categorical(y_pred, num_classes=num_classes)

# evaluate predictions
acc = accuracy_score(y_test, y_pred)
print("Testing accuracy: %.3f%%" % (acc*100))

# Making the Confusion Matrix
# import sys
# np.set_printoptions(threshold=sys.maxsize)
# cm = confusion_matrix(y_test, y_pred)
# print(cm)

# from sklearn.metrics import roc_curve
# fpr_RF, tpr_RF, thresholds_RF = roc_curve(df.actual_label.values, df.model_RF.values)
# fpr_LR, tpr_LR, thresholds_LR = roc_curve(df.actual_label.values, df.model_LR.values)

In [None]:
# Load the unseen data used to check that the trained model works,
# then preview its first rows
pred_df = pd.read_csv(filepath_or_buffer=TEST_DATA_LINK)
pred_df.head(n=5)

In [None]:
# Raw cell values of the unseen data
pred_dataset = pred_df.values

# Feature columns only (index 3 onwards), as a plain list of rows
feature_rows = pred_df.iloc[:, 3:].values.tolist()

# Scale with the scaler that was fitted on the training split
pred_row = scaler.transform(feature_rows)

# Cast to float32 and insert the TIMESTEPS axis expected by the LSTM
pred_arr = np.asarray(pred_row, dtype=np.float32)
n_rows, n_columns = pred_arr.shape
pred_arr = np.reshape(pred_arr, (n_rows, TIMESTEPS, n_columns))

In [None]:
# Predict the unseen samples, then one-hot encode the predictions so the
# encoder can map them back to labels. The width is fixed to the full class
# count: left to infer it, to_categorical would emit too few columns whenever
# the highest class index is never predicted, and inverse_transform would
# reject the array.
pred = model.predict(pred_arr)
pred = to_categorical(pred, num_classes=num_classes)
pred = encoder.inverse_transform(pred)

# Drop the singleton label axis left by inverse_transform
pred = np.squeeze(pred)

# Highest class probability per sample (a confidence value, not an accuracy)
pred_proba = model.predict_proba(pred_arr)
acc = np.max(pred_proba, axis=1)

print(pred)
print(acc)