In [1]:
import os
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Conv2D, MaxPooling2D, Layer

In [2]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [3]:
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [13]:
for directory in os.listdir('lfw'):
  for file in os.listdir(os.path.join('lfw', directory)):
    EX_PATH = os.path.join('lfw', directory, file)
    NEW_PATH = os.path.join(NEG_PATH, file)
    os.replace(EX_PATH, NEW_PATH)

In [4]:
import uuid
uuid.uuid1()

UUID('6d9ed7b0-f818-11ef-881b-824a5e8c2f45')

In [81]:
import cv2

cap = cv2.VideoCapture(1)
while cap.isOpened(): 
    ret, frame = cap.read()
    frame = frame[150:450+250, 600:900+250, :]

    #Anchors Collection
    if cv2.waitKey(1) & 0xFF == ord('a'):
        #Unique file name
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)

    #Positive Collection
    if cv2.waitKey(1) & 0xFF == ord('p'):
        #Unique file name
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)

    cv2.imshow('Image Collection Webcam', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

In [163]:
#plt.imshow(frame)

In [4]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(300)

In [5]:
dir_test = anchor.as_numpy_iterator()

In [6]:
#Loading, Scaling and Resizing Images in a pipeline
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (100,100))
    img = img/255.0
    return img

In [7]:
img = preprocess("/Users/lwinmin/General/Projects/Jupiter-CNN/Image Recognition/data/anchor/a5d4e380-f75b-11ef-a077-824a5e8c2f45.jpg")

In [9]:
#plt.imshow(img)

In [10]:
#dataset.map(preprocess)

In [8]:
#Creating Positive and Negative Samples
positives = tf.data.Dataset.zip(anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))))
negatives =tf.data.Dataset.zip(anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))))

data = positives.concatenate(negatives)

In [9]:
data

<_ConcatenateDataset element_spec=(TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

In [10]:
tf.zeros(len(anchor))

<tf.Tensor: shape=(9,), dtype=float32, numpy=array([0., 0., 0., 0., 0., 0., 0., 0., 0.], dtype=float32)>

In [11]:
#exm = sample.next()

In [12]:
#Train and Test 
def preprocess_twin(input_img, val_img, label):
    return(preprocess(input_img), preprocess(val_img), label)

In [356]:
#test = preprocess_twin(*exm)

In [357]:
#plt.imshow(test[0])

In [13]:
#Caching, Batching and Splitting the Pipeline
#Dataloader Pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [19]:
round(len(data)*.7)

8

In [20]:
data

<_ShuffleDataset element_spec=(TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

In [14]:
#Training

train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [22]:
train_data

<_PrefetchDataset element_spec=(TensorSpec(shape=(None, 100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(None, 100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.float32, name=None))>

In [23]:
train_sample = train_data.as_numpy_iterator()

In [24]:
train_sam =train_sample.next()

In [25]:
len(train_sam[0])

8

In [15]:
#Testing

test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [49]:
model_embedding.summary()

In [16]:
#Model Engineering
#Embedding Layers

def embedding_layer():
    inp = Input(shape=(105, 105, 3), name='input_image') #Input
    
    c1 = Conv2D(64, (10,10), activation='relu')(inp) #1st Layer 
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1) #2nd Layer 
    
    c2 = Conv2D(128, (7,7), activation='relu')(m1) #3rd Layer
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2) #4th Layer
    
    c3 = Conv2D(128, (4,4), activation='relu')(m2) #5th Layer
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3) #6th Layer
    
    c4 = Conv2D(256, (4,4), activation='relu')(m3) #7th Layer
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    
    return Model(inputs=[inp] , outputs= [d1], name='embedding')

In [17]:
#Siamese distance class
from tensorflow.keras.layers import Layer, Input, Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.models import Model
import tensorflow as tf


class L1Dist(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [20]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.models import Model

# Define the embedding model
def embedding_layer():
    inp = Input(shape=(105, 105, 3), name='input_image')  # Input
    
    c1 = Conv2D(64, (10, 10), activation='relu')(inp)  # 1st Layer 
    m1 = MaxPooling2D(64, (2, 2), padding='same')(c1)  # 2nd Layer 
    
    c2 = Conv2D(128, (7, 7), activation='relu')(m1)  # 3rd Layer
    m2 = MaxPooling2D(64, (2, 2), padding='same')(c2)  # 4th Layer
    
    c3 = Conv2D(128, (4, 4), activation='relu')(m2)  # 5th Layer
    m3 = MaxPooling2D(64, (2, 2), padding='same')(c3)  # 6th Layer
    
    c4 = Conv2D(256, (4, 4), activation='relu')(m3)  # 7th Layer
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

# Siamese distance class
class L1Dist(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        # Ensure that both inputs are tensors
        input_embedding = tf.convert_to_tensor(input_embedding)
        validation_embedding = tf.convert_to_tensor(validation_embedding)
        return tf.math.abs(input_embedding - validation_embedding)

# Input images
input_image = Input(name='input_img', shape=(105, 105, 3))
validation_image = Input(name='validation_img', shape=(105, 105, 3))

# Create the embedding model
model_embedding = embedding_layer()

# Get embeddings for input and validation images
inp_emb = model_embedding(input_image)
val_emd = model_embedding(validation_image)

# Initialize the Siamese layer
siamese_layer = L1Dist()

# Apply the Siamese layer to compute the distance
output = siamese_layer(inp_emb, val_emd)

# Print the output shape
print(f"Output shape: {output.shape}")


Output shape: (1, None, 4096)


In [21]:
def Siamese_Model():

    #Handling Inputs
    input_image = Input(name='input_img', shape=(105,105,3)) #Anchor
    validation_image = Input(name='validation_img', shape=(105,105,3)) #validation

    #Combine distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(model_embedding(input_image), model_embedding(validation_image))


    #Classificarion Layer
    classifier = Dense(1, activation='sigmoid')(distances) #Final output 1 layer

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='Siamese_Network')

In [30]:
Siamese_Net = Model(inputs=[input_image, validation_image], outputs=classifier, name='Siamese_Network')

In [32]:
Siamese_Net.summary()

In [70]:
siam_model = Siamese_Model()

In [71]:
siam_model.summary()

In [72]:
#Training a Siamese Neural Network
loss = tf.losses.BinaryCrossentropy()

In [73]:
opter = tf.keras.optimizers.Adam(1e-4) #0.0001

In [109]:
checkpoints_dir = './training_checkpoints'
checkpoints_prefix = os.path.join(checkpoints_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opter, siam_model=siam_model)

In [110]:
@tf.function
def train_step(batch):
    with tf.GradientTape() as tape: 
        X1= batch[:2]  # Extract anchor & pos/neg
        y = batch[2]  # Extract label

        yhat = siam_model(X, training=True) 

        #Compute loss
        loss = binary_cross_loss(y, yhat)

    # Compute gradients
    grad = tape.gradient(loss, siam_model.trainable_variables)  
    opt.apply_gradients(zip(grad, siam_model.trainable_variables))  

    return loss


In [111]:
#Training Loop
def train(data, EPOCHS):
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(train_data))

        for idx, batch in enumerate(data):
            train_step(batch)
            progbar.update(idx+1)

        #saving the checkpoints
        if epoch% 10 == 0:
            checkpoint.save(file_prefix=checkpoints_prefix)

In [112]:
EPOCHS = 50

In [113]:
train(train_data, EPOCHS)


 Epoch 1/50


NameError: in user code:

    File "/var/folders/0t/tlg8p4n91y1g2zyg12tmx_n00000gn/T/ipykernel_23604/2250223949.py", line 7, in train_step  *
        yhat = Siamese_Net(X, training=True)

    NameError: name 'X' is not defined
