In [None]:
import torch.nn as nn
import torch

In [None]:
def semantic_dist(embedding1, embedding2):
    """Squared Euclidean distance between two embeddings.

    The squared difference is summed over the last (feature) dimension, so
    a pair of ``(D,)`` vectors yields a 0-dim tensor and a pair of
    ``(..., D)`` tensors yields one distance per leading index.

    Reducing over ``dim=0`` (the previous behaviour) is only a distance for
    1-D inputs; for batched inputs it collapsed the batch axis instead of
    the feature axis.  For 1-D inputs the result is unchanged.

    Args:
        embedding1: tensor whose last dimension holds the features.
        embedding2: tensor broadcastable against ``embedding1``.

    Returns:
        Tensor of squared distances with the last dimension reduced away.
    """
    difference = embedding1 - embedding2
    return torch.sum(difference ** 2, dim=-1)

In [None]:
# Sanity check of semantic_dist: an all-ones 2x2 embedding against an
# all-zeros 2x2 embedding, each given a leading axis of size 1.
embedding1 = torch.ones(2, 2).unsqueeze(0)
embedding2 = torch.zeros(2, 2).unsqueeze(0)

distance = semantic_dist(embedding1, embedding2)
print(f"The distance between this two emmbeddings   =   {distance}")

In [None]:
def margin(pos1, pos2, embedding1, embedding2, alpha=1.0, beta=0.5):
    """Combine semantic and positional squared distances into one margin.

    Args:
        pos1, pos2: position tensors; their squared difference is summed
            over axis 1 (assumes a (1, 2) [row, column] layout as built by
            the caller — TODO confirm).
        embedding1, embedding2: embeddings passed to ``semantic_dist``.
        alpha: weight of the semantic distance term.
        beta: weight of the positional distance term.

    Returns:
        ``alpha * semantic_dist(embedding1, embedding2)`` plus ``beta``
        times the positional squared distance.
    """
    semantic_term = semantic_dist(embedding1, embedding2)
    positional_term = torch.sum((pos1 - pos2) ** 2, dim=1)

    return alpha * semantic_term + beta * positional_term

In [None]:
def _match_per_sample(values, reference):
    """Reshape ``values`` to ``reference``'s shape when they hold one entry per sample."""
    values = torch.as_tensor(values, device=reference.device)
    if values.numel() == reference.numel():
        # e.g. (B, 1) -> (B,): prevents silent broadcasting to (B, B) below.
        values = values.reshape(reference.shape)
    return values


def criterion(queries, keys, margins, labels):
    """Contrastive loss over a batch of (query, key) embedding pairs.

    For each pair the squared Euclidean distance ``d`` is computed over
    dim 1.  Pairs with ``label == 1`` contribute ``d``; pairs with
    ``label == 0`` contribute ``relu(margin - d)``.  The batch mean is
    returned.

    ``margins`` and ``labels`` are aligned to the per-sample distance shape
    before being combined.  Without this, a ``(B, 1)`` margin tensor (what
    a DataLoader produces from per-sample margins of shape ``(1,)``)
    broadcasts against the ``(B,)`` distances into a ``(B, B)`` matrix and
    the mean is taken over unrelated sample combinations.

    Args:
        queries: ``(B, D)`` tensor of query embeddings.
        keys: ``(B, D)`` tensor of key embeddings.
        margins: per-sample margins, ``(B,)`` or ``(B, 1)`` (or a scalar).
        labels: per-sample 0/1 labels, ``(B,)`` or ``(B, 1)``.

    Returns:
        0-dim tensor holding the mean loss.
    """
    pos_loss = torch.sum((queries - keys) ** 2, dim=1)

    margins = _match_per_sample(margins, pos_loss)
    labels = _match_per_sample(labels, pos_loss)

    neg_loss = torch.relu(margins - pos_loss * (1 - labels))

    loss = labels * pos_loss + (1 - labels) * neg_loss
    return torch.mean(loss)

In [None]:
from transformers import AutoTokenizer
import math

class EmbeddingTransformer(nn.Module):
    """Transformer encoder that maps embeddings back into their own space.

    The input is projected to ``hidden_dim``, passed through a stack of
    ``nn.TransformerEncoderLayer`` blocks (``batch_first=True``) and
    projected back to ``input_dim``.

    Args:
        input_dim: size of the incoming (and outgoing) feature dimension.
        hidden_dim: transformer model width (``d_model``).
        num_layers: number of encoder layers.
        num_heads: attention heads per layer.
    """

    def __init__(self, input_dim=20, hidden_dim=64, num_layers=3, num_heads=4):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, hidden_dim)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim,
            nhead=num_heads,
            dim_feedforward=hidden_dim * 4,
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.output_proj = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        """Encode ``x``.

        Args:
            x: ``(batch, seq, input_dim)`` tensor, or a ``(batch, input_dim)``
                tensor holding one vector per sample.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # A 2-D input would be interpreted by nn.TransformerEncoder as ONE
        # unbatched sequence of length `batch`, letting independent samples
        # attend to each other.  Treat it as `batch` sequences of length 1.
        is_flat_batch = x.dim() == 2
        if is_flat_batch:
            x = x.unsqueeze(1)

        x = self.input_proj(x)
        x = self.transformer(x)
        x = self.output_proj(x)

        if is_flat_batch:
            x = x.squeeze(1)
        return x

# Smoke test: instantiate the model with its default hyper-parameters.
# NOTE(review): `model` and `tokenizer` are both reassigned by later cells.
model = EmbeddingTransformer()
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
# Tokenize a single example sentence (max_length=20 with truncation enabled).
tokens = tokenizer("This is an example.", max_length=20, truncation=True, padding=True)

In [None]:
import pandas as pd

# NOTE(review): unpickling can execute arbitrary code; only load trusted files.
df = pd.read_pickle("../dataset/data.pickle")
df_test = pd.read_pickle("../dataset/data_test.pickle")

# Reproducible split: sample 80% of the rows for training (seed 42); the rows
# whose index labels were not sampled become the validation frame.
df_train = df.sample(frac=0.8, random_state=42)
df_val = df.drop(df_train.index)

In [None]:
# Preview the first rows of the training split.
df_train.head()

In [None]:
# import torch
# from transformers import BertTokenizer, BertModel

# n_pos_pair = 1
# n_neg_pair = 20

# tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
# model = BertModel.from_pretrained('bert-base-uncased')

# def prepare_dataset(dataframe):
#     samples = []
#     corners = [(0, 9), (9, 0), (9, 9)]

#     for x, y in corners:
#         df_corner = dataframe[(dataframe.row == x) & (dataframe.column == y)]

#         for _, row in df_corner.iterrows():
#             df_sampled = df_corner.sample(n=n_pos_pair, replace=False)
#             pos1_tensor = torch.tensor([row["row"], row["column"]], dtype=torch.float).unsqueeze(0)

#             # Positive Pairs
#             for _, sample in df_sampled.iterrows():
#                 pos_pair = {
#                     "query": row["prompt"],
#                     "key": sample["prompt"]
#                 }
#                 pos2_tensor = torch.tensor([sample["row"], sample["column"]], dtype=torch.float).unsqueeze(0)

#                 inputs1 = tokenizer(pos_pair["query"], return_tensors='pt')
#                 inputs2 = tokenizer(pos_pair["key"], return_tensors='pt')

#                 with torch.no_grad():
#                     outputs1 = model(**inputs1)
#                     cls_embedding1 = outputs1.last_hidden_state[:, 0, :]

#                     outputs2 = model(**inputs2)
#                     cls_embedding2 = outputs2.last_hidden_state[:, 0, :]

#                     pos_pair["embedding1"] = cls_embedding1
#                     pos_pair["embedding2"] = cls_embedding2

#                     m = margin(pos1_tensor, pos2_tensor, cls_embedding1, cls_embedding2)

#                     pos_pair["margin"] = m
#                     pos_pair["label"] = 1
                
#                 samples.append(pos_pair)

#             df_left = dataframe.drop(df_corner.index).sample(n=n_neg_pair, replace=False)

#             # Negative Pairs
#             for _, sample in df_left.iterrows():
#                 neg_pair = {
#                     "query": row["prompt"],
#                     "key": sample["prompt"]
#                 }
#                 pos2_tensor = torch.tensor([sample["row"], sample["column"]], dtype=torch.float).unsqueeze(0)

#                 inputs1 = tokenizer(neg_pair["query"], return_tensors='pt')
#                 inputs2 = tokenizer(neg_pair["key"], return_tensors='pt')

#                 with torch.no_grad():
#                     outputs1 = model(**inputs1)
#                     cls_embedding1 = outputs1.last_hidden_state[:, 0, :]

#                     outputs2 = model(**inputs2)
#                     cls_embedding2 = outputs2.last_hidden_state[:, 0, :]

#                     neg_pair["embedding1"] = cls_embedding1
#                     neg_pair["embedding2"] = cls_embedding2

#                     m = margin(pos1_tensor, pos2_tensor, cls_embedding1, cls_embedding2)

#                     neg_pair["margin"] = m
#                     neg_pair["label"] = 0
                
#                 samples.append(neg_pair)

#     return samples


In [None]:
import torch
from transformers import BertTokenizer, BertModel

# Upper bounds on how many positive / negative partners are sampled for each
# query row inside prepare_dataset.
n_pos_pair = 1
n_neg_pair = 5

# Pretrained BERT tokenizer and encoder; prepare_dataset reads these
# module-level names to embed the prompts.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')

def _build_pairs(query_text, pos1_tensor, partners, label):
    """Create one pair record per row of ``partners`` (embeddings are filled in later)."""
    return [
        {
            "query": query_text,
            "key": sample["prompt"],
            "embedding1": None,
            "embedding2": None,
            "pos1_tensor": pos1_tensor,
            "pos2_tensor": torch.tensor([sample["row"], sample["column"]], dtype=torch.float).unsqueeze(0),
            "label": label,
        }
        for _, sample in partners.iterrows()
    ]


def prepare_dataset(dataframe):
    """Build labelled (query, key) prompt pairs with BERT embeddings and margins.

    For every row located at one of the hard-coded grid corners, up to
    ``n_pos_pair`` partners from the same corner (label 1) and up to
    ``n_neg_pair`` partners from outside that corner (label 0) are sampled.
    Each pair receives the first-token hidden state of both prompts and a
    margin computed by ``margin``.

    Relies on the module-level ``tokenizer``, ``model``, ``margin``,
    ``n_pos_pair`` and ``n_neg_pair``.
    NOTE(review): a later cell rebinds ``model`` to the trainable network,
    so this function must run while ``model`` is still the BERT encoder.

    Args:
        dataframe: frame with ``row``, ``column`` and ``prompt`` columns.

    Returns:
        List of dicts with keys ``query``, ``key``, ``embedding1``,
        ``embedding2``, ``label`` and ``margin``.
    """
    samples = []
    corners = [(0, 9), (9, 0), (9, 9)]

    for x, y in corners:
        df_corner = dataframe[(dataframe.row == x) & (dataframe.column == y)]

        if df_corner.empty:
            continue

        non_corner_idx = dataframe.index.difference(df_corner.index)

        for row_idx, row in df_corner.iterrows():
            pos1_tensor = torch.tensor([row["row"], row["column"]], dtype=torch.float).unsqueeze(0)
            query_text = row["prompt"]

            # Positive pairs: sample only from the OTHER rows of this corner.
            # Sampling from the whole corner could pair a prompt with itself,
            # producing a trivial zero-distance positive.
            pos_candidates = df_corner.drop(index=row_idx)
            df_sampled = pos_candidates.sample(n=min(n_pos_pair, len(pos_candidates)), replace=False)
            pos_pairs = _build_pairs(query_text, pos1_tensor, df_sampled, label=1)

            # Negative pairs: rows that are not in this corner.
            df_left = dataframe.loc[non_corner_idx].sample(n=min(n_neg_pair, len(non_corner_idx)), replace=False)
            neg_pairs = _build_pairs(query_text, pos1_tensor, df_left, label=0)

            all_pairs = pos_pairs + neg_pairs
            if not all_pairs:
                # Nothing to embed for this row; the tokenizer rejects empty batches.
                continue

            queries = [pair["query"] for pair in all_pairs]
            keys = [pair["key"] for pair in all_pairs]

            # Batch tokenization
            inputs1 = tokenizer(queries, padding=True, truncation=True, return_tensors='pt')
            inputs2 = tokenizer(keys, padding=True, truncation=True, return_tensors='pt')

            with torch.no_grad():
                outputs1 = model(**inputs1).last_hidden_state[:, 0, :]
                outputs2 = model(**inputs2).last_hidden_state[:, 0, :]

            # Assign computed embeddings and margins
            for i, pair in enumerate(all_pairs):
                pair["embedding1"] = outputs1[i]
                pair["embedding2"] = outputs2[i]
                # The position tensors carry a leading axis of size 1, so the
                # margin comes back with shape (1,).  Store it as a 0-dim
                # tensor so a collated batch of margins is (B,), matching the
                # labels, rather than (B, 1).
                pair["margin"] = margin(
                    pair["pos1_tensor"], pair["pos2_tensor"], outputs1[i], outputs2[i]
                ).squeeze()
                del pair["pos1_tensor"], pair["pos2_tensor"]

            samples.extend(all_pairs)

    return samples


In [None]:
# Build the list of labelled embedding pairs from the training split.
dataset = prepare_dataset(df_train)

In [None]:
# import torch.optim as optim
# from tqdm.notebook import tqdm
# from torch.utils.data import DataLoader
# from sklearn.model_selection import train_test_split

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# input_dim = 768
# hidden_dim = 256
# num_layers = 3
# num_heads = 4

# train, test = train_test_split(dataset, test_size=0.3, random_state=42)
# test, val = train_test_split(test, test_size=0.5, random_state=42)

# train_loader = DataLoader(train, batch_size=64)
# test_loader = DataLoader(test, batch_size=64)
# val_loader = DataLoader(val, batch_size=64)

# model = EmbeddingTransformer(input_dim=input_dim, hidden_dim=hidden_dim, num_layers=num_layers, num_heads=num_heads).to(device)
# optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-2)

# num_epochs = 100
# num_epochs_display = 10
# losses = []

# for epoch in tqdm(range(num_epochs)):
#     loss_epoch = 0.0 
#     for index, batch in enumerate(train_loader):
#         queries, keys = batch["embedding1"].to(device), batch["embedding2"].to(device)

#         labels = batch["label"].to(device)
#         margins = batch["margin"].to(device)

#         output_1, output_2 = model(queries), model(keys)
#         loss = criterion(output_1, output_2, margins, labels)
#         loss_epoch += loss.item()

#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

#     if epoch % num_epochs_display:
#         ## plot 
#         pass
    
#     losses.append(loss_epoch)

    

In [None]:
import torch.optim as optim
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

# Define device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model parameters
# input_dim is the width of the stored embeddings — presumably 768 because
# they come from bert-base-uncased; verify against prepare_dataset.
input_dim = 768
hidden_dim = 1024
num_layers = 3
num_heads = 4

# Split dataset: 70% train, then the held-out 30% is halved into test / val.
train_set, test_set = train_test_split(dataset, test_size=0.3, random_state=42)
test_set, val_set = train_test_split(test_set, test_size=0.5, random_state=42)

# DataLoader
# NOTE(review): test_loader and val_loader are created but not consumed in this cell.
train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
test_loader = DataLoader(test_set, batch_size=64, shuffle=False)
val_loader = DataLoader(val_set, batch_size=64, shuffle=False)

# Model initialization
# NOTE(review): this rebinds `model`, which earlier referred to the BertModel
# that prepare_dataset reads; re-running prepare_dataset after this cell
# would use the wrong network.
model = EmbeddingTransformer(
    input_dim=input_dim, hidden_dim=hidden_dim, num_layers=num_layers, num_heads=num_heads
).to(device)

# Optimizer (the loss is the `criterion` function defined in an earlier cell)
optimizer = optim.AdamW(model.parameters(), lr=3e-5, weight_decay=1e-2)

# Training loop parameters
num_epochs = 5
num_epochs_display = 1  # print the loss every `num_epochs_display` epochs
losses = []  # mean training loss per epoch

# Training loop
for epoch in tqdm(range(num_epochs), desc="Training Progress"):
    model.train()
    total_loss = 0.0

    # `index` counts batches starting at 1, so after the loop it equals the
    # number of batches processed.
    for index, batch in enumerate(train_loader, start=1):
        # presumably (batch, input_dim) tensors after default collation — TODO confirm
        queries = batch["embedding1"].to(device)
        keys = batch["embedding2"].to(device)
        labels = batch["label"].to(device)
        margins = batch["margin"].to(device)

        # Forward pass
        output_1, output_2 = model(queries), model(keys)
        loss = criterion(output_1, output_2, margins, labels)
        total_loss += loss.item()

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Average the accumulated batch losses.
    # NOTE(review): if train_loader yields no batches, `index` is never assigned here.
    total_loss /= index
    # Store and display loss
    losses.append(total_loss)
    
    if epoch % num_epochs_display == 0 or epoch == num_epochs - 1:
        tqdm.write(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}")


# Plot the per-epoch mean training loss.
plt.title("Training Loss Over Epochs")
plt.xlabel("Epoch")
plt.ylabel("Loss")

plt.plot(range(1, len(losses) + 1), losses, marker="o", linestyle="-", color="b")
plt.show()

In [None]:
import random
import torch.nn.functional as F
import numpy as np


n_samples = 20

def plot_figure(test_data):
    """Compare model and baseline cosine similarities on a random subset.

    Draws up to ``n_samples`` distinct samples from ``test_data`` and, for
    each, computes the cosine similarity of the two stored embeddings both
    after passing them through the module-level ``model`` and directly
    (baseline).  Despite its name this function does not plot; it returns
    the data for plotting.

    The model is switched to evaluation mode and run without gradient
    tracking for the duration of the call; its previous training mode is
    restored afterwards.

    Args:
        test_data: indexable collection of dicts with ``embedding1``,
            ``embedding2`` and ``label`` entries.

    Returns:
        ``(results, results_base)``: two lists of
        ``(similarity_tensor, label)`` tuples in the same sample order.
    """
    results = []
    results_base = []
    if len(test_data) == 0:
        return results, results_base

    # Never request more distinct indices than there are samples.
    sample_count = min(n_samples, len(test_data))
    indices = np.random.choice(len(test_data), size=sample_count, replace=False)

    was_training = model.training
    model.eval()  # keep dropout from perturbing the evaluation
    try:
        with torch.no_grad():
            for index in indices:
                sample = test_data[index]
                query = sample["embedding1"].to(device)
                key = sample["embedding2"].to(device)

                query_embedding = model(query.unsqueeze(0))
                key_embedding = model(key.unsqueeze(0))

                model_similarity = F.cosine_similarity(query_embedding, key_embedding).cpu()
                base_similarity = F.cosine_similarity(
                    sample["embedding1"].unsqueeze(0), sample["embedding2"].unsqueeze(0)
                )

                results.append((model_similarity, sample["label"]))
                results_base.append((base_similarity, sample["label"]))
    finally:
        model.train(was_training)

    return results, results_base

In [None]:
# Collect (similarity, label) tuples for the trained model and for the raw embeddings.
results, results_base = plot_figure(test_set)

In [None]:
def _plot_similarity_hist(scores, title):
    """Draw one histogram of cosine-similarity scores in its own figure."""
    plt.figure()
    plt.hist(scores)
    plt.title(title)
    plt.xlabel("Cosine Similarity")
    plt.ylabel("Frequency")
    plt.show()


def plot_results(results, results_base):
    """Plot the distribution of cosine similarities for model and baseline.

    Args:
        results: list of tuples ``(similarity_tensor, label)`` for the new model.
        results_base: list of tuples ``(similarity_tensor, label)`` for the baseline.

    Only the similarity (first tuple element) is used; each one-element
    tensor is converted to a plain float before plotting.
    """
    cos_scores = [float(item[0]) for item in results]
    cos_scores_base = [float(item[0]) for item in results_base]

    _plot_similarity_hist(cos_scores, "Distribution of Cosine Similarities (New Model)")
    _plot_similarity_hist(cos_scores_base, "Distribution of Cosine Similarities (Baseline)")


In [None]:
# Show the two similarity histograms (new model first, then baseline).
plot_results(results, results_base)

In [None]:
# Put the trained model in evaluation mode before gathering embeddings below.
model.eval()

def gather_model_embeddings(test_set, model, device='cpu'):
    """Run ``model`` on every sample's ``embedding1`` and collect the outputs.

    Inference runs under ``torch.no_grad()`` so no autograd graph is kept
    per sample and the returned tensor does not require gradients.

    Args:
        test_set: iterable of dicts with ``embedding1`` (a ``(D,)`` tensor)
            and ``label`` entries.
        model: callable mapping a ``(1, D)`` tensor to a ``(1, D)`` tensor.
        device: device the input is moved to before calling ``model``.

    Returns:
        embeddings (torch.Tensor) of shape (N, D), on the CPU
        labels     (torch.Tensor) of shape (N,), dtype ``torch.long``
    """
    all_model_embeddings = []
    all_labels = []

    with torch.no_grad():
        for sample in test_set:
            baseline_embedding = sample["embedding1"].to(device)
            model_embedding = model(baseline_embedding.unsqueeze(0))    # shape: (1, D)
            all_model_embeddings.append(model_embedding.squeeze(0).cpu())  # shape: (D,)
            all_labels.append(sample["label"])

    embeddings = torch.stack(all_model_embeddings, dim=0)  # shape: (N, D)
    labels = torch.tensor(all_labels, dtype=torch.long)    # shape: (N,)
    return embeddings, labels

def gather_baseline_embeddings(test_set):
    """Collect the stored ``embedding1`` vector and label of every sample.

    Args:
        test_set: iterable of dicts with ``embedding1`` and ``label`` entries.

    Returns:
        embeddings (torch.Tensor): the ``embedding1`` tensors stacked along
            a new leading axis, shape (N, D).
        labels (torch.Tensor): shape (N,), dtype ``torch.long``.
    """
    # Single pass over the samples, reading the embedding before the label.
    records = [(entry["embedding1"], entry["label"]) for entry in test_set]

    embeddings = torch.stack([vector for vector, _ in records], dim=0)
    labels = torch.tensor([target for _, target in records], dtype=torch.long)
    return embeddings, labels

def compute_mean_similarity_matrix(embeddings, labels):
    """Average pairwise cosine similarity, grouped by label pair.

    Args:
        embeddings: (N, D) tensor.
        labels: (N,) tensor containing 0/1 (neg=0, pos=1).

    Returns:
        A 2x2 float32 tensor M where:
          M[0,0] => avg similarity among (neg, neg)
          M[0,1] => avg similarity among (neg, pos)
          M[1,0] => avg similarity among (pos, neg)
          M[1,1] => avg similarity among (pos, pos)
        A cell with no pairs is left at 0.0.  Every ordered pair (i, j) is
        included, self-pairs (i == j) among them.
    """
    # Cosine similarity is the dot product of L2-normalised rows.
    normed = F.normalize(embeddings, p=2, dim=1)  # shape: (N, D)
    sim_matrix = normed @ normed.T                # shape: (N, N)

    labels = torch.as_tensor(labels, device=sim_matrix.device)
    class_masks = [labels == 0, labels == 1]

    # Select each label-pair block with boolean masks and average it in one
    # tensor op, instead of visiting all N*N entries from Python.
    means = torch.zeros(2, 2, dtype=torch.float32)
    for r, row_mask in enumerate(class_masks):
        for c, col_mask in enumerate(class_masks):
            block = sim_matrix[row_mask][:, col_mask]
            if block.numel() > 0:
                means[r, c] = block.mean().item()
    return means

def plot_2x2_heatmap(sim_matrix, title):
    """Render a 2x2 matrix as an annotated heatmap in a new figure.

    Args:
        sim_matrix: 2x2 matrix whose entries support ``.item()``.
        title: figure title.

    Each cell is labelled with its value to three decimals; both axes are
    ticked "neg" / "pos"; a colorbar is added and the figure is shown.
    """
    tick_positions = [0, 1]
    tick_names = ["neg", "pos"]

    plt.figure()
    plt.imshow(sim_matrix, aspect='equal')  # default colormap
    plt.title(title)

    # Annotate every cell; imshow puts the column on x and the row on y.
    for row, col in [(r, c) for r in range(2) for c in range(2)]:
        value = sim_matrix[row, col].item()
        plt.text(col, row, f"{value:.3f}", ha='center', va='center')

    plt.xticks(tick_positions, tick_names)
    plt.yticks(tick_positions, tick_names)

    plt.colorbar()
    plt.show()

def plot_pos_neg_heatmaps(test_set, model, base=False, device='cpu'):
    """Plot the 2x2 average cosine-similarity heatmap for one embedding source.

    Only the embeddings that are actually plotted are gathered: the model
    is not run when ``base=True``, and the baseline matrix is not computed
    when ``base=False``.

    Args:
        test_set: samples with ``embedding1`` and ``label`` entries.
        model: network applied to ``embedding1`` (used when ``base`` is False).
        base: if True plot the stored (baseline) embeddings, otherwise the
            model outputs.
        device: device used for model inference.
    """
    if base:
        embeddings, labels = gather_baseline_embeddings(test_set)
        title = "Avg Cosine Similarity (Baseline)"
    else:
        embeddings, labels = gather_model_embeddings(test_set, model, device=device)
        title = "Avg Cosine Similarity (New Model)"

    mean_similarity = compute_mean_similarity_matrix(embeddings, labels)
    plot_2x2_heatmap(mean_similarity, title=title)


In [None]:
# Once you have 'test_set' and 'model': plot the trained model's heatmap.
# Use the notebook's `device` (the one the model was moved to) instead of a
# hard-coded 'cuda', which fails on CPU-only machines.
plot_pos_neg_heatmaps(test_set, model, device=device)

In [None]:
# Once you have `test_set` and `model`: plot the baseline heatmap.
# Use the notebook's `device` instead of a hard-coded 'cuda', which fails on
# CPU-only machines.
plot_pos_neg_heatmaps(test_set, model, base=True, device=device)