<a href="https://colab.research.google.com/github/oktaviacitra/deep-metric-learning/blob/main/CIFAR10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive

origin_path = "/content/drive"
drive.mount(origin_path)
folder_path = origin_path + "/MyDrive/Learning Journey/CIFAR10"

Mounted at /content/drive


In [2]:
from tensorflow.keras.datasets.cifar10 import load_data

(x_train, y_train), (x_test, y_test) = load_data()

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz


In [3]:
from tensorflow.image import per_image_standardization
from tensorflow.keras.utils import to_categorical

x_train, x_test = per_image_standardization(x_train).numpy(), per_image_standardization(x_test).numpy()
y_train, y_test = to_categorical(y_train), to_categorical(y_test)

In [4]:
x = x_train.copy()
y = y_train.copy()

In [5]:
!pip install -q -U keras-tuner

[K     |████████████████████████████████| 135 kB 32.9 MB/s 
[K     |████████████████████████████████| 1.6 MB 62.4 MB/s 
[?25h

In [9]:
def data_generator(batch_size=256):
  while True:
    x = create_batch(batch_size)
    y = np.zeros((batch_size, 3*512))
    yield x, y

def create_batch(batch_size=256, split="train"):
  x_anchors = np.zeros((batch_size, 32, 32, 3))
  x_positives = np.zeros((batch_size, 32, 32, 3))
  x_negatives = np.zeros((batch_size, 32, 32, 3))
  
  x = x_train if split == "train" else x_val
  y = y_train if split == "train" else y_val
  
  for i in range(0, batch_size):
      random_index = random.randint(0, x.shape[0] - 1)
      x_anchor = x[random_index]
      y = y[random_index]
      
      indices_for_pos = np.squeeze(np.where(y == y))
      indices_for_neg = np.squeeze(np.where(y != y))
      
      x_positive = x[indices_for_pos[random.randint(0, int(indices_for_pos.size - 1))]]
      x_negative = x[indices_for_neg[random.randint(0, int(indices_for_neg.size - 1))]]
      
      x_anchors[i] = x_anchor
      x_positives[i] = x_positive
      x_negatives[i] = x_negative
      
  return [x_anchors, x_positives, x_negatives]

In [23]:
import keras_tuner as kt
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.layers import Input,Flatten, GlobalAveragePooling2D, GlobalMaxPool2D, concatenate, Input
from tensorflow.keras.models import Model
import random

class HyperModel(kt.HyperModel):

  def triplet_loss(y_true, y_pred):
    anchor, positive, negative = y_pred[:,:512], y_pred[:,512:2*512], y_pred[:,2*512:]
    positive_dist = tf.reduce_mean(tf.square(anchor - positive), axis=1)
    negative_dist = tf.reduce_mean(tf.square(anchor - negative), axis=1)

    return tf.maximum(positive_dist - negative_dist + 0.2, 0.)

  def build(self, hp):
    vgg16 = VGG16(include_top=False,
                      weights=None,
                      input_shape=(32, 32, 3))
        
    vector = vgg16.output
    vector = Flatten(name="flatten")(vector)
    embedding_model = Model(inputs=vgg16.input, outputs=vector)

    input_anchor = Input(shape=(32, 32, 3))
    input_positive = Input(shape=(32, 32, 3))
    input_negative = Input(shape=(32, 32, 3))

    embedding_anchor = embedding_model(input_anchor)
    embedding_positive = embedding_model(input_positive)
    embedding_negative = embedding_model(input_negative)

    input = [input_anchor, input_positive, input_negative]
    output = concatenate([embedding_anchor, embedding_positive, embedding_negative], axis=1)

    model = Model(input, output, name="dml")

    learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4, 1e-5])
    optimizer = hp.Choice('optimizer', values=[Adam(learning_rate=learning_rate), SGD(learning_rate=learning_rate)])

    model.compile(loss=self.triplet_loss, optimizer=optimizer)

    return model

  def fit(self, hp, model, *args, **kwargs):
    return model.fit(*args,
                     self.data_generator(2048),
                     epochs=3,
                     verbose=False,
                     **kwargs,
                     )

In [17]:
import kerastuner
import numpy as np
from sklearn.model_selection import KFold

class CVTuner(kerastuner.engine.tuner.Tuner):
  def run_trial(self, trial, x, y, batch_size=32, epochs=1):
    cv = KFold(5)
    val_losses = []
    for train_indices, test_indices in cv.split(x):
      x_train, x_test = x[train_indices], x[test_indices]
      y_train, y_test = y[train_indices], y[test_indices]
      model = self.hypermodel.build(trial.hyperparameters)
      model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs)
      val_losses.append(model.evaluate(x_test, y_test))
    self.oracle.update_trial(trial.trial_id, {'val_loss': np.mean(val_losses)})
    self.save_model(trial.trial_id, model)

tuner = CVTuner(hypermodel=HyperModel(),
                oracle=kerastuner.oracles.BayesianOptimization(
                    objective='val_loss',
                    max_trials=2))
tuner.search(x, y, batch_size=64, epochs=30)

  """Entry point for launching an IPython kernel.


ValueError: ignored

In [25]:
from sklearn.model_selection import KFold
from sklearn.model_selection import cross_val_score
import numpy as np

kf = KFold(n_splits = 5)

VAL_ACC = []
VAL_LOSS = []
fold_count = 1


for train_indices, val_indices in kf.split(x,y):
  x_train = np.array([x[train_index] for train_index in train_indices])
  y_train = np.array([y[train_index] for train_index in train_indices])

  x_val = np.array([x[val_index] for val_index in val_indices])
  y_val = np.array([y[val_index] for val_index in val_indices])

  tuner = kt.RandomSearch(HyperModel(),
                          objective="val_loss",
                          max_trials=2,
                          overwrite=True,
                          directory=folder_path +"/dir",
                          project_name="tune_hypermodel"
                          )
  
  callbacks = [
      ModelCheckpoint(folder_path + "/tuner/{epoch:02d}-{val_loss:.4f}.h5",
                      monitor="val_loss",
                      mode="min",
                      save_best_only=True,
                      verbose=1),
      EarlyStopping(monitor='val_loss', patience=5)
  ]

  tuner.search(x_train, y_train, epochs=3,
              validation_data=(x_val, y_val),
              callbacks=callbacks,
              verbose=2)

TypeError: ignored

https://medium.com/the-owl/k-fold-cross-validation-in-keras-3ec4a3a00538