# Siamese BiLSTM Neural Network with Attention

In [1]:
import sys
import torch
import logging
import warnings
import numpy as np
import pandas as pd
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from scipy.stats import pearsonr
from gensim.models import KeyedVectors
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

logging.disable(logging.WARNING)
warnings.filterwarnings('ignore')
np.set_printoptions(threshold=sys.maxsize)

In [3]:
modelpath = "GoogleNews-vectors-negative300.bin"
model = KeyedVectors.load_word2vec_format(modelpath, binary=True)
word2idx = {word: i for i, word in enumerate(model.index_to_key)}

In [14]:
def pearson_corr(y_true, y_pred):
    corr, _ = pearsonr(y_true, y_pred)
    return corr

## Cross-Lingual Semantic Similarity

In [2]:
# Load all the data
train_path = './data/train-en-es.csv'
test_path = './data/test-en-es.csv'
val_path = './data/validation-en-es.csv'

train_data = pd.read_csv(train_path)
test_data = pd.read_csv(test_path)
val_data = pd.read_csv(val_path)

In [3]:
def load_word_vectors_es(file_path):
    word_to_vec = {}
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            values = line.strip().split()
            word = values[0]
            vector = np.array(values[1:], dtype=np.float32)
            word_to_vec[word] = vector
    return word_to_vec


modelpath = "GoogleNews-vectors-negative300.bin"
model = KeyedVectors.load_word2vec_format(modelpath, binary=True)
word2idx = {word: i for i, word in enumerate(model.index_to_key)}
model_spanish_path = "./SBW-vectors-300-min5.txt"
word2idx_es = load_word_vectors_es(model_spanish_path)

In [4]:
vocab_en = {}
vocab_es = {}
j = 0   
sentences_1 = train_data['sentence1'].apply(eval)
sentences_2 = train_data['sentence2'].apply(eval)

for i in range(len(sentences_1)):
    for word in sentences_1[i]:
        if word not in vocab_en and word in model.key_to_index:
            vocab_en[word] = j
            j += 1
    for word in sentences_2[i]:
        if word not in vocab_es and word in word2idx_es:
            vocab_es[word] = j
            j += 1


word2idx_dataset = {}
for i in list(vocab_en.keys()):
    word2idx_dataset[vocab_en[i]] = word2idx[i]
word2idx_dataset['unk'] = len(word2idx_dataset)
word_indices = word2idx_dataset.values()
dataset_embed_matrix = model.vectors[np.array(list(word_indices))]


word2idx_es_dataset = {}
for i in list(vocab_es.keys()):
    word2idx_es_dataset[vocab_es[i]] = word2idx_es[i]
word2idx_es_dataset['unk'] = len(word2idx_es_dataset)
word_indices_es = list(word2idx_es_dataset.values())
dataset_embed_matrix_es = word_indices_es
# dataset_embed_matrix_es = model_spanish.vectors[np.array(list(word_indices))]

In [5]:
class CustomDataset1(Dataset):
    def __init__(self, sentences1, sentences2, scores, word2idx, word2idx_es):
        self.sentences1 = sentences1
        self.sentences2 = sentences2
        self.scores = scores
        self.word2idx = word2idx
        self.word2idx_es = word2idx_es

    def __len__(self):
        return max(len(self.sentences1), len(self.sentences2))

    def __getitem__(self, idx):
        unk_token = self.word2idx['unk']
        unk_token_es = self.word2idx_es['unk']-1
        sentence1 = self.sentences1[idx]
        sentence2 = self.sentences2[idx]
        score = self.scores[idx]
        seq1 = [self.word2idx[word] if word in self.word2idx else unk_token for word in sentence1]
        seq2 = [self.word2idx_es[word] if word in self.word2idx_es else unk_token_es for word in sentence2]
        return seq1, seq2, score

    def collate_fn(self, batch):
        sequences1, sequences2, scores = zip(*batch)
        padded_seqs1 = pad_sequence([torch.LongTensor(seq) for seq in sequences1], batch_first=True, padding_value=0)
        padded_seqs2 = pad_sequence([torch.LongTensor(seq) for seq in sequences2], batch_first=True, padding_value=0)
        return padded_seqs1, padded_seqs2, torch.LongTensor(scores)

In [6]:
train_data['sentence1'] = train_data['sentence1'].apply(eval)
train_data['sentence2'] = train_data['sentence2'].apply(eval)
val_data['sentence1'] = val_data['sentence1'].apply(eval)
val_data['sentence2'] = val_data['sentence2'].apply(eval)
test_data['sentence1'] = test_data['sentence1'].apply(eval)
test_data['sentence2'] = test_data['sentence2'].apply(eval)

In [7]:
batch_size = 16

train_dataset = CustomDataset1(train_data['sentence1'], train_data['sentence2'], train_data['similarity_score'], word2idx_dataset, word2idx_es_dataset)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=train_dataset.collate_fn)
val_dataset = CustomDataset1(val_data['sentence1'], val_data['sentence2'], val_data['similarity_score'], word2idx_dataset, word2idx_es_dataset)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, collate_fn=val_dataset.collate_fn)
test_dataset = CustomDataset1(test_data['sentence1'], test_data['sentence2'], test_data['similarity_score'], word2idx_dataset, word2idx_es_dataset)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, collate_fn=test_dataset.collate_fn)

In [8]:
class SiameseBiLSTM(nn.Module):
    def __init__(self, hidden_size, num_layers, embedding_dim, embd_matrix, embd_matrix_es, dropout=0.2):
        super(SiameseBiLSTM, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding_dim = embedding_dim
        self.embd_matrix = embd_matrix
        self.embd_matrix_es = embd_matrix_es

        self.word_embeddings = nn.Embedding(len(embd_matrix), embedding_dim)
        self.word_embeddings.weight = nn.Parameter(torch.from_numpy(self.embd_matrix))
        self.word_embeddings.weight.requires_grad = False


        self.word_embeddings_es = nn.Embedding(len(embd_matrix_es), embedding_dim)
        self.word_embeddings_es.weight = nn.Parameter(torch.from_numpy(self.embd_matrix_es))
        self.word_embeddings_es.weight.requires_grad = False

        self.bilstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(dropout)
        self.attention_fc = nn.Linear(hidden_size * 2, 1)
        self.attention_softmax = nn.Softmax(dim=1)
        self.fc = nn.Linear(hidden_size * 4, 1)  # 4 because we concatenate forward and backward hidden states of both LSTMs


    def forward_once(self, sentence):
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.bilstm(embeds)
        lstm_out = self.dropout(lstm_out)
        attention_weights = self.attention_softmax(self.attention_fc(lstm_out))
        lstm_out = lstm_out * attention_weights
        lstm_out = lstm_out.sum(dim=1)
        return lstm_out
    

    def forward_once_es(self, sentence):
        embeds = self.word_embeddings_es(sentence)
        lstm_out, _ = self.bilstm(embeds)
        lstm_out = self.dropout(lstm_out)
        attention_weights = self.attention_softmax(self.attention_fc(lstm_out))
        lstm_out = lstm_out * attention_weights
        lstm_out = lstm_out.sum(dim=1)
        return lstm_out
    

    def forward(self, sentence1, sentence2):
        # Process sentence 1
        output1 = self.forward_once(sentence1)
        # Process sentence 2
        output2 = self.forward_once_es(sentence2)
        # Concatenate outputs of both LSTMs
        concatenated = torch.cat((output1, output2), dim=1)
        # Pass through similarity scoring layer
        similarity_score = torch.sigmoid(self.fc(concatenated))
        return similarity_score

In [9]:
# Define model and optimizer
model1 = SiameseBiLSTM(hidden_size=50, num_layers=2, embedding_dim=300, embd_matrix = dataset_embed_matrix, embd_matrix_es=np.array(dataset_embed_matrix_es[:-1]))
optimizer = torch.optim.Adam(model1.parameters(), lr=1e-3)
criterion = nn.MSELoss()
num_epochs = 10
model1.train()

for epoch in range(num_epochs):
    train_loss = 0
    for sentence1, sentence2, score in train_loader:
        sentence1_tensor = sentence1
        sentence2_tensor = sentence2
        score_tensor = torch.tensor(score, dtype=torch.float)/5
        optimizer.zero_grad()
        output = model1(sentence1_tensor, sentence2_tensor)
        loss = criterion(output.squeeze(), score_tensor.squeeze())
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    print(f"Epoch = {epoch}\tTraining Loss = {train_loss/len(train_data)}")
    
    val_loss = 0
    with torch.no_grad():
        for sentence1, sentence2, score in val_loader:
            sentence1_tensor = sentence1
            sentence2_tensor = sentence2
            score_tensor = torch.tensor(score, dtype=torch.float)/5
            outputs = model1(sentence1_tensor, sentence2_tensor)
            val_loss = criterion(outputs.squeeze(), score_tensor.squeeze())
            val_loss += val_loss.item()
    
    print(f"Epoch = {epoch}\tValidation Loss = {val_loss/len(val_data)}")

Epoch = 0	Training Loss = 0.0052002502689881625
Epoch = 0	Validation Loss = 0.0001190970724564977
Epoch = 1	Training Loss = 0.005161643595897876
Epoch = 1	Validation Loss = 0.00015328056178987026
Epoch = 2	Training Loss = 0.005144361186248361
Epoch = 2	Validation Loss = 0.00016354123363271356
Epoch = 3	Training Loss = 0.005150131183190592
Epoch = 3	Validation Loss = 7.925344107206911e-05
Epoch = 4	Training Loss = 0.0051301934193073925
Epoch = 4	Validation Loss = 0.00015763085684739053
Epoch = 5	Training Loss = 0.00513353794398443
Epoch = 5	Validation Loss = 8.730412810109556e-05
Epoch = 6	Training Loss = 0.005127392474683103
Epoch = 6	Validation Loss = 0.00015541286848019809
Epoch = 7	Training Loss = 0.005122537403433069
Epoch = 7	Validation Loss = 0.00012636622705031186
Epoch = 8	Training Loss = 0.005132778882876669
Epoch = 8	Validation Loss = 0.00010160348756471649
Epoch = 9	Training Loss = 0.005116694439497382
Epoch = 9	Validation Loss = 0.00015398918185383081


In [10]:
train_predictions = []
train_labels = []
model1.eval()
for train_sentence1, train_sentence2, train_score in train_loader:
    train_sentence1_tensor = train_sentence1
    train_sentence2_tensor = train_sentence2
    train_score_tensor = torch.tensor(train_score, dtype=torch.float)/5.0
    train_output = model1(train_sentence1_tensor, train_sentence2_tensor)
    train_predictions.extend(train_output.tolist())
    train_labels.extend(train_score)
train_predictions = np.array(train_predictions)
train_labels = np.array(train_labels)
train_mse = mean_squared_error(train_labels, train_predictions)
print('Train MSE: {:.4f}'.format(train_mse))

Train MSE: 5.5655


In [11]:
val_predictions = []
val_labels = []
model1.eval()
for val_sentence1, val_sentence2, val_score in val_loader:
    val_sentence1_tensor = val_sentence1
    val_sentence2_tensor = val_sentence2
    val_score_tensor = torch.tensor(val_score, dtype=torch.float)/5.0
    val_output = model1(val_sentence1_tensor, val_sentence2_tensor)
    val_predictions.extend(val_output.tolist())
    val_labels.extend(val_score)
val_predictions = np.array(val_predictions)
val_labels = np.array(val_labels)
val_mse = mean_squared_error(val_labels, val_predictions)
print('Val MSE: {:.4f}'.format(val_mse))

Val MSE: 4.4869


In [12]:
test_predictions = []
test_labels = []
model1.eval()
for test_sentence1, test_sentence2, test_score in test_loader:
    test_sentence1_tensor = test_sentence1
    test_sentence2_tensor = test_sentence2
    test_score_tensor = torch.tensor(test_score, dtype=torch.float)/5.0
    test_output = model1(test_sentence1_tensor, test_sentence2_tensor)
    test_predictions.extend(test_output.tolist())
    test_labels.extend(test_score)
test_predictions = np.array(test_predictions)
test_labels = np.array(test_labels)
test_mse = mean_squared_error(test_labels, test_predictions)
print('Test MSE: {:.4f}'.format(test_mse))

Test MSE: 5.6714


In [15]:
corr = pearson_corr(train_labels, train_predictions.ravel())
corr

0.21464418087130838

In [16]:
corr = pearson_corr(val_labels, val_predictions.ravel())
corr

0.09902122183083445

In [17]:
corr = pearson_corr(test_labels, test_predictions.ravel())
corr

0.09840476343050476