In [1]:
from tensorflow.keras.initializers import HeUniform, GlorotNormal
from tensorflow.keras.layers import GlobalAveragePooling2D
from keras.layers import LeakyReLU

In [2]:
from keras.regularizers import L2

In [3]:
import os
from itertools import combinations, product
from random import sample, shuffle, seed

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.math import reduce_sum, square, reduce_mean, maximum, sqrt
from tensorflow import random
from tensorflow.keras.utils import Sequence
from tensorflow.keras.optimizers import Adam, RMSprop
from keras.layers import Conv2D, ZeroPadding2D, Activation, Input, concatenate, Dropout, BatchNormalization
from keras import layers
from tensorflow.keras import optimizers
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.models import Model, Sequential
from keras.applications import resnet
from keras.callbacks import TensorBoard

from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import Concatenate
from keras.layers.core import Lambda, Flatten, Dense
from keras.initializers import glorot_uniform
from keras.applications import resnet


import numpy as np
import cv2
from math import ceil

In [4]:
np.random.seed(42)
random.set_seed(42)

img_h, img_w = 155, 220

dirs_list=os.listdir("E:\Academics\Project\BHSig260-Hindi\BHSig260-Hindi")

path="E:\Academics\Project\BHSig260-Hindi\BHSig260-Hindi\\"

In [5]:
def get_dataset(from_dir, to_dir):
    org_sign=[]
    forg_sign=[]
    for directory in dirs_list[from_dir:to_dir]:
        images = os.listdir(path+directory)
        images.sort()
        images = [directory+'\\'+x for x in images]
        forg_sign.append(images[:30])
        org_sign.append(images[30:])

    data=[]
    for i in range(len(org_sign)):
        j=0
        for signs in list(map(list,combinations(org_sign[i],2)))[:170]:
            data.append((*signs, forg_sign[i][j%30]))
            j+=1
    del org_sign, forg_sign, images
    return data

In [6]:
X_train=get_dataset(0, 110)
X_valid=get_dataset(110, 135)
X_test=get_dataset(135, 160)

In [7]:
import logging
logging.basicConfig(filename="newfile.log",format='%(asctime)s %(message)s',filemode='w')
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

In [8]:
class SignatureSequence(Sequence):
    
    def __init__(self, X, batch_size, dim):
        self.X=X
        self.dim = dim
        self.batch_size = batch_size
        

    def __len__(self):
        return ceil(len(self.X) / self.batch_size)-1

    def __getitem__(self, idx):
        batch_X = self.X[idx * self.batch_size:(idx + 1) * self.batch_size]
        
        part_1 = np.empty((self.batch_size, *self.dim,3))
        part_2 = np.empty((self.batch_size, *self.dim,3))
        part_3 = np.empty((self.batch_size, *self.dim,3))
        
        for i in range(len(batch_X)):
            part_1[i,]=self.image_preprocessing(batch_X[i][0])
            part_2[i,]=self.image_preprocessing(batch_X[i][1])
            part_3[i,]=self.image_preprocessing(batch_X[i][2])
            batch_X_pro=(part_1 ,part_2, part_3)
        return batch_X_pro

    def image_preprocessing(self, signature):
        signature = cv2.imread(path+signature)
        resized_signature = cv2.resize(signature,(220,155))
        blur = cv2.GaussianBlur(resized_signature,(3,3),-125)
        gray_signature=cv2.cvtColor(blur, cv2.COLOR_BGR2GRAY)
        ret,thr_img = cv2.threshold(gray_signature, 0, 255, cv2.THRESH_OTSU)
        normalized_signature=1-(thr_img/255)
        rgb_batch = np.repeat(normalized_signature[..., np.newaxis], 3, -1)
        #signature_expanded = normalized_signature[:, :, np.newaxis]
        return np.array(rgb_batch)
    def on_epoch_end(self):        
        np.random.shuffle(self.X)
        logger.debug("Called"+str(self.X[0]))

In [9]:
input_shape=(155,220,3)
base_cnn = resnet.ResNet50(
    weights="imagenet", input_shape=input_shape, include_top=False
)

flatten = GlobalAveragePooling2D()(base_cnn.output)
dense1 = layers.Dense(512, activation="relu", kernel_regularizer=L2(0.001))(flatten)
dense1 = Dropout(0.2)(dense1)
dense1 = layers.BatchNormalization()(dense1)

dense2 = layers.Dense(256, activation="relu", kernel_regularizer=L2(0.001))(dense1)
dense2 = Dropout(0.2)(dense2)
dense2 = layers.BatchNormalization()(dense2)

output = layers.Dense(256)(dense2)

embedding = Model(base_cnn.input, output, name="Embedding")

trainable = False
for layer in base_cnn.layers:
    if layer.name == "conv5_block1_out":
        trainable = True
    layer.trainable = trainable

In [10]:
from keras import backend as K

In [11]:
class DistanceLayer(layers.Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        
        sum_square1 = tf.math.reduce_sum(tf.math.square(anchor - positive), axis=1, keepdims=True)
        ap_distance = tf.math.sqrt(tf.math.maximum(sum_square1, tf.keras.backend.epsilon()))
        
        sum_square2 = tf.math.reduce_sum(tf.math.square(anchor - negative), axis=1, keepdims=True)
        an_distance = tf.math.sqrt(tf.math.maximum(sum_square2, tf.keras.backend.epsilon()))
        
        
        '''ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
        an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)'''
        
        return (ap_distance, an_distance)


anchor_input = layers.Input(name="anchor", shape=input_shape)
positive_input = layers.Input(name="positive", shape=input_shape)
negative_input = layers.Input(name="negative", shape=input_shape)

distances = DistanceLayer()(
    embedding(anchor_input),
    embedding(positive_input),
    embedding(negative_input),
)

siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

In [22]:
class SiameseModel(Model):
    def __init__(self, siamese_network, margin=0.5):
        super(SiameseModel, self).__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        return self.siamese_network(inputs)

    def train_step(self, data):
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        loss = self._compute_loss(data)
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        ap_distance, an_distance = self.siamese_network(data)
        square_pred = tf.math.square(an_distance)
        margin_square = tf.math.square(tf.math.maximum(self.margin - (an_distance), 0))
        loss = tf.math.reduce_mean((1 - ap_distance) * square_pred + (ap_distance) * margin_square)      
        
        '''loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)'''
        return loss

    @property
    def metrics(self):
        return [self.loss_tracker]

In [23]:
from tensorflow.keras import metrics

In [28]:
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(0.0001))

In [29]:
nan_terminate=tf.keras.callbacks.TerminateOnNaN()
early_stp=EarlyStopping(patience=2, restore_best_weights=True)
checkpoint_cb=ModelCheckpoint('shallow_best_model.h5', save_best_only=True, save_weights_only=True)

In [31]:
dim=(155,220)
batch_size=64
train_batch=SignatureSequence(np.array(X_train), batch_size, dim)
valid_batch=SignatureSequence(np.array(X_valid),batch_size, dim)
history=siamese_model.fit(train_batch, validation_data=valid_batch, epochs=3, steps_per_epoch=len(X_train)//batch_size, validation_steps=len(X_valid)//batch_size, callbacks=[checkpoint_cb, nan_terminate, early_stp])

Epoch 1/3
Epoch 2/3
Epoch 3/3


In [25]:
dim=(155,220)
batch_size=64

In [33]:
test_batch=SignatureSequence(np.array(X_test),batch_size, dim)

In [34]:
siamese_model.evaluate(test_batch)



8.780429197940975e-05

In [49]:
siamese_model.evaluate(test_batch)



0.09352517873048782

In [32]:
siamese_model.evaluate(test_batch)



0.003445808310061693

In [47]:
import matplotlib.pyplot as plt

In [39]:
sample = test_batch[0]
#visualize(*sample)
anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
    embedding(anchor),
    embedding(positive),
    embedding(negative),
)

cosine_similarity = metrics.CosineSimilarity()

positive_similarity = euclidean_distance(anchor_embedding, positive_embedding)
#positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print(positive_similarity.numpy())

negative_similarity = euclidean_distance(anchor_embedding, negative_embedding)
#negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print(negative_similarity.numpy())

0.02135119
0.024533257


In [40]:
post=[]
neg=[]
for anchor, positive, negative in test_batch:
    anchor_embedding, positive_embedding, negative_embedding = (embedding(anchor), embedding(positive), embedding(negative))
    post.append(euclidean_distance(anchor_embedding, positive_embedding).numpy())
    neg.append(euclidean_distance(anchor_embedding, negative_embedding).numpy())

In [41]:
min(neg)

0.021137705

In [42]:
max(post)

0.026107714

In [46]:
result=[]
for x, y in zip(post, neg):
    result.append(str(x-y)[0]=='-')

In [49]:
result.count(True)

58

In [50]:
result.count(False)

8

In [54]:
58/66

0.8787878787878788

In [36]:
def euclidean_distance(x, y ):
    sum_square = tf.math.reduce_sum(tf.math.square(x - y), axis=1, keepdims=True)
    return tf.math.reduce_sum(tf.math.sqrt(tf.math.maximum(sum_square, tf.keras.backend.epsilon())))

In [48]:
def visualize(anchor, positive, negative):
    """Visualize a few triplets from the supplied batches."""

    def show(ax, image):
        ax.imshow(image)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

    fig = plt.figure(figsize=(9, 9))

    axs = fig.subplots(3, 3)
    for i in range(3):
        show(axs[i, 0], anchor[i])
        show(axs[i, 1], positive[i])
        show(axs[i, 2], negative[i])

In [56]:
siamese_model.save_weights('resnet_triplet_best.h5')

In [None]:
model.load_weights('temp.h5')

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_curve, plot_roc_curve
import matplotlib.pyplot as plt

In [None]:
def map_arr(arr):
    return list(map(int, arr)) 
def acc_th(th=0.1, step=0.1):
    global preds
    accs={}
    for i in range(0, 40, 1):
        th+=step
        accs[th]=(accuracy_score(map_arr(np.array(y_test)[:2816]==0), map_arr(preds[:2816]<th)))
    best_th=max(accs, key=accs.get)
    fpr, tpr, thresholds=roc_curve( map_arr(np.array(y_test)[:2816]==0), map_arr(preds[:2816]<best_th))
    plt.plot(fpr)
    plt.plot(1-tpr)
    plt.show()
    return "Acc: "+str(max(accs.values())), 'Best Threshold: '+str(best_th)