In [None]:
# Install the sentence-transformers package into the current notebook environment.
!pip install sentence-transformers

Collecting sentence-transformers
  Downloading sentence_transformers-2.7.0-py3-none-any.whl (171 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m171.5/171.5 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch>=1.11.0->sentence-transformers)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch>=1.11.0->sentence-transformers)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch>=1.11.0->sentence-transformers)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch>=1.11.0->sentence-transformers)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch>=1.11.0->sentence-transform

In [None]:
import os
from pprint import pprint

import matplotlib.pyplot as plt
import numpy as np
import torch
from sentence_transformers import SentenceTransformer, InputExample, losses, util
from sentence_transformers.evaluation import BinaryClassificationEvaluator
from sklearn.metrics import roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay, accuracy_score, f1_score, precision_score, recall_score, classification_report
from sklearn.metrics.pairwise import cosine_similarity
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader, TensorDataset

Now we define a function that fits the selected model to the training dataset, and it is monitored by the validation dataset.

For the training loss, we will use the Contrastive Loss function. This function is defined as follows, where $D = D(X_1,X_2)$ is the distance metric:

$L (Y, X_1, X_2) = Y\frac{1}{2}D^2 + (1-Y)\frac{1}{2}\{\max(0,m-D)\}^2$

Where Y is the label (1 if the sentences are duplicated and 0 if not). So we have 2 cases:
- $Y=1$. Here we have $L (Y=1, X_1, X_2) = \frac{1}{2}D^2$, which means that the distance between duplicated sentences is going to be reduced.
- $Y=0$. In this case $L (Y=0, X_1, X_2) = \frac{1}{2}\{\max(0,m-D)\}^2$, so that the distance will increase whenever the distance between the vectors is smaller than the margin $m$; pairs that are already farther apart than the margin contribute nothing to the loss.

To assess the similarity between two vectors, we need an appropriate metric for distance. In two or three dimensions, Euclidean distance, which is the straight-line or "ordinary" distance, is often suitable for measuring the distance between two points. However, in spaces with many dimensions, Euclidean distance can be misleading as points generally appear much farther apart. In such higher-dimensional spaces, the angle between vectors becomes a more useful measure of similarity. Cosine similarity, which evaluates the cosine of the angle between two vectors, is used for this purpose. The cosine value is 1 for vectors pointing in the same direction, 0 for orthogonal vectors, and -1 for diametrically opposed vectors. Higher cosine values indicate greater similarity. To compute cosine similarity, one typically calculates the dot product of the vectors. If the vectors are not unit vectors, normalization of each vector or division by the product of their magnitudes is necessary. Note that $D$ in the contrastive loss formula has to satisfy $D(X,X)=0$, and the cosine similarity does not satisfy it, since $Cosine(X,X)=1$. To solve this, we use the Siamese cosine distance metric, which is defined as $Siamese=1-Cosine$.

We also include a variation of the Contrastive Loss, namely the Online Contrastive Loss. This loss selects hard positive pairs (positives that are far apart) and hard negative pairs (negatives that are close) and computes the loss only for these pairs. It often yields better performance than Contrastive Loss.

Another argument of the ContrastiveLoss is the parameter $m$ of the loss equation, that is, the margin. Note that dissimilar pairs only contribute to the loss if the estimated distance $D(X_1,X_2) < m$. This means that the loss no longer cares how far apart the negative pairs $X_1$ and $X_2$ are once their distance reaches this limit, so training can focus on the points that are more difficult to embed.

We also define an evaluator that monitors training on the validation dataset. Here we use a binary classification evaluator. This evaluator evaluates a model based on the similarity of the embeddings by calculating the accuracy of identifying similar and dissimilar sentences. The metrics are the cosine similarity as well as Euclidean and Manhattan distance. The returned score is the accuracy with a specified metric.



In [None]:
def _pairs_to_examples (df):
  '''
  Turn each row of a question-pair DataFrame into an InputExample.

  Iterates the "question1", "question2" and "is_duplicate" columns in lockstep,
  so it does not depend on the index labels being unique.
  '''
  return [InputExample(texts=[q1, q2], label=float(is_dup))
          for q1, q2, is_dup in zip(df["question1"], df["question2"], df["is_duplicate"])]


def fit (model, df_train, df_val, loss='ContrastiveLoss', out_model=None, margin = 0.5, batch_size = 128, epochs = 8):
  '''
  Fine-tune a sentence-transformers model on question pairs with a contrastive loss.

  Args:
    model: model exposing the sentence-transformers `fit` method; it is trained in place.
    df_train: training DataFrame with "question1", "question2" and "is_duplicate" columns.
    df_val: validation DataFrame with the same columns, used by the evaluator.
    loss: 'OnlineContrastiveLoss' selects losses.OnlineContrastiveLoss; any other
      value falls back to losses.ContrastiveLoss.
    out_model: directory name (joined onto the notebook-level `home_dir`) where the
      best model is saved. If None, no output path is used and nothing is saved.
    margin: margin passed to the loss.
    batch_size: batch size of both the training DataLoader and the evaluator.
    epochs: number of training epochs.

  Returns:
    None. The model is updated in place.
  '''
  train_examples = _pairs_to_examples(df_train)
  val_examples = _pairs_to_examples(df_val)

  # Honour the caller's batch_size (it was previously hard-coded to 128 here).
  train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=batch_size, num_workers=2, pin_memory=True)

  # Both losses expect two texts and a label of either 0 or 1.
  # If the label == 1, the distance between the two embeddings is reduced.
  # If the label == 0, the distance between the embeddings is increased.
  # The distance used is the siamese cosine distance (1 - cosine).
  distance_metric = losses.SiameseDistanceMetric.COSINE_DISTANCE

  if loss == 'OnlineContrastiveLoss':
    loss_cls = losses.OnlineContrastiveLoss
  else:
    loss_cls = losses.ContrastiveLoss
  train_loss = loss_cls(model=model, distance_metric=distance_metric, margin=margin)

  evaluator = BinaryClassificationEvaluator.from_input_examples(val_examples, show_progress_bar = True, batch_size=batch_size)

  # os.path.join(home_dir, None) raises TypeError, so only build a path (and only
  # ask for the best model to be saved) when an output name was actually given.
  output_path = os.path.join(home_dir, out_model) if out_model is not None else None

  model.fit(train_objectives=[(train_dataloader, train_loss)],
            epochs=epochs,
            warmup_steps=100,
            evaluator=evaluator,
            evaluation_steps=500,
            save_best_model = output_path is not None,
            output_path = output_path)

Now we define a function to find a suitable decision threshold for a given set of predictions. To do this, since we are dealing with a classification problem, we take a look at the ROC curve. We choose as operating point the point of the curve that is closest to the (0,1) point, where all predictions are correct, and use its threshold as a good trade-off between true positives and false positives. We compute the Euclidean distance of the different points of the ROC curve to the (0,1) point and determine which one is closest to it. If wanted, the area under the curve (AUC) and a plot of the ROC curve are shown. The function returns the threshold.

In [None]:
def find_threshold (preds, label, plot_roc=True):
  '''
  Return the ROC threshold whose (FPR, TPR) point is nearest to the ideal corner (0, 1).

  preds are the scores and label the true labels handed to roc_curve. The chosen
  threshold is always printed; when plot_roc is true the AUC is printed as well
  and the ROC curve is drawn, coloured by threshold.
  '''
  false_pos_rate, true_pos_rate, cutoffs = roc_curve(label, preds)

  # Euclidean gap between every ROC point and the perfect classifier at (0, 1).
  gap_to_ideal = np.sqrt(np.square(false_pos_rate) + np.square(1 - true_pos_rate))

  # The cutoff at the smallest gap is the operating point we keep.
  threshold = cutoffs[np.argmin(gap_to_ideal)]

  print ('Threshold: ', threshold)

  if not plot_roc:
    return threshold

  print ('AUC: %lf' % (auc(false_pos_rate, true_pos_rate)))

  plt.figure()
  plt.scatter(false_pos_rate, true_pos_rate, c=cutoffs, cmap='viridis', vmin=0, vmax=1)
  colorbar = plt.colorbar()
  colorbar.ax.set_title('Threshold')
  # Diagonal reference line: what a random classifier would achieve.
  plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label= 'Random classifier')
  plt.xlabel('False Positive Rate')
  plt.ylabel('True Positive Rate')
  plt.title('Receiver Operating Characteristic (ROC)')
  plt.legend(loc="lower right")
  plt.show()

  return threshold

Lastly, we define a prediction function, that given a model and a DataFrame, predicts the results and prints the confusion matrix, as well as a classification report and different measures, like accuracy, precision, recall and f1 score.

To do that, we encode the given sentences using the trained model and compute, for each pair, the cosine similarity between the two embeddings. These similarities are min-max normalized to the range [0, 1] and then compared with a threshold. If we provide a value for the threshold, we set to 1 those scores that are greater than the threshold, and to 0 otherwise. If a threshold is not provided, then we determine it with the function find_threshold. We return both predictions and threshold because they could be used later, together with some summary metrics that will be used later to compare different model performances.

The user can choose whether to see the ROC curve (remember that it can only be shown when the threshold is not provided; otherwise the function find_threshold is not called) and the confusion matrix and classification report.

In [None]:
def predict (model, df, debug=False, threshold=None, show_roc=False, print_cm=True):
  '''
  Classify the question pairs of a DataFrame as duplicate (1) or not (0).

  Both questions of every row are encoded with the model, the cosine similarity
  of each pair is computed and min-max normalized to [0, 1], and a pair is
  predicted as 1 when its normalized score is strictly greater than the threshold.

  Args:
    model: object with an `encode(list_of_sentences)` method returning one
      embedding per sentence.
    df: DataFrame with "question1", "question2" and "is_duplicate" columns.
    debug: if true, print a message before each stage.
    threshold: decision threshold on the normalized score. If None, it is
      determined with find_threshold from df's own labels.
    show_roc: forwarded to find_threshold as plot_roc (only used when
      threshold is None).
    print_cm: if true, plot the confusion matrix and print the classification report.

  Returns:
    (predictions, threshold, results): the list of 0/1 predictions, the threshold
    that was used, and a dict with 'Accuracy', 'Precision', 'Recall' and
    'F1-score' (the last three weighted-averaged).

  Raises:
    ValueError: if df has no rows.
  '''
  if len(df) == 0:
    raise ValueError('predict needs a DataFrame with at least one row')

  labels = df['is_duplicate']

  if debug:
    print ('Encoding sentence1')
  sentence1_embeddings = np.asarray(model.encode(list(df["question1"])))

  if debug:
    print ('Encoding sentence2')
  sentence2_embeddings = np.asarray(model.encode(list(df["question2"])))

  if debug:
    print ('Calculating distances')

  # Row-wise cosine similarity in one vectorized pass. A zero-norm embedding gets
  # a divisor of 1 so its similarity is 0 instead of NaN.
  norms = np.linalg.norm(sentence1_embeddings, axis=1) * np.linalg.norm(sentence2_embeddings, axis=1)
  norms[norms == 0] = 1.0
  dist = np.sum(sentence1_embeddings * sentence2_embeddings, axis=1) / norms

  # Min-max normalize to [0, 1]. If every similarity is identical the span is 0,
  # so fall back to all zeros rather than dividing by zero.
  # NOTE(review): the normalization uses this DataFrame's own min/max, so a
  # threshold found on one dataset is applied on a slightly different scale
  # when reused on another - confirm this is intended.
  lowest = dist.min()
  span = dist.max() - lowest
  preds = (dist - lowest) / span if span > 0 else np.zeros_like(dist)

  if threshold is None:
    threshold = find_threshold (preds, labels, plot_roc=show_roc)

  predictions = np.where(preds <= threshold, 0, 1).tolist()

  if print_cm:
    cm = confusion_matrix(labels, predictions)

    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0,1])

    disp.plot(cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.show()

    # The report needs the binary predictions, not the continuous scores.
    print (classification_report(labels, predictions))

  results = {
    'Accuracy': accuracy_score(labels, predictions),
    'Precision': precision_score(labels, predictions, average='weighted'),
    'Recall': recall_score(labels, predictions, average='weighted'),
    'F1-score': f1_score(labels, predictions, average='weighted'),
  }

  return predictions, threshold, results