In [None]:
pip install transformers datasets



In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from datasets import DatasetDict, load_dataset
from PIL import Image
from torch.utils.data import DataLoader
from torchvision import transforms
from transformers import ViTModel, ViTConfig, BertTokenizer

In [None]:
# Load the dataset (replace the identifier with your own dataset path if needed)
dataset = load_dataset("tomytjandra/h-and-m-fashion-caption-12k")

# Initialize the tokenizer (you can choose a different tokenizer if preferred)
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Define image transformations.
# The pre-trained encoder used by the model in this notebook
# (google/vit-base-patch16-224-in21k) was trained on images normalized with
# mean 0.5 / std 0.5 per channel, not with the ImageNet statistics, so the
# same normalization is applied here to match what the encoder expects.
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # ViT checkpoint expects 224x224 images
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5],
                         std=[0.5, 0.5, 0.5]),
])

# Preprocessing function
def preprocess_function(examples, max_length=224):
    """Convert a batch of raw examples into model inputs.

    Args:
        examples: Batch mapping with an ``'image'`` entry (iterable of images
            exposing ``.convert``) and a ``'text'`` entry (list of captions).
        max_length: Length every caption is padded/truncated to. The model's
            positional table must be at least as long as the decoder input
            derived from these ids.

    Returns:
        The same ``examples`` mapping with ``'pixel_values'``, ``'input_ids'``
        and ``'attention_mask'`` entries added.
    """
    # Process images: force RGB so every image yields three channels.
    examples['pixel_values'] = [
        image_transform(image.convert("RGB")) for image in examples['image']
    ]

    # Tokenize captions to a fixed length so batches can be stacked later.
    encoding = tokenizer(
        examples['text'],
        padding='max_length',
        truncation=True,
        max_length=max_length,
        return_tensors="pt",
    )
    examples['input_ids'] = encoding['input_ids']
    examples['attention_mask'] = encoding['attention_mask']

    return examples

# Step 1: Split into train_val and test (1250 held-out examples for test)
train_val_split = dataset['train'].train_test_split(test_size=1250, seed=42)
train_val = train_val_split['train']
test = train_val_split['test']

# Step 2: Split train_val into train and validation (1250 examples for validation)
train_validation_split = train_val.train_test_split(test_size=1250, seed=42)
train = train_validation_split['train']
validation = train_validation_split['test']

# Step 3: Create a new DatasetDict with the splits
new_dataset = DatasetDict({
    'train': train,
    'validation': validation,
    'test': test
})

# Optional: Verify the splits
print(new_dataset)


def _to_model_inputs(split):
    """Run preprocess_function over one split and expose it as torch tensors."""
    processed = split.map(
        preprocess_function,
        batched=True,
        batch_size=100,
        remove_columns=['text', 'image'],
    )
    # map() stores results as plain Python lists; without the torch format a
    # DataLoader would collate nested lists instead of stacked tensors, and
    # calling `.to(device)` on a batch entry would fail.
    return processed.with_format("torch")


processed_train = _to_model_inputs(new_dataset['train'])
processed_val = _to_model_inputs(new_dataset['validation'])
processed_test = _to_model_inputs(new_dataset['test'])

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.



Split Dataset Structure:
DatasetDict({
    train: Dataset({
        features: ['text', 'image'],
        num_rows: 9949
    })
    test: Dataset({
        features: ['text', 'image'],
        num_rows: 2488
    })
})


Map:   0%|          | 0/9949 [00:00<?, ? examples/s]

In [None]:
# Define training parameters
batch_size = 32

# Create DataLoaders.
# Only the training data is shuffled; validation order has no effect on the
# loss, so it is kept fixed to make evaluation runs directly comparable.
train_loader = DataLoader(processed_train, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(processed_val, batch_size=batch_size, shuffle=False)

In [None]:
class ImageCaptioningModel(nn.Module):
    """Image-captioning model: a ViT encoder feeding a Transformer decoder.

    The encoder output is projected to ``embed_dim``; the decoder, token
    embeddings, positional table and output layer all work in ``embed_dim``.

    Args:
        vocab_size: Number of tokens in the output vocabulary.
        embed_dim: Width of the decoder and of the token embeddings.
        decoder_layers: Number of Transformer decoder layers.
        decoder_heads: Number of attention heads per decoder layer.
        decoder_ffn_dim: Hidden size of the decoder feed-forward sub-layer.
        max_seq_length: Longest decoder input supported by the learned
            positional table.
    """

    def __init__(self, vocab_size, embed_dim, decoder_layers, decoder_heads, decoder_ffn_dim, max_seq_length=224):
        super().__init__()

        # Encoder: Vision Transformer (ViT) configured with 6 hidden layers
        vit_config = ViTConfig.from_pretrained('google/vit-base-patch16-224-in21k')
        vit_config.num_hidden_layers = 6
        self.encoder = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k', config=vit_config)

        # Decoder: Transformer working in embed_dim space. Its d_model must
        # equal the width of the token embeddings and of the projected encoder
        # memory, otherwise the decoder rejects its inputs whenever embed_dim
        # differs from the ViT hidden size.
        decoder_layer = nn.TransformerDecoderLayer(d_model=embed_dim, nhead=decoder_heads, dim_feedforward=decoder_ffn_dim)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=decoder_layers)

        # Embedding for input tokens plus a learned positional table.
        # A small random init gives positions distinct codes from the start.
        self.token_embedding = nn.Embedding(vocab_size, embed_dim)
        self.positional_encoding = nn.Parameter(torch.randn(1, max_seq_length, embed_dim) * 0.02)

        # Final linear layer to generate vocabulary scores
        self.output_linear = nn.Linear(embed_dim, vocab_size)

        # Projection from the ViT hidden size to the decoder width
        self.encoder_proj = nn.Linear(vit_config.hidden_size, embed_dim)
        # Not used in forward(); kept so existing references to this attribute
        # (e.g. optimizer parameter groups) continue to resolve.
        self.decoder_proj = nn.Linear(embed_dim, vit_config.hidden_size)

    def forward(self, pixel_values, input_ids, attention_mask):
        """Compute vocabulary logits for every decoder position.

        Args:
            pixel_values: Image batch passed to the ViT encoder.
            input_ids: Decoder input token ids, shape (batch_size, seq_len).
            attention_mask: Same shape as ``input_ids``; non-zero marks real
                tokens, zero marks padding.

        Returns:
            Tensor of shape (batch_size, seq_len, vocab_size).

        Raises:
            ValueError: If ``seq_len`` exceeds the positional table length.
        """
        seq_len = input_ids.size(1)
        max_len = self.positional_encoding.size(1)
        if seq_len > max_len:
            raise ValueError(
                f"input sequence length {seq_len} exceeds max_seq_length {max_len}"
            )

        # Encoder
        encoder_hidden_states = self.encoder(pixel_values=pixel_values).last_hidden_state  # (batch_size, num_patches + 1, hidden_size)
        # Project encoder outputs to embed_dim and switch to sequence-first layout
        memory = self.encoder_proj(encoder_hidden_states).permute(1, 0, 2)  # (src_len, batch_size, embed_dim)

        # Decoder inputs
        embeddings = self.token_embedding(input_ids) + self.positional_encoding[:, :seq_len, :]
        embeddings = embeddings.permute(1, 0, 2)  # (seq_len, batch_size, embed_dim)

        # Causal mask: True above the diagonal blocks attention to later
        # positions, so position t cannot see the token it is asked to predict.
        causal_mask = torch.triu(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=input_ids.device),
            diagonal=1,
        )
        # True marks padded positions that must be ignored as attention keys.
        padding_mask = ~attention_mask.bool()

        decoder_outputs = self.decoder(
            embeddings,
            memory,
            tgt_mask=causal_mask,
            tgt_key_padding_mask=padding_mask,
        )
        decoder_outputs = decoder_outputs.permute(1, 0, 2)  # (batch_size, seq_len, embed_dim)

        return self.output_linear(decoder_outputs)  # (batch_size, seq_len, vocab_size)

In [None]:
# Define vocabulary size and other hyperparameters
vocab_size = tokenizer.vocab_size
embed_dim = 512
decoder_layers = 6  # 4-6 layers as per requirement
decoder_heads = 8
decoder_ffn_dim = 2048
# Must be at least as long as the decoder input. Captions are padded to 224
# tokens during preprocessing, so a shorter positional table makes the model's
# forward pass fail when adding positions to the token embeddings.
max_seq_length = 224

# Initialize the model
model = ImageCaptioningModel(vocab_size, embed_dim, decoder_layers, decoder_heads, decoder_ffn_dim, max_seq_length)

# Move model to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Define loss and optimizer; padded label positions do not contribute to the loss
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

# Define separate learning rates
learning_rate_encoder = 1e-5  # Lower learning rate for pre-trained encoder
learning_rate_decoder = 1e-4  # Higher learning rate for decoder
# Create parameter groups
optimizer = optim.AdamW([
    {'params': model.encoder.parameters(), 'lr': learning_rate_encoder},
    {'params': model.decoder.parameters(), 'lr': learning_rate_decoder},
    {'params': model.token_embedding.parameters(), 'lr': learning_rate_decoder},
    # positional_encoding is a bare nn.Parameter, not a sub-module, so it has
    # to be listed explicitly or it would never be updated.
    {'params': [model.positional_encoding], 'lr': learning_rate_decoder},
    {'params': model.encoder_proj.parameters(), 'lr': learning_rate_decoder},
    {'params': model.decoder_proj.parameters(), 'lr': learning_rate_decoder},
    {'params': model.output_linear.parameters(), 'lr': learning_rate_decoder}
], betas=(0.9, 0.98), eps=1e-9)

# Initialize the scheduler: halve the learning rates when the monitored loss
# has not improved for more than 2 consecutive epochs. The deprecated `verbose`
# flag is omitted; the training loop reports the learning rates itself.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min',
                                                 factor=0.5, patience=2,
                                                 min_lr=1e-6)

In [None]:
num_epochs = 10


def _batch_loss(batch):
    """Return the teacher-forced cross-entropy loss for one batch.

    The decoder receives every token except the last, and is trained to
    predict every token except the first (inputs shifted by one position).
    """
    pixel_values = batch['pixel_values'].to(device)      # (batch_size, 3, 224, 224)
    input_ids = batch['input_ids'].to(device)            # (batch_size, seq_length)
    attention_mask = batch['attention_mask'].to(device)  # (batch_size, seq_length)

    labels = input_ids[:, 1:]
    decoder_input_ids = input_ids[:, :-1]
    decoder_attention_mask = attention_mask[:, :-1]

    outputs = model(pixel_values, decoder_input_ids, decoder_attention_mask)
    # outputs: (batch_size, seq_length - 1, vocab_size)

    # reshape (not view) so non-contiguous tensors are handled as well
    return criterion(outputs.reshape(-1, outputs.size(-1)), labels.reshape(-1))


for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    epoch_loss = 0.0
    for batch in train_loader:
        optimizer.zero_grad()
        loss = _batch_loss(batch)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    avg_loss = epoch_loss / len(train_loader)

    # ---- validation pass (no gradient tracking, dropout disabled) ----
    model.eval()
    epoch_val_loss = 0.0
    with torch.no_grad():
        for batch in val_loader:
            epoch_val_loss += _batch_loss(batch).item()

    avg_val_loss = epoch_val_loss / len(val_loader)

    # The plateau scheduler decides on LR reductions from the validation loss.
    scheduler.step(avg_val_loss)

    # Read the learning rates from the optimizer itself; this works regardless
    # of whether the scheduler class exposes get_last_lr().
    current_lrs = [group['lr'] for group in optimizer.param_groups]
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {avg_loss:.4f}, val_loss: {avg_val_loss:.4f}, lr: {current_lrs}")
