Import modules and enable GPU memory growth

In [None]:
import cv2
import numpy as np
import random


import os
from matplotlib import pyplot as plt

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer,Conv2D,Dense,MaxPooling2D,Input,Flatten

In [None]:
# Enable memory growth on every GPU that TensorFlow can see.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(device=gpu, enable=True)

In [None]:
# Folders of the positive, negative and anchor images under data/data.
pos_path, neg_path, anc_path = (
    os.path.join('data', 'data', folder)
    for folder in ('positive', 'negative', 'anchor')
)

In [None]:
import shutil

# Unpack the dataset archive into the "data" directory.
zip_path, extract_to = "data.zip", "data"

shutil.unpack_archive(filename=zip_path, extract_dir=extract_to)
print("Unzipped successfully")


Unzipped successfully


Dataset augmentation


In [None]:
def aug(img, num_aug=1):
    """Return randomly augmented copies of an image.

    Every copy is derived from the original ``img`` by applying, in order,
    a random brightness, contrast, horizontal-flip, JPEG-quality and
    saturation perturbation.

    Args:
        img: Image array or tensor accepted by the ``tf.image`` ops (the
            callers in this notebook pass the array returned by
            ``cv2.imread``).
        num_aug: Number of augmented copies to produce. Defaults to 1, which
            matches the previously hard-coded behaviour.

    Returns:
        list: ``num_aug`` augmented image tensors.
    """
    def _seed():
        # Stateless ops are deterministic for a given seed, so each op needs a
        # freshly drawn one; a constant seed applies the exact same
        # brightness/contrast change to every image.
        return (np.random.randint(100), np.random.randint(100))

    data = []
    for _ in range(num_aug):
        # Start from the original image each time so copies do not compound.
        out = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=_seed())
        out = tf.image.stateless_random_contrast(out, lower=0.5, upper=1, seed=_seed())
        out = tf.image.stateless_random_flip_left_right(out, seed=_seed())
        out = tf.image.stateless_random_jpeg_quality(
            out, min_jpeg_quality=70, max_jpeg_quality=100, seed=_seed()
        )
        out = tf.image.stateless_random_saturation(out, lower=0.8, upper=1, seed=_seed())
        data.append(out)
    return data

In [None]:
import uuid
def aug_twin(img_path):
    """Augment one image and write the results into ``pos_path``.

    Args:
        img_path: Path of the image file to read with OpenCV.

    Returns:
        None. Each augmented image is saved as ``<uuid1>.jpg`` in ``pos_path``.
        Files OpenCV cannot decode are skipped with a message.
    """
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals an unreadable or non-image file by returning None
        # rather than raising; skip it instead of failing inside the tf ops.
        print(f"Skipping unreadable image: {img_path}")
        return

    for aug_img in aug(img):
        cv2.imwrite(os.path.join(pos_path, f"{uuid.uuid1()}.jpg"), aug_img.numpy())

In [None]:
import uuid
def aug_twin2(img_path):
    """Augment one image and write the results into ``anc_path``.

    Args:
        img_path: Path of the image file to read with OpenCV.

    Returns:
        None. Each augmented image is saved as ``<uuid1>.jpg`` in ``anc_path``.
        Files OpenCV cannot decode are skipped with a message.
    """
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals an unreadable or non-image file by returning None
        # rather than raising; skip it instead of failing inside the tf ops.
        print(f"Skipping unreadable image: {img_path}")
        return

    for aug_img in aug(img):
        cv2.imwrite(os.path.join(anc_path, f"{uuid.uuid1()}.jpg"), aug_img.numpy())

In [None]:
# Augment every file currently listed in the positive folder; aug_twin saves
# its output back into that same folder.
for file_name in os.listdir(pos_path):
    source_file = os.path.join(pos_path, file_name)
    aug_twin(source_file)

In [None]:
# Augment every file currently listed in the anchor folder; aug_twin2 saves
# its output back into that same folder.
for file_name in os.listdir(anc_path):
    source_file = os.path.join(anc_path, file_name)
    aug_twin2(source_file)

In [None]:
# One dataset of matching .jpg file names per image folder.
anchor_imgs = tf.data.Dataset.list_files(f"{anc_path}/*.jpg")
positive_imgs = tf.data.Dataset.list_files(f"{pos_path}/*.jpg")
negative_imgs = tf.data.Dataset.list_files(f"{neg_path}/*.jpg")

Train and test dataset creation

In [None]:
# Keep at most 950 file names from each folder.
anchor, positive, negative = (
    file_names.take(950)
    for file_names in (anchor_imgs, positive_imgs, negative_imgs)
)


In [None]:
def preprocessing(file_path):
    """Load a JPEG file and convert it into a scaled 100x100 RGB tensor.

    Args:
        file_path: Path (string tensor) of the JPEG file to load.

    Returns:
        Tensor of shape (100, 100, 3): the central 70 % of the image, resized
        to 100x100, with pixel values divided by 255.
    """
    # Read the raw file bytes.
    raw_bytes = tf.io.read_file(file_path)
    # Decode the JPEG bytes; force three channels so greyscale files still
    # match the (100, 100, 3) input the model expects.
    img = tf.io.decode_jpeg(raw_bytes, channels=3)
    img = tf.image.central_crop(img, 0.7)
    img = tf.image.resize(img, (100, 100))
    # Scale pixel values by 1/255.
    img = img / 255.0
    return img

In [None]:
# Label anchor/positive pairs with 1 and anchor/negative pairs with 0,
# then join both sets into a single dataset.
pair_count = len(anchor)
match_labels = tf.data.Dataset.from_tensor_slices(tf.ones(pair_count))
mismatch_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(pair_count))

positives = tf.data.Dataset.zip((anchor, positive, match_labels))
negatives = tf.data.Dataset.zip((anchor, negative, mismatch_labels))
print(len(negatives))
data = positives.concatenate(negatives)
len(data)

950


1900

In [None]:
def preprocessing_twin(anch_file,test_file,num):
    """Preprocess both files of a pair and pass the label through unchanged.

    Args:
        anch_file: File path handed to ``preprocessing`` for the first image.
        test_file: File path handed to ``preprocessing`` for the second image.
        num: Label associated with the pair.

    Returns:
        tuple: ``(first_image, second_image, num)``.
    """
    anchor_image = preprocessing(anch_file)
    compared_image = preprocessing(test_file)
    return (anchor_image, compared_image, num)

In [None]:
# Preprocess both images of every pair and cache the decoded tensors.
data = data.map(preprocessing_twin)
data = data.cache()
# Shuffle a single time only. By default the order is re-drawn on every
# iteration, so take()/skip()-based train/test splits would select different,
# overlapping samples on each pass over the data.
data = data.shuffle(buffer_size=10000, reshuffle_each_iteration=False)

In [None]:
# Training partition: the first 70 % of the samples, in batches of 16.
train_size = round(len(data) * .7)
train_data = data.take(train_size)
print(len(train_data))
train_data = train_data.batch(16).prefetch(8)

1330


In [None]:
# Test partition: skip the first 70 % (the training range) and take the
# following 30 %. The take() must be chained onto the skip(); taking from
# `data` directly would return the first 30 % of the training range instead.
test_data = data.skip(round(len(data)*.7)).take(round(len(data)*.3))
print(len(test_data))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

570


Embedding model: converts an input image into a feature vector

In [None]:
def make_embedding():
    """Build the convolutional embedding model.

    The model takes a (100, 100, 3) image and passes it through four
    convolution layers (the first three each followed by 2x2 max pooling),
    a flatten layer and a 4096-unit sigmoid dense layer.

    Returns:
        Model: Keras model named ``'embedding'`` mapping an image to a
        4096-dimensional vector.
    """
    inp = Input(shape=(100,100,3),name='input_image')

    # pool_size is the first positional argument of MaxPooling2D; it is passed
    # by keyword here so the window is 2x2 (with stride 2) rather than 64x64.
    c1 = Conv2D(64,(10,10),activation='relu')(inp)
    m1 = MaxPooling2D(pool_size=(2,2),strides=(2,2),padding='same')(c1)

    c2 = Conv2D(128,(7,7),activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2,2),strides=(2,2),padding='same')(c2)

    c3 = Conv2D(128,(4,4),activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2,2),strides=(2,2),padding='same')(c3)

    c4 = Conv2D(256,(4,4),activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096,activation='sigmoid')(f1)

    # The dense layer output is the embedding.
    return Model(inputs=inp,outputs=d1,name='embedding')

In [None]:
class L1dist(Layer):
    """Layer returning the negated element-wise absolute difference of two embeddings."""

    def __init__(self,**kwargs):
        # Forward kwargs (e.g. name, dtype) so they are honoured when the layer
        # is rebuilt from its config.
        super().__init__(**kwargs)

    def call(self, input_embedding,val_embedding):
        """Return ``-|input_embedding - val_embedding|`` element-wise.

        NOTE(review): the result is negated relative to a plain L1 distance.
        The sign is kept as-is because changing it would alter the behaviour
        of weights already trained with this layer.
        """
        return -tf.math.abs(input_embedding-val_embedding)

    def get_config(self):
        """Return the base layer config; this layer adds no parameters of its own."""
        return super().get_config()

In [None]:
# Standalone instance of the distance layer (siamese_network creates its own).
distance = L1dist()

In [None]:
def siamese_network():
    """Assemble the two-input verification model.

    Both inputs are encoded by one shared embedding model, the two encodings
    are combined by ``L1dist`` and a single sigmoid unit produces the output.

    Returns:
        Model: Keras model named ``'siamese_network'`` with inputs
        ``[input_image, Validation_image]``.
    """
    encoder = make_embedding()

    twin_inputs = [
        Input(shape=(100, 100, 3), name='input_image'),
        Input(shape=(100, 100, 3), name='Validation_image'),
    ]
    # The same encoder instance is applied to both inputs, so weights are shared.
    encodings = [encoder(image) for image in twin_inputs]

    gap = L1dist()(*encodings)
    verdict = Dense(1, activation="sigmoid")(gap)

    return Model(inputs=twin_inputs, outputs=verdict, name='siamese_network')

In [None]:
# Instantiate the model used by the training and checkpoint cells.
siamese_model = siamese_network()

Loss function and optimizer

In [None]:
# Binary cross-entropy loss and an Adam optimizer with learning rate 1e-4.
bin_cross_ent = tf.losses.BinaryCrossentropy()
opt = tf.optimizers.Adam(learning_rate=1e-4)

In [None]:
# Checkpoint location and the object that tracks optimizer and model state.
checkpoint_dir = './checkpoint_folder'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
os.makedirs(checkpoint_dir, exist_ok=True)
checkpoint = tf.train.Checkpoint(
    opt=opt,
    siamese_model=siamese_model,
)

In [None]:
@tf.function
def train_step(batch):
    """Run one optimisation step on a batch and return its loss.

    Args:
        batch: Sequence whose first two items are the image batches fed to
            ``siamese_model`` and whose third item holds the labels.

    Returns:
        The binary cross-entropy loss tensor for this batch.
    """
    # Record the forward pass so gradients can be taken afterwards.
    with tf.GradientTape() as tape:
        x = batch[:2]      # the two image batches
        y_true = batch[2]  # labels
        y_pred = siamese_model(x, training=True)
        loss = bin_cross_ent(y_true, y_pred)

    # No print() here: inside tf.function it would run only while tracing and
    # show a symbolic tensor, not the per-step loss value.

    # Gradients of the loss with respect to every trainable weight.
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    # Let the optimizer apply the gradients to update the weights.
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    return loss

In [None]:
from tensorflow.keras.metrics import Precision,Recall

In [None]:
def train(data,EPOCH):
    """Train ``siamese_model`` for ``EPOCH`` passes over ``data``.

    After each epoch the last batch loss plus the epoch's recall and precision
    are printed; a checkpoint is saved every 10th epoch.

    Args:
        data: Batched dataset yielding ``(images_a, images_b, labels)``.
        EPOCH: Number of epochs to run.
    """
    for epoc in range(1, EPOCH + 1):
        # Fresh metric objects so results are per-epoch.
        r = Recall()
        p = Precision()
        print('\n epochs : {}/{}'.format(epoc, EPOCH))
        progbar = tf.keras.utils.Progbar(len(data))

        loss = None
        for idx, batch in enumerate(data):
            loss = train_step(batch)
            # Call the model directly instead of predict(): predict() prints
            # its own progress bar for every batch and adds per-call overhead.
            ypred = siamese_model(batch[:2], training=False)
            r.update_state(batch[2], ypred)
            p.update_state(batch[2], ypred)
            progbar.update(idx + 1)

        # `loss` stays None when the dataset yielded no batches.
        if loss is not None:
            print(loss.numpy(), r.result().numpy(), p.result().numpy())
        if epoc % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)


In [None]:
# Number of training epochs passed to train().
EPOCH = 40

In [None]:
# Start training on the batched training partition.
train(train_data, EPOCH)

/n epochs : 1/40
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 284ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 239ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 255ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 246ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 270ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 293ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 260ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 259ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 254ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 259ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 250ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 232ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━

KeyboardInterrupt: 

In [None]:
# Persist the model to a .keras file.
siamese_model.save(filepath='siamesemodel_BCE.keras')