In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import warnings
warnings.filterwarnings('ignore')
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"]="0"
import pandas as pd
import numpy as np
from gtda.time_series import SlidingWindow
import matplotlib.pyplot as plt
from tensorflow.python.keras.backend import set_session
import tensorflow as tf
config = tf.compat.v1.ConfigProto() 
config.gpu_options.allow_growth = True  
config.log_device_placement = True  
sess2 = tf.compat.v1.Session(config=config)
set_session(sess2)  
import csv
import random
import itertools
import glob
import time
import pickle
from data_utils import *

from tensorflow import keras
from tqdm import tqdm
from tensorflow.keras.losses import MSE
from edgeml_tf.tflite.bonsaiLayer import BonsaiLayer #modified bonsailayer to return logits

## Import Dataset

In [None]:
f = '/home/nesl/209as_sec/human_act/Data/Activity_Dataset/'
model_dir = 'trained_models/'
window_size = 550
stride = 50
channels = 2

X_tr, Y_tr, X_test, Y_test = import_auritus_activity_dataset(dataset_folder = f, 
                                use_timestamp=False, 
                                shuffle=True, 
                                window_size = window_size, stride = stride, 
                                return_test_set = True, test_set_size = 300,channels=2)
print(X_tr.shape)
print(Y_tr.shape)
print(X_test.shape)
print(Y_test.shape)

# Feature Extraction

In [None]:
feat_size = 10
featX_tr = np.zeros((X_tr.shape[0],feat_size))
featX_test = np.zeros((X_test.shape[0],feat_size))
for i in range(X_tr.shape[0]):
    cur_win = X_tr[i]
    featX_tr[i,0] = np.min(cur_win[:,0])
    featX_tr[i,1] = np.min(cur_win[:,1])
    featX_tr[i,2] = np.max(cur_win[:,0])
    featX_tr[i,3] = np.max(cur_win[:,1])
    featX_tr[i,4] = featX_tr[i,2]-featX_tr[i,0]
    featX_tr[i,5] = featX_tr[i,3]-featX_tr[i,1]
    featX_tr[i,6] = np.var(cur_win[:,0])
    featX_tr[i,7] = np.var(cur_win[:,1])
    featX_tr[i,8] = np.sqrt(featX_tr[i,6])
    featX_tr[i,9] = np.sqrt(featX_tr[i,7])  
    
for i in range(X_test.shape[0]):
    cur_win = X_test[i]
    featX_test[i,0] = np.min(cur_win[:,0])
    featX_test[i,1] = np.min(cur_win[:,1])
    featX_test[i,2] = np.max(cur_win[:,0])
    featX_test[i,3] = np.max(cur_win[:,1])
    featX_test[i,4] = featX_test[i,2]-featX_test[i,0]
    featX_test[i,5] = featX_test[i,3]-featX_test[i,1]
    featX_test[i,6] = np.var(cur_win[:,0])
    featX_test[i,7] = np.var(cur_win[:,1])
    featX_test[i,8] = np.sqrt(featX_test[i,6])
    featX_test[i,9] = np.sqrt(featX_test[i,7])

dataDimension = featX_tr.shape[1]
numClasses = Y_tr.shape[1]
Xtrain = featX_tr
Ytrain = Y_tr
Xtest = featX_test
Ytest = Y_test

## Load Model

In [None]:
Z = np.load( glob.glob(model_dir + "/**/Z.npy", recursive = True)[0], allow_pickle=True )
W = np.load( glob.glob(model_dir + "/**/W.npy", recursive = True)[0], allow_pickle=True )
V = np.load( glob.glob(model_dir + "/**/V.npy", recursive = True)[0], allow_pickle=True )
T = np.load( glob.glob(model_dir + "/**/T.npy", recursive = True)[0], allow_pickle=True )
hyperparams = np.load(glob.glob(model_dir + "/**/hyperParam.npy", 
                            recursive = True)[0], allow_pickle=True ).item()

n_dim = hyperparams['dataDim']
projectionDimension = hyperparams['projDim']
numClasses = hyperparams['numClasses']
depth = hyperparams['depth']
sigma = hyperparams['sigma']

dense = BonsaiLayer( numClasses, n_dim, projectionDimension, depth, sigma )
model = keras.Sequential([
keras.layers.InputLayer(n_dim),
dense
])

dummy_tensor = tf.convert_to_tensor( np.zeros((1,n_dim), np.float32) )
out_tensor = model(dummy_tensor)
model.summary()
dense.set_weights( [Z, W, V, T] )
model.compile()

# Attack

In [None]:
def pgd_attack(model,iterations, image, label, alpha, eps):
    gen_img = tf.identity(image)
    gen_img = tf.cast(gen_img,dtype=tf.dtypes.float32)
    gen_img = gen_img + tf.random.uniform(gen_img.get_shape().as_list(), minval=-eps, 
                                          maxval=eps, dtype=tf.dtypes.float32)
    x_temp = image
    for iter in range(iterations):
        imgv = tf.Variable(gen_img)
        with tf.GradientTape() as tape:
            tape.watch(imgv)
            predictions = model(imgv)
            loss = tf.keras.losses.CategoricalCrossentropy()(label, predictions)
            grads = tape.gradient(loss,imgv)
        signed_grads = tf.sign(grads)
        gen_img = gen_img + (alpha*signed_grads)
        gen_img = tf.clip_by_value(gen_img, image-eps, image+eps)
    return gen_img

In [None]:
eps = [0.1,0.3,0.5,0.7,0.9,1.0,2.0]
iterations = 5
alpha = [0.1,0.3,0.5,0.7,0.9,1.0]
take_size = 4890
accu_num = []
eps_list = []
alpha_list = []

for al in alpha:
    for item in eps:
        countadv = 0
        for i in tqdm(range(len(Xtest))):
            audio = Xtest[i,:].reshape(1,len(Xtest[i,:]))
            label = Y_test[i,:].reshape(9,1)
            audioPred = model.predict(audio)
            audioPred = audioPred.argmax()
            adversary = pgd_attack(model,iterations,audio, label, alpha=al, eps=item)
            pred = model.predict(adversary)
            adversaryPred = pred.argmax()
            if audioPred == adversaryPred:
                countadv += 1

        print("Adversarial accuracy : ", countadv / len(Xtest))
        accu_num.append(countadv / len(Xtest))
        eps_list.append(item)
        alpha_list.append(al)

In [None]:
accu_num

In [None]:
eps_list

In [None]:
alpha_list