In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import random

In [None]:
# Prepare dataset
def prepare_dataset(path):
    """Build a labelled table of name pairs from a tab-separated file.

    The file is read without a header; its first two columns are taken as
    pairs of matching names.

    * Every row of the file yields one positive pair (label 1).
    * Negative pairs (label 0) are produced by pairing every name with a
      randomly permuted copy of the full name list. Candidates that pair a
      name with itself, or that reproduce a known matching pair, are skipped
      because labelling them 0 would contradict the positive examples. The
      number of negative rows is therefore at most twice the number of input
      rows.

    For every pair the column order (name1/name2) is chosen at random.

    Args:
        path: Anything accepted by ``pd.read_csv`` (file path, URL, buffer).

    Returns:
        pd.DataFrame with columns ``name1``, ``name2`` and ``label``.
    """
    df = pd.read_csv(path, sep='\t', lineterminator='\n', header=None)
    name1, name2 = df[0].values.tolist(), df[1].values.tolist()

    x1, x2, label = [], [], []

    def add_pair(first, second, pair_label):
        # Randomise which name lands in which column so the column position
        # carries no information.
        if random.random() <= 0.5:
            first, second = second, first
        x1.append(first)
        x2.append(second)
        label.append(pair_label)

    for n1, n2 in zip(name1, name2):
        add_pair(n1, n2, 1)

    # Unordered pairs that are known to match; used to reject bad negatives.
    known_matches = {frozenset(pair) for pair in zip(name1, name2)}

    all_name = np.asarray(name1 + name2)
    shuffle_name = all_name[np.random.permutation(len(all_name))]
    for n1, n2 in zip(all_name.tolist(), shuffle_name.tolist()):
        if n1 == n2 or frozenset((n1, n2)) in known_matches:
            continue
        add_pair(n1, n2, 0)

    return pd.DataFrame({'name1': x1, 'name2': x2, 'label': label})

# Seed every random number generator used by the notebook (Python's `random`
# for pair orientation, NumPy for the negative-pair permutation, PyTorch for
# weight initialisation and batch shuffling) so that a fresh
# "Restart & Run All" reproduces the same dataset and results.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Tab-separated file of matching person-name pairs, fetched over HTTPS.
url = 'https://raw.githubusercontent.com/ujjalkumarmaity/research-paper-implementation/main/Learning-Text-Similarity-with-Siamese-Recurrent-Networks/person_match.train2'
df = prepare_dataset(url)

In [None]:
# Tokenize and pad sequences
MAX_SEQ_LEN = 70
# The empty string sorts before every non-empty string, so LabelEncoder gives
# it id 0. That id is reserved for padding and never collides with a real
# character.
PAD_TOKEN = ''

# Character-level vocabulary: one class per distinct character found in any
# name, plus the padding token. Fitting on whole names (one class per name)
# would yield a single id per row instead of a sequence.
tokenizer = LabelEncoder()
tokenizer.fit(
    [PAD_TOKEN]
    + sorted({ch for name in df['name1'].tolist() + df['name2'].tolist() for ch in str(name)})
)
assert tokenizer.classes_[0] == PAD_TOKEN, 'padding token must map to id 0'


def prepere_training_data(df, tokenizer):
    """Encode both name columns as fixed-length character-id sequences.

    Each name is mapped character by character to the ids of
    ``tokenizer.classes_``, truncated to ``MAX_SEQ_LEN`` characters and
    right-padded with the padding id.

    Args:
        df: DataFrame with ``name1``, ``name2`` and ``label`` columns.
        tokenizer: Fitted LabelEncoder whose classes are single characters
            (plus ``PAD_TOKEN``).

    Returns:
        Tuple ``(name1_seq, name2_seq, labels)`` where the two sequence arrays
        are int64 with shape ``(len(df), MAX_SEQ_LEN)`` and ``labels`` is
        ``df['label'].values``.

    Raises:
        KeyError: If a name contains a character the tokenizer was not fit on.
    """
    char_to_id = {str(ch): idx for idx, ch in enumerate(tokenizer.classes_)}
    pad_id = char_to_id.get(PAD_TOKEN, 0)

    def encode(names):
        seqs = np.full((len(names), MAX_SEQ_LEN), pad_id, dtype=np.int64)
        for row, name in enumerate(names):
            ids = [char_to_id[ch] for ch in str(name)[:MAX_SEQ_LEN]]
            seqs[row, :len(ids)] = ids
        return seqs

    return encode(df['name1']), encode(df['name2']), df['label'].values


# Fixed random_state keeps the train/test partition stable between runs.
train, test = train_test_split(df, test_size=0.2, stratify=df['label'], random_state=42)
train_text2seq_1, train_text2seq_2, train_label = prepere_training_data(train, tokenizer)
test_text2seq_1, test_text2seq_2, test_label = prepere_training_data(test, tokenizer)

In [None]:
# Create Dataset and DataLoader
class SiameseDataset(Dataset):
    """Dataset yielding (sequence 1, sequence 2, label) tensors per index."""

    def __init__(self, seq1, seq2, labels):
        self.seq1, self.seq2, self.labels = seq1, seq2, labels

    def __len__(self):
        # One sample per label.
        return len(self.labels)

    def __getitem__(self, idx):
        # Sequences become int64 tensors, the label a float32 tensor.
        first = torch.tensor(self.seq1[idx], dtype=torch.long)
        second = torch.tensor(self.seq2[idx], dtype=torch.long)
        target = torch.tensor(self.labels[idx], dtype=torch.float32)
        return first, second, target

# Wrap the encoded train/test arrays in datasets.
train_dataset = SiameseDataset(
    seq1=train_text2seq_1,
    seq2=train_text2seq_2,
    labels=train_label,
)
test_dataset = SiameseDataset(
    seq1=test_text2seq_1,
    seq2=test_text2seq_2,
    labels=test_label,
)

# Only the training loader shuffles; the test loader keeps dataset order.
train_loader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=16, shuffle=False)

In [None]:
# Define the model
class SiameseNetwork(nn.Module):
    """Twin encoder: embedding -> bidirectional LSTM -> mean pool -> linear.

    Both inputs go through the same weights (``forward_one``), and ``forward``
    returns the two resulting 128-dimensional embeddings.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
            bidirectional=True,
        )
        # The bidirectional LSTM concatenates both directions, so the
        # projection takes 2 * hidden_dim features.
        self.fc = nn.Linear(2 * hidden_dim, 128)
        # Not referenced by forward_one/forward in this class.
        self.out = nn.Linear(128, 1)

    def forward_one(self, x):
        """Encode one batch of token-id sequences into embeddings."""
        embedded = self.embedding(x)
        hidden_states, _ = self.lstm(embedded)
        # Average the LSTM outputs along dim 1.
        pooled = hidden_states.mean(dim=1)
        return self.fc(pooled)

    def forward(self, input1, input2):
        """Return the embeddings of both inputs as a tuple."""
        return self.forward_one(input1), self.forward_one(input2)

In [None]:
# Euclidean distance
def euclidean_distance(x, y, eps=1e-12):
    """Row-wise Euclidean distance between two batches of vectors.

    The squared distance is clamped to ``eps`` before the square root. The
    derivative of ``sqrt`` is infinite at 0, so without the clamp a pair of
    identical rows produces NaN gradients during backpropagation; with it,
    such rows get a zero gradient instead.

    Args:
        x: Tensor whose dim 1 holds the vector components.
        y: Tensor broadcastable against ``x``.
        eps: Lower bound applied to the squared distance.

    Returns:
        Tensor of distances with dim 1 reduced; values are never below
        ``sqrt(eps)``.
    """
    squared_dist = torch.sum((x - y) ** 2, dim=1)
    return torch.sqrt(torch.clamp(squared_dist, min=eps))

In [None]:
# Contrastive loss
class ContrastiveLoss(nn.Module):
    """Contrastive loss for pairs labelled 1 = matching, 0 = non-matching.

    This follows the label convention used by the rest of the notebook
    (positive pairs are built with label 1 and evaluation predicts 1 when the
    distance is small):

    * label 1: penalise the squared distance, pulling matching pairs together;
    * label 0: penalise ``max(margin - distance, 0) ** 2``, pushing
      non-matching pairs at least ``margin`` apart.

    Args:
        margin: Minimum distance wanted between non-matching pairs.
    """

    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        """Return the mean contrastive loss over the batch.

        Args:
            output1: Embeddings of the first element of each pair.
            output2: Embeddings of the second element of each pair.
            label: Float tensor with 1 for matching and 0 for non-matching pairs.
        """
        euclidean_dist = euclidean_distance(output1, output2)
        # Matching pairs are penalised for being far apart.
        match_term = label * torch.pow(euclidean_dist, 2)
        # Non-matching pairs are penalised only while closer than the margin.
        mismatch_term = (1 - label) * torch.pow(
            torch.clamp(self.margin - euclidean_dist, min=0.0), 2
        )
        return torch.mean(match_term + mismatch_term)

In [None]:
# Training
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Vocabulary size comes from the fitted tokenizer; 16 is the embedding width
# and 64 the LSTM hidden size per direction.
model = SiameseNetwork(len(tokenizer.classes_), 16, 64).to(device)
criterion = ContrastiveLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    # Accumulate a sample-weighted sum so the reported value is the mean loss
    # over the whole epoch rather than the loss of the last mini-batch.
    epoch_loss, num_samples = 0.0, 0
    for seq1, seq2, labels in train_loader:
        seq1, seq2, labels = seq1.to(device), seq2.to(device), labels.to(device)
        optimizer.zero_grad()
        output1, output2 = model(seq1, seq2)
        loss = criterion(output1, output2, labels)
        loss.backward()
        optimizer.step()
        batch_size = labels.size(0)
        epoch_loss += loss.item() * batch_size
        num_samples += batch_size
    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    mean_loss = epoch_loss / max(num_samples, 1)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {mean_loss:.4f}')

In [None]:
# Evaluation
model.eval()
correct, total = 0, 0
# Gradients are not needed while scoring the test set.
with torch.no_grad():
    for seq1, seq2, labels in test_loader:
        seq1 = seq1.to(device)
        seq2 = seq2.to(device)
        labels = labels.to(device)
        output1, output2 = model(seq1, seq2)
        euclidean_dist = euclidean_distance(output1, output2)
        # Predict label 1 when the two embeddings are closer than 0.5.
        predictions = (euclidean_dist < 0.5).float()
        correct += predictions.eq(labels).sum().item()
        total += len(labels)
print(f'Accuracy: {correct / total:.4f}')