# Installations

Requires installation of neural-tangents (follow [instructions](https://github.com/google/neural-tangents?tab=readme-ov-file#installation) for your environment)

In [None]:
!pip install scikit-dimension
!pip install adversarial-robustness-toolbox

# Imports

In [None]:
import tensorflow as tf
import numpy
import pandas as pd
import os
from random import seed

from tensorflow import keras
from keras import Sequential
from keras.layers import Dense, Flatten, Conv2D, AveragePooling2D
from tensorflow.keras.layers import BatchNormalization, LayerNormalization, ReLU
from keras.losses import binary_crossentropy, categorical_crossentropy
from keras.optimizers.legacy import Adam
from keras.utils import to_categorical
from keras.datasets import load_mnist

from scipy.spatial.distance import cdist

import torch

from jax import random
from jax.example_libraries import optimizers
from jax import jit, grad, vmap

import jax.numpy as np
import functools
import neural_tangents as nt
from neural_tangents import stax
import jax
import jaxlib

from art.attacks.evasion import FastGradientMethod
from art.estimators.classification import KerasClassifier, TensorFlowV2Classifier
from art.utils import load_mnist, load_cifar10
from art.attacks.evasion import SpatialTransformation

import skdim

# Load Data

In [None]:
def noisy(image):
  row,col,ch = image.shape
  gauss = 0.5*numpy.random.randn(row,col,ch)
  gauss = gauss.reshape(row,col,ch)
  noisy = image + image * gauss
  return noisy

def add_noise(images):
    noisy_images = images.copy()
    for i in range(noisy_images.shape[0]):
        noisy_images[i] = noisy(noisy_images[i])
    return noisy_images

In [None]:
(x_train, y_train), (x_test, y_test), min_pixel_value, max_pixel_value = load_mnist()
input_shape=(28, 28, 1)

x_test_no_noise = x_test.copy()
x_test = add_noise(x_test)

# Dataprep

In [None]:
# Select only class 3 and 5 for binary classification

label_first = 3
label_second = 5

select_first = y_train[:, label_first]==1
select_second = y_train[:, label_second]==1
select_idx = numpy.logical_or(select_first, select_second)
x_train = x_train[select_idx, :]
y_train = y_train[select_idx][:, [label_first, label_second]]

select_first = y_test[:, label_first]==1
select_second = y_test[:, label_second]==1
select_idx = numpy.logical_or(select_first, select_second)
x_test = x_test[select_idx, :]
x_test_no_noise = x_test_no_noise[select_idx, :]
y_test = y_test[select_idx][:, [label_first, label_second]]

## Optional: sampling to overcome potential memory problems of NTK

In [None]:
numpy.random.seed(seed=0)
idx = numpy.random.randint(y_train.shape[0], size=10000)
x_train = x_train[idx,:]
y_train = y_train[idx]

# Define the NTK

## Jax model

In [None]:
init_fn, apply_fn, kernel_fn = stax.serial(
    stax.Conv(4, (5, 5), padding="SAME"),
    stax.Relu(),
    stax.AvgPool((2, 2)),

    stax.Conv(10, (5, 5), padding="SAME"),
    stax.Relu(),
    stax.AvgPool((2, 2)),

    stax.Flatten(),

    stax.Dense(16),
    stax.Relu(),

    stax.Dense(2, W_std=2.**0.5, b_std=0.05)

    #no softmax is available: https://github.com/google/neural-tangents/issues/117
)

key = random.PRNGKey(0)
_, params = init_fn(key, input_shape=x_train.shape) # -1 at first place?

## Computation of NTK kernel

Uses empirical_ntk due to memory issues with kernel_fn with CNN;
returns covariate that is converted to a single number based on the option in `tensor_to_matrix`

In [None]:
def covariate2Number(kernel, tensor_to_matrix="det"):
  torch_tensor = torch.tensor(numpy.array(kernel))

  if tensor_to_matrix == 'norm':
    torch_norm = torch.norm(torch_tensor, p=2, dim=(2,3), keepdim=True)
    kernel = numpy.array(torch_norm.reshape(kernel.shape[0], kernel.shape[1]))
  elif tensor_to_matrix == 'min':
    torch_norm = torch.amin(input=torch_tensor, dim=(2,3))
    kernel = numpy.array(torch_norm.reshape(kernel.shape[0], kernel.shape[1]))
  elif tensor_to_matrix == 'det':
    kernel = np.linalg.det(kernel[:,:])
    kernel = kernel.reshape(kernel.shape[0], kernel.shape[1])
  return kernel

kernel_fn = nt.empirical_ntk_fn(apply_fn, vmap_axes=0, implementation=1, trace_axes=(), diagonal_axes=())
kernel_train = kernel_fn(x_train, None, params)
kernel_train = covariate2Number(kernel_train)

# Train the underlying Neural Network

In [None]:
SEED = 0

def set_seeds(my_seed=SEED):
    os.environ['PYTHONHASHSEED'] = str(my_seed)
    seed(my_seed)
    tf.random.set_seed(my_seed)
    numpy.random.seed(my_seed)

def set_global_determinism(my_seed=SEED):
    set_seeds(my_seed)

    os.environ['TF_DETERMINISTIC_OPS'] = '1'
    os.environ['TF_CUDNN_DETERMINISTIC'] = '1'

    tf.config.threading.set_inter_op_parallelism_threads(1)
    tf.config.threading.set_intra_op_parallelism_threads(1)

set_global_determinism(SEED)

## Architecture

In [None]:
model = Sequential()

model.add(Conv2D(filters=4, kernel_size=(5, 5), strides=1, input_shape=input_shape))
model.add(ReLU())
model.add(AveragePooling2D(pool_size=(2, 2)))

model.add(Conv2D(filters=10, kernel_size=(5, 5), strides=1))
model.add(ReLU())
model.add(AveragePooling2D(pool_size=(2, 2)))

model.add(Flatten())

model.add(Dense(16))
model.add(ReLU())

model.add(Dense(2, activation="softmax"))

loss_object = keras.losses.BinaryCrossentropy()
optimizer_object = keras.optimizers.Adam(learning_rate=0.01)

model.compile(loss=loss_object, optimizer=optimizer_object, metrics=["accuracy"])

## Training

In [None]:
classifier = TensorFlowV2Classifier(model=model,
                                    nb_classes=2,
                                    input_shape=x_train.shape[1:],
                                    loss_object=loss_object,
                                    optimizer=optimizer_object,
                                    clip_values=(0, 1))



classifier.fit(x_train, y_train, batch_size=32, nb_epochs=10, verbose=True)
predictions = classifier.predict(x_test)
error_test_idx = np.argmax(predictions, axis=1) != np.argmax(y_test, axis=1)
accuracy = 100*np.sum(np.argmax(predictions, axis=1) == np.argmax(y_test, axis=1)) / len(y_test)
accuracy = round(float(accuracy), 2)
print(f"Accuracy on test examples: {accuracy}%")

extractor = keras.Model(inputs=classifier.model.inputs,
                        outputs=[layer.output for layer in model.layers])

# Generate Adversarial Examples

In [None]:
def spatial_transformation_attack(classifier, x_test, y_test):
  attack = SpatialTransformation(classifier,
                                 max_translation=20,
                                 max_rotation=20,
                                 num_translations=1,
                                 num_rotations=1)
  return attack

In [None]:
x_test_to_attack = x_test_no_noise.copy()
attack = spatial_transformation_attack(classifier, x_test_to_attack, y_test)
n_test_adv_examples = 1000 # desired number of adv examples

x_test_adv = attack.generate(x=x_test_to_attack[:n_test_adv_examples, :],
                             y=y_test[:n_test_adv_examples,:])
y_test_adv = y_test[:n_test_adv_examples,:].copy()

In [None]:
# Evaluate on adversarial test examples

feature_layer = 6 # embeddings of this layer are used as features

predictions = classifier.predict(x_test_adv)
error_idx = np.argmax(predictions, axis=1) != np.argmax(y_test[:n_test_adv_examples,:], axis=1)
accuracy = 100*np.sum(np.argmax(predictions, axis=1) == np.argmax(y_test[:n_test_adv_examples,:], axis=1)) / len(y_test[:n_test_adv_examples,:])
accuracy = round(float(accuracy), 2)
print(f"Accuracy on adversarial test examples: {accuracy}%")

x_train_feat = extractor(x_train)[feature_layer].numpy()
print(x_train_feat.shape)

# Keep only the erroneous adversarial test examples
x_test_adv_sel = x_test_adv.copy()
y_test_adv_sel = y_test_adv.copy()
x_test_adv_sel = x_test_adv[error_idx, :].copy()
y_test_adv_sel = y_test_adv[error_idx, :].copy()
x_test_adv_sel_feat = extractor(x_test_adv_sel)[feature_layer].numpy()
print(x_test_adv_sel.shape)
print(x_test_adv_sel_feat.shape)

# Keep only erroneous test examples
x_test_sel = x_test[error_test_idx, :].copy()
y_test_sel = y_test[error_test_idx, :].copy()

x_test_sel_feat = extractor(x_test_sel)[feature_layer].numpy()
print(x_test_sel.shape)
print(x_test_sel_feat.shape)

# Computation of LID and Hubness

In [None]:
# Helper functions to compute kernels and k-NN arrays

def sim_to_dist(sim_mat, n_set_one):
  """
  Converts similarity matrix to distance matrix.
  param: n_set_one is the number of top rows whose diagonal will be set to one
        (so that the example will not be nearerst neighbor of itself)
  """
  min_sim = float(numpy.min(sim_mat[:n_set_one, :n_set_one]))
  max_sim = float(numpy.max(sim_mat[:n_set_one, :n_set_one]))

  dist_mat = 1 - (sim_mat - min_sim) / (max_sim - min_sim)
  for i in range(n_set_one):
    dist_mat[i][i] = 1
  return dist_mat


def compute_knn_arrays(dist_mat):
  knn_dist  = numpy.sort(dist_mat)
  knn_idx  = np.argsort(dist_mat)
  return (knn_dist, knn_idx)


def compute_hubness(dist_mat, k):
  """ Measures frequency of each point being among k-NN of all rest points """
  knn_idx  = np.argsort(dist_mat.T, axis=1)[:, :k]
  knn_idx = knn_idx.flatten()
  unique, counts = numpy.unique(knn_idx, return_counts=True)
  hub_df = pd.DataFrame({'sample_id': unique, 'knn_freq': counts})
  all_ids = pd.DataFrame({'sample_id': list(range(dist_mat.shape[0]))})
  hub_df = pd.merge(all_ids, hub_df, how='left', on='sample_id')
  hub_df['hub_score'] = hub_df['knn_freq'].fillna(0.0)
  return hub_df

In [None]:
kernel = kernel_fn(x_train, x_test_sel, params)
kernel = covariate2Number(kernel)

kernel_adv = kernel_fn(x_train, x_test_adv_sel, params)
kernel_adv = covariate2Number(kernel_adv)

combined_kernel = numpy.concatenate((kernel_train.T, kernel.T, kernel_adv.T))
n_training_examples =  kernel_train.shape[0]
n_test_examples = kernel.T.shape[0]
n_test_adv_examples = kernel_adv.T.shape[0]
dist_mat = sim_to_dist(combined_kernel, n_training_examples)

## LID

In [None]:
k = 20 #the default value in ldim_mle

knn_dist, knn_idx = compute_knn_arrays(dist_mat)
ldim_mle = skdim.id.MLE(unbiased=False)
# X is set as dummy to avoid error, but it is not used since the actual informaton is in precomputed_knn_arrays
lid_values = ldim_mle.fit(X=numpy.ones((knn_dist.shape[0], 2)), y=None, precomputed_knn_arrays = (knn_dist[:, :k], knn_idx[:, :k]), smooth=False)
lid_values = lid_values.dimension_pw_

## Hubness

In [None]:
k = int(dist_mat.shape[1] * 0.6) #uses a high fraction of dataset's size

hub_scores = compute_hubness(dist_mat, k)['hub_score']
hub_train_df = pd.DataFrame({'train': hub_scores[0:n_training_examples]}).reset_index(drop=True)
hub_test_df = pd.DataFrame({'test': hub_scores[n_training_examples:(n_training_examples+n_test_examples)]}).reset_index(drop=True)
hub_test_adv_df = pd.DataFrame({'test_adv': hub_scores[(n_training_examples+n_test_examples):]}).reset_index(drop=True)
hub_df = pd.concat([hub_train_df, hub_test_df, hub_test_adv_df], axis=1)
hub_df = hub_df[['train', 'test', 'test_adv']]