## Импорт библиотек для работы с байесовской нейросетью

In [1]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
import tensorflow_probability as tfp
# import idx2numpy
from tensorflow.keras.callbacks import Callback
#tf.config.run_functions_eagerly(False)
# Step size handed to the Adam optimizer.
learning_rate = 0.001
# Number of epochs passed to model.fit.
num_epochs = 3
# Mini-batch size passed to model.fit.
batch_size = 128
# Number of model.predict runs that Sampling() averages.
num_monte_carlo = 50
#from BBLENET5 import BLENET5

In [2]:
pwd

'/home/bond005/jupyter-notebooks'

In [3]:
from numpy.random import seed
# Seed NumPy's global random generator.
seed(1)
# Seed TensorFlow's global random generator (a different value than NumPy's).
tf.random.set_seed(2)

## Выделение видеопамяти на GPU

In [4]:
# Ask TensorFlow to grow GPU memory on demand instead of reserving it all up
# front. The flag is set on every visible GPU, not only the first one:
# TensorFlow requires memory growth to be configured identically across GPUs
# and fails at runtime initialisation otherwise. With no GPU present the loop
# body never runs, so no explicit length check is needed.
physical_devices = tf.config.experimental.list_physical_devices('GPU')
for gpu_device in physical_devices:
    tf.config.experimental.set_memory_growth(gpu_device, True)

## Предобработка данных

In [5]:
# Per-image shape the data is reshaped to before entering the network.
IMAGE_SHAPE = [28, 28, 1]
# Number of training images; also used to scale the KL term of the loss.
NUM_TRAIN_EXAMPLES = 60000
# Number of held-out (test) images.
NUM_HELDOUT_EXAMPLES = 10000
# Number of target classes, i.e. the width of the final softmax layer.
NUM_CLASSES = 10

In [6]:
# Load MNIST: x_* are the images, y_* are the labels; x_train arrives with
# shape (60000, 28, 28).
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Add the trailing channel axis and rescale the pixels by 1/255.
# The number of images is inferred (-1) and the per-image shape comes from
# IMAGE_SHAPE, so the dataset sizes are not repeated here as literals.
x_train = x_train.reshape((-1, *IMAGE_SHAPE)).astype('float32') / 255
x_test = x_test.reshape((-1, *IMAGE_SHAPE)).astype('float32') / 255

# One-hot encode the labels. num_classes is given explicitly so that both
# splits always get NUM_CLASSES columns instead of a width inferred from the
# largest label that happens to be present.
y_train = to_categorical(y_train, num_classes=NUM_CLASSES)
y_test = to_categorical(y_test, num_classes=NUM_CLASSES)

# Short alias for the TensorFlow Probability distributions module.
tfd = tfp.distributions

In [7]:
# class CustomCallback(Callback):
    
#     def on_train_batch_begin(self, batch, logs=None):
#         keys = list(logs.keys())
#         print("...Training: start of batch {}; got log keys: {}".format(batch, keys))

#     def on_train_batch_end(self, batch, logs=None):
#         keys = list(logs.keys())
#         print("...Training: end of batch {}; got log keys: {}".format(batch, keys))


## Построение байесовской нейросети (архитектура LeNet-5)

In [8]:
def kl_divergence_function(q, p, _):
    """Return KL(q || p) divided by ``30.0 * NUM_TRAIN_EXAMPLES``.

    Used as ``kernel_divergence_fn`` / ``bias_divergence_fn`` of the Flipout
    layers.

    Args:
        q: first distribution handed to ``tfd.kl_divergence``.
        p: second distribution handed to ``tfd.kl_divergence``.
        _: third positional argument supplied by the caller; ignored.

    Returns:
        The KL divergence scaled by the constant above.
    """
    # NOTE(review): the extra factor 30.0 is not explained anywhere in this
    # file -- confirm the intended weighting of the KL term.
    scale = tf.cast(30.0 * NUM_TRAIN_EXAMPLES, dtype=tf.float32)
    return tfd.kl_divergence(q, p) / scale

In [9]:
# model = tf.keras.models.Sequential()
# model.add(tfp.layers.Convolution2DFlipout(6, kernel_size=5, padding='SAME',
#                                           kernel_divergence_fn=kl_divergence_function, bias_divergence_fn = kl_divergence_function,
#                                           activation='relu',input_shape=(28,28,1)))
# #model.add(tf.keras.layers.BatchNormalization())
# model.add(tf.keras.layers.MaxPooling2D(pool_size=[2, 2], strides=[2, 2],padding='SAME'))
# model.add(tfp.layers.Convolution2DFlipout(16, kernel_size=5, padding='SAME',
#                                           kernel_divergence_fn=kl_divergence_function,bias_divergence_fn = kl_divergence_function,
#                                           activation='relu'))
# #model.add(tf.keras.layers.BatchNormalization())
# model.add(tf.keras.layers.MaxPooling2D(pool_size=[2, 2], strides=[2, 2],padding='SAME'))
# model.add(tfp.layers.Convolution2DFlipout(120, kernel_size=5, padding='SAME',
#                                           kernel_divergence_fn=kl_divergence_function,bias_divergence_fn = kl_divergence_function,
#                                           activation='relu'))
# #model.add(tf.keras.layers.BatchNormalization())
# model.add(tf.keras.layers.Flatten())
# model.add(tfp.layers.DenseFlipout(84, kernel_divergence_fn=kl_divergence_function,bias_divergence_fn = kl_divergence_function,activation=tf.nn.relu))
# model.add(tfp.layers.DenseFlipout(NUM_CLASSES, kernel_divergence_fn=kl_divergence_function,bias_divergence_fn = kl_divergence_function,
#                                   activation=tf.nn.softmax))


In [10]:
def _mean_field_posterior(is_singular=False):
    """Return a mean-field normal posterior factory with seeded initializers.

    Fresh ``RandomNormal`` initializer objects are created on every call, so
    no initializer instance is shared between layers.
    """
    return tfp.layers.default_mean_field_normal_fn(
        is_singular=is_singular,
        loc_initializer=RandomNormal(stddev=0.1, seed=42),
        untransformed_scale_initializer=RandomNormal(
            mean=-3.0, stddev=0.1, seed=42),
    )


def _flipout_kwargs(activation):
    """Return the keyword arguments common to every Flipout layer."""
    return dict(
        kernel_divergence_fn=kl_divergence_function,
        bias_divergence_fn=kl_divergence_function,
        kernel_posterior_fn=_mean_field_posterior(),
        bias_posterior_fn=_mean_field_posterior(is_singular=True),
        activation=activation,
    )


def _conv_flipout(filters, **extra):
    """Return a 5x5, SAME-padded, ReLU Flipout convolution layer."""
    return tfp.layers.Convolution2DFlipout(
        filters, kernel_size=5, padding='SAME',
        **_flipout_kwargs('relu'), **extra)


def _max_pool():
    """Return a 2x2, stride-2, SAME-padded max-pooling layer."""
    return tf.keras.layers.MaxPooling2D(
        pool_size=[2, 2], strides=[2, 2], padding='SAME')


# LeNet-5 style stack: three Flipout convolutions (6, 16, 120 filters) with
# max-pooling after the first two, followed by two Flipout dense layers.
model = tf.keras.Sequential([
    _conv_flipout(6, input_shape=(28, 28, 1)),
    _max_pool(),
    _conv_flipout(16),
    _max_pool(),
    _conv_flipout(120),
    tf.keras.layers.Flatten(),
    tfp.layers.DenseFlipout(84, **_flipout_kwargs('relu')),
    tfp.layers.DenseFlipout(NUM_CLASSES, **_flipout_kwargs('softmax')),
])

Instructions for updating:
Please use `layer.add_weight` method instead.


In [11]:
#k = model.losses[1]
#sum(model.losses)

In [12]:
# Adam optimizer using the notebook-level learning rate.
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

In [13]:
# Configure training: cross-entropy on the one-hot labels, accuracy as metric.
model.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [14]:
#sum(model.losses)

## Обучаем нашу нейросеть

In [None]:
# Train on the full training split.
model.fit(
    x=x_train,
    y=y_train,
    epochs=num_epochs,
    batch_size=batch_size,
)

Epoch 1/3
Epoch 2/3

In [None]:
# Evaluate on the test split; the bare expression below displays the accuracy
# as the cell result.
test_loss, test_acc = model.evaluate(x=x_test, y=y_test)
test_acc

In [None]:
# emnist_path = 'C:/Users/kamen/DIPLOM/'
# #X-test - буквы для теста, Y-test - метки букв
# X_test = idx2numpy.convert_from_file(emnist_path + 'emnist-letters-test-images-idx3-ubyte')
# Y_test = idx2numpy.convert_from_file(emnist_path + 'emnist-letters-test-labels-idx1-ubyte')
# X_test = X_test.reshape((20800,28,28,1))
# X_test = X_test.astype('float32')/255
# X_test=X_test[:10000,:,:,:]
# Y_test=to_categorical(Y_test)
# tf.shape(X_test)
# X_test = np.rot90(X_test,1,axes=(-2,-3))
# X_test=X_test[:,:,::-1,:]
# validation_set=np.concatenate([X_test,x_test])


In [None]:
# Output of one model.predict run for the first test image.
array = model.predict(x_test)[0]

In [None]:
# Test image 24 with a leading batch axis, i.e. shape (1, 28, 28, 1).
picture = np.expand_dims(x_test[24], axis=0)

## Функция N-кратного сэмплирования массивов вероятностей с усреднением результата

In [None]:
 def Sampling(test_set):
    print(' ... Running monte carlo inference')
    probs = tf.stack([model.predict(test_set)
        for _ in range(num_monte_carlo)],axis=0)
    mean_probs = tf.reduce_mean(probs, axis=0)
    return mean_probs


# Начало неиспользуемых в данный момент функций (определение порога распознавания — сейчас не нужны и закомментированы)

In [None]:
# mean_probability=Sampling(validation_set)

In [None]:
# Predict_set_letters_label = np.zeros(len(validation_set))#Поиск порога отказа
# Predict_set_letters_prob = np.zeros(len(validation_set))
# for i in range(len(validation_set)):
#     Predict_set_letters_label[i] = np.argmax(mean_probability[i,:])
#     Predict_set_letters_prob[i]=np.max(mean_probability[i,:])

In [None]:
# failure_threshold=tf.reduce_mean(Predict_set_letters_prob,axis=0)
# failure_threshold=tf.reshape(failure_threshold,[]).numpy()

In [None]:
# print(failure_threshold,"Порог минимальной вероятности")

In [None]:
# def Predict_a_picture(picture_to_predict):
#     array_probability = Sampling(picture_to_predict)
#     probability = np.max(array_probability)
#     label = np.argmax(array_probability)  
#     if (probability<failure_threshold):
#         print("This is not a digit")
#     else:
#         print("Это цифра",labels,"с вероятностью",probability)
#     return 1
    

# Конец лишних функций - далее работаем с текущей задачей

### Функция построения состязательной атаки для конкретной картинки при помощи метода FGSM

In [None]:
# Cross-entropy between a one-hot label and the model's softmax output.
loss_object = tf.keras.losses.CategoricalCrossentropy()


def create_adversarial_pattern(input_image):
    """Return the FGSM perturbation direction for ``input_image``.

    The target label is the model's own prediction: the argmax of the
    averaged probabilities returned by ``Sampling``. The loss is the
    categorical cross-entropy between that label and one forward pass of the
    model, and the result is the sign of the loss gradient with respect to
    the input.

    Args:
        input_image: batch of images shaped like the model input, e.g.
            ``(1, 28, 28, 1)``.

    Returns:
        NumPy array with the same shape as ``input_image`` holding the sign
        of the gradient.
    """
    predict_array = Sampling(input_image)
    # Build the label per example. The previous flat argmax followed by a
    # reshape to (1, num_classes) was only correct for a single image; for a
    # batch of one the result is identical.
    num_classes = predict_array.shape[-1]
    predicted_classes = tf.argmax(predict_array, axis=-1)
    label = tf.one_hot(predicted_classes, num_classes)

    input_image = tf.convert_to_tensor(input_image)
    with tf.GradientTape() as tape:
        # The input is not a variable, so it must be watched explicitly.
        tape.watch(input_image)
        # TODO (from the original author): this is a single forward pass;
        # it should be sampled several times as well.
        prediction = model(input_image)
        print(prediction)
        # Only the cross-entropy is used; no KL term is added here.
        loss = loss_object(label, prediction)
    gradient = tape.gradient(loss, input_image)
    return tf.sign(gradient).numpy()

## Построение гистограммы вероятностей и изображения под действием состязательной атаки

In [None]:
def build_adv_final(img, num_steps=25, eps_step=0.01):
    """Plot the model's response to an FGSM attack of increasing strength.

    For ``eps = eps_step * i`` with ``i`` in ``range(num_steps)`` the image
    is shifted by ``eps`` along the FGSM direction and clipped to [0, 1].
    Each step shows a bar chart of the averaged class probabilities next to
    the attacked image, then prints the largest probability.

    Args:
        img: a single image as a batch of one with a trailing channel axis,
            e.g. shape ``(1, 28, 28, 1)``.
        num_steps: number of attack strengths to plot (default 25).
        eps_step: increment of ``eps`` between steps (default 0.01).
    """
    attack = create_adversarial_pattern(img)
    for i in range(num_steps):
        eps = eps_step * i
        # Compute the attacked image once; it feeds both the model and the plot.
        adversarial = np.clip(img + eps * attack, a_min=0., a_max=1.)
        data_array = np.asarray(Sampling(adversarial))[0]
        # Take the class count from the model output instead of assuming 10.
        class_indices = np.arange(data_array.shape[-1])

        _, axes = plt.subplots(nrows=1, ncols=2, figsize=(10, 5))
        axes[0].bar(class_indices, data_array)
        axes[0].set_title('Атака FGSM - методом, eps =%s ' % eps)
        axes[0].set_xlabel('Метка класса')
        axes[0].set_ylabel('Вероятность')
        axes[0].set_ylim(0, 1.0)
        # Drop the channel axis so imshow always receives a 2-D array; the
        # image is already within [0, 1], so no second clipping is needed.
        axes[1].imshow(adversarial[0, :, :, 0], cmap='gray')
        plt.show()
        print(np.max(data_array))

In [None]:

# Run the attack visualisation on test image 159, kept as a batch of one.
test_pic = x_test[159:160]
build_adv_final(test_pic)

In [None]:
# for i in range(25):
#     attacked_picture,data_array=adversarial_attack_impact(img,5,(i+1)*0.01)
#     fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(10,5))
#     axes[0].bar(indices,data_array[0])
#     axes[0].set_title('Атака FGSM - методом, eps =%s '%(0.01*(i+1)))
#     axes[0].set_xlabel('Метка класса')
#     axes[0].set_ylabel('Вероятность')
#     axes[0].set_ylim(0,1.0)
#     axes[1].imshow(attacked_picture[0,:,:,0])
#     plt.show()
    
