In [None]:
import tensorflow as tf

import cirq
import sympy
import numpy as np
import seaborn as sns
import collections
import random as random

import pandas as pd
from sklearn.preprocessing import MinMaxScaler,StandardScaler

from tensorflow.keras.models import Model
from sklearn.preprocessing import minmax_scale

# visualization tools
%matplotlib inline
import matplotlib.pyplot as plt

# Data Pre-processing

In [None]:
df = pd.read_excel('Data.xlsx')

df_noAtk_all = df[df['Attack']==0]
df_Atk = df[df['Attack']==1]

df_noAtk_all = df_noAtk_all.reset_index(drop=True)
df_Atk = df_Atk.reset_index(drop=True)

col_names = list(df_noAtk_all.columns)

row_noAtk_all = df_noAtk_all.count()[0]
row_Atk = df_Atk.count()[0]

row_smaller = min(row_noAtk_all,row_Atk)
num_CAN_img = int(row_smaller/(len(col_names)-2))

df_noAtk = df_noAtk_all[:][-row_Atk:].reset_index(drop=True)

init_image_size = len(col_names) - 2
train_noAtk_size = int(row_Atk*0.8)

In [None]:
train_noAtk = df_noAtk[:][0:train_noAtk_size]
test_noAtk = df_noAtk[:][train_noAtk_size:]

train_Atk = df_Atk[:][0:train_noAtk_size]
test_Atk = df_noAtk[:][train_noAtk_size:]

train_noAtk = train_noAtk.reset_index(drop=True)
test_noAtk = test_noAtk.reset_index(drop=True)
train_Atk = train_Atk.reset_index(drop=True)
test_Atk = test_Atk.reset_index(drop=True)

train_df = pd.concat([train_noAtk,train_Atk], ignore_index=True).reset_index(drop=True)
test_df = pd.concat([test_noAtk,test_Atk], ignore_index=True).reset_index(drop=True)

In [None]:
col_names = list(df_noAtk_all.columns)
col_names.remove('Attack')
col_names.remove('Timestamp')

In [None]:
df_x = train_df.drop(columns = ['Attack','Timestamp'])
df_y = train_df.drop(columns = col_names)

df_x_test = test_df.drop(columns = ['Attack','Timestamp'])
df_y_test = train_df.drop(columns = col_names)

min_max_scaler = MinMaxScaler()
scaler = StandardScaler()
df_x = pd.DataFrame(scaler.fit_transform(df_x), columns=df_x.columns)
df_x = pd.DataFrame(min_max_scaler.fit_transform(df_x), columns=df_x.columns)

df_x_test = pd.DataFrame(scaler.fit_transform(df_x_test), columns=df_x.columns)
df_x_test = pd.DataFrame(min_max_scaler.fit_transform(df_x_test), columns=df_x.columns)

Build CAN image dataset

In [None]:
x = []
y = []

np_x = df_x.to_numpy()
np_y = df_y.to_numpy()

for i in range(len(np_x)-init_image_size):
    if(i%init_image_size==0):
        img = np_x[i:i+init_image_size,:].reshape(init_image_size,init_image_size,1)
        label = 1 if 1 in np_y[i:i+13,:] else 0
        x.append(img)
        y.append(label)

x_train = np.array(x)
y_train = np.array(y)

In [None]:
x = np.array(x_train)
y = np.array(y_train)

from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=42)

# Train a convolution neural network (CNN)

In [None]:
def create_full_classical_model(init_image_size):
    # A simple model based off LeNet 
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Flatten(input_shape=(init_image_size,init_image_size,1)))
    model.add(tf.keras.layers.Dense(100, activation='relu'))
    model.add(tf.keras.layers.Dense(64))
    model.add(tf.keras.layers.Dense(16))
    model.add(tf.keras.layers.Dense(1))
    return model


model = create_full_classical_model(init_image_size)
model.compile(loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              optimizer=tf.keras.optimizers.Adam(),
              metrics=['accuracy'])

model.summary()

EPOCHS = 100
BATCH_SIZE = 8

fair_history = model.fit(x_train,
          y_train,
          batch_size=BATCH_SIZE,
          epochs=EPOCHS,
          verbose=1,
          validation_data=(x_test, y_test))

fair_nn_results = model.evaluate(x_test, y_test)

In [None]:
plt.plot(fair_history.history['accuracy'])
plt.plot(fair_history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='lower right')
plt.show()
# summarize history for loss
plt.plot(fair_history.history['loss'])
plt.plot(fair_history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper right')
plt.show()

In [None]:
def extract_feat(model, x_in):
    layer_name = model.layers[-3].name
    intermediate_layer_model = Model(inputs=model.input,
                                 outputs=model.get_layer(layer_name).output)
    intermediate_output = intermediate_layer_model.predict(x_test[0][None, :, :, :])
    foo_norm = minmax_scale(intermediate_output, feature_range=(0,1), axis=1)
    return foo_norm

In [None]:
x_trans = []
y_trans = []
total = len(x_train)
for i in range(len(x_train)):
    x_trans.append(extract_feat(model,x_train[i]))
    y_trans.append(y_train[i])

print ('Done')

In [None]:
x_trans_test = []
y_trans_test = []

for i in range(len(x_test)):
    x_trans_test.append(extract_feat(model,x_test[i]))
    y_trans_test.append(y_test[i])

print ('Done')

In [None]:
x_trans = np.array(x_trans)
x_trans_test = np.array(x_trans_test)

In [None]:
x_trans = x_trans.reshape(len(x_trans),8,8)
x_trans_test = x_trans_test.reshape(len(x_trans_test),8,8)

# Perform Encoding

In [None]:
THRESHOLD = 0.5

x_train_bin = np.array(x_trans > THRESHOLD, dtype=np.float32)
x_test_bin = np.array(x_trans_test > THRESHOLD, dtype=np.float32)

print (x_train_bin.shape)
print (x_test_bin.shape)

In [None]:
import random as random

idx = random.randint(0,len(x_train_bin))
print('index:', idx , ' label : ', y_train[idx])

plt.imshow(x_train_bin[idx,:,:])
plt.colorbar()

# Save Train Data

In [None]:
count = 0
x_train_bin_squeezed = np.squeeze(x_train_bin)

for y in y_train:    
    y_train_bin = np.expand_dims(y*np.ones([8,2]), axis=0)
    x_train_y_train = np.concatenate((x_train_bin_squeezed[count].T, np.squeeze(y_train_bin).T), axis=0).T
    
    if count == 0:
        Train_all = x_train_y_train
    else:
        Train_all = np.concatenate((Train_all, x_train_y_train), axis=0)
    
    count += 1

In [None]:
comb_final = pd.DataFrame(Train_all)

In [None]:
with pd.ExcelWriter('Train.xlsx') as writer:
    comb_final.to_excel(writer) 

# Save Test Data

In [None]:
count = 0
x_test_bin_squeezed = np.squeeze(x_test_bin)

for y in y_test:
    if y == 0:
        y = 1
    else:
        y=0
        
    y_test_bin = np.expand_dims(y*np.ones([8,2]), axis=0)
    x_test_y_test = np.concatenate((x_test_bin_squeezed[count].T, np.squeeze(y_test_bin).T), axis=0).T
    
    if count == 0:
        Test_all = x_test_y_test
    else:
        Test_all = np.concatenate((Test_all, x_test_y_test), axis=0)
    
    count += 1

In [None]:
test_final = pd.DataFrame(Test_all)

In [None]:
with pd.ExcelWriter('Test.xlsx') as writer:
    test_final.to_excel(writer) 