### model.py

In [None]:
from typing import Optional
from torch import nn
from torch.nn import functional as F
import torch

### OLD MODEL ###

# class Translator(nn.Module):
#     def __init__(self, pad: bool, dim_imgs: int = 1536, dim_text: int = 1024,  mode: str ='linear'):
#         super().__init__()
#         assert mode in ['linear', 'affine', 'isometry'], f'Mode "{mode}" not supported'

#         self.mode = mode
#         use_bias = mode == 'affine'
#         if pad:
#             dim = max(dim_imgs, dim_text)
#             self.linear = nn.Linear(dim, dim, bias=use_bias)

#         else:
#             self.linear = nn.Linear(dim_text, dim_imgs, bias=use_bias)

#     def forward(self, x):
#         return self.linear(x)

#     @torch.no_grad()
#     def orthogonalize(self):
#         assert self.mode == 'isometry', 'Cannot be called for modes != isometry'

#         W = self.linear.weight.data
#         U, _, Vh = torch.linalg.svd(W, full_matrices=False)
#         self.linear.weight.data.copy_(U @ Vh)

class Translator(nn.Module):
    """Linear map from a source embedding space to a target embedding space.

    The module can operate either on the raw ("absolute") input embeddings or
    on "relative" representations, i.e. the cosine similarity between each
    input and a fixed set of anchor vectors.

    Args:
        input_dim: size of the input embeddings (ignored when ``use_relative``
            is true, where the input size is the number of anchors).
        output_dim: size of the produced embeddings.
        mode: ``'linear'`` (no bias), ``'affine'`` (with bias) or
            ``'isometry'`` (no bias; weights can be projected onto the closest
            orthogonal matrix with :meth:`orthogonalize`).
        use_relative: whether to translate relative representations.
        anchors: ``(n_anchors, input_dim)`` tensor, required when
            ``use_relative`` is true.
    """

    def __init__(self, input_dim=1024, output_dim=1536, mode='affine', use_relative=False, anchors: Optional[torch.Tensor] = None):
        super().__init__()
        assert mode in ['linear', 'affine', 'isometry'], f'Mode "{mode}" not supported'
        assert input_dim > 0 and output_dim > 0, "Expecting positive dimensions"
        assert not use_relative or isinstance(anchors, torch.Tensor) , 'Anchors must be set if using relative representations'
        assert anchors is None or (anchors.ndim == 2 and anchors.shape[0] > 0), '2D Anchors must be provided if using relative representations'

        self.mode = mode
        self.use_relative = use_relative
        # Registered as a buffer so that ``model.to(device)`` moves the anchors
        # together with the weights. ``persistent=False`` keeps them out of the
        # state dict, so checkpoints still contain only the linear layer.
        self.register_buffer('anchors', anchors, persistent=False)

        self.linear = nn.Linear(
            anchors.shape[0] if self.use_relative else input_dim,
            output_dim,
            bias=self.mode == 'affine'
        )

    def compute_relative(self, x):
        """Return the cosine similarity of every row of ``x`` to every anchor.

        Args:
            x: ``(batch, input_dim)`` tensor.
        Returns:
            ``(batch, n_anchors)`` tensor of cosine similarities.
        """
        assert self.anchors is not None, 'Anchors must be provided when constructing the model'

        # Each anchor (row) must be unit-normalised *before* transposing;
        # normalising ``anchors.T`` along dim=1 would instead rescale every
        # feature across anchors and the result would not be a cosine.
        unit_anchors = F.normalize(self.anchors, p=2, dim=1)
        return F.normalize(x, p=2, dim=1) @ unit_anchors.T

    def forward(self, x):
        """Translate ``x``, going through relative representations if enabled."""
        if self.use_relative:
            x = self.compute_relative(x)

        return self.linear(x)

    @torch.no_grad()
    def orthogonalize(self):
        """Project the weight matrix onto the closest (semi-)orthogonal matrix.

        Only meaningful in ``'isometry'`` mode; intended to be called after
        each optimiser step.
        """
        assert self.mode == 'isometry', 'Cannot be called for modes != isometry'

        U, _, Vh = torch.linalg.svd(self.linear.weight, full_matrices=False)
        self.linear.weight.copy_(U @ Vh)


### eval.py

In [None]:
from pathlib import Path
import numpy as np
import torch
import pandas as pd

from model import Translator
'''Code from https://github.com/Mamiglia/challenge'''

def mrr(pred_indices: np.ndarray, gt_indices: np.ndarray) -> float:
    """
    Mean Reciprocal Rank over a set of queries.

    For every query the score is 1 / (1-based position of the ground-truth
    index inside its prediction row), or 0 when the ground truth is absent.

    Args:
        pred_indices: (N, K) array of predicted indices for N queries (top-K)
        gt_indices: (N,) array of ground truth indices
    Returns:
        mrr: average of the per-query reciprocal ranks
    """
    def _reciprocal_rank(row, target):
        hits = np.where(row == target)[0]
        # Only the first occurrence counts
        return 1.0 / (hits[0] + 1) if hits.size > 0 else 0.0

    per_query = [
        _reciprocal_rank(pred_indices[q], gt_indices[q])
        for q in range(len(gt_indices))
    ]
    return np.mean(per_query)


def recall_at_k(pred_indices: np.ndarray, gt_indices: np.ndarray, k: int) -> float:
    """Fraction of queries whose ground truth appears in the first k predictions.

    Args:
        pred_indices: (N, N) array of top indices for N queries
        gt_indices: (N,) array of ground truth indices
        k: number of top predictions to consider
    Returns:
        recall: Recall@k
    """
    n_queries = len(gt_indices)
    hits = sum(
        1
        for q in range(n_queries)
        if gt_indices[q] in pred_indices[q, :k]
    )
    return hits / n_queries

import numpy as np

def ndcg(pred_indices: np.ndarray, gt_indices: np.ndarray, k: int = 100) -> float:
    """
    Normalized Discounted Cumulative Gain (NDCG@k) with one relevant item per query.

    Since each query has a single ground-truth item, the ideal DCG is 1 and
    the per-query score reduces to 1 / log2(rank + 1), or 0 on a miss.

    Args:
        pred_indices: (N, K) array of predicted indices for N queries
        gt_indices: (N,) array of ground truth indices
        k: number of top predictions to consider
    Returns:
        ndcg: NDCG@k averaged over all queries
    """
    n_queries = len(gt_indices)
    gains = []
    for q in range(n_queries):
        positions = np.where(pred_indices[q, :k] == gt_indices[q])[0]
        if positions.size == 0:
            continue  # a miss contributes nothing
        first_rank = positions[0] + 1
        gains.append(1.0 / np.log2(first_rank + 1))
    return sum(gains, 0.0) / n_queries



@torch.inference_mode()
def evaluate_retrieval(translated_embd, image_embd, gt_indices, max_indices = 99, batch_size=100):
    """Evaluate in-batch retrieval performance.

    Queries are processed in consecutive chunks of ``batch_size`` rows. Inside
    a chunk every query is scored against the rows of ``image_embd`` at the
    same positions, using a plain dot product (this equals cosine similarity
    only when both inputs are L2-normalised by the caller). Local ranks are
    mapped back to labels through ``gt_indices``.

    Args:
        translated_embd: (N_captions, D) translated caption embeddings
        image_embd: (N_images, D) image embeddings; row ``i`` is the candidate
            paired with query ``i`` (needs at least N_captions rows)
        gt_indices: (N_captions,) ground truth image indices for each caption,
            assumed non-negative (-1 is used internally as padding)
        max_indices: number of top predictions to consider
        batch_size: number of queries (and candidates) per chunk
    Returns:
        results: dict with 'mrr', 'ndcg', 'recall_at_{1,3,5,10,50}' and
            'l2_dist' (mean L2 distance to the ground-truth image embedding)
    Raises:
        ValueError: if there are no queries to evaluate
    """
    if isinstance(translated_embd, np.ndarray):
        translated_embd = torch.from_numpy(translated_embd).float()
    if isinstance(image_embd, np.ndarray):
        image_embd = torch.from_numpy(image_embd).float()

    # The metric helpers work on numpy arrays, so convert the labels once
    if isinstance(gt_indices, torch.Tensor):
        gt_np = gt_indices.detach().cpu().numpy()
    else:
        gt_np = np.asarray(gt_indices)

    n_queries = translated_embd.shape[0]
    if n_queries == 0:
        raise ValueError("evaluate_retrieval needs at least one query")

    all_sorted_indices = []
    l2_distances = []

    for start_idx in range(0, n_queries, batch_size):
        batch_slice = slice(start_idx, min(start_idx + batch_size, n_queries))
        batch_translated = translated_embd[batch_slice]
        batch_img_embd = image_embd[batch_slice]
        batch_gt = gt_np[batch_slice]

        # Similarity restricted to the candidates of this chunk
        batch_similarity = batch_translated @ batch_img_embd.T

        # The last chunk may hold fewer candidates than ``max_indices``;
        # topk would raise if asked for more columns than exist.
        k = min(max_indices, batch_similarity.shape[1])
        batch_indices = batch_similarity.topk(k=k, dim=1, sorted=True).indices.cpu().numpy()
        batch_ranked = batch_gt[batch_indices]
        if k < max_indices:
            # Pad with -1 (never a valid label) so all chunks share one width
            batch_ranked = np.pad(
                batch_ranked, ((0, 0), (0, max_indices - k)), constant_values=-1
            )
        all_sorted_indices.append(batch_ranked)

        # L2 distance between each translation and its ground-truth image
        gt_rows = torch.as_tensor(batch_gt, dtype=torch.long, device=image_embd.device)
        batch_l2 = (batch_translated - image_embd[gt_rows]).norm(dim=1)
        l2_distances.append(batch_l2)

    sorted_indices = np.concatenate(all_sorted_indices, axis=0)

    metrics = {
        'mrr': mrr,
        'ndcg': ndcg,
        'recall_at_1': lambda preds, gt: recall_at_k(preds, gt, 1),
        'recall_at_3': lambda preds, gt: recall_at_k(preds, gt, 3),
        'recall_at_5': lambda preds, gt: recall_at_k(preds, gt, 5),
        'recall_at_10': lambda preds, gt: recall_at_k(preds, gt, 10),
        'recall_at_50': lambda preds, gt: recall_at_k(preds, gt, 50),
    }

    results = {
        name: func(sorted_indices, gt_np)
        for name, func in metrics.items()
    }

    results['l2_dist'] = torch.cat(l2_distances, dim=0).mean().item()

    return results

def eval_on_val(X_val: "np.ndarray | torch.Tensor", y_val: "np.ndarray | torch.Tensor", model: Translator, device) -> dict:
    """Translate the validation captions and score them against their images.

    Args:
        X_val: (N, D_in) caption embeddings (numpy array or tensor)
        y_val: (N, D_out) image embeddings, row ``i`` paired with caption ``i``
        model: translator to evaluate; it is switched to eval mode
        device: device on which the forward pass is run
    Returns:
        results: metric dict produced by ``evaluate_retrieval``
    """
    # Accept numpy input as well: ``.to(device)`` only exists on tensors
    if isinstance(X_val, np.ndarray):
        X_val = torch.from_numpy(X_val).float()
    if isinstance(y_val, np.ndarray):
        y_val = torch.from_numpy(y_val).float()

    # Caption i is paired with image i, so the labels are just 0..N-1
    gt_indices = torch.arange(len(y_val))

    model.eval()

    with torch.inference_mode():
        translated = model(X_val.to(device)).to('cpu')

    results = evaluate_retrieval(translated, y_val, gt_indices)

    return results

def generate_submission(model: Translator, test_path: Path, output_file="submission.csv", device=None):
    """Translate the test captions and write them to a CSV submission file.

    Args:
        model: trained translator
        test_path: ``.npz`` archive holding 'captions/ids' and
            'captions/embeddings'
        output_file: destination CSV path
        device: device for the forward pass (``None`` leaves the data where
            it is)
    Returns:
        df_submission: DataFrame with an 'id' column and an 'embedding'
            column (one list of floats per row), as written to disk
    """
    # Close the archive as soon as the arrays have been read
    with np.load(test_path) as test_data:
        sample_ids = test_data['captions/ids']
        test_embds = torch.from_numpy(test_data['captions/embeddings']).float()

    # Inference only: make sure train-time behaviour is disabled
    model.eval()
    with torch.no_grad():
        pred_embds = model(test_embds.to(device)).cpu().numpy()

    print("Generating submission file...")

    df_submission = pd.DataFrame({'id': sample_ids, 'embedding': pred_embds.tolist()})

    df_submission.to_csv(output_file, index=False, float_format='%.17g')
    print(f"✓ Saved submission to {output_file}")

    return df_submission

### configs

In [None]:
# --- Hardware -----------------------------------------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- Embedding sizes (model input / output) -----------------------------
input_dim = 1024
output_dim = 1536

# --- Optimisation -------------------------------------------------------
batch_size = 64
lr = 0.001
epochs = 50

# --- Data locations -----------------------------------------------------
data_path = '/kaggle/input/aml-competition/train/train/train.npz'
test_path = '/kaggle/input/aml-competition/test/test/test.clean.npz'

# --- Preprocessing switches ---------------------------------------------
use_pad = False
use_standardize = False
use_normalize = False

# --- Relative representations -------------------------------------------
anchors_number = 350
anchors_method = 'pca'
use_relative = True

# --- Model / checkpoint -------------------------------------------------
mode = 'affine'
model_save_path = './models/exp1.pth'

### main.py

In [None]:

from typing import Literal
import numpy as np
import torch
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from pathlib import Path
from tqdm import tqdm
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans


def pad(data: torch.Tensor, pad_val: int) -> torch.Tensor:
    """Append ``pad_val`` zeros at the end of the last dimension of ``data``."""
    right_only = (0, pad_val)  # (left, right) amounts for the last dimension
    return F.pad(data, right_only, mode="constant", value=0)

def standardize(data: torch.Tensor) -> torch.Tensor:
    """Z-score every column of ``data`` using statistics taken over dim 0.

    A small epsilon is added to the standard deviation so that constant
    columns do not cause a division by zero.
    """
    column_mean = data.mean(dim=0, keepdim=True)
    column_scale = data.std(dim=0, keepdim=True) + 1e-8
    return (data - column_mean) / column_scale

def preprocess(X_abs: np.ndarray, Y_abs: np.ndarray, pad: bool, standardize: bool, normalize: bool) -> tuple[torch.Tensor, torch.Tensor]:
    """Convert two embedding matrices to float tensors and optionally rescale them.

    Args:
        X_abs: 2D array of source embeddings
        Y_abs: 2D array of target embeddings
        pad: accepted for interface compatibility; padding is currently
            disabled and this flag has no effect
        standardize: z-score every column (statistics over dim 0)
        normalize: L2-normalise every row (applied after standardisation)
    Returns:
        (X, Y): the processed float32 tensors
    """
    assert X_abs.ndim == 2 and Y_abs.ndim == 2, "Both data must be 2D"
    X_abs, Y_abs = torch.from_numpy(X_abs).float(), torch.from_numpy(Y_abs).float()

    def _zscore(data: torch.Tensor) -> torch.Tensor:
        # Inlined on purpose: the boolean parameter ``standardize`` shadows the
        # module-level function of the same name, so it cannot be called here.
        mean = data.mean(dim=0, keepdim=True)
        std = data.std(dim=0, keepdim=True) + 1e-8
        return (data - mean) / std

    if standardize:
        X_abs = _zscore(X_abs)
        Y_abs = _zscore(Y_abs)

    if normalize:
        X_abs = F.normalize(X_abs, dim=1)
        Y_abs = F.normalize(Y_abs, dim=1)

    return X_abs, Y_abs


def train_model(model: Translator, model_path: Path, mode: str, 
                train_loader: DataLoader, val_loader: DataLoader,
                epochs: int, lr: float) -> Translator:
    """Train ``model`` with Adam on an MSE objective, checkpointing the best epoch.

    Args:
        model: translator to optimise (trained in place)
        model_path: where the state dict with the lowest validation loss is saved
        mode: when ``'isometry'``, ``model.orthogonalize()`` is called after
            every optimiser step
        train_loader: yields ``(X_batch, y_batch)`` training pairs
        val_loader: yields ``(X_batch, y_batch)`` validation pairs
        epochs: number of passes over ``train_loader``
        lr: Adam learning rate
    Returns:
        model: the same instance, holding the weights of the *last* epoch
            (the best weights are the ones stored at ``model_path``)
    Raises:
        AttributeError: if ``mode`` is ``'isometry'`` but the model has no
            ``orthogonalize`` method
    """
    # Fail before training starts rather than after the first optimiser step
    if mode == 'isometry' and not callable(getattr(model, 'orthogonalize', None)):
        raise AttributeError("mode 'isometry' requires the model to implement orthogonalize()")

    # Run on the device the model already lives on, so that batches and weights
    # always match even when the model was left on CPU on a CUDA machine.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print(f"Using device: {device}")

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    best_val_loss = float('inf')

    for epoch in range(epochs):
        model.train()

        train_loss = 0
        for X_batch, y_batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            optimizer.zero_grad()

            outputs = model(X_batch)
            loss = F.mse_loss(outputs, y_batch)

            loss.backward()
            optimizer.step()

            if mode == 'isometry':
                # Re-project the weights onto the orthogonal constraint set
                model.orthogonalize()

            train_loss += loss.item()

        # Average of the per-batch mean losses
        train_loss /= len(train_loader)

        model.eval()

        val_loss = 0

        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                outputs = model(X_batch)
                loss = F.mse_loss(outputs, y_batch)

                val_loss += loss.item()

        val_loss /= len(val_loader)

        print(f"Epoch {epoch+1}: Train Loss = {train_loss:.6f}, Val Loss = {val_loss:.6f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss

            Path(model_path).parent.mkdir(parents=True, exist_ok=True)

            torch.save(model.state_dict(), model_path)

            print(f"✓ Saved best model (val_loss={val_loss:.6f})")

    return model

def extract_anchors(data: torch.Tensor, method: Literal['pca', 'k-means', 'random'], anchors_number: int):
    """Pick ``anchors_number`` anchor vectors from / for ``data``.

    Args:
        data: non-empty 2D tensor, one sample per row
        method: 'pca' returns the principal axes fitted on ``data``,
            'k-means' returns the cluster centres, 'random' returns a random
            subset of the rows of ``data``
        anchors_number: how many anchors to produce
    Returns:
        anchors: tensor with one anchor per row
    """
    assert isinstance(data, torch.Tensor) and data.ndim == 2 and data.shape[0] > 0, "Expected a valid tensor"
    assert method in ['pca', 'k-means', 'random'], f'Method {method} not supported'
    assert isinstance(anchors_number, int) and anchors_number > 0, "Expected a natural positive number"

    samples = data.cpu().numpy()

    if method == 'random':
        # Rows drawn without replacement via a random permutation
        chosen_rows = torch.randperm(data.size(0))[:anchors_number]
        return data[chosen_rows]

    if method == 'pca':
        # The fitted principal axes are unit-length vectors
        reducer = PCA(n_components=anchors_number).fit(samples)
        return torch.from_numpy(reducer.components_).float()

    # Remaining option: k-means centroids
    clusterer = KMeans(n_clusters=anchors_number, init='k-means++', n_init=10, random_state=42)
    clusterer.fit(samples)
    return torch.from_numpy(clusterer.cluster_centers_).float()

def load_data(data_path: Path, config: dict) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
    """Load the caption/image embeddings and split them 90/10 into train/val.

    Args:
        data_path: ``.npz`` archive holding 'captions/embeddings',
            'images/embeddings' and 'captions/label'
        config: optional overrides for the preprocessing switches; the keys
            'use_pad', 'use_standardize' and 'use_normalize' fall back to the
            module-level settings of the same name when absent
    Returns:
        (X_train, y_train, X_val, y_val): caption embeddings and the image
        embedding paired with each caption; the first 90% of the rows form
        the training split
    """
    # Close the archive as soon as the arrays have been read
    with np.load(data_path) as data:
        caption_embeddings = data['captions/embeddings']
        image_embeddings = data['images/embeddings']
        caption_labels = data['captions/label']

    pad_flag = config['use_pad'] if 'use_pad' in config else use_pad
    standardize_flag = config['use_standardize'] if 'use_standardize' in config else use_standardize
    normalize_flag = config['use_normalize'] if 'use_normalize' in config else use_normalize

    # argmax over axis 1 of the label matrix selects the image row for each caption
    paired_images = image_embeddings[np.argmax(caption_labels, axis=1)]
    X_abs, y_abs = preprocess(caption_embeddings, paired_images,
                              pad=pad_flag, standardize=standardize_flag, normalize=normalize_flag)

    print('Texts shape', X_abs.shape)
    print('Images shape', y_abs.shape)

    # Deterministic split: leading 90% for training, trailing 10% for validation
    n_train = int(0.9 * X_abs.shape[0])

    X_train, X_val = X_abs[:n_train], X_abs[n_train:]
    y_train, y_val = y_abs[:n_train], y_abs[n_train:]

    return X_train, y_train, X_val, y_val

    
def test(model: Translator, X_val: torch.Tensor, y_val: torch.Tensor, device):
    """Evaluate ``model`` on the validation pairs, print the metrics and return them.

    Args:
        model: translator to evaluate
        X_val: caption embeddings
        y_val: image embeddings paired row-by-row with ``X_val``
        device: device for the forward pass
    Returns:
        results: metric dict produced by ``eval_on_val``
    """
    results = eval_on_val(X_val, y_val, model=model, device=device)
    print("Test Results:", results)
    return results


def train(config: dict, model: Translator, X_train: torch.Tensor, y_train: torch.Tensor, X_val: torch.Tensor, y_val: torch.Tensor):
    """Wrap the tensors in data loaders and run ``train_model``.

    Args:
        config: optional overrides; the keys 'model_save_path', 'batch_size',
            'epochs' and 'lr' fall back to the module-level settings of the
            same name when absent, and 'mode' falls back to 'affine'
        model: translator to train (modified in place)
        X_train, y_train: training inputs and targets
        X_val, y_val: validation inputs and targets
    """
    # An empty config is valid: every setting has a module-level fallback
    save_path = config['model_save_path'] if 'model_save_path' in config else model_save_path
    loader_batch_size = config['batch_size'] if 'batch_size' in config else batch_size
    n_epochs = config['epochs'] if 'epochs' in config else epochs
    learning_rate = config['lr'] if 'lr' in config else lr
    # NOTE(review): the training mode defaults to 'affine' regardless of
    # ``model.mode`` — confirm whether it should follow the model instead.
    train_mode = config.get('mode', 'affine')

    train_dataset = TensorDataset(X_train, y_train)
    val_dataset = TensorDataset(X_val, y_val)

    train_loader = DataLoader(train_dataset, batch_size=loader_batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=loader_batch_size)

    train_model(model, save_path, train_mode, train_loader, val_loader, n_epochs, learning_rate)

    print('Finished training. Now testing using best model...')
    

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Settings handed to the helpers; ``train`` needs at least the checkpoint path
config = {'model_save_path': model_save_path}

X_train, Y_train, X_val, y_val = load_data(data_path, config)

# Anchors are only needed for relative representations
X_anchors = extract_anchors(X_train, anchors_method, anchors_number).to(device) if use_relative else None
model_args = {
    'input_dim': input_dim,
    'output_dim': output_dim,
    'mode': mode,
    'use_relative': use_relative,
    'anchors': X_anchors
}
model = Translator(**model_args).to(device)

train(config=config, model=model, X_train=X_train, y_train=Y_train, X_val=X_val, y_val=y_val)

# Reload the best checkpoint; map_location keeps this working when the
# checkpoint was written on a different device than the current one
state = torch.load(model_save_path, map_location=device)
model.load_state_dict(state)

# ``test`` takes (model, X_val, y_val, device); the test-set path is only
# needed for the submission file
test(model, X_val, y_val, device)
generate_submission(model, Path(test_path), device=device)



