In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from keras.initializers import glorot_uniform
from keras.applications.resnet50 import ResNet50, preprocess_input
from keras import layers
from keras import Sequential
from dataset import create_image_path_ds, preprocess_image
from OneM_tripless import build_txt


In [None]:
def create_ds(i):
  """Build the train and validation pair datasets for one LFW fold.

  Args:
    i: fold number. It is zero-padded to two digits when building the list
      file names, so 1 -> '01_...' and 10 -> '10_...'. (The previous
      hard-coded '0' prefix produced '010_...' for fold 10.)

  Returns:
    (train_ds, val_ds): for each split, the "same" pairs followed by the
    "diff" pairs, as returned by create_image_path_ds and joined with
    .concatenate().
  """
  fold = f'{int(i):02d}'
  faces_dir = './lfw/faces_png/'

  def load(split, kind):
    # One list file per (split, kind) combination, e.g. 03_train_same.txt.
    return create_image_path_ds(f'./lfw/lists/{fold}_{split}_{kind}.txt', faces_dir)

  # Load order is kept: train same/diff first, then test same/diff.
  positive_train = load('train', 'same')
  negative_train = load('train', 'diff')
  positive_test = load('test', 'same')
  negative_test = load('test', 'diff')

  train_ds = positive_train.concatenate(negative_train)
  val_ds = positive_test.concatenate(negative_test)
  return train_ds, val_ds
def create_ds_1m():
  """Build train and validation pair datasets from the list files in the CWD.

  Reads ./train_same.txt, ./train_diff.txt, ./test_same.txt and
  ./test_diff.txt (in that order) with './siamese/' as the image root.

  Returns:
    (train_ds, val_ds): for each split, the "same" pairs followed by the
    "diff" pairs.
  """
  image_root = './siamese/'
  list_files = ('./train_same.txt', './train_diff.txt', './test_same.txt', './test_diff.txt')
  train_same, train_diff, test_same, test_diff = [
    create_image_path_ds(list_file, image_root) for list_file in list_files
  ]
  return train_same.concatenate(train_diff), test_same.concatenate(test_diff)
# build_txt('same', './train_same.txt', 4000)
# build_txt('diff', './train_diff.txt', 4000)
# build_txt('same', './test_same.txt', 1000)
# build_txt('diff', './test_diff.txt', 1000)


In [None]:
# Fold 1 seeds the combined datasets; folds 2 through 9 are appended in order.
# NOTE(review): range(2, 10) stops at fold 9 - confirm whether a tenth fold
# was meant to be included.
train_ds, val_ds = create_ds(1)
for fold_train, fold_val in map(create_ds, range(2, 10)):
  train_ds, val_ds = train_ds.concatenate(fold_train), val_ds.concatenate(fold_val)

In [None]:
# Sanity check: print the three components of the first training element.
for first, second, label in train_ds.take(1):
  print(first, second, label)

In [None]:
IMG_SIZE = 224

def process(ds):
  """Run both images of every pair through the two preprocessing stages.

  Stage 1 applies the project's preprocess_image to each image reference
  (presumably loading/decoding the file - confirm in dataset.py).
  Stage 2 applies the ResNet50 preprocess_input to each resulting image.
  The label is passed through untouched.

  Args:
    ds: dataset whose elements are (x1, x2, y) triples.

  Returns:
    The mapped dataset, still yielding (x1, x2, y) triples.
  """
  def _load_pair(x1, x2, y):
    return preprocess_image(x1), preprocess_image(x2), y

  def _resnet_normalize(x1, x2, y):
    return preprocess_input(x1), preprocess_input(x2), y

  return ds.map(_load_pair).map(_resnet_normalize)

train_ds, val_ds = process(train_ds), process(val_ds)

In [None]:
# Shuffle individual training pairs (this runs before batching) using a
# 1024-element buffer, then display the element structure as the cell output.
train_ds = train_ds.shuffle(1024)
train_ds.element_spec

(TensorSpec(shape=(224, 224, 3), dtype=tf.float32, name=None),
 TensorSpec(shape=(224, 224, 3), dtype=tf.float32, name=None),
 TensorSpec(shape=(), dtype=tf.float32, name=None))

In [None]:
# Group both splits into batches of 64 pairs.
train_ds, val_ds = [split.batch(64) for split in (train_ds, val_ds)]


In [None]:
def _to_fit_format(x1, x2, y):
  """Repack (x1, x2, y) as ((x1, x2), y): a two-input tuple plus the target."""
  return (x1, x2), y

train_ds, val_ds = train_ds.map(_to_fit_format), val_ds.map(_to_fit_format)

In [None]:
from keras.models import Model

# ImageNet-pretrained ResNet50, built with its classification head included.
res50 = ResNet50(
    weights='imagenet',
    include_top=True,
    input_tensor=None,
    input_shape=None,
    pooling='avg',
    classes=1000,
)

# Freeze every backbone layer so only the new dense head below is trained.
for backbone_layer in res50.layers:
    backbone_layer.trainable = False

# Feature extractor: same input as ResNet50, but cut at the second-to-last
# layer (presumably the pooled features just before the classifier -
# confirm with res50.summary()).
res50_extractor = Model(
    inputs=res50.input,
    outputs=res50.layers[-2].output,
    name='res50_extractor',
)

# Embedding network: frozen extractor followed by a 2048 -> 512 -> 128 dense
# head with normalization after each dense layer.
embedding = Sequential([
    res50_extractor,
    layers.Flatten(),
    layers.Dense(2048, activation='relu', name='fc_rd0'),
    layers.BatchNormalization(name='fc_db_rd1'),
    layers.Dense(512, activation='sigmoid', name='fc_rd1'),
    layers.BatchNormalization(name='fc_db_rd2'),
    layers.Dense(128, activation='sigmoid', name='fc_rd2'),
    layers.LayerNormalization(name='fc_db_rd3', axis=1),
])
embedding.summary()

In [None]:
@tf.keras.saving.register_keras_serializable()
class L1Dist(layers.Layer):
    """Distance layer comparing two batches of embeddings.

    Despite its name, the layer returns the Euclidean (L2) norm of the
    difference between the two embeddings, reduced over axis 1, i.e. one
    scalar distance per row. The class name is kept because it is the
    identifier registered for Keras serialization.
    """

    def __init__(self, **kwargs):
        # Forward the standard Layer arguments (name, dtype, trainable, ...).
        # They were previously dropped, so `L1Dist(name=...)` silently lost
        # its name and a reloaded layer lost its saved base configuration.
        super().__init__(**kwargs)

    def call(self, anchor_embedding, validation_embedding):
        """Return the per-row Euclidean distance between the two inputs.

        Args:
            anchor_embedding: embeddings of the first image of each pair.
            validation_embedding: embeddings of the second image of each
                pair, same shape as `anchor_embedding`.

        Returns:
            Tensor holding the norm of the difference, reduced over axis 1.
        """
        return tf.math.reduce_euclidean_norm(anchor_embedding - validation_embedding, axis=1)


In [None]:
# Two 224x224 RGB inputs: the anchor image and the image it is compared with.
anchor_input = layers.Input(name='anchor_input', shape=(224,224,3))
validation_input = layers.Input(name='validation_input', shape=(224,224,3))

# Both inputs go through the same `embedding` network (shared weights).
anchor_embedding = embedding(anchor_input)
validation_embedding = embedding(validation_input)

# The model's single output is the distance between the two embeddings.
l1_layer = L1Dist(name='l1_distance')
distances = l1_layer(anchor_embedding, validation_embedding)

model = Model(inputs=[anchor_input, validation_input], outputs=distances)
model.summary()

In [None]:
# Adam with a 1e-4 learning rate, binary cross-entropy on the raw model
# output, accuracy as the reported metric.
# NOTE(review): the model output is a Euclidean distance (never negative),
# but from_logits=True treats it as an unbounded logit, so sigmoid(output)
# can never drop below 0.5. Confirm the label convention and whether a
# distance-based (e.g. contrastive) loss was intended.
model.compile(
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    metrics=['accuracy'],
)

In [None]:
import gc

epochs = 5  # epochs per fit() round; 10 rounds below -> 50 epochs in total
checkpoint = tf.train.Checkpoint(model)
histories = []  # one History per round (`history` alone only keeps the last)

for i in range(10):
  # train_ds / val_ds are already batched, so no batch_size is passed here:
  # Keras expects batch_size to be left unset for dataset inputs.
  history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=epochs,
  )
  histories.append(history)

  # Checkpoint AFTER the round so every checkpoint contains the weights that
  # were just trained. Saving before fit() stored the untrained model first
  # and never captured the final round.
  save_path = checkpoint.save('./tmp/training_checkpoints_lfw')
  print(save_path)

  # Release per-round garbage before the next fit() call.
  gc.collect()

In [None]:
# Persist the trained siamese model as a single .keras file.
model.save(filepath='siamese_1M_2.keras')

In [None]:
# Load the four test photos at the network's input resolution.
img_a = preprocess_image('./WIN_20231231_04_09_40_Pro.jpg', (IMG_SIZE, IMG_SIZE))
img_b = preprocess_image('./WIN_20231231_03_50_45_Pro.jpg', (IMG_SIZE, IMG_SIZE))
img_c = preprocess_image('./WIN_20231231_04_15_02_Pro.jpg', (IMG_SIZE, IMG_SIZE))
img_d = preprocess_image('./WIN_20231231_03_50_38_Pro.jpg', (IMG_SIZE, IMG_SIZE))


def to_model_input(img):
    """Turn one loaded image into a batch of one, preprocessed as in training.

    The training pipeline applies the ResNet50 `preprocess_input` after
    `preprocess_image`; the same step is applied here so the model sees
    inputs in the value range it was trained on. The image is copied into a
    fresh float32 array first, so the original stays displayable.
    """
    batch = np.expand_dims(np.array(img, dtype=np.float32), axis=0)
    return preprocess_input(batch)


i_a = to_model_input(img_a)
i_b = to_model_input(img_b)
i_c = to_model_input(img_c)
i_d = to_model_input(img_d)

# Show the un-normalized images side by side.
# NOTE(review): img_d is displayed but never scored, while img_a is scored
# but not displayed - confirm which images were meant to be shown.
for position, shown in enumerate((img_d, img_b, img_c), start=1):
    plt.subplot(1, 3, position)
    plt.imshow(shown)
plt.show()

# Model output for the pairs (b, a) and (a, c).
print(model.predict([i_b, i_a]))
print(model.predict([i_a, i_c]))