# One-shot learning implementation for character recognition
Using the [omniglot dataset](https://github.com/brendenlake/omniglot/tree/master/python)

### TODO
- entrainer sans negatif ? 
- data augmentation ?
- comparer avec et sans
  - batch norm
  - batch norm au début
  - dropout
  - regul

## Setup phase
We install packages, make all imports, configure modules and download dataset

In [117]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [118]:
%%bash
pip install -q pyyaml
pip install tensorflow==2.0.0-beta1
pip install -q tensorflow-gpu==2.0.0-beta1



In [119]:
%%bash
git clone https://github.com/brendenlake/omniglot
mkdir datas
unzip -q omniglot/python/images_background.zip -d datas
unzip -q omniglot/python/images_evaluation.zip -d datas
# rm -R omniglot

fatal: destination path 'omniglot' already exists and is not an empty directory.
mkdir: cannot create directory ‘datas’: File exists
replace datas/images_background/Alphabet_of_the_Magi/character01/0709_01.png? [y]es, [n]o, [A]ll, [N]one, [r]ename: error:  invalid response [unzip -q ]
replace datas/images_background/Alphabet_of_the_Magi/character01/0709_01.png? [y]es, [n]o, [A]ll, [N]one, [r]ename: error:  invalid response [omniglot/]
replace datas/images_background/Alphabet_of_the_Magi/character01/0709_01.png? [y]es, [n]o, [A]ll, [N]one, [r]ename: error:  invalid response [python/im]
replace datas/images_background/Alphabet_of_the_Magi/character01/0709_01.png? [y]es, [n]o, [A]ll, [N]one, [r]ename: error:  invalid response [ages_eval]
replace datas/images_background/Alphabet_of_the_Magi/character01/0709_01.png? [y]es, [n]o, [A]ll, [N]one, [r]ename: error:  invalid response [uation.zi]
replace datas/images_background/Alphabet_of_the_Magi/character01/0709_01.png? [y]es, [n]o, [A]ll, [N]o

In [120]:
%load_ext tensorboard

from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import datasets, layers, models
from tensorboard import notebook
from keras import backend as K
from IPython import display
import matplotlib.pyplot as plt
import matplotlib.pylab as pl
import pandas as pd
import numpy as np
import os, datetime, time, math, pathlib, itertools, random

keras = tf.keras
AUTOTUNE = tf.data.experimental.AUTOTUNE

print(tf.version.VERSION)
print(tf.keras.__version__)
print("GPU Available: ", tf.test.is_gpu_available())

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard
2.0.0-beta1
2.2.4-tf
GPU Available:  True


## Constants
This part will define how to build, train, and evaluate the model

In [121]:
#@markdown ## Data paths
DIR_TRAIN = "datas/images_background" #@param {type:"string"}
DIR_TEST = "datas/images_evaluation" #@param {type:"string"}
LOAD_FROM = ""  #@param ["", "save_dir", "/content/drive/My Drive/ml/weights/oneshot_chars/checkpoint_dense/weights.hdf5", "/content/drive/My Drive/ml/weights/oneshot_chars/checkpoint_conv/weights.hdf5", "/content/drive/My Drive/ml/weights/oneshot_chars/checkpoint_conv2/weights.hdf5"] {allow-input: true}
CHECKPOINTS_DIR = "drive/My Drive/ml/weights/oneshot_chars"
checkpoint_dir_name = "checkpoint_" + str(int(time.time()))


#@markdown ## Model configuration
MODEL_TYPE = "conv3" #@param ["same", "linear","dense", "conv", "conv2", "conv3"]
IMG_SIDE = 100 #@param {type:"slider", min:10, max:150, step:1}
IMG_SHAPE = (IMG_SIDE, IMG_SIDE)

#@markdown ## Training configuration
NB_EPOCHS = 100 #@param {type:"number"}
BATCH_SIZE = 32 #@param {type:"number"}
TRIPLETS_PER_IMAGE = 10 #@param {type:"number"}
LEARNING_RATE = 0.000000001 #@param {type:"number"}
MARGIN = 1 #@param {type:"number"}

#@markdown ## Evaluation configuration
ACCURACY_SAMPLE_SIZE = 400 #@param {type:"number"}

checkpoint_dir_name = "checkpoint_" + MODEL_TYPE
print("Saving in {} for this session".format(checkpoint_dir_name))
if LOAD_FROM:
  print("Loading weights from checkpoint {}".format(LOAD_FROM))

Saving in checkpoint_conv3 for this session


## General code
Helper functions

In [0]:
def getRandomIds(dataset, nMax=1000):
	ids = list(range(len(dataset[0][0])-1))
	random.shuffle(ids)
	ids = ids[:nMax]
	return ids + [i+1 for i in ids]

In [0]:
  def dist_fct(x, y):
    return np.sqrt(np.sum((x-y)**2))

In [0]:
def get_checkpoint_path(suffix=""):
  os.makedirs(os.path.join(CHECKPOINTS_DIR, checkpoint_dir_name), exist_ok=True)
  return os.path.join(
    CHECKPOINTS_DIR,
    checkpoint_dir_name,
    "weights" + suffix + ".hdf5"
  )

In [0]:
class Timer():
  def __init__(self, to_int = True):
    self.t = time.time()
    self.to_int = to_int
  
  def get(self, reset=True):
    t2 = time.time()
    d = t2 - self.t
    if self.to_int:
      d = int(d)
    if reset:
      self.t = t2
    return d

In [0]:
def plot_history(histories, key='binary_crossentropy'):
  plt.figure(figsize=(16,10))

  for name, history in histories:
    val = plt.plot(history.epoch, history.history['val_'+key],
                   '--', label=name.title()+' Val')
    plt.plot(history.epoch, history.history[key], color=val[0].get_color(),
             label=name.title()+' Train')

  plt.xlabel('Epochs')
  plt.ylabel(key.replace('_',' ').title())
  plt.legend()

  plt.xlim([0,max(history.epoch)])

In [0]:
def show_image(image):
	plt.imshow(image)
	plt.show()

## Import datas and pre-processing

In [0]:
def preprocess_image(image):
  image = tf.image.decode_image(image)
  image = tf.image.resize(image, IMG_SHAPE)
  image = (255.0 - image.numpy().astype(float) ) / 255.0
  image = image.reshape(IMG_SHAPE)
  return image

def load_and_preprocess_image(img_path):
  return preprocess_image(tf.io.read_file(str(img_path)))

In [0]:
def getDatasPaths(dir_path, max_alphabet=None):
  charSets = list(os.listdir(dir_path))
  if max_alphabet:
    charSets = max_alphabet[:max_alphabet]
  charClasses = [[(d, c) for c in os.listdir(os.path.join(dir_path, d))] for d in charSets]
  charClasses = list(itertools.chain(*charClasses))
  charDirs = [os.path.join(dir_path, d, c) for d, c in charClasses]
  imgInfos = [
      [(os.path.join(cls_path, img_f_name), label) for img_f_name in os.listdir(cls_path)]
      for cls_path, label in zip(charDirs, list(range(len(charDirs))))
  ]
  imgInfos = list(itertools.chain(*imgInfos))
  paths, labels = [[el[i] for el in imgInfos] for i in range(2)]
  cls_names = [" - ".join(cls) for cls in charClasses]
  return paths, labels, cls_names # len : nb images | nb images | nb classes

def loadDatas(dir_path, max_alphabet=None):
  paths, labels, cls_names = getDatasPaths(dir_path, max_alphabet=max_alphabet)
  images_datas = [load_and_preprocess_image(img_path) for img_path in paths]
      
  return images_datas, labels, cls_names

Now, we read all the datas

In [0]:
try:
  _ = train_images
except:
  train_images, train_labels, train_cls_names = loadDatas(DIR_TRAIN)
  test_images, test_labels, test_cls_names = loadDatas(DIR_TEST)

## Functions to feed datas to the network

First, helper functions to :
- get all images classed by label

In [0]:
def get_ids_per_cls(labels):
  ids_per_cls = []
  for i in range(len(labels)):
    while len(ids_per_cls) <= labels[i]:
      ids_per_cls.append([])
    ids_per_cls[labels[i]].append(i)
  return ids_per_cls

def sort_by_distance(l, anchor, only_ids=True):
  l2 = [(dist_fct(el, anchor), i) for i, el in enumerate(l)]
  l2.sort()
  if only_ids:
    return [i for d, i in l2]
  return [l[i] for d, i in l2]

The following function select random triplets to train the NN

In [0]:
def get_triplets_random(images, labels, trunk_model):
  nb_images = len(images)
  ids_per_cls = get_ids_per_cls(labels)
  triplets = []
  
  for i_anchor in range(nb_images):
    same_cls = [i for i in ids_per_cls[labels[i_anchor]] if i != i_anchor]
    for _ in range(TRIPLETS_PER_IMAGE):
      i_positive, i_negative = random.choice(same_cls), i_anchor
      while labels[i_negative] == labels[i_anchor]:
        i_negative = random.randint(0, nb_images-1)
      triplets.append([i_anchor, i_positive, i_negative])
  
  return triplets

This functions try to select triplets better than random ones. At first, we run the NN on all examples. Then, we try to select triplets with a positive far from the anchor, and a negative close to it.

In [0]:
# %%time
NB_CENTERS_PER_IMAGE = 3
# FACT_RANDOM = 3
# FACT_RANDOM_POSITIVE = 2

def get_triplets_dists(images, labels, trunk_model):
  timer = Timer()
  coords = trunk_model.predict(np.array(images))
  ids_per_cls = get_ids_per_cls(labels)
  centers = [np.mean([coords[i] for i in ids_per_cls[lab]], axis=0) for lab in range(len(ids_per_cls))]

  centers_away_from_cls = [[] for _ in range(len(ids_per_cls))]
  for i_cls in range(len(ids_per_cls)):
    centers_sorted = sort_by_distance(centers, centers[i_cls])
    centers_sorted = [i_center for i_center in centers_sorted if i_center != i_cls][:NB_CENTERS_PER_IMAGE]
    centers_away_from_cls[i_cls] = centers_sorted
  
  triplets = []
  useful, unuseful = 0, 0
  for anchor in range(len(images)):
    same_cls = [i for i in ids_per_cls[labels[anchor]] if i != anchor] or [anchor]
    # positives = [random.choice(same_cls) for _ in range(TRIPLETS_PER_IMAGE)]
    positives_order = sort_by_distance([coords[i] for i in same_cls], anchor)[::-1]
    positives = [same_cls[i] for i in positives_order]
    # random.shuffle(positives)

    centers_taken = centers_away_from_cls[labels[anchor]]
    negatives = list(itertools.chain(*[ids_per_cls[i_cls] for i_cls in centers_taken]))
    negatives_order = sort_by_distance([coords[i] for i in negatives], anchor)
    negatives = [negatives[i] for i in negatives_order]
    # negatives = negatives[:FACT_RANDOM*TRIPLETS_PER_IMAGE]
    # random.shuffle(negatives)

    for i in range(TRIPLETS_PER_IMAGE):
      i_positive, i_negative = i%len(positives), i%len(negatives)
      dist_diff = dist_fct(coords[anchor], coords[positives[i_positive]]) - dist_fct(coords[anchor], coords[negatives[i_negative]])
      if dist_diff + MARGIN < 0:
        unuseful += 1
      else:
        useful += 1
        triplets.append([anchor, positives[i_positive], negatives[i_negative]])
      # print(triplets[-1], [labels[j] for j in triplets[-1]])
  
  print("\ntriplets computed", timer.get(), "s", "(useful, unuseful) =", (useful, unuseful), "({:.2f}%)".format(useful / (useful + unuseful) * 100))

  return triplets

# triplets = get_triplets_dists(train_images, train_labels, trunk_model) 

# %%time
# NB_CENTERS_PER_IMAGE = 3
# # FACT_RANDOM = 3
# # FACT_RANDOM_POSITIVE = 2

# def get_triplets_dists(images, labels, trunk_model):
#   timer = Timer()
#   coords = trunk_model.predict(np.array(images))
#   ids_per_cls = get_ids_per_cls(labels)
#   centers = [np.mean([coords[i] for i in ids_per_cls[lab]]) for lab in range(len(ids_per_cls))]

#   centers_away_from_cls = [[] for _ in range(len(ids_per_cls))]
#   for i_cls in range(len(ids_per_cls)):
#     centers_sorted = sort_by_distance(centers, centers[i_cls])
#     centers_sorted = [i_center for i_center in centers_sorted if i_center != i_cls][:NB_CENTERS_PER_IMAGE]
#     centers_away_from_cls[i_cls] = centers_sorted
  
#   triplets = []
#   useful, unuseful = 0, 0
#   for anchor in range(len(images)):
#     same_cls = [i for i in ids_per_cls[labels[anchor]] if i != anchor]
#     # positives = [random.choice(same_cls) for _ in range(TRIPLETS_PER_IMAGE)]
#     positives_order = sort_by_distance([coords[i] for i in same_cls], anchor)[::-1]
#     positives = [same_cls[i] for i in positives_order]
#     # random.shuffle(positives)

#     centers_taken = centers_away_from_cls[labels[anchor]]
#     negatives = list(itertools.chain(*[ids_per_cls[i_cls] for i_cls in centers_taken]))
#     negatives_order = sort_by_distance([coords[i] for i in negatives], anchor)
#     negatives = [negatives[i] for i in negatives_order]
#     # negatives = negatives[:FACT_RANDOM*TRIPLETS_PER_IMAGE]
#     # random.shuffle(negatives)

#     for i in range(TRIPLETS_PER_IMAGE):
#       if dist_fct(coords[anchor], coords[positives[i]]) - dist_fct(coords[anchor], coords[negatives[i]]) + MARGIN < 0:
#         unuseful += 1
#       else:
#         useful += 1
#         triplets.append([anchor, positives[i], negatives[i]])
#       # print(triplets[-1], [labels[j] for j in triplets[-1]])
  
#   print("\ntriplets computed", timer.get(), "s", "(useful, unuseful) =", (useful, unuseful), "({:.2f}%)".format(useful / (useful + unuseful) * 100))

#   return triplets

# # triplets = get_triplets_dists(train_images, train_labels, trunk_model)

Now, we need a function to generate triplets during the training process. This function will be called by `fit_generator`

In [0]:
def create_triplet_generator(images, labels, trunk_model, triplets_getter, batch_size):
  triplets = []
  cur_triplet = 0
  while True:
    if cur_triplet + batch_size > len(triplets):
      triplets = triplets_getter(images, labels, trunk_model)
      random.shuffle(triplets)
      cur_triplet = 0
    
    yield (
      [ np.array([images[triplets[cur_triplet + i_triplet][i_in]] for i_triplet in range(batch_size)]) for i_in in range(3)],
      [0] * batch_size
    )

    cur_triplet += batch_size

## Model definition

In [0]:
def create_same_trunk_model():
  model = keras.models.Sequential([
    layers.Input(IMG_SHAPE),
    layers.BatchNormalization(),
  ], name="same_model")
  return model

def create_linear_trunk_model():
  model = keras.models.Sequential([
    layers.Input(IMG_SHAPE),
    # layers.BatchNormalization(),
    layers.Flatten(),
    layers.Dense(32, activation='relu'),
  ], name="linear_model")
  return model

Dense part of a siasme network

In [0]:
def create_dense_trunk_model():
  model = keras.models.Sequential([
    layers.Input(IMG_SHAPE),
    layers.BatchNormalization(),
    layers.Flatten(),
    layers.Dense(300, activation='relu'),
    # layers.Dropout(0.2),
    layers.Dense(100, activation='relu'),
    # layers.Dropout(0.2),
    layers.Dense(8, activation='softmax')
  ], name="dense_model")
  return model

Convolutional trunk part of siasme network

In [0]:
def create_conv_trunk_model():
  model = keras.models.Sequential([
    layers.Input(IMG_SHAPE),
    layers.BatchNormalization(),
    layers.Reshape(IMG_SHAPE + (1,)),

    layers.Conv2D(20, (5, 5), activation='relu'),
    layers.MaxPool2D((2, 2)),

    layers.Conv2D(40, (5, 5), activation='relu'),
    layers.MaxPool2D((2, 2)),

    layers.BatchNormalization(),

    layers.Flatten(),
    layers.Dense(300, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(100, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(32, activation='softmax')
  ], name="conv_model")
  return model

In [0]:
def create_conv_2_trunk_model():
  model = keras.models.Sequential([
    layers.Input(IMG_SHAPE),
    # layers.BatchNormalization(),
    layers.Reshape(IMG_SHAPE + (1,)),

    layers.Conv2D(64, (8, 8), activation='relu'), # kernel_regularizer=keras.regularizers.l2(1e-4)
    layers.MaxPool2D((2, 2)),

    layers.Conv2D(128, (8, 8), activation='relu'),
    layers.MaxPool2D((2, 2)),

    layers.Conv2D(128, (4, 4), activation='relu'),
    layers.MaxPool2D((2, 2)),

    layers.Conv2D(256, (4, 4), activation='relu'),
    layers.MaxPool2D((2, 2)),

    layers.Flatten(),
    layers.Dense(512, activation='tanh'),
    # layers.Dense(32 , activation='tanh'),

    # layers.Flatten(),
    # layers.Dense(300, activation='relu'),
    # # layers.Dropout(0.2),
    # layers.Dense(100, activation='relu'),
    # # layers.Dropout(0.2),
    # layers.Dense(32, activation='softmax')
  ], name="conv_model_2")
  return model

In [0]:
def create_conv_3_trunk_model():
  model = keras.models.Sequential([
    layers.Input(IMG_SHAPE),
    # layers.BatchNormalization(), # TODO ? 
    layers.Reshape(IMG_SHAPE + (1,)),

    layers.Conv2D(64, (8, 8), activation='relu'), # kernel_regularizer=keras.regularizers.l2(1e-4)
    layers.MaxPool2D((2, 2)),

    layers.Conv2D(128, (8, 8), activation='relu'),
    layers.MaxPool2D((2, 2)),

    layers.Conv2D(128, (4, 4), activation='relu'),
    layers.MaxPool2D((2, 2)),

    layers.Conv2D(256, (4, 4), activation='relu'),

    layers.Flatten(),
    layers.Dense(2048, activation='tanh'), # TODO : 4096 ?
    layers.Dense(16, activation='tanh'), # TODO : 64 ?
  ], name="conv_model_3")
  return model

The first function, given a model, create a siamese NN with this model as the common part. The second create a model with two times the same siamese network to compute triplet loss.

In [0]:
def create_siamese(trunk_model):
  inputs = [layers.Input(IMG_SHAPE) for _ in range(2)]
  parts = [trunk_model(inTensor) for inTensor in inputs]
  # diff = layers.subtract(parts)
  # out = layers.Lambda(lambda x : tf.reduce_sum(x**2, axis=(1,)))(diff)
  # out_sqrt = layers.Lambda(lambda x : tf.sqrt(x))(out)
  out_sqrt = tf.sqrt(tf.reduce_sum((parts[1]-parts[0])**2, axis=(1,)))
  return keras.models.Model(inputs=inputs, outputs=out_sqrt, name="Siamese_model"+"_"+trunk_model.name)

def create_triplet_siamese(siamese_model, margin=1.0):
  in_anchor, in_positive, in_negative = [layers.Input(IMG_SHAPE, name=name) for name in ["in_anchor", "in_positive", "in_negative"]]
  positive_dist = siamese_model([in_anchor, in_positive])
  negative_dist = siamese_model([in_anchor, in_negative])

  dist = layers.subtract([positive_dist, negative_dist])
  if margin:
    dist = layers.Lambda(lambda x : tf.maximum(x + margin, 0.0))(dist)
  # dist = layers.Lambda(lambda x : tf.square(x))(dist) # keep ?
  return keras.models.Model(inputs=[in_anchor, in_positive, in_negative], outputs=dist, name="Siamese_triplet_model")

We will now choose the model we will use

In [141]:
str2model = {
    "same" : create_same_trunk_model,
    "linear" : create_linear_trunk_model,
    "dense" : create_dense_trunk_model,
    "conv" : create_conv_trunk_model,
    "conv2" : create_conv_2_trunk_model,
    "conv3" : create_conv_3_trunk_model,
}

trunk_model = str2model[MODEL_TYPE]()
siamese_model = create_siamese(trunk_model)
model = create_triplet_siamese(siamese_model, margin=MARGIN)
trunk_model.summary()
model.summary()

Model: "conv_model_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
reshape_3 (Reshape)          (None, 100, 100, 1)       0         
_________________________________________________________________
conv2d_12 (Conv2D)           (None, 93, 93, 64)        4160      
_________________________________________________________________
max_pooling2d_9 (MaxPooling2 (None, 46, 46, 64)        0         
_________________________________________________________________
conv2d_13 (Conv2D)           (None, 39, 39, 128)       524416    
_________________________________________________________________
max_pooling2d_10 (MaxPooling (None, 19, 19, 128)       0         
_________________________________________________________________
conv2d_14 (Conv2D)           (None, 16, 16, 128)       262272    
_________________________________________________________________
max_pooling2d_11 (MaxPooling (None, 8, 8, 128)        

If there's a model to restore, we will try to restore weights

In [142]:
if LOAD_FROM:
  if LOAD_FROM == "save_dir":
    LOAD_FROM = get_checkpoint_path()
  print("Load weights from", LOAD_FROM)
  # model.load_weights(LOAD_FROM)
  # model = tf.keras.models.load_model(LOAD_FROM)
  trunk_model.load_weights(LOAD_FROM)
else:
  print("No weights to load")


No weights to load


## Prediction and evaluation models and functions

In [0]:
class NearestPredictor:
  def __init__(self, trunk_model, datas=([], [])):
    self.set_datas(datas)
    self.trunk_model = trunk_model
  
  def set_datas(self, datas):
    self.images, self.labels = datas
  
  def build(self):
    self.img_coords = self.trunk_model.predict(np.array(self.images))

  def predict_in_datas(self, i):
    dists = [dist_fct(self.img_coords[i], coord) for coord in self.img_coords]
    min_j = 0
    for j, coord in enumerate(self.img_coords):
      if j != i and (min_j == i or dists[j] < dists[min_j]):
        min_j = j
    return self.labels[min_j]

  def predict(self, image):
    predict_coords = self.trunk_model.predict(np.array([image]))[0]
    dists = [dist_fct(predict_coords, coord) for coord in self.img_coords]
    min_j = 0
    for j, coord in enumerate(self.img_coords):
      if dists[j] < dists[min_j]:
        min_j = j
    return self.labels[min_j]


In [0]:
def evaluate_accuracy_in_datas(predict_obj, ids_sample):
  predict_obj.build()
  return len([1 for i in ids_sample if predict_obj.labels[i] == predict_obj.predict_in_datas(i)]) / len(ids_sample)

In [0]:
def evaluate_accuracy_train_datas():
  return evaluate_accuracy_in_datas(predict, train_datas_sample)

def evaluate_accuracy_test_datas():
  return evaluate_accuracy_in_datas(predict_testing, test_datas_sample)

In [0]:
predict = NearestPredictor(trunk_model, (train_images, train_labels))
predict_testing = NearestPredictor(trunk_model, (test_images, test_labels))
train_datas_sample = random.sample(list(range(len(train_labels))), ACCURACY_SAMPLE_SIZE)
test_datas_sample = random.sample(list(range(len(test_labels))), ACCURACY_SAMPLE_SIZE)

## Monitor and prepare training

In [0]:
def triplet_loss(y_true, y_pred):
  return K.mean(y_pred)

def accuracy(y_true, y_pred):
  return K.mean(y_pred[:] <= 0.0000001)

Training callbacks

In [0]:
class EpochStdoutLoggerCallback(tf.keras.callbacks.Callback):
  def on_epoch_begin(self, epoch, logs):
    self.time = time.time()
  
  def on_epoch_end(self, epoch, logs):
    epoch_time = time.time() - self.time
    print("Epoch {}/{} finished in {}m {}s | loss: {:.5f} - accuracy: {:.5f}".format(
      epoch+1, NB_EPOCHS, int(epoch_time) // 60, int(epoch_time) % 60, logs['loss'], logs['accuracy']
    ))

In [0]:
class AccuracyCallback(tf.keras.callbacks.Callback):
  def __init__(self, epoch_interval, accuracies=None):
    super().__init__()
    self.epoch_interval = epoch_interval
    self.accuracies = accuracies
  
  def on_epoch_end(self, epoch, logs):
    if epoch % self.epoch_interval == self.epoch_interval - 1:
      acc = evaluate_accuracy_train_datas()
      print("\nAccuracy : {}%".format(acc * 100))
      if self.accuracies:
        self.accuracies.append(acc)

In [0]:
class SaveTrunkCallback(tf.keras.callbacks.Callback):
  def __init__(self, trunk, path, load_weights_on_restart=False, batch_interval=None):
    self.trunk = trunk
    self.path = path
    self.load_weights_on_restart = load_weights_on_restart
    self.batch_interval = batch_interval
  
  def save(self):
    self.trunk.save_weights(self.path)

  def on_train_begin(self, logs=None):
    if (self.load_weights_on_restart and os.path.exists(self.path)):
      self.trunk.load_weights(self.path)
  
  def on_train_batch_end(self, batch, logs=None):
    if self.batch_interval and batch % self.batch_interval == 0:
      self.save()
  
  def on_epoch_end(self, epoch, logs=None):
    self.save()

## Training phase

In [0]:
model.compile(
    # optimizer=tf.optimizers.Adam(learning_rate=LEARNING_RATE), # 0.000000001
    optimizer=tf.optimizers.Adam(learning_rate=0.001),
    loss=triplet_loss,
    metrics=[accuracy] # TODO : This doesn't give the accuracy of the classifier itself
)

hists, accuracies = [], []

In [0]:
# !rm -R logs/*
# %tensorboard --logdir logs

In [0]:
%%time
%matplotlib inline
triplet_generator = create_triplet_generator(
    train_images, train_labels, trunk_model,
    get_triplets_dists,
    BATCH_SIZE
)

logdir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
checkpoint_path = get_checkpoint_path(suffix="_0")
print("Saving weights at", checkpoint_path)

callbacks = [
  SaveTrunkCallback(trunk_model, checkpoint_path, load_weights_on_restart=False, batch_interval=1000),
  # EpochStdoutLoggerCallback(),
  AccuracyCallback(1, accuracies),
  # keras.callbacks.TensorBoard(logdir, histogram_freq=1),
]

r = model.fit_generator(
    generator=triplet_generator,
    epochs = NB_EPOCHS,
    steps_per_epoch = int(len(train_images) * TRIPLETS_PER_IMAGE / BATCH_SIZE),
    callbacks=callbacks,
    # verbose=0,
  # validation_data=(x_test, y_test)
)
hists.append(r)

# TODO : valiation ; data augmentation ; get_triplet -> pas random

Saving weights at drive/My Drive/ml/weights/oneshot_chars/checkpoint_conv3/weights_0.hdf5
Epoch 1/100

triplets computed 43 s (useful, unuseful) = (192800, 0) (100.00%)

triplets computed 73 s (useful, unuseful) = (192800, 0) (100.00%)
triplets computed 85 s (useful, unuseful) = (192800, 0) (100.00%)

Accuracy : 0.0%
Epoch 2/100
triplets computed 85 s (useful, unuseful) = (192800, 0) (100.00%)

Accuracy : 0.0%
Epoch 3/100
triplets computed 87 s (useful, unuseful) = (192800, 0) (100.00%)

Accuracy : 0.0%
Epoch 4/100
triplets computed 86 s (useful, unuseful) = (192800, 0) (100.00%)

Accuracy : 0.0%
Epoch 5/100
triplets computed 91 s (useful, unuseful) = (192800, 0) (100.00%)

Accuracy : 0.0%
Epoch 6/100
triplets computed 83 s (useful, unuseful) = (192800, 0) (100.00%)

Accuracy : 0.0%
Epoch 7/100
triplets computed 84 s (useful, unuseful) = (192800, 0) (100.00%)

Accuracy : 0.0%
Epoch 8/100
triplets computed 83 s (useful, unuseful) = (192800, 0) (100.00%)

Accuracy : 0.0%
Epoch 9/100
trip

## Functions to compute / plot stats about trained models

In [0]:
def eval_dists_on_sample(predict_obj, ids_sample):
  predict_obj.build()
  same_dists, diff_dists = [], []

  for i in ids_sample:
    i_cls, i_coord = predict_obj.labels[i], predict_obj.img_coords[i]
    for j, j_coord in enumerate(predict_obj.img_coords):
      if i != j:
        if i_cls == predict_obj.labels[j]:
          same_dists.append(dist_fct(i_coord, j_coord))
        else:
          diff_dists.append(dist_fct(i_coord, j_coord))
  return same_dists, diff_dists

In [0]:
def plot_hist(arrs):
  plt.figure(figsize=(12,5))
  plt.hist(arrs,
    bins = 60,
    color = ['blue', '#D72F1A'],
    # edgecolor = 'black',
    label=["Same dists", "Diff dists"],
    density=True
  )
  plt.legend(loc='upper right')

  plt.tight_layout()
  plt.show()

## Display trained model stats

In [0]:
%%time
print("===== TRAINING STATS =====")
same_dists, diff_dists = eval_dists_on_sample(predict, train_datas_sample)

print("Accuracy : {}%".format(evaluate_accuracy_train_datas()*100))
print("Avg dist same class :", sum(same_dists) / len(same_dists))
print("Avg dist distinct classes :", sum(diff_dists) / len(diff_dists))

plot_hist([same_dists, diff_dists])
print(len(train_images), ACCURACY_SAMPLE_SIZE, ACCURACY_SAMPLE_SIZE * len(train_images))
print(len(same_dists), len(diff_dists))

In [0]:
%%time
print("===== TESTING STATS =====")
same_dists, diff_dists = eval_dists_on_sample(predict_testing, test_datas_sample)

print("Accuracy : {}%".format(evaluate_accuracy_test_datas()*100))
print("Avg dist same class :", sum(same_dists) / len(same_dists))
print("Avg dist distinct classes :", sum(diff_dists) / len(diff_dists))
plot_hist([same_dists, diff_dists])

In [0]:
# %matplotlib inline

# for _ in range(3):
#     try:
#         pl.clf()
#         pl.plot(pd.Series(data=np.random.randn(100), index=i))
#         display.display(pl.gcf())
#         display.clear_output(wait=True)
#         time.sleep(1)
#     except KeyboardInterrupt:
#         break