In [1]:
import os
import numpy as np
import random
import tensorflow as tf

from tensorflow.keras import layers
from tensorflow.keras import Model
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.optimizers import Adam
from tensorflow.data import Dataset
AUTOTUNE = tf.data.experimental.AUTOTUNE

from sklearn.model_selection import train_test_split

%matplotlib inline
import matplotlib.pyplot as plt

# Spatial size (in pixels) every image is resized to; also used as the
# input shape of the MobileNetV2 backbone.
IMG_HEIGHT = 224
IMG_WIDTH = 224

# Path to the Google Drive folder in which task4_handout.zip is located.
BASE_FOLDER = '/content/drive/My Drive/development/machine_learning/'

In [2]:
def load_drive():
  """Mount Google Drive and unpack the task handout into the working directory.

  Steps:
    1. Mount Drive at /content/drive (always forcing a remount).
    2. Append BASE_FOLDER to sys.path.
    3. Copy task4_handout.zip from BASE_FOLDER to the current directory,
       unzip it and delete the archive.
    4. Unzip food.zip (presumably extracted from the handout -- TODO confirm),
       delete the archive and the stray food/.DS_Store file.

  Only usable inside Google Colab / IPython: it imports google.colab and
  relies on `!` shell escapes.
  """
  from google.colab import drive
  import sys
  from pathlib import Path
  drive.mount("/content/drive", force_remount=True)

  # path to base folder
  base = Path(BASE_FOLDER)
  sys.path.append(str(base))

  zip_path = base/'task4_handout.zip'

  # unzip handout ({zip_path} is interpolated by IPython; quoted because the
  # Drive path contains a space)
  !cp '{zip_path}' .
  !unzip -q task4_handout.zip
  !rm task4_handout.zip

  # unzip image data
  !unzip -q food.zip
  !rm food.zip
  # remove the macOS metadata file shipped inside the image folder
  !rm food/.DS_Store

In [3]:
def file_count(fname):
  """Return the number of lines in the text file `fname`.

  The file is opened in a `with` block so the handle is closed
  deterministically instead of being left to the garbage collector.

  Args:
    fname: path of the file to read.

  Returns:
    Number of lines (0 for an empty file). A final line without a trailing
    newline still counts as one line.
  """
  with open(fname) as fin:
    return sum(1 for _ in fin)
        
def euclidean_distance(x, y):
  """Row-wise squared L2 distance between `x` and `y`.

  The squared element-wise differences are summed over axis 1, and that axis
  is kept with size 1. NOTE: no square root is taken, so despite the name
  this is the *squared* Euclidean distance.
  """
  squared_diff = tf.square(x - y)
  return tf.reduce_sum(squared_diff, axis=1, keepdims=True)

def triplet_loss(y_true, y_pred):
  """Hinge-style triplet loss on stacked (anchor, positive, negative) embeddings.

  `y_pred[:, 0]`, `y_pred[:, 1]` and `y_pred[:, 2]` are read as the anchor,
  positive and negative embeddings. The loss pulls anchor and positive
  together while pushing the negative away from both:

      max(d(a, p) - 0.6 * d(a, n) - 0.4 * d(p, n) + 0.2, 0)

  averaged over all entries. `y_true` is ignored; it only exists to satisfy
  the Keras loss signature.
  """
  del y_true  # labels carry no information for this loss
  margin = 0.2

  anchor, positive, negative = y_pred[:,0], y_pred[:,1], y_pred[:,2]

  d_anchor_pos = euclidean_distance(anchor, positive)
  d_anchor_neg = euclidean_distance(anchor, negative)
  d_pos_neg = euclidean_distance(positive, negative)

  # the negative is pushed away from the anchor (weight .6) and from the positive (weight .4)
  hinge_arg = d_anchor_pos-.6*d_anchor_neg -.4*d_pos_neg+margin

  return tf.reduce_mean(tf.maximum(hinge_arg,0))

def accuracy(y_true, y_pred):
  """Fraction of triplets whose anchor is at least as close to the positive as to the negative.

  `y_pred` holds the stacked embeddings (anchor, positive, negative along
  axis 1). `y_true` is not used.
  """
  anchor, positive, negative = y_pred[:,0], y_pred[:,1], y_pred[:,2]

  d_anchor_pos = euclidean_distance(anchor, positive)
  d_anchor_neg = euclidean_distance(anchor, negative)

  # a triplet counts as correct when the negative is not closer than the positive
  is_correct = tf.greater_equal(d_anchor_neg, d_anchor_pos)
  return tf.reduce_mean(tf.cast(is_correct, tf.float32))

def prepare_data(triplets, validation_size=0.3):
  """Create image-disjoint train and validation triplet files.

  The unique image ids found in `triplets` are split into a train and a
  validation set. A triplet is written to 'train.txt' if none of its images
  is a validation image, to 'validation.txt' if none of its images is a
  train image, and is dropped otherwise (it would leak images between the
  two sets). Blank lines are skipped.

  Args:
    triplets: path to a text file with one whitespace-separated triplet of
      image ids per line.
    validation_size: passed to train_test_split as `test_size`; note that it
      is the share of *images*, not of triplets, assigned to validation.

  Side effects:
    Overwrites 'train.txt' and 'validation.txt' in the working directory.
  """
  with open(triplets, 'r') as fin:
    lines = fin.readlines()

  # unique image ids, in order of first appearance (keeps the split reproducible)
  imgset = list(dict.fromkeys(item for line in lines for item in line.split()))

  train_imgs, val_imgs = train_test_split(imgset, test_size=validation_size, random_state=42)
  # sets give O(1) membership tests instead of scanning a list for every line
  train_imgs, val_imgs = set(train_imgs), set(val_imgs)

  with open('train.txt', 'w') as foutTrain, open('validation.txt', 'w') as foutVal:
    for line in lines:
      names = line.split()
      if not names:
        # a blank line is not a triplet and would break loading later on
        continue

      if val_imgs.isdisjoint(names):
        foutTrain.write(line)
      elif train_imgs.isdisjoint(names):
        foutVal.write(line)
      

def augment_triplet(triplet, label):
  """Apply independent random augmentations to the three images of a triplet.

  Each image gets its own random rotation (multiple of 90 degrees), flips
  and colour jitter. The label is passed through unchanged.

  NOTE(review): create_dataset runs preprocess_input before this function is
  mapped, so the colour ops presumably see values outside [0, 1] -- confirm
  that this is intended.
  """
  def augment_image(img):
    # rotate by 0, 90, 180 or 270 degrees
    quarter_turns = tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32)
    rotated = tf.image.rot90(img, quarter_turns)

    # random mirroring along both axes
    flipped = tf.image.random_flip_left_right(rotated)
    flipped = tf.image.random_flip_up_down(flipped)

    # colour jitter
    jittered = tf.image.random_saturation(flipped, 0.5, 1.5)
    jittered = tf.image.random_brightness(jittered, 32.0/255.0)
    jittered = tf.image.random_hue(jittered, 32.0/255.0)
    jittered = tf.image.random_contrast(jittered, 0.7, 1.3)
    return jittered

  anchor, positive, negative = triplet
  augmented = (augment_image(anchor), augment_image(positive), augment_image(negative))
  return augmented, label


def load_image(img):
  """Load one image of the food dataset as a preprocessed float tensor.

  Args:
    img: image id (string tensor); the file 'food/<img>.jpg' is read.

  Returns:
    float32 tensor with 3 channels, passed through MobileNetV2's
    preprocess_input and resized to IMG_HEIGHT x IMG_WIDTH.
  """
  filename = 'food/'+img+'.jpg'
  raw = tf.io.read_file(filename)
  pixels = tf.image.decode_jpeg(raw, channels=3)
  pixels = tf.cast(pixels, tf.float32)
  pixels = preprocess_input(pixels)
  pixels = tf.image.resize(pixels, [IMG_HEIGHT, IMG_WIDTH])
  return pixels


def load_triplet(triplet):
  """Turn one line of a triplet file into ((anchor, positive, negative), label).

  Args:
    triplet: string tensor holding three whitespace-separated image ids.

  Returns:
    A tuple of the three loaded images and the constant dummy label 1
    (the loss and metric in this notebook ignore the label).
  """
  triplet = tf.strings.split(triplet)

  anchor = load_image(triplet[0])
  positive = load_image(triplet[1])
  negative = load_image(triplet[2])

  return (anchor, positive, negative), 1


def create_dataset(triplets):
  """creates a tf.data.Dataset based on a triplet text file

  Args:
    triplets: path to a text file with one triplet of image ids per line.

  Returns:
    Unbatched dataset yielding ((anchor, positive, negative), 1) elements,
    with the images loaded in parallel.
  """
  content = tf.data.TextLineDataset(triplets)
  ds = content.map(load_triplet, num_parallel_calls=AUTOTUNE)
  return ds
  
def show(image):
  """Display `image` in a new matplotlib figure with the axes hidden."""
  plt.figure()
  plt.imshow(image)
  plt.axis('off')

In [4]:
def create_model(emb_size):
  """Build the triplet network: one shared encoder applied to three image inputs.

  The encoder is an ImageNet-pretrained MobileNetV2 (all of its layers
  frozen) followed by global average pooling, dropout, a dense projection to
  `emb_size` dimensions and L2 normalisation.

  Args:
    emb_size: dimensionality of the embedding produced for each image.

  Returns:
    Keras model taking a tuple of three image batches and returning their
    embeddings stacked along axis 1.
  """
  input_shape = (IMG_HEIGHT,IMG_WIDTH,3)
  backbone = MobileNetV2(input_shape=input_shape, include_top=False, weights='imagenet')

  # freeze the pretrained weights; only the new head is trained
  for backbone_layer in backbone.layers:
      backbone_layer.trainable = False

  features = layers.GlobalAveragePooling2D()(backbone.output)
  features = layers.Dropout(0.3)(features)
  embedding = layers.Dense(emb_size)(features)
  # unit-length embeddings
  embedding = layers.Lambda(lambda vect: tf.math.l2_normalize(vect, axis=1))(embedding)

  encoder = Model(backbone.input, embedding)

  # three inputs that all go through the same (weight-sharing) encoder
  triplet_inputs = tuple(layers.Input(shape=input_shape) for _ in range(3))
  encoded = [encoder(branch) for branch in triplet_inputs]

  stacked = layers.Lambda(lambda vects: tf.stack(vects, axis=1))(encoded)
  return Model(triplet_inputs, stacked)

def create_classifier(model):
  """Wrap a triplet embedding model so that it outputs a 0/1 decision per triplet.

  The returned model shares inputs (and weights) with `model`. Its output is
  1.0 where the first image is at least as close to the second image as to
  the third one, and 0.0 otherwise.
  """
  def classify(encoded):
    first, second, third = encoded[:,0], encoded[:,1], encoded[:,2]

    dist_to_second = euclidean_distance(first, second)
    dist_to_third = euclidean_distance(first, third)

    second_is_closer = tf.greater_equal(dist_to_third, dist_to_second)
    return tf.cast(second_is_closer, tf.float32)

  decision = layers.Lambda(classify)(model.output)
  return Model(model.input, decision)

In [5]:
# --------------------
# data preparation
# --------------------

# Mount Google Drive and unpack the handout (triplet files and images).
load_drive()

# Split into image-disjoint train/validation triplet files. validation_size
# is the share of *images*; triplets mixing images of both sets are dropped,
# so the totals printed below do not add up to the number of input triplets.
prepare_data('train_triplets.txt', validation_size=0.25)
triplets_train = file_count('train.txt')
triplets_validate = file_count('validation.txt')
print('train triplets:', triplets_train, '\nvalidate triplets:',triplets_validate, '\ntotal:', triplets_train+triplets_validate)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive
train triplets: 25378 
validate triplets: 858 
total: 26236


In [8]:
# --------------------
# train
# --------------------

train_batch_size = 32
validation_batch_size = 128
num_epochs = 6
train_len = file_count('train.txt')
validation_len = file_count('validation.txt')
# both datasets repeat forever, so Keras needs explicit step counts per epoch
train_steps = int(np.ceil(train_len / float(train_batch_size)))
validation_steps = int(np.ceil(validation_len / float(validation_batch_size)))

train_ds = create_dataset('train.txt')
# NOTE(review): the shuffle buffer holds already decoded image triplets and
# covers only 1000 of the training triplets at a time -- confirm that this
# amount of shuffling (and memory use) is intended.
train_ds = (train_ds
    .repeat()
    .shuffle(buffer_size=1000)
    .map(augment_triplet, num_parallel_calls=AUTOTUNE) # augment image data to prevent overfitting
    .batch(train_batch_size)
    .prefetch(AUTOTUNE)
)

validation_ds = create_dataset('validation.txt')
validation_ds = (validation_ds
    .repeat()
    .batch(validation_batch_size)
    .prefetch(AUTOTUNE) # overlap image loading with evaluation, as for training
)


model = create_model(128)
# `learning_rate` is the supported keyword; the old `lr` alias is deprecated
# and rejected by newer Keras releases
model.compile(optimizer=Adam(learning_rate=0.0005), loss=triplet_loss, metrics=[accuracy])

hist = model.fit(train_ds,
                 steps_per_epoch=train_steps,
                 epochs=num_epochs, 
                 validation_data=validation_ds,
                 validation_steps=validation_steps
                 )

Epoch 1/2
Epoch 2/2


In [10]:
# --------------------
# predict
# --------------------

test_batch_size = 128
# number of batches needed to cover every test triplet (the last one may be partial)
test_steps = int(np.ceil(file_count('test_triplets.txt') / float(test_batch_size)))
test_ds = create_dataset('test_triplets.txt').batch(test_batch_size)

# wrap the trained embedding model so that it emits one 0/1 decision per triplet
classifier = create_classifier(model)
y_pred = classifier.predict(test_ds, verbose=1, steps=test_steps)

# one integer prediction per line
np.savetxt('submission.txt', y_pred, fmt='%d')

