In [24]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Model, Sequential
from tensorflow import math
import tensorflow.linalg as linalg
import numpy as np

# Seed NumPy's global RNG so the random noise drawn in later cells is reproducible.
# NumPy is imported as `np` above; the bare name `numpy` is not defined in this notebook.
np.random.seed(10)


In [3]:
##########################

In [4]:
#Creating a Siamese model

In [5]:
vocab_size = 500
model_dimension = 128

# Shared encoder: token ids -> embeddings -> LSTM outputs -> average pooling -> L2 normalisation
LSTM = Sequential()
LSTM.add(layers.Embedding(input_dim=vocab_size, output_dim=model_dimension))
LSTM.add(layers.LSTM(units=model_dimension, return_sequences=True))
LSTM.add(layers.AveragePooling1D(pool_size=2))
# Normalise along the feature axis only. With no `axis`, l2_normalize reduces over
# every dimension (batch included), so one example's output would depend on the
# other examples that happen to share its batch.
LSTM.add(layers.Lambda(lambda x: math.l2_normalize(x, axis=-1)))

# Two variable-length inputs, one per question of a pair
input1 = layers.Input((None,))
input2 = layers.Input((None,))

# Apply the SAME encoder instance to both inputs (so its weights are shared)
# and join the two encodings along axis 1
conc = layers.Concatenate(axis=1)((LSTM(input1), LSTM(input2)))

# Wrap the two-input graph as the Siamese model
Siamese = Model(inputs=(input1, input2), outputs=conc)

# Print the summary of the model
Siamese.summary()

In [6]:
def show_layers(model, layer_prefix):
    """Print how many layers ``model`` has, then one labelled entry per layer.

    Args:
        model: any object exposing a ``layers`` sequence.
        layer_prefix: text placed before each layer index in the listing.
    """
    sublayers = model.layers
    print(f"Total layers: {len(sublayers)}\n")
    for position, sublayer in enumerate(sublayers):
        print('========')
        print(f'{layer_prefix}_{position}: {sublayer}\n')

# List the layers of the outer Siamese model first, then those of the LSTM encoder it wraps
for heading, net, prefix in (
    ('Siamese model:\n', Siamese, 'Parallel.sublayers'),
    ('Detail of LSTM models:\n', LSTM, 'Serial.sublayers'),
):
    print(heading)
    show_layers(net, prefix)

Siamese model:

Total layers: 4

Parallel.sublayers_0: <InputLayer name=input_layer, built=True>

Parallel.sublayers_1: <InputLayer name=input_layer_1, built=True>

Parallel.sublayers_2: <Sequential name=sequential, built=True>

Parallel.sublayers_3: <Concatenate name=concatenate, built=True>

Detail of LSTM models:

Total layers: 4

Serial.sublayers_0: <Embedding name=embedding, built=True>

Serial.sublayers_1: <LSTM name=lstm, built=True>

Serial.sublayers_2: <AveragePooling1D name=average_pooling1d, built=True>

Serial.sublayers_3: <Lambda name=lambda, built=True>



In [None]:
###################################

In [None]:
#Modified Triplet Loss

In [None]:
#1)Similarity Scores

In [8]:
# Two-vector example: a reference vector and a slightly perturbed copy
v1 = np.array([1.0, 2.0, 3.0])
v2 = np.array([1.0, 2.0, 3.5])  # only the 3rd element differs (offset by 0.5)

### START CODE HERE ###
# Try modifying the vector v2 to see how it impacts the cosine similarity
# v2 = v1                   # identical vector
# v2 = v1 * -1              # opposite vector
# v2 = np.array([0,-42,1], dtype=float)  # random example
### END CODE HERE ###

print("-- Inputs --")
print(f"v1 : {v1}")
print(f"v2 : {v2} \n")

# Similarity score
def cosine_similarity(v1, v2):
    """Return the cosine similarity of ``v1`` and ``v2`` as a scalar tensor.

    Evaluates ``sum(v1 * v2) / sqrt(sum(v1 * v1) * sum(v2 * v2))`` with
    TensorFlow reductions over all elements. The division is unguarded, so
    an all-zero input is not handled specially.
    """
    dot = tf.math.reduce_sum(v1 * v2)  # dot product, equivalent to np.dot(v1, v2)
    squared_norms = tf.math.reduce_sum(v1 * v1) * tf.math.reduce_sum(v2 * v2)
    return dot / tf.math.sqrt(squared_norms)


print("-- Outputs --")
print("cosine similarity :", cosine_similarity(v1, v2).numpy())

-- Inputs --
v1 : [1. 2. 3.]
v2 : [1.  2.  3.5] 

-- Outputs --
cosine similarity : 0.9974086507360697


In [9]:
# Two batches of vectors example
# Input data: four reference vectors, stacked row-wise into v1
v1_1, v1_2, v1_3, v1_4 = (
    np.array(values)
    for values in (
        [1.0, 2.0, 3.0],
        [9.0, 8.0, 7.0],
        [-1.0, -4.0, -2.0],
        [1.0, -7.0, 2.0],
    )
)
v1 = np.vstack([v1_1, v1_2, v1_3, v1_4])

# Approximate duplicates: each reference vector plus Gaussian noise (mean 0, std 2).
# The generator is consumed in order, so noise is drawn for row 1 first, then 2, 3, 4.
v2_1, v2_2, v2_3, v2_4 = (
    reference + np.random.normal(0, 2, 3)
    for reference in (v1_1, v1_2, v1_3, v1_4)
)
v2 = np.vstack([v2_1, v2_2, v2_3, v2_4])

print("-- Inputs --")
print(f"v1 :\n{v1}\n")
print(f"v2 :\n{v2}\n")

# Batch sizes must match
b = v1.shape[0]
print(f"Batch sizes match : {b == v2.shape[0]}\n")

# Similarity scores

# Option 1 : visit every (row, col) cell and call the cosine similarity function
sim_1 = np.zeros([b, b])  # placeholder that receives the similarity scores
# np.ndindex walks the cells in row-major order: (0, 0), (0, 1), ...
for row, col in np.ndindex(sim_1.shape):
    sim_1[row, col] = cosine_similarity(v2[row], v1[col]).numpy()

print("-- Outputs --\nOption 1 : loop")
print(sim_1)

-- Inputs --
v1 :
[[ 1.  2.  3.]
 [ 9.  8.  7.]
 [-1. -4. -2.]
 [ 1. -7.  2.]]

v2 :
[[ 3.66317301  3.43055795 -0.09080058]
 [ 8.9832323   9.24267195  5.55982888]
 [-0.46897683 -3.78290295 -1.99141714]
 [ 0.65079958 -6.13394762  4.40607475]]

Batch sizes match : True

-- Outputs --
Option 1 : loop
[[ 0.54585293  0.85501183 -0.7479123  -0.55664368]
 [ 0.84056941  0.99077601 -0.88726101 -0.43233775]
 [-0.8705723  -0.80838804  0.99366415  0.69702587]
 [ 0.05645085 -0.11717703  0.43390414  0.94069221]]


In [10]:
# Option 2 : vector normalization and dot product
def norm(x):
    """Rescale each row of ``x`` (axis 1) to unit L2 norm using TensorFlow."""
    return tf.math.l2_normalize(x, axis=1)


# With unit-length rows, multiplying v2 by the transpose of v1 yields every
# pairwise cosine similarity in one matrix product.
sim_2 = tf.matmul(norm(v2), norm(v1), transpose_b=True)

print("-- Outputs --\nOption 2 : vector normalization and dot product")
print(sim_2, "\n")

# Check against the loop-based result
print(f"Outputs are the same : {np.allclose(sim_1, sim_2)}")

-- Outputs --
Option 2 : vector normalization and dot product
tf.Tensor(
[[ 0.54585293  0.85501183 -0.7479123  -0.55664368]
 [ 0.84056941  0.99077601 -0.88726101 -0.43233775]
 [-0.8705723  -0.80838804  0.99366415  0.69702587]
 [ 0.05645085 -0.11717703  0.43390414  0.94069221]], shape=(4, 4), dtype=float64) 

Outputs are the same : True


In [12]:
#Hard Negative Mining

In [13]:
# Hardcoded matrix of similarity scores
sim_hardcoded = np.array(
    [
        [0.9, -0.8, 0.3, -0.5],
        [-0.4, 0.5, 0.1, -0.1],
        [0.3, 0.1, -0.4, -0.8],
        [-0.5, -0.2, -0.7, 0.5],
    ]
)

sim = sim_hardcoded

### START CODE HERE ###
# Try using different values for the matrix of similarity scores
# sim = 2 * np.random.random_sample((b,b)) -1   # random similarity scores between -1 and 1
# sim = sim_2                                   # the matrix calculated previously using vector normalization and dot product
### END CODE HERE ###

# Batch size
b = sim.shape[0]

print("-- Inputs --")
print("sim:")
print(sim)
print(f"shape: {sim.shape}\n")

# Positives
# s(A,P): similarities of the duplicate question pairs, found on the diagonal
sim_ap = np.diag(sim)
sim_ap_matrix = np.diag(sim_ap)  # the same values laid out as a diagonal matrix
print("sim_ap:")
print(sim_ap_matrix)


# Negatives
# s(A,N): similarities of the non-duplicate pairs, i.e. the off-diagonal entries.
# Subtracting the diagonal matrix zeroes out the positives.
sim_an = sim - sim_ap_matrix
print("\nsim_an:")
print(sim_an)

print("\n-- Outputs --")
# Mean negative
# Row-wise average of s(A,N); the zeroed diagonal adds nothing, hence b - 1 terms
mean_neg = np.sum(sim_an, axis=1, keepdims=True) / (b - 1)
print("\nmean_neg:")
print(mean_neg)

# Closest negative
# Per row, the largest s(A,N) that does not exceed that row's s(A,P)
mask_1 = np.eye(b, dtype=bool)              # True on the diagonal
mask_2 = sim_an > sim_ap[:, np.newaxis]     # True where a negative exceeds the row's positive
mask = np.logical_or(mask_1, mask_2)
# Excluded cells get -2 (below any value in [-1, 1]); sim_an itself is left untouched
sim_an_masked = np.where(mask, -2.0, sim_an)

closest_neg = np.max(sim_an_masked, axis=1, keepdims=True)
print("\nclosest_neg :")
print(closest_neg)

-- Inputs --
sim:
[[ 0.9 -0.8  0.3 -0.5]
 [-0.4  0.5  0.1 -0.1]
 [ 0.3  0.1 -0.4 -0.8]
 [-0.5 -0.2 -0.7  0.5]]
shape: (4, 4)

sim_ap:
[[ 0.9  0.   0.   0. ]
 [ 0.   0.5  0.   0. ]
 [ 0.   0.  -0.4  0. ]
 [ 0.   0.   0.   0.5]]

sim_an:
[[ 0.  -0.8  0.3 -0.5]
 [-0.4  0.   0.1 -0.1]
 [ 0.3  0.1  0.  -0.8]
 [-0.5 -0.2 -0.7  0. ]]

-- Outputs --

mean_neg:
[[-0.33333333]
 [-0.13333333]
 [-0.13333333]
 [-0.46666667]]

closest_neg :
[[ 0.3]
 [ 0.1]
 [-0.8]
 [-0.2]]


In [None]:
##########TensorFlow

In [22]:
# Hardcoded matrix of similarity scores
sim_hardcoded = np.array(
    [
        [0.9, -0.8, 0.3, -0.5],
        [-0.4, 0.5, 0.1, -0.1],
        [0.3, 0.1, -0.4, -0.8],
        [-0.5, -0.2, -0.7, 0.5],
    ]
)

sim = sim_hardcoded

### START CODE HERE ###
# Try using different values for the matrix of similarity scores
# sim = 2 * np.random.random_sample((b,b)) -1   # random similarity scores between -1 and 1
# sim = sim_2                                   # the matrix calculated previously using vector normalization and dot product
### END CODE HERE ###

# Batch size
b = sim.shape[0]

print("-- Inputs --\nsim :")
print(sim)
print("shape :", sim.shape, "\n")

# Positives
# s(A,P): similarities of the duplicate question pairs, found on the diagonal
sim_ap = tf.linalg.diag_part(sim)  # 1D tensor holding the diagonal elements
# tf.linalg.diag expands that 1D tensor back into a diagonal matrix
sim_ap_matrix = tf.linalg.diag(sim_ap)
print("sim_ap :")
print(sim_ap_matrix, "\n")

# Negatives
# s(A,N): similarities of the non-duplicate pairs, i.e. the off-diagonal entries
sim_an = sim - sim_ap_matrix
print("sim_an :")
print(sim_an, "\n")

print("-- Outputs --")
# Mean negative
# Row-wise average of s(A,N); the zeroed diagonal adds nothing, hence b - 1 terms
mean_neg = tf.reduce_sum(sim_an, axis=1) / (b - 1)
print("mean_neg :")
print(mean_neg, "\n")

# Closest negative
# Per row, the largest s(A,N) that does not exceed that row's s(A,P)
mask_1 = tf.cast(tf.eye(b), tf.bool)                       # True on the diagonal
mask_2 = tf.math.greater(sim_an, sim_ap[:, tf.newaxis])    # True where a negative exceeds the row's positive
mask = tf.cast(tf.math.logical_or(mask_1, mask_2), tf.float64)
# Push every excluded cell down by 2 so it cannot win the row-wise max
sim_an_masked = sim_an - 2.0 * mask

closest_neg = tf.reduce_max(sim_an_masked, axis=1)
print("closest_neg :")
print(closest_neg, "\n")

-- Inputs --
sim :
[[ 0.9 -0.8  0.3 -0.5]
 [-0.4  0.5  0.1 -0.1]
 [ 0.3  0.1 -0.4 -0.8]
 [-0.5 -0.2 -0.7  0.5]]
shape : (4, 4) 

sim_ap :
tf.Tensor(
[[ 0.9  0.   0.   0. ]
 [ 0.   0.5  0.   0. ]
 [ 0.   0.  -0.4  0. ]
 [ 0.   0.   0.   0.5]], shape=(4, 4), dtype=float64) 

sim_an :
tf.Tensor(
[[ 0.  -0.8  0.3 -0.5]
 [-0.4  0.   0.1 -0.1]
 [ 0.3  0.1  0.  -0.8]
 [-0.5 -0.2 -0.7  0. ]], shape=(4, 4), dtype=float64) 

-- Outputs --
mean_neg :
tf.Tensor([-0.33333333 -0.13333333 -0.13333333 -0.46666667], shape=(4,), dtype=float64) 

closest_neg :
tf.Tensor([ 0.3  0.1 -0.8 -0.2], shape=(4,), dtype=float64) 



In [None]:
#The Loss Functions

In [23]:
# Alpha margin
alpha = 0.25

# Modified triplet loss
# Loss 1: hinge on (mean negative - positive + margin)
margin_mean = mean_neg - sim_ap + alpha
l_1 = tf.maximum(margin_mean, 0)
print(f"Loss 1: {l_1}\n")
# Loss 2: hinge on (closest negative - positive + margin)
margin_closest = closest_neg - sim_ap + alpha
l_2 = tf.maximum(margin_closest, 0)
print(f"Loss 2: {l_2}\n")
# Loss full: element-wise sum of the two terms
l_full = tf.add(l_1, l_2)
# Cost: total over the batch
cost = tf.reduce_sum(l_full)

print("-- Outputs --")
print("Loss full :")
print(l_full, "\n")
print(f"Cost : {cost:.3f}")

Loss 1: [0.         0.         0.51666667 0.        ]

Loss 2: [0. 0. 0. 0.]

-- Outputs --
Loss full :
tf.Tensor([0.         0.         0.51666667 0.        ], shape=(4,), dtype=float64) 

Cost : 0.517


In [None]:
##############Evaluate a Siamese model

In [None]:
#Inspecting the necessary elements

In [25]:
# Load the evaluation arrays from .npy files in the current working directory
# and print the shape and contents of each one.
# q1 / q2: the recorded output shows integer arrays of shape (512, 64) for both.
q1 = np.load('./q1.npy')
print(f'q1 has shape: {q1.shape} \n\nAnd it looks like this: \n\n {q1}\n\n')
q2 = np.load('./q2.npy')
print(f'q2 has shape: {q2.shape} \n\nAnd looks like this: \n\n {q2}\n\n')
# y_test: the recorded output shows a 0/1 array of shape (512,).
y_test = np.load('./y_test.npy')
print(f'y_test has shape: {y_test.shape} \n\nAnd looks like this: \n\n {y_test}\n\n')
# NOTE: v1 and v2 are rebound here, replacing the toy arrays of the same names
# built in the earlier similarity cells.
# NOTE(review): presumably these hold the model's output vectors for q1 / q2 —
# confirm against how the files were produced.
v1 = np.load('./v1.npy')
print(f'v1 has shape: {v1.shape} \n\nAnd looks like this: \n\n {v1}\n\n')
v2 = np.load('./v2.npy')
print(f'v2 has shape: {v2.shape} \n\nAnd looks like this: \n\n {v2}\n\n')

q1 has shape: (512, 64) 

And it looks like this: 

 [[ 32  38   4 ...   1   1   1]
 [ 30 156  78 ...   1   1   1]
 [ 32  38   4 ...   1   1   1]
 ...
 [ 32  33   4 ...   1   1   1]
 [ 30 156 317 ...   1   1   1]
 [ 30 156   6 ...   1   1   1]]


q2 has shape: (512, 64) 

And looks like this: 

 [[   30   156    78 ...     1     1     1]
 [  283   156    78 ...     1     1     1]
 [   32    38     4 ...     1     1     1]
 ...
 [   32    33     4 ...     1     1     1]
 [   30   156    78 ...     1     1     1]
 [   30   156 10596 ...     1     1     1]]


y_test has shape: (512,) 

And looks like this: 

 [0 1 1 0 0 0 0 1 0 1 1 0 0 0 1 1 1 0 1 1 0 0 0 0 1 1 0 0 0 0 1 0 1 1 0 0 0
 0 0 0 1 0 0 0 1 0 0 0 0 1 0 1 1 1 1 0 1 0 1 0 0 0 1 0 1 1 1 0 0 0 1 0 1 0
 0 0 0 1 0 0 1 1 0 0 0 1 0 1 1 0 1 0 0 0 1 0 1 0 0 0 0 1 1 1 0 1 0 1 1 0 0
 0 1 0 0 1 1 0 0 1 0 1 0 0 1 1 0 1 0 0 1 1 0 1 1 1 0 1 0 0 0 0 0 0 0 0 0 0
 1 0 1 1 1 0 0 0 0 0 0 1 0 0 0 1 0 0 0 0 1 0 0 0 0 0 1 1 0 1 0 1 1 0 1 1 1
 1 0 1 1 0 

In [26]:
# Calculating the accuracy: settings read by the evaluation loop in the next cell
accuracy = 0  # starting value of the match counter
batch_size = 512 # Number of pairs to evaluate; the max it can be is y_test.shape[0], i.e. all the samples in the test data
threshold = 0.7 # A pair is predicted "duplicate" when its similarity exceeds this; try other values and watch the accuracy change

In [27]:
# Reset the counter in this cell. It is overwritten with a fraction at the end,
# so relying on the initialisation from another cell would make a second run of
# this cell start from the previous accuracy instead of zero.
accuracy = 0
for j in range(batch_size):        # Iterate over each element in the batch
    # `math` is tensorflow.math. A plain dot product serves as the cosine similarity here,
    # which assumes the rows of v1 / v2 are already L2-normalised — TODO confirm for the loaded files
    d = math.reduce_sum(v1[j]*v2[j])
    res = d > threshold            # Predict "same question" when the similarity exceeds the threshold
    accuracy += tf.cast(y_test[j] == res, tf.int32) # Add 1 when the prediction matches the actual target

accuracy = accuracy / batch_size   # Turn the match count into a fraction of the processed elements

In [28]:
# Report the fraction of evaluated pairs whose thresholded similarity matched the label
print(f'The accuracy of the model is: {accuracy}')

The accuracy of the model is: 0.7421875
