In this notebook we will define and train a **Siamese Network**, a powerful tool for deciding whether two objects are similar. In this example the objects are texts, but a Siamese Network can also detect similar images (for example signature checking or face recognition). <br>
The idea is to create two sub-networks (sister networks) that share identical parameters. This means that you **only** need to train one set of weights. The output of each sub-network is a vector. You can then run the two outputs through a cosine similarity function to get a similarity score. <br>
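
As a minimal illustration of that last step, the sketch below computes the cosine similarity of a few small hand-written vectors in plain Python. The vectors and the helper name `cosine_similarity` are made up for the example; in the notebook the vectors come from the two sub-networks.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equally long vectors (hypothetical helper)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Same direction, orthogonal, and opposite vectors cover the range of possible scores.
same_direction = cosine_similarity([1.0, 2.0, 3.0], [2.0, 4.0, 6.0])
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])
opposite = cosine_similarity([1.0, 2.0], [-1.0, -2.0])

print(same_direction, orthogonal, opposite)
```

Scores close to 1 mean the two vectors point the same way, which is what the network is trained to produce for duplicate questions.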

### Imports

In [1]:
import numpy as np
import pandas as pd
import random as rnd
import tensorflow as tf

I used the "Quora question answer" dataset to build a model that predicts whether two questions are similar.

In [3]:
data = pd.read_csv("questions.csv")
N = len(data)
print('Number of question pairs: ', N)
data.head()

Number of question pairs:  404351


| Unnamed: 0 | id | qid1 | qid2 | question1 | question2 | is_duplicate |
|---|---|---|---|---|---|---|
| 0 | 0 | 1 | 2 | What is the step by step guide to invest in sh... | What is the step by step guide to invest in sh... | 0 |
| 1 | 1 | 3 | 4 | What is the story of Kohinoor (Koh-i-Noor) Dia... | What would happen if the Indian government sto... | 0 |
| 2 | 2 | 5 | 6 | How can I increase the speed of my internet co... | How can Internet speed be increased by hacking... | 0 |
| 3 | 3 | 7 | 8 | Why am I mentally very lonely? How can I solve... | Find the remainder when [math]23^{24}[/math] i... | 0 |
| 4 | 4 | 9 | 10 | Which one dissolve in water quikly sugar, salt... | Which fish would survive in salt water? | 0 |


Each row holds a pair of questions (question1 and question2) and an is_duplicate label: 0 means the questions are not duplicates, 1 means they are. Each question also has its own id (qid1 and qid2).

### Train-Val-Test Split

Let's start by splitting the dataset into train and test sets. The first 300,000 rows (around 74%) form the training set and the next 10,240 rows form the test set; the remaining rows are not used.

In [5]:
N_train = 300000
N_test = 10240
data_train = data[:N_train]
data_test = data[N_train:N_train + N_test]
print("Train set:", len(data_train), "Test set:", len(data_test))
del (data)

Train set: 300000 Test set: 10240


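To train the Siamese Network we will feed it only pairs of duplicate questions. The negative examples come from the other pairs in the same batch, as the loss function defined later shows.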

In [8]:
td_index = data_train['is_duplicate'] == 1

In [10]:
td_index = [i for i, x in enumerate(td_index) if x]
print('Number of duplicate questions: ', len(td_index))
print('Indexes of first ten duplicate questions:', td_index[:10])

Number of duplicate questions:  111486
Indexes of first ten duplicate questions: [5, 7, 11, 12, 13, 15, 16, 18, 20, 29]
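
The list comprehension above walks the boolean column element by element. A vectorized alternative is `np.flatnonzero`, sketched here on a small made-up label array rather than on the real is_duplicate column. Note that it returns positions, which match the index labels only when the DataFrame keeps its default 0..N-1 index, as data_train does here.

```python
import numpy as np

# Toy stand-in for data_train['is_duplicate'] (values are illustrative only).
is_duplicate = np.array([0, 0, 1, 0, 1, 1, 0])

# Boolean mask, as in the notebook.
mask = is_duplicate == 1

# Positions of the True entries, without an explicit Python loop.
dup_positions = np.flatnonzero(mask)

print(dup_positions.tolist())
```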


In [12]:
print(data_train['question1'][5])
print(data_train['question2'][5])
print('is_duplicate: ', data_train['is_duplicate'][5])

Astrology: I am a Capricorn Sun Cap moon and cap rising...what does that say about me?
I'm a triple Capricorn (Sun, Moon and ascendant in Capricorn) What does this say about me?
is_duplicate:  1


Keep only the rows of the training set whose index is in td_index, i.e. the duplicate pairs. The test set keeps both duplicate and non-duplicate pairs.

In [16]:
Q1_train = np.array(data_train['question1'][td_index])
Q2_train = np.array(data_train['question2'][td_index])

Q1_test = np.array(data_test['question1'])
Q2_test = np.array(data_test['question2'])
y_test  = np.array(data_test['is_duplicate'])

In [18]:
print('TRAINING QUESTIONS:\n')
print('Question 1: ', Q1_train[0])
print('Question 2: ', Q2_train[0], '\n')
print('Question 1: ', Q1_train[5])
print('Question 2: ', Q2_train[5], '\n')

print('TESTING QUESTIONS:\n')
print('Question 1: ', Q1_test[0])
print('Question 2: ', Q2_test[0], '\n')
print('is_duplicate =', y_test[0], '\n')

TRAINING QUESTIONS:

Question 1:  Astrology: I am a Capricorn Sun Cap moon and cap rising...what does that say about me?
Question 2:  I'm a triple Capricorn (Sun, Moon and ascendant in Capricorn) What does this say about me? 

Question 1:  What would a Trump presidency mean for current international master’s students on an F1 visa?
Question 2:  How will a Trump presidency affect the students presently in US or planning to study in US? 

TESTING QUESTIONS:

Question 1:  How do I prepare for interviews for cse?
Question 2:  What is the best way to prepare for cse? 

is_duplicate = 0 



Split the duplicate pairs into training and validation sets, 80/20:

In [22]:
cut_off = int(len(Q1_train) * 0.8)
train_Q1, train_Q2 = Q1_train[:cut_off], Q2_train[:cut_off]
val_Q1, val_Q2 = Q1_train[cut_off:], Q2_train[cut_off:]
print('Number of duplicate questions: ', len(Q1_train))
print("The length of the training set is:  ", len(train_Q1))
print("The length of the validation set is: ", len(val_Q1))

Number of duplicate questions:  111486
The length of the training set is:   89188
The length of the validation set is:  22298


### Encoding

Now we need to take care of the encoding (transforming text into numbers). 
We start by learning all the words of the training dataset (using .adapt()); the resulting word dictionary is then used to encode each word as an integer id.

In [25]:
tf.random.set_seed(0)
text_vectorization = tf.keras.layers.TextVectorization(output_mode='int',split='whitespace', standardize='strip_punctuation')
text_vectorization.adapt(np.concatenate((train_Q1,train_Q2)))

In [27]:
print(f'Vocabulary size: {text_vectorization.vocabulary_size()}')

Vocabulary size: 32819


In [29]:
print('first question in the train set:\n')
print(Q1_train[0], '\n') 
print('encoded version:')
print(text_vectorization(Q1_train[0]),'\n')

print('first question in the test set:\n')
print(Q1_test[0], '\n')
print('encoded version:')
print(text_vectorization(Q1_test[0]) )

first question in the train set:

Astrology: I am a Capricorn Sun Cap moon and cap rising...what does that say about me? 

encoded version:
tf.Tensor(
[ 6123     6   178    10  9079  2220 32055   788    13  6047 25433    30
    28   463    45    98], shape=(16,), dtype=int64) 

first question in the test set:

How do I prepare for interviews for cse? 

encoded version:
tf.Tensor([    4     8     6   157    17  1909    17 11616], shape=(8,), dtype=int64)
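
To make the encoding step concrete, here is a simplified pure-Python imitation of what the adapted layer does: strip punctuation, split on whitespace, and map each word to an integer id ordered by frequency. It is only a sketch. The helper names are hypothetical, the two-sentence corpus is made up, punctuation is approximated with `string.punctuation`, and the tie-breaking between equally frequent words is an assumption. It does mirror two conventions of `TextVectorization` in `int` mode: id 0 is reserved for padding and id 1 for out-of-vocabulary words.

```python
import string
from collections import Counter

PAD_ID, OOV_ID = 0, 1  # reserved ids, as in TextVectorization's int mode

def tokenize(text):
    """Strip ASCII punctuation and split on whitespace (case is preserved)."""
    cleaned = text.translate(str.maketrans("", "", string.punctuation))
    return cleaned.split()

def build_vocab(corpus):
    """Map each word to an id; more frequent words get smaller ids."""
    counts = Counter(word for text in corpus for word in tokenize(text))
    # Assumption: ties are broken alphabetically to keep the sketch deterministic.
    ordered = sorted(counts, key=lambda w: (-counts[w], w))
    return {word: idx for idx, word in enumerate(ordered, start=2)}

def encode(text, vocab):
    """Replace every word with its id, or OOV_ID if it was never seen."""
    return [vocab.get(word, OOV_ID) for word in tokenize(text)]

corpus = ["How do I learn Python?", "How do I learn to cook?"]
vocab = build_vocab(corpus)
encoded = encode("How do I learn Rust?", vocab)

print(vocab)
print(encoded)
```

The word "Rust" never appears in the toy corpus, so it is encoded with the out-of-vocabulary id. The same thing happens to test-set words that were not seen during .adapt().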


### Define the Siamese Model

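**The model's architecture:** <br>
Inputs 1 and 2 - the two questions (raw strings) <br>
Text vectorization and Embedding <br>
LSTM layer, followed by mean pooling over the time steps and L2 normalization <br>
Output - vector 1 and vector 2, concatenated (we will calculate the cosine similarity between them).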

In [54]:
def Siamese(text_vectorizer, vocab_size, d_feature=128):
    branch = tf.keras.Sequential([
        text_vectorizer,
        tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=d_feature, name="embedding"),
        tf.keras.layers.LSTM(units=d_feature, return_sequences=True, name="LSTM"),
        tf.keras.layers.GlobalAveragePooling1D(name='mean'),
        tf.keras.layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=1), name='out')
    ], name='sequential')

    input1 = tf.keras.layers.Input(shape=(1,), dtype=tf.string, name='input_1')
    input2 = tf.keras.layers.Input(shape=(1,), dtype=tf.string, name='input_2')

    processed1 = branch(input1)
    processed2 = branch(input2)

    conc = tf.keras.layers.Concatenate(axis=1, name='conc_1_2')([processed1, processed2])

    return tf.keras.models.Model(inputs=[input1, input2], outputs=conc, name="SiameseModel")

In [56]:
model = Siamese(text_vectorization, vocab_size=text_vectorization.vocabulary_size())
model.build(input_shape=None)
model.summary()
model.get_layer(name='sequential').summary()
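
The last two layers of each branch turn a sequence of LSTM outputs into a single unit-length vector. The NumPy sketch below imitates them on random numbers (the shapes and values are made up, and the helper `pool_and_normalize` is hypothetical) and checks why a plain dot product can later serve as the cosine similarity.

```python
import numpy as np

rng = np.random.default_rng(0)

# Pretend LSTM outputs: (batch, time steps, features). Values are random stand-ins.
seq_q1 = rng.normal(size=(4, 7, 8))
seq_q2 = rng.normal(size=(4, 7, 8))

def pool_and_normalize(seq):
    """Average over time steps, then scale each row to unit L2 norm."""
    pooled = seq.mean(axis=1)                              # like GlobalAveragePooling1D
    norms = np.linalg.norm(pooled, axis=1, keepdims=True)
    return pooled / norms                                  # like tf.math.l2_normalize

v1 = pool_and_normalize(seq_q1)
v2 = pool_and_normalize(seq_q2)

# For unit vectors the row-wise dot product equals the cosine similarity.
dot = np.sum(v1 * v2, axis=1)

# The model concatenates both vectors; the two halves are split again downstream.
out = np.concatenate([v1, v2], axis=1)

print(out.shape)
print(dot)
```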

### Triplet Loss Function

One of the loss functions that can be used with a Siamese Model is the Triplet Loss. The idea is to take an Anchor (baseline) vector, a Positive vector (which should be similar to the Anchor) and a Negative vector (which should be different from the Anchor). **We need to minimize the distance between the Anchor and Positive vectors and maximize it between the Anchor and Negative vectors.** <br>
$$\mathcal{L}(A, P, N)=\max \left(\|\mathrm{f}(A)-\mathrm{f}(P)\|^{2}-\|\mathrm{f}(A)-\mathrm{f}(N)\|^{2}+\alpha, 0\right),$$ <br> where $\mathrm{f}$ is the embedding produced by the network and $\alpha$ is a margin (a hyperparameter that sets how much farther from the Anchor the Negative must be than the Positive).
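
To see the formula at work, the sketch below evaluates it for one hand-picked triplet of 2-D unit vectors (the vectors and helper names are illustrative, not taken from the notebook). It also checks numerically that for unit vectors the squared distance equals 2 - 2·cos. With that identity the distance form reduces to 2·(cos(A, N) - cos(A, P)) + α, which is the cosine-based quantity up to a rescaling of the margin.

```python
import math

def sq_dist(u, v):
    """Squared Euclidean distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(u, v))

def dot(u, v):
    return sum(x * y for x, y in zip(u, v))

def triplet_loss(anchor, positive, negative, alpha):
    """Triplet loss in the distance form of the formula above."""
    return max(sq_dist(anchor, positive) - sq_dist(anchor, negative) + alpha, 0.0)

def unit(angle_deg):
    """2-D unit vector at the given angle (hypothetical helper)."""
    r = math.radians(angle_deg)
    return (math.cos(r), math.sin(r))

anchor, close, far = unit(0), unit(20), unit(90)

# Positive is close and negative is far: the margin is satisfied, loss is zero.
easy = triplet_loss(anchor, positive=close, negative=far, alpha=0.25)

# Roles swapped: the "positive" is far and the "negative" is close, loss is large.
hard = triplet_loss(anchor, positive=far, negative=close, alpha=0.25)

# Identity for unit vectors: ||a - p||^2 = 2 - 2 * cos(a, p).
identity_gap = abs(sq_dist(anchor, close) - (2 - 2 * dot(anchor, close)))

print(easy, round(hard, 4), identity_gap)
```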

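To implement the Triplet Loss we can use **hard negative mining**: within each batch the questions of the other pairs serve as negatives, and the loss combines the most similar eligible negative with the mean of all negatives.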

In [223]:
def TripletLossFn(v1, v2, margin=0.25):
    # Inputs: v1 and v2 are two batches of L2-normalized embeddings. Row i of v1 and row i of v2
    # form a duplicate (positive) pair; every other row in the batch acts as a negative.
    # margin is the alpha from the formula above.

    # Start with calculating cosine similarities between each vector.
    scores = tf.linalg.matmul(v2, v1, transpose_b=True)
    batch_size = tf.cast(tf.shape(v1)[0], scores.dtype) 

    # The diagonal elements of the scores matrix are the similarity scores of the positive pairs,
    # where the corresponding embeddings in v1 and v2 come from the same duplicate pair.
    positive = tf.linalg.diag_part(scores)

    # Zero out the diagonal so that only the off-diagonal (negative) scores remain.
    negative_zero_on_duplicate = scores - tf.linalg.diag(positive)

    # The mean of the negative scores is computed for each anchor, excluding the positive score.
    mean_negative = tf.math.reduce_sum(negative_zero_on_duplicate, axis=1) / (batch_size - 1)

    # Now find "the hardest negative": the most similar negative that still scores below the positive.
    # The mask flags the diagonal (positives) and every negative that already scores above its positive.
    mask_exclude_positives = tf.cast((tf.eye(batch_size) == 1)|(negative_zero_on_duplicate > tf.expand_dims(positive, 1)),
                                    scores.dtype)
    # Subtracting 2.0 pushes the flagged entries down to -1 or lower, so reduce_max picks
    # an unflagged negative whenever one exists.
    negative_without_positive = negative_zero_on_duplicate - mask_exclude_positives * 2.0
    # The largest remaining score in each row is the hardest negative for that anchor.
    closest_negative = tf.math.reduce_max(negative_without_positive, axis=1)
    
    # Penalize anchors whose hardest negative is not at least margin below the positive.
    triplet_loss1 = tf.maximum(0.0, margin - positive + closest_negative)

    # Penalize anchors whose mean negative score is not at least margin below the positive.
    triplet_loss2 = tf.maximum(0.0, margin - positive + mean_negative)

    # The final triplet loss is the sum of the two losses over the whole batch.
    triplet_loss = tf.math.reduce_sum(triplet_loss1 + triplet_loss2)
    return triplet_loss

In [60]:
v1 = np.array([[0.26726124, 0.53452248, 0.80178373],[0.5178918 , 0.57543534, 0.63297887]])
v2 = np.array([[ 0.26726124,  0.53452248,  0.80178373],[-0.5178918 , -0.57543534, -0.63297887]])
print("Triplet Loss:", TripletLossFn(v1,v2).numpy())

Triplet Loss: 0.703507682515891
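
To see where 0.7035 comes from, the following NumPy sketch mirrors TripletLossFn step by step on the same two pairs and prints the intermediate quantities. It is a re-implementation for inspection only (the name `triplet_loss_np` is hypothetical), not part of the training code.

```python
import numpy as np

def triplet_loss_np(v1, v2, margin=0.25):
    """NumPy mirror of TripletLossFn that also returns its intermediates."""
    scores = v2 @ v1.T                            # pairwise cosine similarities
    batch_size = v1.shape[0]
    positive = np.diag(scores)                    # similarity of each true pair
    negatives = scores - np.diag(positive)        # diagonal set to zero
    mean_negative = negatives.sum(axis=1) / (batch_size - 1)
    # Flag the diagonal and any negative that already scores above its positive.
    mask = np.eye(batch_size, dtype=bool) | (negatives > positive[:, None])
    closest_negative = (negatives - 2.0 * mask).max(axis=1)
    loss_hard = np.maximum(0.0, margin - positive + closest_negative)
    loss_mean = np.maximum(0.0, margin - positive + mean_negative)
    total = float(np.sum(loss_hard + loss_mean))
    return total, scores, closest_negative, mean_negative

v1 = np.array([[0.26726124, 0.53452248, 0.80178373],
               [0.5178918, 0.57543534, 0.63297887]])
v2 = np.array([[0.26726124, 0.53452248, 0.80178373],
               [-0.5178918, -0.57543534, -0.63297887]])

loss, scores, closest_negative, mean_negative = triplet_loss_np(v1, v2)

print("scores:\n", scores.round(4))
print("closest negative:", closest_negative.round(4))
print("mean negative:   ", mean_negative.round(4))
print("loss:", round(loss, 6))
```

In the second row the only negative (about -0.95) already scores above the positive (about -1.0), so it is masked out and only the mean-negative term contributes for that row.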


In [62]:
def TripletLoss(labels, out, margin=0.25):
    _, out_size = out.shape
    v1 = out[:,:int(out_size/2)]
    v2 = out[:,int(out_size/2):]
    return TripletLossFn(v1, v2, margin=margin)

### Training the model

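Now we will train the model on the training set and monitor the loss on the validation set. The labels passed to the datasets are placeholders (all ones): the loss ignores them and works only on the model output.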

In [64]:
train_dataset = tf.data.Dataset.from_tensor_slices(((train_Q1, train_Q2), tf.constant([1] * len(train_Q1))))
val_dataset = tf.data.Dataset.from_tensor_slices(((val_Q1, val_Q2), tf.constant([1] * len(val_Q1))))

In [66]:
def train_model(Siamese, TripletLoss, text_vectorizer, train_dataset, val_dataset, d_feature=128, lr=0.01, train_steps=5):
    model = Siamese(text_vectorizer, vocab_size=text_vectorizer.vocabulary_size(), d_feature=d_feature)
    adam = tf.keras.optimizers.Adam(learning_rate=lr)
    model.compile(loss=TripletLoss, optimizer=adam)
    model.fit(train_dataset, epochs=train_steps, validation_data=val_dataset)
    return model

In [68]:
train_steps = 2
batch_size = 256
train_generator = train_dataset.shuffle(len(train_Q1), seed=7, reshuffle_each_iteration=True).batch(batch_size=batch_size)
val_generator = val_dataset.shuffle(len(val_Q1), seed=7, reshuffle_each_iteration=True).batch(batch_size=batch_size)

In [70]:
model = train_model(Siamese, TripletLoss, text_vectorization, train_generator, val_generator, train_steps=train_steps)

Epoch 1/2
349/349 ━━━━━━━━━━━━━━━━━━━━ 34s 92ms/step - loss: 59.2880 - val_loss: 15.5867
Epoch 2/2
349/349 ━━━━━━━━━━━━━━━━━━━━ 34s 97ms/step - loss: 11.5755 - val_loss: 11.1240


### Evaluating Siamese Network

We will evaluate the accuracy of the model at a fixed cosine-similarity threshold: pairs that score above the threshold are classified as duplicates.

In [84]:
# Function to test the accuracy of the model
def classify(test_Q1, test_Q2, y_test, threshold, model, batch_size=64, verbose=True):
    y_pred = []
    test_gen = tf.data.Dataset.from_tensor_slices(((test_Q1, test_Q2),None)).batch(batch_size=batch_size)
    
    pred = model.predict(test_gen)
    _, n_feat = pred.shape
    v1 = pred[:,:int(n_feat/2)]
    v2 = pred[:,int(n_feat/2):]
    
    d  = tf.math.reduce_sum(tf.multiply(v1, v2), axis = 1)

    y_pred = tf.cast(d > threshold , tf.float64)
    accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y_test), tf.float32))
    cm = tf.math.confusion_matrix(y_test, y_pred)
        
    return accuracy, cm
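
The decision rule inside classify is a single comparison, d > threshold, so the threshold directly trades false positives against false negatives. The sketch below shows this on a handful of made-up similarity scores and labels (none of these numbers come from the model); the helper `accuracy_at` is hypothetical.

```python
# Made-up cosine similarities and ground-truth labels, for illustration only.
similarities = [0.95, 0.80, 0.72, 0.66, 0.60, 0.40, 0.30, 0.10]
labels = [1, 1, 0, 1, 1, 0, 0, 0]

def accuracy_at(threshold, similarities, labels):
    """Fraction of pairs classified correctly by the rule: similarity > threshold."""
    predictions = [int(s > threshold) for s in similarities]
    correct = sum(p == y for p, y in zip(predictions, labels))
    return correct / len(labels)

# Try a few candidate thresholds and keep the most accurate one.
candidates = [0.2, 0.5, 0.7, 0.9]
results = {t: accuracy_at(t, similarities, labels) for t in candidates}
best_threshold = max(results, key=results.get)

for t, acc in results.items():
    print(f"threshold={t:.1f}  accuracy={acc:.3f}")
print("best:", best_threshold)
```

In practice such a sweep would be run on validation data rather than on the test set, so that the reported test accuracy stays unbiased.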

In [86]:
# Evaluating...
accuracy, cm = classify(Q1_test, Q2_test, y_test, 0.7, model, batch_size=512)
print("Accuracy", accuracy.numpy())
print(f"Confusion matrix:\n{cm.numpy()}")

20/20 ━━━━━━━━━━━━━━━━━━━━ 1s 59ms/step
Accuracy 0.73525393
Confusion matrix:
[[4442 1940]
 [ 771 3087]]
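
tf.math.confusion_matrix(labels, predictions) puts the true labels on the rows and the predictions on the columns, so for binary labels the layout is [[TN, FP], [FN, TP]]. The small helper below (hypothetical, pure Python) derives the usual rates from the matrix printed above under that convention.

```python
def binary_metrics(cm):
    """Derive rates from a 2x2 confusion matrix laid out as [[TN, FP], [FN, TP]]."""
    (tn, fp), (fn, tp) = cm
    total = tn + fp + fn + tp
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return {
        "accuracy": (tp + tn) / total,
        "precision": precision,
        "recall": recall,
        "f1": 2 * precision * recall / (precision + recall),
        "false_positive_rate": fp / (fp + tn),
        "false_negative_rate": fn / (fn + tp),
    }

# Confusion matrix reported by classify() above.
metrics = binary_metrics([[4442, 1940], [771, 3087]])

for name, value in metrics.items():
    print(f"{name}: {value:.4f}")
```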


In [227]:
# tf.math.confusion_matrix puts true labels on rows and predictions on columns:
# [[TN, FP],
#  [FN, TP]]
tn = 4442
fp = 1940
fn = 771
tp = 3087

precision = (tp/(tp+fp))
print(f'precision = {precision:.4f}')

recall = (tp/(tp+fn))
print(f'recall = {recall:.4f}')

precision = 0.6141
recall = 0.8002


**Conclusions:** <br>
**Accuracy - about 73.5%** of the predictions made by the Siamese Network are correct. This is a reasonable level of accuracy, but there is still room for improvement. <br>
**False Positive Rate (FPR):** <br>
1940 of the 6382 non-duplicate pairs (about 30%) are predicted as duplicates. The model is prone to **incorrectly predicting dissimilar pairs as similar.** <br>
**False Negative Rate (FNR):** <br>
771 of the 3858 duplicate pairs (about 20%) are missed. False negatives are less frequent than false positives but still significant: the model **misses some pairs that are actually similar**. <br>
**Precision and Recall:** <br>
Recall (about 0.80) is higher than precision (about 0.61): the model is better at finding similar pairs (fewer misses) than at avoiding false alarms.<br>
**Balance Between Classes:** <br>
The test set contains 6382 non-duplicate and 3858 duplicate pairs (roughly 62% / 38%), so it is only moderately imbalanced. The model is weaker on the negative (non-duplicate) class, where about 30% of the pairs are misclassified as similar.

### Predict Function to test out

In [179]:
# PLAYGROUND
def predict(question1, question2, threshold, model, verbose=False):
    q1 = np.array([question1])
    q2 = np.array([question2])
    dataset = tf.data.Dataset.from_tensor_slices(((q1, q2),None)).batch(batch_size=1)
    
    pred = model.predict(dataset)
    _, n_feat = pred.shape
    v1 = pred[:,:int(n_feat/2)]
    v2 = pred[:,int(n_feat/2):]

    
    d  = tf.math.reduce_sum(tf.multiply(v1, v2), axis = 1)
    res = d > threshold

    message = f'Cosine Similarity = {d.numpy()}, Result: {res.numpy()}'
    
    if verbose:
        print("Q1  = ", question1, "\nQ2  = ", question2)
        print("d   = ", d.numpy())
        print("res = ", res.numpy())
        
    return message


In [191]:
question1 = "When will I see you?"
question2 = "When can I see you again?"

result = predict(question1, question2, 0.65, model)
print("Prediction result:", result)

1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 7ms/step
Prediction result: Cosine Similarity = [0.88742036], Result: [ True]


In [193]:
question1 = "What city is the capital of USA?"
question2 = "What is the US capital?"

result = predict(question1, question2, 0.65, model)
print("Prediction result:", result)

1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 7ms/step
Prediction result: Cosine Similarity = [0.69174474], Result: [ True]


In [195]:
question1 = "Do you enjoy reading books in the free time?"
question2 = "Do you like to read novels when you have spare time?"

result = predict(question1, question2, 0.65, model)
print("Prediction result:", result)

1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 8ms/step
Prediction result: Cosine Similarity = [0.67481375], Result: [ True]


In [197]:
question1 = "Where did you find your cat?"
question2 = "When did your cat get back home?"

result = predict(question1, question2, 0.65, model)
print("Prediction result:", result)

1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 8ms/step
Prediction result: Cosine Similarity = [0.3179947], Result: [False]


In [199]:
question1 = "Do they enjoy eating the dessert?"
question2 = "Do they like hiking in the desert?"
result = predict(question1, question2, 0.65, model)
print("Prediction result:", result)

1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 7ms/step
Prediction result: Cosine Similarity = [0.6256363], Result: [False]
