In [1]:
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from pathlib import Path
from datetime import datetime
import argparse
from tensorflow.keras import applications
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras import Model, Input
from tensorflow.keras.applications import resnet

In [23]:
def preprocess_image(filename, target_shape=(224, 224)):
    """
    Load the specified file as a JPEG image, resize it to the target shape
    and apply the ResNet input preprocessing.

    Args:
        filename: Path of the JPEG file (Python string or scalar string tensor).
        target_shape: (height, width) the image is resized to. Any two-element
            sequence is accepted.

    Returns:
        A float32 tensor of shape (1, height, width, 3) that has been passed
        through `resnet.preprocess_input`.
    """
    height, width = target_shape  # unpacking also accepts a list, not only a tuple

    image_string = tf.io.read_file(filename)
    image = tf.image.decode_jpeg(image_string, channels=3)
    # Keep the decoded 0-255 value range: `resnet.preprocess_input` subtracts
    # the ImageNet channel means (roughly 104-124) without rescaling, so
    # feeding it [0, 1] data (which `tf.image.convert_image_dtype` would
    # produce) collapses every image to a nearly constant tensor.
    image = tf.cast(image, tf.float32)
    image = tf.image.resize(image, (height, width))
    # Leading axis of size 1: the result is shaped like a one-image batch.
    image = tf.reshape(image, (1, height, width, 3))
    image = resnet.preprocess_input(image)
    return image

In [38]:
# Handling data
#
# Every row of the triplet file is "anchor positive negative". The rows are
# turned into a binary task: label 1 keeps the order (anchor, positive,
# negative), label 0 swaps the last two images.
#
# All shuffling and splitting is done on the file paths *before* any image is
# decoded. Shuffling the decoded dataset and then deriving several datasets
# from it (take/skip, separate X and y maps that are zipped back together)
# re-draws the random order for every derived iterator, which mixes training
# and validation samples and pairs inputs with the wrong labels.

SEED = 42               # makes the one-off permutation (and so the split) reproducible
DEBUG_MAX_TRIPLETS = 2  # debug subset size; set to None to use every triplet.
                        # NOTE(review): with only 2 triplets the 80/20 split
                        # leaves the validation set empty.

# ndmin=2 keeps the array two-dimensional even if the file has a single row.
train = np.loadtxt("handout/train_triplets.txt", dtype=str, delimiter=" ", ndmin=2)
train = train[:DEBUG_MAX_TRIPLETS]

anchor_images = [f"handout/food/{number}.jpg" for number in train[:, 0]]
positive_images = [f"handout/food/{number}.jpg" for number in train[:, 1]]
negative_images = [f"handout/food/{number}.jpg" for number in train[:, 2]]

image_count = len(anchor_images)

# First half of the rows -> label 1, remaining rows -> label 0 (swapped).
# Labels have shape (image_count, 1) so each example carries a (1,) label.
half_count = round(.5 * image_count)
labels = np.zeros((image_count, 1), dtype=np.float32)
labels[:half_count] = 1.0
keeps_order = labels[:, 0] == 1.0

anchor_paths = np.array(anchor_images, dtype=str)
positive_paths = np.array(positive_images, dtype=str)
negative_paths = np.array(negative_images, dtype=str)
second_paths = np.where(keeps_order, positive_paths, negative_paths)
third_paths = np.where(keeps_order, negative_paths, positive_paths)

# One seeded permutation applied to all arrays at once: labels stay attached
# to their images and both classes are spread evenly over the whole range, so
# the positional split below gets a fair mix of each.
order = np.random.default_rng(SEED).permutation(image_count)
anchor_paths, second_paths, third_paths, labels = (
    array[order] for array in (anchor_paths, second_paths, third_paths, labels)
)

path_dataset = tf.data.Dataset.from_tensor_slices(
    ((anchor_paths, second_paths, third_paths), labels)
)


def load_triplet(paths, label):
    """Decode the three image paths of one example; the label passes through."""
    return tuple(preprocess_image(path) for path in paths), label


# Full labelled dataset with flat (anchor, second, third, label) elements.
dataset = path_dataset.map(
    lambda paths, label: load_triplet(paths, label)[0] + (label,)
)

# Split on the (cheap) path strings so the two sets are disjoint and identical
# on every iteration; images are decoded only after the split.
train_count = round(.8 * image_count)
train_paths = path_dataset.take(train_count)
validation_paths = path_dataset.skip(train_count)

# Aligned feature / label views of each split, taken from fixed-order datasets.
train_ordered = train_paths.map(load_triplet)
validation_dataset = validation_paths.map(load_triplet)

X_train = train_ordered.map(lambda inputs, label: inputs)
X_validation = validation_dataset.map(lambda inputs, label: inputs)
y_train = train_ordered.map(lambda inputs, label: label)
y_validation = validation_dataset.map(lambda inputs, label: label)

# Training examples are re-shuffled every epoch. The buffer holds path strings
# only, so it can cover the whole training split without holding images in
# memory (buffer_size must be at least 1, hence the max()).
train_dataset = train_paths.shuffle(
    buffer_size=max(train_count, 1), seed=SEED
).map(load_triplet)

In [39]:
# Model

class DistanceLayer(layers.Layer):
    """
    Computes two squared Euclidean distances from three embeddings:
    anchor-to-positive and anchor-to-negative. Both are summed over the
    last axis of the inputs.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @staticmethod
    def _squared_distance(left, right):
        """Sum of squared element-wise differences over the last axis."""
        difference = left - right
        return tf.reduce_sum(tf.square(difference), -1)

    def call(self, anchor, positive, negative):
        """Return the tuple (anchor-positive distance, anchor-negative distance)."""
        to_positive = self._squared_distance(anchor, positive)
        to_negative = self._squared_distance(anchor, negative)
        return (to_positive, to_negative)

target_shape = (224, 224)
image_shape = target_shape + (3,)  # height, width, RGB channels

# Backbone: ResNet50 with ImageNet weights and without its classification head.
base_cnn = resnet.ResNet50(
    weights="imagenet",
    input_shape=image_shape,
    include_top=False,
)

# Embedding head: flatten the backbone output, then two ReLU dense layers
# (each followed by batch normalisation) and a linear 256-unit output.
flatten = layers.Flatten()(base_cnn.output)
hidden_512 = layers.Dense(512, activation="relu")(flatten)
dense1 = layers.BatchNormalization()(hidden_512)
hidden_256 = layers.Dense(256, activation="relu")(dense1)
dense2 = layers.BatchNormalization()(hidden_256)
output = layers.Dense(256)(dense2)

embedding = Model(base_cnn.input, output, name="Embedding")

# Freeze every backbone layer that comes before "conv5_block1_out"; that layer
# and all layers after it stay trainable.
trainable = False
for layer in base_cnn.layers:
    trainable = trainable or layer.name == "conv5_block1_out"
    layer.trainable = trainable

# One input per image of the triplet, all sharing the same embedding network.
anchor_input, positive_input, negative_input = (
    layers.Input(name=input_name, shape=image_shape)
    for input_name in ("anchor", "positive", "negative")
)

distances = DistanceLayer()(
    embedding(anchor_input),
    embedding(positive_input),
    embedding(negative_input),
)

# Classification head: the two distances are stacked along axis 1 and mapped
# to one sigmoid output.
stacked_distances = tf.stack(distances, axis=1)
hidden_pair = layers.Dense(2, activation="relu")(stacked_distances)
layer = layers.Dense(1, activation="sigmoid", name="BinaryPrediction")(hidden_pair)

model = Model(
    inputs=[anchor_input, positive_input, negative_input],
    outputs=layer,
)

In [40]:
# Binary classification set-up: Adam optimiser, binary cross-entropy loss,
# accuracy reported for both the training and the validation data.
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Single pass over the training data; `history` keeps the per-epoch metrics.
history = model.fit(
    train_dataset,
    epochs=1,
    validation_data=validation_dataset,
)

