In [1]:
import os
import cv2
from matplotlib import pyplot as plt
import random
import pandas as pd
import numpy as np
import uuid

In [2]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten

In [3]:
tf.config.list_physical_devices('GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [4]:
#setting gpu growth
gpu = tf.config.experimental.list_physical_devices('GPU')
for g in gpu:
    tf.config.experimental.set_memory_growth(g, True)

In [5]:
pospath = os.path.join('data', 'positive')
negpath = os.path.join('data', 'negative')
anchor = os.path.join('data', 'anchor')

In [6]:
# os.makedirs(pospath)
# os.makedirs(negpath)
# os.makedirs(anchor)

In [7]:
# cap  = cv2.VideoCapture(0)
# while cap.isOpened():
#     ret, frame = cap.read()
#     frame = frame[80:250+80, 200:250+200, :]    
#     cv2.imshow('image', frame)   
#     if cv2.waitKey(1) & 0XFF == ord('a'):
#         img = os.path.join(anchor, '{}.jpg'.format(uuid.uuid1()))
#         cv2.imwrite(img, frame)
        
#     if cv2.waitKey(1) & 0XFF == ord('p'):
#         img = os.path.join(pospath, '{}.jpg'.format(uuid.uuid1()))
#         cv2.imwrite(img, frame)
#     if cv2.waitKey(1) & 0XFF == ord('q'):
#         break
# cap.release()
# cv2.destroyAllWindows()    

In [8]:
# for directories in os.listdir("lfw"):
#     for files in os.listdir(os.path.join('lfw',directories)):
#         ex = os.path.join("lfw", directories, files)
#         new = os.path.join(negpath, files)
#         os.replace(ex, new)

In [9]:
def createdataset(path):
    return tf.data.Dataset.list_files(path+"\*.jpg").take(150) 

In [10]:
pos = createdataset(pospath)
neg = createdataset(negpath)
anch = createdataset(anchor)

In [11]:
print(pos, neg, anch, sep='\n')

<TakeDataset element_spec=TensorSpec(shape=(), dtype=tf.string, name=None)>
<TakeDataset element_spec=TensorSpec(shape=(), dtype=tf.string, name=None)>
<TakeDataset element_spec=TensorSpec(shape=(), dtype=tf.string, name=None)>


In [12]:
test = pos.as_numpy_iterator()
test.next()

b'data\\positive\\2b5b7cbc-d0af-11ed-9782-e45e37ecec6b.jpg'

In [13]:
tf.ones(len(anch))

<tf.Tensor: shape=(150,), dtype=float32, numpy=
array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.],
      dtype=float32)>

In [14]:
positives = tf.data.Dataset.zip((anch, pos, tf.data.Dataset.from_tensor_slices(tf.ones(len(anch)))))
negatives = tf.data.Dataset.zip((anch, neg, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anch)))))
rawdata = positives.concatenate(negatives)

In [15]:
print(positives, negatives, rawdata, sep='\n')

<ZipDataset element_spec=(TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>
<ZipDataset element_spec=(TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>
<ConcatenateDataset element_spec=(TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>


In [16]:
test = rawdata.as_numpy_iterator()
test.next()

(b'data\\anchor\\b12fe2ca-d0ae-11ed-96e5-e45e37ecec6b.jpg',
 b'data\\positive\\c0956b3f-d0ae-11ed-abaf-e45e37ecec6b.jpg',
 1.0)

In [17]:
def preprocess(path):
    byteimg = tf.io.read_file(path)
    img = tf.io.decode_jpeg(byteimg)
    img = tf.image.resize(img, (100, 100))
    img /= 255.0
    return img

In [18]:
[[1,2,3],
 [4,5,6],
 [7,8,9]]

[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

In [19]:
def preprocess_twin(inputimg, validationimg, label):
    return (preprocess(inputimg), preprocess(validationimg), label)

In [20]:
data = rawdata.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [21]:
test = data.as_numpy_iterator()
test.next()

(array([[[0.5245098 , 0.5507353 , 0.56421566],
         [0.5156863 , 0.5534314 , 0.5620098 ],
         [0.5083333 , 0.5534314 , 0.55539215],
         ...,
         [0.6580882 , 0.72083336, 0.77181375],
         [0.64436275, 0.7129902 , 0.75514704],
         [0.6382353 , 0.7088235 , 0.7480392 ]],
 
        [[0.5183824 , 0.5644608 , 0.5713235 ],
         [0.5176471 , 0.5637255 , 0.57058823],
         [0.50686276, 0.5529412 , 0.5598039 ],
         ...,
         [0.6495098 , 0.7144608 , 0.7593137 ],
         [0.6512255 , 0.7147059 , 0.764951  ],
         [0.65588236, 0.71936274, 0.76960784]],
 
        [[0.50784314, 0.57058823, 0.57058823],
         [0.51960784, 0.57598037, 0.57916665],
         [0.51004905, 0.55833334, 0.5644608 ],
         ...,
         [0.6490196 , 0.7176471 , 0.7676471 ],
         [0.6598039 , 0.72843134, 0.77843136],
         [0.6517157 , 0.7223039 , 0.7713235 ]],
 
        ...,
 
        [[0.3987745 , 0.41936275, 0.42818627],
         [0.36838236, 0.389951  , 0.38897

In [22]:
len(data)

300

In [23]:
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [24]:
test_data = data.skip(round(len(data)*.7))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [25]:
def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')
    
    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [26]:
embedding = make_embedding()
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

In [27]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
    def call(self, input_emb, validation_emb):
        return tf.math.abs(input_emb - validation_emb)

In [28]:
inputimg = Input(name='input_img', shape=(100, 100, 3))
validationimg = Input(name='val_img', shape=(100, 100, 3))
print(inputimg, validationimg, sep='\n')

KerasTensor(type_spec=TensorSpec(shape=(None, 100, 100, 3), dtype=tf.float32, name='input_img'), name='input_img', description="created by layer 'input_img'")
KerasTensor(type_spec=TensorSpec(shape=(None, 100, 100, 3), dtype=tf.float32, name='val_img'), name='val_img', description="created by layer 'val_img'")


In [29]:
siameselayer = L1Dist()


In [30]:
inputembedding = embedding(inputimg)
validationembedding = embedding(validationimg)
print(inputembedding, validationembedding, sep='\n')

KerasTensor(type_spec=TensorSpec(shape=(None, 4096), dtype=tf.float32, name=None), name='embedding/dense/Sigmoid:0', description="created by layer 'embedding'")
KerasTensor(type_spec=TensorSpec(shape=(None, 4096), dtype=tf.float32, name=None), name='embedding/dense/Sigmoid:0', description="created by layer 'embedding'")


In [31]:
distances = siameselayer(inputembedding, validationembedding)
distances

<KerasTensor: shape=(None, 4096) dtype=float32 (created by layer 'l1_dist')>

In [32]:
classifier = Dense(1, activation='sigmoid')(distances)
classifier

<KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'dense_1')>

In [33]:
testnetwork = Model(inputs=[inputimg, validationimg], outputs=classifier, name='SiameseNetwork')
testnetwork

<keras.engine.functional.Functional at 0x239222e9b10>

In [34]:
def make_siamese_model():
    inputimg = Input(name='inputimg', shape=(100, 100, 3))
    validationimg = Input(name='validationimg', shape=(100, 100, 3))
    siameselayer = L1Dist()
    siameselayer._name = 'distance'
    distances = siameselayer(embedding(inputimg), embedding(validationimg))
    classifier = Dense(1, activation='sigmoid')(distances)
    return Model(inputs=[inputimg, validationimg], outputs=classifier, name='SiameseNetwork')
    

In [35]:
siamese_model = make_siamese_model()
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 inputimg (InputLayer)          [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validationimg (InputLayer)     [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['inputimg[0][0]',               
                                                                  'validationimg[0][0

In [36]:
bcl = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4)

In [37]:
checkpointdir = './checkpoints'
checkpointprefix = os.path.join(checkpointdir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [38]:
@tf.function
def train_step(batch):
    with tf.GradientTape() as tape:
        X = batch[:2]
        Y = batch[2]
        y_pred = siamese_model(X, training=True)
        loss = bcl(Y, y_pred)
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    return loss

In [39]:
def train(data, EPOCHS):
    for epoch in range(1, EPOCHS+1):
        print("\n EPOCH : {}/{}".format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        for idx, batch in enumerate(data):
            train_step(batch)
            progbar.update(idx+1)
        if epoch%50==0:
            checkpoint.save(file_prefix=checkpointprefix)

In [40]:
EPOCHS = 50
train(train_data, EPOCHS)


 EPOCH : 1/50


: 

: 