## Pixel defense of adversarial samples

In [None]:
import pylab as plt
import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans 
from mpl_toolkits.mplot3d import Axes3D 
from sklearn import preprocessing
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
import pandas as pd
from numpy import *
from matplotlib.pyplot import *
import cv2 as cv
import glob as glob
import os


### 1. Define One-class classifiers

In [None]:

def elliptic_envelope(y,is_use=True):
    if(is_use):
        #########################################
        fraction = 0.005
        envelope =  EllipticEnvelope(contamination = fraction) 
        X_train = y.reshape(-1,1)
        try:
            envelope.fit(X_train)
        except:
          print("elliptic_envelope occurred") 
          return []
        df_class3 = pd.DataFrame(X_train)
        #df_class3['deviation'] = envelope.decision_function(X_train)
        df_class3['anomaly'] = envelope.predict(X_train)
        a3 = df_class3.loc[df_class3['anomaly'] == -1]  
        ob_list2 = [x for x in a3.index]
        print('EllipticEnvelope: ',ob_list2)
        return ob_list2    
    else:
        return []
    #########################################

def isolation_forest(y, is_use=True):
    if(is_use):
        fraction = 0.005
        data = y.reshape(-1,1)
        model =  IsolationForest(contamination = fraction)
        model.fit(data)
        # add the data to the main  
        ob_data = pd.Series(model.predict(data))
        ob_list3 = [j for i,j in zip(ob_data,range(0,len(data))) if i == -1]
        #print(ob_list)
        print('IsolationForest: ',ob_list3)
        return ob_list3
    else:
        return []    
    #########################################


def one_class_svm(y, is_use=True):
    if(is_use):        
        #########################################
        fraction = 0.005
        model =  OneClassSVM(nu = fraction) 
        data = pd.DataFrame(y)
        model.fit(data)
        pds = pd.Series(model.predict(data)) 
        ob_list1 = [j for i,j in zip(pds,range(0,len(data))) if i == -1] 
        print('OneClassSVM: ',ob_list1)
        return ob_list1
    else:
        return []
    
def getDistanceByPoint(data, model):
    distance = pd.Series()
    for i in range(0,len(data)):
        Xa = np.array(data[i])
        Xb = model.cluster_centers_[model.labels_[i]-1]
        distance.set_value(i, np.linalg.norm(Xa-Xb))
    return distance

def my_kmeans(y, is_use=True):
    if(is_use):
        fraction = 0.005
        data = y.reshape(-1,1)
        kmeans = KMeans(n_clusters = 2)
        X_clustered = kmeans.fit_predict(data)
        #print(X_clustered)
        #Define our own color map
        LABEL_COLOR_MAP = {0:'r', 1: 'b'}
        label_color = [LABEL_COLOR_MAP[l] for l in X_clustered]
        distance = getDistanceByPoint(data, kmeans)
        number_of_outliers = int(fraction*len(distance))
        threshold = distance.nlargest(number_of_outliers).min()
        # anomaly21 contain the anomaly result of method 2.1 Cluster (0:normal, 1:anomaly) 
        ob_data = (distance >= threshold).astype(int)
        ob_list4 = [index for index in range(len(ob_data)) if ob_data[index] == 1]
        print('Kmeans: ',ob_list4)
        return ob_list4
    else:
        return []
    #########################################
    
def my_lof(y, is_use=True):
    if(is_use):
        fraction = 0.005
        data = y.reshape(-1,1)        
        model_lof = LocalOutlierFactor(contamination=fraction)        
        model_lof.fit(data)
        ob_data = model_lof.fit_predict(data)        
        ob_list5 = [index for index in range(len(ob_data)) if ob_data[index] == -1]
        print('LocalOutlierFactor: ',ob_list5)
        return ob_list5
    else:
        return []
    #########################################   

In [None]:
# CIFAR-10
IMG_HEIGHT = 32
IMG_WIDTH = 32
IMG_CH = 3

# COVID-19
# IMG_HEIGHT = 224
# IMG_WIDTH = 224
# IMG_CH = 1

# ImagenNet
# IMG_HEIGHT = 299
# IMG_WIDTH = 299
# IMG_CH = 3


## 2. Space Transformation and perturbed pixel localization

### 2.1 Transfrom image to 3 pricinple component

In [None]:
# Test with images
def findMajority(arr, n):    
    L = []    
    for i in range(n):
        count = 0
        
        for j in range(n):
            if(arr[i] == arr[j]):
                count += 1
 
        if(count > 1.5):  # 3/2         
            L.append(arr[i])
    print(list(set(L)))
    return L

def attacked_pixel_loc_pca3(img, use_pca=True):
    img_transformed = []
    if(use_pca):
        X = img.copy()
        X = np.reshape(X, (-1, IMG_CH)) # [32,32,3] => [1024, 3]
        pca = PCA(3)
        #pca = KernelPCA(3,"cosine") # nonlinear PCA with "cosine" kernel
        #pca = KernelPCA(4,"cosine") # nonlinear PCA(4 components) with "cosine" kernel 
        #pca = KernelPCA(5,"cosine") # nonlinear PCA (5 components) with "cosine" kernel
        #pca = KernelPCA(3, "rbf") # nonlinear PCA with "rbf" kernel

        img_transformed = pca.fit(X).transform(X)  # e.g., [1024, 3]

        t = linspace(0, IMG_HEIGHT*IMG_WIDTH-1, IMG_HEIGHT*IMG_WIDTH)   
        y = zeros(len(t))             
        for i in range(len(t)):
            y[i] = img_transformed[i,1]
    else:
        img_transformed = img.reshape(-1,IMG_CH)   

    # original image
    ob_list = []
    for i in range(img_transformed.shape[1]):        
        pca_ch = img_transformed[:,i]
        ob_list.extend(elliptic_envelope(pca_ch,True)) 
        ob_list.extend(isolation_forest(pca_ch, True))
        ob_list.extend(my_lof(pca_ch,True))          
        
    final_list = findMajority(ob_list, len(ob_list))   
    
    print('final:',final_list)
    return final_list

### 2.2 Transfrom image to 2 pricinple component

In [None]:
def attacked_pixel_loc_pca2(img):
    H,W,C = img.shape
    X = img.copy()
    X = np.reshape(X, (-1, IMG_CH))
    pca = PCA(2)
    img_transformed = pca.fit(X).transform(X)

    t = linspace(0, IMG_HEIGHT*IMG_WIDTH-1, IMG_HEIGHT*IMG_WIDTH)    
    y = zeros(len(t))              
    for i in range(len(t)):
        #data = X.reshape(-1,3)
        y[i] = img_transformed[i,1]

    plot(t, y)
    show()
    
    pca_ch0 = img_transformed[:,0] 
    pca_ch1 = img_transformed[:,1] 
    
    ob_list = []
    for i in range(img_transformed.shape[1]):        
        pca_ch = img_transformed[:,i]
        ob_list.extend(elliptic_envelope(pca_ch,True)) 
        ob_list.extend(isolation_forest(pca_ch, True))
        ob_list.extend(my_lof(pca_ch,True))          
        
    final_list = findMajority(ob_list, len(ob_list))
    
    print('final:',final_list)
    return final_list

### 2.3 Locate attacks and recover the purturbed pixels.

In [None]:
# Test with one image
if (0):
    filename = '/home/001-experiments/3_0_8.png'
    dp3_name = '/home/001-experiments/3_0_8_pca2.png'
    in_img = cv.imread(filename,1)
    if in_img is None:
        print(filename)
        print('error!')
    head, img_name = os.path.split(filename)        
    pixel_list = attacked_pixel_loc_pca2(in_img)

    X_line = in_img.reshape(-1,IMG_CH)
    for index_max in pixel_list:
        left_point = index_max-1
        right_point = index_max+1
        up_point = (index_max//IMG_WIDTH-1)*IMG_WIDTH+index_max%IMG_WIDTH
        down_point = (index_max//IMG_WIDTH+1)*IMG_WIDTH+index_max%IMG_WIDTH
        if(left_point < IMG_WIDTH*IMG_WIDTH and right_point < IMG_WIDTH*IMG_WIDTH and up_point < IMG_WIDTH*IMG_WIDTH and down_point < IMG_WIDTH*IMG_WIDTH):            
            for c_i1 in range(IMG_CH):                
                X_line[index_max,c_i1] = X_line[left_point,c_i1]
        elif(right_point >= IMG_WIDTH*IMG_WIDTH-1):
            for c_i2 in range(IMG_CH):                                
                X_line[index_max,c_i2] = X_line[left_point,c_i2]
        elif(left_point <= 0):
            for c_i3 in range(IMG_CH):                
                X_line[index_max,c_i3] = X_line[right_point,c_i3]

    X_line_reshape = X_line.reshape(IMG_WIDTH,IMG_WIDTH,3)
    cv.imwrite(dp3_name,X_line_reshape)

In [None]:

def main(in_list, choice):
    count = 0
    final_txt = open('pixel_list.txt','a')
    for filename in in_list:          
        in_img = cv.imread(filename,1)
        if in_img is None:
            print(filename)
            continue
        head, img_name = os.path.split(filename)        
        defense_path = head

        pixel_list = []
        if(choice == 1):
            pixel_list = attacked_pixel_loc_pca3(in_img, False)  
            defense_path = head+'/defense/'            
        elif(choice == 2):
            pixel_list = attacked_pixel_loc_pca2(in_img)  
            defense_path = head[:-8]+'/defense_pca2/'
        elif(choice == 3):
            pixel_list = attacked_pixel_loc_pca3(in_img, True)
            defense_path = head[:-8]+'/defense/'
        
        if not os.path.exists(defense_path):
            os.makedirs(defense_path)
        
        defense_path = defense_path + img_name
        final_txt.write(str(pixel_list)+'\n')
        
        print(defense_path)
        X_line = in_img.reshape(-1,IMG_CH)        
        for index_max in pixel_list:
            left_point = index_max-1
            right_point = index_max+1
            up_point = (index_max//IMG_WIDTH-1)*IMG_WIDTH+index_max%IMG_WIDTH
            down_point = (index_max//IMG_WIDTH+1)*IMG_WIDTH+index_max%IMG_WIDTH
            if(left_point < IMG_WIDTH*IMG_WIDTH and right_point < IMG_WIDTH*IMG_WIDTH and up_point < IMG_WIDTH*IMG_WIDTH and down_point < IMG_WIDTH*IMG_WIDTH):            
                for c_i1 in range(IMG_CH):                    
                    X_line[index_max,c_i1] = X_line[left_point,c_i1]
            elif(right_point >= IMG_WIDTH*IMG_WIDTH-1):
                for c_i2 in range(IMG_CH):                                    
                    X_line[index_max,c_i2] = X_line[left_point,c_i2]
            elif(left_point <= 0):
                for c_i3 in range(IMG_CH):                    
                    X_line[index_max,c_i3] = X_line[right_point,c_i3]

        X_line_reshape = X_line.reshape(IMG_WIDTH,IMG_WIDTH,3)        
        print(defense_path)
        cv.imwrite(defense_path,X_line_reshape)
        
        count += 1
        if(count > 2):
            #return        
            pass
    final_txt.close()

## 3. Defense 

### 3.1 Defense on non-targeted adversarial samples

In [None]:

choice_id = 2  # tranfromation type 1: no pca, 2: pca2, 3;pca3
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
for no, cls_name in zip(range(10),class_names):
    attacked_path = 'resnet_data_p3/' + str(no)+'_'+cls_name+'/attacked/'    
    attacked_list = glob.glob(attacked_path + '*.png')
    main(attacked_list,choice_id)


### 3.2 Defense on targeted adversarial samples

In [None]:
TEST = True
choice_id = 3
if(TEST):
    attacked_path = '/home/attacked_p1/'
    attacked_list = glob.glob(attacked_path + '*.png')
#     print(attacked_list)
    main(attacked_list,choice_id)