### Imports

In [1]:
import numpy as np
import tensorflow as tf

### Similarity Scores

In [4]:
# Two vector example
# Input data
v1 = np.array([1, 2, 3], dtype = float)
v2 = np.array([1, 2, 3.5], dtype = float)

# Try modifying the vector v2 to see how it impacts the cosine similarity
# v2 = v1                   # identical vector
# v2 = v1 * -1              # opposite vector
# v2 = np.array([0,-42,1], dtype=float)  # random example

print("-- Input --")
print("V1 : ", v1)
print("V2 : ", v2, "\n")

def cosine_similarity(v1, v2):
  numerator = tf.math.reduce_sum(v1 * v2)
  denominator = tf.math.sqrt(tf.math.reduce_sum(v1 * v1) * tf.math.reduce_sum(v2 * v2))
  return numerator / denominator

print("-- Outputs --")
print("Cosine similarity : ", cosine_similarity(v1, v2))

-- Input --
V1 :  [1. 2. 3.]
V2 :  [-1. -2. -3.] 

-- Outputs --
Cosine similarity :  tf.Tensor(-1.0, shape=(), dtype=float64)


### Two Batches of Vectors

In [11]:
# Two batches of vectors example
# Input data

"""
v1 :
array([[ 1.,  2.,  3.],
       [ 9.,  8.,  7.],
       [-1., -4., -2.],
       [ 1., -7.,  2.]])
"""
v1_1 = np.array([1.0, 2.0, 3.0])
v1_2 = np.array([9.0, 8.0, 7.0])
v1_3 = np.array([-1.0, -4.0, -2.0])
v1_4 = np.array([1.0, -7.0, 2.0])
v1 = np.vstack([v1_1, v1_2, v1_3, v1_4])

"""
v2:
array([[ 4.72843251, -0.31416263,  3.0220996 ],
       [ 9.20017366,  9.47500835,  7.32925269],
       [-5.79729922, -4.17808562, -2.54299621],
       [ 3.72376963, -7.63641921,  2.29885007]])
"""
v2_1 = v1_1 + np.random.normal(0, 2, 3)  # add some noise to create approximate duplicate
v2_2 = v1_2 + np.random.normal(0, 2, 3)
v2_3 = v1_3 + np.random.normal(0, 2, 3)
v2_4 = v1_4 + np.random.normal(0, 2, 3)
v2 = np.vstack([v2_1, v2_2, v2_3, v2_4])

print("-- Input --")
print(f"v1 : \n{v1}\n")
print(f"v2 : \n{v2}\n")

# Batch sizes must match
b = len(v1)
print(f"Batch sizes match : {b == len(v2)}\n")

# Similarity scores
# Option 1 : nested loops and the cosine similarity function
sim_1 = np.zeros([b, b])
for row in range(0, sim_1.shape[0]):
  for col in range(0, sim_1.shape[1]):
    sim_1[row,col] = cosine_similarity(v2[row], v1[col]).numpy()

print("-- Outputs --")
print("Option 1 : loop")
print(sim_1)

-- Input --
v1 : 
[[ 1.  2.  3.]
 [ 9.  8.  7.]
 [-1. -4. -2.]
 [ 1. -7.  2.]]

v2 : 
[[-2.92852677  1.60778267  3.41559857]
 [ 6.15808817  7.36827865  5.78058324]
 [-3.76842578 -2.95374122 -1.0572413 ]
 [-0.81672465 -6.6080942   3.82946569]]

Batch sizes match : True

-- Outputs --
Option 1 : loop
[[ 0.58924083  0.15650085 -0.47197698 -0.20939565]
 [ 0.91173463  0.99178717 -0.91879297 -0.41108239]
 [-0.70026323 -0.95094808  0.78761446  0.41055358]
 [-0.08853597 -0.312278    0.55655378  0.94073422]]


In [12]:
# Option 2 : vector normalization and dot product
def norm(x):
  return tf.math.l2_normalize(x, axis = 1) # Use tensorflow built in normalization

sim_2 = tf.linalg.matmul(norm(v2), norm(v1), transpose_b = True)

print("-- Outputs --")
print("Option 2 : vector normalization and dot product")
print(sim_2, "\n")

# Check
print(f"Outputs are the same : {np.allclose(sim_1, sim_2)}")

-- Outputs --
Option 2 : vector normalization and dot product
tf.Tensor(
[[ 0.58924083  0.15650085 -0.47197698 -0.20939565]
 [ 0.91173463  0.99178717 -0.91879297 -0.41108239]
 [-0.70026323 -0.95094808  0.78761446  0.41055358]
 [-0.08853597 -0.312278    0.55655378  0.94073422]], shape=(4, 4), dtype=float64) 

Outputs are the same : True


### Hard Negative Mining
L1=max(𝑚𝑒𝑎𝑛_𝑛𝑒𝑔−s(𝐴,𝑃)+𝛼,0)

L2=max(𝑐𝑙𝑜𝑠𝑒𝑠𝑡_𝑛𝑒𝑔−s(𝐴,𝑃)+𝛼,0)

In [17]:
# Hardcoded matrix of similarity scores
sim_hardcoded = np.array(
    [
        [0.9, -0.8, 0.3, -0.5],
        [-0.4, 0.5, 0.1, -0.1],
        [0.3, 0.1, -0.4, -0.8],
        [-0.5, -0.2, -0.7, 0.5],
    ]
)

sim = sim_hardcoded

# Try using different values for the matrix of similarity scores
# sim = 2 * np.random.random_sample((b,b)) -1   # random similarity scores between -1 and 1
# sim = sim_2

# Batch size, b = 4
b = sim.shape[0]

print("-- Inputs --")
print(f"sim:")
print(sim)
print(f"shape: {sim.shape}\n")

# Positives
# All the s(A,P) values : similarities from duplicate question pairs
# These are along the diagonal
sim_ap = np.diag(sim)
print("sim_ap : ")
print(np.diag(sim_ap))

# Negatives
# all the s(A,N) values : similarities the non duplicate question pairs (aka Negatives)
# There are in the off diagonals
sim_an = sim - np.diag(sim_ap)
print("\nsim_an : ")
print(sim_an)

print("\n-- Outputs --")
# Mean negative
# Average of the s(A,N) values for each row
mean_neg = np.sum(sim_an, axis = 1, keepdims = True)/ (b - 1)
print("\nmean_neg: ")
print(mean_neg)

# Closest negative
# Max s(A,N) that is <= s(A,P) for each row
"""
np.identity(b): This creates a b×b identity matrix, where b is presumably the size of your square matrix.
An identity matrix has ones on its diagonal and zeros elsewhere.
array([[ True, False, False, False],
       [False,  True, False, False],
       [False, False,  True, False],
       [False, False, False,  True]])
"""
mask_1 = np.identity(b) == 1            # mask to exclude the diagonal

"""
sim_an :
[[ 0.  -0.8  0.3 -0.5]
 [-0.4  0.   0.1 -0.1]
 [ 0.3  0.1  0.  -0.8]
 [-0.5 -0.2 -0.7  0. ]]
sim_ap :
[ 0.9  0.5 -0.4  0.5]
sim_ap.reshape(b, 1) :
[[ 0.9]
 [ 0.5]
 [-0.4]
 [ 0.5]
sim_an > sim_ap.reshape(b, 1) :
array([[False, False, False, False],
       [False, False, False, False],
       [ True,  True,  True, False],
       [False, False, False, False]])
"""
mask_2 = sim_an > sim_ap.reshape(b, 1)  # mask to exclude sim_an > sim_ap
"""
mask_1 | mask_2: This performs an element-wise logical OR operation between mask_1 and mask_2
array([[ True, False, False, False],
       [False,  True, False, False],
       [ True,  True,  True, False],
       [False, False, False,  True]])
"""
mask = mask_1 | mask_2
# create a copy to preserve sim_an
sim_an_masked = np.copy(sim_an)
# This indexing operation selects the elements in sim_an_masked where mask is True to -2
sim_an_masked[mask] = -2

closest_neg = np.max(sim_an_masked, axis = 1, keepdims = True)
print("\nClosest_neg : ")
print(closest_neg)

-- Inputs --
sim:
[[ 0.9 -0.8  0.3 -0.5]
 [-0.4  0.5  0.1 -0.1]
 [ 0.3  0.1 -0.4 -0.8]
 [-0.5 -0.2 -0.7  0.5]]
shape: (4, 4)

sim_ap : 
[[ 0.9  0.   0.   0. ]
 [ 0.   0.5  0.   0. ]
 [ 0.   0.  -0.4  0. ]
 [ 0.   0.   0.   0.5]]

sim_an : 
[[ 0.  -0.8  0.3 -0.5]
 [-0.4  0.   0.1 -0.1]
 [ 0.3  0.1  0.  -0.8]
 [-0.5 -0.2 -0.7  0. ]]

-- Outputs --

mean_neg: 
[[-0.33333333]
 [-0.13333333]
 [-0.13333333]
 [-0.46666667]]

Closest_neg : 
[[ 0.3]
 [ 0.1]
 [-0.8]
 [-0.2]]


In [33]:
# implementation in TensorFlow
# Hardcoded matrix of similarity scores
sim_hardcoded = np.array(
    [
        [0.9, -0.8, 0.3, -0.5],
        [-0.4, 0.5, 0.1, -0.1],
        [0.3, 0.1, -0.4, -0.8],
        [-0.5, -0.2, -0.7, 0.5],
    ]
)

sim = sim_hardcoded

# Try using different values for the matrix of similarity scores
# sim = 2 * np.random.random_sample((b,b)) -1   # random similarity scores between -1 and 1
# sim = sim_2                                   # the matrix calculated previously using vector normalization and dot product

# Batch size
b = sim.shape[0]

print("-- Inputs --")
print("sim :")
print(sim)
print("shape :", sim.shape, "\n")

# Positives
# All the s(A,P) values : similarities from duplicate question pairs (aka Positives)
# These are along the diagonal
sim_ap = tf.linalg.diag_part(sim) # this is just a 1D array of diagonal elements
print("sim_ap :")
# tf.linalg.diag makes a diagonal matrix given an array
print(tf.linalg.diag(sim_ap), "\n")

# Negatives
# all the s(A,N) values : similarities the non duplicate question pairs (aka Negatives)
# These are in the off diagonals
sim_an = sim - tf.linalg.diag(sim_ap)
print("sim_an :")
print(sim_an, "\n")

print("-- Outputs --")
# Mean negative
# Average of the s(A,N) values for each row
mean_neg = tf.math.reduce_sum(sim_an, axis=1) / (b - 1)
print("mean_neg :")
print(mean_neg, "\n")

# Closest negative
# Max s(A,N) that is <= s(A,P) for each row
mask_1 = tf.eye(b) == 1            # mask to exclude the diagonal
mask_2 = sim_an > tf.expand_dims(sim_ap, 1)  # mask to exclude sim_an > sim_ap
mask = tf.cast(mask_1 | mask_2, tf.float64)
sim_an_masked = sim_an - 2.0*mask

closest_neg = tf.math.reduce_max(sim_an_masked, axis=1)
print("closest_neg :")
print(closest_neg, "\n")

-- Inputs --
sim :
[[ 0.9 -0.8  0.3 -0.5]
 [-0.4  0.5  0.1 -0.1]
 [ 0.3  0.1 -0.4 -0.8]
 [-0.5 -0.2 -0.7  0.5]]
shape : (4, 4) 

sim_ap :
tf.Tensor(
[[ 0.9  0.   0.   0. ]
 [ 0.   0.5  0.   0. ]
 [ 0.   0.  -0.4  0. ]
 [ 0.   0.   0.   0.5]], shape=(4, 4), dtype=float64) 

sim_an :
tf.Tensor(
[[ 0.  -0.8  0.3 -0.5]
 [-0.4  0.   0.1 -0.1]
 [ 0.3  0.1  0.  -0.8]
 [-0.5 -0.2 -0.7  0. ]], shape=(4, 4), dtype=float64) 

-- Outputs --
mean_neg :
tf.Tensor([-0.33333333 -0.13333333 -0.13333333 -0.46666667], shape=(4,), dtype=float64) 

closest_neg :
tf.Tensor([ 0.3  0.1 -0.8 -0.2], shape=(4,), dtype=float64) 



### The Loss Functions
L1=max(𝑚𝑒𝑎𝑛_𝑛𝑒𝑔−s(𝐴,𝑃)+𝛼,0)

L2=max(𝑐𝑙𝑜𝑠𝑒𝑠𝑡_𝑛𝑒𝑔−s(𝐴,𝑃)+𝛼,0)

L_Full=L1+L2

In [32]:
# Alpha margin
alpha = 0.25

# Modified triplet loss
# Loss 1
l_1 = tf.maximum(mean_neg - sim_ap + alpha, 0)
print(f"Loss 1: \n{l_1}\n")
# Loss 2
l_2 = tf.maximum(closest_neg - sim_ap + alpha, 0)
print(f"Loss 2: \n{l_2}\n")
# Loss full
l_full = l_1 + l_2
# Cost
cost = tf.math.reduce_sum(l_full)

print("-- Outputs --")
print("Loss full :")
print(l_full, "\n")
print("Cost :", "{:.3f}".format(cost))

Loss 1: 
[[0.         0.         0.31666667 0.        ]
 [0.         0.         0.51666667 0.        ]
 [0.         0.         0.51666667 0.        ]
 [0.         0.         0.18333333 0.        ]]

Loss 2: 
[[0.   0.05 0.95 0.05]
 [0.   0.   0.75 0.  ]
 [0.   0.   0.   0.  ]
 [0.   0.   0.45 0.  ]]

-- Outputs --
Loss full :
tf.Tensor(
[[0.         0.05       1.26666667 0.05      ]
 [0.         0.         1.26666667 0.        ]
 [0.         0.         0.51666667 0.        ]
 [0.         0.         0.63333333 0.        ]], shape=(4, 4), dtype=float64) 

Cost : 3.783
