## TASK 3
#### --> New attempt: a new CNN model based on MobileNet
Inspired by this Keras tutorial: https://keras.io/examples/vision/siamese_network/

This notebook is meant to be run in Google Colab for computational efficiency.

In [None]:
#Import necessary libraries
import pandas as pd
import numpy as np
import pathlib
import os
import random
import keras
from keras.preprocessing import image
from keras.models import Model
from keras.applications.inception_v3 import InceptionV3
from keras.applications.inception_v3 import preprocess_input
from keras.callbacks import EarlyStopping
import tensorflow as tf
from sklearn.model_selection import train_test_split
from google.colab import drive


In [None]:
#Authorize access to the files in Google Drive (needed for the Colab notebook to work)
#The Drive is mounted under /content/drive; the other cells reach it through relative
#'drive/My Drive/...' paths, which assumes the working directory is /content -- verify
drive.mount('/content/drive')

In [None]:
#Define the essential constants
#Spatial size (in pixels) every image is resized to before it is fed to the network
height, width = 224, 224
#Number of training epochs
epochs = 5
#Number of triplets in the test set (presumably the line count of test_triplets.txt -- verify)
test_size = 59544

In [None]:
#Define the necessary functions for preprocessing, training and evaluation 

def preprocess_triplet(triplet, train):
    """Load and preprocess the three images referenced by one line of a triplets file.

    Args:
        triplet: string tensor holding three whitespace-separated image ids.
        train: Python bool; when true a dummy target is returned next to the images.

    Returns:
        A float32 tensor of shape (3, height, width, 3) with the images stacked along
        the first axis. When `train` is true the result is the pair (images, 1).
    """
    image_ids = tf.strings.split(triplet)

    def load_image(image_id):
        #Read the JPEG from disk, decode it as RGB and bring it to the network input size
        raw_bytes = tf.io.read_file('drive/My Drive/Colab Notebooks/food/' + image_id + '.jpg')
        pixels = tf.cast(tf.image.decode_jpeg(raw_bytes, channels=3), tf.float32)
        pixels = tf.image.resize(pixels, (height, width))
        return tf.keras.applications.mobilenet_v3.preprocess_input(pixels)

    images = tf.stack([load_image(image_ids[k]) for k in range(3)], axis=0)
    #The constant 1 is only a placeholder target: the loss and the metric of this notebook ignore it
    return (images, 1) if train else images

def load_data(data, train=True):
    """Build a tf.data pipeline that yields one preprocessed image triplet per text line.

    Args:
        data: path of a text file with one triplet of image ids per line.
        train: forwarded to `preprocess_triplet`; when true every element is an
            (images, target) pair, otherwise only the images.

    Returns:
        An unbatched `tf.data.Dataset`.
    """
    def parse_line(line):
        return preprocess_triplet(line, train)

    return tf.data.TextLineDataset(data).map(
        parse_line,
        num_parallel_calls=tf.data.experimental.AUTOTUNE,
    )

def compute_distances(outputs):
    """Return the squared Euclidean distances between the embeddings of a triplet.

    Args:
        outputs: tensor whose last axis indexes the three embeddings of each triplet
            (the model of this notebook stacks them with `axis=-1`).

    Returns:
        A pair (distance_plus, distance_minus): the squared distance between
        embeddings 0 and 1, and between embeddings 0 and 2, summed over axis 1.
    """
    reference, first, second = (outputs[..., k] for k in range(3))

    def squared_distance(left, right):
        return tf.reduce_sum(tf.square(left - right), axis=1)

    return squared_distance(reference, first), squared_distance(reference, second)

def triplet_loss(_, outputs):
    """Soft-margin triplet loss: mean softplus of (distance_plus - distance_minus).

    The first argument (the target supplied by Keras) is ignored; the loss depends
    only on the embeddings produced by the model.
    """
    d_plus, d_minus = compute_distances(outputs)
    #Negative when image 1 is closer to image 0 than image 2 is, positive otherwise
    distance_gap = d_plus - d_minus
    return tf.reduce_mean(tf.math.softplus(distance_gap))

def accuracy(_, outputs):
    """Fraction of triplets whose distance_minus is at least as large as distance_plus.

    The first argument (the target supplied by Keras) is ignored.
    """
    d_plus, d_minus = compute_distances(outputs)
    is_correct = tf.greater_equal(d_minus, d_plus)
    return tf.reduce_mean(tf.cast(is_correct, tf.float32))

In [None]:
#Split the dataset into training and validation sets
with open('drive/My Drive/Colab Notebooks/train_triplets.txt', 'r') as file:
    #Strip the line endings and drop blank lines so that every entry is exactly one triplet,
    #even when the last line of the file has no trailing newline
    triplets = [line.strip() for line in file if line.strip()]

#A fixed random_state makes the split (and therefore the validation score) reproducible across runs
train, val = train_test_split(triplets, test_size=0.2, random_state=42)
train_size = len(train)

#The newline is written explicitly: after shuffling, an entry that came without one
#would otherwise be glued to the next triplet and parsed as a single corrupt line
with open('drive/My Drive/Colab Notebooks/train_set.txt', 'w') as file:
    file.writelines(triplet + '\n' for triplet in train)

with open('drive/My Drive/Colab Notebooks/val_set.txt', 'w') as file:
    file.writelines(triplet + '\n' for triplet in val)

#Unbatched pipelines; the training and validation ones also yield a dummy target
train_dataset = load_data('drive/My Drive/Colab Notebooks/train_set.txt')
val_dataset = load_data('drive/My Drive/Colab Notebooks/val_set.txt')
test_dataset = load_data('drive/My Drive/Colab Notebooks/test_triplets.txt', train=False)



In [None]:
#Define architecture of the model using a pretrained model (Large minimalistic MobileNet --> https://keras.io/api/applications/mobilenet/)
#The pretrained backbone is frozen: only the custom layers defined below are trained
pretrained_model = tf.keras.applications.MobileNetV3Large(
    include_top=False,
    minimalistic=True,
    input_shape=(height, width, 3),
)
pretrained_model.trainable = False

#Embedding head: pooled backbone features -> dropout -> linear 128-d projection -> L2 normalisation
custom_layers = tf.keras.Sequential([
    tf.keras.layers.GlobalAveragePooling2D(),
    tf.keras.layers.Dropout(0.3),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation=None),
    tf.keras.layers.Lambda(lambda t: tf.math.l2_normalize(t, axis=1)),
])

#A single input carries the whole triplet: (3, height, width, 3) per sample
inputs = tf.keras.Input(shape=(3, height, width, 3))
#The same backbone and head (shared weights) embed each of the three images
output_triplet = [
    custom_layers(pretrained_model(inputs[:, k, ...]))
    for k in range(3)
]
#The embeddings are stacked on the last axis, so output[..., k] is the embedding of image k
output = tf.stack(output_triplet, axis=-1)
model = tf.keras.Model(inputs=inputs, outputs=output)
model.summary()

In [None]:
#Compile and train the model
batch_size = 32
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss=triplet_loss, metrics=[accuracy])

#The batched pipelines get their own names instead of overwriting train_dataset / val_dataset:
#re-running this cell must not batch data that has already been batched
train_batches = train_dataset.shuffle(1024, reshuffle_each_iteration=True).repeat().batch(batch_size)
val_batches = val_dataset.batch(batch_size)

#The training pipeline repeats indefinitely, so the number of steps per epoch is given explicitly
model.fit(train_batches, steps_per_epoch=train_size // batch_size, epochs=epochs, validation_data=val_batches)

In [None]:
#Evaluation of the model and predictions
#Wrap the trained network in a model that outputs the decision directly:
#1 when image 1 is at least as close to image 0 as image 2 is, 0 otherwise
distance_positive, distance_negative = compute_distances(model.output)
is_positive_closer = tf.cast(tf.greater_equal(distance_negative, distance_positive), tf.int8)
pred_model = tf.keras.Model(inputs=model.inputs, outputs=is_positive_closer)

#Separate name for the batched pipeline so that re-running this cell does not batch it twice
test_batches = test_dataset.batch(64).prefetch(2)
#No `steps` argument: predict() then runs until the dataset is exhausted, so the final partial
#batch is included. The former steps=int(test_size/64) stopped after 930 full batches
#(59520 triplets) and silently dropped the last 24 predictions.
preds = pred_model.predict(test_batches, verbose=1)

In [None]:
#Quick look at the predictions returned by predict() in the previous cell
preds

In [None]:
#Saving the predictions in the correct format
#Each prediction is written as an integer
np.savetxt(
    fname='drive/My Drive/Colab Notebooks/sub.txt',
    X=preds,
    fmt='%i',
)