In [3]:
pip install evaluate

Collecting evaluate
  Downloading evaluate-0.4.3-py3-none-any.whl.metadata (9.2 kB)
Collecting datasets>=2.0.0 (from evaluate)
  Downloading datasets-3.2.0-py3-none-any.whl.metadata (20 kB)
Collecting dill (from evaluate)
  Downloading dill-0.3.9-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from evaluate)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess (from evaluate)
  Downloading multiprocess-0.70.17-py310-none-any.whl.metadata (7.2 kB)
Collecting dill (from evaluate)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting multiprocess (from evaluate)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec>=2021.05.0 (from fsspec[http]>=2021.05.0->evaluate)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading evaluate-0.4.3-py3-none-any.whl (84 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.0/84.0 kB[0m [

In [4]:
from google.colab import drive
import os
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader, random_split
from transformers import CLIPModel, CLIPProcessor, AutoTokenizer
from PIL import Image
from collections import defaultdict
from evaluate import load

In [5]:
# Mount Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


#Config and Initialize Components

In [None]:
# Configuration
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# LAION_EMBEDDINGS AND LAION CAPTIONS are extracted from LAION_EXTRACTION.ipynb
LAION_EMBEDDINGS = "/content/drive/MyDrive/Stat_Learning_Project/Flickr30K/images/laion/image_embedding.npy"
LAION_CAPTIONS = "/content/drive/MyDrive/Stat_Learning_Project/Flickr30K/images/laion/text_embeddings.npy"

# LAION_EMBEDDINGS AND LAION CAPTIONS are extracted from Flickr30k_Extraction.ipynb
FLICKR_EMBEDDINGS = "/content/drive/MyDrive/Stat_Learning_Project/Flickr30K/images/Flickr30k/flickr_image_embeddings.npy"
FLICKR_CAPTIONS = "/content/drive/MyDrive/Stat_Learning_Project/Flickr30K/images/Flickr30k/flickr_captions_normalized.npy"

BATCH_SIZE = 16
EPOCHS = 3
ADA_EPOCHS = 10
LEARNING_RATE = 5e-5
ADA_LEARNING_RATE = 1e-5

# Initialize Components
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
bleu = load("bleu")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/592 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/862k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/525k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.22M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/389 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/4.19k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Downloading builder script:   0%|          | 0.00/5.94k [00:00<?, ?B/s]

Downloading extra modules:   0%|          | 0.00/1.55k [00:00<?, ?B/s]

Downloading extra modules:   0%|          | 0.00/3.34k [00:00<?, ?B/s]

#Helper Functions

In [None]:
def save_model(model, tokenizer, path):
    os.makedirs(path, exist_ok=True)
    torch.save(model.state_dict(), os.path.join(path, "model.pth"))
    tokenizer.save_pretrained(path)
    print(f"Model and tokenizer saved to {path}")

def load_model(model, tokenizer, path):
    model.load_state_dict(torch.load(os.path.join(path, "model.pth"), map_location=DEVICE))
    tokenizer = AutoTokenizer.from_pretrained(path)
    print(f"Model and tokenizer loaded from {path}")
    return model, tokenizer

#LAION Dataset

In [None]:
class LAIONDataset(Dataset):
    def __init__(self, embeddings_path, captions_path):
        self.embeddings = np.load(embeddings_path)
        self.captions = np.load(captions_path)

    def __len__(self):
        return len(self.embeddings)

    def __getitem__(self, idx):
        embedding = torch.tensor(self.embeddings[idx]).float()
        caption = self.captions[idx]
        return embedding, caption

print("Loading LAION dataset...")
laion_dataset = LAIONDataset(LAION_EMBEDDINGS, LAION_CAPTIONS)
laion_dataloader = DataLoader(laion_dataset, batch_size=BATCH_SIZE, shuffle=True)

Loading LAION dataset...


#Flick30k Dataset

In [None]:
from torch.utils.data import Subset

class Flickr30kDataset(Dataset):
    def __init__(self, embeddings_path, captions_path):
        self.embeddings = np.load(embeddings_path)
        self.captions = np.load(captions_path)

    def __len__(self):
        return len(self.embeddings)

    def __getitem__(self, idx):
        embedding = torch.tensor(self.embeddings[idx]).float()
        caption = self.captions[idx]
        return embedding, caption

print("Loading Flickr30k dataset...")
flickr30k_dataset = Flickr30kDataset(FLICKR_EMBEDDINGS, FLICKR_CAPTIONS)

train_size = 10000
test_size = 2000

train_indices = list(range(train_size))
test_indices = list(range(train_size, train_size + test_size))

# Subset the dataset
train_dataset = Subset(flickr30k_dataset, train_indices)
test_dataset = Subset(flickr30k_dataset, test_indices)

# Create data loaders
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

Loading Flickr30k dataset...


#Model Definitions

In [None]:
class CaptioningModel(nn.Module):
    def __init__(self, clip_model, embedding_dim, vocab_size=None):
        super().__init__()
        self.clip_model = clip_model
        self.decoder = nn.TransformerDecoder(
            nn.TransformerDecoderLayer(d_model=embedding_dim, nhead=8), num_layers=6
        )
        self.embedding = nn.Embedding(vocab_size, embedding_dim)  # Map token IDs to embeddings
        self.projection_emb = nn.Linear(embedding_dim, embedding_dim)  # For LAION

    def forward(self, images_or_embeddings, captions_input, is_laion=False):
        """
        Args:
            images_or_embeddings: (B, 1, 512) - Precomputed image embeddings
            captions_input: (B, seq_len, 512) - Caption embeddings
        """
        # Ensure inputs are float32 and 3D
        image_features = images_or_embeddings.to(torch.float32)  # (B, 1, 512)
        captions_input = captions_input.to(torch.float32)        # (B, seq_len, 512)

        seq_len, d_model = captions_input.shape
        assert d_model % self.decoder.layers[0].self_attn.num_heads == 0, f"d_model={d_model} must be divisible by nhead={self.decoder.layers[0].self_attn.num_heads}"

        # Pass through the decoder
        decoder_output = self.decoder(captions_input, memory=image_features)  # Output: (B, seq_len, 512)

        # Projection back to embedding space
        logits = self.projection_emb(decoder_output)  # (B, seq_len, 512)

        return logits


clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
model = CaptioningModel(clip_model, embedding_dim = 512, vocab_size=tokenizer.vocab_size).to(DEVICE)

class DomainDiscriminator(nn.Module):
    def __init__(self, feature_dim):
        super().__init__()
        self.classifier = nn.Sequential(
            nn.Linear(feature_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 2)
        )

    def forward(self, features):
        return self.classifier(features)

discriminator = DomainDiscriminator(feature_dim=512).to(DEVICE)

pytorch_model.bin:   0%|          | 0.00/605M [00:00<?, ?B/s]

#Training Functions

In [None]:
def train_model(model, dataloader, epochs):
    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)
    loss_fn = nn.MSELoss()

    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for images_or_embeddings, captions in dataloader:
            images_or_embeddings = images_or_embeddings.to(DEVICE)

            captions_input = captions.to(DEVICE)
            outputs = model(images_or_embeddings, captions_input)
            loss = loss_fn(outputs, captions_input)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss / len(dataloader):.4f}")

def adversarial_training(model, discriminator, source_dataloader, target_dataloader, epochs):
    optimizer_discriminator = torch.optim.AdamW(discriminator.parameters(), lr=ADA_LEARNING_RATE * 0.5)  # Lower LR for discriminator
    optimizer_model = torch.optim.AdamW(model.clip_model.parameters(), lr=ADA_LEARNING_RATE)

    for epoch in range(epochs):
        total_loss_discriminator = 0.0
        total_loss_adversarial = 0.0
        for (source_embeddings, _), (target_embeddings, _) in zip(source_dataloader, target_dataloader):
            # Move embeddings to the correct device
            source_embeddings = source_embeddings.to(DEVICE)
            target_embeddings = target_embeddings.to(DEVICE)

            # Normalize embeddings for stable training
            source_embeddings = source_embeddings / source_embeddings.norm(dim=-1, keepdim=True)
            target_embeddings = target_embeddings / target_embeddings.norm(dim=-1, keepdim=True)

            # Combine source and target embeddings
            features = torch.cat([source_embeddings, target_embeddings], dim=0).detach()  # Detach for discriminator
            domain_labels = torch.cat([
                torch.zeros(len(source_embeddings)),  # Source domain: 0
                torch.ones(len(target_embeddings))   # Target domain: 1
            ]).to(DEVICE)

            # --- Train the Discriminator ---
            preds = discriminator(features)
            loss_discriminator = nn.CrossEntropyLoss()(preds, domain_labels.long())
            optimizer_discriminator.zero_grad()
            loss_discriminator.backward()
            torch.nn.utils.clip_grad_norm_(discriminator.parameters(), max_norm=1.0)  # Gradient clipping
            optimizer_discriminator.step()

            # --- Train the Model Adversarially ---
            # Do not detach embeddings for adversarial training
            adversarial_features = torch.cat([source_embeddings, target_embeddings], dim=0)
            adversarial_labels = (1 - domain_labels)  # Flip domain labels for adversarial training

            preds = discriminator(adversarial_features)
            loss_adversarial = nn.CrossEntropyLoss()(preds, adversarial_labels.long())
            optimizer_model.zero_grad()
            loss_adversarial.backward()
            torch.nn.utils.clip_grad_norm_(model.clip_model.parameters(), max_norm=1.0)  # Gradient clipping
            optimizer_model.step()

            # Accumulate losses for logging
            total_loss_discriminator += loss_discriminator.item()
            total_loss_adversarial += loss_adversarial.item()

        # Average losses over all batches
        avg_loss_discriminator = total_loss_discriminator / len(source_dataloader)
        avg_loss_adversarial = total_loss_adversarial / len(source_dataloader)
        print(f"Epoch {epoch+1}: Discriminator Loss: {avg_loss_discriminator:.4f}, Adversarial Loss: {avg_loss_adversarial:.4f}")

In [None]:
# Pre-train on LAION without ADA
print("Pre-training on LAION without ADA...")
train_model(model, laion_dataloader, epochs=1)
save_model(model, tokenizer, "/content/drive/MyDrive/Stat Learning Project/models/normal_pretrained_laion")

Pre-training on LAION without ADA...
Epoch 1/1, Loss: 0.0068
Model and tokenizer saved to /content/drive/MyDrive/Stat Learning Project/models/normal_pretrained_laion


In [None]:
# Fine-tune on Flickr30k without ADA
print("Fine-tuning on Flickr30k without ADA...")
model, tokenizer = load_model(model, tokenizer, "/content/drive/MyDrive/Stat Learning Project/models/normal_pretrained_laion")
train_model(model, train_dataloader, epochs=1)
save_model(model, tokenizer, "/content/drive/MyDrive/Stat Learning Project/models/normal_fine_tuned_flickr30k")

Fine-tuning on Flickr30k without ADA...


  model.load_state_dict(torch.load(os.path.join(path, "model.pth"), map_location=DEVICE))


Model and tokenizer loaded from /content/drive/MyDrive/Stat Learning Project/models/normal_pretrained_laion
Epoch 1/1, Loss: 0.0022
Model and tokenizer saved to /content/drive/MyDrive/Stat Learning Project/models/normal_fine_tuned_flickr30k


In [None]:
# Pre-train on LAION with ADA
print("Pre-training on LAION with ADA...")
model, tokenizer = load_model(model, tokenizer,"/content/drive/MyDrive/Stat Learning Project/models/normal_pretrained_laion")
adversarial_training(model, discriminator, laion_dataloader, train_dataloader, ADA_EPOCHS)
save_model(model, tokenizer, "/content/drive/MyDrive/Stat Learning Project/models/ada_pretrained_laion")

Pre-training on LAION with ADA...


  model.load_state_dict(torch.load(os.path.join(path, "model.pth"), map_location=DEVICE))


Model and tokenizer loaded from /content/drive/MyDrive/Stat Learning Project/models/normal_pretrained_laion
Epoch 1: Discriminator Loss: 0.2982, Adversarial Loss: 0.3024
Epoch 2: Discriminator Loss: 0.2925, Adversarial Loss: 0.3083
Epoch 3: Discriminator Loss: 0.2839, Adversarial Loss: 0.3176
Epoch 4: Discriminator Loss: 0.2722, Adversarial Loss: 0.3314
Epoch 5: Discriminator Loss: 0.2578, Adversarial Loss: 0.3500
Epoch 6: Discriminator Loss: 0.2431, Adversarial Loss: 0.3712
Epoch 7: Discriminator Loss: 0.2271, Adversarial Loss: 0.3971
Epoch 8: Discriminator Loss: 0.2112, Adversarial Loss: 0.4264
Epoch 9: Discriminator Loss: 0.1963, Adversarial Loss: 0.4581
Epoch 10: Discriminator Loss: 0.1821, Adversarial Loss: 0.4927
Model and tokenizer saved to /content/drive/MyDrive/Stat Learning Project/models/ada_pretrained_laion


In [None]:
# Fine-tune on Flickr30k with ADA
print("Fine-tuning on Flickr30k with ADA...")
model, tokenizer = load_model(model, tokenizer, "/content/drive/MyDrive/Stat Learning Project/models/ada_pretrained_laion")
adversarial_training(model, discriminator, laion_dataloader, train_dataloader, ADA_EPOCHS)
save_model(model, tokenizer, "/content/drive/MyDrive/Stat Learning Project/models/ada_fine_tuned_flickr30k")

Fine-tuning on Flickr30k with ADA...


  model.load_state_dict(torch.load(os.path.join(path, "model.pth"), map_location=DEVICE))


Model and tokenizer loaded from /content/drive/MyDrive/Stat Learning Project/models/ada_pretrained_laion
Epoch 1: Discriminator Loss: 0.1689, Adversarial Loss: 0.5290
Epoch 2: Discriminator Loss: 0.1566, Adversarial Loss: 0.5689
Epoch 3: Discriminator Loss: 0.1450, Adversarial Loss: 0.6100
Epoch 4: Discriminator Loss: 0.1355, Adversarial Loss: 0.6509
Epoch 5: Discriminator Loss: 0.1264, Adversarial Loss: 0.6934
Epoch 6: Discriminator Loss: 0.1166, Adversarial Loss: 0.7406
Epoch 7: Discriminator Loss: 0.1093, Adversarial Loss: 0.7879
Epoch 8: Discriminator Loss: 0.1027, Adversarial Loss: 0.8316
Epoch 9: Discriminator Loss: 0.0966, Adversarial Loss: 0.8801
Epoch 10: Discriminator Loss: 0.0918, Adversarial Loss: 0.9251
Model and tokenizer saved to /content/drive/MyDrive/Stat Learning Project/models/ada_fine_tuned_flickr30k


#Evaluation Function

In [None]:
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
from tqdm import tqdm
import re


def preprocess_text(text):
    """
    Preprocess text by lowercasing, stripping whitespace, and removing punctuation.
    """
    text = text.lower().strip()
    text = re.sub(r'[^\w\s]', '', text)
    return text


def embedding_to_tokens(caption_embedding, embedding_matrix):
    """
    Convert a single caption embedding to token IDs using nearest-neighbor search.

    Args:
        caption_embedding: Single normalized caption embedding (embed_dim,).
        embedding_matrix: Vocabulary embeddings (vocab_size, embed_dim).

    Returns:
        List of token IDs corresponding to the embedding.
    """
    embedding_matrix = embedding_matrix.to(caption_embedding.device)
    embedding_matrix = embedding_matrix / embedding_matrix.norm(dim=-1, keepdim=True)

    if caption_embedding.dim() == 1:
        caption_embedding = caption_embedding.unsqueeze(0)
    elif caption_embedding.dim() != 2 or caption_embedding.shape[1] != embedding_matrix.shape[1]:
        raise ValueError(f"Invalid caption_embedding shape: {caption_embedding.shape}")

    similarities = torch.matmul(embedding_matrix, caption_embedding.T).squeeze()

    closest_token_id = torch.argmax(similarities, dim=0).item()

    return [closest_token_id]

def evaluate_model(model, dataloader, tokenizer, embedding_matrix, test_captions_normalized):
    model.eval()
    predictions, references = [], []

    embedding_matrix = embedding_matrix.to(DEVICE)
    embedding_matrix = embedding_matrix / embedding_matrix.norm(dim=-1, keepdim=True)

    with torch.no_grad():
        for i, (image_embeddings, _) in enumerate(tqdm(dataloader, desc="Evaluating")):
            caption_embedding = test_captions_normalized[i]

            if caption_embedding.ndim == 2:
                caption_embedding = caption_embedding[0, :]
            caption_embedding = torch.tensor(caption_embedding).to(DEVICE)

            image_embeddings = image_embeddings.unsqueeze(1).to(DEVICE)

            input_embedding = torch.zeros((image_embeddings.size(0), 1, 512), device=DEVICE)
            generated_embeddings = []

            for _ in range(512):
                outputs = model(image_embeddings, input_embedding)
                next_embedding = outputs[:, -1, :].unsqueeze(1)
                generated_embeddings.append(next_embedding)
                input_embedding = next_embedding

            generated_embeddings = torch.cat(generated_embeddings, dim=1)  # (seq_len, embed_dim)

            generated_embeddings = generated_embeddings.view(-1, generated_embeddings.shape[-1])

            generated_embeddings = generated_embeddings / generated_embeddings.norm(dim=-1, keepdim=True)

            generated_ids = embedding_to_tokens(generated_embeddings[0], embedding_matrix)
            reference_ids = embedding_to_tokens(caption_embedding, embedding_matrix)

            generated_text = preprocess_text(tokenizer.decode(generated_ids, skip_special_tokens=True))
            reference_text = preprocess_text(tokenizer.decode(reference_ids, skip_special_tokens=True))

            predictions.append(generated_text.split())
            references.append([reference_text.split()])

    smoothing_function = SmoothingFunction().method1
    bleu_score = corpus_bleu(references, predictions, smoothing_function=smoothing_function)

    return {"BLEU": bleu_score}

# Evaluate Models

In [None]:
captions_normalized = np.load("/content/drive/MyDrive/Stat Learning Project/images/Flickr30k/flickr_captions_normalized.npy")

# Evaluate normal fine-tuned model
print("Evaluation Results with ADA:")
model, tokenizer = load_model(model, tokenizer, "./normal_fine_tuned_flickr30k")
embedding_matrix_without_ada = model.embedding.weight.detach()
normal_results = evaluate_model(model, test_dataloader, tokenizer, embedding_matrix_without_ada, captions_normalized)
print(normal_results["BLEU"])

Evaluation Results Without ADA:
100%|██████████| 2000/2000 [04:33<00:00,  2.80s/it] 
BLEU Score: 0.2804688646965363


In [None]:
# Evaluate normal fine-tuned model
print("Evaluation Results with ADA:")
model, tokenizer = load_model(model, tokenizer, "./ada_fine_tuned_flickr30k")
embedding_matrix_with_ada = model.embedding.weight.detach()
ada_results = evaluate_model(model, test_dataloader, tokenizer, embedding_matrix_with_ada, captions_normalized)
print(ada_results["BLEU"])

Evaluation Results with ADA:
100%|██████████| 2000/2000 [04:33<00:00,  2.80s/it] 
BLEU Score: 0.4116460019070589
