In [2]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt 

In [3]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [4]:
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [5]:
# os.makedirs(POS_PATH)
# os.makedirs(NEG_PATH)
# os.makedirs(ANC_PATH)

In [6]:
# Uncompress Tar GZ Labelled Faces in the Wild Dataset
# !tar -xf lfw.tgz

In [7]:
# for directory in os.listdir('lfw'):
#     for file in os.listdir(os.path.join('lfw', directory)):
#         EX_PATH = os.path.join('lfw',directory, file)
#         NEW_PATH = os.path.join(NEG_PATH, file)
#         os.replace(EX_PATH, NEW_PATH)

In [8]:
# Import uuid library to generate unique image names
import uuid

In [9]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    #fame size
    frame = frame[120:120+250,200:250+250,:]
    
    #collect anchors
    if cv2.waitKey(1) & 0xFF == ord('a'):
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
    #collect positive
    if cv2.waitKey(1) & 0xFF == ord('p'):
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)
    #for showing img
    cv2.imshow("Image Collection", frame)
    #breaking 
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

In [9]:
anchor = tf.data.Dataset.list_files(ANC_PATH + '\*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH + '\*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH + '\*.jpg').take(300)

In [10]:
dir_test = anchor.as_numpy_iterator()

In [11]:
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img,(100,100))
    # Scale image to be between 0 and 1 
    img = img/255.0
    return img

In [12]:
img = preprocess('data\\anchor\\4da6b19f-dc11-11ed-a674-803049e89b68.jpg')

In [13]:
img.numpy().max()

1.0

In [63]:
#  Create Labelled Dataset
class_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))

In [15]:
iterator_labs = class_labels.as_numpy_iterator()

In [16]:
iterator_labs.next()

0.0

In [64]:
positives  = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [65]:
data

<ConcatenateDataset element_spec=(TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

In [66]:
samples = data.as_numpy_iterator()

In [67]:
exampple = samples.next()

In [68]:
exampple

(b'data\\anchor\\4700701f-dc11-11ed-b9ce-803049e89b68.jpg',
 b'data\\positive\\86105255-dc11-11ed-8c40-803049e89b68.jpg',
 1.0)

In [69]:
#  Build Train and Test Partition
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [70]:
res = preprocess_twin(*exampple)

In [24]:
res[0]

<tf.Tensor: shape=(100, 100, 3), dtype=float32, numpy=
array([[[0.722549  , 0.5029412 , 0.31862745],
        [0.6990196 , 0.48235294, 0.28137255],
        [0.6843137 , 0.47843137, 0.26568627],
        ...,
        [0.4509804 , 0.26078433, 0.12254902],
        [0.43333334, 0.26666668, 0.12058824],
        [0.44509804, 0.29411766, 0.15392157]],

       [[0.74215686, 0.5029412 , 0.31862745],
        [0.7245098 , 0.4852941 , 0.3009804 ],
        [0.7107843 , 0.4745098 , 0.28137255],
        ...,
        [0.43333334, 0.2509804 , 0.08529412],
        [0.43333334, 0.27941176, 0.10490196],
        [0.41862744, 0.28137255, 0.13431373]],

       [[0.7411765 , 0.51960784, 0.32843137],
        [0.7137255 , 0.49215686, 0.30686274],
        [0.7       , 0.47843137, 0.29313725],
        ...,
        [0.46960783, 0.2647059 , 0.10588235],
        [0.45588234, 0.27941176, 0.14411765],
        [0.36568627, 0.22156863, 0.12058824]],

       ...,

       [[0.96960783, 1.        , 1.        ],
        [0.98

In [25]:
res[2]

1.0

In [71]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [72]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [73]:
train_samples = train_data.as_numpy_iterator()

In [74]:
train_sample = train_samples.next()

In [75]:
len(train_sample[0])

16

In [76]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [77]:
# Build Embedding Layer
def make_embedding():
    inp = Input(shape=(100,100,3), name = 'input_image')
    c1 = Conv2D(64,(10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64,(2,2), padding = 'same')(c1)
    
    c2 = Conv2D(128,(7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64,(2,2), padding = 'same')(c2)
    
    c3 = Conv2D(128,(4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64,(2,2), padding = 'same')(c3)
    
    c4 = Conv2D(256,(4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation = 'sigmoid')(f1)
    
    return Model(inputs = [inp],outputs = [d1], name = 'embedding')

In [78]:
embedding = make_embedding()

In [79]:
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d_4 (Conv2D)           (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d_3 (MaxPooling  (None, 46, 46, 64)       0         
 2D)                                                             
                                                                 
 conv2d_5 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_4 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_6 (Conv2D)           (None, 17, 17, 128)       26

In [80]:
# Siamese L1 Distance class
class L1Dist(Layer):
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
        # similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)
    

In [81]:
l1 = L1Dist()

In [82]:
l1

<__main__.L1Dist at 0x2719f4afd90>

In [83]:
# Make Siamese Model
def make_siamese_model():
    
    # Anchor image input in the network
    input_image = Input(name= 'input_img', shape =(100,100,3))
    # Validation image in the network 
    validation_image= Input(name = 'validation_img', shape=(100,100,3))
    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image),embedding(validation_image))
     # Classification layer 
    classifier = Dense(1, activation = 'sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs = classifier, name = 'SiameseNetwork')

In [84]:
siamese_model = make_siamese_model()

In [85]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [86]:
# Setup Loss and Optimizer
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [87]:
opt = tf.keras.optimizers.Adam(1e-4)

In [88]:
#  Establish Checkpoints
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model = siamese_model)

In [89]:
# Build Train Step Function
@tf.function
def train_step(batch):
    
    with tf.GradientTape() as tape:
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        # Forward pass
        yhat = siamese_model(X, training = True)
         # Calculate loss
        loss = binary_cross_loss(y, yhat)
     # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
     # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
    # Return loss
    return loss

In [90]:
def train(data, EPOCHS):
      # Loop through epochs
    for epoch in range(1,EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
          # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            train_step(batch)
            progbar.update(idx+1)
         # Save checkpoints
        if epoch % 10 ==0:
            checkpoint.save(file_prefix = checkpoint_prefix)
            

In [91]:
EPOCHS = 10

In [None]:
train(train_data, EPOCHS)


 Epoch 1/10

 Epoch 2/10

In [None]:
from tensorflow.keras.metrics import Precision,Recall

In [None]:
# Get a batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
test_var = test_data.as_numpy_iterator().next()

In [None]:
# Make predictions
y_hat = siamese_model.predict([test_input, test_val])
y_hat

In [None]:
# Post processing the results 
[1 if prediction > 0.5 else 0 for prediction in y_hat]

In [None]:
# res = []
# for prediction in y_hat:
#     if prediction>0.5:
#         res.append(1)
#     else:
#         res.append(0)

In [None]:
y_true

In [None]:
# Creating a metric object 
m = Recall()

# Calculating the recall value 
m.update_state(y_true, y_hat)

# Return Recall Result
m.result().numpy()

In [None]:
# Creating a metric object 
m = Precision()

# Calculating the recall value 
m.update_state(y_true, y_hat)

# Return Recall Result
m.result().numpy()

In [None]:
# Save weights
siamese_model.save('siamesemodel.h5')

In [None]:
# Reload model 
model = tf.keras.models.load_model('siamesemodel.h5', custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy': tf.losses.BinaryCrossentropy})

In [None]:
# Make predictions with reloaded model
model.predict([test_input, test_val])

In [None]:
model.summary()

In [None]:
def verify(model, detection_threshold, verification_threshold):
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image','input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data','verification_images',image))
        
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
        
    detection = np.sum(np.array(results)> detection_threshold)
    verification = detection/ len(os.listdir(os.path.join('application_data', 'verification_images')))
    verified = verification> verification_threshold
    
    return results, verified
    

In [None]:
import cv2
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    #fame size
    frame = frame[120:120+250,200:250+250,:]
    cv2.imshow("Image Collection", frame)
    
    if cv2.waitKey(10) & 0xFF == ord('v'):
        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'),frame)
        results, verified = verify(model, 0.5, 0.5)
        print(verified)
        
    #breaking 
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

In [1]:
np.sum(np.squeeze(results)>0.5)

NameError: name 'np' is not defined