This notebook trains a bi-encoder model for sentence similarity on the English configuration of the STS-B dataset (`PhilipMay/stsb_multi_mt`).

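The model is based on BERT (`bert-base-uncased`) with mean pooling over token embeddings. It is trained with the CoSENT ranking loss applied to an angle-based similarity between the two sentence embeddings; validation and test report errors (mean absolute error and mean squared error, respectively) computed from plain cosine similarity.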

The script includes steps for data loading, preprocessing, model training, and evaluation.

In [None]:
# Notebook shell command: install the Hugging Face packages imported in the cells below.
!pip install transformers datasets huggingface_hub

In [None]:
# Select the device used for the model and every batch: CUDA when present, else CPU
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cpu":
    # Only the CPU fallback is announced.
    print('No GPU available, using the CPU instead.')

In [None]:
# Authenticate with Hugging Face Hub
from huggingface_hub import notebook_login

# Interactive step: asks for a Hugging Face access token inside the notebook.
notebook_login()

In [None]:
# Load dataset
from datasets import load_dataset
# Load the "en" configuration of PhilipMay/stsb_multi_mt.
# The 'train', 'dev' and 'test' splits and the 'sentence1', 'sentence2' and
# 'similarity_score' columns are the ones read later in this notebook.
datasets = load_dataset("PhilipMay/stsb_multi_mt", "en")

In [None]:
# Display the loaded object to inspect its splits.
datasets

In [None]:
from transformers import BertTokenizer

# Load the BERT tokenizer.
print('Loading BERT tokenizer...')
# Tokenizer for the 'bert-base-uncased' checkpoint (the same checkpoint the model
# is loaded from later); do_lower_case=True requests lowercasing of the input text.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=True)

In [None]:
# Define a function to normalize tensor range
import torch

def normalize_tensor_range(tensor, new_min=-1, new_max=1):
    """Linearly rescale ``tensor`` so that its values span ``[new_min, new_max]``.

    The smallest element of ``tensor`` is mapped to ``new_min`` and the largest
    to ``new_max``; the bounds are taken from ``tensor`` itself, not from any
    externally known value range.

    Args:
        tensor: Numeric tensor to rescale.
        new_min: Lower bound of the output range.
        new_max: Upper bound of the output range.

    Returns:
        A new tensor with the same shape as ``tensor``. A constant tensor
        (max == min) maps every element to ``new_min`` instead of producing
        NaN; an empty tensor is returned as an empty copy.
    """
    # min()/max() raise on empty input, and there is nothing to rescale anyway.
    if tensor.numel() == 0:
        return tensor.clone()

    min_val = tensor.min()
    value_range = tensor.max() - min_val

    # A constant tensor has zero range; dividing by it would yield NaN.
    # Substituting 1 keeps the numerator (all zeros) unchanged.
    safe_range = torch.where(value_range == 0, torch.ones_like(value_range), value_range)

    # Scale to [0, 1]
    tensor_scaled = (tensor - min_val) / safe_range

    # Scale to [new_min, new_max]
    return tensor_scaled * (new_max - new_min) + new_min

# Raw similarity scores of each split, as tensors
labels = torch.tensor(datasets['train']['similarity_score'])
valid_labels = torch.tensor(datasets['dev']['similarity_score'])
test_labels = torch.tensor(datasets['test']['similarity_score'])

# Rescale every split to [-1, 1].
# NOTE(review): each call rescales with that split's own min and max, so the
# three splits share a scale only if they cover the same raw score range.
normalized_labels = normalize_tensor_range(labels)
valid_normalized_labels = normalize_tensor_range(valid_labels)
test_normalized_labels = normalize_tensor_range(test_labels)

# Report the mean normalized score of the train / dev / test splits
print(f"Normalized Tensor 1: {normalized_labels.mean(), valid_normalized_labels.mean(), test_normalized_labels.mean()}")

In [None]:
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler, Dataset

In [None]:
class biencoderDataset(Dataset):
    """Dataset of pre-tokenized sentence pairs with a similarity label.

    Every sentence is tokenized once, at construction time, and kept as plain
    Python lists of token ids / attention-mask values. No padding is applied
    here: sequences keep their own length and are expected to be padded per
    batch by the collate function.

    Args:
        sentence1: Iterable of first sentences.
        sentence2: Iterable of second sentences, aligned with ``sentence1``.
        normalized_labels: Similarity scores indexed like the sentences; each
            element must support ``.item()`` (e.g. a 1-D tensor).
        tokenizer: Callable that, given one sentence, returns a mapping with
            ``'input_ids'`` and ``'attention_mask'``.
    """

    def __init__(self, sentence1, sentence2, normalized_labels, tokenizer):
        self.all_input_id1 = []
        self.all_input_id2 = []
        self.all_attn_masks1 = []
        self.all_attn_masks2 = []
        self.normalized_labels = normalized_labels

        for first, second in zip(sentence1, sentence2):
            # truncation=True caps each sentence at the tokenizer's maximum
            # model length so an over-long input cannot overflow the encoder.
            # Padding is not requested: for a single sentence 'longest' has
            # nothing to pad against.
            encoded_first = tokenizer(first, truncation=True)
            encoded_second = tokenizer(second, truncation=True)
            self.all_input_id1.append(encoded_first['input_ids'])
            self.all_input_id2.append(encoded_second['input_ids'])
            self.all_attn_masks1.append(encoded_first['attention_mask'])
            self.all_attn_masks2.append(encoded_second['attention_mask'])

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.all_input_id1)

    def __getitem__(self, idx):
        """Return ``(input_ids_1, attention_mask_1, input_ids_2, attention_mask_2, label)``.

        The first four entries are tensors built from the stored lists; the
        label is a Python scalar obtained with ``.item()``.
        """
        return (
            torch.tensor(self.all_input_id1[idx]),
            torch.tensor(self.all_attn_masks1[idx]),
            torch.tensor(self.all_input_id2[idx]),
            torch.tensor(self.all_attn_masks2[idx]),
            self.normalized_labels[idx].item(),
        )

In [None]:
# Tokenize the training split and pair it with its normalized labels.
train_dataset = biencoderDataset(datasets['train']['sentence1'], datasets['train']['sentence2'],normalized_labels, tokenizer)

In [None]:
# Same for the validation ('dev') split.
valid_dataset = biencoderDataset(datasets['dev']['sentence1'], datasets['dev']['sentence2'],valid_normalized_labels, tokenizer)

In [None]:
# Same for the test split.
test_dataset = biencoderDataset(datasets['test']['sentence1'], datasets['test']['sentence2'],test_normalized_labels, tokenizer)

In [None]:
# Inspect one item: (input_ids_1, attention_mask_1, input_ids_2, attention_mask_2, label).
train_dataset[0]

In [None]:
class DataCollator:
    """Collate dataset items into one padded batch for both sentences of a pair.

    Each item is indexed as ``(input_ids_1, attention_mask_1, input_ids_2,
    attention_mask_2, label)``. The first and second sentences of the whole
    batch are padded together, so both sides end up with the same sequence
    length, and are then split back into ``*_1`` and ``*_2`` halves.
    """

    def __init__(self, tokenizer):
        # Only the tokenizer's pad_token_id is read, when padding input ids.
        self.tokenizer = tokenizer

    def pad_tensors(self, tensors, padding_value=0):
        """Right-pad a list of tensors to a common length, batch dimension first."""
        return torch.nn.utils.rnn.pad_sequence(
            tensors,
            batch_first=True,
            padding_value=padding_value,
        )

    def __call__(self, data):
        """Build the batch dictionary from a list of dataset items."""
        batch_size = len(data)

        def column(position):
            # Collect one field of every item, in batch order.
            return [sample[position] for sample in data]

        # First sentences followed by second sentences, still unpadded.
        raw_ids = column(0) + column(2)
        raw_masks = column(1) + column(3)

        padded_ids = self.pad_tensors(raw_ids, padding_value=self.tokenizer.pad_token_id)
        label_tensor = torch.tensor(column(4))
        padded_masks = self.pad_tensors(raw_masks, padding_value=0)

        return {
            'input_ids': raw_ids,
            'attention_mask': raw_masks,
            'labels': label_tensor,
            'all_input_ids': padded_ids,
            'input_ids_1': padded_ids[:batch_size],
            'input_ids_2': padded_ids[batch_size:],
            'all_attention_mask': padded_masks,
            'attention_mask_1': padded_masks[:batch_size],
            'attention_mask_2': padded_masks[batch_size:],
        }

In [None]:
# Collator that pads each batch using the tokenizer's pad token id.
data_collator = DataCollator(tokenizer=tokenizer)

In [None]:
# Training batches of 64 pairs, reshuffled on every pass.
train_dataloader = DataLoader(train_dataset, batch_size=64, collate_fn=data_collator, shuffle=True)

In [None]:
# Print a single collated batch as a sanity check, then stop.
for i in train_dataloader:
  print(i)
  break

In [None]:
# Validation and test batches keep dataset order (no shuffling requested).
valid_dataloader = DataLoader(valid_dataset, batch_size=64, collate_fn=data_collator)
test_dataloader = DataLoader(test_dataset, batch_size=64, collate_fn=data_collator)

In [None]:
from transformers import BertModel

In [None]:
import torch.nn.functional as F

In [None]:
from tqdm import tqdm

def pairwise_angle_sim(x, y):
    """
    Row-wise absolute normalized angle similarity between ``x`` and ``y``.

    Each row is split in half along dim 1; the first half is used as the real
    part and the second half as the imaginary part of a complex vector. The
    element-wise complex quotient x / y is rescaled by the ratio of the two
    row norms, and the real and imaginary parts of the result are summed per
    row. See :class:`~sentence_transformers.losses.AnglELoss` or
    https://arxiv.org/abs/2309.12871v1 for background.

    Args:
        x (Tensor): The first tensor, 2-D with an even-sized dim 1.
        y (Tensor): The second tensor, same shape as ``x``.

    Returns:
        Tensor: 1-D tensor with one non-negative value per row.
    """
    x_real, x_imag = torch.chunk(x, 2, dim=1)
    y_real, y_imag = torch.chunk(y, 2, dim=1)

    # Squared norm of each row of y: the denominator of the complex division.
    y_sq_norm = torch.sum(y_real**2 + y_imag**2, dim=1, keepdim=True)
    quot_real = (x_real * y_real + x_imag * y_imag) / y_sq_norm
    quot_imag = (x_imag * y_real - x_real * y_imag) / y_sq_norm

    # Divide out the ratio of the row norms |x| / |y|.
    x_norm = torch.sum(x_real**2 + x_imag**2, dim=1, keepdim=True) ** 0.5
    y_norm = y_sq_norm ** 0.5
    norm_ratio = x_norm / y_norm
    quot_real = quot_real / norm_ratio
    quot_imag = quot_imag / norm_ratio

    return torch.concat((quot_real, quot_imag), dim=1).sum(dim=1).abs()


class CoSENTLoss(torch.nn.Module):
    """Pairwise ranking loss over a batch of similarity scores.

    For every ordered pair (i, j) with ``labels[i] < labels[j]`` the term
    ``exp(scale * (scores[i] - scores[j]))`` is accumulated, and the loss is
    ``log(1 + sum of those terms)``. It therefore grows when an item with a
    lower label receives a higher score than an item with a higher label.
    """

    def __init__(self, scale: float = 20.0) -> None:
        super().__init__()
        # Multiplier applied to the scores before pairwise differences are taken.
        self.scale = scale

    def forward(self, scores, labels):
        """Return the scalar loss for 1-D ``scores`` and matching ``labels``."""
        scaled = scores * self.scale

        # pairwise_diff[i, j] = scaled[i] - scaled[j]
        pairwise_diff = scaled.unsqueeze(1) - scaled.unsqueeze(0)

        # 1.0 where labels[i] < labels[j], else 0.0
        relevant = (labels.unsqueeze(1) < labels.unsqueeze(0)).float()

        # Push every other pair to a huge negative value so exp() of it vanishes.
        masked_diff = pairwise_diff - (1 - relevant) * 1e12

        # The leading zero contributes e^0 = 1 inside the log.
        leading_zero = torch.zeros(1).to(masked_diff.device)
        flat_terms = torch.cat((leading_zero, masked_diff.view(-1)), dim=0)
        return torch.logsumexp(flat_terms, dim=0)


def _mean_pooled_embeddings(model, input_ids, attention_mask):
    """Encode one side of a batch and return L2-normalized mean-pooled embeddings."""
    token_embeddings = model(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state
    # Zero out the embeddings at positions where the attention mask is 0.
    masked_embeddings = token_embeddings * attention_mask.unsqueeze(-1)
    # Average over attended tokens only; clamp guards a row whose mask is all zeros.
    token_counts = torch.sum(attention_mask, dim=1, keepdim=True).clamp(min=1)
    sentence_embeddings = torch.sum(masked_embeddings, dim=1) / token_counts
    return F.normalize(sentence_embeddings, p=2, dim=1)


def _encode_pair_batch(model, batch):
    """Move a collated batch to ``device`` and embed both sentences of each pair."""
    embeddings1 = _mean_pooled_embeddings(
        model, batch['input_ids_1'].to(device), batch['attention_mask_1'].to(device)
    )
    embeddings2 = _mean_pooled_embeddings(
        model, batch['input_ids_2'].to(device), batch['attention_mask_2'].to(device)
    )
    return embeddings1, embeddings2, batch['labels'].to(device)


def _mean_loss(losses):
    """Average a list of scalar loss tensors; NaN when the list is empty."""
    if not losses:
        return float('nan')
    return torch.stack(losses).mean().item()


def train(epochs, lr, model, train_dataloader, valid_dataloader, opt_func=torch.optim.Adam):
    """Fine-tune ``model`` as a bi-encoder and validate after every epoch.

    Training minimizes ``CoSENTLoss`` over ``pairwise_angle_sim`` of the two
    normalized, mean-pooled sentence embeddings. Validation reports the mean
    absolute error between cosine similarity and the labels, averaged over
    batches. Batches are moved to the module-level ``device``; the model is
    expected to be on that device already.

    Args:
        epochs: Number of passes over ``train_dataloader``.
        lr: Learning rate passed to ``opt_func``.
        model: Encoder called with ``input_ids`` / ``attention_mask`` whose
            output exposes ``last_hidden_state``.
        train_dataloader: Iterable of batches with keys ``input_ids_1``,
            ``attention_mask_1``, ``input_ids_2``, ``attention_mask_2``, ``labels``.
        valid_dataloader: Iterable of batches with the same keys.
        opt_func: Optimizer factory called as ``opt_func(model.parameters(), lr)``.

    Returns:
        A list with one dict per epoch: ``{'epoch', 'train_loss', 'valid_loss'}``.
    """
    history = []
    optimizer = opt_func(model.parameters(), lr)
    # The loss module is stateless, so one instance serves every batch.
    criterion = CoSENTLoss()

    for epoch in range(epochs):
        model.train()
        train_losses = []
        for batch in tqdm(train_dataloader):
            embeddings1, embeddings2, labels = _encode_pair_batch(model, batch)
            angle_scores = pairwise_angle_sim(embeddings1, embeddings2)
            loss = criterion(angle_scores, labels)

            loss.backward()  # calculate gradients
            optimizer.step()
            optimizer.zero_grad()

            # Keep only the value; a detached tensor does not reference the autograd graph.
            train_losses.append(loss.detach())
        total_loss = _mean_loss(train_losses)

        model.eval()
        valid_losses = []
        with torch.no_grad():
            for batch in tqdm(valid_dataloader):
                embeddings1, embeddings2, labels = _encode_pair_batch(model, batch)
                cosine_scores = F.cosine_similarity(embeddings1, embeddings2, dim=1)
                valid_losses.append(torch.mean(torch.abs(labels - cosine_scores)))
        # Aggregate once per epoch, after the loop, so an empty loader is handled too.
        total_valid_loss = _mean_loss(valid_losses)

        print('epoch_training_loss: {}, epoch_validation_loss: {}'.format(total_loss, total_valid_loss))
        history.append({'epoch': epoch, 'train_loss': total_loss, 'valid_loss': total_valid_loss})

    return history


In [None]:
# Load the pre-trained BERT model
model = BertModel.from_pretrained('bert-base-uncased')
# Move the model to the selected device before training.
model.to(device)


In [None]:
# Training the model
# 20 epochs at a learning rate of 1e-5; train() uses torch.optim.Adam unless told otherwise.
epochs = 20
lr = 0.00001
history = train(epochs, lr, model, train_dataloader, valid_dataloader)

In [None]:
from google.colab import drive
# Mount Google Drive under 'sentence-similarity' (only works inside Google Colab).
drive.mount('sentence-similarity')

In [None]:
# Save only the weights (state_dict) to a file under the mounted Drive path.
torch.save(model.state_dict(), 'sentence-similarity/My Drive/pytorch practice notebooks/ssm/model.pth')  # Change the path as needed


In [None]:
model = BertModel.from_pretrained('bert-base-uncased')

In [None]:
import torch
state_dict = torch.load('sentence-similarity/My Drive/pytorch practice notebooks/ssm/model.pth')

In [None]:
model.load_state_dict(state_dict)

In [None]:
import torch

# Re-select the device for evaluation, reporting which one is used
if not torch.cuda.is_available():
    # No CUDA device: fall back to the CPU.
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")
else:
    # Use the GPU and report how many are visible and the name of device 0.
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))

In [None]:
# Move the reloaded model to the selected device.
model.to(device)

In [None]:
# Spot-check the reloaded model: score the first test batch only, print the
# cosine similarities next to the labels, and report that batch's MSE.
model.eval()
with torch.no_grad():
    for batch in test_dataloader:
        # Both sentences of each pair, plus the labels, on the target device
        input_ids1 = batch['input_ids_1'].to(device)
        attention_mask1 = batch['attention_mask_1'].to(device)
        input_ids2 = batch['input_ids_2'].to(device)
        attention_mask2 = batch['attention_mask_2'].to(device)
        labels = batch['labels'].to(device)

        # Token-level embeddings for each side
        outputs1 = model(input_ids=input_ids1, attention_mask=attention_mask1)
        outputs2 = model(input_ids=input_ids2, attention_mask=attention_mask2)
        token_embeddings1 = outputs1.last_hidden_state
        token_embeddings2 = outputs2.last_hidden_state

        # Mean pooling, side 1: zero the positions whose mask is 0, then divide
        # the per-sentence sum by the number of attended tokens
        attention_mask1_unsqueezed = attention_mask1.unsqueeze(-1)
        token_embeddings1_masked = token_embeddings1 * attention_mask1_unsqueezed
        sentence_embeddings1 = token_embeddings1_masked.sum(dim=1) / attention_mask1.sum(dim=1, keepdim=True)

        # Mean pooling, side 2
        attention_mask2_unsqueezed = attention_mask2.unsqueeze(-1)
        token_embeddings2_masked = token_embeddings2 * attention_mask2_unsqueezed
        sentence_embeddings2 = token_embeddings2_masked.sum(dim=1) / attention_mask2.sum(dim=1, keepdim=True)

        # Unit-length sentence vectors
        normalized_tensor1 = F.normalize(sentence_embeddings1, p=2, dim=1)
        normalized_tensor2 = F.normalize(sentence_embeddings2, p=2, dim=1)

        output_dot_product = F.cosine_similarity(normalized_tensor1, normalized_tensor2, dim=1)
        print("output_dot_product", output_dot_product)
        print("labels", labels)
        loss = F.mse_loss(output_dot_product, labels)
        print("loss", loss.item())
        # Only the first batch is inspected.
        break