In [None]:
! pip install -q datasets

# Import Libraries

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import numpy as np
import pandas as pd
from datasets import load_dataset, concatenate_datasets
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoModelForCausalLM, DataCollatorWithPadding, AdamW
from tqdm import tqdm
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from huggingface_hub import login
from peft import LoraConfig, get_peft_model, TaskType

# Datasets

In [2]:
def get_senteval_dataset(dataset_name):
    """Load one SentEval task from the Hub and merge its splits into one dataset.

    Args:
        dataset_name: task suffix appended to 'rahulsikder223/SentEval-'.

    Returns:
        The 'train' and 'test' splits concatenated (train first), with the
        'label' column renamed to 'labels'.
    """
    splits = load_dataset(f'rahulsikder223/SentEval-{dataset_name}')
    merged = concatenate_datasets([splits['train'], splits['test']])
    return merged.rename_column("label", "labels")

# SentEval task names iterated by the experiment loop; the trailing comment keeps
# the full candidate list for quick switching.
senteval_datasets = ['CR']#['CR', 'MPQA', 'MR', 'SUBJ']

# Model

In [3]:
# Authenticate with the Hugging Face Hub. The token string below is a placeholder.
# NOTE(review): a real token should not be committed in the notebook — presumably it
# should be read from an environment variable or a secret store; confirm the workflow.
login(token="<your token here>")
# Hub identifier of the base model; passed to get_model_tokenizer by the experiment loop.
model_id = 'meta-llama/Llama-3.2-1B'

# Loss Functions

## Classification Losses - Single sentence column and sentiment label

### Cross-Entropy Loss

In [4]:
def cross_entropy_loss(logits, labels):
    """Cross-entropy between raw class scores and target classes.

    Args:
        logits: unnormalised class scores.
        labels: target classes, in any form accepted by PyTorch's cross-entropy.

    Returns:
        Scalar tensor: the loss averaged with PyTorch's default 'mean' reduction.
    """
    return F.cross_entropy(logits, labels)

### Label Smoothing Cross-Entropy Loss

In [5]:
def label_smoothing_cross_entropy_loss(logits, labels, smoothing=0.1):
    """Cross-entropy against a smoothed one-hot target distribution.

    The true class receives probability ``1 - smoothing``; the remaining
    ``smoothing`` mass is shared equally among the other classes.

    Args:
        logits: tensor of shape (batch_size, num_classes) with raw class scores.
        labels: integer tensor of shape (batch_size,) with the true class indices.
        smoothing: probability mass moved away from the true class.

    Returns:
        Scalar tensor: the smoothed cross-entropy averaged over the batch.
    """
    log_probs = F.log_softmax(logits, dim=-1)
    num_classes = log_probs.size(1)

    # Every class starts at the off-target share; the true class is then overwritten.
    off_target = smoothing / (num_classes - 1)
    target_dist = torch.full_like(log_probs, off_target)
    target_dist.scatter_(1, labels.unsqueeze(1), 1.0 - smoothing)

    per_sample = -(target_dist * log_probs).sum(dim=-1)
    return per_sample.mean()

## Embedding Losses

### Triplets - Anchor, Positive and Negative sentences (3 sentence columns) and label

#### Triplet Loss

In [6]:
def generate_triplets(embeddings, labels):
    """Build one (anchor, positive, negative) triplet per usable sample in the batch.

    For each anchor the positive is the lowest-index *other* sample sharing its
    label, and the negative is the lowest-index sample with a different label
    (deterministic, no random sampling). Anchors lacking either are skipped.

    Args:
        embeddings: tensor indexed by sample along dim 0.
        labels: 1-D tensor with one label per sample.

    Returns:
        Tuple (anchors, positives, negatives) of stacked tensors, or
        (None, None, None) when no triplet could be formed.
    """
    triplet_ids = []
    for anchor_idx, anchor_label in enumerate(labels):
        # Same-label samples, minus the anchor itself.
        positive_ids = (labels == anchor_label).nonzero(as_tuple=True)[0].tolist()
        positive_ids.remove(anchor_idx)

        negative_ids = (labels != anchor_label).nonzero(as_tuple=True)[0].tolist()

        # An anchor needs both a positive and a negative to be usable.
        if not positive_ids or not negative_ids:
            continue

        triplet_ids.append((anchor_idx, positive_ids[0], negative_ids[0]))

    if not triplet_ids:
        return None, None, None

    anchor_rows, positive_rows, negative_rows = zip(*triplet_ids)
    anchors = torch.stack([embeddings[idx] for idx in anchor_rows])
    positives = torch.stack([embeddings[idx] for idx in positive_rows])
    negatives = torch.stack([embeddings[idx] for idx in negative_rows])
    return anchors, positives, negatives

In [7]:
def generate_all_triplets(embeddings, labels):
    """Enumerate every valid (anchor, positive, negative) triplet in the batch.

    For each anchor, every other same-label sample is paired with every
    different-label sample. Triplets are ordered by anchor, then positive,
    then negative index.

    Args:
        embeddings: tensor indexed by sample along dim 0.
        labels: 1-D tensor with one label per sample.

    Returns:
        Tuple (anchors, positives, negatives) of stacked tensors, or
        (None, None, None) when the batch yields no triplet.
    """
    triplet_ids = []
    for anchor_idx, anchor_label in enumerate(labels):
        # Same-label samples, minus the anchor itself.
        positive_ids = (labels == anchor_label).nonzero(as_tuple=True)[0].tolist()
        positive_ids.remove(anchor_idx)

        negative_ids = (labels != anchor_label).nonzero(as_tuple=True)[0].tolist()

        triplet_ids.extend(
            (anchor_idx, pos_idx, neg_idx)
            for pos_idx in positive_ids
            for neg_idx in negative_ids
        )

    if not triplet_ids:
        return None, None, None

    def gather(slot):
        # Stack the embedding rows referenced by one slot of every triplet.
        return torch.stack([embeddings[ids[slot]] for ids in triplet_ids])

    return gather(0), gather(1), gather(2)

In [8]:
def triplet_loss(embeddings, labels, margin=1.0):
    """Margin-based triplet loss over one triplet per anchor.

    Triplets come from ``generate_triplets``. The hinge
    ``max(d(a, p) - d(a, n) + margin, 0)`` uses ``F.pairwise_distance`` and is
    averaged over the triplets.

    Args:
        embeddings: tensor indexed by sample along dim 0.
        labels: 1-D tensor with one label per sample.
        margin: required gap between positive and negative distances.

    Returns:
        Scalar tensor with the mean hinge loss, or the Python float ``0.0``
        when the batch yields no triplet.
    """
    triplets = generate_triplets(embeddings, labels)
    if any(part is None for part in triplets):
        return 0.0
    anchors, positives, negatives = triplets

    positive_gap = F.pairwise_distance(anchors, positives)
    negative_gap = F.pairwise_distance(anchors, negatives)
    hinge = (positive_gap - negative_gap + margin).clamp(min=0.0)
    return hinge.mean()

In [9]:
def triplet_all_loss(embeddings, labels, margin=1.0):
    """Margin-based triplet loss over every valid triplet in the batch.

    Same hinge as ``triplet_loss`` but with the exhaustive triplet set from
    ``generate_all_triplets``.

    Args:
        embeddings: tensor indexed by sample along dim 0.
        labels: 1-D tensor with one label per sample.
        margin: required gap between positive and negative distances.

    Returns:
        Scalar tensor with the mean hinge loss, or the Python float ``0.0``
        when the batch yields no triplet.
    """
    triplets = generate_all_triplets(embeddings, labels)
    if any(part is None for part in triplets):
        return 0.0
    anchors, positives, negatives = triplets

    # Distance to the positive should undercut distance to the negative by `margin`.
    violation = (
        F.pairwise_distance(anchors, positives)
        - F.pairwise_distance(anchors, negatives)
        + margin
    )
    return torch.clamp(violation, min=0.0).mean()

#### Hard Triplet Loss

In [10]:
def _pairwise_distances(embeddings, squared=False):
    """Compute the 2D matrix of distances between all the embeddings.

    Args:
        embeddings: tensor of shape (batch_size, embed_dim)
        squared: Boolean. If true, output is the pairwise squared euclidean distance matrix.
                 If false, output is the pairwise euclidean distance matrix.

    Returns:
        pairwise_distances: tensor of shape (batch_size, batch_size)
    """
    dot_product = torch.matmul(embeddings, embeddings.t())

    # Get squared L2 norm for each embedding. We can just take the diagonal of `dot_product`.
    # This also provides more numerical stability (the diagonal of the result will be exactly 0).
    # shape (batch_size,)
    square_norm = torch.diag(dot_product)

    # Compute the pairwise distance matrix as we have:
    # ||a - b||^2 = ||a||^2  - 2 <a, b> + ||b||^2
    # shape (batch_size, batch_size)
    distances = square_norm.unsqueeze(0) - 2.0 * dot_product + square_norm.unsqueeze(1)

    # Because of computation errors, some distances might be negative so we put everything >= 0.0
    distances[distances < 0] = 0

    if not squared:
        # Because the gradient of sqrt is infinite when distances == 0.0 (ex: on the diagonal)
        # we need to add a small epsilon where distances == 0.0
        mask = distances.eq(0).float()
        distances = distances + mask * 1e-16

        distances = (1.0 -mask) * torch.sqrt(distances)

    return distances

def _get_anchor_positive_triplet_mask(labels):
    """Return a 2D mask where mask[a, p] is True iff a and p are distinct and have same label.
    Args:
        labels: tf.int32 `Tensor` with shape [batch_size]
    Returns:
        mask: tf.bool `Tensor` with shape [batch_size, batch_size]
    """
    # Check that i and j are distinct
    indices_equal = torch.eye(labels.size(0), device=labels.device).bool()
    indices_not_equal = ~indices_equal

    # Check if labels[i] == labels[j]
    # Uses broadcasting where the 1st argument has shape (1, batch_size) and the 2nd (batch_size, 1)
    labels_equal = labels.unsqueeze(0) == labels.unsqueeze(1)

    return labels_equal & indices_not_equal

def _get_anchor_negative_triplet_mask(labels):
    """Return a 2D mask where mask[a, n] is True iff a and n have distinct labels.
    Args:
        labels: tf.int32 `Tensor` with shape [batch_size]
    Returns:
        mask: tf.bool `Tensor` with shape [batch_size, batch_size]
    """
    # Check if labels[i] != labels[k]
    # Uses broadcasting where the 1st argument has shape (1, batch_size) and the 2nd (batch_size, 1)

    return ~(labels.unsqueeze(0) == labels.unsqueeze(1))

In [11]:
def hard_triplet_loss(embeddings, labels, margin=1.0):
    """Batch-hard triplet loss.

    For every anchor, the farthest same-label sample (hardest positive) and the
    closest different-label sample (hardest negative) are selected from the
    pairwise Euclidean distance matrix.

    Args:
        embeddings: tensor of shape (batch_size, embed_dim).
        labels: tensor of shape (batch_size,).
        margin: required gap between positive and negative distances.

    Returns:
        Scalar tensor: mean over anchors of
        ``relu(hardest_positive - hardest_negative + margin)``.
    """
    dist = _pairwise_distances(embeddings, squared=False)

    # Zero out everything that is not a valid positive, then take the row maximum.
    positive_mask = _get_anchor_positive_triplet_mask(labels).float()
    hardest_positive = (positive_mask * dist).max(dim=1, keepdim=True)[0]

    # Lift every non-negative entry by the row maximum so that the row minimum is
    # taken over valid negatives.
    negative_mask = _get_anchor_negative_triplet_mask(labels).float()
    row_max = dist.max(dim=1, keepdim=True)[0]
    lifted = dist + row_max * (1.0 - negative_mask)
    hardest_negative = lifted.min(dim=1, keepdim=True)[0]

    per_anchor = F.relu(hardest_positive - hardest_negative + margin)
    return per_anchor.mean()

### Pairs - 2 sentence columns and label

#### Pair Generation

In [12]:
def generate_pairs(embeddings, labels):
    """Build every unordered sample pair (i, j) with i < j, plus a similarity label.

    Pairs are enumerated in the order (0, 1), (0, 2), ..., (1, 2), ... — the same
    order a nested ``for i / for j in range(i + 1, n)`` loop would produce. The
    index tensors are built in one shot, so no per-pair Python work is done.

    Args:
        embeddings: tensor of shape (batch_size, embed_dim).
        labels: 1-D tensor with one label per sample.

    Returns:
        Tuple (embedding1, embedding2, pair_labels):
            embedding1: rows ``embeddings[i]`` for every pair, shape (num_pairs, embed_dim).
            embedding2: rows ``embeddings[j]`` for every pair, shape (num_pairs, embed_dim).
            pair_labels: int64 tensor on ``labels.device``; 1 when both samples
                share a label, 0 otherwise.
        With fewer than two samples there are no pairs and all three tensors
        are empty along dim 0.
    """
    num_samples = len(labels)

    # Strict upper triangle of the (n, n) index grid: row 0 holds i, row 1 holds j,
    # listed row by row.
    pair_idx = torch.triu_indices(num_samples, num_samples, offset=1, device=embeddings.device)
    first_idx, second_idx = pair_idx[0], pair_idx[1]

    embedding1_tensor = embeddings[first_idx]
    embedding2_tensor = embeddings[second_idx]

    # Index the labels on their own device in case it differs from the embeddings'.
    label_idx = pair_idx.to(labels.device)
    labels_tensor = (labels[label_idx[0]] == labels[label_idx[1]]).long()

    return embedding1_tensor, embedding2_tensor, labels_tensor

#### Cosine Similarity MSE

In [13]:
def cosine_similarity_mse_loss(embeddings, labels):
    """Mean squared error between pair similarity labels and cosine similarities.

    All in-batch pairs come from ``generate_pairs``; each pair's target is its
    0/1 similarity label and its prediction is the cosine similarity of the two
    embeddings.

    Args:
        embeddings: tensor of shape (batch_size, embed_dim).
        labels: 1-D tensor with one label per sample.

    Returns:
        Scalar tensor: mean of ``(pair_label - cos_sim) ** 2`` over all pairs.
    """
    left, right, targets = generate_pairs(embeddings, labels)
    residual = targets - F.cosine_similarity(left, right)
    return residual.pow(2).mean()

#### CoSENT Loss

In [14]:
def cosent_loss(embeddings, labels, tau=20.0):
    """CoSENT ranking loss over all in-batch pairs.

    Each pair from ``generate_pairs`` gets a score ``tau * cos(e1, e2)``. For
    every (pair_i, pair_j) where pair_i's similarity label is lower than
    pair_j's, the term ``exp(score_i - score_j)`` enters the loss
    ``log(1 + sum of such terms)``.

    Args:
        embeddings: tensor of shape (batch_size, embed_dim).
        labels: 1-D tensor with one label per sample.
        tau: scale applied to the cosine similarities.

    Returns:
        Scalar tensor with the loss.
    """
    left, right, pair_labels = generate_pairs(embeddings, labels)

    # ordering[i, j] == 1 when pair i should score below pair j.
    ordering = (pair_labels.unsqueeze(1) < pair_labels.unsqueeze(0)).float()

    # The dot product of L2-normalised rows is the cosine similarity.
    left = F.normalize(left, p=2, dim=1)
    right = F.normalize(right, p=2, dim=1)
    scores = (left * right).sum(dim=1) * tau

    # score_gaps[i, j] = score_i - score_j; irrelevant entries are pushed to a huge
    # negative value so that they vanish inside the log-sum-exp.
    score_gaps = scores.unsqueeze(1) - scores.unsqueeze(0)
    masked_gaps = (score_gaps - (1 - ordering) * 1e12).view(-1)

    # The leading zero contributes the "1 +" term inside the logarithm.
    baseline = torch.Tensor([0]).to(masked_gaps.device)
    return torch.logsumexp(torch.concat((baseline, masked_gaps), dim=0), dim=0)

#### In-Batch Negatives Loss

In [15]:
def categorical_crossentropy(y_true, y_pred):
    """Per-row cross-entropy between a target weight matrix and raw scores.

    Args:
        y_true: target weights, same shape as `y_pred`.
        y_pred: raw scores; log-softmax is applied along dim 1.

    Returns:
        1-D tensor with one loss value per row (no reduction).
    """
    log_probs = F.log_softmax(y_pred, dim=1)
    return torch.sum(log_probs * y_true, dim=1).neg()

def in_batch_negative_loss(embeddings, labels, tau=20.0, negative_weights=0.0):
    """In-batch negatives loss over all sample pairs produced by ``generate_pairs``.

    The two members of every pair are interleaved into one matrix (pair k occupies
    rows 2k and 2k + 1). Each row is scored against every other row by scaled cosine
    similarity, and a softmax cross-entropy pushes each row of a similar pair
    (pair label 1) towards its partner row.

    Args:
        embeddings: tensor of shape (batch_size, embed_dim).
        labels: 1-D tensor with one label per sample.
        tau: scale applied to the cosine similarities.
        negative_weights: when > 0, added to the similarity logits at the partner
            positions of dissimilar pairs (pair label 0).

    Returns:
        Scalar tensor: the per-row cross-entropy averaged over all interleaved rows.
        Rows of dissimilar pairs have an all-zero target and contribute 0 to the sum.
    """
    device = labels.device
    embedding1, embedding2, labels = generate_pairs(embeddings, labels)

    # Interleave the pair members: even rows hold embedding1, odd rows embedding2.
    # NOTE(review): torch.empty uses the default dtype here, not embeddings.dtype —
    # confirm this is acceptable if the model runs in reduced precision.
    y_pred = torch.empty((2 * embedding1.shape[0], embedding1.shape[1]), device=device)
    y_pred[0::2] = embedding1
    y_pred[1::2] = embedding2
    # Repeat each pair label for both of its rows; shape (2 * num_pairs, 1).
    y_true = labels.repeat_interleave(2).unsqueeze(1)

    def make_target_matrix(y_true):
        """Return a float matrix with 1 at [r, c] iff both rows are flagged in
        `y_true` and c is the partner row of r (the other member of r's pair)."""
        idxs = torch.arange(0, y_pred.shape[0]).int().to(device)
        y_true = y_true.int()
        # Column indices as a (1, M) row vector (a view of idxs, mutated in place below).
        idxs_1 = idxs[None, :]
        # Partner index of every row as an (M, 1) column: r + 1 for even r, r - 1 for odd r.
        idxs_2 = (idxs + 1 - idxs % 2 * 2)[:, None]

        # Unflagged columns are overwritten with the sentinel -2 ...
        idxs_1 *= y_true.T
        idxs_1 += (y_true.T == 0).int() * -2

        # ... and unflagged rows with the sentinel -1, so that they can never match.
        idxs_2 *= y_true
        idxs_2 += (y_true == 0).int() * -1

        # Broadcast comparison (1, M) vs (M, 1) -> (M, M).
        y_true = (idxs_1 == idxs_2).float()
        return y_true

    # Partner positions of the dissimilar pairs (those whose pair label is 0).
    neg_mask = make_target_matrix(y_true == 0)

    # Partner positions of the similar pairs: the cross-entropy targets.
    y_true = make_target_matrix(y_true)

    y_pred = F.normalize(y_pred, dim=1, p=2)
    similarities = y_pred @ y_pred.T
    # Push self-similarity on the diagonal to a huge negative value so that it
    # drops out of the softmax.
    similarities = similarities - torch.eye(y_pred.shape[0]).to(device) * 1e12
    similarities = similarities * tau

    if negative_weights > 0:
        similarities += neg_mask * negative_weights

    return categorical_crossentropy(y_true, similarities).mean()

#### Angle Loss

In [16]:
def angle_loss(embeddings, labels, tau=1.0):
    """Angle-based ranking loss over all in-batch pairs.

    Each embedding is split in half along dim 1: the first half is treated as
    the real part and the second half as the imaginary part of a complex vector.
    For every pair from ``generate_pairs`` the quotient of the two complex
    vectors is formed, rescaled by the ratio of their norms, and reduced to one
    score ``tau * |sum(real) + sum(imag)|``. The scores are then ranked with
    the same log-sum-exp objective as ``cosent_loss``.

    Args:
        embeddings: tensor of shape (batch_size, embed_dim); embed_dim is
            presumably even so that both halves have equal width.
        labels: 1-D tensor with one label per sample.
        tau: scale applied to the per-pair scores.

    Returns:
        Scalar tensor with the loss.
    """
    first, second, pair_labels = generate_pairs(embeddings, labels)

    # ordering[i, j] == 1 when pair i's similarity label is lower than pair j's.
    ordering = (pair_labels.unsqueeze(1) < pair_labels.unsqueeze(0)).float()

    # z = a + b*i comes from the first pair member, w = c + d*i from the second.
    a, b = torch.chunk(first, 2, dim=1)
    c, d = torch.chunk(second, 2, dim=1)

    # Squared norm of w summed over the whole row, and the norms of z and w.
    w_sq_norm = torch.sum(c**2 + d**2, dim=1, keepdim=True)
    z_norm = torch.sum(a**2 + b**2, dim=1, keepdim=True)**0.5
    w_norm = torch.sum(c**2 + d**2, dim=1, keepdim=True)**0.5
    norm_ratio = z_norm / w_norm

    # Real and imaginary parts of z / w, then divided by the norm ratio.
    real = (a * c + b * d) / w_sq_norm / norm_ratio
    imag = (b * c - a * d) / w_sq_norm / norm_ratio

    scores = torch.abs(torch.sum(torch.concat((real, imag), dim=1), dim=1)) * tau

    # score_gaps[i, j] = score_i - score_j; irrelevant entries are pushed to a huge
    # negative value so that they vanish inside the log-sum-exp.
    score_gaps = scores.unsqueeze(1) - scores.unsqueeze(0)
    masked_gaps = (score_gaps - (1 - ordering) * 1e12).view(-1)

    # The leading zero contributes the "1 +" term inside the logarithm.
    baseline = torch.Tensor([0]).to(masked_gaps.device)
    return torch.logsumexp(torch.concat((baseline, masked_gaps), dim=0), dim=0)

#### Combination of CoSENT, In-Batch Negatives and Angle Losses

In [17]:
def cosent_ibn_angle(embeddings, labels, w_cosent=1, w_ibn=1, w_angle=1, tau_cosent=20.0, tau_ibn=20.0, tau_angle=1.0):
    """Weighted sum of the CoSENT, in-batch negatives and angle losses.

    Args:
        embeddings: tensor of shape (batch_size, embed_dim).
        labels: 1-D tensor with one label per sample.
        w_cosent, w_ibn, w_angle: weights of the three loss terms.
        tau_cosent, tau_ibn, tau_angle: scale passed to the respective loss.

    Returns:
        Scalar tensor: ``w_cosent * cosent + w_ibn * ibn + w_angle * angle``.
    """
    cosent_term = w_cosent * cosent_loss(embeddings, labels, tau_cosent)
    ibn_term = w_ibn * in_batch_negative_loss(embeddings, labels, tau_ibn)
    angle_term = w_angle * angle_loss(embeddings, labels, tau_angle)
    return cosent_term + ibn_term + angle_term

## Loss List

In [19]:
# Experiment grid consumed by the training loop. Each entry holds:
#   'loss_name'   - name of the loss function (the train driver looks it up via globals());
#                   'without_ft' skips fine-tuning altogether,
#   'loss_type'   - 'clf' (loss on classifier logits) or 'emb' (loss on pooled embeddings),
#   'loss_kwargs' - extra keyword arguments forwarded to the loss function.
# Commented-out entries are the other available configurations, currently disabled.
losses = [
    # {'loss_name': 'without_ft', 'loss_type': 'emb', 'loss_kwargs': {}},
    # {'loss_name': 'cross_entropy_loss', 'loss_type': 'clf', 'loss_kwargs': {}},
    # {'loss_name': 'label_smoothing_cross_entropy_loss', 'loss_type': 'clf', 'loss_kwargs': {'smoothing': 0.1}},
    # {'loss_name': 'triplet_loss', 'loss_type': 'emb', 'loss_kwargs': {'margin': 5}},
    # {'loss_name': 'triplet_all_loss', 'loss_type': 'emb', 'loss_kwargs': {'margin': 5}},
    # {'loss_name': 'hard_triplet_loss', 'loss_type': 'emb', 'loss_kwargs': {'margin': 5}},
    # {'loss_name': 'cosine_similarity_mse_loss', 'loss_type': 'emb', 'loss_kwargs': {}},
    # {'loss_name': 'cosent_loss', 'loss_type': 'emb', 'loss_kwargs': {'tau': 20.0}},
    # {'loss_name': 'in_batch_negative_loss', 'loss_type': 'emb', 'loss_kwargs': {'tau': 20.0}},
    # {'loss_name': 'angle_loss', 'loss_type': 'emb', 'loss_kwargs': {'tau': 1.0}},
    {'loss_name': 'cosent_ibn_angle', 'loss_type': 'emb', 'loss_kwargs': {'w_cosent': 1, 'w_ibn': 1, 'w_angle': 1, 'tau_cosent': 20.0, 'tau_ibn': 20.0, 'tau_angle': 1.0}}
]

# Training

### Training Preparation

#### Device Setting

In [20]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

#### Dataset Preparation

In [21]:
def prepare_dataset(dataset, split=0.3, seed=None):
    """Load a SentEval task and split it randomly into train and test sets.

    Args:
        dataset: SentEval task name understood by ``get_senteval_dataset``.
        split: size of the test set, passed to ``train_test_split`` as ``test_size``.
        seed: optional seed forwarded to ``train_test_split`` so that the split
            can be reproduced. The default ``None`` keeps the split unseeded.

    Returns:
        Tuple (train_dataset, test_dataset).
    """
    # Dataset Import...
    full_dataset = get_senteval_dataset(dataset)

    # Random Split (reproducible when a seed is given)...
    splits = full_dataset.train_test_split(test_size=split, seed=seed)
    return splits['train'], splits['test']

#### Model and Tokenizer Preparation

In [22]:
def get_model_tokenizer(model_id, type='clf', apply_lora=True):
    """Load the tokenizer and model for `model_id`, optionally wrapped with LoRA.

    Args:
        model_id: Hugging Face Hub identifier of the pretrained model.
        type: 'clf' loads a sequence-classification model; any other value
            loads a causal language model.
        apply_lora: when True, wrap the model with PEFT LoRA adapters and leave
            only the parameters whose name contains "lora" trainable.

    Returns:
        Tuple (model, tokenizer); the model has been moved to the global `device`.
    """
    is_classifier = type == 'clf'

    tokenizer = AutoTokenizer.from_pretrained(model_id)
    # Reuse the EOS token as the padding token.
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id

    # NOTE(review): the classifier is loaded with device_map='auto' and then also
    # moved with .to(device) below — confirm the two do not conflict.
    if is_classifier:
        model = AutoModelForSequenceClassification.from_pretrained(model_id, device_map='auto')
    else:
        model = AutoModelForCausalLM.from_pretrained(model_id)

    model_config = model.config
    model_config.pad_token_id = tokenizer.pad_token_id
    model_config.use_cache = False
    model_config.pretraining_tp = 1
    model.to(device)

    # target_modules is not set, so PEFT picks the modules to adapt.
    lora_config = LoraConfig(
        task_type=TaskType.SEQ_CLS if is_classifier else TaskType.FEATURE_EXTRACTION,
        r=88,
        lora_alpha=16,
        lora_dropout=0.1,
    )

    if not apply_lora:
        return model, tokenizer

    model = get_peft_model(model, lora_config)

    # Only parameters with "lora" in their name stay trainable; all others are frozen.
    # NOTE(review): for 'clf' this also freezes any non-LoRA parameter PEFT may have
    # left trainable (e.g. a classification head) — confirm that is intended.
    for name, param in model.named_parameters():
        param.requires_grad = "lora" in name

    trainable_count = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Trainable parameters: {trainable_count}")

    return model, tokenizer

#### Dataset Tokenization and Batch Processing

In [23]:
def tokenize_dataset_batch(train_dataset, test_dataset, tokenizer, batch_size):
    """Tokenize both splits and wrap them in padded-batch DataLoaders.

    Args:
        train_dataset: dataset with 'sentence' and 'labels' columns.
        test_dataset: dataset with 'sentence' and 'labels' columns.
        tokenizer: tokenizer used both for encoding and for batch padding.
        batch_size: number of samples per batch in both loaders.

    Returns:
        Tuple (train_loader, test_loader); only the train loader shuffles.
    """
    def encode_sentences(examples):
        # Sentences are truncated to at most 128 tokens.
        return tokenizer(examples['sentence'], padding=True, truncation=True, max_length=128)

    def encode_split(split):
        # Tokenize in batches and expose only the model inputs and labels as tensors.
        encoded = split.map(encode_sentences, batched=True)
        encoded.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
        return encoded

    train_encoded = encode_split(train_dataset)
    test_encoded = encode_split(test_dataset)

    # The collator pads every batch with the tokenizer's padding settings.
    collator = DataCollatorWithPadding(tokenizer=tokenizer)
    train_loader = DataLoader(train_encoded, batch_size=batch_size, shuffle=True, collate_fn=collator)
    test_loader = DataLoader(test_encoded, batch_size=batch_size, collate_fn=collator)

    return train_loader, test_loader

#### Embedding Extraction

In [24]:
def extract_embeddings(model, device, dataloader):
    """Compute one pooled embedding per sample for every batch of `dataloader`.

    The embedding is the mean over dim 1 of the model's last hidden state,
    computed without gradient tracking.

    Args:
        model: model returning `hidden_states` when called with
            output_hidden_states=True.
        device: device the input tensors are moved to before the forward pass.
        dataloader: iterable of batches with 'input_ids', 'attention_mask' and 'labels'.

    Returns:
        Tuple (embeddings, labels): the CPU embeddings of all batches
        concatenated along dim 0, and a tensor with the matching labels.
    """
    embedding_chunks = []
    label_items = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Extracting embeddings", leave=False):
            outputs = model(
                input_ids=batch['input_ids'].to(device),
                attention_mask=batch['attention_mask'].to(device),
                output_hidden_states=True,
                return_dict=True,
            )
            # NOTE(review): the mean runs over every position along dim 1 and does not
            # use attention_mask, so padded positions are presumably included — confirm
            # that this is intended.
            pooled = outputs.hidden_states[-1].mean(dim=1)
            embedding_chunks.append(pooled.cpu())
            label_items.extend(batch['labels'])

    return torch.cat(embedding_chunks), torch.tensor(label_items)

#### Train Driver

In [25]:
def train(model, train_loader, model_type='clf', epochs=10, loss_name='cross_entropy_loss', **loss_kwargs):
    """Fine-tune `model` with the loss function named `loss_name`.

    Args:
        model: model to optimise; updated in place.
        train_loader: iterable of batches (dicts of tensors that include 'labels').
        model_type: 'clf' feeds the model's logits to the loss; any other value
            feeds the mean over dim 1 of the last hidden state.
        epochs: number of passes over `train_loader`.
        loss_name: name of a loss function defined at notebook (global) scope.
        **loss_kwargs: extra keyword arguments forwarded to the loss function.

    Returns:
        The trained model (the same object that was passed in).
    """
    optimizer = AdamW(model.parameters(), lr=5e-5)
    is_classifier = model_type == 'clf'

    model.train()
    for _ in range(epochs):
        for raw_batch in tqdm(train_loader, desc="Training", leave=False):
            batch = {key: tensor.to(device) for key, tensor in raw_batch.items()}
            targets = batch['labels']

            if is_classifier:
                # Classification losses work directly on the logits.
                features = model(**batch).logits
            else:
                # Embedding losses work on the pooled last hidden state.
                # NOTE(review): the mean does not use attention_mask, so padded
                # positions are presumably included — confirm that this is intended.
                outputs = model(
                    input_ids=batch['input_ids'],
                    attention_mask=batch['attention_mask'],
                    output_hidden_states=True,
                    return_dict=True,
                )
                features = outputs.hidden_states[-1].mean(dim=1)

            loss = globals()[loss_name](features, targets, **loss_kwargs)

            # Embedding losses return 0.0 when a batch yields nothing to train on;
            # such batches are skipped without an optimizer step.
            if not is_classifier and loss == 0.0:
                continue

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
    return model

#### Evaluation Driver

In [26]:
def evaluate_clf(model, test_loader):
    """Accuracy of a classification model on `test_loader`.

    Args:
        model: model whose output exposes `logits`; switched to eval mode.
        test_loader: iterable of batches (dicts of tensors that include 'labels').

    Returns:
        Fraction of samples whose arg-max prediction equals the label.
    """
    model.eval()
    num_correct, num_seen = 0, 0

    with torch.no_grad():
        for raw_batch in test_loader:
            batch = {key: tensor.to(device) for key, tensor in raw_batch.items()}
            targets = batch['labels']

            predicted = model(**batch).logits.argmax(dim=-1)

            num_correct += (predicted == targets).sum().item()
            num_seen += targets.size(0)

    return num_correct / num_seen

In [27]:
def evaluate_emb(model, train_loader, test_loader):
    """Score an embedding model with a logistic-regression probe.

    Embeddings of both splits are extracted with ``extract_embeddings``; a
    logistic-regression classifier is fitted on the train embeddings and
    evaluated on the test embeddings.

    Args:
        model: embedding model; switched to eval mode.
        train_loader: batches used to fit the probe.
        test_loader: batches used to score the probe.

    Returns:
        Accuracy of the probe on the test split.
    """
    model.eval()

    train_features, train_targets = extract_embeddings(model, device, train_loader)
    test_features, test_targets = extract_embeddings(model, device, test_loader)

    # Fit the linear probe on the train-split embeddings...
    probe = LogisticRegression(max_iter=10000)
    probe.fit(train_features.numpy(), train_targets.numpy())

    # ...and score it on the held-out embeddings.
    predicted = probe.predict(test_features.numpy())
    return accuracy_score(test_targets.numpy(), predicted)

### Loop

In [28]:
# Experiment driver: for every configured loss and dataset, repeat the full
# prepare -> load model -> tokenize -> train -> evaluate pipeline `total_runs` times
# and record the mean accuracy.
# NOTE(review): no random seed is fixed here (the split is drawn without a seed and
# the train loader shuffles), so results presumably vary between executions — confirm
# whether reproducibility is required.
total_runs = 3
batch_size = 10
# One summary record per (loss, dataset) pair.
accuracy_list = []
for loss in losses:
    loss_name = loss['loss_name']
    loss_type = loss['loss_type']
    loss_kwargs = loss['loss_kwargs']

    for dataset in senteval_datasets:
        print(f'Running: {loss_name} on {dataset}')
        # Sum of the per-run accuracies; divided by total_runs below.
        total_accuracy = 0

        for loop_count in range(0, total_runs):
            # Dataset Preparation...
            train_dataset, test_dataset = prepare_dataset(dataset)

            # Model Preparation...
            # The model is reloaded from the pretrained checkpoint on every run.
            model, tokenizer = get_model_tokenizer(model_id, loss_type)

            # Tokenize Batch...
            train_loader, test_loader = tokenize_dataset_batch(train_dataset, test_dataset, tokenizer, batch_size=batch_size)

            # Training Loop...
            # 'without_ft' is the no-fine-tuning baseline: the model is evaluated as loaded.
            if loss_name != 'without_ft':
                model = train(model, train_loader, loss_type, epochs=10, loss_name=loss_name, **loss_kwargs)

            # Evaluation loop...
            # 'clf' models are scored on their own predictions; embedding models via a
            # logistic-regression probe fitted on the train-split embeddings.
            if loss_type == 'clf':
                accuracy = evaluate_clf(model, test_loader)
            else:
                accuracy = evaluate_emb(model, train_loader, test_loader)
            # print(f'Loop {loop_count} Accuracy - {accuracy}')
            total_accuracy += accuracy
        accuracy_list.append({'loss': loss_name, 'dataset': dataset, 'accuracy': total_accuracy / total_runs})

Running: cosent_ibn_angle on CR


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


generation_config.json:   0%|          | 0.00/185 [00:00<?, ?B/s]

Trainable parameters: 9371648


Map:   0%|          | 0/2642 [00:00<?, ? examples/s]

Map:   0%|          | 0/1133 [00:00<?, ? examples/s]

