In [2]:
#import standered depencies
import cv2 as cv
import random 
from matplotlib import pyplot as plt
import numpy as np
import os

In [3]:
#import tensorflow depencies - Functional API
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten

In [4]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
  tf.config.experimental.set_memory_growth(gpu, True)

In [5]:
# Steup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negetive')
ANC_PATH = os.path.join('data', 'anchor')

In [9]:
# os.makedirs(POS_PATH)
# os.makedirs(NEG_PATH)
# os.makedirs(ANC_PATH)

In [10]:
# !tar -xf lfw.tgz

In [7]:
# for directory in os.listdir('lfw'):
#     for file in os.listdir(os.path.join('lfw', directory)):
#         EX_PATH = os.path.join('lfw', directory, file)
#         NEW_PATH = os.path.join(NEG_PATH, file)
#         os.replace(EX_PATH, NEW_PATH)

In [6]:
# import uuid to generate unique image name
import uuid

In [11]:
cap = cv.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    
    # cut down the frame 250x250px chanel:3
    frame = frame[120:120+250, 200:200+250, :]
    
    # collect anchor img
    if cv.waitKey(1) & 0XFF == ord('a'):
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        # store the image in anchor path
        cv.imwrite(imgname, frame)
    
    # collect possitive img
    if cv.waitKey(1) & 0XFF == ord('p'):
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        # store the image in positive path
        cv.imwrite(imgname, frame)
    
    
    # show the image to the screen
    cv.imshow('Image Collection', frame)
    
    # if 'q' is pressed then window will be closed
    if cv.waitKey(1) & 0XFF == ord('q'):
        break
cap.release()
cv.destroyAllWindows()

In [15]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
negetive = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)

In [16]:
dir_test = anchor.as_numpy_iterator()

In [20]:
dir_test.next()

b'data\\anchor\\555498c0-67df-11ec-8e1f-acd564b5385f.jpg'

In [7]:
def preprocess(file_path):
    
    # img is loaded by tenserflow
    byte_img = tf.io.read_file(file_path)
    
    # img is decoded by tenserflow
    img = tf.io.decode_jpeg(byte_img)
    
    # resizing the image by 100x100px
    img = tf.image.resize(img, (100, 100))
    
    # scale the image between 0-1
    img = img / 255.0
    return img

In [23]:
img = preprocess('data\\anchor\\555498c0-67df-11ec-8e1f-acd564b5385f.jpg')

In [24]:
img.numpy().min()

0.0

In [32]:
img.numpy().max()

0.99215686

In [35]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negetives = tf.data.Dataset.zip((anchor, negetive, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negetives)

In [36]:
data

<ConcatenateDataset shapes: ((), (), ()), types: (tf.string, tf.string, tf.float32)>

In [37]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [38]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [67]:
data

<ShuffleDataset shapes: ((100, 100, None), (100, 100, None), ()), types: (tf.float32, tf.float32, tf.float32)>

In [66]:
# training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [68]:
train_data

<PrefetchDataset shapes: ((None, 100, 100, None), (None, 100, 100, None), (None,)), types: (tf.float32, tf.float32, tf.float32)>

In [69]:
# testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

## Bulid embadding layer

In [8]:
def make_embedding():
    inp = Input(shape=(100, 100, 3), name='input_img')
    
    # first block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # third block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [9]:
embedding = make_embedding()

In [10]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_img (InputLayer)      [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

## Buliding distance layer

In [11]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
        
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

## Make Siamese Model

In [12]:
def make_siamese_model():
    
    # Anchor input in the network
    input_image = Input(name='input_img', shape=(100,100,3))
    
    # Validation input in the network
    validation_image = Input(name='validation_img', shape=(100,100,3))
    
    # Combine siamese components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    # Classification layer
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [13]:
siamese_model = make_siamese_model()

In [14]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

---------

## Setup loss and Optimizer

In [107]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [108]:
opt = tf.keras.optimizers.Adam(1e-4)

## Establish Checkpoints

In [None]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

# Build Train step function

In [15]:
@tf.function
def train_step(batch):
    
    with tf.GradientTape() as tape:
        # Get anchor and positive/negative image
        x = batch[:2]
        # Get Label
        y = batch[2]
        
        # Forward psss
        yhat = siamese_model(x, training=True)
        
        # Calculate Loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calulate upadted weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    
    # Return loss
    return loss

# Bulid Training loop

In [16]:
def train(data, EPOCHS):
    
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(train_data))
        
        # Look through each batch
        for idx, batch in enumerate(train_data):
            # Run train steps here
            train_step(batch)
            progbar.update(idx+1)
            
        # Save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

# Train the Model

In [17]:
model = tf.keras.models.load_model('siamese_model.h5', custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



In [23]:
model.predict((test_input, test_val))

NameError: name 'test_input' is not defined

In [18]:
model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [19]:
INPUT_PATH = os.path.join('application_data', 'input_image')
VERIFICATION_PATH = os.path.join('application_data', 'verification_image')

In [6]:
os.makedirs(INPUT_PATH)
os.makedirs(VERIFICATION_PATH)

In [20]:
len(os.listdir(VERIFICATION_PATH))

50

In [21]:
def verify(model, detection_threshold, verification_threshold):
    
    results = []
    for image in os.listdir(VERIFICATION_PATH):
        input_img = preprocess(os.path.join(INPUT_PATH, 'input_image.jpg'))
        validation_img = preprocess(os.path.join(VERIFICATION_PATH, image))

        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)

    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection / len(os.listdir(VERIFICATION_PATH))
    verified = verification > verification_threshold

    return results, verified


In [23]:
cap = cv.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    
    frame = frame[120:120+250, 200:200+250, :]
    
    cv.imshow('verification', frame)
    
    if(cv.waitKey(10) & 0XFF == ord('v')):
        cv.imwrite(os.path.join(INPUT_PATH, 'input_image.jpg'), frame)
        
        results, verified = verify(model, 0.1, 0.1)
        print(verified)
        
    if(cv.waitKey(10) & 0XFF == ord('q')):
        break
    
cap.release()
cv.destroyAllWindows()

False
True
True
True
False
True
True
False
False
True
False
False
True
True
False
True
False
True
False
False
True
True
False
False
True
True
