In [1]:
import tensorflow as tf
tf.random.set_seed(42)
tf.config.run_functions_eagerly(True)
import pandas as pd
import numpy as np
import os
from tqdm import tqdm
import shutil

#from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Activation, Dense, Dropout, Conv2D, MaxPooling2D, Flatten, Concatenate, BatchNormalization
from tensorflow.keras.constraints import MaxNorm
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import load_img, img_to_array, array_to_img
from tensorflow.keras import applications
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras import Model
from tensorflow.keras.applications import mobilenet_v2

In [2]:
train_triplets_path = '../input/triplets/train_triplets.txt'
test_triplets_path = '../input/triplets/test_triplets.txt'
food_path = '/content/drive/MyDrive/ml_task4/food.zip'
image_path = '../input/food-img/food'

In [3]:
train_triplets = pd.read_csv(train_triplets_path, delim_whitespace=True, header=None, names =['anchor','positive','negative'], dtype='str')
test_triplets = pd.read_csv(test_triplets_path, delim_whitespace=True, header=None, names =['anchor','positive','negative'], dtype='str')

In [7]:
# IMG_WIDTH = 96
# IMG_HEIGHT = 96

def load_image(img, training):
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.cast(img, tf.float32)
    img = img / 127.5 - 1
    img = tf.image.resize(img, (IMG_HEIGHT, IMG_WIDTH))
    if training:
        img = tf.image.random_flip_left_right(img)
        img = tf.image.random_flip_up_down(img)
    return img


def load_triplets(triplet, training):
    ids = tf.strings.split(triplet)
    anchor = load_image(tf.io.read_file(image_path + '/' + ids[0] + '.jpg'), training)
    truthy = load_image(tf.io.read_file(image_path + '/'  + ids[1] + '.jpg'), training)
    falsy = load_image(tf.io.read_file(image_path + '/'  + ids[2] + '.jpg'), training)
    if training:
        return tf.stack([anchor, truthy, falsy], axis=0), 1
    else:
        return tf.stack([anchor, truthy, falsy], axis=0)

def make_dataset(dataset_filename, training=True):
  #makes dataset from text with triplets
  dataset = tf.data.TextLineDataset(dataset_filename)

  dataset = dataset.map(
    lambda triplet: load_triplets(triplet, training),
    num_parallel_calls=tf.data.experimental.AUTOTUNE)
  return dataset

<ParallelMapDataset shapes: ((3, 96, 96, 3), ()), types: (tf.float32, tf.int32)>

In [15]:
target_shape = (224, 224)
IMG_WIDTH = 224
IMG_HEIGHT = 224
#target_shape = (200,200)

def preprocess_image(filename):
  image_string = tf.io.read_file(image_path + '/' + filename + '.jpg')
  image = tf.image.decode_jpeg(image_string, channels=3)
  image = tf.image.convert_image_dtype(image, tf.float32)

  image = tf.image.resize(image, target_shape)
  return image

def preprocess_triplets(anchor, positive, negative):
    anchor = preprocess_image(anchor)
    positive = preprocess_image(positive)
    negative = preprocess_image(negative)
    
    return tf.stack([anchor, positive, negative], axis=0), 1

In [5]:
anchor_images = train_triplets['anchor']
positive_images = train_triplets['positive']
negative_images = train_triplets['negative']
image_count = len(anchor_images)

anchor_dataset = tf.data.Dataset.from_tensor_slices(anchor_images)
positive_dataset = tf.data.Dataset.from_tensor_slices(positive_images)
negative_dataset = tf.data.Dataset.from_tensor_slices(negative_images)

dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
dataset = dataset.shuffle(buffer_size=1024,reshuffle_each_iteration=True)
dataset = dataset.map(preprocess_triplets)

#makes a tuple of tensors of shape (3,96,96,3)

train_dataset = dataset.take(round(image_count * 0.8))
val_dataset = dataset.skip(round(image_count * 0.8))

train_dataset = train_dataset.repeat()
train_dataset = train_dataset.batch(32, drop_remainder=False)
train_dataset = train_dataset.prefetch(8)

val_dataset = val_dataset.batch(32, drop_remainder=False)
val_dataset = val_dataset.prefetch(8)

  "Even though the tf.config.experimental_run_functions_eagerly "


In [6]:
dataset

<MapDataset shapes: ((3, 224, 224, 3), ()), types: (tf.float32, tf.int32)>

In [11]:
def create_model():
  base_cnn = tf.keras.applications.MobileNetV2(weights="imagenet", input_shape=target_shape + (3,), include_top=False)

  #flatten = layers.Flatten()(base_cnn.output)
  flatten = tf.keras.layers.GlobalAveragePooling2D()(base_cnn.output)
  dense1 = layers.Dense(128, activation="relu")(flatten)
  output = layers.Lambda(lambda t: tf.math.l2_normalize(t, axis=1))(dense1)
  
  base_cnn.trainable = False 
  embedding = Model(base_cnn.input, output, name="Embedding")

  inputs = Input(shape=(3, IMG_HEIGHT, IMG_WIDTH, 3))
  anchor, positive, negative = inputs[:, 0, ...], inputs[:, 1, ...], inputs[:, 2, ...]
#   anchor_input = Input(name="anchor", shape=(IMG_HEIGHT, IMG_HEIGHT, 3))
#   positive_input = Input(name="positive", shape=(IMG_HEIGHT, IMG_HEIGHT, 3))
#   negative_input = Input(name="negative", shape=(IMG_HEIGHT, IMG_HEIGHT, 3))

  anchor_embedding = embedding(mobilenet_v2.preprocess_input(anchor))
  positive_embedding = embedding(mobilenet_v2.preprocess_input(positive))
  negative_embedding = embedding(mobilenet_v2.preprocess_input(negative))

  embeddings = tf.stack([anchor_embedding, positive_embedding, negative_embedding], axis=-1)
  siamese_network = Model(inputs=inputs, outputs=embeddings)
  siamese_network.summary()
  return siamese_network

In [12]:
def compute_distances(embeddings):
  #change this to triplet output in the future
  anchor, positive, negative = embeddings[..., 0], embeddings[..., 1], embeddings[..., 2]
  ap_distance = tf.reduce_sum(tf.square(anchor - positive), 1)
  an_distance = tf.reduce_sum(tf.square(anchor - negative), 1)
  return (ap_distance, an_distance)

def triplet_loss(_, embeddings):
  ap_distance, an_distance = compute_distances(embeddings)
  #might want to change this to L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
  #softplus makes sure distance is positive, smooth approximation of ReLU
  return tf.reduce_mean(tf.math.softplus(ap_distance - an_distance))

def accuracy(_, embeddings):
  ap_distance, an_distance = compute_distances(embeddings)
  # equal to 1 if ap_distance <= an_distance, 0 else, calculates mean along all triplets
  return tf.reduce_mean(
    tf.cast(tf.greater_equal(an_distance, ap_distance), tf.float32))

In [16]:
model = create_model()
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
              loss=triplet_loss,
              metrics=[accuracy])

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_4 (InputLayer)            [(None, 3, 224, 224, 0                                            
__________________________________________________________________________________________________
tf.__operators__.getitem_3 (Sli (None, 224, 224, 3)  0           input_4[0][0]                    
__________________________________________________________________________________________________
tf.__operators__.getitem_4 (Sli (None, 224, 224, 3)  0           input_4[0][0]                    
__________________________________________________________________________________________________
tf.__operators__.getitem_5 (Sli (None, 224, 224, 3)  0           input_4[0][0]                    
____________________________________________________________________________________________

In [None]:
train_dataset

In [17]:
history = model.fit(train_dataset, epochs=2, steps_per_epoch = image_count // 32, validation_data = val_dataset, validation_steps=10)

Epoch 1/2
Epoch 2/2


In [18]:
def preprocess_test_triplets(anchor, positive, negative):
    anchor = preprocess_image(anchor)
    positive = preprocess_image(positive)
    negative = preprocess_image(negative)
    
    return tf.stack([anchor, positive, negative], axis=0)

In [19]:
anchor_images_test = test_triplets['anchor']
positive_images_test = test_triplets['positive']
negative_images_test = test_triplets['negative']
image_count_test = len(anchor_images_test)

anchor_dataset_test = tf.data.Dataset.from_tensor_slices(anchor_images_test)
positive_dataset_test = tf.data.Dataset.from_tensor_slices(positive_images_test)
negative_dataset_test = tf.data.Dataset.from_tensor_slices(negative_images_test)

dataset_test = tf.data.Dataset.zip((anchor_dataset_test, positive_dataset_test, negative_dataset_test))
#dataset = dataset.shuffle(buffer_size=1024)
dataset_test = dataset_test.map(preprocess_test_triplets)

dataset_test = dataset_test.batch(32, drop_remainder=False)
dataset_test = dataset_test.prefetch(8)

In [20]:
dataset_test

<PrefetchDataset shapes: (None, 3, 224, 224, 3), types: tf.float32>

In [24]:
def create_inference_model(model):
    ap_distance, an_distance = compute_distances(model.output)
    predictions = tf.cast(tf.greater_equal(an_distance, ap_distance), tf.int8)
    return tf.keras.Model(inputs=model.inputs, outputs=predictions)

In [22]:
inference_model = create_inference_model(model)

In [23]:
 predictions = inference_model.predict(
        dataset_test,
        verbose=1)



In [26]:
# predictions_fixed = 1-predictions

In [28]:
# Create submission file

np.savetxt('submission.txt', predictions, fmt='%d')

TESTING THE KERAS TUTORIAL STUFF

In [None]:
# from tensorflow.keras.applications import resnet
# target_shape = (200, 200)

In [None]:
# base_cnn = resnet.ResNet50(
#     weights="imagenet", input_shape=target_shape + (3,), include_top=False
# )

# flatten = layers.Flatten()(base_cnn.output)
# dense1 = layers.Dense(512, activation="relu")(flatten)
# dense1 = layers.BatchNormalization()(dense1)
# dense2 = layers.Dense(256, activation="relu")(dense1)
# dense2 = layers.BatchNormalization()(dense2)
# output = layers.Dense(256)(dense2)

# embedding = Model(base_cnn.input, output, name="Embedding")

# trainable = False
# for layer in base_cnn.layers:
#     if layer.name == "conv5_block1_out":
#         trainable = True
#     layer.trainable = trainable

In [None]:
# class DistanceLayer(layers.Layer):
#     """
#     This layer is responsible for computing the distance between the anchor
#     embedding and the positive embedding, and the anchor embedding and the
#     negative embedding.
#     """

#     def __init__(self, **kwargs):
#         super().__init__(**kwargs)

#     def call(self, anchor, positive, negative):
#         ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
#         an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
#         return (ap_distance, an_distance)


# anchor_input = layers.Input(name="anchor", shape=target_shape + (3,))
# positive_input = layers.Input(name="positive", shape=target_shape + (3,))
# negative_input = layers.Input(name="negative", shape=target_shape + (3,))

# distances = DistanceLayer()(
#     embedding(resnet.preprocess_input(anchor_input)),
#     embedding(resnet.preprocess_input(positive_input)),
#     embedding(resnet.preprocess_input(negative_input)),
# )

# siamese_network = Model(
#     inputs=[anchor_input, positive_input, negative_input], outputs=distances
# )

In [None]:
# class SiameseModel(Model):
#     """The Siamese Network model with a custom training and testing loops.

#     Computes the triplet loss using the three embeddings produced by the
#     Siamese Network.

#     The triplet loss is defined as:
#        L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
#     """

#     def __init__(self, siamese_network, margin=0.5):
#         super(SiameseModel, self).__init__()
#         self.siamese_network = siamese_network
#         self.margin = margin
#         self.loss_tracker = metrics.Mean(name="loss")

#     def call(self, inputs):
#         return self.siamese_network(inputs)

#     def train_step(self, data):
#         # GradientTape is a context manager that records every operation that
#         # you do inside. We are using it here to compute the loss so we can get
#         # the gradients and apply them using the optimizer specified in
#         # `compile()`.
#         with tf.GradientTape() as tape:
#             loss = self._compute_loss(data)

#         # Storing the gradients of the loss function with respect to the
#         # weights/parameters.
#         gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

#         # Applying the gradients on the model using the specified optimizer
#         self.optimizer.apply_gradients(
#             zip(gradients, self.siamese_network.trainable_weights)
#         )

#         # Let's update and return the training loss metric.
#         self.loss_tracker.update_state(loss)
#         return {"loss": self.loss_tracker.result()}

#     def test_step(self, data):
#         loss = self._compute_loss(data)

#         # Let's update and return the loss metric.
#         self.loss_tracker.update_state(loss)
#         return {"loss": self.loss_tracker.result()}

#     def _compute_loss(self, data):
#         # The output of the network is a tuple containing the distances
#         # between the anchor and the positive example, and the anchor and
#         # the negative example.
#         ap_distance, an_distance = self.siamese_network(data)

#         # Computing the Triplet Loss by subtracting both distances and
#         # making sure we don't get a negative value.
#         loss = ap_distance - an_distance
#         loss = tf.maximum(loss + self.margin, 0.0)
#         return loss

#     @property
#     def metrics(self):
#         # We need to list our metrics here so the `reset_states()` can be
#         # called automatically.
#         return [self.loss_tracker]

In [None]:
# siamese_model = SiameseModel(siamese_network)
# siamese_model.compile(optimizer=optimizers.Adam(0.0001))
# siamese_model.fit(train_dataset, epochs=1, validation_data=val_dataset)

In [None]:
# dataset_test

In [None]:
# prediction = siamese_model.predict(dataset_test,verbose=1)

In [None]:
# np.savetxt('predictions.txt', predictions, fmt='%i')
# Create submission file
test_y_fixed = np.where(test_y < 0.5, 0, 1)

np.savetxt('submission.txt', test_y_fixed, fmt='%d')