In [1]:
# pre-requisite  
# a. double-check whether all 40000 (5000*8 signatures are succesfully generated) -> checked ... DONE 


# According to the paper of NIC: Detecting Adversarial Samples with Neural Network Invariant Checking
# 1.a Train VIs via OSVM (RBF) based on 50000 signatures ... DONE
# 1.b Verify v1 to v4 on 35000 (5000*7) signatures ... DONE
# 2.a Train reduced models -> obtain signatures from reduced models ... DONE 
# 2.b Train PIs via OSVM (RBF) based on 50000 signatures ... DONE
# 3.  Assemble VIs and PIs together ... DONE 
# 4.  Evaluate 5000 benign samples + 35000 adversarial samples (5000 per adversarial attack) ... working 

In [None]:
# 1.a Train VIs via OSVM (RBF) based on 50000 signatures
from MNIST_models import *
import pickle 
import warnings
warnings.filterwarnings('ignore')

# Get dataset (V1X to V4X) for training VIs 
num_of_X = 50000
prefixs = ['store_zero/', 'store_one/', 'store_two/', 'store_three/', 'store_four/', 
        'store_five/', 'store_six/', 'store_seven/', 'store_eight/', 'store_nine/']

V1X, V2X, V3X, V4X = [], [], [], []
VY = []
for i in range(num_of_X):
    if (i+1) % 1000 == 0: print(i+1)
    for prefix_i, prefix in enumerate(prefixs):
        
        fn_name = 'store_subs_fadv/'+prefix+'normal'+'_'+str(i+1)+'.txt'
        try: fp = open(fn_name, 'rb')
        except: continue
                
        signatures = pickle.load(fp)
        f1, f2, f3, f4 = preprocess(signatures)
        V1X.append(f1)
        V2X.append(f2)
        V3X.append(f3)
        V4X.append(f4)
        VY.append(prefix_i)
        
        fp.close()

# preprocessing 
V1X, V2X, V3X, V4X = torch.stack(V1X, 0), torch.stack(V2X, 0), torch.stack(V3X, 0), torch.stack(V4X, 0) # list of tensor to tensor
V1X, V2X, V3X, V4X = V1X.view(-1, 16*24*24), V2X.view(-1, 16*10*10), V3X.view(-1, 32*3*3), V4X.view(-1, 64) # flatten
VY = torch.tensor(VY)

from RBF import *
args = {
    'epoch': 50, 
    'batch_size': 200, 
    'lr': 0.05,
    'num_class': 10, 
    'num_centers': 50, # originally 60000 samples to 100 centers -> 10 outputs 
    'num_of_elements': None,
    'save_dir': 'ckpoints',
    'model_name': None
}

# train v1 to v4 & store them for further usage 
set_VX = [V4X, V3X, V2X, V1X]
model_names = ['v4', 'v3', 'v2', 'v1']
for i, (VX, model_name) in enumerate(zip(set_VX, model_names)):
    if i in [0, 1]: continue
    
    print(model_name)
        
    cut_ratio = 0.95
    cut_num = int(len(VX)*cut_ratio)
    train_VX, test_VX = VX[:cut_num], VX[cut_num:]
    train_VY, test_VY = VY[:cut_num], VY[cut_num:]
    
    args['num_of_elements'] = VX.shape[1]
    args['model_name'] = model_name

    rbfn = RBFN(args, train_VX, train_VY, test_VX, test_VY)
    rbfn.train()
    rbfn.save()
    rbfn.load()
    rbfn.test()

1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000
14000
15000
16000
17000
18000
19000
20000
21000
22000
23000
24000
25000
26000
27000
28000
29000
30000
31000
32000
33000
34000
35000
36000
37000
38000
39000
40000
41000
42000
43000
44000
45000
46000
47000
48000
49000
50000
v2
[Epoch:    1] cost = 2.22029734
[Epoch:    2] cost = 2.07310748
[Epoch:    3] cost = 1.98631835
[Epoch:    4] cost = 1.91598678
[Epoch:    5] cost = 1.89415467
[Epoch:    6] cost = 1.86075616
[Epoch:    7] cost = 1.81767941
[Epoch:    8] cost = 1.7820245


In [None]:
# # 1.b Verify v1 to v4 on 35000 adversarial signatures 
# num_of_X = 500
# adv_types = ['FGSM', 'JSMA', 'CWL2', 'LINFPGD', 'LINFBI', 'ENL1', 'ST']
# for adv_type in adv_types:
#     print()
#     print(adv_type)
#     V1X, V2X, V3X, V4X = [], [], [], []
#     V1Y, V2Y, V3Y, V4Y = [], [], [], []
#     for i in range(num_of_X):
#         for prefix_i, prefix in enumerate(prefixs):

#             fn_name = 'store_subs_fadv/'+prefix+adv_type+'_'+str(i+1)+'.txt'
#             try: fp = open(fn_name, 'rb')
#             except: continue

#             signatures = pickle.load(fp)
#             f1, f2, f3, f4 = preprocess(signatures)
#             V1X.append(f1)
#             V2X.append(f2)
#             V3X.append(f3)
#             V4X.append(f4)

#             V1Y.append(prefix_i)
#             V2Y.append(prefix_i)
#             V3Y.append(prefix_i)
#             V4Y.append(prefix_i)

#             fp.close()
            
#     # preprocessing 
#     V1X, V2X, V3X, V4X = torch.stack(V1X, 0), torch.stack(V2X, 0), torch.stack(V3X, 0), torch.stack(V4X, 0) # list of tensor to tensor
#     V1X, V2X, V3X, V4X = V1X.view(-1, 16*24*24), V2X.view(-1, 16*10*10), V3X.view(-1, 32*3*3), V4X.view(-1, 64) # flatten
#     V1Y, V2Y, V3Y, V4Y = torch.tensor(V1Y), torch.tensor(V2Y), torch.tensor(V3Y), torch.tensor(V4Y) # list to tensor

#     from RBF import *
#     args = {
#         'epoch': 30, 
#         'batch_size': 200, 
#         'lr': 0.01,
#         'num_class': 10, 
#         'num_centers': 50, # originally 60000 samples to 100 centers -> 10 outputs 
#         'num_of_elements': None,
#         'save_dir': 'ckpoints',
#         'model_name': None
#     }

#     # test v1 to v4 
#     set_VX = [V4X, V3X, V2X, V1X]
#     set_VY = [V4Y, V3Y, V2Y, V1Y]
#     model_names = ['v4', 'v3', 'v2', 'v1']
#     for (VX, VY, model_name) in zip(set_VX, set_VY, model_names):
#         cut_ratio = 0.8
#         cut_num = int(len(VX)*cut_ratio)
#         train_VX, test_VX = VX[:cut_num], VX[cut_num:]
#         train_VY, test_VY = VY[:cut_num], VY[cut_num:]

#         args['num_of_elements'] = VX.shape[1]
#         args['model_name'] = model_name

#         rbfn = RBFN(args, train_VX, train_VY, test_VX, test_VY)
#         rbfn.load()
#         rbfn.test()

In [None]:
# 2.a Train reduced models -> obtain signatures from reduced models

# train reduced models and get the reduced VX
set_reduced_VX = []
model_names = ['r4', 'r3', 'r2', 'r1']
for (VX, model_name) in zip(set_VX, model_names):
    # generate_reduced_model(VX, VY, model_name) # train reduced model 
    reduced_VX = generate__reduced_signatures_on_reduced_model(VX, model_name)
    set_reduced_VX.append(reduced_VX)
    

In [None]:
# 2.b Train PIs via OSVM (RBF) based on 50000 signatures
reduced_V4X, reduced_V3X, reduced_V2X, reduced_V1X = set_reduced_VX[0], set_reduced_VX[1], set_reduced_VX[2], set_reduced_VX[3]
set_reduced_PX = [(reduced_V4X, reduced_V3X), (reduced_V3X, reduced_V2X), (reduced_V2X, reduced_V1X)]
model_names = ['p43', 'p32', 'p21']
args = {
    'epoch': 50, 
    'batch_size': 200, 
    'lr': 0.01,
    'num_class': 10, 
    'num_centers': 50, # originally 60000 samples to 100 centers -> 10 outputs 
    'num_of_elements': None,
    'save_dir': 'ckpoints',
    'model_name': None
}
for ((reduced_next_VX, reduced_current_VX), model_name) in zip(set_reduced_PX, model_names):

    # combine reduced_next_VX and reduced_current_VX to VX
    PY = VY
    PX = torch.cat((reduced_current_VX, reduced_next_VX), 1)
    
    cut_ratio = 0.95
    cut_num = int(len(PX)*cut_ratio)
    train_PX, test_PX = PX[:cut_num], PX[cut_num:]
    train_PY, test_PY = PY[:cut_num], PY[cut_num:]
    
    args['num_of_elements'] = PX.shape[1]
    args['model_name'] = model_name

    rbfn = RBFN(args, train_PX, train_PY, test_PX, test_PY)
    rbfn.train()
    rbfn.save()
    rbfn.load()
    rbfn.test()

In [None]:
def NIC_eval(img, label):
    score = 1
    
    # get signatures 
    f1, f2, f3, f4 = preprocess(img)
    f1, f2, f3, f4 = f1.view(-1, 16*24*24), f2.view(-1, 16*10*10), f3.view(-1, 32*3*3), f4.view(-1, 64) # flatten
       
    args = {
    'epoch': 30, 
    'batch_size': 200, 
    'lr': 0.01,
    'num_class': 10, 
    'num_centers': 50, # originally 60000 samples to 100 centers -> 10 outputs 
    'num_of_elements': None,
    'save_dir': 'ckpoints',
    'model_name': None
    }
    # get prob from VI 
    set_VX = [f4, f3, f2, f1]
    model_names = ['v4', 'v3', 'v2', 'v1']
    for i, (VX, model_name) in enumerate(zip(set_VX, model_names)):
        if not (i in [0, 1, 2, 3]): continue 
            
        args['num_of_elements'] = VX.shape[1]
        args['model_name'] = model_name

        rbfn = RBFN(args, None, None, None, None)
        rbfn.load()
        p = rbfn.eval(VX, label)
        score *= p
        
    # get prob from PI 
    set_reduced_VX = []
    model_names = ['r4', 'r3', 'r2', 'r1']
    for (VX, model_name) in zip(set_VX, model_names):
        # generate_reduced_model(VX, VY, model_name) # train reduced model 
        reduced_VX = generate__reduced_signatures_on_reduced_model(VX, model_name)
        set_reduced_VX.append(reduced_VX)
        
    reduced_V4X, reduced_V3X, reduced_V2X, reduced_V1X = set_reduced_VX[0], set_reduced_VX[1], set_reduced_VX[2], set_reduced_VX[3]
    set_reduced_PX = [(reduced_V4X, reduced_V3X), (reduced_V3X, reduced_V2X), (reduced_V2X, reduced_V1X)]
    model_names = ['p43', 'p32', 'p21']
    args = {
        'epoch': 30, 
        'batch_size': 200, 
        'lr': 0.01,
        'num_class': 10, 
        'num_centers': 50, # originally 60000 samples to 100 centers -> 10 outputs 
        'num_of_elements': None,
        'save_dir': 'ckpoints',
        'model_name': None
    }
    
    for i, ((reduced_next_VX, reduced_current_VX), model_name) in enumerate(zip(set_reduced_PX, model_names)):
        if not (i in [0, 1, 2]): continue 
        
        # combine reduced_next_VX and reduced_current_VX to VX
        PY = VY
        PX = torch.cat((reduced_current_VX, reduced_next_VX), 1)

        args['num_of_elements'] = PX.shape[1]
        args['model_name'] = model_name

        rbfn = RBFN(args, None, None, None, None)
        rbfn.load()
        p = rbfn.eval(PX, label)
        score *= p    
        
    # compute joint probs 
    threshold = 0.15
    if score > threshold: 
        return True 
    else: 
        return False
    
from LP_utils import *
model = load_model('store/MNIST_CNN.pt')
prefixs = ['store_zero/', 'store_one/', 'store_two/', 'store_three/', 'store_four/', 
        'store_five/', 'store_six/', 'store_seven/', 'store_eight/', 'store_nine/']

num_of_test_imgs = 10
adv_types = ['None', 'FGSM', 'JSMA', 'CWL2', 'LINFPGD', 'LINFBI', 'ENL1', 'ST']
accs = []
for adv_type in adv_types:
    print(adv_type)
    acc = 0
    total = 0 
    for i in range(num_of_test_imgs):
        if adv_type ==  'None': 
            # Load  
            fn = 'adv_images/'+'benign'+str(i)+'.npy'
            print(fn)
            x = np.load(fn)
            
            data = torch.from_numpy(np.expand_dims(x, axis=0).astype(np.float32))
            outputs = model.forward(data).detach().numpy()
            prediction = np.argmax(outputs, axis=1)
            singatures = extract_signature_from_CNN(model, x)
            if NIC_eval(singatures, prediction): acc += 1
            total += 1

        elif not (adv_type is None): 
            fn = 'adv_images/'+adv_type+str(i)+'.npy'
            print(fn)
            try: adv_x = np.load(fn)
            except: continue
                
            data = torch.from_numpy(np.expand_dims(adv_x, axis=0).astype(np.float32))
            outputs = model.forward(data).detach().numpy()
            prediction = np.argmax(outputs, axis=1)[0]
            singatures = extract_signature_from_CNN(model, adv_x)
            if not NIC_eval(singatures, prediction): acc += 1
            total += 1
                
                
    accs.append(acc/total)
    print(accs)
        

In [None]:
for adv_type, acc in zip(adv_types, accs):
    print(adv_type, acc)