Загрузка необходимых библиотек

In [1]:
try:
    import gdown
    import natsort
except:
    !pip install natsort gdown

In [2]:
try:
    import ray
    from ray import tune
except:
    !pip uninstall -y -q pyarrow
    !pip install -q -U ray[all]
    import os
    # os._exit(0)

[K     |████████████████████████████████| 49.7MB 59kB/s 
[K     |████████████████████████████████| 71kB 10.3MB/s 
[K     |████████████████████████████████| 1.0MB 54.4MB/s 
[K     |████████████████████████████████| 81kB 11.8MB/s 
[K     |████████████████████████████████| 81kB 11.5MB/s 
[K     |████████████████████████████████| 3.1MB 52.7MB/s 
[K     |████████████████████████████████| 133kB 52.0MB/s 
[K     |████████████████████████████████| 1.3MB 52.2MB/s 
[K     |████████████████████████████████| 10.1MB 53.4MB/s 
[K     |████████████████████████████████| 61kB 9.4MB/s 
[K     |████████████████████████████████| 122kB 62.5MB/s 
[K     |████████████████████████████████| 51kB 8.2MB/s 
[K     |████████████████████████████████| 1.8MB 51.3MB/s 
[K     |████████████████████████████████| 51kB 8.0MB/s 
[K     |████████████████████████████████| 1.8MB 57.9MB/s 
[K     |████████████████████████████████| 36.4MB 72kB/s 
[K     |████████████████████████████████| 204kB 59.3MB/s 
[K    

In [3]:
import tensorflow as tf
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
import keras
from PIL import Image
import cv2
import os
from keras import models
from keras.layers import Conv2D, MaxPooling2D, Dense, Activation, BatchNormalization, Input, Dropout, Flatten
from keras.models import Model
from keras import optimizers
from keras.layers import Lambda
import natsort as ns
import time

Пути к оригинальным подписям, фальшивым и папке с сохранеными весами модели

In [4]:
PATH_ORG = "signatures/full_org"
PATH_FORG ="signatures/full_forg"
checkpoints_path = os.path.abspath("checkpoints/best/")


Загрузка датасета Cedar, если он отсутствует 

In [5]:
if os.path.exists('signatures.zip') is False:
    !gdown https://drive.google.com/uc?id=1PpPVry5TkfGVpbFDkwOMNx7Xew4vscW5
#

Downloading...
From: https://drive.google.com/uc?id=1PpPVry5TkfGVpbFDkwOMNx7Xew4vscW5
To: /content/signatures.zip
254MB [00:02, 125MB/s]


In [6]:
if os.path.exists('signatures') is False:
    !unzip -q -n signatures.zip

Класс, отвечающий за загрузку данных, lazy означает, что данные будут подгружаться по мере необходимости  - это медленее, чем загрузить все сразу, но не занимает места в памяти. Shuffle - перемешивать данные каждую эпоху.

In [7]:



class DataGenerator(keras.utils.Sequence):

    def __init__(self, df, batch_size=32, dim=(155, 220), n_channels=3, shuffle=True, lazy=True):
        self.dim = dim
        self.batch_size = batch_size
        self.df = df
        self.labels = df["label"].to_numpy().astype(np.int32)
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.on_epoch_end()
        self.lazy = lazy
        if self.lazy is False:
            self.data = [np.empty((df.shape[0], *dim, n_channels), dtype=np.float32),
                         np.empty((df.shape[0], *dim, n_channels), dtype=np.float32)]
            for i in range(df.shape[0]):
                image_1 = cv2.imread(df.iloc[i, 0])
                image_1 = cv2.resize(image_1, (220, 155))
                image_1 = 1-image_1/255.0

                image_2 = cv2.imread(df.iloc[i, 1])
                image_2 = cv2.resize(image_2, (220, 155))
                image_2 = 1-image_2/255.0
                self.data[0][i, :, :, :] = image_1
                self.data[1][i, :, :, :] = image_2

    def __len__(self):
        return int(np.floor(self.df.shape[0] / self.batch_size))

    def __getitem__(self, index):
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]

        # return X, y
        if self.lazy is False:
            x = []
            x.append(self.data[0][indexes, :, :, :])
            x.append(self.data[1][indexes, :, :, :])

            y = self.labels[indexes]

        else:
            rows = [self.df.iloc[k] for k in indexes]
            x, y = self.__data_generation(rows)
        return x, y

    def on_epoch_end(self):
        self.indexes = np.arange(self.df.shape[0])
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def __data_generation(self, rows):
        x_1 = np.empty((self.batch_size, *self.dim, self.n_channels))
        x_2 = np.empty((self.batch_size, *self.dim, self.n_channels))
        y = np.empty((self.batch_size), dtype=int)

        for i in range(len(rows)):
            image_1 = cv2.imread(rows[i]["image_1"])
            image_1 = cv2.resize(image_1, (220, 155))
            image_1 = np.array(image_1)
            image_2 = cv2.imread(rows[i]["image_2"])
            image_2 = cv2.resize(image_2, (220, 155))
            image_2 = np.array(image_2)
            x_1[i,] = 1 - image_1 / 255.0
            x_2[i,] = 1 - image_2 / 255.0
            y[i] = rows[i]["label"]

        return [x_1, x_2], y





Евклидово расстояние (оно должно быть минмальным у оригиналов подписей и большим у оригинала и фальшивки)

In [8]:
from keras import backend as K


def euclidean_distance2(y):
    return K.sqrt(K.sum(K.square(y[0] - y[1]), axis=-1))


# def euclidean_distance(vects):
#     x, y = vects
#     sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
#     return K.sqrt(K.maximum(sum_square, K.epsilon()))


# def eucl_dist_output_shape(shapes):
#     shape1, shape2 = shapes
#     return (shape1[0], 1)


Перефразируя Харшвардхана Гупту, мы должны помнить, что цель сиамской сети не в том, чтобы классифицировать набор пар изображений, а в том, чтобы различать их. По сути, контрастная потеря - это оценка того, насколько хорошо сиамская сеть различает пары изображений. Более математически обоснованные подробности о контрастных потерях, в статье
http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf


In [9]:
def contrastive_loss(y_true, y_pred):
    margin = 1
    sqaure_pred = K.square(y_pred)
    margin_square = K.square(K.maximum(margin - y_pred, 0))
    y_true = K.cast(y_true, y_pred.dtype)
    return K.mean(y_true * sqaure_pred + (1 - y_true) * margin_square)


def accuracy(y_true, y_pred):
    return K.mean(K.equal(y_true, K.cast(y_pred < 0.5, y_true.dtype)))

Данная функция создает сиамскую нейронную сверточную сеть  архитектуры signet.
2 сиамские сети имеют одинаковые веса, после высчитывается евклидово расстояние от их выходов и передается функции потери. Несколько сверточный словек группируют графические признаки (такие как изгибы, завитки и т.д.) сначла в маленькие, потом средние и большие признаки. После 2 полносвязанных слоя являются классификатором, который по заданным признакам старается отличить подделку от оригинала. Слой dropout позволяет уменьшить эффект переобучения.

В стандартной нейронной сети производная, полученная каждым параметром, сообщает ему, как он должен измениться, чтобы, учитывая деятельность остальных блоков, минимизировать функцию конечных потерь. Поэтому блоки могут меняться, исправляя при этом ошибки других блоков. Это может привести к чрезмерной совместной адаптации (co-adaptation), что, в свою очередь, приводит к переобучению, поскольку эти совместные адаптации невозможно обобщить на данные, не участвовавшие в обучении.

Главная идея Dropout — вместо обучения одной DNN обучить ансамбль нескольких DNN, а затем усреднить полученные результаты.

In [10]:
def make_net():
    input = Input(shape=(155, 220, 3))

    conv_1 = Conv2D(filters=96, kernel_size=(11, 11))(input)
    batch_norm_1 = BatchNormalization()(conv_1)
    activation_1 = Activation('relu')(batch_norm_1)
    max_pool_1 = MaxPooling2D(pool_size=(3, 3))(activation_1)

    conv_2 = Conv2D(filters=256, kernel_size=(5, 5))(max_pool_1)
    batch_norm_2 = BatchNormalization()(conv_2)
    activation_2 = Activation('relu')(batch_norm_2)
    max_pool_2 = MaxPooling2D(pool_size=(3, 3))(activation_1)

    dropout_1 = Dropout(rate=0.3)(max_pool_2)

    conv_3_a = Conv2D(filters=384, kernel_size=(3, 3))(dropout_1)
    activation_3_a = Activation('relu')(conv_3_a)
    conv_3_b = Conv2D(filters=256, kernel_size=(3, 3))(activation_3_a)
    activation_3_b = Activation('relu')(conv_3_b)
    max_pool_3 = MaxPooling2D(pool_size=(3, 3))(activation_3_b)

    # dropout_22 = Dropout(rate=0.3)(max_pool_3)
    # conv_4_a = Conv2D(filters=384, kernel_size=(3, 3))(dropout_22)
    # activation_4_a = Activation('relu')(conv_4_a)
    # conv_4_b = Conv2D(filters=512, kernel_size=(3, 3))(activation_4_a)
    # activation_4_b = Activation('relu')(conv_4_b)
    # max_pool_4 = MaxPooling2D(pool_size=(2, 2))(activation_4_b)

    dropout_2 = Dropout(rate=0.3)(max_pool_3)

    flat_1 = Flatten()(dropout_2)
    fc_1 = Dense(units=1024, activation='relu')(flat_1)
    dropout_3 = Dropout(rate=0.5)(fc_1)
    fc_2 = Dense(units=128, activation='relu')(dropout_3)



    input_a = Input(shape=(155, 220, 3))
    input_b = Input(shape=(155, 220, 3))

    base_net = Model(input, fc_2)
    processed_a = base_net(input_a)
    processed_b = base_net(input_b)

    distance = Lambda(euclidean_distance2)([processed_a, processed_b])
    # distance = Lambda(euclidean_distance, output_shape=eucl_dist_output_shape)([processed_a, processed_b])
    model = Model([input_a, input_b], distance)
    return base_net,model


Подготовка данных

In [11]:
params = {
    'dim': (155, 220),
    'batch_size': 16,
    # 'batch_size': 64,
    'n_channels': 3,
    'shuffle': False
}


def get_data(path_org, path_forg, test_size=0.3, random_state=0, lazy=True, ext_data=0):
    org = ns.natsorted(os.listdir(path_org), alg=ns.IGNORECASE)
    forg = ns.natsorted(os.listdir(path_forg), alg=ns.IGNORECASE)
    org = [os.path.join(PATH_ORG, i) for i in org if i.endswith('.png')]
    forg = [os.path.join(PATH_FORG, i) for i in forg if i.endswith('.png')]

    org = [os.path.abspath(i) for i in org]
    forg = [os.path.abspath(i) for i in forg]

    samples = 24
    ppl = len(org) // samples

    data = []
    for i in range(ppl):
        tr = np.array([[org[j], org[j], 1] for j in range(i * samples, (i + 1) * samples)])
        tr[:, 1] = np.concatenate([tr[:-12, 1], tr[-12:, 1]])
        tr[:, 1] = np.random.permutation(tr[:, 1])
        fl = np.array([[org[j], forg[j], 0] for j in range(i * samples, (i + 1) * samples)])

        for j in range(ext_data):
            rand2 = np.random.choice(
                np.concatenate([np.arange(0, i * samples), np.arange((i + 1) * samples, ppl * samples)]),
                samples, replace=False)
            tr2 = np.array([[org[j], org[rand2[j % samples]], 0] for j in range(i * samples, (i + 1) * samples)])
            data.append(tr2)

        data.append(tr)
        data.append(fl)

    df = pd.DataFrame(np.array(data).reshape(-1, 3), columns=["image_1", "image_2", "label"])
    df = df.reindex(np.random.permutation(df.index))

    ds_train, ds_val = train_test_split(df, test_size=test_size, random_state=random_state)

    train_datagen = DataGenerator(ds_train, **params, lazy=lazy)
    validation_datagen = DataGenerator(ds_val, **params, lazy=lazy)
    return train_datagen, validation_datagen


Подготовка датасета (так как хватает памяти, загрузим его "жадно", т.е. полностью в оперативную память
) 

In [12]:
train_datagen, validation_datagen = get_data(PATH_ORG, PATH_FORG,
                                             test_size=0.3, random_state=0, lazy=False, ext_data=0)

In [13]:
x1 = (train_datagen.data[0], train_datagen.data[1])
y1 = train_datagen.labels
#
x2 = (validation_datagen.data[0], validation_datagen.data[1])
y2 = validation_datagen.labels



Используем мощную библиотеку для подстройки гиперпараметров - ray tune. Библиотека позвоялет построить минимизировать время подбора параметров, умно отсекая бесперспективные варианты. Может работать как на кластере в несколько машин, но в нашем случае будет только серия последовательных экспериментов)

Далее опишем функцию обучения, которая будет сообщает ray о текущих результатах точности на тестовой и обучающейся выборке.
В качестве метода оптимизации выберем моментный метод Адам, который благодаря накоплению импульса реже застревает в локальных экстремумах, а также увеличению влияние редко встречающихся объектов.
https://habr.com/ru/post/318970/

Авторы метода https://arxiv.org/abs/1412.6980 советуют принимать значения по-умолчанию как lr =0.001, b1=0.9,b2=0.999. Построим пространство поиска около этихзначений. А также инциализиуерм сид случайно (мультистарт), таким образом постараемся подобрать лучшие параметры оптимизатора. Также будем сохранять веса лучшей модели (для дальнейшего использования)

In [14]:



import tensorflow as tf
import ray
from ray import tune
from ray.tune.schedulers import AsyncHyperBandScheduler, MedianStoppingRule, HyperBandScheduler
from ray.tune.integration.keras import TuneReportCallback


class TuneCheckpoint(tf.keras.callbacks.Callback):
    """Tune Callback for Keras."""

    def __init__(self, metric='val_accuracy', logs=None):
        self.best_metric=0
        self.metric=metric
        super(TuneCheckpoint, self).__init__()

    def on_epoch_end(self, epoch, logs=None):
        if logs[self.metric]>self.best_metric:
            self.best_metric=logs[self.metric]
            with tune.checkpoint_dir(epoch) as checkpoint_dir:
                path = os.path.join(checkpoint_dir, "checkpoint")
                self.model.save(path)
                # model.save(path)


def train_signet(config,epochs, x1, x2, y1, y2,checkpoints_path):
    # https://github.com/tensorflow/tensorflow/issues/32159
    import tensorflow as tf

    input_a = Input(shape=(155, 220, 3))
    input_b = Input(shape=(155, 220, 3))

    base_net,model = make_net()
    optimizer = optimizers.Adam(learning_rate=config['lr'])

    model.compile(loss=contrastive_loss, optimizer=optimizer, metrics=[accuracy])
    # train_datagen,validation_datagen=get_data(PATH_ORG,PATH_FORG)

    # model.fit(train_datagen, validation_data=validation_datagen,verbose=2, epochs=5,
    #           callbacks=[TuneCheckpoint(),
    #                TuneReportCallback({
    #                    "val_accuracy": "val_accuracy",
    #                    "train_accuracy": "accuracy",
    #                })])
    model.fit(x1, y1, batch_size=32, validation_data=(x2, y2),epochs=epochs, callbacks=[
            # TuneCheckpoint(),
            TuneReportCallback({
                "val_accuracy": "val_accuracy",
                "train_accuracy": "accuracy",
            }),
            tf.keras.callbacks.ModelCheckpoint(
                filepath=checkpoints_path,
                monitor='val_accuracy',
                mode='max',
                save_best_only=True,
                save_weights_only=True)
            ],
            verbose=2)
    



Запустим tensoboard, обы в режиме реального времени отслеживать процесс оптимизации. Совет - включить автообновление в шестеренке и выбрать TIME SERIES и  HPARAMS пункты меню.


In [None]:
%load_ext tensorboard
%tensorboard --logdir  ~/ray_results

В качестве планировщика выбран Hyperband. HyperBand для поиска наилучших конфигураций предлагает часто выполнять последовательное деление пополам с различными бюджетами.  epochs параметр - количество эпох в каждом испытании.  num_samples - количество испытаний (новых комбинаций гиперпарамтеров). Чем больше, тем лучше и тем дольше.

In [None]:

if __name__ == "__main__":
    if ray.is_initialized() is False:
        ray.init(num_cpus=4,num_gpus=1, local_mode=False)
    sched = AsyncHyperBandScheduler(
        time_attr="training_iteration", max_t=400, grace_period=20)

    analysis = tune.run(
        tune.with_parameters(train_signet,epochs=12, x1=x1, x2=x2, y1=y1,
                             y2=y2,checkpoints_path=checkpoints_path),
        name="signet",
        scheduler=HyperBandScheduler(),
        metric="val_accuracy",
        mode="max",
        stop={
            "val_accuracy": 10,
            "training_iteration": 15
        },
        num_samples=6,
        resources_per_trial={
            "cpu": 4,
            "gpu": 1
        },
        config={
            "lr": tune.uniform(0.0005, 0.0015),
            "beta_1": tune.uniform(0.89, 0.9),
            "beta_1": tune.uniform(0.998, 0.9995),
  
        },
        keep_checkpoints_num=1,
        checkpoint_score_attr="val_accuracy",
    )


Лучший набор гиперпарметров

In [None]:
print("Best hyperparameters found were: ", analysis.best_config)

Лучший результат

In [None]:
analysis.best_result

Загрузим веса модели для проверки.

In [None]:
from keras.models import load_model

_,loaded_model = make_net()
loaded_model.load_weights(checkpoints_path)

In [None]:
org = ns.natsorted(os.listdir(PATH_ORG), alg=ns.IGNORECASE)
forg = ns.natsorted(os.listdir(PATH_FORG), alg=ns.IGNORECASE)
org = [os.path.join(PATH_ORG, i) for i in org if i.endswith('.png')]
forg = [os.path.join(PATH_FORG, i) for i in forg if i.endswith('.png')]

org = [os.path.abspath(i) for i in org]
forg = [os.path.abspath(i) for i in forg]


Должно быть большое значение - подпись и фальшивка

In [None]:
im_1 = cv2.imread(org[0])
im_2 = cv2.imread(forg[5])
im_1 = cv2.resize(im_1,(220,155))
im_2 = cv2.resize(im_2,(220,155))
im_1 =1- im_1/255.0
im_2 = 1-im_2/255.0
im_1 = np.expand_dims(im_1,axis=0)
im_2 = np.expand_dims(im_2,axis=0)
y_pred = loaded_model.predict([im_1,im_2])
y_pred

Сохраним веса в гугл диск

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!rm -rf '/content/drive/MyDrive/Colab Notebooks/Signet/best_model'

In [None]:
!cp -R '/content/checkpoints' '/content/drive/MyDrive/Colab Notebooks/Signet/best_model'