<a href="https://colab.research.google.com/github/vincentliuheyang/ART-Issue/blob/main/adversarial_training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
!pip install adversarial-robustness-toolbox
!pip install tensorflow
import numpy as np
from art.attacks.extraction import CopycatCNN, KnockoffNets, FunctionallyEquivalentExtraction
from art.classifiers import TensorFlowV2Classifier
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling1D, Flatten, BatchNormalization, Dropout
import os

In [13]:
class White_Box_Attack_Model:
  def __init__(self,
          trainx,
          trainy,
          im_shape=(28, 28, 1),
          epochs=5,
          batch_size=32,
          num_classes=10,
          c1=32,
          c2=64,
          d1=128):

    #if tf.executing_eagerly():
    #    tf.compat.v1.disable_eager_execution()

    self.trainx = trainx[:trainx.shape[0]-trainx.shape[0] // 10, ...]
    self.trainy = trainy[:trainx.shape[0]-trainx.shape[0] // 10, ...]
    self.testx = trainx[trainx.shape[0]-trainx.shape[0] // 10:, ...]
    self.testy = trainy[trainx.shape[0]-trainx.shape[0] // 10:, ...]

    vm = VictimModel(self.trainx, self.trainy, self.testx, self.testy, im_shape=im_shape, epochs=epochs, batch_size=batch_size, num_classes=num_classes, c1=c1, c2=c2, d1=d1)
    self.victim_model = TensorFlowV2Classifier(vm.model, nb_classes=num_classes, input_shape=im_shape, clip_values=(0, 1), train_step=train_step)
    self.attack_catalogue = {"Probabilistic CopycatCNN": CopycatCNN(classifier=self.victim_model,
                                batch_size_fit=batch_size,
                                batch_size_query=64,
                                nb_epochs=num_epochs,
                                nb_stolen=trainx.shape[0],
                                use_probability=True),
                  "Argmax CopycatCNN": CopycatCNN(classifier=self.victim_model,
                                batch_size_fit=batch_size,
                                batch_size_query=64,
                                nb_epochs=num_epochs,
                                nb_stolen=trainx.shape[0],
                                use_probability=False),
                  "Probabilistic KnockoffNets": KnockoffNets(classifier=self.victim_model,
                                batch_size_fit=batch_size,
                                batch_size_query=64,
                                nb_epochs=num_epochs,
                                nb_stolen=trainx.shape[0],
                                use_probability=True),
                  "Argmax KnockoffNets": KnockoffNets(classifier=self.victim_model,
                                batch_size_fit=batch_size,
                                batch_size_query=64,
                                nb_epochs=num_epochs,
                                nb_stolen=trainx.shape[0],
                                use_probability=False),
                  }

    self.classifier_stolen = None
    self.dir = None

  def fit(self, name):
      attack = self.attack_catalogue[name]
      self.classifier_stolen = attack.extract(self.trainx, self.trainy, thieved_classifier=self.victim_model)

  def evaluate(self, x=None, y=None):
      if x is None and y is None:
          _, acc = self.classifier_stolen._model.evaluate(self.testx, self.testy)
      else:
          _, acc = self.classifier_stolen._model.evaluate(x, y)
      return acc

In [14]:
class VictimModel:
  def __init__(self,
        trainx,
        trainy,
        testx,
        testy,
        valx=None,
        valy=None,
        im_shape=(28,28,1),
        epochs=10,
        batch_size=32,
        num_classes=10,
        c1=32,
        c2=64,
        d1=128,
        dir="victim/training/",
        model_file="victim.h5",
        ):
      if valx is not None:
          self.trainx = trainx
          self.trainy = trainy
          self.valx = valx
          self.valy = valy
      else:
          self.trainx = trainx[:trainx.shape[0]-trainx.shape[0] // 10, ...]
          self.trainy = trainy[:trainx.shape[0]-trainx.shape[0] // 10, ...]
          self.valx = trainx[trainx.shape[0]-trainx.shape[0] // 10:, ...]
          self.valy = trainy[trainx.shape[0]-trainx.shape[0] // 10:, ...]
      self.testx = testx
      self.testy = testy
      self.epochs = epochs
      self.batch_size = batch_size
      self.model = Sequential()
      self.model.add(Conv2D(c1, kernel_size=(3, 3), activation='relu', kernel_initializer='he_uniform', input_shape=im_shape))
      self.model.add(BatchNormalization())
      self.model.add(Dropout(0.2))
      self.model.add(Conv2D(c2, (3, 3), activation='relu', kernel_initializer='he_uniform'))
      self.model.add(BatchNormalization())
      self.model.add(Dropout(0.2))
      self.model.add(Flatten())
      self.model.add(Dense(d1, activation='relu', kernel_initializer='he_uniform'))
      self.model.add(Dense(num_classes, activation='softmax'))
      self.dir = dir
      self.model_file = model_file

  def fit(self, checkpoint_path="target.cpkt"):
    checkpoint_dir = self.dir + checkpoint_path
    checkpoint_dir = os.path.dirname(checkpoint_dir)
    callback = tf.keras.callbacks.EarlyStopping(patience=4, verbose=1)
    history = self.model.compile(loss=tf.keras.losses.categorical_crossentropy, optimizer="sgd", metrics=['accuracy'])

    self.model.fit(self.trainx, self.trainy, epochs=self.epochs, callbacks=[callback], batch_size=self.batch_size, validation_data=(self.valx, self.valy))
    self.model.save_weights(checkpoint_dir)
    latest = tf.train.latest_checkpoint(checkpoint_dir=checkpoint_dir)
    self.model.load_weights((latest))
    self.model.save_weights(self.dir + self.model_file)
    return history

  def evaluate(self, x=None, y=None):
    self.model.compile(loss=tf.keras.losses.categorical_crossentropy, optimizer="sgd", metrics=['accuracy'])
    if x is None and y is None:
        _, acc = self.model.evaluate(self.testx, self.testy)
    else:
        _, acc = self.model.evaluate(x, y)
    return acc

  def load_model(self):
    self.model.load_weights((self.dir + self.model_file))

In [16]:
num_epochs = 5

@tf.function
def train_step(model, images, labels):
    optimizer=tf.keras.optimizers.SGD(learning_rate=1e-3)
    loss_fn=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

    with tf.GradientTape() as tape:
        predictions = model(images)
        loss = loss_fn(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_loss(loss)
    train_acc_metric(labels, predictions)
    return loss


In [17]:
from art.utils import load_mnist


def load_minst_data():
  (x_train, y_train), (x_test0, y_test0), _, _ = load_mnist()
  len_steal = 5000
  indices = np.random.permutation(len(x_test0))
  x_steal = x_test0[indices[:len_steal]]
  y_steal = y_test0[indices[:len_steal]]
  x_test = x_test0[indices[len_steal:]]
  y_test = y_test0[indices[len_steal:]]
  return (x_train, y_train), (x_test, y_test), (x_steal, y_steal)


(x_train, y_train), (x_test, y_test), (x_steal, y_steal) = load_minst_data()

attack = ["Probabilistic CopycatCNN", "Argmax CopycatCNN", "Probabilistic KnockoffNets", "Argmax KnockoffNets"]


WBAM = White_Box_Attack_Model(x_steal, y_steal)

for a in attack:
  WBAM.fit(a)
  print(WBAM.evaluate())

  '"`sparse_categorical_crossentropy` received `from_logits=True`, but '


ValueError: ignored