In [None]:
import numpy as np
from math import sqrt
from scipy import stats
import os
from scipy import stats
import struct
import itertools
import time
import matplotlib
import matplotlib.pyplot as plt
import sys
from sklearn.neighbors import NearestNeighbors
from scipy.spatial.distance import cdist
from sklearn.metrics.pairwise import pairwise_distances
from scipy.linalg import eigh
from sklearn import svm
from sklearn.linear_model import LogisticRegression
import scipy as sp
import math
from scipy.ndimage.filters import gaussian_filter
from skimage.transform import resize
from skimage.transform import rescale
import png
import imageio
import json
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torch.autograd import Variable
import torchvision.datasets
import torch.nn.functional as f

#Function for training a model
def train_model(model, train_loader, optimizer, loss_criterion, num_epochs=30, num_step_to_display_progress = 250, attack_mode = False, attack_type = 'linf', attack_threshold = 0.01, num_attack_steps = 30, lrate_adv = 0.1):
    """Train ``model`` on ``train_loader``, optionally on adversarially perturbed batches.

    Args:
        model: Module to optimise; it is switched to training mode.
        train_loader: Sized iterable yielding ``(images, labels)`` tensor batches.
        optimizer: Optimizer stepped once per batch.
        loss_criterion: Callable ``(outputs, labels) -> scalar loss``. It is used
            for the training loss and is also handed to ``attack``.
        num_epochs: Number of passes over ``train_loader``.
        num_step_to_display_progress: Print progress every this many batches.
        attack_mode: When True, each batch is perturbed in place by ``attack``
            before the forward pass (adversarial training).
        attack_type, attack_threshold, num_attack_steps, lrate_adv: Forwarded
            unchanged to ``attack``.

    Returns:
        ``(loss_list, acc_list)``: per-batch loss values and per-batch accuracy
        fractions, in the order the batches were processed.
    """
    model.train()

    # Move batches to wherever the model lives instead of relying on a
    # module-level ``device`` global. A parameterless model leaves batches as-is.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    total_step = len(train_loader)
    loss_list = []
    acc_list = []
    print("Starting training...")
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(train_loader):

            if model_device is not None:
                images, labels = images.to(model_device), labels.to(model_device)

            if attack_mode:
                # Perturbs ``images`` in place.
                attack(model, loss_criterion, images, labels, attack_type, attack_threshold , num_attack_steps , lrate_adv)

            # Forward pass, using the criterion that was passed in (not a global).
            outputs = model(images)
            loss = loss_criterion(outputs, labels)

            loss_list.append(loss.item())

            # Backprop and optimizer step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Track the accuracy of this batch
            total = labels.size(0)
            _, predicted = torch.max(outputs.detach(), 1)
            correct = (predicted == labels).sum().item()
            acc_list.append(correct / total)

            if (i + 1) % num_step_to_display_progress == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {:.2f}%'
                      .format(epoch + 1, num_epochs, i + 1, total_step, loss.item(),
                              (correct / total) * 100))

    return loss_list, acc_list


#Function for evaluating a model
def evaluate_model(model, train_loader, test_loader, loss_criterion = None, attack_mode = False, attack_type = 'linf', attack_threshold = 0.01, num_attack_steps = 30, lrate_adv = 0.1, shape_test = None):
    """Print train/test accuracy of ``model`` and optionally collect the test inputs.

    The model is switched to eval mode and is left in eval mode on return.
    Parameter ``requires_grad`` flags are cleared for the duration of the call
    and restored to their individual previous values afterwards.

    Args:
        model: Module to evaluate.
        train_loader: Iterable of ``(images, labels)`` batches; only used when
            ``attack_mode`` is False (train accuracy is skipped under attack).
        test_loader: Iterable of ``(images, labels)`` batches.
        loss_criterion: Loss handed to ``attack``; required when ``attack_mode``
            is True.
        attack_mode: When True, each test batch is perturbed in place by
            ``attack`` before it is classified.
        attack_type, attack_threshold, num_attack_steps, lrate_adv: Forwarded
            unchanged to ``attack``.
        shape_test: If given, shape of a NumPy array that is filled, batch by
            batch along the first axis, with the (possibly perturbed) test inputs.

    Returns:
        The filled array of shape ``shape_test``, or ``None`` when ``shape_test``
        is ``None``.

    Raises:
        ValueError: If ``attack_mode`` is True but ``loss_criterion`` is ``None``.
    """
    if attack_mode and loss_criterion is None:
        raise ValueError("loss_criterion is required when attack_mode is True")

    print("Starting testing...")
    model.eval()

    adv_data = np.zeros(shape_test) if shape_test is not None else None

    # Move batches to wherever the model lives instead of relying on a
    # module-level ``device`` global. A parameterless model leaves batches as-is.
    params = list(model.parameters())
    model_device = params[0].device if params else None

    # Remember each parameter's own flag so frozen parameters stay frozen.
    saved_flags = [param.requires_grad for param in params]
    for param in params:
        param.requires_grad = False

    try:
        #Train accuracy
        if not attack_mode:
            correct = 0
            total = 0
            for images, labels in train_loader:
                if model_device is not None:
                    images, labels = images.to(model_device), labels.to(model_device)

                with torch.no_grad():
                    outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

            # An empty loader reports NaN instead of dividing by zero.
            train_accuracy = (correct / total) * 100 if total else float('nan')
            print('Train Accuracy of the model on the {} train images: {} %'.format(total, train_accuracy))

        #Test Accuracy
        correct = 0
        total = 0
        current_count = 0
        for images, labels in test_loader:
            if model_device is not None:
                images, labels = images.to(model_device), labels.to(model_device)
            if attack_mode:
                # Perturbs ``images`` in place.
                attack(model, loss_criterion, images, labels, attack_type, attack_threshold , num_attack_steps , lrate_adv)
            with torch.no_grad():
                outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            batch_len = images.size(0)
            if adv_data is not None:
                adv_data[current_count:current_count + batch_len] = images.detach().to("cpu").numpy()
            current_count += batch_len

        test_accuracy = (correct / total) * 100 if total else float('nan')
        print('Test Accuracy of the model on the {} test images: {} %'.format(total, test_accuracy))
    finally:
        for param, flag in zip(params, saved_flags):
            param.requires_grad = flag

    return adv_data
                

    
    
def attack(model, loss_criterion, images, labels, attack_type = 'linf', attack_threshold = 0.01, num_attack_steps = 30, lrate_adv = 0.001):
    """Perturb ``images`` in place by projected gradient ascent on the loss.

    Each step moves ``images`` along the gradient of
    ``loss_criterion(model(images), labels)`` with respect to the images, then
    projects the total perturbation back onto a ball of radius
    ``attack_threshold`` around the starting images.

    Args:
        model: Module being attacked. Its parameters' ``requires_grad`` flags are
            cleared during the attack and restored individually afterwards.
        loss_criterion: Callable ``(outputs, labels) -> scalar loss``.
        images: Floating-point batch tensor; modified IN PLACE. On return its
            ``requires_grad`` is False.
        labels: Targets passed to ``loss_criterion``.
        attack_type: ``'linf'`` uses a sign-of-gradient step and an element-wise
            clamp; any other value selects the l2 variant (normalised gradient
            step, per-sample l2-norm clamp).
        attack_threshold: Radius of the perturbation ball.
        num_attack_steps: Number of gradient steps.
        lrate_adv: Step size.

    Returns:
        None. The result is the mutated ``images`` tensor.
    """
    # Save every parameter's own flag; keeping only the last one would
    # unfreeze (or freeze) unrelated parameters on restore.
    params = list(model.parameters())
    saved_flags = [param.requires_grad for param in params]
    for param in params:
        param.requires_grad = False

    images_original = images.clone()
    num_images = images.size(0)

    try:
        for _ in range(num_attack_steps):
            images.requires_grad = True
            loss = loss_criterion(model(images), labels)
            # autograd.grad returns this step's gradient only; backward() would
            # keep accumulating into images.grad across steps.
            (images_grad,) = torch.autograd.grad(loss, images)

            with torch.no_grad():
                if attack_type == 'linf':
                    ##l-inf attack: signed step, then element-wise clamp
                    images += lrate_adv * torch.sign(images_grad)
                    perturbation = torch.clamp(images - images_original, -attack_threshold, attack_threshold)
                else:
                    ##l2 attack: unit-norm step per sample, then clamp the norm
                    unit_grad = f.normalize(images_grad.reshape(num_images, -1), dim=1)
                    images += lrate_adv * unit_grad.reshape(images.size())

                    flat_delta = (images - images_original).reshape(num_images, -1)
                    radius = torch.clamp(flat_delta.norm(dim=1), 0, attack_threshold)
                    flat_delta = f.normalize(flat_delta, dim=1) * radius.view(num_images, 1)
                    perturbation = flat_delta.reshape(images.size())

                # Project: write back in place so the caller's tensor is updated.
                images.copy_(images_original + perturbation)
    finally:
        for param, flag in zip(params, saved_flags):
            param.requires_grad = flag
        images.requires_grad = False
    

#extract laplacian features
def extract_laplacian_features(train_data, test_data, num_features = 40, scaled = True, weighted = True, param = 0.025):
    """Compute graph-Laplacian eigenvector features for train and test points jointly.

    The two arrays are stacked, a graph is built over all points from their
    pairwise distances, and the rows of selected Laplacian eigenvectors are
    returned as per-point features.

    Args:
        train_data: Array with one sample per row.
        test_data: Array with one sample per row, same row layout as ``train_data``.
        num_features: Number of eigenvector columns to return. Must be smaller
            than the total number of samples.
        scaled: If True, use the symmetrically normalised Laplacian
            ``D^{-1/2} (D - A) D^{-1/2}``; otherwise use ``D - A``.
        weighted: If True, edge weights are ``exp(-param * dist**2)``; otherwise
            an edge of weight 1 joins every pair with ``dist <= param``.
        param: Kernel coefficient (weighted) or distance threshold (unweighted).

    Returns:
        ``(train_features, test_features)``: rows of the eigenvectors with
        indices ``1 .. num_features`` (the first eigenvector is dropped), split
        back into the train rows and the test rows.

    Raises:
        ValueError: If ``num_features`` is not smaller than the number of samples.
    """
    x_full = np.concatenate((train_data, test_data))
    num_train = np.size(train_data, 0)
    num_points = np.size(x_full, 0)
    if num_features >= num_points:
        raise ValueError(
            "num_features ({}) must be smaller than the number of samples ({})"
            .format(num_features, num_points))

    print("Finding distance matrix...")
    dist_mat = pairwise_distances(x_full)

    print("Creating laplacian matrix...")
    if weighted:
        adj = np.exp(-param*np.square(dist_mat))
    else:
        adj = (dist_mat <= param).astype(float)

    # Remove self-loops in place rather than subtracting a dense identity.
    np.fill_diagonal(adj, 0.0)
    degrees = np.sum(adj, 1)

    # lap = D - A, built without materialising a dense diagonal matrix.
    lap = -adj
    lap[np.diag_indices_from(lap)] += degrees

    if scaled:
        # Zero-degree nodes get a zero scale factor instead of 1/0 = inf,
        # which would otherwise put NaNs into the matrix.
        inv_sqrt_deg = np.zeros_like(degrees, dtype=float)
        connected = degrees > 0
        inv_sqrt_deg[connected] = 1.0 / np.sqrt(degrees[connected])
        # Row/column scaling by broadcasting; same result as
        # diag(s) @ lap @ diag(s) without two dense matrix products.
        lap = lap * inv_sqrt_deg[:, None] * inv_sqrt_deg[None, :]

    print("Finding eigen decomposition...")

    # ``eigvals=`` is deprecated in SciPy in favour of ``subset_by_index``;
    # fall back for SciPy versions that predate the new keyword.
    try:
        _, V = eigh(lap, subset_by_index=[0, num_features])
    except TypeError:
        _, V = eigh(lap, eigvals=(0, num_features))

    train_features = V[0:num_train, 1:1+num_features]
    test_features = V[num_train:num_points, 1:1+num_features]
    print("Done!")
    return train_features, test_features

#Function for saving a model
def save_model(model, PATH):
    """Write ``model.state_dict()`` to ``PATH`` with ``torch.save``.

    Args:
        model: Module whose state dict is saved.
        PATH: Destination. When it is a filesystem path, missing parent
            directories are created first; any other object is passed to
            ``torch.save`` untouched.
    """
    print("Saving model...")
    if isinstance(PATH, (str, os.PathLike)):
        parent_dir = os.path.dirname(os.fspath(PATH))
        if parent_dir:
            # Avoid losing a finished training run to a missing output folder.
            os.makedirs(parent_dir, exist_ok=True)
    torch.save(model.state_dict(), PATH)
    
#Function for loading a saved model
def load_model(model, PATH):
    """Load a state dict from ``PATH`` into ``model`` and switch it to eval mode.

    Args:
        model: Module that receives the weights (updated in place).
        PATH: Location of a state dict previously written with ``torch.save``.
    """
    print("Loading model...")
    # Map stored tensors onto the model's own device so that a checkpoint
    # saved from a GPU can be loaded on a CPU-only machine.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")
    # NOTE(review): torch.load unpickles the file; only load checkpoints from
    # trusted sources.
    state_dict = torch.load(PATH, map_location=target_device)
    model.load_state_dict(state_dict)
    model.eval()

In [None]:
##In this block, we define the architectures of a linear model and a one-hidden-layer neural net

# Linear net
class LinearNet(nn.Module):
    """Single fully connected layer mapping input features to class scores.

    Args:
        num_features: Size of each input vector.
        num_classes: Number of output scores (defaults to 10).
    """

    def __init__(self, num_features, num_classes=10):
        super().__init__()
        # Attribute name ``fc`` is part of the state-dict keys; keep it stable.
        self.fc = nn.Linear(num_features, num_classes)

    def forward(self, x):
        """Return the unnormalised class scores for ``x``."""
        return self.fc(x)
    
# One-hidden-layer neural network
class NeuralNet(nn.Module):
    """Fully connected network with one ReLU hidden layer.

    Args:
        num_features: Size of each input vector.
        hidden_size: Width of the hidden layer (defaults to 200).
        num_classes: Number of output scores (defaults to 10).
    """

    def __init__(self, num_features, hidden_size=200, num_classes=10):
        super().__init__()
        # Attribute names ``fc1``/``fc2`` are part of the state-dict keys;
        # keep them stable so saved checkpoints still load.
        self.fc1 = nn.Linear(num_features, hidden_size)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return the unnormalised class scores for ``x``."""
        hidden = f.relu(self.fc1(x))
        return self.fc2(hidden)


In [None]:
class NumpyToTorch:
    """Expose a NumPy array of datapoints and its labels as indexable tensor pairs.

    Datapoints are stored as float32 tensors and labels as int64 tensors.
    The length of the collection is the length of the datapoints tensor.
    """

    def __init__(self, numpy_datapoints, numpy_labels):
        features = torch.from_numpy(numpy_datapoints)
        targets = torch.from_numpy(numpy_labels)
        self.datapoints = features.to(torch.float32)
        self.labels = targets.to(torch.int64)

    def __len__(self):
        return len(self.datapoints)

    def __getitem__(self, idx):
        sample = self.datapoints[idx]
        target = self.labels[idx]
        return sample, target


In [None]:
#Load MNIST data 
def imshow(img):
  """Show ``img`` reshaped to a ``dim`` x ``dim`` grid (``dim`` is a module-level global)."""
  side = dim
  square = np.reshape(img, (side, side))
  plt.imshow(square)


def read_data(fname_root):
  """Read an IDX image/label file pair that shares the prefix ``fname_root``.

  Reads ``<fname_root>-images-idx3-ubyte`` and ``<fname_root>-labels-idx1-ubyte``.

  Args:
    fname_root: Path prefix shared by the two files.

  Returns:
    ``(img, lbl)``: ``img`` is a float32 array with one flattened image per row
    (``rows * cols`` columns, raw 0-255 values); ``lbl`` is an integer array
    with one label per image.

  Raises:
    ValueError: If a header magic number is wrong, a file holds a different
      number of items than its header declares, or the two files disagree on
      the number of items.
  """
  fname_img = fname_root + "-images-idx3-ubyte"
  fname_lbl = fname_root + "-labels-idx1-ubyte"

  with open(fname_lbl, 'rb') as flbl:
    magic, num_labels = struct.unpack(">II", flbl.read(8))
    if magic != 2049:
      raise ValueError("{}: bad label-file magic number {}".format(fname_lbl, magic))
    # Labels are stored as unsigned bytes; a signed dtype would wrap values > 127.
    lbl = np.fromfile(flbl, dtype=np.uint8)
  if lbl.size != num_labels:
    raise ValueError("{}: header declares {} labels, found {}".format(fname_lbl, num_labels, lbl.size))

  with open(fname_img, 'rb') as fimg:
    magic, num_images, rows, cols = struct.unpack(">IIII", fimg.read(16))
    if magic != 2051:
      raise ValueError("{}: bad image-file magic number {}".format(fname_img, magic))
    pixels = np.fromfile(fimg, dtype=np.uint8)
  if pixels.size != num_images * rows * cols:
    raise ValueError("{}: header declares {} pixels, found {}".format(fname_img, num_images * rows * cols, pixels.size))
  if num_images != num_labels:
    raise ValueError("image count {} does not match label count {}".format(num_images, num_labels))

  img = pixels.reshape(num_images, rows * cols)
  return img.astype(np.float32), lbl.astype(int)

def preprocess(x):
  """Divide ``x`` by 255 (maps raw 0-255 values onto 0-1)."""
  scaled = x / 255
  return scaled

    
# Read the raw MNIST train/test files and scale pixel values by 1/255.
x_all, y_all = read_data("../../datasets/MNIST/raw/train")
x_test_all, y_test_all = read_data("../../datasets/MNIST/raw/t10k")
x_all, x_test_all = map(preprocess, [x_all, x_test_all])
n_classes = int(1 + np.max(y_all))
print("Data loaded")

# Work on a subset. Images and labels must be cut with the SAME bound so the
# two arrays stay aligned and equally long.
num_train_samples = 10000
num_test_samples = 1000
x, y = x_all[:num_train_samples, :], y_all[:num_train_samples]
x_test, y_test = x_test_all[:num_test_samples, :], y_test_all[:num_test_samples]
shape_test = np.shape(x_test)

dim = 28  # side length of the square images, in pixels
batch_size = 500

trainset = NumpyToTorch(x, y)
testset = NumpyToTorch(x_test, y_test)

# Data loader
train_loader = DataLoader(dataset=trainset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=testset, batch_size=batch_size, shuffle=False)

#Set device to GPU if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
#Training and evaluating a neural net over MNIST images

#Set to True to load the saved weights; False trains the model from scratch
PRETRAINED = False

#Set to true if you want to save the new trained model
SAVENET = False

model = NeuralNet(dim**2)
model = model.to(device)

# The loss is needed for the adversarial evaluation below as well as for
# training, so it must exist even when a pretrained model is loaded.
criterion = nn.CrossEntropyLoss()

#Load pretrained model or train the model
if PRETRAINED:
    load_model(model, 'pretrained_models/MNIST_NN.pth')
else:
    # Hyperparameters
    num_epochs = 30
    num_classes = 10
    learning_rate = 0.001

    # Optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    #train_model(model, train_loader, optimizer, criterion, num_epochs, 10)

    #For adversarial training:
    train_model(model, train_loader, optimizer, criterion, num_epochs, 10, attack_mode = True, attack_type = 'l2', attack_threshold = 2, num_attack_steps = 40, lrate_adv = 0.1)

#Evaluate the model
#evaluate_model(model, train_loader, test_loader)

#Evaluate the model on adversarial test images
print("Evaluating performance on adversarial test data...")
x_test_adv = evaluate_model(model, train_loader, test_loader, loss_criterion = criterion, attack_mode = True, attack_type = 'l2', attack_threshold = 2, num_attack_steps = 40, lrate_adv = 0.1, shape_test = shape_test)

if SAVENET:
    save_model(model, 'pretrained_models/MNIST_NN.pth')
    
    

In [None]:
##Extract spectral features and train a linear classifier over them

# Hyperparameters (defined before anything that uses them)
num_lap_features = 40  # shared by the feature extraction, the slicing and the model input size
num_epochs = 150
num_classes = 10
batch_size = 500
learning_rate = 1

x_train_lap_features, x_test_lap_features = extract_laplacian_features(x, x_test_adv, num_features = num_lap_features, scaled = True, weighted = True, param = 0.1)
#x_train_lap_features, x_test_lap_features = extract_laplacian_features(x, x_test_adv, num_features = 40, scaled = True, weighted = False, param = 9)

##Train linear classifier over spectral features
train_lap_dataset = NumpyToTorch(x_train_lap_features[:, :num_lap_features], y)
test_lap_dataset = NumpyToTorch(x_test_lap_features[:, :num_lap_features], y_test)

# Data loader
train_lap_loader = DataLoader(dataset=train_lap_dataset, batch_size=batch_size, shuffle=True)
test_lap_loader = DataLoader(dataset=test_lap_dataset, batch_size=batch_size, shuffle=False)

model_linear = LinearNet(num_lap_features)
model_linear = model_linear.to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model_linear.parameters(), lr=learning_rate)

train_model(model_linear, train_lap_loader, optimizer, criterion, num_epochs, 10)

#Evaluate the model
evaluate_model(model_linear, train_lap_loader, test_lap_loader)
