In [2]:
import numpy as np
import matplotlib.pyplot as plt
from numpy import linalg
from scipy.spatial.distance import pdist
from scipy.spatial.distance import squareform
import pickle

# Notebook for testing the conformal anomaly detection (CAD) methodology

## KNN (for conformal score)

In [None]:
# TODO: use a CUDA library to run KNN (much faster)

In [3]:
class KNearestNeighbors():
    """
        A simple real-valued function to compute the conformal scores
        Each conformal score is the average distance to the k nearest neighbors
        according to a specified metric
        @params
            k: int
                Determines k nearest neighbors
            metric: str
                distance metric (see scipy's pdist function for valid metrics)
    """
    def __init__(self,k,metric='euclidean'):
        self._k = k
        self._metric = metric

    def get_pairwise_distance_matrix(self,x):
        """
            Returns a pairwise distance matrix
            @params
                x: np.ndarray
                    An m x n array with m samples and n dimensions
            @return: np.ndarray
                An m x m matrix whose (i, j) entry is the distance between
                samples i and j
        """
        distances = pdist(x,self._metric)
        # pdist returns the condensed form; squareform expands it to m x m.
        return squareform(distances)

    def __call__(self,x):
        """
            Returns, for every sample, the mean distance to its k nearest neighbors
            @params
                x: np.ndarray
                    An m x n array with m samples and n dimensions
            @return: np.ndarray
                A length-m array of mean k-nearest-neighbor distances
            @raises AssertionError
                If k is not smaller than the number of samples
        """
        distance_matrix = self.get_pairwise_distance_matrix(x)
        num_samples = distance_matrix.shape[1]
        # Every sample has num_samples - 1 neighbors, so any k < num_samples is valid.
        # The message is passed as a plain string: wrapping it in print() would make
        # the AssertionError carry None instead of the text.
        assert self._k < num_samples,\
            'K must be less than the number of data points (k={},num_samples={})'.format(self._k,num_samples)
        distance_matrix = np.sort(distance_matrix,axis=1)
        # After the row-wise sort, column 0 holds the smallest entry of each row
        # (the sample's zero distance to itself for non-negative metrics), so the
        # neighbors occupy columns 1..k.
        return np.mean(distance_matrix[:,1:self._k+1],axis=1)

In [64]:
class AbsDistanceFromMedian():
    """
        Conformal score: absolute deviation of every value from the median
        The median is taken over all elements of the input (no axis is given)
    """
    def get_distance(self, x):
        """
            Returns |x - median(x)|, element-wise
            @params
                x: np.ndarray
                    The values to score
        """
        centre = np.median(x)
        return abs(x - centre)

    def __call__(self, x):
        """
            Calling the object is equivalent to get_distance(x)
        """
        return self.get_distance(x)

class DistanceFromMedian():
    """
        Conformal score: signed deviation of every value from the median
        The median is taken over all elements of the input (no axis is given);
        values below the median get negative scores
    """
    def get_distance(self, x):
        """
            Returns x - median(x), element-wise
            @params
                x: np.ndarray
                    The values to score
        """
        centre = np.median(x)
        return x - centre

    def __call__(self, x):
        """
            Calling the object is equivalent to get_distance(x)
        """
        return self.get_distance(x)

## CAD

In [None]:
# TODO: add a table including confidence (histogram)

In [18]:
class ConformalAnomalyDetector():
    """
    Conformal Anomaly Detector Class
    @params
        ICM: callable
            An object whose call operation should produce an array of conformal scores,
            one per row of the array it is given
        x: np.ndarray
            The training set for CAD; every test example is appended to it before scoring
        significance: float
            The significance level (must be between 0 and 1 exclusive)
    """
    def __init__ (self,ICM,x,significance=0.05):
        self._ICM = ICM
        self.x = x
        # Reuse the setter so the range check lives in exactly one place.
        self.set_significance(significance)

    def testIfAnomaly(self,test):
        """
        Return true or false if the test example is an anomaly
        @params
            test: np.ndarray
                A 1xn test example where n is the number of dimensions
        @return: bool
            True if test input is anomaly and false otherwise
        """
        # The test example goes last so that its score is conformal_scores[-1].
        conformal_set = np.concatenate((self.x,test))
        # The scores are recomputed on train + test for every example.
        conformal_scores = self._ICM(conformal_set)
        # p-value: fraction of the len(x)+1 scores that are at least as large as
        # the test example's own score (which always counts itself).
        p = np.sum(conformal_scores >= conformal_scores[-1]) / (len(self.x)+1)
        return p < self._significance

    def __call__(self,anomalies):
        """
        Return a list of true or false if the test examples are an anomaly
        @params
            anomalies: np.ndarray
                An mxn array where m is the number of test examples and n is the number of dimensions
        @return: list
            A length-m list, true if the corresponding test input is anomaly and false otherwise
        """
        # Each row is tested on its own against the training set; expand_dims turns
        # row i back into the 1xn shape testIfAnomaly concatenates.
        return [self.testIfAnomaly(np.expand_dims(anomalies[i],axis=0)) for i in range(anomalies.shape[0])]

    def set_significance(self,significance):
        """
        Change significance level (hyper-parameter)
        @params
            significance: float
                The significance level (must be between 0 and 1 exclusive)
        @raises AssertionError
            If significance is outside (0,1)
        """
        # The message is a plain string: wrapping it in print() would make the
        # AssertionError carry None instead of the text.
        assert 0 < significance < 1, \
            'Significance must be in range (0,1).'
        self._significance = significance

## FGSM4

In [19]:
# NOTE(security): pickle.load can execute arbitrary code while unpickling; only
# load files written by a trusted run of this project.

# Values used below as the training set of the detector.
with open("output/base_loss.pkl", "rb") as f:
    base_loss_values = pickle.load(f)

# Test set scored by the detector.
# NOTE(review): the cells below assume its first len(base_loss_values) entries
# are the non-adversarial samples - confirm against the script that wrote it.
with open("output/fgsm4_mixed2.pkl", "rb") as f:
    fgsm4_mixed = pickle.load(f)

# Only len(fgsm4_clean) is used below, as the number of adversarial samples.
with open("output/fgsm4_loss.pkl", "rb") as f:
    fgsm4_clean = pickle.load(f)

In [60]:
# Alternative conformal score (disabled): mean distance to the nearest neighbor.
# k_nearest_neighbor = KNearestNeighbors(k=1)
medAbsDistance = AbsDistanceFromMedian()

# One value per row (column vector) so the detector can append test rows to it.
train = np.array(base_loss_values).reshape(len(base_loss_values), 1)

# Detector built on the KNN score (disabled):
# conformal_predictor = ConformalAnomalyDetector(ICM=k_nearest_neighbor, x=train)
conformal_predictor = ConformalAnomalyDetector(ICM=medAbsDistance, x=train)

In [61]:
# A sample is flagged when its conformal p-value is below this level.
conformal_predictor.set_significance(0.30)

# Same column-vector layout as the training set; one flag is returned per row.
test = np.array(fgsm4_mixed).reshape(len(fgsm4_mixed), 1)
is_outlier = conformal_predictor(test)

In [62]:
# Confusion counts. The first len(base_loss_values) flags are treated as the
# negative class and the remaining flags as the positive class.
num_base = len(base_loss_values)
flagged_base = sum(is_outlier[0:num_base])
flagged_rest = sum(is_outlier[num_base:])

print("FP", flagged_base)
print("TN", num_base - flagged_base)
print("FN", len(fgsm4_clean) - flagged_rest)
print("TP", flagged_rest)

FP 14996
TN 35004
FN 15666
TP 34334


In [63]:
# accuracy = (TP + TN) / total, with TN = number of base samples - FP.
num_base = len(base_loss_values)
true_positives = sum(is_outlier[num_base:])
false_positives = sum(is_outlier[0:num_base])
print("accuracy", (true_positives + num_base - false_positives) / len(fgsm4_mixed))

accuracy 0.69338


### testing raw distance from median (no abs)

In [65]:
# Signed score: only values above the median can obtain a small p-value.
medDistance = DistanceFromMedian()

# One value per row (column vector) so the detector can append test rows to it.
train = np.array(base_loss_values).reshape(len(base_loss_values), 1)
conformal_predictor = ConformalAnomalyDetector(ICM=medDistance, x=train)

In [73]:
# Significance levels to sweep; each must lie strictly between 0 and 1.
significances = [
    0.01,
    0.05,
    0.1,
    0.15,
    0.25,
    0.4,
    0.5,
]

In [74]:
# The test array and the size of the base set do not depend on the significance
# level, so they are computed once instead of on every iteration.
test = np.array(fgsm4_mixed).reshape(len(fgsm4_mixed), 1)
num_base = len(base_loss_values)

for s in significances:
    conformal_predictor.set_significance(s)
    is_outlier = conformal_predictor(test)

    # Count each half of the flags once; every metric below is derived from them.
    # NOTE(review): assumes the first num_base test rows are the non-adversarial
    # samples and the rest are adversarial - confirm against the data files.
    false_positives = sum(is_outlier[0:num_base])
    true_positives = sum(is_outlier[num_base:])

    print(f"## significance: {s} ## \n")

    print("FP", false_positives)
    print("TN", num_base - false_positives)
    print("FN", len(fgsm4_clean) - true_positives)
    print("TP", true_positives)
    print("accuracy", (true_positives + num_base - false_positives) / len(fgsm4_mixed))
    print("\n")

## significance: 0.01 ## 

FP 484
TN 49516
FN 42627
TP 7373
accuracy 0.56889


## significance: 0.05 ## 

FP 2498
TN 47502
FN 31995
TP 18005
accuracy 0.65507


## significance: 0.1 ## 

FP 4981
TN 45019
FN 22476
TP 27524
accuracy 0.72543


## significance: 0.15 ## 

FP 7492
TN 42508
FN 15514
TP 34486
accuracy 0.76994


## significance: 0.25 ## 

FP 12479
TN 37521
FN 8223
TP 41777
accuracy 0.79298


## significance: 0.4 ## 

FP 19978
TN 30022
FN 3605
TP 46395
accuracy 0.76417


## significance: 0.5 ## 

FP 24978
TN 25022
FN 2045
TP 47955
accuracy 0.72977




## PGDl2

In [75]:
# NOTE(security): pickle.load can execute arbitrary code while unpickling; only
# load files written by a trusted run of this project.

# Values used below as the training set of the detector.
with open("output/base_loss.pkl", "rb") as f:
    base_loss_values = pickle.load(f)

# Test set scored by the detector.
# NOTE(review): the cells below assume its first len(base_loss_values) entries
# are the non-adversarial samples - confirm against the script that wrote it.
with open("output/pgdl2_mixed2.pkl", "rb") as f:
    pgdl2_mixed = pickle.load(f)

# Only len(pgdl2_clean) is used below, as the number of adversarial samples.
with open("output/pgdl2_loss.pkl", "rb") as f:
    pgdl2_clean = pickle.load(f)

In [77]:
# Signed score: only values above the median can obtain a small p-value.
medDistance = DistanceFromMedian()

# One value per row (column vector) so the detector can append test rows to it.
train = np.array(base_loss_values).reshape(len(base_loss_values), 1)
conformal_predictor = ConformalAnomalyDetector(ICM=medDistance, x=train)

In [78]:
# Significance levels to sweep; each must lie strictly between 0 and 1.
significances = [
    0.01,
    0.05,
    0.1,
    0.15,
    0.25,
    0.4,
    0.5,
]

In [79]:
# The test array and the size of the base set do not depend on the significance
# level, so they are computed once instead of on every iteration.
test = np.array(pgdl2_mixed).reshape(len(pgdl2_mixed), 1)
num_base = len(base_loss_values)

for s in significances:
    conformal_predictor.set_significance(s)
    is_outlier = conformal_predictor(test)

    # Count each half of the flags once; every metric below is derived from them.
    # NOTE(review): assumes the first num_base test rows are the non-adversarial
    # samples and the rest are adversarial - confirm against the data files.
    false_positives = sum(is_outlier[0:num_base])
    true_positives = sum(is_outlier[num_base:])

    print(f"## significance: {s} ## \n")

    print("FP", false_positives)
    print("TN", num_base - false_positives)
    print("FN", len(pgdl2_clean) - true_positives)
    print("TP", true_positives)
    print("accuracy", (true_positives + num_base - false_positives) / len(pgdl2_mixed))
    print("\n")

## significance: 0.01 ## 

FP 484
TN 49516
FN 45913
TP 4087
accuracy 0.53603


## significance: 0.05 ## 

FP 2498
TN 47502
FN 40151
TP 9849
accuracy 0.57351


## significance: 0.1 ## 

FP 4981
TN 45019
FN 34269
TP 15731
accuracy 0.6075


## significance: 0.15 ## 

FP 7492
TN 42508
FN 28655
TP 21345
accuracy 0.63853


## significance: 0.25 ## 

FP 12479
TN 37521
FN 20507
TP 29493
accuracy 0.67014


## significance: 0.4 ## 

FP 19978
TN 30022
FN 12858
TP 37142
accuracy 0.67164


## significance: 0.5 ## 

FP 24978
TN 25022
FN 9322
TP 40678
accuracy 0.657


