In [1]:
import cv2
import os 
import random
import numpy as np
import matplotlib.pyplot as plt
import uuid
from tqdm import trange

In [2]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
from tensorflow.keras.metrics import Precision, Recall
import tensorflow as tf

  from scipy.sparse import issparse  # pylint: disable=g-import-not-at-top


In [13]:
# import kagglehub

# # Download latest version
# path = kagglehub.dataset_download("atulanandjha/lfwpeople")

# print("Path to dataset files:", path)

In [3]:
# Enable memory growth on every GPU TensorFlow reports.
# When no GPU is found the list is empty and the loop does nothing.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
# Folders holding the positive, negative and anchor images.
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')
# Bare expression: shows the three paths as the cell output.
POS_PATH, NEG_PATH, ANC_PATH

In [5]:
# Create the three image folders; folders that already exist are left as they are.
for _image_dir in (POS_PATH, NEG_PATH, ANC_PATH):
    os.makedirs(_image_dir, exist_ok=True)

In [6]:
# for directory in os.listdir('lfw'):
#     for file in os.listdir(os.path.join('lfw', directory)):
#         EX_PATH = os.path.join('lfw', directory, file)
#         NEW_PATH = os.path.join(NEG_PATH, file)
#         os.replace(EX_PATH, NEW_PATH)

In [7]:
# cap = cv2.VideoCapture(0)
# while cap.isOpened():
#     ret, frame = cap.read()
#     frame = frame[50:50+270, 180:180+270, :]
#     frame = cv2.flip(frame, 1)
#     if cv2.waitKey(1) & 0xFF==ord('s'):
#         img_name = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
#         cv2.imwrite(img_name, frame)

#     if cv2.waitKey(1) & 0xFF==ord('a'):
#         img_name = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
#         cv2.imwrite(img_name, frame)


#     cv2.imshow("frame", frame)
#     if cv2.waitKey(1) & 0xFF==ord('q'):
#         break
# cap.release()
# cv2.destroyAllWindows()

In [8]:
# Build each glob with os.path.join so the pattern uses the platform's own
# path separator; a hard-coded backslash only matches files on Windows.
# At most 1020 files are taken from each folder.
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).take(1020)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).take(1020)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(1020)

In [6]:
def preprocess(file_path):
    """Load a JPEG file as a 128x128 three-channel image scaled by 1/255.

    Args:
        file_path: Path of the JPEG file to read (string or string tensor).

    Returns:
        The decoded image, resized to 128x128 and divided by 255.0.
    """
    byte_img = tf.io.read_file(file_path)
    # Force three channels: without this a greyscale JPEG decodes to a single
    # channel and no longer matches the (128, 128, 3) model input.
    img = tf.io.decode_jpeg(byte_img, channels=3)
    img = tf.image.resize(img, (128, 128))
    img = img / 255.0
    return img

In [10]:
# One label per anchor: 1.0 for (anchor, positive) pairs, 0.0 for (anchor, negative) pairs.
match_labels = tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))
mismatch_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))

positives = tf.data.Dataset.zip((anchor, positive, match_labels))
negatives = tf.data.Dataset.zip((anchor, negative, mismatch_labels))

# All positive pairs first, followed by all negative pairs.
data = positives.concatenate(negatives)

In [11]:
# sample = data.as_numpy_iterator()
# ex = sample.next()

In [12]:
def preprocess_twin(input_img, validation_img, label):
    """Preprocess both image paths of a pair and pass the label through unchanged."""
    first_image = preprocess(input_img)
    second_image = preprocess(validation_img)
    return first_image, second_image, label

In [13]:
data = data.map(preprocess_twin)
data = data.cache()
# The train/test cells split this dataset with take()/skip(), so the shuffled
# order must not change between passes: with the default
# reshuffle_each_iteration=True every pass draws a new order and samples can
# move between the two splits.
# The buffer covers the whole dataset because positives and negatives are
# concatenated back to back; a smaller buffer leaves the tail of the stream
# (the test split) dominated by negatives.
# If per-epoch reshuffling of the training data is wanted, apply it to the
# training split after take().
# NOTE(review): the upstream file listing may still reorder between passes
# until cache() holds a complete pass — confirm the split is stable in practice.
data = data.shuffle(buffer_size=len(data), reshuffle_each_iteration=False)

In [14]:
# Training split: the first 70% of the samples, in batches of 16 with a prefetch of 8.
train_size = round(len(data) * .7)
train_data = data.take(train_size).batch(16).prefetch(8)

In [15]:
# Test split: skip the first 70% of the samples, keep 30% of the total,
# then batch and prefetch exactly like the training split.
test_data = (
    data.skip(round(len(data) * .7))
    .take(round(len(data) * .3))
    .batch(16)
    .prefetch(8)
)

In [4]:
def make_embedding():
    """Build the convolutional embedding network.

    Returns:
        A Keras ``Model`` named ``'embedding'`` that maps a (128, 128, 3)
        image to a 4096-unit sigmoid-activated vector.
    """
    inp = Input(shape=(128, 128, 3), name='input_image')

    # MaxPooling2D takes (pool_size, strides, ...) positionally, so the former
    # call MaxPooling2D(64, (2, 2)) pooled over a 64x64 window with stride 2.
    # The intended 2x2 window is now passed by keyword; strides default to the
    # pool size, so with 'same' padding the output shapes are unchanged.
    c1 = Conv2D(64, (10, 10), activation='relu')(inp)
    m1 = MaxPooling2D(pool_size=(2, 2), padding='same')(c1)

    c2 = Conv2D(128, (7, 7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2, 2), padding='same')(c2)

    c3 = Conv2D(128, (4, 4), activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2, 2), padding='same')(c3)

    # Final convolution, flattened into the dense embedding layer.
    c4 = Conv2D(256, (4, 4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [5]:
# Build the embedding network and print its layer-by-layer summary.
embedding = make_embedding()
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 128, 128, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 119, 119, 64)      19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 60, 60, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 54, 54, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 27, 27, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 24, 24, 128)       26

In [6]:
class L1Dist(Layer):
    """Keras layer that outputs the element-wise absolute difference of two inputs."""

    def __init__(self, **kwargs):
        # Hand any keyword options straight to the base Layer.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return ``|input_embedding - validation_embedding|`` element-wise."""
        # Either argument may arrive as a list; turn both into tensors first.
        lhs = tf.convert_to_tensor(input_embedding)
        rhs = tf.convert_to_tensor(validation_embedding)
        difference = lhs - rhs
        return tf.math.abs(difference)

In [7]:
def make_siamese_model():
    """Assemble the two-input siamese classifier.

    Both inputs go through the module-level ``embedding`` model, the two
    embeddings are combined by an ``L1Dist`` layer, and a single sigmoid unit
    produces the output.

    Returns:
        A Keras ``Model`` named ``'SiameseNetwork'`` with inputs
        ``[input_img, validation_img]`` and one sigmoid output.
    """
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(128, 128, 3))

    # Validation image in the network
    validation_image = Input(name='validation_img', shape=(128, 128, 3))

    # Combine siamese distance components. The layer is named through its
    # constructor (L1Dist forwards **kwargs to Layer) instead of assigning
    # the private `_name` attribute after construction.
    siamese_layer = L1Dist(name='distance')
    distances = siamese_layer(embedding(input_image), embedding(validation_image))

    # Classification layer
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [8]:
# Build the siamese network and print its layer-by-layer summary.
siamese_model = make_siamese_model()
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         86146368    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [21]:
# The siamese model ends in Dense(1, activation='sigmoid'), so it outputs
# probabilities rather than logits. from_logits must therefore be False;
# with True the loss would apply a second sigmoid to the predictions.
binary_cross_loss = tf.losses.BinaryCrossentropy(from_logits=False)
opt = tf.keras.optimizers.Adam(1e-4)

In [22]:
# Create the checkpoint folder; nothing happens if it already exists.
os.makedirs('./training_checkpoints', exist_ok=True)

In [23]:
# Checkpoints are written under checkpoint_dir with the file prefix 'ckpt'.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# Track both the optimizer and the model in the checkpoint.
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [24]:
@tf.function
def train_step(batch):
    """Run one gradient update of ``siamese_model`` on a single batch.

    Args:
        batch: Sequence ``(first_images, second_images, labels)``.

    Returns:
        The loss tensor computed by ``binary_cross_loss`` for this batch.
    """
    # Pass the two image tensors as a list, matching the model's two inputs.
    X = [batch[0], batch[1]]
    # Give the labels the same (-1, 1) layout as the reshaped predictions so
    # the loss compares them element for element.
    y = tf.reshape(batch[2], (-1, 1))

    # Record the forward pass so gradients can be taken afterwards.
    with tf.GradientTape() as tape:
        yhat = siamese_model(X, training=True)
        yhat = tf.reshape(yhat, (-1, 1))
        loss = binary_cross_loss(y, yhat)

    # The former print(loss) was removed: inside a tf.function a Python print
    # only runs while the function is traced and shows a symbolic tensor.

    # Compute gradients and apply them to the model weights.
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

In [25]:
def train(data, EPOCHS):
    """Train ``siamese_model`` for ``EPOCHS`` passes over ``data``.

    For every epoch this prints a header, runs ``train_step`` on each batch
    while accumulating recall and precision, prints the last batch loss with
    both metrics, and saves a checkpoint on every 10th epoch.

    Args:
        data: Batched dataset yielding ``(first_images, second_images, labels)``.
        EPOCHS: Number of epochs to run.
    """
    for epoch in range(1, EPOCHS + 1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Fresh metric objects so each epoch is reported on its own.
        r = Recall()
        p = Precision()
        loss = None  # stays None when `data` yields no batches

        for idx, batch in enumerate(data):
            loss = train_step(batch)
            # Call the model directly instead of predict(): predict() is meant
            # for large inputs and, used once per batch, adds overhead and
            # prints its own progress bar for every batch.
            yhat = siamese_model([batch[0], batch[1]], training=False)
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat)
            progbar.update(idx + 1)

        # Without any batch there is no loss to report.
        if loss is not None:
            print(loss.numpy(), r.result().numpy(), p.result().numpy())

        # Save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

In [26]:
# Number of epochs passed to train().
EPOCHS = 50

In [None]:
# Run the training loop on the training split.
train(train_data, EPOCHS)

In [25]:
# Pull the first batch of the test split as NumPy arrays.
test_input, test_val, y_true = next(test_data.as_numpy_iterator())

In [None]:
# Model scores for the test batch.
y_hat = siamese_model.predict([test_input, test_val])

In [None]:
# Threshold each score at 0.5 to get 0/1 predictions (shown as the cell output).
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

In [None]:
# Labels of the same batch, for comparison with the predictions.
y_true

In [None]:
# Recall of the model on the test batch.
m = Recall()

# Feed the labels and the predicted scores into the metric
m.update_state(y_true, y_hat)

# Show the recall value
m.result().numpy()

In [None]:
# Precision of the model on the test batch.
m = Precision()

# Feed the labels and the predicted scores into the metric
m.update_state(y_true, y_hat)

# Show the precision value
m.result().numpy()

In [None]:
# Recall and precision accumulated over every batch of the test split.
r = Recall()
p = Precision()

# Note: the loop variables reuse the names test_input, test_val and y_true,
# so after the loop they hold the last test batch.
for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

# Print recall first, then precision.
print(r.result().numpy(), p.result().numpy())

In [None]:
# Show the first image pair of the current batch side by side on a 10x8 figure.
fig, (left_ax, right_ax) = plt.subplots(1, 2, figsize=(10, 8))

# Left panel: first input image
left_ax.imshow(test_input[0])

# Right panel: first validation image
right_ax.imshow(test_val[0])

# Render the figure
plt.show()

In [None]:
# Write the model to 'siamesemodelv2.h5'.
siamese_model.save('siamesemodelv2.h5')

In [None]:
# Reload the saved model; custom_objects maps the names 'L1Dist' and
# 'BinaryCrossentropy' to their classes for deserialisation.
siamese_model = tf.keras.models.load_model('siamesemodelv2.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
# Run the reloaded model on the current test batch.
siamese_model.predict([test_input, test_val])

In [None]:
# Print the layer summary of the reloaded model.
siamese_model.summary()

In [None]:
# List the files in the verification image folder.
os.listdir(os.path.join('application_data', 'verification_images'))

In [None]:
# Path of the input image that verify() reads.
os.path.join('application_data', 'input_image', 'input_image.jpg')

In [None]:
# Print the full path of every file in the verification image folder.
verification_dir = os.path.join('application_data', 'verification_images')
for image in os.listdir(verification_dir):
    validation_img = os.path.join(verification_dir, image)
    print(validation_img)

In [4]:
def verify(model, detection_threshold, verification_threshold):
    """Compare the stored input image against every verification image.

    The input image is read from
    ``application_data/input_image/input_image.jpg`` and compared with each
    file in ``application_data/verification_images``.

    Args:
        model: Model whose ``predict`` accepts a list of two image batches.
        detection_threshold: A prediction above this value counts as a match.
        verification_threshold: Fraction of matching images that must be
            exceeded for the input to count as verified.

    Returns:
        ``(results, verified)``: the list of raw ``model.predict`` outputs
        (one per verification image) and the verification verdict. With no
        verification images the result is ``([], False)``.
    """
    verification_dir = os.path.join('application_data', 'verification_images')
    input_path = os.path.join('application_data', 'input_image', 'input_image.jpg')

    # List the folder once so the images scored and the divisor always agree.
    image_names = os.listdir(verification_dir)
    if not image_names:
        # Nothing to compare against: answer "not verified" instead of
        # evaluating 0 / 0.
        return [], False

    # The input image is identical for every comparison, so load it once
    # instead of once per verification image.
    input_img = preprocess(input_path)

    results = []
    for image in image_names:
        validation_img = preprocess(os.path.join(verification_dir, image))

        # expand_dims adds a batch axis of size 1 to each image; list() then
        # yields the two single-image batches the model takes as inputs.
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)

    # Number of comparisons whose score exceeds the detection threshold.
    detection = np.sum(np.array(results) > detection_threshold)

    # Share of matching comparisons, checked against the verification threshold.
    verification = detection / len(image_names)
    verified = verification > verification_threshold

    return results, verified

In [None]:
import cv2
import os

cap = cv2.VideoCapture(0)
c = 5  # display state: 5 = no result yet, 1 = verified, 0 = unverified

# Region of the mirrored frame that is cropped and sent to verification.
x, y, w, h = 180, 50, 270, 270
start_point = (x, y)
end_point = (x + w, y + h)
thickness = 2

# Text settings for the status label under the rectangle.
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.6
font_thickness = 2

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame = cv2.flip(frame, 1)

        # Copy the crop: a plain slice shares memory with `frame`, so the
        # rectangle drawn below would otherwise end up in the saved image.
        frame1 = frame[y:y + h, x:x + w, :].copy()

        # Rectangle colour and label for the current state.
        if c == 1:
            color, text = (0, 255, 0), "Verified"  # Green
        elif c == 0:
            color, text = (0, 0, 255), "Unverified"  # Red
        else:
            color, text = (255, 0, 0), ""  # Blue

        frame = cv2.rectangle(frame, start_point, end_point, color, thickness)

        # Add text below the rectangle
        text_size = cv2.getTextSize(text, font, font_scale, font_thickness)[0]
        text_x = x + (w - text_size[0]) // 2  # Center the text horizontally
        text_y = y + h + 25  # Place the text below the rectangle
        cv2.putText(frame, text, (text_x, text_y), font, font_scale, color, font_thickness)

        # Display the frame
        cv2.imshow('Verification', frame)

        # Read the keyboard once per frame. With two separate waitKey calls a
        # key press picked up by one call is never seen by the other check,
        # so 'v' and 'q' presses could be lost.
        key = cv2.waitKey(10) & 0xFF

        if key == ord('v'):
            # Save the clean crop and verify it; the result is drawn from the
            # next frame on.
            cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame1)
            results, verified = verify(siamese_model, 0.5, 0.5)
            if verified:
                c = 1
                print("verified")
            else:
                c = 0
                print("unverified")
        elif key == ord('q'):
            break
finally:
    # Release the camera and close the window even if the loop raised.
    cap.release()
    cv2.destroyAllWindows()


In [None]:
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; slicing it would raise, so stop.
            break
        frame = frame[50:50+270, 180:180+270, :]
        frame = cv2.flip(frame, 1)

        cv2.imshow('Verification', frame)

        # Read the keyboard once per frame. With two separate waitKey calls a
        # key press picked up by one call is never seen by the other check.
        key = cv2.waitKey(10) & 0xFF

        # Verification trigger
        if key == ord('v'):
            cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
            # Run verification
            results, verified = verify(siamese_model, 0.5, 0.5)
            if verified:
                print("Verified")
            else:
                print("Unverified")
        elif key == ord('q'):
            break
finally:
    # Release the camera and close the window even if the loop raised.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# cap = cv2.VideoCapture(1)
# while True:
#     ret, frame = cap.read()
#     frame = cv2.flip(frame, 1)
    
#     start_point = (200, 30)
#     end_point = (340, 200)
#     color = (255, 0, 0)
#     thickness = 2
#     frame = cv2.rectangle(frame, start_point, end_point, color, thickness)
#     frame_cropped = frame[start_point[1]:end_point[1], start_point[0]:end_point[0]]
#     cv2.imshow("Frame", frame_cropped)
#     if cv2.waitKey(1) & 0xFF==ord('q'):
#         break
# cap.release()
# cv2.destroyAllWindows()