### 1. Install and import the required packages

In [None]:
!pip install transformers sentence-transformers datasets

Collecting transformers
  Downloading transformers-4.31.0-py3-none-any.whl (7.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m18.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting sentence-transformers
  Downloading sentence-transformers-2.2.2.tar.gz (85 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.0/86.0 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting datasets
  Downloading datasets-2.13.1-py3-none-any.whl (486 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m486.2/486.2 kB[0m [31m27.0 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.14.1 (from transformers)
  Downloading huggingface_hub-0.16.4-py3-none-any.whl (268 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m268.8/268.8 kB[0m [31m23.1 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1 (from transformers)
  Downloading to

In [None]:
from datasets import load_dataset
from sentence_transformers import SentenceTransformer, models
from transformers import BertTokenizer
from transformers import get_linear_schedule_with_warmup
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader
from tqdm import tqdm
import time
import datetime
import random
import numpy as np
import pandas as pd

### 2. Use Google Colab's GPU for training

In [None]:
# Pick the CUDA device when one is visible to PyTorch, otherwise fall back to the CPU.
has_cuda = torch.cuda.is_available()
device = torch.device("cuda" if has_cuda else "cpu")

if not has_cuda:
    print('No GPU available, using the CPU instead.')
else:
    print(f'There are {torch.cuda.device_count()} GPU(s) available.')
    print('We will use the GPU:', torch.cuda.get_device_name(0))

There are 1 GPU(s) available.
We will use the GPU: Tesla T4


### **3.** Load and preview the Semantic Textual Similarity Benchmark (STSB) dataset

In [None]:
# Load the English version of the STSB dataset
dataset = load_dataset("stsb_multi_mt", "en")

Downloading builder script:   0%|          | 0.00/7.43k [00:00<?, ?B/s]

Downloading metadata:   0%|          | 0.00/19.0k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/9.98k [00:00<?, ?B/s]

Downloading and preparing dataset stsb_multi_mt/en to /root/.cache/huggingface/datasets/stsb_multi_mt/en/1.0.0/a5d260e4b7aa82d1ab7379523a005a366d9b124c76a5a5cf0c4c5365458b0ba9...


Downloading data files:   0%|          | 0/3 [00:00<?, ?it/s]

Downloading data:   0%|          | 0.00/229k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/74.0k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/52.9k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/5749 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/1379 [00:00<?, ? examples/s]

Generating dev split:   0%|          | 0/1500 [00:00<?, ? examples/s]

Dataset stsb_multi_mt downloaded and prepared to /root/.cache/huggingface/datasets/stsb_multi_mt/en/1.0.0/a5d260e4b7aa82d1ab7379523a005a366d9b124c76a5a5cf0c4c5365458b0ba9. Subsequent calls will reuse this data.


  0%|          | 0/3 [00:00<?, ?it/s]

In [None]:
print(dataset)

DatasetDict({
    train: Dataset({
        features: ['sentence1', 'sentence2', 'similarity_score'],
        num_rows: 5749
    })
    test: Dataset({
        features: ['sentence1', 'sentence2', 'similarity_score'],
        num_rows: 1379
    })
    dev: Dataset({
        features: ['sentence1', 'sentence2', 'similarity_score'],
        num_rows: 1500
    })
})


In [None]:
print("A sample from the STSB dataset's training split:")
print(dataset['train'][98])

A sample from the STSB dataset's training split:
{'sentence1': 'A man is slicing potatoes.', 'sentence2': 'A woman is peeling potato.', 'similarity_score': 2.200000047683716}


### **4.** Define the dataset loader class


In [None]:
# Instantiate the BERT tokenizer
# You can use larger variants of the model, here we're using the base model
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

In [None]:
class STSBDataset(torch.utils.data.Dataset):
    """Torch dataset that serves STSB sentence pairs with normalized similarity scores.

    Each item is a tuple ``(encoded_pair, label)``: ``encoded_pair`` is the
    tokenizer output for the two sentences of one example (encoded together as
    a two-element batch) and ``label`` is the similarity score divided by
    ``score_scale``.

    Args:
        dataset: Iterable of mappings with the keys ``'sentence1'``,
            ``'sentence2'`` and ``'similarity_score'``. It is traversed once.
        text_tokenizer: Optional callable used to encode a sentence pair. When
            omitted, the notebook-level ``tokenizer`` is looked up at call time.
        max_length: Length every sentence is padded/truncated to.
        score_scale: Divisor applied to the raw similarity scores.
    """

    def __init__(self, dataset, text_tokenizer=None, max_length=128, score_scale=5.0):
        self.text_tokenizer = text_tokenizer
        self.max_length = max_length

        self.first_sentences = []
        self.second_sentences = []
        self.normalized_similarity_scores = []

        # Read every example once instead of scanning the dataset three times.
        for example in dataset:
            self.first_sentences.append(example['sentence1'])
            self.second_sentences.append(example['sentence2'])
            self.normalized_similarity_scores.append(example['similarity_score'] / score_scale)

        self.concatenated_sentences = [
            [str(first), str(second)]
            for first, second in zip(self.first_sentences, self.second_sentences)
        ]

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.concatenated_sentences)

    def get_batch_labels(self, idx):
        """Return the normalized similarity score of example ``idx`` as a tensor."""
        return torch.tensor(self.normalized_similarity_scores[idx])

    def get_batch_texts(self, idx):
        """Tokenize the two sentences of example ``idx`` to fixed-length PyTorch tensors."""
        # Fall back to the notebook-level tokenizer when none was injected.
        encode = self.text_tokenizer if self.text_tokenizer is not None else tokenizer
        return encode(self.concatenated_sentences[idx], padding='max_length',
                      max_length=self.max_length, truncation=True, return_tensors="pt")

    def __getitem__(self, idx):
        """Return ``(encoded_pair, label)`` for example ``idx``."""
        batch_texts = self.get_batch_texts(idx)
        batch_y = self.get_batch_labels(idx)
        return batch_texts, batch_y


def collate_fn(texts):
    """Split a batched encoding into one feature dict per example.

    ``texts`` must provide ``'input_ids'`` and ``'attention_mask'``. Both are
    walked in lockstep along their first axis; any other key is dropped.
    Returns a list of ``{'input_ids': ..., 'attention_mask': ...}`` dicts.
    """
    per_example = []
    for ids, mask in zip(texts['input_ids'], texts['attention_mask']):
        per_example.append({'input_ids': ids, 'attention_mask': mask})
    return per_example

In [None]:
dataset['train']['sentence1'][:5]

['A plane is taking off.',
 'A man is playing a large flute.',
 'A man is spreading shreded cheese on a pizza.',
 'Three men are playing chess.',
 'A man is playing the cello.']

### 5. Define the model class based on BERT

In [None]:
class BertForSTS(torch.nn.Module):
    """Sentence-embedding network: a BERT encoder followed by a pooling layer.

    The encoder and the pooling module are kept as attributes and are also
    chained in a ``SentenceTransformer`` pipeline, which is what ``forward`` runs.
    """

    def __init__(self):
        super().__init__()
        encoder = models.Transformer('bert-base-uncased', max_seq_length=128)
        pooler = models.Pooling(encoder.get_word_embedding_dimension())
        self.bert = encoder
        self.pooling_layer = pooler
        self.sts_bert = SentenceTransformer(modules=[encoder, pooler])

    def forward(self, input_data):
        """Run the pipeline on ``input_data`` and return its ``'sentence_embedding'`` entry."""
        return self.sts_bert(input_data)['sentence_embedding']

In [None]:
# Instantiate the model and move it to GPU
model = BertForSTS()
model.to(device)

BertForSTS(
  (bert): Transformer({'max_seq_length': 128, 'do_lower_case': False}) with Transformer model: BertModel 
  (pooling_layer): Pooling({'word_embedding_dimension': 768, 'pooling_mode_cls_token': False, 'pooling_mode_mean_tokens': True, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False})
  (sts_bert): SentenceTransformer(
    (0): Transformer({'max_seq_length': 128, 'do_lower_case': False}) with Transformer model: BertModel 
    (1): Pooling({'word_embedding_dimension': 768, 'pooling_mode_cls_token': False, 'pooling_mode_mean_tokens': True, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False})
  )
)

### 6. Define the Cosine Similarity loss function

In [None]:
class CosineSimilarityLoss(torch.nn.Module):
    """Loss between the cosine similarity of embedding pairs and target scores.

    Args:
        loss_fn: Module comparing predicted similarities with the labels.
            Defaults to a fresh ``torch.nn.MSELoss()``.
        transform_fn: Module applied to the cosine similarities before the
            loss. Defaults to a fresh ``torch.nn.Identity()``.
    """

    def __init__(self, loss_fn=None, transform_fn=None):
        super().__init__()
        # Build the defaults per instance instead of sharing one module object
        # created at function-definition time.
        self.loss_fn = torch.nn.MSELoss() if loss_fn is None else loss_fn
        self.transform_fn = torch.nn.Identity() if transform_fn is None else transform_fn
        self.cos_similarity = torch.nn.CosineSimilarity(dim=1)

    def forward(self, inputs, labels):
        """Return the loss for one batch.

        Args:
            inputs: Sequence whose elements each hold the two embeddings of one
                pair (``inp[0]`` and ``inp[1]``).
            labels: Tensor with one target similarity per pair.
        """
        emb_1 = torch.stack([inp[0] for inp in inputs])
        emb_2 = torch.stack([inp[1] for inp in inputs])
        outputs = self.transform_fn(self.cos_similarity(emb_1, emb_2))
        # Align the labels with the predictions explicitly. A plain squeeze()
        # turns a batch of one into a 0-d tensor and makes the loss broadcast.
        return self.loss_fn(outputs, labels.reshape(outputs.shape))

In [None]:
dataset['train']

Dataset({
    features: ['sentence1', 'sentence2', 'similarity_score'],
    num_rows: 5749
})

### 7. Prepare the training and validation data split

In [None]:
# Wrap the predefined STSB 'train' and 'dev' splits; 'dev' serves as the validation set.
train_ds = STSBDataset(dataset['train'])
val_ds = STSBDataset(dataset['dev'])

# No random split is performed here: the sizes below are those of the two predefined splits.
train_size = len(train_ds)
val_size = len(val_ds)

print('{:>5,} training samples'.format(train_size))
print('{:>5,} validation samples'.format(val_size))

5,749 training samples
1,500 validation samples


In [None]:
batch_size = 8

# Both loaders share the batch size and the worker count; only the training
# batches are drawn in shuffled order.
train_dataloader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=4)
validation_dataloader = DataLoader(val_ds, batch_size=batch_size, num_workers=4)



### 8. Define the Optimizer and Scheduler

In [None]:
optimizer = AdamW(model.parameters(),
                  lr = 1e-6)

In [None]:
epochs = 3

# The schedule covers every optimizer step of the run: batches per epoch times epochs.
total_steps = epochs * len(train_dataloader)

# Linear decay of the learning rate over the whole run, without a warm-up phase.
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps,
)

### 9. Define a helper function for formatting the elapsed training time as `h:mm:ss`

In [None]:
def format_time(elapsed):
    """Render a duration given in seconds as an ``h:mm:ss`` string.

    The value is rounded to the nearest whole second and then formatted by
    ``datetime.timedelta``, so durations of a day or longer carry a day prefix.
    """
    whole_seconds = int(round(elapsed))
    duration = datetime.timedelta(seconds=whole_seconds)
    return str(duration)

### 10. Define the training function, and start the training loop

In [None]:
def _sts_batch_loss(batch, labels, criterion):
    """Embed every sentence pair of one DataLoader batch and return the batch loss."""
    batch['input_ids'] = batch['input_ids'].to(device)
    batch['attention_mask'] = batch['attention_mask'].to(device)

    # One feature dict per sentence pair; the model is run pair by pair.
    features = collate_fn(batch)
    embeddings = [model(feature) for feature in features]
    return criterion(embeddings, labels.to(device))


def train():
    """Fine-tune the notebook-level ``model`` and collect per-epoch statistics.

    Relies on the notebook globals ``model``, ``optimizer``, ``scheduler``,
    ``epochs``, ``train_dataloader``, ``validation_dataloader`` and ``device``.

    Returns:
        tuple: ``(model, training_stats)`` where ``training_stats`` is a list
        with one dict per epoch holding the keys ``'epoch'``,
        ``'Training Loss'``, ``'Valid. Loss'``, ``'Training Time'`` and
        ``'Validation Time'``.
    """
    seed_val = 42

    criterion = CosineSimilarityLoss()
    criterion = criterion.to(device)

    # Seed every random number generator the run can touch.
    random.seed(seed_val)
    np.random.seed(seed_val)
    torch.manual_seed(seed_val)
    torch.cuda.manual_seed_all(seed_val)

    # Per-epoch training loss, validation loss and timings.
    training_stats = []
    total_t0 = time.time()

    for epoch_i in range(0, epochs):

        # ========================================
        #               Training
        # ========================================

        print("")
        print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
        print('Training...')

        t0 = time.time()
        total_train_loss = 0

        model.train()

        # For each batch of training data...
        for train_data, train_label in tqdm(train_dataloader):
            model.zero_grad()

            loss = _sts_batch_loss(train_data, train_label, criterion)
            total_train_loss += loss.item()

            loss.backward()
            # Clip the gradient norm to 1.0 before the optimizer step.
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            scheduler.step()

        # Calculate the average loss over all of the batches.
        avg_train_loss = total_train_loss / len(train_dataloader)

        # Measure how long this epoch took.
        training_time = format_time(time.time() - t0)

        print("")
        print("  Average training loss: {0:.5f}".format(avg_train_loss))
        print("  Training epoch took: {:}".format(training_time))

        # ========================================
        #               Validation
        # ========================================

        print("")
        print("Running Validation...")

        t0 = time.time()
        total_eval_loss = 0

        model.eval()

        # Evaluate data for one epoch
        for val_data, val_label in tqdm(validation_dataloader):
            # Neither the forward pass nor the loss needs gradients here.
            with torch.no_grad():
                loss = _sts_batch_loss(val_data, val_label, criterion)
            total_eval_loss += loss.item()

        # Calculate the average loss over all of the batches.
        avg_val_loss = total_eval_loss / len(validation_dataloader)

        # Measure how long the validation run took.
        validation_time = format_time(time.time() - t0)

        print("  Validation Loss: {0:.5f}".format(avg_val_loss))
        print("  Validation took: {:}".format(validation_time))

        # Record all statistics from this epoch.
        training_stats.append(
            {
                'epoch': epoch_i + 1,
                'Training Loss': avg_train_loss,
                'Valid. Loss': avg_val_loss,
                'Training Time': training_time,
                'Validation Time': validation_time
            }
        )

    print("")
    print("Training complete!")

    print("Total training took {:} (h:mm:ss)".format(format_time(time.time()-total_t0)))

    return model, training_stats

In [None]:
# Calculate the training loss by running the full training loop.
# NOTE(review): the next cell calls train() again on the same model, optimizer and scheduler,
# after the scheduler has already been stepped through its `total_steps` - confirm both runs are intended.
model, training_stats = train()


In [None]:
# Launch the training
model, training_stats = train()

[{'epoch': 0,
  'train_loss': 0.0828960295766592,
  'valid_loss': 0.04938269406557083},
 {'epoch': 1,
  'train_loss': 0.04546177061274648,
  'valid_loss': 0.04938269406557083},
 {'epoch': 2,
  'train_loss': 0.0731061939150095,
  'valid_loss': 0.04938269406557083}]

# Pull the per-epoch training and validation losses out of the stats into two lists
train_loss = [epoch_stats['Training Loss'] for epoch_stats in training_stats]
valid_loss = [epoch_stats['Valid. Loss'] for epoch_stats in training_stats]

In [None]:
# Calculate a thresholded train "accuracy".
# The model returns two sentence embeddings per pair, so the prediction is their cosine
# similarity. Prediction and normalized label are both rounded to 0/1 before the comparison.
# NOTE(review): the previous version rounded the raw embeddings and compared them with the
# labels, which cannot work; thresholding both sides at 0.5 is an interpretation - confirm
# the intended metric.
model.eval()
train_accuracy = 0
for train_data, train_label in tqdm(train_dataloader):

    train_data['input_ids'] = train_data['input_ids'].to(device)
    train_data['attention_mask'] = train_data['attention_mask'].to(device)

    train_data = collate_fn(train_data)

    with torch.no_grad():
        output = [model(feature) for feature in train_data]

    # One cosine similarity per sentence pair of the batch.
    predicted = torch.stack(
        [torch.nn.functional.cosine_similarity(pair[0], pair[1], dim=0) for pair in output]
    )
    train_accuracy += np.sum(
        np.round(predicted.cpu().numpy()) == np.round(train_label.cpu().numpy())
    )

print("Train Accuracy: {}".format(train_accuracy/len(train_ds)))

In [None]:
def predict_similarity(sentence_pair):
    """Return the cosine similarity between the embeddings of two sentences.

    Args:
        sentence_pair: Two-element list of strings; both are tokenized together
            as one batch of two.

    Returns:
        float: Cosine similarity of the two sentence embeddings produced by the
        notebook-level ``model``.
    """
    # Follow the model rather than the global `device`: a model re-created with
    # BertForSTS() stays on the CPU until it is moved explicitly.
    model_device = next(model.parameters()).device
    test_input = tokenizer(sentence_pair, padding='max_length', max_length=128,
                           truncation=True, return_tensors="pt").to(model_device)
    # Only input_ids and attention_mask are passed on to the model.
    test_input.pop('token_type_ids', None)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        output = model(test_input)
    similarity = torch.nn.functional.cosine_similarity(output[0], output[1], dim=0).item()

    return similarity

train_examples = [
    {
        "sentence1" :"To simulate the behaviour of portions of the desired software product.",
        "sentence2" :"I like to eat apples.",
        "label" : 5.0
    },
    {
        "sentence1" :"I like to eat apples.",
        "sentence2" :"I like to eat bananas.",
        "label" : 4.0
    },
]

# Read the sentences by key: unpacking a dict in the loop target would yield its
# keys ("sentence1", "sentence2", "label") instead of the stored sentences.
train_predictions = [
    predict_similarity([example["sentence1"], example["sentence2"]])
    for example in train_examples
]
# train_similarity_scores = [cosine_similarity(pred[0], pred[1]) for pred in train_predictions]
# train_ground_truth_scores = [label for _, _, label in train_examples]

# # Calculate train accuracy (you can use different metrics as needed)
# train_accuracy = accuracy_score(train_ground_truth_scores, train_similarity_scores)
# print(f"Train Accuracy: {train_accuracy}")

In [None]:
# Calculate a thresholded "test" accuracy; the batches come from validation_dataloader.
# The model returns two sentence embeddings per pair, so the prediction is their cosine
# similarity. Prediction and normalized label are both rounded to 0/1 before the comparison.
# NOTE(review): the previous version rounded the raw embeddings and compared them with the
# labels, which cannot work; thresholding both sides at 0.5 is an interpretation - confirm
# the intended metric.
model.eval()
test_accuracy = 0
for test_data, test_label in tqdm(validation_dataloader):

    test_data['input_ids'] = test_data['input_ids'].to(device)
    test_data['attention_mask'] = test_data['attention_mask'].to(device)

    test_data = collate_fn(test_data)

    with torch.no_grad():
        output = [model(feature) for feature in test_data]

    # One cosine similarity per sentence pair of the batch.
    predicted = torch.stack(
        [torch.nn.functional.cosine_similarity(pair[0], pair[1], dim=0) for pair in output]
    )
    test_accuracy += np.sum(
        np.round(predicted.cpu().numpy()) == np.round(test_label.cpu().numpy())
    )

print("Test Accuracy: {}".format(test_accuracy/len(val_ds)))

In [None]:
# save model for further use
torch.save(model.state_dict(), 'model.pth')

# load model: a fresh instance starts on the CPU, so map the weights to the
# active device and move the model there before switching to inference mode
model = BertForSTS()
model.load_state_dict(torch.load('model.pth', map_location=device))
model.to(device)
model.eval()

# test_data_list : provide demo
# To score your own data instead, build this list from it, e.g.
# pd.read_csv('test_data.csv').values.tolist() for a two-column CSV.
test_data_list = [['I like to eat apples.', 'I like to eat apples.'],
                  ['I like to eat apples.', 'I like to eat bananas.'],
                  ['I like to eat apples.', 'I like to eat oranges.'],
                  ['I like to eat apples.', 'I like to eat pears.'],
                  ['I like to eat apples.', 'I like to eat grapes.'],
                  ['I like to eat apples.', 'I like to eat watermelons.'],
                  ['I like to eat apples.', 'I like to eat pineapples.'],
                  ['I like to eat apples.', 'I like to eat strawberries.'],
                  ['I like to eat apples.', 'I like to eat blueberries.'],
                  ['I like to eat apples.', 'I like to eat raspberries.'],
                  ['I like to eat apples.', 'I like to eat blackberries.'],
                  ]

# Build the results table from the pairs that are actually scored, so the
# similarity column added below always matches the number of rows.
test_data = pd.DataFrame(test_data_list, columns=['sentence1', 'sentence2'])

# predict similarity score
similarity_score = []
for sentence_pair in tqdm(test_data_list):
    # Both sentences are encoded together as one batch of two.
    encoded_pair = tokenizer(sentence_pair, padding='max_length',
                             max_length=128, truncation=True, return_tensors="pt").to(device)
    features = {'input_ids': encoded_pair['input_ids'],
                'attention_mask': encoded_pair['attention_mask']}
    with torch.no_grad():
        output = model(features)
    # output[0] / output[1] are the embeddings of the first / second sentence.
    similarity_score.append(
        torch.nn.functional.cosine_similarity(output[0], output[1], dim=0).item()
    )

# add similarity score to test data
test_data['similarity_score'] = similarity_score
test_data

In [None]:
test_examples = [
    {
        "sentence1" :"To simulate the behaviour of portions of the desired software product.",   
        "sentence2" :"High risk problems are address in the prototype program to make sure that the program is feasible.  A prototype may also be used to show a company that the software can be possibly programmed.",
        "label" : 1.0
    },
    {
        "sentence1" :"I like to eat apples.",
        "sentence2" :"I like to eat bananas.",
        "label" : 0.8
    },
]

test_labels = [example['label'] for example in test_examples]

In [None]:
# calculate accuracy mse
from sklearn.metrics import mean_squared_error

# NOTE(review): these are hard-coded placeholder values, not model outputs - replace
# them with real predictions and labels before reporting the metric.
test_predictions = [0.8, 0.9]

test_labels = [0.7, 0.8]

# Mean squared error between the labels and the predictions.
test_mse = mean_squared_error(test_labels, test_predictions)
print("Test MSE: {:.4f}".format(test_mse))

In [None]:
# Tabulate the per-epoch training statistics, indexed by the 'epoch' column
df_stats = pd.DataFrame(data=training_stats).set_index('epoch')

# Show the table as the cell output
df_stats

Unnamed: 0_level_0,Training Loss,Valid. Loss,Training Time,Validation Time
epoch,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,0.065612,0.050554,0:05:37,0:00:29
2,0.040409,0.042842,0:05:38,0:00:28
3,0.036535,0.041365,0:05:31,0:00:28


In [None]:
test_dataset = load_dataset("stsb_multi_mt", name="en", split="test")

# Collect both sentence columns, then pair them up as [sentence1, sentence2] lists of strings
first_sent = [row['sentence1'] for row in test_dataset]
second_sent = [row['sentence2'] for row in test_dataset]
full_text = [list(map(str, pair)) for pair in zip(first_sent, second_sent)]



In [None]:
model.eval()

def predict_similarity(sentence_pair):
    """Return the cosine similarity between the embeddings of two sentences.

    Args:
        sentence_pair: Two-element list of strings; both are tokenized together
            as one batch of two.

    Returns:
        float: Cosine similarity of the two sentence embeddings produced by the
        notebook-level ``model``.
    """
    # Follow the model rather than the global `device`: a model re-created with
    # BertForSTS() stays on the CPU until it is moved explicitly.
    model_device = next(model.parameters()).device
    test_input = tokenizer(sentence_pair, padding='max_length', max_length=128,
                           truncation=True, return_tensors="pt").to(model_device)
    # Only input_ids and attention_mask are passed on to the model.
    test_input.pop('token_type_ids', None)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        output = model(test_input)
    sim = torch.nn.functional.cosine_similarity(output[0], output[1], dim=0).item()

    return sim

In [None]:
example_1 = full_text[100]
print(f"Sentence 1: {example_1[0]}")
print(f"Sentence 2: {example_1[1]}")
print(f"Predicted similarity score: {round(predict_similarity(example_1), 2)}")

Sentence 1: A cat is walking around a house.
Sentence 2: A woman is peeling potato.
Predicted similarity score: 0.05


In [None]:
example_2 = full_text[130]
print(f"Sentence 1: {example_2[0]}")
print(f"Sentence 2: {example_2[1]}")
print(f"Predicted similarity score: {round(predict_similarity(example_2), 2)}")

Sentence 1: Two men are playing football.
Sentence 2: Two men are practicing football.
Predicted similarity score: 0.73


In [None]:
example_2

['Two men are playing football.', 'Two men are practicing football.']

In [None]:
example_3 = full_text[812]
print(f"Sentence 1: {example_3[0]}")
print(f"Sentence 2: {example_3[1]}")
print(f"Predicted similarity score: {round(predict_similarity(example_3), 2)}")

Sentence 1: It varies by the situation.
Sentence 2: This varies by institution.
Predicted similarity score: 0.6


### Last but not least, save your model!

In [None]:
PATH = 'bert-sts.pt'
torch.save(model.state_dict(), PATH)

In [None]:
# In order to load the model
# First, you have to create an instance of the model's class
# And use the saving path for the loading
# map_location lets a checkpoint saved from the GPU load on a CPU-only runtime,
# and the fresh instance is moved to `device` because it is created on the CPU
# Don't forget to set the model to the evaluation state using .eval()
model = BertForSTS()
model.load_state_dict(torch.load(PATH, map_location=device))
model.to(device)
model.eval()

BertForSTS(
  (bert): Transformer({'max_seq_length': 128, 'do_lower_case': False}) with Transformer model: BertModel 
  (pooling_layer): Pooling({'word_embedding_dimension': 768, 'pooling_mode_cls_token': False, 'pooling_mode_mean_tokens': True, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False})
  (sts_bert): SentenceTransformer(
    (0): Transformer({'max_seq_length': 128, 'do_lower_case': False}) with Transformer model: BertModel 
    (1): Pooling({'word_embedding_dimension': 768, 'pooling_mode_cls_token': False, 'pooling_mode_mean_tokens': True, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False})
  )
)