In [None]:
!pip install dd



In [6]:
import numpy as np
import pandas as pd
import itertools
import math

import torch.nn.functional as F

from dd.autoref import BDD
from joblib import dump, load

import time
from sklearn.decomposition import PCA
from scipy.spatial.distance import hamming
from sklearn.metrics import accuracy_score, confusion_matrix
import pickle
import matplotlib.pyplot as plt
%matplotlib inline

from methods_collection.box_abstraction_monitoring.RuntimeMonitor import BinaryMonitor
from methods_collection.box_abstraction_monitoring.RuntimeMonitor import binary_monitor_online_generation
from methods_collection.box_abstraction_monitoring.RuntimeMonitor import monitor_online_generation


## Auxiliary data functions

In [7]:
def ReLU(x):
    return x * (x > 0)
def dReLU(x):
    return 1 * (x > 0)

In [8]:
def monitor_test_output_verdicts(n_classes, test_features, test_input_labels, y_pred, path_to_store):
  verdicts_list = []

  for y in range(n_classes):
      print(y)
      test_features_class_y = test_features[y_pred == y, :]
      y_label= test_input_labels[y_pred == y]

      if len(y_label) == 0:
        verdicts_list.append([])
        continue
      df = pd.DataFrame()
      index_sampled = np.random.choice(test_features_class_y.shape[0], int(test_features_class_y.shape[0]*0.25), replace=False)
      test_features_class_y = test_features_class_y[index_sampled]
      y_label = y_label[index_sampled]
      df["label"] = y_label

      monitor_path = f'{path_to_store}monitor_{y}.pkl'
      with open(monitor_path, 'rb') as f:
        monitor = load(f)

      verdicts = monitor.make_verdicts(test_features_class_y.tolist())
      df["verdict"] = verdicts
      verdicts_list.append(df)
  print(path_to_store)
  return verdicts_list

In [9]:
def monitors_performance(true_labels, monitor_verdicts):
    if len(true_labels) == len(monitor_verdicts):
        TN = 0
        FP = 0
        MN = 0
        FN = 0
        TP = 0
        MP = 0
        for i in range(len(true_labels)):
            if true_labels[i] == 1:
                if monitor_verdicts[i] == 0:
                    TP += 1
                elif monitor_verdicts[i] == 1:
                    FN += 1
                else:
                    MP += 1
            else:
                if monitor_verdicts[i] == 0:
                    FP += 1
                elif monitor_verdicts[i] == 1:
                    TN += 1
                else:
                    MN += 1
        return TN, FP, MN, FN, TP, MP
    else:
        print("The dimensionalities of inputs are not consistent.")

## Monitors generation

In [10]:
layer=2
frames = []
for i in range(10):
  frames.append(pd.read_csv(f'./models/cifar_10_ann_model/patterns/layer_2/train/train_pattern_dataset_class_{i}.csv', sep=","))
train_dataset_full = pd.concat(frames)
print(f'Loaded train set: train_pattern_dataset_class_full.csv with shape {train_dataset_full.shape}')

Loaded train set: train_pattern_dataset_class_full.csv with shape (50000, 87)


In [None]:
X = dReLU(train_dataset_full.drop(["label", "ground_truth", "pred"] , axis=1).to_numpy())
y = train_dataset_full["label"].to_numpy()
y_pred = train_dataset_full["pred"].to_numpy()
path_to_store = './models/cifar_10_ann_model/monitors/binary/layer_2/'
binary_monitor_online_generation(X, y, y_pred, 1, path_to_store, 10)

In [None]:
test_dataset_full = pd.read_csv(f'./models/cifar_10_ann_model/patterns/layer_2/test/test_pattern_dataset_class_full.csv', sep=",")
print(f'Loaded test set: test_pattern_dataset_class_full.csv with shape {test_dataset_full.shape}')

Loaded test set: test_pattern_dataset_class_full.csv with shape (10000, 87)


In [None]:
X_test = dReLU(test_dataset_full.drop(["label", "ground_truth", "pred"] , axis=1).to_numpy())
y_test = test_dataset_full["label"].to_numpy()
y_test_pred = test_dataset_full["pred"].to_numpy()

test_verdicts_list = monitor_test_output_verdicts(10, X_test, y_test, y_test_pred)

0
1
2
3
4
5
6
7
8
9


In [None]:
tau_list=[0, 1, 2, 3, 4, 5, 6, 7]
df_list = []
for tau in tau_list:
  X = ReLU(train_dataset_full.drop(["label", "ground_truth", "pred"] , axis=1).to_numpy())
  y_lab = train_dataset_full["label"].to_numpy()
  y_pred = train_dataset_full["pred"].to_numpy()
  path_to_store = './models/cifar_10_ann_model/monitors/binary/layer_2/'
  monitor_online_generation(X, y_lab, y_pred, tau, path_to_store, 43)
  performance_results = []
  X_test = ReLU(test_dataset_full.drop(["label", "ground_truth", "pred"] , axis=1).to_numpy())
  y_test = test_dataset_full["label"].to_numpy()
  y_test_pred = test_dataset_full["pred"].to_numpy()

  test_verdicts_list = monitor_test_output_verdicts(43, X_test, y_test, y_test_pred)
  for y in range(43):
    TN, FP, MN, FN, TP, MP = monitors_performance(test_verdicts_list[y]['label'], test_verdicts_list[y]['verdict'])
    set_len = len(test_verdicts_list[y]['label'])
    if set_len == 0:
      performance_results.append([y, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1])
      continue
    UNK = MN + MP
    ACC = np.round(100 * (TP + TN)/(set_len - UNK), 2)
    UNKR = np.round(100 * UNK/set_len, 2)
    if(FP + TN) > 0:
      TPR = np.round(100 * TP/(TP + FN), 2)
      FPR = np.round(100 * FP/(FP + TN), 2)
      TNR = np.round(100 * TP/(TP + FN), 2)
      FNR = np.round(100 * FN/(TP + FN), 2)
    else:
      TPR = -1
      FPR = -1
      TNR = -1
      FNR = -1
    performance_results.append([y, TN, FP, MN, FN, TP, MP, ACC, UNKR, TPR, FPR, TNR, FNR])
  df1 = pd.DataFrame(performance_results, columns=['class', 'TN', 'FP', 'MN', 'FN', 'TP', 'MP', 'ACC', 'UNKR', 'TPR', 'FPR', 'TNR', 'FNR'])
  df_list.append((df1, tau))

In [None]:
for (df1, tau) in df_list:
  print("Tau", tau)
  print(df1)
  print("-------------------")

## Monitors evaluation

In [None]:
path_to_store = './models/gtsrb_ann_model/monitors/binary/layer_2/'
in_test_dataset_full = pd.read_csv(f'./models/gtsrb_ann_model/patterns/layer_2/test/test_pattern_dataset_class_full.csv', sep=",")
in_test_dataset_full['label'] = 0
out_test_dataset_full = pd.read_csv(f'./models/gtsrb_ann_model/patterns/layer_2_imagenet/test/test_pattern_dataset_class_full.csv', sep=",")
out_test_dataset_full['label'] = 1

test_dataset_full = pd.concat([in_test_dataset_full, out_test_dataset_full])
print(f'Loaded test set: test_pattern_dataset_class_full.csv with shape {test_dataset_full.shape}')
X_test = dReLU(test_dataset_full.drop(["label", "ground_truth", "pred"] , axis=1).to_numpy())
y_test = test_dataset_full["label"].to_numpy()
y_test_pred = test_dataset_full["pred"].to_numpy()

test_verdicts_list = monitor_test_output_verdicts(43, X_test, y_test, y_test_pred, path_to_store)

In [None]:
performance_results=[]
TN_total = 0
FP_total = 0
TP_total = 0
FN_total = 0
for y in range(43):
  if len(test_verdicts_list[y]) ==0:
    continue
  test_verdicts_list[y]['verdict'][test_verdicts_list[y]['verdict'] == 2] = 1
  TN, FP, FN, TP = confusion_matrix(test_verdicts_list[y]['label'].to_numpy(), test_verdicts_list[y]['verdict'].to_numpy()).ravel()

  TN_total += TN
  FP_total += FP
  FN_total += FN
  TP_total += TP

if (TP_total+FN_total)>0:
  tpr = np.round(100 * TP_total/(TP_total+FN_total), 2)
if (TN_total+FP_total) > 0:
  tnr = np.round(100 * TN_total/(TN_total+FP_total), 2)
acc = np.round(100*(TN_total + TP_total)/(TN_total+TP_total+FN_total+FP_total), 2)
print('$'+str(tpr)+'$' + ' & ' + '$'+str(tnr)+'$' + ' & ' + '$'+str(acc)+'$')

$100.0$ & $9.15$ & $49.18$
