Algo a definir es si la tarea será de CBIR uni o multi etiqueta. Ello definirá la métrica de evaluación (test y validación) y el monitoreo. Potencialmente ese punto también cambia la forma en que selecciona el positivo y el negativo.

En caso multi etiqueta lo común es valorar más una recuperación que comparta todas las etiquetas, por lo que el positivo debe ser así (si no existe tal positivo se selecciona uno que tenga al menos una en comun), y el complemento para definir el negativo sería aquel que no tenga ninguna etiqueta en común.
Aquí implementaré dicha estrategia y además el caso en que el negativo es hard negative en minibatch (por definición previa, entonces el requerimiento para ser negative es no tener nada en común).

Esta estrategia es mejorable, considerando propiedades del espacio latente para la seleccion más informativa y diversa del ancla, positivo y negativo [1].


REFERENCES:

  - [1]: https://arxiv.org/abs/2105.03647
  - [VSE++]: https://github.com/fartashf/vsepp.git (method of cross modal triplet loss with hard negative mining and definition of hyperparameters)


# Descipción del método

Para una tarea de recuperación imagen-imagen, consideremos $N$ imágenes con su asociada multi-etiqueta como data de entrenamiento, y sea $p_{data}$ su distribución. Por definición un par de imagenes que tienen las mismas etiquetas conforman un par positivo, y en otro caso se llaman negativos.

En este escenario, el aprendizaje por tripletas formaliza la intuición que la similaridad entre un par positivo $(i_n, {i_n}_{+})$ debería ser mayor que la similaridad entre un par negativo $(i_n, {i_n}_{-})$.

$$\mathcal{L}_{i2i}:= [ \alpha + s(i_n, {i_n}_{-}) - s(i_n, {i_n}_{+})    ]_{+} ,$$
donde $[x]_{+}=max(0,x)$. Aquí $\alpha$ es un hiperparametro conocido como margen. La similaridad $s$ se calcula como el producto punto $s(i,c)= f(i)^t f(c)$ entre las representaciones normalizadas asignada a las imagenes $f$. El encoder visual es $f(i_n)= E g_{i_n}$, donde $g_{i_n} \in \mathbb{R}^{n_V}$ es el vector de características asociado con la imagen $i_n$. Si utilizamos redes neuronales preentrenadas para obtener estos vectores de características, entonces los parámetros entrenables $\theta$ son las matrices de proyección $E_{\phi} \in \mathbb{R}^{k \times n_V}$, con $k$ el hiperparametro que define la dimensión del espacio latente.

El ajuste de los parámetros entrenables se logra a través del algoritmo del gradiente descendiente estocástico aplicado al problema de optimización de minimizar el valor esperado de la loss. En la práctica, el valor esperado es aproximado muestreando desde la data de entrenamiento (pares positivos). Un enfoque simple es considerar también el muestreo aleatorio de los negativos. Sin embargo, es bien conocido que una mejor técnica es la de hard negative mining, donde los negativos son escogidos dinámicamente de acuerdo al estado del modelo actual [VSE++,1]. Lo que significa que se toman como negativos los casos que son más problemáticos dentro del minibatch de optimización:

$$ {i_n}_{-}= \underset{x \neq i_n}{ argmax } \hspace{0.1 cm} s(x,i_n) $$

In [None]:
import tensorflow as tf
import gc
#I programmed this in tensorflow 2.x (e.g 2.13)
print(tf.__version__)
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

import numpy as np
import random
import time
random.seed(0)

2.13.0
Num GPUs Available:  0


In [None]:
# DATA SIMULATION
data= range(1000) #list of img_ids
n_labels=6
#list of labels in order w/r data list
multiplicity_labels=np.random.choice([1,2,3],1000)
labels=[]
for i in range(1000):
  labels.append(list(np.random.choice(list(range(n_labels)),multiplicity_labels[i], replace=False)))
#splits
Train_img, Train_labels= data[:800], labels[:800]
Val_img, Val_labels= data[800:900], labels[800:900]
Test_img, Test_labels= data[900:], labels[900:]
#visual feautures
visual_matrix=np.random.rand(1000, 1024)
path= ""

In [None]:
def recall(labels_gt, labels_prediction):
  """
  set of ground truth labels and list of prediction lists. This is an independent of k way to calculate the recall.
  """
  n= len(labels_prediction)
  recall=0
  for i in range(n):
    recall+= len(set(labels_gt)&set(labels_prediction[i]))/len(labels_gt)
  return recall/n

def get_batch_data(data, index, size):
    """
    For minibatch training
    """

    column_1 = []
    column_2 = []
    for i in range(index, index + size):
        line = data[i]
        column_1.append(int(line[0]))#imgA
        column_2.append(int(line[1]))#imgP
    return np.array(column_1), np.array(column_2)


class triplet_model(object):
        def __init__(self, data, labels, visual_matrix,factor,mode,estimator ,pretrain=False):
            """
            mode: (negative sampling) "random" or "HN"
            estimator: "Linear" or "net"
            """
            self.factor= factor #dim latent space
            self.batch_size =  30  #VSE++
            self.epochs = 30 #30 #VSE++
            self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002) #VSE++
            self.estimator=estimator # FF net or LINEAR
            self.DIS_MODEL_FILE= path+"/"+self.estimator+"_" #name path to save
            print(self.DIS_MODEL_FILE)
            self.mode= mode  #negative sampling: HN or Random
            self.alpha=0.2 # in the hinge loss

            #get partition for Train, Val and Test from data
            self.n_labels= n_labels
            self.train_ids,self.labels_train = Train_img, Train_labels
            self.val_ids,self.labels_val = Val_img, Val_labels
            self.test_ids,self.labels_test = Test_img, Test_labels

            #arquitecture FF net
            if self.estimator=="net":
                act= "relu"
                layer_name = 'Visual_Enconder'
                neurons = [visual_matrix.shape[1], int((visual_matrix.shape[1] + self.factor)/2), self.factor]
                v_input = tf.keras.Input(shape=(visual_matrix.shape[1],))

                v_output = tf.keras.layers.Dense(units=neurons[0], activation=act,  kernel_regularizer='l2' )(v_input)
                v_output= tf.keras.layers.Dropout(rate= .2, seed= 0)(v_output)
                v_output = tf.keras.layers.Dense(units=neurons[1], activation=act,  kernel_regularizer='l2' )(v_output)
                v_output = tf.keras.layers.Dense(units=neurons[2], activation=act,  kernel_regularizer='l2')(v_output)
                v_output= tf.math.l2_normalize(v_output, axis=1) #tf.keras.layers.UnitNormalization()(v_output)

                self.visual_encoder = tf.keras.Model(inputs=[v_input], outputs=v_output)

            if self.estimator=="Linear":

                layer_name = 'Visual_Enconder'

                v_input = tf.keras.Input(shape=(visual_matrix.shape[1],))

                v_output = tf.keras.layers.Dense(units=self.factor, use_bias=False )(v_input)
                v_output= tf.keras.layers.UnitNormalization()(v_output) #tf.math.l2_normalize(v_output)  #

                self.visual_encoder = tf.keras.Model(inputs=[v_input], outputs=v_output)


            if pretrain: #To load the model to apply in test
                self.visual_encoder= tf.keras.models.load_model(self.DIS_MODEL_FILE+'visual_encoder_tf')
                print("pre train")

            #place holders
            i_input = tf.keras.Input(shape=(visual_matrix.shape[1],), name="i") #anchor
            i_n_input = tf.keras.Input(shape=(visual_matrix.shape[1],), name="i_n") #negative
            i_p_input = tf.keras.Input(shape=(visual_matrix.shape[1],), name="i_p") #positive
            i_emb = self.visual_encoder(i_input)
            i_n_emb = self.visual_encoder(i_n_input)
            i_p_emb = self.visual_encoder(i_p_input)

            sim= ( tf.keras.layers.Dot(axes=1)([i_emb, i_p_emb]),tf.keras.layers.Dot(axes=1)([i_emb, i_n_emb]))
            self.model =  tf.keras.Model(inputs=[i_input, i_p_input, i_n_input], outputs=sim)
            self.visual_encoder.compile(optimizer=self.optimizer)
            self.model.compile(optimizer=self.optimizer)

        def loss(self,A, P, N):
          """
          for computing the hinge loss [a+n-P]_{+}
          """

          i_input = tf.nn.embedding_lookup(visual_matrix, A) #[_,1280]
          i_n_input = tf.nn.embedding_lookup(visual_matrix, N) #[_,1280]]
          i_p_input = tf.nn.embedding_lookup(visual_matrix, P) #[_,1280]]

          (positive_pair,negative_pair ) = self.model([i_input, i_p_input, i_n_input])
          #print(positive_pair,negative_pair)
          i2t_t2i_loss= tf.reduce_mean(tf.maximum(self.alpha - positive_pair+negative_pair, 0) )
          return i2t_t2i_loss

        def construct_train(self):
          """
          Positive pair selection methodology. It's used just once.
          """
          class_imgs_train= {} # dictionary class: [class]:[list of indices]
          for i in range(self.n_labels):
            _=[]
            c=0
            for j in self.labels_train:
              if i in j:
                _.append(c)
              c+=1
            class_imgs_train[i]=_

          data= [] # construct of positive pairs
          for i in self.train_ids: #anchors
            candidates=set()
            for j in self.labels_train[i]: #same labels
              candidates= candidates & set(class_imgs_train[j])
            candidates= candidates - set([i]) #but the positive cant't be the anchor
            if len(candidates)==0: # it dosen't exist
              candidates= set(class_imgs_train[np.random.choice(self.labels_train[i])])-set([i]) # at least one
            data.append( (i,  np.random.choice(list(candidates))) ) #random choice of candidates

          np.random.shuffle(data) #very useful for breaking patterns
          return class_imgs_train,data

        def predict(self,query,split):
            """
            the split defines where we are going to search
            """
            if type(query) is not list:
                query= [query]

            i_input = self.visual_encoder(tf.nn.embedding_lookup(visual_matrix, query))
            if split=="val":
                    c_eval= self.visual_encoder(visual_matrix[self.val_ids])
            if split=="test":
                    c_eval= self.visual_encoder(visual_matrix[self.test_ids])
            if split=="train":
                    c_eval=self.visual_encoder(visual_matrix[self.train_ids])

            rating= tf.matmul(i_input, tf.transpose(c_eval) )
            return np.reshape(rating, [-1])

        def metric(self,split, k=10):
          """
          to compute metrics
          """
          if split=="val":
              ids,gt= self.val_ids,self.labels_val
          if split=="test":
              ids,gt= self.test_ids,self.labels_test

          metric=0
          for i in range(len(ids)): #querys
            top= np.argsort(self.predict(ids[i],split))[::-1][1:k+1] #first one must be the query
            #get labels
            prediction= [gt[j] for j in top]
            metric+=recall(gt[i], prediction)
          return metric/len(ids) #average

        def inference(self):
          """
          testing phase
          """
          results=(self.metric("test",k=1), self.metric("test",k=10), self.metric("test",k=25))
          print(results)
          np.save(self.DIS_MODEL_FILE+"test_metrics.npy", np.array([results], dtype=object) )

        def save(self, model_time, loss,recalls ):
          """
          For saving logs of the experiment
          """
          gpu_name=!nvidia-smi -L
          with open(self.DIS_MODEL_FILE+'train.txt', 'w') as f:
            f.write("training time of model: %fs" % (model_time) + "\n")
            #f.write("finish epoch %s" % epoch + "with recall %s" % best_recall  + "\n")
            f.write(gpu_name[0] + "\n")
          np.save(self.DIS_MODEL_FILE+"Loss_recalls.npy", np.array([loss,recalls],dtype=object))
          #saving model
          self.visual_encoder.save(self.DIS_MODEL_FILE+'visual_encoder_tf',save_format='tf')

        def training(self):
            """
            Training process
            """
            recalls=[] #recall 1, 10, 25
            loss_history=[]
            start_time = time.time()
            n_train=len(self.train_ids)
            class_imgs_train,data= self.construct_train()

            for epoch in range(self.epochs):
                print("epoch: %d" % epoch)
                if epoch>=15: #decay
                    self.optimizer.learning_rate.assign(0.00002) #VSE++

                index = 0
                while index + self.batch_size < n_train :
                    input_A, input_P = get_batch_data(data, index,self.batch_size)  #anchor,positive
                    index += self.batch_size
                    input_N=[] #negatives

                    if self.mode=="random":
                      for anchor in input_A:
                        candidates= set(self.train_ids) #all of them
                        for j in self.labels_train[anchor]:
                          candidates= candidates - set(class_imgs_train[j])
                        input_N.append( np.random.choice(list(candidates)) )#Random negative

                    else:  #Hard Negative
                        rating= np.array(self.predict( list(input_A) , "train"), dtype=float)
                        gc.collect()

                        for i_ in range(self.batch_size):
                            ratings_batch= rating[n_train*i_:n_train*(i_+1)]  #all images in train

                            #-------- out ground truth----------
                            for j in self.labels_train[input_A[i_]]:
                              ratings_batch[ class_imgs_train[j]]= - np.infty #out imgs with same classes
                            #-------- out ground truth----------
                            input_N.append(self.train_ids[np.argmax(ratings_batch)])
                            del ratings_batch
                    with tf.GradientTape() as tape:
                            losses= self.loss(input_A, input_P, input_N)

                    grads = tape.gradient(losses, self.model.trainable_variables)
                    self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))


                print("loss:",  losses.numpy() ,"time: ", np.round(time.time() - start_time, 4))
                loss_history.append(losses.numpy() )

                #validation
                print("Recall: 1-10-25 ",self.metric("val",k=1), self.metric("val",k=10), self.metric("val",k=25))
                recalls.append(( self.metric("val",k=1), self.metric("val",k=10), self.metric("val",k=25) ))
                break

            #end
            #save
            self.save(time.time()-start_time, loss_history,recalls )

In [None]:
net =  triplet_model(data, labels, visual_matrix,1024,"HN","Linear")
net.training()

/Linear_
epoch: 0
loss: 0.20700344 time:  10.6632
Recall: 1-10-25  0.4433333333333333 0.3721666666666666 0.35793333333333327


In [None]:
net =  triplet_model(data, labels, visual_matrix,1024,"HN","Linear",pretrain=True)
net.inference()

/Linear_
pre train
(0.35833333333333334, 0.3588333333333333, 0.3538)
