

What differentiates metric learning from traditional classification is that we train the model to learn a compact representation (an embedding) of the examples.

The goal is for examples of the same class to be close together and examples from different classes to be far apart.


References
- Original paper:
FaceNet: A Unified Embedding for Face Recognition and Clustering
https://arxiv.org/abs/1503.03832 

- Mining strategies:
https://openaccess.thecvf.com/content_WACV_2020/papers/Xuan_Improved_Embeddings_with_Easy_Positive_Triplet_Mining_WACV_2020_paper.pdf 


In [1]:
 %load_ext autoreload
 %autoreload 2

In [2]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import  TensorBoard
# from tqdm.auto import tqdm
from time import  time
from tabulate import  tabulate

In [3]:
from tensorflow_similarity.utils import tf_cap_memory
from tensorflow_similarity.losses import TripletLoss
from tensorflow_similarity.layers import MetricEmbedding
from tensorflow_similarity.model import SimilarityModel

In [4]:
import tensorflow_addons as tfa
from tensorflow_similarity.tmp import triplet_hard_loss as tfa_local
from tensorflow_similarity.tmp import TripletHardLoss as THLL


In [5]:
# NOTE(review): presumably limits how much GPU memory TensorFlow may claim —
# confirm against tensorflow_similarity.utils.tf_cap_memory.
tf_cap_memory()
# Record the TensorFlow version this notebook was executed with.
print(tf.__version__)

2.3.1


# preparing data

Note: TensorFlow Similarity expects `y_train` to contain each example's class as an integer, so the labels need no preprocessing.

In [None]:
# FIXME: use a sampler here and select only half of the classes

In [6]:
# Load the MNIST digits: image arrays plus their integer class labels.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# Scale pixel values to [0, 1]. Dividing the integer images by a Python float
# produces float64 arrays, so cast to float32 before building the constants:
# this halves their memory footprint and matches the dtype Keras layers use.
x_train = tf.constant((x_train / 255.0).astype(np.float32))
x_test = tf.constant((x_test / 255.0).astype(np.float32))
x_train.shape

TensorShape([60000, 28, 28])

# model creation

In [7]:
def get_model(input_shape=(28, 28), embedding_size=10):
    """Build a small convolutional model that outputs a metric embedding.

    Args:
        input_shape: (height, width) of the single-channel input images.
            Defaults to the 28x28 MNIST digits used in this notebook.
        embedding_size: number of units passed to the `MetricEmbedding`
            output layer.

    Returns:
        A `SimilarityModel` mapping a batch of images to their embeddings.
    """
    # Clear Keras' global state so repeated calls start from a clean graph.
    tf.keras.backend.clear_session()
    inputs = layers.Input(shape=input_shape)
    # The images have no channel axis; add a trailing one for Conv2D.
    x = layers.Reshape((*input_shape, 1))(inputs)
    x = layers.Conv2D(16, 3, activation='relu')(x)
    x = layers.Conv2D(32, 3, activation='relu')(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = layers.Flatten()(x)
    x = layers.Dense(128, activation='relu')(x)
    outputs = MetricEmbedding(embedding_size)(x)
    return SimilarityModel(inputs, outputs)


model = get_model()
model.summary()

Model: "similarity_model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 28, 28)]          0         
_________________________________________________________________
reshape (Reshape)            (None, 28, 28, 1)         0         
_________________________________________________________________
conv2d (Conv2D)              (None, 26, 26, 16)        160       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 24, 24, 32)        4640      
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 12, 12, 32)        0         
_________________________________________________________________
flatten (Flatten)            (None, 4608)              0         
_________________________________________________________________
dense (Dense)                (None, 128)          

# Compilation

In [8]:

distance = 'cosine' #@param ["cosine"]{allow-input: false}
positive_mining_strategy = 'hard' #@param ["easy", "hard"]{allow-input: false}
negative_mining_strategy = 'hard' #@param ["easy", "hard"]{allow-input: false}
triplet_loss = TripletLoss(
    distance=distance,
    positive_mining_strategy=positive_mining_strategy,
    negative_mining_strategy=negative_mining_strategy,
    name='tfs',
    margin=1,
)

# One TensorBoard run directory per execution, keyed by the current timestamp.
tb = TensorBoard(log_dir=f'logs/{int(time())}', update_freq='batch')

model = get_model()
tfa_loss = tfa.losses.TripletHardLoss(distance_metric='angular', name='tfa')
tfa_loss_local = THLL(distance_metric='angular', name='tfal')
# Optimise the TensorFlow Addons loss and report the local copy and the
# TensorFlow Similarity loss as metrics, so all three can be compared per step.
# `compile` configures the model in place and returns None, so its result is
# deliberately not stored.
model.compile(optimizer='adam', loss=tfa_loss, metrics=[tfa_loss_local, triplet_loss])


EPOCHS = 1 #@param {type:"integer"}
BATCH_SIZE = 8 #@param {type:"integer"}
history = model.fit(x_train, y_train, validation_data=(x_test, y_test),batch_size=BATCH_SIZE, epochs=EPOCHS, callbacks=[tb])

Instructions for updating:
use `tf.profiler.experimental.stop` instead.


# Training

In [9]:
from tensorflow_similarity import metrics
# Triplet loss variant that keeps hard positives but mines semi-hard
# negatives, using the same distance as the loss configured above.
semi = TripletLoss(
    distance=distance,
    positive_mining_strategy='hard',
    negative_mining_strategy='semi-hard',
)

In [10]:
def angular_distance_np(feature):
    """Computes the pairwise cosine distance (1 - cosine similarity) in numpy.

    Despite the name, the values returned are `1 - cos(a, b)`, not angles.

    Args:
      feature: 2-D array-like of size [number of data, feature dimension].
    Returns:
      angular_distances: 2-D numpy array of size
        [number of data, number of data]. Entry [i, j] is
        `1 - cosine_similarity(feature[i], feature[j])`, clamped to be >= 0.
        An all-zero row is left as zeros by the normalization, so its
        distance to every row (itself included) is 1 instead of NaN.
    """
    feature = np.asarray(feature)
    if not np.issubdtype(feature.dtype, np.floating):
        # Integer input would overflow / truncate in the steps below.
        feature = feature.astype(np.float64)

    # l2-normalize all features. The squared norm is floored so that a
    # zero vector does not trigger a division by zero.
    # NOTE(review): 1e-12 is meant to follow the default epsilon of
    # tf.math.l2_normalize — confirm if exact parity matters.
    squared_norm = np.sum(np.square(feature), axis=1, keepdims=True)
    normed = feature / np.sqrt(np.maximum(squared_norm, 1e-12))

    cosine_similarity = normed @ normed.T
    # Rounding can push the similarity of (near-)identical rows slightly
    # above 1; clamp so that distances are never negative.
    inverse_cos_sim = np.maximum(1 - cosine_similarity, 0.0)

    return inverse_cos_sim

def triplet_hard_loss_np(labels, embedding, margin, dist_func, soft=False):
    """Reference numpy implementation of the batch-hard triplet loss.

    For each anchor `i`, the hardest positive is the largest distance to
    another sample with the same label, and the hardest negative is the
    smallest distance to a sample with a different label. The per-anchor
    losses are averaged over the batch.

    Args:
      labels: 1-D array-like of class labels, one per row of `embedding`.
      embedding: 2-D array of size [number of data, feature dimension].
      margin: margin added inside the hinge; ignored when `soft` is True.
      dist_func: callable taking `embedding` and returning the
        [number of data, number of data] pairwise distance matrix.
      soft: if True, use `log1p(exp(d_pos - d_neg))` instead of the hinge
        `max(0, d_pos - d_neg + margin)`.
    Returns:
      The mean loss over all anchors, or 0.0 for an empty batch.
    """
    num_data = np.shape(embedding)[0]
    if num_data == 0:
        # Nothing to average over; avoid dividing by zero.
        return 0.0

    # Compare labels as they are. Casting them to float32 would merge
    # distinct integer labels above 2**24.
    labels_column = np.asarray(labels).reshape(-1, 1)
    adjacency = labels_column == labels_column.T
    pdist_matrix = np.asarray(dist_func(embedding))
    # An anchor is never its own positive.
    not_self = ~np.eye(num_data, dtype=bool)

    loss_np = 0.0
    for i in range(num_data):
        row = pdist_matrix[i]
        pos_distances = row[adjacency[i] & not_self[i]]
        neg_distances = row[~adjacency[i]]

        # if there are no positive pairs, distance is 0
        max_pos_distance = pos_distances.max() if pos_distances.size else 0.0

        # If the anchor has no negative in the batch, fall back to the
        # largest distance in its row instead of raising IndexError.
        # NOTE(review): this is what a masked-minimum reduction yields for
        # an all-zero mask; confirm it matches the TF losses compared here.
        min_neg_distance = neg_distances.min() if neg_distances.size else row.max()

        if soft:
            loss_np += np.log1p(np.exp(max_pos_distance - min_neg_distance))
        else:
            loss_np += np.maximum(0.0, max_pos_distance - min_neg_distance + margin)

    loss_np /= num_data
    return loss_np


In [11]:
# Walk the training set batch by batch and stop at the first batch on which
# the local TFA loss and the TensorFlow Similarity loss disagree. `y` and
# `preds` are left holding that batch for the cells below.
for i in range(0, len(x_train), BATCH_SIZE):  # start at 0 so sample 0 is not skipped
    s = i + BATCH_SIZE
    x = x_train[i:s]
    y = y_train[i:s]
    preds = model.predict(x)
    tla = tfa_loss_local(y, preds)
    tl = triplet_loss(y, preds)
    # Compare with a tolerance: two float32 implementations rarely agree
    # bit-for-bit, and an exact `!=` would stop on pure rounding noise.
    if not np.isclose(float(tl), float(tla), atol=1e-6):
        break
print(tla, tl)

KeyboardInterrupt: 

In [None]:
# Evaluate the current batch with all three implementations: the local TFA
# copy, the TensorFlow Similarity loss, and the numpy reference.
tl = triplet_loss(y, preds)
tla = tfa_loss_local(y, preds)
npl = triplet_hard_loss_np(
    y, preds, margin=1.0, dist_func=angular_distance_np)
print(tla, tl, npl)

In [37]:
# Print the TensorFlow Similarity loss, then the local TFA loss, for the batch.
for loss_fn in (triplet_loss, tfa_loss_local):
    print(loss_fn(y, preds))

tf.Tensor(0.0, shape=(), dtype=float32)
tf.Tensor(0.015318215, shape=(), dtype=float32)


In [22]:
# Show the installed TensorFlow Addons version for reference.
print(tfa.__version__)
# Run the same batch through the functional TFA loss and its local copy.
ta = tfa.losses.triplet.triplet_hard_loss(
    y, preds, distance_metric='angular')
tal = tfa_local(
    y, preds, distance_metric='angular')
print(ta, tal)

0.11.2
tf.Tensor(0.015318215, shape=(), dtype=float32) tf.Tensor(0.015318215, shape=(), dtype=float32)


In [23]:
# Numpy reference value for the same batch (hinge form, margin 1.0).
triplet_hard_loss_np(
    y, preds, margin=1.0, dist_func=angular_distance_np)

0.015318162739276886