In [1]:
import tensorflow as tf

# Prepare Triplet Dataset

In [2]:
import pathlib
import os
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import glob

from itertools import permutations

In [25]:
img_paths = list()
for i in range(3):
    subject_id = str(i).zfill(3)
    subject_path = f"dataset/casia_cropped/{subject_id}"
    print(subject_path)
    for j in range(5):
        img_path = f"{subject_path}/{subject_id}_{j}.jpg"
        img_paths.append(img_path)

dataset/casia_cropped/000
dataset/casia_cropped/001
dataset/casia_cropped/002


In [26]:
img_paths

['dataset/casia_cropped/000/000_0.jpg',
 'dataset/casia_cropped/000/000_1.jpg',
 'dataset/casia_cropped/000/000_2.jpg',
 'dataset/casia_cropped/000/000_3.jpg',
 'dataset/casia_cropped/000/000_4.jpg',
 'dataset/casia_cropped/001/001_0.jpg',
 'dataset/casia_cropped/001/001_1.jpg',
 'dataset/casia_cropped/001/001_2.jpg',
 'dataset/casia_cropped/001/001_3.jpg',
 'dataset/casia_cropped/001/001_4.jpg',
 'dataset/casia_cropped/002/002_0.jpg',
 'dataset/casia_cropped/002/002_1.jpg',
 'dataset/casia_cropped/002/002_2.jpg',
 'dataset/casia_cropped/002/002_3.jpg',
 'dataset/casia_cropped/002/002_4.jpg']

In [27]:
img_paths[0].split("/")[-2]

'000'

In [28]:
anchor_paths = list()
positive_paths = list()
negative_paths = list()

for anchor_path, positive_path, negative_path in permutations(img_paths, 3):
    anchor_subject = anchor_path.split("/")[-2]
    positive_subject = positive_path.split("/")[-2]
    negative_subject = negative_path.split("/")[-2]
    
    if (anchor_subject == positive_subject) and (anchor_subject != negative_subject):
        anchor_paths.append(str(anchor_path))
        positive_paths.append(str(positive_path))
        negative_paths.append(str(negative_path))

In [29]:
len(anchor_paths)

600

In [30]:
def preprocess_image(filename, target_shape):
    """
    Load the specified file as a JPEG image, preprocess it and
    resize it to the target shape.
    """

    image_string = tf.io.read_file(filename)
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.image.resize(image, target_shape)
    return image

In [131]:
images = dict()
target_shape = (160, 160)

triplets = tf.zeros([0, 3, 160, 160, 3])
# positive_imgs = list()
# negative_imgs = list()

for anchor_path, positive_path, negative_path in zip(anchor_paths, positive_paths, negative_paths):
    try:
        anchor_img = images[anchor_path]
    except:
        anchor_img = preprocess_image(anchor_path, target_shape)
        images[anchor_path] = anchor_img
    
    try:
        positive_img = images[positive_path]
    except:
        positive_img = preprocess_image(positive_path, target_shape)
        images[positive_path] = positive_img
        
    try:
        negative_img = images[negative_path]
    except:
        negative_img = preprocess_image(negative_path, target_shape)
        images[negative_path] = negative_img
    triplet = tf.stack([anchor_img, positive_img, negative_img], axis=0)
    triplet = tf.expand_dims(triplet, axis=0)
    triplets = tf.concat([triplets, triplet], axis=0)

In [132]:
triplets.shape

TensorShape([600, 3, 160, 160, 3])

In [133]:
triplets = tf.data.Dataset.from_tensors(triplets)

In [123]:
len(images.keys())

15

In [134]:
len(triplets)

1

In [124]:
labels = tf.data.Dataset.from_tensor_slices(tf.ones((len(triplets),)))
triplet_dataset = tf.data.Dataset.zip((triplets, labels))

TypeError: Invalid `datasets`. `datasets` is expected to be a (nested) structure of `tf.data.Dataset` objects but encountered object of type <class 'tensorflow.python.framework.ops.EagerTensor'>.

In [89]:
sample = next(iter(triplet_dataset))

In [90]:
print(len(sample))
print("x:", len(sample[0]))

2
x: 3


In [102]:
num_samples = len(triplets)

triplet_dataset = triplet_dataset.shuffle(buffer_size=1024)
train_dataset = triplet_dataset.take(round(num_samples * 0.8))
val_dataset = triplet_dataset.skip(round(num_samples * 0.8))

# train_dataset = train_dataset.batch(32, drop_remainder=False)
# train_dataset = train_dataset.prefetch(8)

# val_dataset = val_dataset.batch(32, drop_remainder=False)
# val_dataset = val_dataset.prefetch(8)

In [103]:
len(train_dataset)

480

In [110]:
batch = next(iter(train_dataset))
len(batch)

2

In [105]:
len(batch[0])

3

In [106]:
print(batch[0][0].shape)
print(batch[0][1].shape)
print(batch[0][2].shape)

(160, 160, 3)
(160, 160, 3)
(160, 160, 3)


# Train

In [107]:
from tensorflow.keras.layers import Input, Layer, Lambda
from tensorflow.keras.models import Model
from tensorflow.keras import metrics
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import EarlyStopping

In [42]:
from facenet.facenet import load_model as load_facenet_model

In [43]:
def cosine_distance(x, y):
    y_true = tf.linalg.l2_normalize(x, axis=-1)
    y_pred = tf.linalg.l2_normalize(y, axis=-1)
    cosine_similarity = tf.reduce_sum(y_true * y_pred, axis=-1) # range [-1,1]
    distance = 1 - cosine_similarity # range [0, 2]
    return distance

In [44]:
# Test cosine_distance function

arr0 = [
    [1,1,1,1],
    [1,0,1,0],
]
arr0 = tf.convert_to_tensor(arr0, dtype=tf.float32)
arr1 = [
    [1,1,1,1],
    [1,1,1,1],
]
arr1 = tf.convert_to_tensor(arr1, dtype=tf.float32)

print(arr0)
print(arr1)
print(cosine_distance(arr0, arr1))

tf.Tensor(
[[1. 1. 1. 1.]
 [1. 0. 1. 0.]], shape=(2, 4), dtype=float32)
tf.Tensor(
[[1. 1. 1. 1.]
 [1. 1. 1. 1.]], shape=(2, 4), dtype=float32)
tf.Tensor([0.        0.2928933], shape=(2,), dtype=float32)


In [45]:
def triplet_loss(templates, margin=0.4):
    
    anchor, positive, negative = templates
    
    positive_distance = cosine_distance(anchor,positive)
    negative_distance = cosine_distance(anchor,negative)
    
    basic_loss = positive_distance - negative_distance + margin
    loss = K.maximum(basic_loss,0.0)
    
    return loss

In [76]:
# Adopting the TensorFlow Functional API

def build_siamese_model(embedding_model):
    target_shape = embedding_model.input_shape[1:3] + (3,)
    anchor = Input(shape=target_shape, name='anchor_input')
    A = embedding_model(anchor)

    positive = Input(shape=target_shape, name='positive_input')
    P = embedding_model(positive)

    negative = Input(shape=target_shape, name='negative_input')
    N = embedding_model(negative)

    loss = Lambda(triplet_loss)([A, P, N])

    model = Model(inputs=[anchor,positive,negative],outputs=loss)
    
    return model

In [47]:
# Create a custom loss function since there are no ground truths label

def identity_loss(y_true, y_pred):
    return K.mean(y_pred)

In [49]:
facenet = load_facenet_model("facenet/facenet_weights.h5")
print(facenet.input_shape[1:3])

(160, 160)


In [50]:
model = build_siamese_model(facenet)

model.compile(loss=identity_loss, optimizer=optimizers.Adam(learning_rate=1e-4))

callbacks=[EarlyStopping(
    patience=2,
    verbose=1,
    restore_best_weights=True,
    monitor="val_loss",
)]

In [109]:
model.fit(
    train_dataset,
    epochs=50, 
    batch_size=64,
    validation_data=val_dataset,
    callbacks=callbacks
)

Epoch 1/50


ValueError: in user code:

    File "c:\Users\tjthanapat\anaconda3\envs\tf\lib\site-packages\keras\engine\training.py", line 1160, in train_function  *
        return step_function(self, iterator)
    File "c:\Users\tjthanapat\anaconda3\envs\tf\lib\site-packages\keras\engine\training.py", line 1146, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "c:\Users\tjthanapat\anaconda3\envs\tf\lib\site-packages\keras\engine\training.py", line 1135, in run_step  **
        outputs = model.train_step(data)
    File "c:\Users\tjthanapat\anaconda3\envs\tf\lib\site-packages\keras\engine\training.py", line 993, in train_step
        y_pred = self(x, training=True)
    File "c:\Users\tjthanapat\anaconda3\envs\tf\lib\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "c:\Users\tjthanapat\anaconda3\envs\tf\lib\site-packages\keras\engine\input_spec.py", line 216, in assert_input_compatibility
        raise ValueError(

    ValueError: Layer "model" expects 3 input(s), but it received 1 input tensors. Inputs received: [<tf.Tensor 'IteratorGetNext:0' shape=(3, 160, 160, 3) dtype=float32>]


In [61]:
len(val_dataset)

4