### **Imports**

In [1]:
import pickle

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pylab as plt
import scipy.stats
import warnings
warnings.filterwarnings('ignore')

from numpy import linalg
from scipy.spatial.distance import pdist
from scipy.spatial.distance import squareform

### ***Define some hyperparameters***
---
$$
    \text{Hyperparamerters you should define}
$$
---

In [2]:
def set_hyperparameter():
    # Define filename of dataset
    fn_load = 'J_Dataset_1101_0630_outlier_3sig'

    # # of skip in sample (for downsampling)
    n_skip = 10

    # Set initial Training samples
    IDX_INIT_MODEL = 20000
    # IDX_INIT_MODEL = 10000

    # Set k-nearest neighbors in KNN
    k = 10

    # Set distance metric for kNN
    dist_metric = 'euclidean'

    # Significance level for comformal anomaly detection
    # alpha = 0.95 # 95%
    alpha = 0.997 # 99.7%

    # Define type of anomaly detection
    col_interest = ['CG_1', 'CG_2', 'TT_1','TT_2'] # Caisson #1
    # col_interest = ['CG_3', 'CG_4', 'TT_3','TT_4'] # Caisson #2
    return fn_load, n_skip, IDX_INIT_MODEL, k, dist_metric, alpha, col_interest

#### **1. Load Dataset**

In [3]:
def Load_dataset(fn_load = 'J_Dataset_1101_0630_outlier_3sig',
    col_interest = ['Time', 'CG_1', 'CG_2', 'TT_1', 'TT_2']):
    # 1. Import experimental dataset
        with open(fn_load + '.pickle', 'rb') as f:
            data = pickle.load(f)

        fn = data['fn']
        damage_ind = data['damage_ind']
        df1, df3 = data['data'][0], data['data'][1]

        # 2. Set Dataset
        if 0:
            col_interest = ['Time', 'CG_1', 'CG_2', 'TT_1', 'TT_2']
            df = df1[col_interest]
            Label = df1.Label.values

        else:
            col_interest = ['Time', 'CG_3', 'CG_4', 'TT_3', 'TT_4']
            df = df3[col_interest]
            Label = df3.Label.values

        X_all = df.values[:, 1:]
        return X_all.astype(np.float64), Label, df

#### **2. Data Manager**
A class to manage our experimental data set for recursive monitoring
- Initial training set for initial baseline model
- Update baseline model and manipulate training dataset

In [4]:
class DataManager():

    def __init__(self, X, Y, df, IDX_INIT_MODEL = 20000, n_skip = 1):
        # 1. Save data into class
        self.X, self.Y, self.df = X, Y, df
        
        # 2. Reduce the sequence of dataset (too many samples)
        # (Optional, determined by n_skip)
        self.reduce_sequence_by_skip(n_skip)
        IDX_INIT_MODEL = int(IDX_INIT_MODEL/n_skip)
        self.IDX_INIT_MODEL = IDX_INIT_MODEL

        # 3. Find damage index as # index of sample
        damage_ind = []
        for ind_label in np.unique(self.Y_all):
            if ind_label != 0:
                ind_damage = np.where(self.Y_all == ind_label)[0][0]
                damage_ind.append(ind_damage)
        
        self.damage_ind = damage_ind

        # 4. For Allocation of memory
        SIZE_ALL = self.X_all.shape[0]
        
        self.Is_anomaly = np.zeros((SIZE_ALL, 1))
        self.Threshold = np.zeros((SIZE_ALL, 1))

        # 5. Set Initial Traininigset and Testset
        self.Xtrain = self.X_all[0:IDX_INIT_MODEL,:]
        self.Xtest = self.X_all[IDX_INIT_MODEL:,:]
    """
        Reduce massive sample due to computational issues. 
        In reality, it doesn't matter becauses of online implemtation.
        @params
            n_skip: int
                A # of samples for the skip
            dat: np.ndarray
                A mxn array with m samples with n features
        @return
            dat: Reduced # of samples int(m/n_skip)
    """
    def reduce_sequence_by_skip(self, n_skip):
        if self.X.ndim == 1:
            self.X_all = self.X[::n_skip,]
        else:
            self.X_all = self.X[::n_skip, :]

        self.Y_all = self.Y[::n_skip,]
        self.df = self.df.iloc[::n_skip, :]

    """
        Create a line plot of mxn data with label in y for legend creation and title
        @params
            x: np.ndarray
                A mxn array with m samples with n features
            y: np.ndarray
                A m array of labels
            title: str
                The title of the scatter plot
        @return
            The generated plot in case you want to plot over it
    """
    def plot_line_raw_data(self):
        # Plot scatter plot (Time index vs. Label)
        color_type_str = ['blue', 'orange', 'red']

        plt.figure(figsize = (10, 3), dpi = 200)
        for label_ind in np.unique(self.Y_all):
            indice_ = np.where(self.Y_all == label_ind)
            plt.plot(self.df.Time.iloc[indice_], self.Y_all[indice_], marker = '.', color = color_type_str[label_ind])
        plt.xlabel('Time')
        plt.ylabel('Label')
        plt.gca().set_yticks([0, label_ind])
        plt.grid(linestyle = ':')
        if 'CG_1' in self.df.columns:
            struct_type = 'Caisson #1'
        else:
            struct_type = 'Caisson #3'

        plt.title(struct_type)
        plt.show()

        for col_ind in range(self.X_all.shape[1]):
            plt.figure(figsize = (10, 3), dpi = 200)
            for label_ind in np.unique(self.Y_all):
                row_ind = np.where(self.Y_all == label_ind)
                plt.plot(self.df.Time.iloc[row_ind], self.X_all[row_ind, col_ind].reshape(-1, 1),
                        marker = '.', color = color_type_str[label_ind])
            plt.xlabel('Time')
            plt.ylabel(list(self.df.columns[1:])[col_ind])
            plt.grid(linestyle = ':')
            plt.title(struct_type)
            plt.show()

##### **3. K-Nearest Neighbor**
A simple way to calculate a conformal predictor score

In [11]:
class KNearestNeighbors():
    """
        A simple real-valued function to compute the conformal scores
        Each conformal score is the average k-nearest neighbors according to a specified metric
        @params
            k: int
                Determines k nearest neighbors
            metric: str
                distance metric (see scipy's pdist function for valid metrics)
    """
    def __init__(self,k,metric='euclidean'):
        self._k = k
        self._metric = metric

    """
        Returns a pairwise distance matrix
        @params
            x: np.ndarray
                An m x n array with m samples and n dimensions
    """
    def get_pairwise_distance_matrix(self,x):
        distances = pdist(x,self._metric)
        distance_matrix = squareform(distances)
        return distance_matrix

    """
        Returns the mean pairwise distance between the k'th nearest neighbors
        @params
            x: np.ndarray
                An m x n array with m samples and n dimensions
    """
    def __call__(self,x):
        distance_matrix = self.get_pairwise_distance_matrix(x)
        distance_matrix = np.sort(distance_matrix,axis=1)
        assert self._k +1 < distance_matrix.shape[1],\
            print('K must be less than the number of data points (k={},num_samples={})'.format(self._k +1,distance_matrix.shape[1]))
        return np.mean(distance_matrix[:,1:self._k+1],axis=1)

##### **4. Conformal Anomaly Detector (CAD)**

In [14]:
class ConformalAnomalyDetector():
    """
    Conformal Anomaly Detector Class
    @params
        ICM: class
            An object whose call operation should produce an array of conformal scores
        z: tuple (len==2)
            Each element is an (x,y) pair of the training set for CAD
        significance: float
            The significance level (must be between 0 and 1 exclusive)
    """
    def __init__ (self, ICM, x, y = None, significance=0.05):
        self._ICM = ICM
        self.x = x
        self.y = y
        assert significance > 0 and significance < 1, \
            print('Significance must be in range (0,1).')
        self._significance = significance
        
    """
    Return true or false if the test example are an anomaly
    @params
        test: np.ndarray
            A 1xn test example where m is the number of test examples and n is the number of dimensions
    @return: bool
        True if test input is anomaly and false otherwise 
    """
    def testIfAnomaly(self,test):
        conformal_set = np.concatenate((self.x,test))
        conformal_scores = self._ICM(conformal_set)
        p = np.sum(conformal_scores >= conformal_scores[-1]) / (self.x.shape[0]+1)
        return p < self._significance

    """
    Return array of true or false if the test examples are an anomaly
    @params
        test: np.ndarray
            A mxn test example where m is the number of test examples and n is the number of dimensions
    @return: np.ndarray
        An mx1 array of true if test input is anomaly and false otherwise 
    """ 
    def __call__(self,anomalies):
        isAnomaly = [self.testIfAnomaly(np.expand_dims(anomalies[i],axis=0)) for i in range(anomalies.shape[0])]
        return isAnomaly

    """
    Change significance level (hyper-parameter)
    @params
        significance: float
            The significance level (must be between 0 and 1 exclusive)
    """ 
    def set_significance(self,significance):
        assert significance > 0 and significance < 1, \
            print('Significance must be in range (0,1).')
        self._significance = significance

#### **Main**

In [20]:
# Load experimental data
fn_load, n_skip, IDX_INIT_MODEL, k, dist_metric, alpha, col_interest = set_hyperparameter()
X, Y, df = Load_dataset(fn_load, col_interest)

# Generate DataManager Class
dat = DataManager(X, Y, df, IDX_INIT_MODEL, n_skip)
# dat.plot_line_raw_data()

# Define KNN model
k_nearest_neighbor = KNearestNeighbors(k=10) # Initialize the ICM that uses k-nearest neighbors(k=10)

# Scaler
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
Xtrain_scaled = scaler.fit_transform(dat.Xtrain)
conformal_predictor = ConformalAnomalyDetector(ICM=k_nearest_neighbor,x = dat.Xtrain) # initialize CAD

Xnew = dat.Xtest[:100]
significances = [0.025,0.05]
for i in range(len(significances)):
   significance = significances[i]
   Xnew_scaled = scaler.transform(Xnew.reshape(1, -1))
   conformal_predictor.set_significance(significance) # change significance
   isAnomaly = conformal_predictor(Xnew_scaled) # test if anomamlies according to current CAD
   print(isAnomaly)
   # title = 'CAD Visualization (significance level={})'.format(significance)
   # data_generator.showAnomalies(anomalies,isAnomaly,block = i==len(significances)-1,title=title) # plot results


ValueError: X has 400 features, but MinMaxScaler is expecting 4 features as input.

In [19]:
dat.Xtest[:100].shape

(100, 4)

In [22]:
Xnew.shape

(100, 4)

In [9]:
# def main():
#     np.random.seed(123432) # set seed for reproducibility
#     data_generator = DataGenerator(num_samples_per_class=25) # create 10 classes each with 25 samples
#     k_nearest_neighbor = KNearestNeighbors(k=10) # Initialize the ICM that uses k-nearest neighbors(k=10)
#     conformal_predictor = ConformalAnomalyDetector(ICM=k_nearest_neighbor,z=(data_generator.x,data_generator.y)) # initialize CAD
#     anomalies = data_generator.create_anomaly(200) # Generate 200 anomalies

#     significances = [0.025,0.05,0.25,0.5] # see how different significance levels affect results
#     for i in range(len(significances)):
#         significance = significances[i]
#         conformal_predictor.set_significance(significance) # change significance
#         isAnomaly = conformal_predictor(anomalies) # test if anomamlies according to current CAD
#         title = 'CAD Visualization (significance level={})'.format(significance)
#         data_generator.showAnomalies(anomalies,isAnomaly,block = i==len(significances)-1,title=title) # plot results

# if __name__ == '__main__':
#     main()

#### **4. Construct KNN model with given data**

In [10]:
# def main():
#     from sklearn.preprocessing import MinMaxScaler

#     scaler = MinMaxScaler()
#     Xtrain_scaled = scaler.fit_transform(Xtrain)

#     np.random.seed(123432) # set seed for reproducibility
#     data_generator = DataGenerator(num_samples_per_class=25) # create 10 classes each with 25 samples
#     k_nearest_neighbor = KNearestNeighbors(k=k, metric=dist_metric) # Initialize the ICM that uses k-nearest neighbors(k=10)
    
    
#     conformal_predictor = ConformalAnomalyDetector(ICM=k_nearest_neighbor,z=(data_generator.x,data_generator.y)) # initialize CAD
#     anomalies = data_generator.create_anomaly(200) # Generate 200 anomalies

#     significances = [0.025,0.05,0.25,0.5] # see how different significance levels affect results
#     for i in range(len(significances)):
#         significance = significances[i]
#         conformal_predictor.set_significance(significance) # change significance
#         isAnomaly = conformal_predictor(anomalies) # test if anomamlies according to current CAD
#         title = 'CAD Visualization (significance level={})'.format(significance)
#         data_generator.showAnomalies(anomalies,isAnomaly,block = i==len(significances)-1,title=title) # plot results

# if __name__ == '__main__':
#     main()