In [1]:
import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt

In [2]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [3]:
# Set data paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [4]:
# # Create dirs
# os.makedirs(POS_PATH)
# os.makedirs(NEG_PATH)
# os.makedirs(ANC_PATH)

In [5]:
# # Uncompress LFW database
# # Download dataset from http://vis-www.cs.umass.edu/lfw/#download
# # Into /facial_recognition folder
# filename = 'lfw.tgz'
# !tar -xf filename   # Extract data

In [6]:
# # Put all the LFW data into the negative folder
# for directory in os.listdir('lfw'):
#     for file in os.listdir(os.path.join('lfw', directory)):
#         EX_PATH = os.path.join('lfw', directory, file)
#         NEW_PATH = os.path.join(NEG_PATH, file)
#         os.replace(EX_PATH, NEW_PATH)

In [7]:
# Import uuid to generate unique identifier
import uuid

In [9]:
# Collect data
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()

    # Cut frame to 250x250p
    dim = 250
    x_offset = 250
    y_offset = 150
    frame = frame[y_offset:y_offset+dim, x_offset:x_offset+dim, :]

    # Collect anchors
    if cv2.waitKey(1) & 0XFF == ord('a'):
        img_name = os.path.join(ANC_PATH, f'{uuid.uuid1()}.jpg')
        cv2.imwrite(img_name, frame)

    # Collect positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
        img_name = os.path.join(POS_PATH, f'{uuid.uuid1()}.jpg')
        cv2.imwrite(img_name, frame)

    cv2.imshow('Image Collection', frame)

    if cv2.waitKey(1) & 0XFF == ord('q'):
        break

# Release the webcam
cap.release()
cv2.destroyAllWindows()

### Load and preprocess images

In [8]:
# Get directories
anchor = tf.data.Dataset.list_files(ANC_PATH + os.sep + '*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH + os.sep + '*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH + os.sep + '*.jpg').take(300)

In [9]:
# Scale and resize
def preprocess_image(file_path):
    """Receives a path to an img and returns a 100x100p normalized image"""
    # Read image
    raw_img = tf.io.read_file(file_path)

    # Load image
    img = tf.io.decode_jpeg(raw_img)

    # Preprocessing
    img = tf.image.resize(img, (100,100))
    
    # Normalizing
    img = img/255.0
    return img

In [10]:
# Create labeled dataset
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)
data

<ConcatenateDataset element_spec=(TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

In [11]:
# Build train and test partition
def preprocess_twin(input_img, validation_img, label) -> tuple:
    '''Receives input and validation image with the corresponding label and returns
    a tuple containing the preprocessed input and validation image, as well as the label'''
    return (preprocess_image(input_img), preprocess_image(validation_img), label)

In [12]:
samples = data.as_numpy_iterator()

In [13]:
example = samples.next()
print(example)

(b'data\\anchor\\20982ff9-856b-11ed-bce5-34e6adf636cc.jpg', b'data\\positive\\3f3121a7-856b-11ed-b610-34e6adf636cc.jpg', 1.0)


In [14]:
res = preprocess_twin(*example)

In [15]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [16]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [17]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*0.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

### Model Engineering

In [18]:
# Build embedding layer
def make_embedding():
    inp = Input(shape=(100,100,3), name='input_image')
    # First block of convolution-maxpooling
    # Convolution layer
    c1 = Conv2D(64, (10, 10), activation='relu')(inp)     # 91x91p  64 ch
    # Maxpooling layer
    m1 = MaxPooling2D(64, (2, 2), padding='same')(c1)     # 46x46p  64 ch
    
    # Second block
    c2 = Conv2D(128, (7, 7), activation='relu')(m1)       # 40x40p  128 ch
    m2 = MaxPooling2D(64, (2, 2), padding='same')(c2)     # 20x20p  128 ch
    
    # Third block
    c3 = Conv2D(128, (4, 4), activation='relu')(m2)       # 17x17p  128 ch
    m3 = MaxPooling2D(64, (2, 2), padding='same')(c3)     #  9x9p   128 ch

    # Fourth block
    c4 = Conv2D(256, (4, 4), activation='relu')(m3)       #  6x6p   256 ch
    f1 = Flatten()(c4)                                    #  1xdim (dim = 6x6x256 = 9216)
    d1 = Dense(4096, activation='sigmoid')(f1)            # 4096 feature vector


    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [21]:
embedding = make_embedding()

In [22]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

In [19]:
# Build L1 distance layer
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
    
    def call(self, input_embedding, validation_embedding):         # Input/anchor and pos/neg data
        return tf.math.abs(input_embedding - validation_embedding)

In [20]:
def make_siamese_model():
    input_image = Input(name='input_img', shape=(100,100,3))
    validation_image = Input(name='validation_img', shape=(100,100,3))

    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))

    # Classification layer
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')
    

In [25]:
siamese_model = make_siamese_model()

In [26]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

### Training

In [21]:
# Setup loss and optimizer
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4)

In [22]:
# Checkpoint callbacks
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

NameError: name 'siamese_model' is not defined

In [23]:
# Build train step function
@tf.function
def train_step(batch):
    # Record operations
    with tf.GradientTape() as tape:
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]

        # Forward pass
        y_pred = siamese_model(X, training=True)
        # Loss
        loss = binary_cross_loss(y, y_pred)  # True value and predicted value
    
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    # Updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    return loss



In [24]:
# Build training loop
def train(data, EPOCHS):
    for epoch in range(1, EPOCHS+1):
        print(f'\n Epoch {epoch}/{EPOCHS}')
        prog_bar = tf.keras.utils.Progbar(len(data))

        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step
            train_step(batch)
            prog_bar.update(idx+1)
        
        # Solve checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

In [25]:
# Train model
EPOCHS = 50

In [30]:
train(train_data, EPOCHS)


 Epoch 1/50

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50

 Epoch 49/50

 Epoch 50/50


### Evaluate Model

In [15]:
from tensorflow.keras.metrics import Precision, Recall

In [33]:
# Grab a batch of data
test_input, test_val, y_true = list(test_data.as_numpy_iterator().next())

In [34]:
y_true

array([0., 0., 0., 1., 1., 1., 0., 1., 0., 0., 1., 0., 0., 0., 0., 0.],
      dtype=float32)

In [35]:
y_pred = siamese_model.predict([test_input, test_val])
y_pred



array([[0.4980272 ],
       [0.49699104],
       [0.4963625 ],
       [0.499376  ],
       [0.49924907],
       [0.49844855],
       [0.49627998],
       [0.49963847],
       [0.5000499 ],
       [0.4974298 ],
       [0.49962646],
       [0.49821162],
       [0.5000547 ],
       [0.49861953],
       [0.4995303 ],
       [0.4997551 ]], dtype=float32)

In [36]:
# Post processing the results
[1 if prediction > 0.5 else 0 for prediction in y_pred]

[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0]

In [37]:
y_true

array([0., 0., 0., 1., 1., 1., 0., 1., 0., 0., 1., 0., 0., 0., 0., 0.],
      dtype=float32)

In [38]:
# Recall
rec = Recall()
rec.update_state(y_true, y_pred)
rec.result().numpy()

0.0

In [39]:
# Precision
prec = Precision()
prec.update_state(y_true, y_pred)
prec.result().numpy()

0.0

### Save model

In [78]:
siamese_model.save('siamese_model.h5')



In [26]:
# Reload model
model = tf.keras.models.load_model('siamese_model.h5',
        custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})



In [27]:
model.predict([test_input, test_val])

NameError: name 'test_input' is not defined

In [28]:
model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

### Real time test

In [29]:
def verify(model, detection_threshold, verification_threshold):
    results = []
    for  image in os.listdir(os.path.join('app_data', 'verification_images')):
        input_img = preprocess_image(os.path.join('app_data', 'input_images', 'input_image.jpg'))
        validation_img = preprocess_image(os.path.join('app_data', 'verification_images', image))
        prediction = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(prediction)

    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection / len(os.listdir(os.path.join('app_data', 'verification_images')))
    verified = verification > verification_threshold

    return results, verified


### OpenCV Real Time Verification

In [32]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    # Cut frame to 250x250p
    dim = 250
    x_offset = 250
    y_offset = 150
    frame = frame[y_offset:y_offset+dim, x_offset:x_offset+dim, :]

    cv2.imshow('Verification', frame)

    # Verification trigger
    if cv2.waitKey(10) & 0xFF == ord('v'):
        # Save input image to input_image folder
        path = os.path.join('app_data', 'input_images', 'input_image.jpg')
        cv2.imwrite(path, frame)
        # Apply verification function
        results, verified = verify(model, 0.5, 0.5)
        print(verified)

    if cv2.waitKey(10) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

True
True


In [45]:
cap.release()
cv2.destroyAllWindows()

In [33]:
np.sum(np.squeeze(results) > 0.9)

38