In [None]:
import numpy as np 
import pandas as pd 
import os

In [None]:
%%capture
!pip install transformers nltk numpy datasets==2.16.0

In [None]:
import os
from copy import deepcopy
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from datasets import load_dataset, set_caching_enabled
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
from transformers import (
    AutoTokenizer, AutoFeatureExtractor,
    AutoModel, AutoConfig,  
    TrainingArguments, Trainer,
    logging
)
import warnings 
warnings.filterwarnings('ignore')

import nltk
nltk.download('wordnet')
import nltk
import subprocess
import safetensors
# Make sure the WordNet corpus is reachable by NLTK; on a miss, download it into the
# Kaggle working directory, unpack the archive and register that directory with NLTK.
try:
    nltk.data.find('wordnet.zip')
except LookupError:
    # nltk.data.find reports a missing resource with LookupError. Catching only that
    # keeps Ctrl-C (KeyboardInterrupt) and genuine programming errors visible instead
    # of silently triggering a re-download.
    nltk.download('wordnet', download_dir='/kaggle/working/')
    command = "unzip /kaggle/working/corpora/wordnet.zip -d /kaggle/working/corpora"
    subprocess.run(command.split())
    nltk.data.path.append('/kaggle/working/')

from nltk.corpus import wordnet
from sklearn.metrics import accuracy_score, f1_score

In [None]:
# Point the Hugging Face cache at ./cache.
# NOTE(review): transformers and datasets are already imported by the time this runs;
# if they resolve HF_HOME at import time this assignment has no effect — confirm, and
# consider setting the variable before those imports.
os.environ['HF_HOME'] = os.path.join(".", "cache")

# Turn on the datasets cache.
# NOTE(review): set_caching_enabled appears to be deprecated in newer `datasets`
# releases — confirm it still exists in the pinned version.
set_caching_enabled(True)
# `logging` is the transformers logging module imported above: only report errors.
logging.set_verbosity_error()

In [None]:
# Prefer the GPU whenever torch can see one; otherwise run on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)
if device.type == 'cuda':
    # Also report the name of the GPU at index 0.
    print(torch.cuda.get_device_name(0))

In [None]:
# Read the train / eval CSV files into a DatasetDict with "train" and "test" splits.
dataset = load_dataset(
    "csv",
    data_files=dict(
        train="/kaggle/input/preprocessed/processed/data_train.csv",
        test="/kaggle/input/preprocessed/processed/data_eval.csv",
    ),
)

# One candidate answer per line; the position of a line is used as its class id below.
with open("/kaggle/input/preprocessed/processed/answer_space.txt") as f:
    answer_space = f.read().splitlines()


def _first_answer_to_label(examples):
    """Build the 'label' column for a batch: the class id of each row's first answer."""
    label_ids = []
    for raw_answer in examples['answer']:
        # Answers may be comma-separated; strip spaces and keep only the first one.
        first_answer = raw_answer.replace(" ", "").split(",")[0]
        label_ids.append(answer_space.index(first_answer))
    return {'label': label_ids}


dataset = dataset.map(_first_answer_to_label, batched=True)

In [None]:
import IPython.display as display

def show_example(train=True, idx=None):
    """Display one sample (image, question, answer) and return its answer string.

    Args:
        train: pick from the "train" split when true, otherwise from "test".
        idx: row index to show; a random row of the split is drawn when None.

    Returns:
        The value of the row's "answer" column.
    """
    split = dataset["train" if train else "test"]

    # No explicit index: draw one at random from the chosen split.
    if idx is None:
        idx = np.random.randint(len(split))

    sample = split[idx]

    image = Image.open(f"/kaggle/input/preprocessed/processed/images/{sample['image_id']}.png")
    display.display(image)

    question, answer, label = sample["question"], sample["answer"], sample["label"]
    print(f"Questions : {question}")
    print(f"Answers : {answer} (Answer_Label: {label})")

    return answer
show_example()

In [None]:
@dataclass
class MultimodalCollator:
    """Turn raw VQA rows into one batch of tensors for the multimodal model.

    The collator tokenizes the questions, loads and preprocesses the images named by
    `image_id`, and converts the integer labels to a tensor. Every returned tensor
    keeps its leading batch axis, including for a batch of one sample (for example
    the last, partial batch of a split), so inputs and `labels` always line up.

    Attributes are documented inline below.
    """
    # Callable used to tokenize the questions.
    tokenizer: AutoTokenizer
    # Callable used to turn PIL images into `pixel_values`.
    preprocessor: AutoFeatureExtractor
    # Directory that holds the `<image_id>.png` files.
    image_dir: str = "/kaggle/input/preprocessed/processed/images/"
    # Questions are truncated to this many tokens.
    max_length: int = 24

    def tokenize_text(self, texts: List[str]) -> Dict[str, torch.Tensor]:
        """Tokenize `texts`, padded to the longest question in the batch.

        Returns:
            A dict with "input_ids", "token_type_ids" and "attention_mask", exactly as
            produced by the tokenizer (batch axis preserved).
        """
        encoded_text = self.tokenizer(
            text=texts,
            padding='longest',
            max_length=self.max_length,
            truncation=True,
            return_tensors='pt',
            return_token_type_ids=True,
            return_attention_mask=True,
        )
        # No squeeze here: squeezing would remove the batch axis of a one-sample batch.
        return {
            "input_ids": encoded_text['input_ids'],
            "token_type_ids": encoded_text['token_type_ids'],
            "attention_mask": encoded_text['attention_mask'],
        }

    def _load_rgb_image(self, image_id):
        """Open `<image_dir>/<image_id>.png`, convert it to RGB and close the file."""
        image_path = os.path.join(self.image_dir, f"{image_id}.png")
        # The converted copy is independent of the file handle, which the `with` closes.
        with Image.open(image_path) as image:
            return image.convert('RGB')

    def preprocess_images(self, images: List[str]) -> Dict[str, torch.Tensor]:
        """Load the images named by the ids in `images` and run the preprocessor.

        Returns:
            A dict with "pixel_values" as produced by the preprocessor (batch axis
            preserved).
        """
        processed_images = self.preprocessor(
            images=[self._load_rgb_image(image_id) for image_id in images],
            return_tensors="pt",
        )
        return {
            "pixel_values": processed_images['pixel_values'],
        }

    def __call__(self, raw_batch_dict) -> Dict[str, torch.Tensor]:
        """Collate either a list of row dicts or one dict of columns.

        Args:
            raw_batch_dict: a list of rows, each with "question", "image_id" and
                "label", or a single dict mapping those keys to lists. A dict
                holding one un-batched row (scalar values) is also accepted.

        Returns:
            The tokenizer outputs, "pixel_values" and an int64 "labels" tensor.
        """
        if isinstance(raw_batch_dict, dict):
            question_batch = raw_batch_dict['question']
            image_id_batch = raw_batch_dict['image_id']
            label_batch = raw_batch_dict['label']
            if isinstance(question_batch, str):
                # Single un-batched row: wrap it so that iterating over image ids
                # does not walk the characters of one id string.
                question_batch = [question_batch]
                image_id_batch = [image_id_batch]
                label_batch = [label_batch]
        else:
            question_batch = [row['question'] for row in raw_batch_dict]
            image_id_batch = [row['image_id'] for row in raw_batch_dict]
            label_batch = [row['label'] for row in raw_batch_dict]

        return {
            **self.tokenize_text(question_batch),
            **self.preprocess_images(image_id_batch),
            'labels': torch.tensor(label_batch, dtype=torch.int64),
        }


In [None]:
! pip install --upgrade transformers

In [None]:
def wup_measure(a, b, similarity_threshold=0.925):
    """Score how close two answer words are using WordNet Wu-Palmer similarity.

    Args:
        a: first word.
        b: second word.
        similarity_threshold: best-pair similarities below this value are scaled
            down by a factor of 0.1.

    Returns:
        1.0 when the words are identical; 0 when either word is empty or has no
        noun synset; otherwise the highest Wu-Palmer similarity over all pairs of
        noun synsets, multiplied by 0.1 if it is below `similarity_threshold`.
    """
    # Identity is checked first, so two identical strings always score 1.0.
    if a == b:
        return 1.0
    if a == "" or b == "":
        return 0

    interp_a = wordnet.synsets(a, pos=wordnet.NOUN)
    interp_b = wordnet.synsets(b, pos=wordnet.NOUN)
    if interp_a == [] or interp_b == []:
        return 0

    global_max = 0.0
    for x in interp_a:
        for y in interp_b:
            local_score = x.wup_similarity(y)
            # wup_similarity may return None when the two synsets cannot be
            # connected; comparing None with a float raises TypeError, so skip it.
            if local_score is not None and local_score > global_max:
                global_max = local_score

    # Weak matches are heavily down-weighted rather than discarded.
    if global_max < similarity_threshold:
        interp_weight = 0.1
    else:
        interp_weight = 1.0

    return global_max * interp_weight

def batch_wup_measure(labels, preds):
    """Return the mean wup_measure over paired label / prediction class ids.

    Both id sequences are mapped to answer strings through `answer_space` before
    being scored pair by pair.
    """
    scores = []
    for label, pred in zip(labels, preds):
        scores.append(wup_measure(answer_space[label], answer_space[pred]))
    return np.mean(scores)


# Without LORA

## If you don't want to use LoRA, execute from here until you reach the "With LORA" heading. If you want to use LoRA, skip ahead to the "With LORA" heading and execute all the cells from there.

In [None]:
class MultimodalVQAModel(nn.Module):
    """Late-fusion VQA classifier built from a text encoder and an image encoder.

    The pooled outputs of both encoders are concatenated, passed through a
    Linear -> ReLU -> Dropout fusion block and classified over the answer space.
    """

    def __init__(
        self,
        num_labels: int = len(answer_space),
        intermediate_dim: int = 512,
        pretrained_text_name: str = 'bert-base-uncased',
        pretrained_image_name: str = 'facebook/deit-base-distilled-patch16-224'
    ):
        """Load both pretrained encoders and create the fusion and classifier layers.

        Args:
            num_labels: number of answer classes (defaults to the answer-space size).
            intermediate_dim: width of the fused representation.
            pretrained_text_name: checkpoint name passed to AutoModel for text.
            pretrained_image_name: checkpoint name passed to AutoModel for images.
        """
        super().__init__()
        self.num_labels = num_labels
        self.pretrained_text_name = pretrained_text_name
        self.pretrained_image_name = pretrained_image_name

        self.text_encoder = AutoModel.from_pretrained(pretrained_text_name)
        self.image_encoder = AutoModel.from_pretrained(pretrained_image_name)

        # The fusion input is the concatenation of both pooled encoder outputs.
        fused_dim = self.text_encoder.config.hidden_size + self.image_encoder.config.hidden_size
        self.fusion = nn.Sequential(
            nn.Linear(fused_dim, intermediate_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
        )
        self.classifier = nn.Linear(intermediate_dim, num_labels)
        self.criterion = nn.CrossEntropyLoss()

    def forward(
        self,
        input_ids: torch.LongTensor,
        pixel_values: torch.FloatTensor,
        attention_mask: Optional[torch.LongTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        labels: Optional[torch.LongTensor] = None
    ):
        """Encode both modalities and classify the fused representation.

        Returns:
            A dict with "logits", plus "loss" (cross-entropy against `labels`) when
            labels are supplied.
        """
        encoded_text = self.text_encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            return_dict=True,
        )
        encoded_image = self.image_encoder(pixel_values=pixel_values, return_dict=True)

        text_features = encoded_text['pooler_output']
        image_features = encoded_image['pooler_output']
        fused_output = self.fusion(torch.cat([text_features, image_features], dim=1))
        logits = self.classifier(fused_output)

        if labels is None:
            return {"logits": logits}
        return {"logits": logits, "loss": self.criterion(logits, labels)}

In [None]:
def create_multimodal_vqa_collator_and_model(text_encoder='bert-base-uncased', image_encoder='facebook/deit-base-distilled-patch16-224'):
    """Build the collator and the VQA model for one text / image encoder pair.

    Args:
        text_encoder: checkpoint name used for the tokenizer and the text encoder.
        image_encoder: checkpoint name used for the feature extractor and the
            image encoder.

    Returns:
        (collator, model), with the model already moved to `device`.
    """
    multimodal_collator = MultimodalCollator(
        tokenizer=AutoTokenizer.from_pretrained(text_encoder),
        preprocessor=AutoFeatureExtractor.from_pretrained(image_encoder),
    )

    multimodal_model = MultimodalVQAModel(
        pretrained_text_name=text_encoder,
        pretrained_image_name=image_encoder,
    )
    multimodal_model = multimodal_model.to(device)

    return multimodal_collator, multimodal_model


In [None]:
# Quick sanity check of the WUPS metric on random class ids: comparing the labels
# with themselves must score 1.0.
labels = np.random.randint(len(answer_space), size=5)
preds = np.random.randint(len(answer_space), size=5)


def showAnswers(ids):
    """Print the answer strings that correspond to the given class ids."""
    answers = [answer_space[answer_id] for answer_id in ids]
    print(answers)


showAnswers(labels)
showAnswers(preds)

random_pair_score = batch_wup_measure(labels, preds)
print("Predictions vs Labels: ", random_pair_score)
identity_score = batch_wup_measure(labels, labels)
print("Labels vs Labels: ", identity_score)

In [None]:
from typing import Tuple, Dict
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

def compute_metrics(eval_tuple: Tuple[np.ndarray, np.ndarray]) -> Dict[str, float]:
    """Compute the evaluation metrics from a (logits, labels) pair.

    Predictions are the argmax over the last axis of the logits.

    Returns:
        A dict with "wups", "acc" and macro-averaged "f1", "precision", "recall".
    """
    logits, labels = eval_tuple
    preds = logits.argmax(axis=-1)

    metrics = {
        "wups": batch_wup_measure(labels, preds),
        "acc": accuracy_score(labels, preds),
    }
    # The remaining scores share the same macro-averaged call signature.
    macro_scorers = (("f1", f1_score), ("precision", precision_score), ("recall", recall_score))
    for metric_name, scorer in macro_scorers:
        metrics[metric_name] = scorer(labels, preds, average='macro')

    return metrics


In [None]:
# Training configuration shared by the runs below: evaluate, log and save once per
# epoch, and pick the best checkpoint by the "wups" value returned by compute_metrics.
args = TrainingArguments(
    output_dir="/kaggle/working/checkpoint/",            # Base output directory for checkpoints and logs
    seed=12345,                         # Seed for reproducibility
    evaluation_strategy="epoch",        # Evaluation strategy: "steps" or "epoch"
    eval_steps=100,                     # Step interval for evaluation — NOTE(review): likely unused while the strategy is "epoch"; confirm
    logging_strategy="epoch",           # Logging strategy: "steps" or "epoch"
    logging_steps=100,                  # Step interval for logging — NOTE(review): likely unused while the strategy is "epoch"; confirm
    save_strategy="epoch",              # Saving strategy: "steps" or "epoch"
    save_steps=100,                     # Step interval for saving — NOTE(review): likely unused while the strategy is "epoch"; confirm
    save_total_limit=3,                 # Limit on the number of checkpoints kept on disk during training
    metric_for_best_model='wups',       # Metric used for determining the best model
    per_device_train_batch_size=32,     # Batch size per device for training
    per_device_eval_batch_size=32,      # Batch size per device for evaluation
    remove_unused_columns=False,        # Keep the raw columns: the collator reads "question", "image_id" and "label"
    num_train_epochs=20,                 # Number of training epochs
    fp16=True,                          # Enable mixed precision training (float16)
    dataloader_num_workers=8,           # Number of workers for data loading
    load_best_model_at_end=True,        # Whether to load the best model at the end of training
)

In [None]:
def create_and_train_model(dataset, args, text_model='bert-base-uncased', image_model='microsoft/beit-base-patch16-224-pt22k-ft22k', multimodal_model='bert_deit'):
    """Build, train and evaluate one text / image encoder combination.

    Args:
        dataset: mapping with "train" and "test" splits.
        args: base TrainingArguments; a deep copy is used so the caller's object
            is left untouched.
        text_model: checkpoint name of the text encoder.
        image_model: checkpoint name of the image encoder.
        multimodal_model: sub-directory of /kaggle/working/checkpoint/ that
            receives this run's checkpoints.

    Returns:
        (collator, model, train output, evaluation metrics, trainer).
    """
    print(text_model,image_model)
    collator, model = create_multimodal_vqa_collator_and_model(text_model, image_model)

    # Each run writes to its own checkpoint directory.
    multi_args = deepcopy(args)
    multi_args.output_dir = os.path.join("/kaggle/working/checkpoint/", multimodal_model)
    print(multi_args.output_dir)

    multi_trainer = Trainer(
        model=model,
        args=multi_args,
        train_dataset=dataset['train'],
        eval_dataset=dataset['test'],
        data_collator=collator,
        compute_metrics=compute_metrics,
    )

    train_multi_metrics = multi_trainer.train()
    eval_multi_metrics = multi_trainer.evaluate()

    return collator, model, train_multi_metrics, eval_multi_metrics, multi_trainer


## Text Models : 

Bert : bert-base-uncased

Roberta : roberta-base

## Image Models :

ViT : google/vit-base-patch16-224

DeIT : facebook/deit-base-distilled-patch16-224

BeIT : microsoft/beit-base-patch16-224-pt22k-ft22k

You can use any combination of these by passing the corresponding names as `text_model` and `image_model` in the call below.

In [None]:
# Train and evaluate RoBERTa + BEiT.
# NOTE(review): `multimodal_model` is not passed, so the default from the definition
# above ('bert_deit') is used and checkpoints go to /kaggle/working/checkpoint/bert_deit
# even though other encoders are trained here — consider passing a matching name.
collator, model, train_multi_metrics, eval_multi_metrics, trainer = create_and_train_model( dataset, args,text_model='roberta-base',image_model='microsoft/beit-base-patch16-224-pt22k-ft22k')

In [None]:
eval_multi_metrics

In [None]:
print(model)

In [None]:
torch.save(model.state_dict(), 'Roberta_BeIT_weights.pth')

In [None]:
def count_trainable_parameters(model):
    """Print the total number of elements in `model`'s parameters that require grad."""
    trainable_sizes = [param.numel() for param in model.parameters() if param.requires_grad]
    print("Number of trainable parameters: {:,}".format(sum(trainable_sizes)))
count_trainable_parameters(model)

# With LORA

# Text Transformers

In [None]:
from transformers import RobertaConfig, RobertaModel
import torch.nn as nn

class LoRaRobertaModel(nn.Module):
    """RoBERTa encoder wrapper whose config carries LoRA hyper-parameters.

    The encoder is loaded from the pretrained checkpoint `model_name`. Building
    `RobertaModel(config)` directly would create a randomly initialised network
    and discard the pretrained weights.

    NOTE(review): the `lora*` values are only stored on the config; nothing in this
    class adds adapter layers or freezes base weights, so unless the installed
    transformers build consumes these fields the whole encoder is fine-tuned.
    Confirm, and consider a dedicated adapter implementation.
    """

    def __init__(self, model_name='roberta-base', rank=32, lora_alpha=32, lora_dropout=0.1):
        """Load the config and the pretrained weights for `model_name`.

        Args:
            model_name: checkpoint name of the RoBERTa model.
            rank: stored as `config.lora_rank`.
            lora_alpha: stored as `config.lora_alpha`.
            lora_dropout: stored as `config.lora_dropout`.
        """
        super(LoRaRobertaModel, self).__init__()
        self.config = RobertaConfig.from_pretrained(model_name)
        self.config.lora = True
        self.config.lora_rank = rank
        self.config.lora_alpha = lora_alpha
        self.config.lora_dropout = lora_dropout
        # Load the pretrained weights instead of starting from random initialisation.
        self.roberta = RobertaModel.from_pretrained(model_name, config=self.config)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None):
        """Run the wrapped encoder and return its output unchanged."""
        return self.roberta(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)

model = LoRaRobertaModel()

In [None]:
from transformers import BertConfig, BertModel
import torch.nn as nn

class LoRaBertModel(nn.Module):
    """BERT encoder wrapper whose config carries LoRA hyper-parameters.

    The encoder is loaded from the pretrained checkpoint `model_name`. Building
    `BertModel(config)` directly would create a randomly initialised network and
    discard the pretrained weights.

    NOTE(review): the `lora*` values are only stored on the config; nothing in this
    class adds adapter layers or freezes base weights, so unless the installed
    transformers build consumes these fields the whole encoder is fine-tuned.
    Confirm, and consider a dedicated adapter implementation.
    """

    def __init__(self, model_name='bert-base-uncased', rank=16, lora_alpha=32, lora_dropout=0.1):
        """Load the config and the pretrained weights for `model_name`.

        Args:
            model_name: checkpoint name of the BERT model.
            rank: stored as `config.lora_rank`.
            lora_alpha: stored as `config.lora_alpha`.
            lora_dropout: stored as `config.lora_dropout`.
        """
        super(LoRaBertModel, self).__init__()
        self.config = BertConfig.from_pretrained(model_name)
        self.config.lora = True
        self.config.lora_rank = rank
        self.config.lora_alpha = lora_alpha
        self.config.lora_dropout = lora_dropout
        # Load the pretrained weights instead of starting from random initialisation.
        self.bert = BertModel.from_pretrained(model_name, config=self.config)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None):
        """Run the wrapped encoder and return its output unchanged."""
        return self.bert(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)



In [None]:
from transformers import  AutoTokenizer, AutoFeatureExtractor, TrainingArguments, Trainer

# Vision Transformers

In [None]:
from transformers import ViTConfig, ViTModel
import torch.nn as nn

class LoRaViTModel(nn.Module):
    """ViT encoder wrapper whose config carries LoRA hyper-parameters.

    The encoder is loaded from the pretrained checkpoint `model_name`. Building
    `ViTModel(config)` directly would create a randomly initialised network and
    discard the pretrained weights.

    NOTE(review): the `lora*` values are only stored on the config; nothing in this
    class adds adapter layers or freezes base weights, so unless the installed
    transformers build consumes these fields the whole encoder is fine-tuned.
    Confirm, and consider a dedicated adapter implementation.
    """

    def __init__(self, model_name='google/vit-base-patch16-224', rank=32, lora_alpha=32, lora_dropout=0.1):
        """Load the config and the pretrained weights for `model_name`.

        Args:
            model_name: checkpoint name of the ViT model.
            rank: stored as `config.lora_rank`.
            lora_alpha: stored as `config.lora_alpha`.
            lora_dropout: stored as `config.lora_dropout`.
        """
        super(LoRaViTModel, self).__init__()
        self.config = ViTConfig.from_pretrained(model_name)
        self.config.lora = True
        self.config.lora_rank = rank
        self.config.lora_alpha = lora_alpha
        self.config.lora_dropout = lora_dropout
        # Load the pretrained weights instead of starting from random initialisation.
        self.vit = ViTModel.from_pretrained(model_name, config=self.config)

    def forward(self, pixel_values, attention_mask=None):
        """Encode `pixel_values`; `attention_mask` is accepted but not used."""
        outputs = self.vit(pixel_values)
        return outputs


In [None]:
from transformers import DeiTConfig, DeiTModel
import torch.nn as nn

class LoRaDeiTModel(nn.Module):
    """DeiT encoder wrapper whose config carries LoRA hyper-parameters.

    The encoder is loaded from the pretrained checkpoint `model_name`. Building
    `DeiTModel(config)` directly would create a randomly initialised network and
    discard the pretrained weights.

    NOTE(review): the `lora*` values are only stored on the config; nothing in this
    class adds adapter layers or freezes base weights, so unless the installed
    transformers build consumes these fields the whole encoder is fine-tuned.
    Confirm, and consider a dedicated adapter implementation.

    NOTE(review): make sure `model_name` refers to a checkpoint saved with the DeiT
    architecture; with another architecture the stored weights may not match this
    model's parameters — confirm from the loading warnings.
    """

    def __init__(self, model_name='facebook/deit-base-patch16-224', rank=32, lora_alpha=32, lora_dropout=0.1):
        """Load the config and the pretrained weights for `model_name`.

        Args:
            model_name: checkpoint name of the DeiT model.
            rank: stored as `config.lora_rank`.
            lora_alpha: stored as `config.lora_alpha`.
            lora_dropout: stored as `config.lora_dropout`.
        """
        super(LoRaDeiTModel, self).__init__()
        self.config = DeiTConfig.from_pretrained(model_name)
        self.config.lora = True
        self.config.lora_rank = rank
        self.config.lora_alpha = lora_alpha
        self.config.lora_dropout = lora_dropout
        # Load the pretrained weights instead of starting from random initialisation.
        self.deit = DeiTModel.from_pretrained(model_name, config=self.config)

    def forward(self, pixel_values, attention_mask=None):
        """Encode `pixel_values`; `attention_mask` is accepted but not used."""
        outputs = self.deit(pixel_values)
        return outputs


In [None]:
from transformers import BeitConfig, BeitModel
class LoRaBeitModel(nn.Module):
    """BEiT encoder wrapper whose config carries LoRA hyper-parameters.

    The encoder is loaded from the pretrained checkpoint `model_name`. Building
    `BeitModel(config)` directly would create a randomly initialised network and
    discard the pretrained weights.

    NOTE(review): the `lora*` values are only stored on the config; nothing in this
    class adds adapter layers or freezes base weights, so unless the installed
    transformers build consumes these fields the whole encoder is fine-tuned.
    Confirm, and consider a dedicated adapter implementation.
    """

    def __init__(self, model_name='microsoft/beit-base-patch16-224', rank=32, lora_alpha=32, lora_dropout=0.1):
        """Load the config and the pretrained weights for `model_name`.

        Args:
            model_name: checkpoint name of the BEiT model.
            rank: stored as `config.lora_rank`.
            lora_alpha: stored as `config.lora_alpha`.
            lora_dropout: stored as `config.lora_dropout`.
        """
        super(LoRaBeitModel, self).__init__()
        self.config = BeitConfig.from_pretrained(model_name)
        self.config.lora = True
        self.config.lora_rank = rank
        self.config.lora_alpha = lora_alpha
        self.config.lora_dropout = lora_dropout
        # Load the pretrained weights instead of starting from random initialisation.
        self.beit = BeitModel.from_pretrained(model_name, config=self.config)

    def forward(self, pixel_values, attention_mask=None):
        """Encode `pixel_values`; `attention_mask` is accepted but not used."""
        outputs = self.beit(pixel_values)
        return outputs

# Modeling starts here

If you want to use a different combination, use the corresponding text-model and image-model classes in `MultimodalVQAModel`.

## For example, in the `MultimodalVQAModel` class,

just change `self.text_encoder` and `self.image_encoder` to the required classes. We have used BERT and DeiT here; you can swap in any of the classes defined above. The parameters remain the same. Change the rank manually here; the default is 32.

In [None]:
class MultimodalVQAModel(nn.Module):
    """Late-fusion VQA classifier built on the LoRa* encoder wrappers.

    The text encoder is a LoRaBertModel and the image encoder a LoRaDeiTModel,
    whatever checkpoint names are passed in; swap the wrapper classes by hand to
    use other architectures.
    """

    def __init__(self, num_labels, intermediate_dim=512,pretrained_text_name = 'bert-base-uncased', pretrained_image_name='google/vit-base-patch16-224-in21k',rank = 32):
        """Create both encoders plus the fusion and classification layers.

        Args:
            num_labels: number of answer classes.
            intermediate_dim: width of the fused representation.
            pretrained_text_name: checkpoint name handed to the text wrapper.
            pretrained_image_name: checkpoint name handed to the image wrapper.
            rank: rank value forwarded to both wrappers.
        """
        super().__init__()
        self.num_labels = num_labels

        # Both encoders receive the same adapter hyper-parameters.
        lora_settings = dict(rank=rank, lora_alpha=16, lora_dropout=0.1)
        self.text_encoder = LoRaBertModel(pretrained_text_name, **lora_settings)
        self.image_encoder = LoRaDeiTModel(pretrained_image_name, **lora_settings)

        fused_dim = self.text_encoder.config.hidden_size + self.image_encoder.config.hidden_size
        self.fusion = nn.Sequential(
            nn.Linear(fused_dim, intermediate_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
        )
        self.classifier = nn.Linear(intermediate_dim, num_labels)
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, input_ids, pixel_values, attention_mask=None, token_type_ids=None, labels=None):
        """Encode both modalities, fuse the pooled outputs and classify.

        Returns:
            A dict with "logits", plus "loss" (cross-entropy against `labels`) when
            labels are supplied.
        """
        text_output = self.text_encoder(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        image_output = self.image_encoder(pixel_values)

        pooled = torch.cat([text_output.pooler_output, image_output.pooler_output], dim=1)
        logits = self.classifier(self.fusion(pooled))

        if labels is None:
            return {"logits": logits}
        return {"logits": logits, "loss": self.criterion(logits, labels)}

In [None]:
def create_multimodal_vqa_collator_and_model(text_encoder='bert-base-uncased',image_encoder='google/vit-base-patch16-224-in21k'):
    """Build the collator and the VQA model for one encoder pair.

    Args:
        text_encoder: checkpoint name used for the tokenizer and the text encoder.
        image_encoder: checkpoint name used for the feature extractor and the
            image encoder.

    Returns:
        (collator, model), with the model already moved to `device`.
    """
    multimodal_collator = MultimodalCollator(
        tokenizer=AutoTokenizer.from_pretrained(text_encoder),
        preprocessor=AutoFeatureExtractor.from_pretrained(image_encoder),
    )

    # The classifier covers the whole answer space.
    model_kwargs = dict(
        num_labels=len(answer_space),
        intermediate_dim=512,
        pretrained_text_name=text_encoder,
        pretrained_image_name=image_encoder,
    )
    multimodal_model = MultimodalVQAModel(**model_kwargs).to(device)

    return multimodal_collator, multimodal_model

In [None]:
from typing import Tuple, Dict
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

def compute_metrics(eval_tuple: Tuple[np.ndarray, np.ndarray]) -> Dict[str, float]:
    """Turn a (logits, labels) pair into the evaluation metrics.

    The predicted class is the argmax over the last axis of the logits.

    Returns:
        A dict with "wups", "acc" and macro-averaged "f1", "precision", "recall".
    """
    logits, labels = eval_tuple
    preds = logits.argmax(axis=-1)

    # Shared keyword arguments of the three macro-averaged scores.
    macro = dict(average='macro')

    wups = batch_wup_measure(labels, preds)
    acc = accuracy_score(labels, preds)
    f1 = f1_score(labels, preds, **macro)
    precision = precision_score(labels, preds, **macro)
    recall = recall_score(labels, preds, **macro)

    return {"wups": wups, "acc": acc, "f1": f1, "precision": precision, "recall": recall}

In [None]:
# Training configuration for the LoRA section: evaluate, log and save once per epoch,
# and pick the best checkpoint by the "wups" value returned by compute_metrics.
args = TrainingArguments(
    output_dir="/kaggle/working/checkpoint/",            # Base output directory for checkpoints and logs
    seed=12345,                         # Seed for reproducibility
    evaluation_strategy="epoch",        # Evaluation strategy: "steps" or "epoch"
    eval_steps=100,                     # Step interval for evaluation — NOTE(review): likely unused while the strategy is "epoch"; confirm
    logging_strategy="epoch",           # Logging strategy: "steps" or "epoch"
    logging_steps=100,                  # Step interval for logging — NOTE(review): likely unused while the strategy is "epoch"; confirm
    save_strategy="epoch",              # Saving strategy: "steps" or "epoch"
    save_steps=100,                     # Step interval for saving — NOTE(review): likely unused while the strategy is "epoch"; confirm
    save_total_limit=3,                 # Limit on the number of checkpoints kept on disk during training
    metric_for_best_model='wups',       # Metric used for determining the best model
    per_device_train_batch_size=32,     # Batch size per device for training
    per_device_eval_batch_size=32,      # Batch size per device for evaluation
    remove_unused_columns=False,        # Keep the raw columns: the collator reads "question", "image_id" and "label"
    num_train_epochs=20,                 # Number of training epochs
    fp16=True,                          # Enable mixed precision training (float16)
    dataloader_num_workers=8,           # Number of workers for data loading
    load_best_model_at_end=True,        # Whether to load the best model at the end of training
)

In [None]:
def create_and_train_model(dataset, args, text_model='roberta-base',image_model="microsoft/beit-base-patch16-224-pt22k-ft22k", multimodal_model='bert_vit'):
    """Build, train and evaluate one text / image encoder combination.

    Args:
        dataset: mapping with "train" and "test" splits.
        args: base TrainingArguments; a deep copy is used so the caller's object
            is left untouched.
        text_model: checkpoint name of the text encoder.
        image_model: checkpoint name of the image encoder.
        multimodal_model: sub-directory of /kaggle/working/checkpoint/ that
            receives this run's checkpoints.

    Returns:
        (collator, model, train output, evaluation metrics, trainer).
    """
    print(text_model,image_model)
    collator, model = create_multimodal_vqa_collator_and_model(text_model,image_model)

    # Work on a private copy of the arguments with a run-specific output directory.
    multi_args = deepcopy(args)
    multi_args.output_dir = os.path.join("/kaggle/working/checkpoint/", multimodal_model)
    print(multi_args.output_dir)

    trainer_kwargs = dict(
        model=model,
        args=multi_args,
        train_dataset=dataset['train'],
        eval_dataset=dataset['test'],
        data_collator=collator,
        compute_metrics=compute_metrics,
    )
    multi_trainer = Trainer(**trainer_kwargs)

    train_multi_metrics = multi_trainer.train()
    eval_multi_metrics = multi_trainer.evaluate()

    return collator, model, train_multi_metrics, eval_multi_metrics, multi_trainer

In [None]:
# Train and evaluate the LoRA-section model with a BERT text checkpoint and a BEiT image checkpoint.
# NOTE(review): `image_model` names a BEiT checkpoint, but MultimodalVQAModel as defined
# above builds its image encoder with LoRaDeiTModel — confirm this pairing is intended.
# NOTE(review): `multimodal_model` is not passed, so the default from the definition above
# ('bert_vit') is used and checkpoints go to /kaggle/working/checkpoint/bert_vit.
collator, model, train_multi_metrics, eval_multi_metrics, trainer = create_and_train_model( dataset, args,text_model='bert-base-uncased',image_model="microsoft/beit-base-patch16-224")

In [None]:
eval_multi_metrics

In [None]:
print(model)

In [None]:
torch.save(model.state_dict(), 'Bert_BeIT_weights_lora_32_new.pth')

In [None]:
def count_trainable_parameters(model):
    """Print how many parameter elements of `model` have requires_grad set."""
    total = 0
    for param in model.parameters():
        # Frozen parameters are not counted.
        if param.requires_grad:
            total += param.numel()
    print("Number of trainable parameters: {:,}".format(total))
count_trainable_parameters(model)