# Init and Config

In [1]:
IS_COLAB = True

In [2]:
# Put all colab initialization code inside this block or make a copy of this notebook
if IS_COLAB:
    from google.colab import userdata
    import os

    github_token = ""
    github_username = "" # Replace with your GitHub username
    repository_url = f"https://{github_username}:{github_token}@github.com/neskech/Multimodal-2025.git"

    !git clone {repository_url}


    %cd Multimodal-2025
    !git checkout baselines
    !git submodule update --init --recursive

    !pip install dotenv
    !pip install -r requirements.txt

fatal: destination path 'Multimodal-2025' already exists and is not an empty directory.
/content/Multimodal-2025
M	Datasets/download_coco.sh
M	Models/__pycache__/clipModel.cpython-311.pyc
M	Models/clipModel.py
M	metrics/captioning.py
M	metrics/entropy.py
M	metrics/metric.py
Already on 'baselines'
Your branch is up to date with 'origin/baselines'.
Collecting clip@ git+https://github.com/openai/CLIP.git@dcba3cb2e2827b402d2701e7e1c7d9fed8a20ef1 (from -r requirements.txt (line 28))
  Using cached clip-1.0-py3-none-any.whl
Collecting peft@ git+https://github.com/huggingface/peft.git (from -r requirements.txt (line 30))
  Cloning https://github.com/huggingface/peft.git to /tmp/pip-install-kaejfaq3/peft_987b7b9e8ca14727835ec90ab68efa9f
  Running command git clone --filter=blob:none --quiet https://github.com/huggingface/peft.git /tmp/pip-install-kaejfaq3/peft_987b7b9e8ca14727835ec90ab68efa9f
  Resolved https://github.com/huggingface/peft.git to commit 9ac3b184044e1cbcbf239f2c2771dda1facec4cd


In [5]:
if not IS_COLAB: # colab does not seem to support these
    %load_ext autoreload
    %autoreload 2
    %reload_ext autoreload


import torch
import gc
import tqdm
import matplotlib.pyplot as plt
import numpy as np
import logging
import wandb
import os
import sys
import peft
from dotenv import load_dotenv
from typing import Literal, Union

sys.path.append("..")
from Datasets.coco import CocoDataset
from Datasets.cood import CoodDataset
from Datasets.laion import LaionDataset
from Models.clipModel import CLIPModel
from Models.cloobModel import CLOOBModel
from Models.vClipModel import VariationalCLIPModel
from Models.alignClipModel import AlignCLIPModel
from losses.clipLoss import ClipLoss
from losses.cloobLoss import CLOOBLoss

# Import metrics
from metrics.topk import TopKMetric
from metrics.linearSeparability import SeperabilityMetric
from metrics.entropy import EntropyMetric
from metrics.captioning import CaptioningMetric

In [6]:
# Type aliases for compatibility with older Python versions
Model = Literal['CLIP', 'CLOOB', 'ALIGN']
ModelClass = Union[CLIPModel, CLOOBModel, AlignCLIPModel]
MODEL: Model = 'CLOOB'

Dataset = Literal['COCO', 'COOD', 'CC12M', 'LAION']
DATASET: Dataset = 'COCO'

DEVICE = None
if torch.cuda.is_available():
    DEVICE = 'cuda'
elif torch.mps.is_available():
    DEVICE = 'mps'
else:
    DEVICE = 'cpu'

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Get WANDB Key (Use a .env file to store the key)
load_dotenv()
WANDB_API_KEY = os.environ.get('WANDB_API_KEY')

In [7]:
MODEL = 'CLOOB'
DATASET = 'COCO'
CONFIG = {
    'MODEL_CHECKPOINT_PATH': './checkpoints/best_CLOOB_model_on_COCO.pt',
    'NUM_EPOCHS': 10,
    'BATCH_SIZE': 32,
    'LEARNING_RATE': 5e-7,
    'WEIGHT_DECAY': 1e-2,

    # Scheduler parameters
    'STEP_LR_STEP_SIZE': 5,
    'STEP_LR_GAMMA': 0.5,

    'TEST_DATAPOINTS': 1000,
    'DATA_DIR': 'Data',

    'USE_WANDB': True,
    'WANDB_RUN_NAME': 'CLOOB_COCO_EVAL',
    'WANDB_PREVIOUS_RUN_ID': None, # set to None if not resuming
    'WANDB_PROJECT_NAME': 'metrics_evaluation',
}

In [8]:
def load_checkpoint(checkpoint_path: str, device: torch.device, MODEL: Model = MODEL) -> ModelClass:
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model_state_dict = checkpoint['model_state_dict']

    if MODEL == 'CLIP':
        model = CLIPModel().to(device)
    elif MODEL == 'CLOOB':
        model = CLOOBModel().to(device)
    elif MODEL == 'ALIGN':
        model = AlignCLIPModel().to(device)
    else:
        raise ValueError(f"Unsupported model type: {MODEL}")

    model.load_state_dict(model_state_dict)
    model.eval()
    return model, checkpoint['config']

In [9]:
model, model_config = load_checkpoint(CONFIG['MODEL_CHECKPOINT_PATH'], device=DEVICE)

In [10]:
num_train = int(model_config['TOTAL_DATAPOINTS'] * model_config['TRAIN_RATIO'])
num_val = int(model_config['TOTAL_DATAPOINTS'] * (1.0 - model_config['TRAIN_RATIO']))
num_test = CONFIG['TEST_DATAPOINTS']

if DATASET == 'COCO':
    CocoDataset.download(download_script_path='Datasets/download_coco.sh', data_dir=CONFIG['DATA_DIR'])
    test = CocoDataset(
        data_dir=CONFIG['DATA_DIR'],
        split='train2017',
        tokenize=True,
        max_samples=num_train + CONFIG['TEST_DATAPOINTS']
    )
    test = torch.utils.data.Subset(
        test,
        range(num_train, num_train + CONFIG['TEST_DATAPOINTS'])
    )
    collate_fn = CocoDataset.collate_function
elif DATASET == 'COOD':
    CoodDataset.download(data_dir=CONFIG['DATA_DIR'])
    all_data = CoodDataset(
        data_dir=CONFIG['DATA_DIR'],
        tokenize=True,
        max_samples=model_config['TOTAL_DATAPOINTS']
    )
    adjustment = min(0, len(CoodDataset) - (num_train + model_config['TOTAL_DATAPOINTS']))
    if adjustment < 0:
        print("WARNING: Not enough data points in COOD dataset for the specified TEST_DATAPOINTS, using some train points for testing.")
    test = torch.utils.data.Subset(
        all_data,
        range(num_train + model_config['TOTAL_DATAPOINTS'] - adjustment, CONFIG['TEST_DATAPOINTS'])
    )
    collate_fn = CoodDataset.collate_function
elif DATASET == 'LAION':
    LaionDataset.download(max_samples=CONFIG['TOTAL_DATAPOINTS'] + CONFIG['TEST_DATAPOINTS'], data_dir=CONFIG['DATA_DIR'])
    all_data = LaionDataset(
        data_dir=CONFIG['DATA_DIR'],
        tokenize=True,
        max_samples=model_config['TOTAL_DATAPOINTS'] + CONFIG['TEST_DATAPOINTS']
    )
    adjustment = min(0, len(LaionDataset) - (num_train + model_config['TOTAL_DATAPOINTS']))
    if adjustment < 0:
        print("WARNING: Not enough data points in LAION dataset for the specified TEST_DATAPOINTS, using some train points for testing.")
    test = torch.utils.data.Subset(
        all_data,
        range(num_train + model_config['TOTAL_DATAPOINTS'] - adjustment, CONFIG['TEST_DATAPOINTS'])
    )
    collate_fn = LaionDataset.collate_function
elif DATASET == 'CC12M':
    CC12mDataset.download(max_samples=model_config['TOTAL_DATAPOINTS'] + CONFIG['TEST_DATAPOINTS'], data_dir=CONFIG['DATA_DIR'])
    all_data = CC12mDataset(
        data_dir=CONFIG['DATA_DIR'],
        tokenize=True,
        max_samples=model_config['TOTAL_DATAPOINTS'] + CONFIG['TEST_DATAPOINTS']
    )
    test = torch.utils.data.Subset(
        all_data,
        range(num_train + model_config['TOTAL_DATAPOINTS'] - adjustment, CONFIG['TEST_DATAPOINTS'])
    )
    collate_fn = CC12mDataset.collate_function


In [11]:
test_loader = torch.utils.data.DataLoader(
    test,
    1,
    shuffle=False,
    num_workers=0,
    pin_memory=DEVICE == 'cuda',
    collate_fn=collate_fn
)

# Generate Embeddings

In [12]:
# Generate embeddings for images and texts
print("Generating embeddings...")

model.eval()

image_embeddings = []
text_embeddings = []

if os.path.exists(f"checkpoints/{MODEL}_{DATASET}_{CONFIG['TEST_DATAPOINTS']}_embeddings.pt"):
  embeddings = torch.load(f"checkpoints/{MODEL}_{DATASET}_{CONFIG['TEST_DATAPOINTS']}_embeddings.pt")
  image_embeddings = embeddings["image_embeddings"]
  text_embeddings = embeddings["text_embeddings"]

else:

  progress_bar = tqdm.tqdm(test_loader, desc="Embedding Samples")

  for idx, (images, text_tokens) in enumerate(progress_bar):
      images, text_tokens = images.to(DEVICE), text_tokens.to(DEVICE)
      images = images.float()

      # Check for NaN in input (laion gives NAN's if it. can't load images)
      if torch.isnan(images).any() or torch.isnan(text_tokens).any():
          logger.warning(f"NaN in input sample {idx}")
          continue

      image_features = model.encode_image_tensors(images).detach().reshape(-1)
      text_features = model.encode_text_tokens(text_tokens).detach().reshape(-1)


      image_embeddings.append(image_features)
      text_embeddings.append(text_features)

      # Check for NaN in features
      if torch.isnan(image_features).any() or torch.isnan(text_features).any():
          logger.warning(f"NaN in features at sample {idx}: Image features stats - min={image_features.min()}, max={image_features.max()}, mean={image_features.mean()}; Text features stats - min={text_features.min()}, max={text_features.max()}, mean={text_features.mean()}")
          continue

  torch.save({"text_embeddings": text_embeddings, "image_embeddings": image_embeddings},
            f"checkpoints/{MODEL}_{DATASET}_{CONFIG['TEST_DATAPOINTS']}_embeddings.pt")

print(f"Generated {len(image_embeddings)} image embeddings")
print(f"Generated {len(text_embeddings)} text embeddings")
print(f"Embedding dimension: {image_embeddings[0].shape}")

Generating embeddings...
Generated 2000 image embeddings
Generated 2000 text embeddings
Embedding dimension: torch.Size([512])


In [13]:
# Use wandb? Resume Run?
PROJECT_NAME = CONFIG['WANDB_PROJECT_NAME']
USE_WANDB = CONFIG['USE_WANDB']
RESUME_LOGGING = CONFIG['WANDB_PREVIOUS_RUN_ID'] is not None
run_name = CONFIG['WANDB_RUN_NAME'] or f"{MODEL}_{DATASET}"

if USE_WANDB:
    wandb.login(key=WANDB_API_KEY)

    if RESUME_LOGGING:
        run_id = CONFIG['WANDB_PREVIOUS_RUN_ID']
        run = wandb.init(
            settings=wandb.Settings(symlink=False),
            id=run_id,
            resume="must",
            project=PROJECT_NAME,
            entity="multimodal_2025",
        )
    else:
        run = wandb.init(
            name=run_name,
            reinit=True,
            project=PROJECT_NAME,
            config=CONFIG,
            entity="multimodal_2025",
        )

[34m[1mwandb[0m: Currently logged in as: [33mtsurban[0m ([33mtsurban-carnegie-mellon-university[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [None]:
raise Exception("hi")

In [None]:
# visualize 2d projection of embeddings using PCA, coloring by image vs text
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
embeddings = np.array([t.cpu()/np.linalg.norm(t.cpu()) for t in image_embeddings] + [t.cpu()/np.linalg.norm(t.cpu()) for t in text_embeddings])
reducer = PCA(n_components=2)
coords_2d = reducer.fit_transform(embeddings)

fig = plt.figure(figsize=(10, 8))
types = ["image"] * len(image_embeddings) + ["text"] * len(text_embeddings)
# Scatter embeddings
for i, (emb, typ) in enumerate(zip(coords_2d, types)):
    color = "r" if typ == "image" else "b"
    plt.scatter(
        emb[0],
        emb[1],
        color=color,
        s=40,
        alpha=0.8,
    )

# Draw lines between image-text pairs, only for datasets that have them
if "image" in types and "text" in types:
    n = len(embeddings) // 2
    for i in range(n):
        img_coord = coords_2d[i]
        text_coord = coords_2d[i + n]
        plt.plot(
            [img_coord[0], text_coord[0]],
            [img_coord[1], text_coord[1]],
            color="gray",
            alpha=0.3,
            linewidth=0.7,
        )

plt.tight_layout()
plt.show()
wandb.log({"pca-2d": wandb.Image(plt)})

# Metrics
##Metric 1:

In [None]:
# Prepare data for Top-K metric
# Format: List[Tuple[List[text_embeddings], image_embedding]]
# For each image, we need all text embeddings and the image embedding
topk_data = []

embeddings = list(zip([[x] for x in text_embeddings], image_embeddings))

# Test with different k values
k_values = [1, 5, 10, 100]
topk_results = {}

for k in k_values:
    metric = TopKMetric(k=k)
    score, _ = metric.compute(embeddings)
    topk_results[k] = score
    print(topk_results)
    # print(f"Top-{k} Retrieval Accuracy: {score:.4f} ({score*100:.2f}%)")

# Visualize results
plt.figure(figsize=(10, 5))
plt.bar([f"Top-{k}" for k in k_values], [topk_results[k] for k in k_values], color='steelblue')
plt.ylabel('Accuracy')
plt.title('Top-K Retrieval Accuracy')
plt.ylim([0, 1])
for i, k in enumerate(k_values):
    plt.text(i, topk_results[k] + 0.02, f'{topk_results[k]:.3f}', ha='center')
plt.grid(axis='y', alpha=0.3)

wandb.log({"top-k retrieval plot": wandb.Image(plt), **{str(k):v for k,v in topk_results.items()}})
plt.show()



In [None]:
# Prepare data for Linear Separability metric
# Format: List[Tuple[image_embedding, text_embedding]]
separability_data = [(img_emb.unsqueeze(0), txt_emb.unsqueeze(0))
                     for img_emb, txt_emb in zip(image_embeddings, text_embeddings)]

# Compute linear separability
print("Training linear classifier to measure separability...")
metric = SeperabilityMetric(n_epochs=10000, lr=1e-3)
separability_score = metric.compute(separability_data)

print(f"\nLinear Separability Accuracy: {separability_score:.4f} ({separability_score*100:.2f}%)")
print(f"Alignment Score (1 - separability): {1-separability_score:.4f}")
print("\nNote: Lower separability (closer to 0.5) indicates better alignment between modalities")

wandb.log({"separability score": separability_score})

In [None]:
# Prepare data for Entropy metric
# Format: List[Tuple[List[text_embeddings], image_embedding]]
# We'll use a subset of embeddings (5 text embeddings per image)
entropy_data = []
for i in range(len(image_embeddings)):
    # For each image, include its text and a few other texts
    num_texts = min(5, len(text_embeddings))
    text_subset = [text_embeddings[i]] + [text_embeddings[j] for j in range(len(text_embeddings))
                                          if j != i][:num_texts-1]
    entropy_data.append((text_subset, image_embeddings[i]))

# Test with different k values
k_values = [1, 5, 10, 100]
entropy_results = {}

print("Computing K-NN angle metrics...")
for k in k_values:
    metric = EntropyMetric(k=k)
    angle = metric.compute(entropy_data)
    angle_degrees = np.degrees(angle)
    entropy_results[k] = angle
    print(f"Average {k}-NN angle: {angle:.4f} radians ({angle_degrees:.2f}°)")

# Visualize results
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Plot in radians
ax1.plot(k_values, [entropy_results[k] for k in k_values], 'o-', color='coral', linewidth=2, markersize=8)
ax1.set_xlabel('k (nearest neighbor)')
ax1.set_ylabel('Average angle (radians)')
ax1.set_title('K-NN Angles (Radians)')
ax1.grid(True, alpha=0.3)

# Plot in degrees
ax2.plot(k_values, [np.degrees(entropy_results[k]) for k in k_values], 'o-', color='coral', linewidth=2, markersize=8)
ax2.set_xlabel('k (nearest neighbor)')
ax2.set_ylabel('Average angle (degrees)')
ax2.set_title('K-NN Angles (Degrees)')
ax2.grid(True, alpha=0.3)

plt.tight_layout()

wandb.log({"entropy angle plot": wandb.Image(plt), **{str(k):v for k,v in entropy_results.items()}})
plt.show()

print("\nNote: Larger angles indicate more uniform distribution (higher entropy)")

In [None]:
# Captioning metric requires training a model, which is computationally expensive

# Prepare data for Captioning metric
# Format: List[Tuple[image_embedding, text_embedding]] and List[captions]
captioning_data = [(img_emb, txt_emb) for img_emb, txt_emb in zip(image_embeddings, text_embeddings)]
captions = [sample[1] for sample in test_loader]

print("Captioning Metric Setup:")
print(f"Number of image-text pairs: {len(captioning_data)}")
print(f"Number of captions: {len(captions)}")
print(f"\nExample caption: {captions[0]}")

print("\nTraining captioning model...")
metric = CaptioningMetric()
bleu_score = metric.compute(captioning_data, captions, clip_model=model, data_dir=CONFIG['DATA_DIR'])
print(f"BLEU Score: {bleu_score:.4f}")


Captioning Metric Setup:
Number of image-text pairs: 2000
Number of captions: 1000

Example caption: tensor([[49406,   320, 22826,  2862,   525,   320, 11795,  1759,  2252,  4682,
           269, 49407,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0]], dtype=torch.int32)

Training captioning model...
Loading CC12m dataset...


Data/cc12m/data.tsv: 100%|██████████| 2.52G/2.52G [00:21<00:00, 125MB/s]


File successfully downloaded to: ./Data/cc12m/data.tsv


