# 1. Introduction

Step 1: Loading the Dataset

In [18]:
from datasets import load_dataset

# Load the VQA-RAD dataset
dataset = load_dataset("flaviagiammarino/vqa-rad")

# Split into training and validation sets
train_dataset = dataset['train']
val_dataset = dataset['test']  # Assuming there is a validation split

Found cached dataset parquet (/Users/hemang/.cache/huggingface/datasets/flaviagiammarino___parquet/flaviagiammarino--vqa-rad-d04980c9c3579419/0.0.0/2a3b91fbd88a2c90d1dbbb32b460cf621d31bd5b05b934492fdef7d8d6f236ec)


  0%|          | 0/2 [00:00<?, ?it/s]

Step 2: Feature Extraction

In [19]:
import torch
from torchvision import models, transforms
from transformers import AutoTokenizer, AutoModel

# Check if MPS (Metal Performance Shaders) is available and set the device
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

# Load and configure ResNet-50 for image feature extraction
resnet50 = models.resnet50(pretrained=True).to(device)
resnet50.eval()

# Define image transformations
image_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def extract_image_features(image):
    image = image_transform(image).unsqueeze(0).to(device)
    with torch.no_grad():
        features = resnet50(image)
    return features.squeeze().cpu()

# Load and configure RoBERTa for question feature extraction
tokenizer = AutoTokenizer.from_pretrained("roberta-large")
roberta = AutoModel.from_pretrained("roberta-large").to(device)
roberta.eval()

def extract_question_features(question):
    inputs = tokenizer(question, return_tensors="pt", truncation=True, padding=True, max_length=128).to(device)
    with torch.no_grad():
        outputs = roberta(**inputs)
    return outputs.last_hidden_state[:, 0, :].squeeze().cpu()

Some weights of the model checkpoint at roberta-large were not used when initializing RobertaModel: ['lm_head.bias', 'lm_head.layer_norm.bias', 'lm_head.decoder.weight', 'lm_head.layer_norm.weight', 'lm_head.dense.weight', 'lm_head.dense.bias']
- This IS expected if you are initializing RobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Step 3: Applying Feature Extraction

In [20]:
def extract_and_store_features(dataset):
    def extract_features(example):
        image_features = extract_image_features(example['image'])
        question_features = extract_question_features(example['question'])
        return {
            'image_features': image_features.tolist(),
            'question_features': question_features.tolist(),
            'label': example['answer']  # Assuming the label is in the 'answer' field
        }
    
    # Apply the feature extraction
    dataset = dataset.map(extract_features, batched=False)
    return dataset

# Apply extraction to both the train and validation datasets
train_dataset = extract_and_store_features(train_dataset)
val_dataset = extract_and_store_features(val_dataset)

Map:   0%|          | 0/1793 [00:00<?, ? examples/s]



Map:   0%|          | 0/451 [00:00<?, ? examples/s]

Step 4: Feature Fusion

In [21]:
def fuse_features(example):
    fused_features = torch.cat((torch.tensor(example['image_features']), torch.tensor(example['question_features'])), dim=0)
    return {'fused_features': fused_features.tolist(), 'label': example['label']}

# Apply feature fusion to the train and validation datasets
train_dataset = train_dataset.map(fuse_features, batched=False)
val_dataset = val_dataset.map(fuse_features, batched=False)

# Optionally, remove the individual feature columns
train_dataset = train_dataset.remove_columns(['image_features', 'question_features'])
val_dataset = val_dataset.remove_columns(['image_features', 'question_features'])

Map:   0%|          | 0/1793 [00:00<?, ? examples/s]

Map:   0%|          | 0/451 [00:00<?, ? examples/s]

4 b Prepare DataLoaders

Step 5: Model Training and Evaluation

1. Define the Model Architecture

- Image Feature Input: Use the extracted features from ResNet-50.
- Question Feature Input: Use the embeddings from RoBERTa or another text processor.
- Fusion Layer: Combine image and question features.
- LSTM Layer: Pass the fused features through an LSTM layer for sequence modeling.
- Output Layer: Use a dense layer to map the LSTM outputs to the answer classes (classification) or to generate the final answer.
2. Loss Function and Optimizer
- Loss Function: For classification, use Cross-Entropy Loss since the answers are categorical.
- Optimizer: Use Adam optimizer, which is well-suited for training deep learning models due to its adaptive learning rate.
- Learning Rate: Set an initial learning rate, e.g., 1e-4, and consider using a learning rate scheduler to reduce the rate as training progresses.
3. Training the Model
- Batch Size: Select an appropriate batch size, such as 32 or 64, depending on your system's memory.
- Epochs: Start with 10-20 epochs and adjust based on the model's performance.
- Data Augmentation: If needed, apply techniques like random cropping or flipping to the image data to increase diversity.
- Validation Split: Use a portion of the training data for validation (e.g., 10-20%) to monitor the model's performance during training.
4. Evaluation Metrics
- Accuracy: Calculate the percentage of correct answers.
- Precision, Recall, and F1 Score: These metrics are especially important for understanding the model's performance on imbalanced datasets.
- Confusion Matrix: Provides insights into specific types of errors the model is making.

5. Prepare DataLoaders

In [26]:
from torch.utils.data import DataLoader, TensorDataset

# Step 1: Create a label mapping (example mapping)
label_to_index = {label: idx for idx, label in enumerate(set(train_dataset['label']))}

# Step 2: Update DataLoader preparation function
def prepare_dataloader(dataset, batch_size=32, shuffle=True):
    # Convert labels from strings to integers using the mapping
    labels = torch.tensor([label_to_index[label] for label in dataset['label']])
    
    # Convert fused features to a tensor
    fused_features = torch.tensor(dataset['fused_features'])
    
    # Create a TensorDataset
    tensor_dataset = TensorDataset(fused_features, labels)
    
    # Create a DataLoader
    dataloader = DataLoader(tensor_dataset, batch_size=batch_size, shuffle=shuffle)
    
    return dataloader

# Prepare DataLoaders for training and validation datasets
train_loader = prepare_dataloader(train_dataset)
val_loader = prepare_dataloader(val_dataset, shuffle=False)


KeyError: 'not seen here'

6. Define the Model Architecture

In [None]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class VQAModel(nn.Module):
    def __init__(self, image_feature_dim, question_feature_dim, hidden_dim, output_dim):
        super(VQAModel, self).__init__()
        
        # Image feature processing
        self.image_fc = nn.Linear(image_feature_dim, hidden_dim)
        
        # Question feature processing
        self.question_fc = nn.Linear(question_feature_dim, hidden_dim)
        
        # LSTM layer for sequence modeling
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        
        # Final output layer
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        
        # Dropout layer for regularization
        self.dropout = nn.Dropout(0.5)
        
    def forward(self, image_features, question_features):
        # Process image features
        image_features = F.relu(self.image_fc(image_features))
        
        # Process question features
        question_features = F.relu(self.question_fc(question_features))
        
        # Concatenate image and question features
        combined_features = torch.cat((image_features.unsqueeze(1), question_features.unsqueeze(1)), dim=1)
        
        # Sequence modeling with LSTM
        lstm_out, _ = self.lstm(combined_features)
        
        # Use the output from the last LSTM cell
        lstm_out = lstm_out[:, -1, :]
        
        # Apply dropout
        lstm_out = self.dropout(lstm_out)
        
        # Final output layer
        output = self.fc_out(lstm_out)
        
        return output

# Parameters for the model
image_feature_dim = 2048  # Example: features from ResNet-50
question_feature_dim = 768  # Example: embeddings from RoBERTa
hidden_dim = 512  # Dimension of the hidden layer in LSTM
output_dim = 100  # Number of possible answers (adjust based on your dataset)

# Initialize the model
model = VQAModel(image_feature_dim, question_feature_dim, hidden_dim, output_dim).to(device)

# Define the loss function
criterion = nn.CrossEntropyLoss()

# Define the optimizer
optimizer = optim.Adam(model.parameters(), lr=1e-4)

7. Training and Evaluation Loop

In [22]:
num_epochs = 10

# Training loop
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    correct = 0
    total = 0
    
    for fused_features, labels in train_loader:
        fused_features, labels = fused_features.to(device), labels.to(device)
        
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(fused_features[:, :image_feature_dim], fused_features[:, image_feature_dim:])
        loss = criterion(outputs, labels)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
    
    train_accuracy = 100. * correct / total
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss/len(train_loader):.4f}, Accuracy: {train_accuracy:.2f}%')
    
    # Validation phase
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0
    
    with torch.no_grad():
        for fused_features, labels in val_loader:
            fused_features, labels = fused_features.to(device), labels.to(device)
            
            outputs = model(fused_features[:, :image_feature_dim], fused_features[:, image_feature_dim:])
            loss = criterion(outputs, labels)
            
            val_loss += loss.item()
            _, predicted = outputs.max(1)
            val_total += labels.size(0)
            val_correct += predicted.eq(labels).sum().item()
    
    val_accuracy = 100. * val_correct / val_total
    print(f'Validation Loss: {val_loss/len(val_loader):.4f}, Validation Accuracy: {val_accuracy:.2f}%')

NameError: name 'train_loader' is not defined