In [None]:
!pip install adversarial-robustness-toolbox

### Copycat CNN


In [None]:
import numpy as np
import tensorflow as tf
tf.compat.v1.disable_eager_execution()
from tensorflow.keras.layers import Conv2D, Dense, Flatten, MaxPooling2D, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.losses import categorical_crossentropy
from tensorflow.keras.optimizers import Adam

# ラッパーおよびユーティリティをインポートする
from art.estimators.classification.keras import KerasClassifier
from art.utils import load_mnist

# MNISTデータセットをロードする
(X_train, y_train), (X_test, y_test), \
    min_pixel_value, max_pixel_value = load_mnist()

nb_classes=10

# 攻撃対象のモデルを定義する
model = Sequential()
model.add(Conv2D(1,kernel_size=(7, 7), activation='relu', 
                 input_shape=(28, 28, 1)))
model.add(MaxPooling2D(pool_size=(4, 4)))
model.add(Flatten())
model.add(Dense(nb_classes, activation='softmax'))
model.compile(loss=categorical_crossentropy,
              optimizer=Adam(learning_rate=0.01),
              metrics=['accuracy'])

victim_classifier = KerasClassifier(model,
                                    clip_values=(0, 1), 
                                    use_logits=False)
victim_classifier.fit(X_train, y_train, nb_epochs=5, batch_size=128)

In [None]:
# 窃取先のモデルの雛形を定義する
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', 
                 input_shape=(28, 28, 1)))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.5))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(nb_classes, activation='softmax'))
model.compile(loss=categorical_crossentropy,
              optimizer=Adam(learning_rate=0.01),
              metrics=['accuracy'])

thieved_classifier = KerasClassifier(model,
                                     clip_values=(0, 1), 
                                     use_logits=False)

In [None]:
# 攻撃手法をインポートする
from art.attacks.extraction.copycat_cnn import CopycatCNN

attack = CopycatCNN(classifier=victim_classifier,
                    batch_size_fit=16,
                    batch_size_query=16,
                    nb_epochs=10,
                    nb_stolen=1000)

# 攻撃結果として訓練済のサロゲートモデルを得る
thieved_classifier = attack.extract(x=X_train,
                                    thieved_classifier=thieved_classifier)

# 結果を表示する
victim_preds = np.argmax(victim_classifier.predict(x=X_train[:100]), 
                         axis=1)
thieved_preds = np.argmax(thieved_classifier.predict(x=X_train[:100]),
                          axis=1)
acc = np.sum(victim_preds == thieved_preds) / len(victim_preds)
print('Accuracy of the surrogate model: {}%'.format(acc * 100))

### FGSM

In [None]:
# 改変前のX_testに対するスコアを表示する
preds = victim_classifier.predict(X_test)
acc = np.sum(np.argmax(preds, axis=1)
             == np.argmax(y_test, axis=1)) / len(y_test)
print('\nAccuracy on benign test examples: {}%'.format(acc * 100))

# 攻撃手法をインポートする
from art.attacks.evasion import FastGradientMethod

attack = FastGradientMethod(estimator=victim_classifier, eps=.1)

# 攻撃の結果としてAdversarial Exampleを得る
X_test_adv = attack.generate(x=X_test)

# 改変後のX_testに対するスコアを表示する
preds = victim_classifier.predict(X_test_adv)
acc = np.sum(np.argmax(preds, axis=1)
             == np.argmax(y_test, axis=1)) / len(y_test)
print('\nAccuracy on adversarial test examples: {}%'.format(acc * 100))

# 生成したAdversarial Exampleをプロットする
from matplotlib import pyplot as plt
plt.matshow(X_test_adv[0, :].reshape((28, 28)))
plt.clim(0, 1)

### Carlini & Wagner Attack

In [None]:
# 攻撃手法をインポートする
from art.attacks.evasion.carlini import CarliniL2Method

# ターゲット型攻撃だが、ランダムなターゲットを指定することもできる
from art.utils import random_targets

# ここではL2ノルム最小化を試みる
attack = CarliniL2Method(classifier=victim_classifier,
                         targeted=True,
                         max_iter=10)
params = {'y': random_targets(y_test, victim_classifier.nb_classes)}

# 攻撃の結果としてAdversarial Exampleを得る
X_test_adv = attack.generate(x=X_test, **params)

### ZOO Attack

In [None]:
import lightgbm as lgb

# ラッパーおよびユーティリティをインポートする
from art.estimators.classification import LightGBMClassifier
from art.utils import load_mnist

# MNISTデータセットをロードする
(X_train, y_train), (X_test, y_test), \
    min_pixel_value, max_pixel_value = load_mnist()

# 今回は5枚の画像にのみ摂動を加える
X_test = X_test[0:5]
y_test = y_test[0:5]

nb_samples_train = X_train.shape[0]
nb_samples_test = X_test.shape[0]
X_train = X_train.reshape((nb_samples_train, 28 * 28))
X_test = X_test.reshape((nb_samples_test, 28 * 28))

# 攻撃対象のモデルを訓練する
params = {'objective': 'multiclass',
          'metric': 'multi_logloss',
          'num_class': 10}

lgb_train = lgb.Dataset(X_train, label=np.argmax(y_train, axis=1))
lgb_test = lgb.Dataset(X_test, label=np.argmax(y_test, axis=1))
model = lgb.train(params=params, train_set=lgb_train, num_boost_round=100, 
                  valid_sets=[lgb_test])

victim_classifier = LightGBMClassifier(model=model,
                                       clip_values=(min_pixel_value, max_pixel_value))

In [None]:
# 攻撃手法をインポートする
from art.attacks.evasion import ZooAttack

attack = ZooAttack(classifier=victim_classifier,
                   confidence=0.5,
                   targeted=False,
                   learning_rate=1e-1,
                   max_iter=200,
                   binary_search_steps=100,
                   initial_const=1e-1,
                   nb_parallel=250,
                   batch_size=1,
                   variable_h=0.01)

# 攻撃の結果としてAdversarial Exampleを得る
X_test_adv = attack.generate(x=X_test)

### Adversarial Training

In [None]:
# MNISTデータセットをロードする
(X_train, y_train), (X_test, y_test), \
    min_pixel_value, max_pixel_value = load_mnist()

nb_classes=10

# 攻撃対象のモデルを定義する
model = Sequential()
model.add(Conv2D(1,kernel_size=(7, 7), activation='relu', 
                 input_shape=(28, 28, 1)))
model.add(MaxPooling2D(pool_size=(4, 4)))
model.add(Flatten())
model.add(Dense(nb_classes, activation='softmax'))
model.compile(loss=categorical_crossentropy,
              optimizer=Adam(learning_rate=0.01),
              metrics=['accuracy'])

victim_classifier = KerasClassifier(model,
                                    clip_values=(0, 1), 
                                    use_logits=False)
victim_classifier.fit(X_train, y_train, nb_epochs=5, batch_size=128)

attack = FastGradientMethod(estimator=victim_classifier, eps=.1)

In [None]:
# 防御手法をインポートする
from art.defences.trainer.adversarial_trainer import AdversarialTrainer

adv_tranier = AdversarialTrainer(victim_classifier, attack)
adv_tranier.fit(X_train, y_train, batch_size=100, nb_epochs=2)

### Randomized Smoothing

In [None]:
# TensorflowのEager Execution機能をオンオフするため、Jupyterのカーネルを再起動する
# 「すべてのセルを実行」などで複数セルを実行している場合、手動で以降のセルから再開する必要がある
import IPython
IPython.Application.instance().kernel.do_shutdown(True)

In [None]:
import numpy as np
import tensorflow as tf
tf.compat.v1.enable_eager_execution()
from tensorflow.keras.layers import Conv2D, Dense, Flatten, MaxPooling2D, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.losses import categorical_crossentropy
from tensorflow.keras.optimizers import Adam

# ラッパーおよびユーティリティをインポートする
from art.estimators.classification.keras import KerasClassifier
from art.utils import load_mnist

# MNISTデータセットをロードする
(X_train, y_train), (X_test, y_test), \
    min_pixel_value, max_pixel_value = load_mnist()

nb_classes=10

# 攻撃対象のモデルを定義する
model = Sequential()
model.add(Conv2D(1,kernel_size=(7, 7), activation='relu', 
                 input_shape=(28, 28, 1)))
model.add(MaxPooling2D(pool_size=(4, 4)))
model.add(Flatten())
model.add(Dense(nb_classes, activation='softmax'))
model.compile(loss=categorical_crossentropy,
              optimizer=Adam(learning_rate=0.01),
              metrics=['accuracy'])

In [None]:
# ARTのRandomized SmoothingはTensorFlow 2の利用を前提とする
from art.estimators.certification.randomized_smoothing \
import TensorFlowV2RandomizedSmoothing
# 訓練用のパラメータを定義する
nb_classes=10
nb_epochs = 40
batch_size = 128
input_shape = X_train.shape[1:]
alpha = 0.001
sample_size = 100

# TensorFlowのパラメータ更新用関数を定義する
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
def train_step(model, images, labels):        
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_object(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

loss_object = tf.keras.losses.CategoricalCrossentropy(from_logits=False)

In [None]:
from matplotlib import pyplot as plt

# 異なる分散のガウシアンノイズを定義する
sigmas = {
    'Smoothed Classifier, sigma=0.1': 0.1,
    'Smoothed Classifier, sigma=0.25': 0.25,
    'Smoothed Classifier, sigma=0.5': 0.5
}
classifiers = {}

def get_cert_acc(radius, pred, y_test):

    rad_list = np.linspace(0, 2.25, 201)
    cert_acc = []
    num_cert = len(np.where(radius > 0)[0])
    
    for r in rad_list:
        rad_idx = np.where(radius > r)[0]
        y_test_subset = y_test[rad_idx]
        cert_acc.append(np.sum(pred[rad_idx] == \
                               np.argmax(y_test_subset, axis=1)) / num_cert)
        
    return cert_acc

for name in sigmas:
    sigma = sigmas[name]

    # 「平滑化された」分類器を訓練する
    classifier = \
    TensorFlowV2RandomizedSmoothing(model=model,
                                    nb_classes=nb_classes,
                                    input_shape=input_shape,
                                    loss_object=loss_object,
                                    train_step=train_step,
                                    channels_first=False,
                                    clip_values=(min_pixel_value, 
                                                 max_pixel_value),
                                    sample_size=sample_size,
                                    scale=sigma,
                                    alpha=alpha)

    classifier.fit(X_train, y_train, nb_epochs=nb_epochs,
                   batch_size=batch_size)
    
    # Certified Accuracyを取得する
    cert_preds, radius = classifier.certify(X_test, n=500)
    
    # 半径ごとにCertified Accuracyをプロットする
    rad_list = np.linspace(0, 2.25, 201)
    plt.plot(rad_list, get_cert_acc(radius, cert_preds, y_test),
             label=name)

    classifiers[name] = classifier

plt.xlabel('radius')
plt.ylabel('certified accuracy')
plt.legend()
plt.show()

### BadNets

In [None]:
# TensorflowのEager Execution機能をオンオフするため、Jupyterのカーネルを再起動する
# 「すべてのセルを実行」などで複数セルを実行している場合、手動で以降のセルから再開する必要がある
import IPython
IPython.Application.instance().kernel.do_shutdown(True)

In [None]:
import numpy as np
import tensorflow as tf
tf.compat.v1.disable_eager_execution()
from tensorflow.keras.layers import Conv2D, Dense, Flatten, MaxPooling2D, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.losses import categorical_crossentropy
from tensorflow.keras.optimizers import Adam

# ラッパーおよびユーティリティをインポートする
from art.estimators.classification.keras import KerasClassifier
from art.utils import load_mnist, preprocess

# MNISTデータセットをロードする
# 今回は正規化される前のデータを加工するため、raw=Trueとする
(X_train_raw, y_train_raw), (X_test_raw, y_test_raw), \
    min_pixel_value, max_pixel_value = load_mnist(raw=True)
nb_classes=10

# 攻撃手法をインポートする
from art.attacks.poisoning import PoisoningAttackBackdoor
from art.attacks.poisoning.perturbations import add_pattern_bd

# 画像の右下にトリガーを埋め込む
max_val = np.max(X_train_raw)
def add_modification(x):
    return add_pattern_bd(x, pixel_value=max_val)

# データセットを汚染する
def poison_dataset(X_clean, y_clean, percent_poison, poison_func):
    X_poison = np.copy(X_clean)
    y_poison = np.copy(y_clean)
    is_poison = np.zeros(np.shape(y_poison))
    
    sources=np.arange(nb_classes)
    targets=(np.arange(nb_classes) + 1) % nb_classes

    # 訓練データから汚染対象のデータを選択し、ノイズを加える
    for i, (src, tgt) in enumerate(zip(sources, targets)):
        n_points_in_tgt = np.size(np.where(y_clean == tgt))
        num_poison = round((percent_poison * n_points_in_tgt)
                           / (1 - percent_poison))

        src_imgs = X_clean[y_clean == src]

        n_points_in_src = np.shape(src_imgs)[0]
        indices_to_be_poisoned = np.random.choice(n_points_in_src, 
                                                  num_poison)

        imgs_to_be_poisoned = np.copy(src_imgs[indices_to_be_poisoned])

        # 攻撃を初期化する
        attack = PoisoningAttackBackdoor(add_modification)
        
        # 攻撃を実行する
        imgs_to_be_poisoned, poison_labels = \
        attack.poison(imgs_to_be_poisoned, y=np.ones(num_poison) * tgt)
        
        X_poison = np.append(X_poison, imgs_to_be_poisoned, axis=0)
        y_poison = np.append(y_poison, poison_labels, axis=0)
        is_poison = np.append(is_poison, np.ones(num_poison))

    is_poison = is_poison != 0

    return is_poison, X_poison, y_poison
    
# 訓練データの33%を汚染する
percent_poison = .33

(is_poison_train, X_poisoned_train_raw, y_poisoned_train_raw) = \
poison_dataset(X_train_raw, y_train_raw, percent_poison, add_modification)
X_train, y_train = preprocess(X_poisoned_train_raw, y_poisoned_train_raw)
X_train = np.expand_dims(X_train, axis=3)

(is_poison_test, X_poisoned_test_raw, y_poisoned_test_raw) = \
poison_dataset(X_test_raw, y_test_raw, percent_poison, add_modification)
X_test, y_test = preprocess(X_poisoned_test_raw, y_poisoned_test_raw)
X_test = np.expand_dims(X_test, axis=3)

# 訓練データをシャッフルする
n_train = len(y_train)
shuffled_indices = np.arange(n_train)
np.random.shuffle(shuffled_indices)
X_train = X_train[shuffled_indices]
y_train = y_train[shuffled_indices]

In [None]:
# 攻撃対象のモデルを定義する
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), 
                 activation='relu', 
                 input_shape=X_train.shape[1:]))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(nb_classes, activation='softmax'))
model.compile(loss=categorical_crossentropy, 
              optimizer=Adam(learning_rate=0.01),
              metrics=['accuracy'])

victim_classifier = KerasClassifier(model,
                                    clip_values=(0, 1), 
                                    use_logits=False)
# 汚染されたデータでモデルを訓練する
victim_classifier.fit(X_train, y_train, nb_epochs=30, batch_size=128)

# 汚染されていないデータに対するスコアを表示する
clean_X_test = X_test[is_poison_test == 0]
clean_y_test = y_test[is_poison_test == 0]

clean_preds = victim_classifier.predict(clean_X_test)
acc = np.sum(np.argmax(clean_preds, axis=1)
    == np.argmax(clean_y_test, axis=1)) / len(clean_y_test)
print('\nAccuracy on clean test examples: {}%'.format(acc * 100))

# 汚染されたデータに対するスコアを表示する
poison_X_test = X_test[is_poison_test]
poison_y_test = y_test[is_poison_test]

poison_preds = victim_classifier.predict(poison_X_test)
acc = np.sum(np.argmax(poison_preds, axis=1)
    == np.argmax(poison_y_test, axis=1)) / len(poison_y_test)
print('\nAccuracy on poisoned test examples: {}%'.format(acc * 100))

# データ全体に対するスコアを表示する
clean_correct = np.sum(np.argmax(clean_preds, axis=1) 
                       == np.argmax(clean_y_test, axis=1))
poison_correct = np.sum(np.argmax(poison_preds, axis=1)
                        == np.argmax(poison_y_test, axis=1))
total_correct = clean_correct + poison_correct
total = len(clean_y_test) + len(poison_y_test)
total_acc = total_correct / total

print("\nOverall accuracy on test examples: {}%".format(total_acc * 100))

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

c = 1 # プロット対象のクラス
i = 0 # 画像の添字

c_idx = np.where(np.argmax(poison_y_test,1) == c)[0][i]

plt.imshow(poison_X_test[c_idx].squeeze())
plt.show()

print('Prediction: {}'.format(np.argmax(poison_preds[c_idx])))

### Activation Clustering

In [None]:
# 防御手法をインポートする
from art.defences.detector.poison import ActivationDefence

defence = ActivationDefence(victim_classifier, X_train, y_train)

# PCAで次元削減を施したのち、2つのクラスタに分割する
report, is_clean_lst = defence.detect_poison(nb_clusters=2,
                                             nb_dims=10,
                                             reduce='PCA')

[clusters_by_class, _] = defence.cluster_activations()

# 指定したクラスのデータのクラスタリング結果をプロットする関数を定義する
def plot_class_clusters(sprites_by_class, n_class, n_clusters):
    for q in range(n_clusters):
        plt.figure(1, figsize=(25,25))
        plt.tight_layout()
        plt.subplot(1, n_clusters, q+1)
        plt.title('Class '+ str(n_class)+ ', Cluster '+ str(q),
                  fontsize=40)
        sprite = sprites_by_class[n_class][q]
        plt.imshow(sprite, interpolation='none')
        
# 訓練データをクラスごとに分割し、
sprites_by_class = defence.visualize_clusters(X_train, save=False)
# ここではクラス1に対するクラスタリング結果をプロットする
plot_class_clusters(sprites_by_class, 1, 2)