In [221]:
import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt

In [222]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [261]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [223]:
POS_Path = os.path.join('data', 'positive')
NEG_Path = os.path.join('data','negative')
ANC_Path = os.path.join('data','anchor')

In [224]:
os.makedirs(POS_Path)
os.makedirs(NEG_Path)
os.makedirs(ANC_Path)

In [225]:
#http://vis-www.cs.umass.edu/lfw/
!tar -xf lfw.tgz

In [226]:
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join(os.path.join('lfw',directory,file))
        NEW_PATH = os.path.join(NEG_Path,file)
        os.replace(EX_PATH, NEW_PATH)

In [227]:
import uuid

In [228]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame=frame[180:180+250,200:200+250,:]

    #collect anchors
    if cv2.waitKey(1) & 0XFF == ord('a'):
        imgname=os.path.join(ANC_Path, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname,frame)
    #collect positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
        imgname=os.path.join(POS_Path, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname,frame)
    cv2.imshow('Image Collection', frame)
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

In [None]:
plt.imshow(frame)

In [230]:
anchor = tf.data.Dataset.list_files(ANC_Path+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_Path+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_Path+'\*.jpg').take(300)

In [231]:
dir_test = anchor.as_numpy_iterator()

In [232]:
dir_test.next()

b'data\\anchor\\e121b0b0-4a8f-11ef-b186-346f2491af9a.jpg'

In [233]:
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (100,100))
    img /= 255.0
    return img

In [None]:
img = preprocess(b'data\\anchor\\dd5b8044-4a8f-11ef-bdd6-346f2491af9a.jpg')
plt.imshow(img)

In [235]:
positive = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices((tf.ones(len(anchor))))))
negative = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices((tf.zeros(len(anchor))))))
data = positive.concatenate(negative)

In [236]:
samples = data.as_numpy_iterator()
example = samples.next()
example

(b'data\\anchor\\e211db47-4a8f-11ef-9fbc-346f2491af9a.jpg',
 b'data\\positive\\2d841008-4a91-11ef-af1e-346f2491af9a.jpg',
 1.0)

In [237]:
def preprocess_twin(input_img, validation_img, label):
    return (preprocess(input_img), preprocess(validation_img), label)

In [None]:
res=preprocess_twin(*example)
plt.imshow(res[0])
res[2]

In [239]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [240]:
train_data = data.take(round(len(data)*0.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [241]:
test_data = data.skip(round(len(data)*0.7))
test_data = test_data.take(round(len(data)*0.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [242]:
def make_embedding():
    inp = Input(shape=(100, 100, 3), name='input_image')
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [243]:
embedding = make_embedding()
embedding.summary()

In [244]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
    def call(self, input_embedding, validation_embedding):
        input_embedding = tf.convert_to_tensor(input_embedding)
        validation_embedding = tf.convert_to_tensor(validation_embedding)
        return tf.math.abs(input_embedding - validation_embedding)
    

In [245]:
l1 = L1Dist()

In [246]:
def make_siamese_model():
    input_image = Input(name='input_img', shape=(100,100,3))
    validation_image = Input(name='validation_img', shape=(100,100,3))

    siamese_layer=L1Dist()
    siamese_layer._name='distance'
    distances=siamese_layer(embedding(input_image), embedding(validation_image))

    classifier=Dense(1, activation='sigmoid')(distances)
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [247]:
siamese_model=make_siamese_model()
siamese_model.summary()

In [248]:
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4)

In [253]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir,'ckpt')
checkpoint=tf.train.Checkpoint(opt=opt,siamese_model=siamese_model)

In [254]:
@tf.function
def train_step(batch):
    with tf.GradientTape() as tape:
        X = batch[:2]
        y = batch[2]
        yhat = siamese_model(X, training=True)
        yhat = tf.squeeze(yhat)
        loss = binary_cross_loss(y, yhat)
    print(loss)
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

In [255]:
def train(data, EPOCHS):
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch{}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        for idx, batch in enumerate(data):
            train_step(batch)
            progbar.update(idx+1)
        if epoch%10==0:
            checkpoint.save(file_prefix=checkpoint_prefix)


In [262]:
EPOCHS=50
train(train_data, EPOCHS)


 Epoch1/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 2s/step

 Epoch2/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m52s[0m 2s/step

 Epoch3/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m51s[0m 2s/step

 Epoch4/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 2s/step

 Epoch5/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m46s[0m 2s/step

 Epoch6/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m49s[0m 2s/step

 Epoch7/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 2s/step

 Epoch8/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m43s[0m 2s/step

 Epoch9/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m45s[0m 2s/step

 Epoch10/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 2s/step

 Epoch11/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 2s/step

 Epoch12/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[

In [263]:
from tensorflow.keras.metrics import Precision, Recall

In [285]:
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [286]:
y_hat = siamese_model.predict([test_input, test_val])
y_hat = np.squeeze(y_hat, axis=0)
y_hat

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 459ms/step


array([[1.0000000e+00],
       [2.4185864e-10],
       [2.6406125e-10],
       [2.0364985e-10],
       [3.7945555e-10],
       [1.5028540e-07],
       [1.0000000e+00],
       [1.0000000e+00],
       [4.2205639e-10],
       [1.5499788e-07],
       [1.0000000e+00],
       [9.9976152e-01],
       [2.8587194e-10],
       [1.4177278e-09],
       [9.6713710e-01],
       [1.0000000e+00]], dtype=float32)

In [305]:
model_prediction=[1 if prediction>0.5 else 0 for prediction in y_hat]
model_prediction

[1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1]

In [288]:
y_true

array([1., 0., 0., 0., 0., 0., 1., 1., 0., 0., 1., 1., 0., 0., 1., 1.],
      dtype=float32)

In [290]:
m = Precision()
m.update_state(y_true, y_hat)
m.result().numpy()

1.0

In [291]:
n = Recall()
n.update_state(y_true, y_hat)
n.result().numpy()

1.0

In [None]:
plt.figure(figsize=(10, 8))
plt.subplot(1,2,1)
i=0
plt.imshow(test_input[i])
plt.subplot(1,2,2)
plt.imshow(test_val[i])
plt.show()
if(model_prediction[i]==1):
    print("Same image")
else:
    print("Different Image")

In [295]:
siamese_model.save('siamesemodel.h5')



In [297]:
model = tf.keras.models.load_model('siamesemodel.h5',custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



In [None]:
model.predict([test_input, test_val])

In [300]:
model.summary()