In [None]:

import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

CHUNK_SIZE = 40960
DATA_SOURCE_MAPPING = 'visual-question-answering-computer-vision-nlp:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F2264789%2F3798293%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240522%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240522T020232Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D298f4721143ba683f807f4e0c330bba8c35418d1e4f21bdffd5faef9e3835d4126645d4c03d5a5580d729b19f92a9ecc69d691ebcc8bd2e2d0f1fd5486045c6f5110ffd4d3696f6cdceac88eca259b6888cc05f83ef2bdbce9ea3db8904cc82b1902bcd5652e645d8b8358ad1196fb2bbdd8acbc888bf600eb8fa2c19b6f444827d436bbce4067451f1e16f746a00e392226c083d722dcaaae56be7262415626b6cb3b44b65626fd8f6b1c85475cdc6a9d0d1934991ac8cba704493810a0c5793a4a9c64a60c616addf8416bbb00836c5a15ef28a6ef3c30c53b7f8987133be542bcba648686fafbf624942437cdd805e08bf07bcfbf6ff49ef8d19e543e0e89'

KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

# Download every "<directory>:<percent-encoded URL>" entry of DATA_SOURCE_MAPPING
# and unpack it under KAGGLE_INPUT_PATH/<directory>.
for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    directory, download_url_encoded = data_source_mapping.split(':')
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            total_length = fileres.headers['content-length']
            print(f'Downloading {directory}, {total_length} bytes compressed')
            # The Content-Length header can be absent; in that case the bar
            # stays empty instead of crashing on int(None).
            total_bytes = int(total_length) if total_length else 0
            dl = 0
            data = fileres.read(CHUNK_SIZE)
            while len(data) > 0:
                dl += len(data)
                tfile.write(data)
                done = min(50, int(50 * dl / total_bytes)) if total_bytes else 0
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                sys.stdout.flush()
                data = fileres.read(CHUNK_SIZE)
            # tarfile reopens the temp file by name, so buffered bytes must be
            # on disk before extraction starts.
            tfile.flush()
            if filename.endswith('.zip'):
                with ZipFile(tfile) as zfile:
                    zfile.extractall(destination_path)
            else:
                # Bind the archive to its own name; reusing `tarfile` here would
                # overwrite the imported module for the rest of the notebook.
                with tarfile.open(tfile.name) as tar_archive:
                    tar_archive.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError:
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')


# Visual Question Answering using Multimodal Transformer Models

## Import necessary libraries & set up the environment

In [None]:
import os
from copy import deepcopy
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from datasets import load_dataset, set_caching_enabled
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoFeatureExtractor,AutoModel, TrainingArguments, Trainer, logging
from nltk.corpus import wordnet
from sklearn.metrics import accuracy_score, f1_score

In [None]:
# Point the Hugging Face cache at ./cache and expose only GPU 0.
# NOTE(review): these variables are assigned after `datasets`, `torch` and
# `transformers` were imported in the previous cell; libraries that read them
# at import time may not pick the values up — confirm, or set them before the imports.
os.environ['HF_HOME'] = os.path.join(".", "cache")
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

# Enable dataset caching and silence transformers messages below error level.
set_caching_enabled(True)
logging.set_verbosity_error()

In [None]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

if device.type == 'cuda':
    print(torch.cuda.get_device_name(0))

In [None]:
dataset = load_dataset(
    "csv",
    data_files={
        "train": os.path.join("..","input","visual-question-answering-computer-vision-nlp","dataset","data_train.csv"),
        "test": os.path.join("..","input","visual-question-answering-computer-vision-nlp","dataset", "data_eval.csv")
    }
)

with open(os.path.join("..","input","visual-question-answering-computer-vision-nlp","dataset", "answer_space.txt")) as f:
    answer_space = f.read().splitlines()

dataset = dataset.map(
    lambda examples: {
        'label': [
            answer_space.index(ans.replace(" ", "").split(",")[0]) # Select the 1st answer if multiple answers are provided
            for ans in examples['answer']
        ]
    },
    batched=True
)

dataset

In [None]:
import pickle
with open('ans_space.pkl', 'wb') as file:
    pickle.dump(answer_space, file)

In [None]:
print(torch.__version__)

### Look at some of the Question/Image/Answer combinations

In [None]:
from IPython.display import display

def showExample(train=True, id=None):
    """Display one sample's image and print its question, answer and label.

    Args:
        train: when True read from dataset["train"], otherwise from dataset["test"].
        id: row index of the sample; when None a random row is drawn with
            np.random.randint.
    """
    data = dataset["train"] if train else dataset["test"]
    if id is None:
        # Convert to a plain int so the index does not depend on numpy-scalar support.
        id = int(np.random.randint(len(data)))
    # Fetch the row once instead of re-reading it for every field.
    example = data[id]
    image = Image.open(os.path.join("..","input","visual-question-answering-computer-vision-nlp","dataset", "images", example["image_id"] + ".png"))
    display(image)

    print("Question:\t", example["question"])
    print("Answer:\t\t", example["answer"], "(Label: {0})".format(example["label"]))

In [None]:
showExample()

### Create a Multimodal Collator for the Dataset

In [None]:
from transformers import AutoTokenizer

model_name = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

text = "What is on the left side of cabinet"
tokens = tokenizer(text)
input_ids = tokens["input_ids"]
attention_mask = tokens["attention_mask"]

print("Input IDs:", input_ids)
print("Attention Mask:", attention_mask)
print("token_type_ids:", tokens['token_type_ids'])

In [None]:
@dataclass
class MultimodalCollator:
    """Turn raw VQA rows (question, image_id, label) into tensors for the model.

    Accepts either a dict of columns (e.g. a dataset slice) or a list of row
    dicts, and returns input_ids, token_type_ids, attention_mask, pixel_values
    and labels. Every returned tensor keeps its leading batch axis, including
    for a batch of one.
    """
    tokenizer: AutoTokenizer
    preprocessor: AutoFeatureExtractor

    def tokenize_text(self, texts: List[str]):
        """Tokenize questions, padded to the longest one and truncated to 24 tokens."""
        if isinstance(texts, str):
            texts = [texts]
        encoded_text = self.tokenizer(
            text=texts,
            padding='longest',
            max_length=24,
            truncation=True,
            return_tensors='pt',
            return_token_type_ids=True,
            return_attention_mask=True,
        )
        # No .squeeze() here: it would drop the batch axis when the batch
        # holds a single question.
        return {
            "input_ids": encoded_text['input_ids'],
            "token_type_ids": encoded_text['token_type_ids'],
            "attention_mask": encoded_text['attention_mask'],
        }

    def preprocess_images(self, images: List[str]):
        """Load the PNG for each image id, convert it to RGB and run the image preprocessor."""
        if isinstance(images, str):
            # A bare string would otherwise be iterated character by character.
            images = [images]
        rgb_images = []
        for image_id in images:
            image_path = os.path.join("..","input","visual-question-answering-computer-vision-nlp","dataset", "images", image_id + ".png")
            # convert() returns an in-memory copy, so the file can be closed right away.
            with Image.open(image_path) as image_file:
                rgb_images.append(image_file.convert('RGB'))
        processed_images = self.preprocessor(
            images=rgb_images,
            return_tensors="pt",
        )

        return {
            "pixel_values": processed_images['pixel_values'],
        }

    def __call__(self, raw_batch_dict):
        """Collate a dict of columns or a list of row dicts into one batch dict."""
        if isinstance(raw_batch_dict, dict):
            questions = raw_batch_dict['question']
            image_ids = raw_batch_dict['image_id']
            label_values = raw_batch_dict['label']
        else:
            questions = [row['question'] for row in raw_batch_dict]
            image_ids = [row['image_id'] for row in raw_batch_dict]
            label_values = [row['label'] for row in raw_batch_dict]
        return {
            **self.tokenize_text(questions),
            **self.preprocess_images(image_ids),
            # reshape(-1) keeps labels 1-D even when a single scalar label is given.
            'labels': torch.tensor(label_values, dtype=torch.int64).reshape(-1),
        }

In [None]:
class MultimodalVQAModel(nn.Module):
    """Late-fusion VQA classifier over a pretrained text and a pretrained image encoder.

    The pooled outputs of both encoders are concatenated, passed through a
    Linear -> ReLU -> Dropout(0.5) fusion block and classified into
    `num_labels` answers with a linear layer.
    """

    def __init__(
            self,
            num_labels: int = len(answer_space),
            intermediate_dim: int = 512,
            pretrained_text_name: str = 'bert-base-uncased',
            pretrained_image_name: str = 'google/vit-base-patch16-224-in21k'):
        """Load both encoders with AutoModel.from_pretrained and build the fusion and classifier heads."""
        super().__init__()
        self.num_labels = num_labels
        self.pretrained_text_name = pretrained_text_name
        self.pretrained_image_name = pretrained_image_name

        self.text_encoder = AutoModel.from_pretrained(self.pretrained_text_name)
        self.image_encoder = AutoModel.from_pretrained(self.pretrained_image_name)

        # The fusion input is the two pooled vectors laid side by side.
        fused_input_dim = (
            self.text_encoder.config.hidden_size
            + self.image_encoder.config.hidden_size
        )
        self.fusion = nn.Sequential(
            nn.Linear(fused_input_dim, intermediate_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
        )

        self.classifier = nn.Linear(intermediate_dim, self.num_labels)

        self.criterion = nn.CrossEntropyLoss()

    def forward(
            self,
            input_ids: torch.LongTensor,
            pixel_values: torch.FloatTensor,
            attention_mask: Optional[torch.LongTensor] = None,
            token_type_ids: Optional[torch.LongTensor] = None,
            labels: Optional[torch.LongTensor] = None):
        """Return {"logits": ...}, plus "loss" (cross-entropy) when `labels` is given."""
        text_outputs = self.text_encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            return_dict=True,
        )
        image_outputs = self.image_encoder(
            pixel_values=pixel_values,
            return_dict=True,
        )

        pooled_pair = [text_outputs['pooler_output'], image_outputs['pooler_output']]
        fused_output = self.fusion(torch.cat(pooled_pair, dim=1))
        logits = self.classifier(fused_output)

        if labels is None:
            return {"logits": logits}
        return {"logits": logits, "loss": self.criterion(logits, labels)}

In [None]:
def createMultimodalVQACollatorAndModel(text='bert-base-uncased', image='google/vit-base-patch16-224-in21k'):
    """Build the collator and the model for one text/image checkpoint pair.

    Args:
        text: name of the pretrained text model (also used for its tokenizer).
        image: name of the pretrained image model (also used for its feature extractor).

    Returns:
        (collator, model) where the model has been moved to `device`.
    """
    multi_collator = MultimodalCollator(
        tokenizer=AutoTokenizer.from_pretrained(text),
        preprocessor=AutoFeatureExtractor.from_pretrained(image),
    )

    multi_model = MultimodalVQAModel(pretrained_text_name=text, pretrained_image_name=image)
    multi_model = multi_model.to(device)
    return multi_collator, multi_model

## Performance Metrics from Visual Question Answering

In [None]:
def wup_measure(a,b,similarity_threshold=0.925):
    r"""
    Returns a Wu-Palmer similarity score between two answer words.
    More specifically, it computes:
        max_{x \in interp(a)} max_{y \in interp(b)} wup(x,y)
        where interp(w) is the list of WordNet noun synsets of w.

    NOTE: a later cell in this notebook redefines wup_measure with a default
    similarity_threshold of 0.0; that later definition is the one in effect
    once both cells have run.

    Args:
        a, b: the two words to compare.
        similarity_threshold: best scores below this value are multiplied by 0.1.

    Returns:
        1.0 when a == b; 0.0 when either word is empty or has no noun synset;
        otherwise the best pairwise score, down-weighted as described above.
    """
    if a == b:
        # they are the same
        return 1.0

    if not a or not b:
        return 0.0

    interp_a = wordnet.synsets(a, pos=wordnet.NOUN)
    interp_b = wordnet.synsets(b, pos=wordnet.NOUN)
    if not interp_a or not interp_b:
        return 0.0

    # we take the most optimistic interpretation
    global_max = 0.0
    for x in interp_a:
        for y in interp_b:
            local_score = x.wup_similarity(y)
            # wup_similarity can return None when no connecting path exists;
            # comparing None with a float would raise TypeError.
            if local_score is not None and local_score > global_max:
                global_max = local_score

    # we need to use the semantic fields and therefore we downweight
    # unless the score is high which indicates both are synonyms
    interp_weight = 0.1 if global_max < similarity_threshold else 1.0
    return global_max * interp_weight

In [None]:
def wup_measure(a,b,similarity_threshold=0.0):
    r"""
    Returns a Wu-Palmer similarity score between two answer words.
    More specifically, it computes:
        max_{x \in interp(a)} max_{y \in interp(b)} wup(x,y)
        where interp(w) is the list of WordNet noun synsets of w.

    Args:
        a, b: the two words to compare.
        similarity_threshold: best scores below this value are multiplied by
            0.1; with the default of 0.0 no down-weighting takes place.

    Returns:
        1.0 when a == b; 0.0 when either word is empty or has no noun synset;
        otherwise the best pairwise score, down-weighted as described above.
    """
    if a == b:
        # they are the same
        return 1.0

    if not a or not b:
        return 0.0

    interp_a = wordnet.synsets(a, pos=wordnet.NOUN)
    interp_b = wordnet.synsets(b, pos=wordnet.NOUN)
    if not interp_a or not interp_b:
        return 0.0

    # we take the most optimistic interpretation
    global_max = 0.0
    for x in interp_a:
        for y in interp_b:
            local_score = x.wup_similarity(y)
            # wup_similarity can return None when no connecting path exists;
            # comparing None with a float would raise TypeError.
            if local_score is not None and local_score > global_max:
                global_max = local_score

    # we need to use the semantic fields and therefore we downweight
    # unless the score is high which indicates both are synonyms
    interp_weight = 0.1 if global_max < similarity_threshold else 1.0
    return global_max * interp_weight

In [None]:
def batch_wup_measure(labels, preds, similarity_threshold=0.0):
    """Mean wup_measure score over paired label / prediction indices.

    Args:
        labels: indices into answer_space for the ground-truth answers.
        preds: indices into answer_space for the predicted answers.
        similarity_threshold: forwarded to wup_measure for every pair.

    Returns:
        np.mean of the per-pair scores.
    """
    wup_scores = [
        # Forward the caller's threshold; it used to be replaced by a hard-coded 0.0.
        wup_measure(answer_space[label], answer_space[pred], similarity_threshold=similarity_threshold)
        for label, pred in zip(labels, preds)
    ]
    return np.mean(wup_scores)

In [None]:
labels = np.random.randint(len(answer_space), size=5)
preds = np.random.randint(len(answer_space), size=5)

def showAnswers(ids):
    """Print, as one list, the answer_space entries for the given indices."""
    answers = []
    for answer_index in ids:
        answers.append(answer_space[answer_index])
    print(answers)

showAnswers(labels)
showAnswers(preds)

print("Predictions vs Labels: ", batch_wup_measure(labels, preds, similarity_threshold=0.0))
print("Labels vs Labels: ", batch_wup_measure(labels, labels, similarity_threshold=0.0))

In [None]:
def compute_metrics(eval_tuple: Tuple[np.ndarray, np.ndarray]) -> Dict[str, float]:
    """Compute WUPS, accuracy and macro-F1 from a (logits, labels) pair.

    Predictions are the argmax of the logits along the last axis.
    """
    logits, labels = eval_tuple
    preds = logits.argmax(axis=-1)

    metrics = {}
    metrics["wups"] = batch_wup_measure(labels, preds)
    metrics["acc"] = accuracy_score(labels, preds)
    metrics["f1"] = f1_score(labels, preds, average='macro')
    return metrics

## Model Training & Evaluation

### Define the Arguments needed for Training

In [None]:
# Shared training configuration. createAndTrainModel deep-copies this object
# and replaces output_dir with ../checkpoint/<model name> before training.
args = TrainingArguments(
    output_dir="checkpoint",
    seed=12345,
    # Evaluate, log and checkpoint on the same 100-step cadence.
    evaluation_strategy="steps",
    eval_steps=100,
    logging_strategy="steps",
    logging_steps=100,
    save_strategy="steps",
    save_steps=100,
    save_total_limit=3,             # Save only the last 3 checkpoints at any given time while training
    # 'wups' is one of the keys returned by compute_metrics.
    metric_for_best_model='wups',
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    # Keep the raw 'question' / 'image_id' columns: MultimodalCollator reads them.
    remove_unused_columns=False,
    num_train_epochs=10,
    fp16=True,
#     warmup_ratio=0.01,
#     learning_rate=5e-4,
#     weight_decay=1e-4,
#     gradient_accumulation_steps=2,
    dataloader_num_workers=8,
    load_best_model_at_end=True,
)

### Create the Multimodal Model using User-Defined Text/Image Transformers & Train it on the Dataset

In [None]:
def createAndTrainModel(dataset, args, text_model='bert-base-uncased', image_model='google/vit-base-patch16-224-in21k', multimodal_model='bert_vit'):
    """Build, train and evaluate one text/image model pair.

    Args:
        dataset: mapping with "train" and "test" splits.
        args: base TrainingArguments; a deep copy is modified, the original is left untouched.
        text_model: pretrained text model name.
        image_model: pretrained image model name.
        multimodal_model: sub-directory name under ../checkpoint for this run's output.

    Returns:
        (collator, model, train output, evaluation metrics).
    """
    collator, model = createMultimodalVQACollatorAndModel(text_model, image_model)

    # Give each run its own checkpoint directory.
    multi_args = deepcopy(args)
    multi_args.output_dir = os.path.join("..", "checkpoint", multimodal_model)

    trainer_options = dict(
        train_dataset=dataset['train'],
        eval_dataset=dataset['test'],
        data_collator=collator,
        compute_metrics=compute_metrics,
    )
    multi_trainer = Trainer(model, multi_args, **trainer_options)

    train_multi_metrics = multi_trainer.train()
    eval_multi_metrics = multi_trainer.evaluate()

    return collator, model, train_multi_metrics, eval_multi_metrics

In [None]:
collator, model, train_multi_metrics, eval_multi_metrics = createAndTrainModel(dataset, args)

In [None]:
eval_multi_metrics

## Examples of Model Inferencing

### Loading the Model from Checkpoint

In [None]:
model = MultimodalVQAModel()

# We use the checkpoint giving best results
# map_location remaps the saved tensors onto the current device, so the weights
# also load on a machine without the GPU they were saved from.
model.load_state_dict(
    torch.load(
        os.path.join("..", "checkpoint", "bert_vit", "checkpoint-3100", "pytorch_model.bin"),
        map_location=device,
    )
)
model.to(device)

In [None]:
# Assuming `model` is your trained multimodal model and `multimodal_model` is the name of your model
multimodal_model = "MM10"
torch.save(model.state_dict(), f"/kaggle/working/{multimodal_model}_model.pth")

In [None]:
%cd /kaggle/working
from IPython.display import FileLink
FileLink('MM10_model.pth')

<a href="/kaggle/working/MM10_model.pth"> Download File </a>

In [None]:
sample = collator(dataset["test"][2000:2025])

input_ids = sample["input_ids"].to(device)
token_type_ids = sample["token_type_ids"].to(device)
attention_mask = sample["attention_mask"].to(device)
pixel_values = sample["pixel_values"].to(device)
labels = sample["labels"].to(device)
print(labels)

### Pass the Samples through the Model & inspect the Predictions

In [None]:
model.eval()
# Inference only: disable autograd so no graph is built for the 25-sample batch.
with torch.no_grad():
    output = model(input_ids, pixel_values, attention_mask, token_type_ids, labels)

In [None]:
preds = output["logits"].argmax(axis=-1).cpu().numpy()
preds

In [None]:
for i in range(2000, 2025):
    print("*********************************************************")
    showExample(train=False, id=i)

    # Get the model's output logits for the current example
    output = model(
        input_ids=input_ids[i-2000:i-2000+1],  # Select the current example
        pixel_values=pixel_values[i-2000:i-2000+1],
        attention_mask=attention_mask[i-2000:i-2000+1],
        token_type_ids=token_type_ids[i-2000:i-2000+1],
        labels=None  # No labels provided during inference
    )

    # Get the top 5 predicted classes
    top5_preds = torch.topk(output["logits"], k=5, dim=1).indices.cpu().numpy()[0]

    # Convert indices to answer space
    top5_answers = [answer_space[pred] for pred in top5_preds]

    print("Top 5 Predicted Answers:")
    for j, answer in enumerate(top5_answers):
        print(f"{j+1}. {answer}")

    print("*********************************************************")


In [None]:
for i in range(2000, 2025):
    print("*********************************************************")
    showExample(train=False, id=i)

    # Get the model's output logits for the current example
    output = model(
        input_ids=input_ids[i-2000:i-2000+1],  # Select the current example
        pixel_values=pixel_values[i-2000:i-2000+1],
        attention_mask=attention_mask[i-2000:i-2000+1],
        token_type_ids=token_type_ids[i-2000:i-2000+1],
        labels=None  # No labels provided during inference
    )

    # Get the top 5 predicted classes and their corresponding logits
    top5_preds = torch.topk(output["logits"], k=5, dim=1)
    top5_indices = top5_preds.indices.cpu().numpy()[0]
    #top5_logits = top5_preds.values.cpu().numpy()[0]
    top5_logits = top5_preds.values.detach().cpu().numpy()[0]


    # Convert indices to answer space and print along with confidence
    print("Top 5 Predicted Answers with Confidence:")
    for j, (pred_idx, logit) in enumerate(zip(top5_indices, top5_logits)):
        answer = answer_space[pred_idx]
        print(f"{j+1}. {answer} (Confidence: {logit:.2f})")

    print("*********************************************************")


The cells below apply softmax to the logits so that each confidence value is bounded between 0% and 100%.

In [None]:
for i in range(2000, 2025):
    print("*********************************************************")
    showExample(train=False, id=i)

    # Row of this example inside the batch collated from test rows 2000-2024.
    offset = i - 2000
    # Inference only, so skip autograd bookkeeping.
    with torch.no_grad():
        output = model(
            input_ids=input_ids[offset:offset+1],  # Select the current example
            pixel_values=pixel_values[offset:offset+1],
            attention_mask=attention_mask[offset:offset+1],
            token_type_ids=token_type_ids[offset:offset+1],
            labels=None  # No labels provided during inference
        )

    # Softmax over the whole answer space first, then keep the five most
    # probable answers. Normalising only the top-5 logits would force the five
    # values to sum to 100% and overstate the model's confidence.
    probabilities = torch.nn.functional.softmax(output["logits"], dim=1)
    top5_preds = torch.topk(probabilities, k=5, dim=1)
    top5_indices = top5_preds.indices.cpu().numpy()[0]
    top5_probabilities = top5_preds.values.cpu()[0]

    # Convert indices to answer space and print along with confidence
    print("Top 5 Predicted Answers with Confidence:")
    for j, (pred_idx, probability) in enumerate(zip(top5_indices, top5_probabilities)):
        answer = answer_space[pred_idx]
        confidence = probability.item() * 100
        print(f"{j+1}. {answer} (Confidence: {confidence:.2f}%)")


    # Convert indices to answer space and print along with confidence
#     print("Top 5 Predicted Answers with Confidence:")
#     for j, (pred_idx, logit) in enumerate(zip(top5_indices, top5_logits)):
#         answer = answer_space[pred_idx]
#         print(f"{j+1}. {answer} (Confidence: {logit:.2f})")

#     print("*********************************************************")


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Function to create colored progress bar
def progress_bar(percentage):
    """Render `percentage` as a 20-character ASCII bar, e.g. '[=====               ]'.

    The number of '=' characters is int(20 * percentage / 100); the remainder
    of the bar is filled with spaces.
    """
    bar_length = 20
    filled = int(bar_length * percentage / 100)
    return '[{}{}]'.format('=' * filled, ' ' * (bar_length - filled))

for i in range(2000, 2025):
    print("*********************************************************")
    showExample(train=False, id=i)

    # Row of this example inside the batch collated from test rows 2000-2024.
    offset = i - 2000
    # Inference only, so skip autograd bookkeeping.
    with torch.no_grad():
        output = model(
            input_ids=input_ids[offset:offset+1],  # Select the current example
            pixel_values=pixel_values[offset:offset+1],
            attention_mask=attention_mask[offset:offset+1],
            token_type_ids=token_type_ids[offset:offset+1],
            labels=None  # No labels provided during inference
        )

    # Softmax over the whole answer space first, then keep the five most
    # probable answers. Normalising only the top-5 logits would force the five
    # values to sum to 100% and overstate the model's confidence.
    probabilities = torch.nn.functional.softmax(output["logits"], dim=1)
    top5_preds = torch.topk(probabilities, k=5, dim=1)
    top5_indices = top5_preds.indices.cpu().numpy()[0]
    top5_probabilities = top5_preds.values.cpu()[0]

    # Convert indices to answer space and print along with confidence
    print("Top 5 Predicted Answers with Confidence:")
    for j, (pred_idx, probability) in enumerate(zip(top5_indices, top5_probabilities)):
        answer = answer_space[pred_idx]
        confidence = probability.item() * 100
        print(f"{j+1}. {answer}:")
        print(f"   Confidence: {confidence:.2f}%")
        print("   Progress: " + progress_bar(confidence))


In [None]:
for i in range(2000, 2025):
    print("*********************************************************")
    showExample(train=False, id=i)
    print("Predicted Answer:\t", answer_space[preds[i-2000]])
    print("*********************************************************")

In [None]:
import numpy as np

# Function to create colored progress bar
def progress_bar(percentage):
    """Render `percentage` as a 20-character ASCII bar, e.g. '[=====               ]'."""
    bar_length = 20
    filled = int(bar_length * percentage / 100)
    return '[{}{}]'.format('=' * filled, ' ' * (bar_length - filled))

def colorize_progress_bar(percentage):
    """Prefix the bar with a tier marker instead of a real colour.

    '#' for percentage >= 80, '+' for >= 60, '-' for >= 0, and no marker when
    the value is below every threshold.
    """
    if percentage >= 80:
        marker = '#'
    elif percentage >= 60:
        marker = '+'
    elif percentage >= 0:
        marker = '-'
    else:
        marker = ''
    return marker + progress_bar(percentage)

for i in range(2000, 2025):
    print("*********************************************************")
    showExample(train=False, id=i)

    # Row of this example inside the batch collated from test rows 2000-2024.
    offset = i - 2000
    # Inference only, so skip autograd bookkeeping.
    with torch.no_grad():
        output = model(
            input_ids=input_ids[offset:offset+1],  # Select the current example
            pixel_values=pixel_values[offset:offset+1],
            attention_mask=attention_mask[offset:offset+1],
            token_type_ids=token_type_ids[offset:offset+1],
            labels=None  # No labels provided during inference
        )

    # Softmax over the whole answer space first, then keep the five most
    # probable answers. Normalising only the top-5 logits would force the five
    # values to sum to 100% and overstate the model's confidence.
    probabilities = torch.nn.functional.softmax(output["logits"], dim=1)
    top5_preds = torch.topk(probabilities, k=5, dim=1)
    top5_indices = top5_preds.indices.cpu().numpy()[0]
    top5_probabilities = top5_preds.values.cpu()[0]

    # Convert indices to answer space and print along with confidence
    print("Top 5 Predicted Answers with Confidence:")
    for j, (pred_idx, probability) in enumerate(zip(top5_indices, top5_probabilities)):
        answer = answer_space[pred_idx]
        confidence = probability.item() * 100
        print(f"{j+1}. {answer}:")
        print(f"   Confidence: {confidence:.2f}%")
        print("   Progress: " + colorize_progress_bar(confidence))



In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Function to create colored progress bar
def progress_bar(percentage):
    """Render `percentage` as a 20-character ASCII bar, e.g. '[=====               ]'."""
    bar_length = 20
    filled_length = int(bar_length * percentage / 100)
    bar = '[' + '=' * filled_length + ' ' * (bar_length - filled_length) + ']'
    return bar

def colorize_progress_bar(percentage):
    """Wrap the progress bar in a bold ANSI foreground colour.

    Green for percentage >= 75, yellow for >= 50, red otherwise. ANSI SGR
    sequences take numeric codes; putting a colour name such as 'green' in the
    sequence, as before, is not a valid code and leaves the bar uncoloured.
    """
    if percentage >= 75:
        color_code = 32  # green
    elif percentage >= 50:
        color_code = 33  # yellow (closest basic ANSI colour to orange)
    else:
        color_code = 31  # red
    return f'\033[1;{color_code}m{progress_bar(percentage)}\033[m'

for i in range(2000, 2025):
    print("*********************************************************")
    showExample(train=False, id=i)

    # Row of this example inside the batch collated from test rows 2000-2024.
    offset = i - 2000
    # Inference only, so skip autograd bookkeeping.
    with torch.no_grad():
        output = model(
            input_ids=input_ids[offset:offset+1],  # Select the current example
            pixel_values=pixel_values[offset:offset+1],
            attention_mask=attention_mask[offset:offset+1],
            token_type_ids=token_type_ids[offset:offset+1],
            labels=None  # No labels provided during inference
        )

    # Softmax over the whole answer space first, then keep the five most
    # probable answers. Normalising only the top-5 logits would force the five
    # values to sum to 100% and overstate the model's confidence.
    probabilities = torch.nn.functional.softmax(output["logits"], dim=1)
    top5_preds = torch.topk(probabilities, k=5, dim=1)
    top5_indices = top5_preds.indices.cpu().numpy()[0]
    top5_probabilities = top5_preds.values.cpu()[0]

    # Convert indices to answer space and print along with confidence
    print("Top 5 Predicted Answers with Confidence:")
    for j, (pred_idx, probability) in enumerate(zip(top5_indices, top5_probabilities)):
        answer = answer_space[pred_idx]
        confidence = probability.item() * 100
        print(f"{j+1}. {answer}:")
        print(f"   Confidence: {confidence:.2f}%")
        print("   Progress: " + colorize_progress_bar(confidence))


In [None]:
from nltk.corpus import wordnet

def similarity(a, b):
    """Average Wu-Palmer similarity between the words of two answer strings.

    Each answer is split on commas (with surrounding spaces stripped) and then
    on underscores. Every resulting word that has at least one WordNet synset
    is represented by its first synset. The score is the mean over all
    (word of a, word of b) synset pairs; a pair whose similarity is None or 0
    adds nothing to the sum but still counts in the mean.

    Returns:
        The mean similarity, or 0 when no synset pair could be formed.
    """
    def first_synsets(answer):
        # Look every word up once, instead of repeating the lookups for `b`
        # inside the loop over the words of `a`.
        words = [part for word in answer.split(',') for part in word.strip().split('_')]
        found = []
        for word in words:
            synsets = wordnet.synsets(word)
            if synsets:
                found.append(synsets[0])
        return found

    senses_a = first_synsets(a)
    senses_b = first_synsets(b)

    res = 0
    n = 0

    # Calculate score and take average
    for s1 in senses_a:
        for s2 in senses_b:
            sim = s1.wup_similarity(s2)
            if sim:
                res += sim
            n += 1

    return res / n if n != 0 else 0

# Show predictions for a range of examples
for i in range(2000, 2005):
    print("\n=========================================================\n")
    showExample(train=False, id=i)
    # showExample only displays the sample and returns None, so the
    # ground-truth answer is read from the dataset directly.
    real_answer = dataset["test"][i]["answer"]
    predicted_answer = answer_space[preds[i - 2000]]
    print("Predicted Answer:\t", predicted_answer)
    print(f"Similarity: {similarity(real_answer, predicted_answer)}")
    print("\n=========================================================\n")

## Inspecting Model Size

In [None]:
def countTrainableParameters(model):
    """Print the total element count of all parameters with requires_grad set."""
    num_params = 0
    for parameter in model.parameters():
        if parameter.requires_grad:
            num_params += parameter.numel()
    print("No. of trainable parameters:\t{0:,}".format(num_params))

In [None]:
countTrainableParameters(model) # For BERT-ViT model

In [None]:
# Assuming `model` is your trained multimodal model and `multimodal_model` is the name of your model
multimodal_model = "MM30"
torch.save(model.state_dict(), f"/kaggle/working/{multimodal_model}_model.pth")

In [None]:
checkpoint = {'model': MultimodalVQAModel(),
              'state_dict': model.state_dict()}

torch.save(checkpoint, '/kaggle/working/checkpoint.pth')

In [None]:
def load_checkpoint(filepath):
    """Rebuild a frozen, eval-mode model from a {'model', 'state_dict'} checkpoint.

    Args:
        filepath: path of a file written with torch.save holding a dict with a
            pickled module under 'model' and its weights under 'state_dict'.

    Returns:
        The module with the weights loaded, every parameter's requires_grad
        set to False and eval mode switched on. The caller moves it to the
        target device.
    """
    # NOTE(security): the checkpoint stores a whole pickled module, so loading
    # it executes pickle code — only load files you created yourself.
    # weights_only=False is required to unpickle a full module on PyTorch
    # versions whose default is weights_only=True. map_location='cpu' lets
    # GPU-saved tensors load on a CPU-only machine.
    checkpoint = torch.load(filepath, map_location='cpu', weights_only=False)
    model = checkpoint['model']
    model.load_state_dict(checkpoint['state_dict'])
    for parameter in model.parameters():
        parameter.requires_grad = False

    model.eval()

    return model

In [None]:
model1 = load_checkpoint('/kaggle/working/checkpoint.pth')
model1.to(device)
print(model1)

In [None]:
sample = collator(dataset["test"][2000:2025])

# Read the tensors from the freshly collated `sample`; previously this cell
# rebuilt `sample` but kept using the tensors left over from an earlier cell.
input_ids = sample["input_ids"].to(device)
pixel_values = sample["pixel_values"].to(device)
attention_mask = sample["attention_mask"].to(device)
token_type_ids = sample["token_type_ids"].to(device)
labels = sample["labels"].to(device)

In [None]:
model1.eval()
# Run the reloaded model (model1), not the original `model`, so this cell
# actually verifies the checkpoint round trip. no_grad: inference only.
with torch.no_grad():
    output = model1(input_ids, pixel_values, attention_mask, token_type_ids, labels)

In [None]:
preds = output["logits"].argmax(axis=-1).cpu().numpy()
preds

In [None]:
for i in range(2000, 2025):
    print("*********************************************************")
    showExample(train=False, id=i)
    print("Predicted Answer:\t", answer_space[preds[i-2000]])
    print("*********************************************************")

In [None]:
import requests
from PIL import Image
from io import BytesIO

url = "https://images.unsplash.com/photo-1707817812089-586ca2bfe711?w=500&auto=format&fit=crop&q=60&ixlib=rb-4.0.3&ixid=M3wxMjA3fDB8MHxlZGl0b3JpYWwtZmVlZHwxOXx8fGVufDB8fHx8fA%3D%3D"
response = requests.get(url)
img = Image.open(BytesIO(response.content))
img

In [None]:
# Assuming your model requires a specific input size
img = img.resize((224, 224))
img

In [None]:
import torchvision.transforms as transforms

transform = transforms.Compose([
    transforms.ToTensor(),
    # Add other transformations if needed
])

input_image = transform(img).unsqueeze(0)  # Add batch dimension


In [None]:
from transformers import AutoTokenizer
from transformers import AutoFeatureExtractor
import requests
from PIL import Image
from io import BytesIO

# Load the tokenizer and image preprocessor matching the model's default encoders
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
preprocessor = AutoFeatureExtractor.from_pretrained("google/vit-base-patch16-224-in21k")

url = "https://images.unsplash.com/photo-1707817812089-586ca2bfe711?w=500&auto=format&fit=crop&q=60&ixlib=rb-4.0.3&ixid=M3wxMjA3fDB8MHxlZGl0b3JpYWwtZmVlZHwxOXx8fGVufDB8fHx8fA%3D%3D"
response = requests.get(url, timeout=30)
response.raise_for_status()  # fail loudly instead of decoding an error page as an image
# The image is handed to the preprocessor as a plain RGB PIL image, the same
# way MultimodalCollator.preprocess_images feeds the dataset images.
img = Image.open(BytesIO(response.content)).convert("RGB")

# Question to ask about the downloaded image
text = "How many people are there"
text_inputs = tokenizer(
    text=[text],
    padding='longest',
    max_length=24,
    truncation=True,
    return_tensors="pt",
    return_token_type_ids=True,
    return_attention_mask=True,
)
image_inputs = preprocessor(images=[img], return_tensors="pt")

# Pass the question tokens as input_ids and the downloaded image as
# pixel_values. Previously the image tensor was passed as input_ids, together
# with tensors left over from the dataset batch.
with torch.no_grad():
    output = model1(
        input_ids=text_inputs["input_ids"].to(device),
        pixel_values=image_inputs["pixel_values"].to(device),
        attention_mask=text_inputs["attention_mask"].to(device),
        token_type_ids=text_inputs["token_type_ids"].to(device),
    )

# Process the output as needed

In [None]:
# Install Gradio, which the next cell imports; no version is pinned here.
!pip install gradio

In [None]:
import gradio as gr

# Assumes `collator`, `model`, `device`, and `answer_space` were defined by earlier cells.

def predict(input_text, input_image):
    """Answer a question about an image using the global ``model``.

    Args:
        input_text: The question; handed to ``collator`` under "question".
        input_image: The image; handed to ``collator`` under "image".

    Returns:
        The ``answer_space`` entry at the index of the highest logit.
    """
    batch = collator({"question": input_text, "image": input_image})

    # Move the four model inputs onto the target device.
    on_device = {
        key: batch[key].to(device)
        for key in ("input_ids", "token_type_ids", "attention_mask", "pixel_values")
    }

    model.eval()
    with torch.no_grad():
        result = model(
            on_device["input_ids"],
            on_device["pixel_values"],
            on_device["attention_mask"],
            on_device["token_type_ids"],
        )

    # Position of the largest logit selects the answer.
    best_index = result["logits"].argmax(axis=-1).item()
    return answer_space[best_index]

# Build the demo UI: a text box and an image input feeding predict(), with a
# text output. live=True re-runs the prediction whenever an input changes.
# `capture_session` is deliberately not passed: it is a deprecated Interface
# option that only concerned TensorFlow 1.x sessions and has no effect on this
# PyTorch model.
iface = gr.Interface(fn=predict,
                     inputs=["text", "image"],
                     outputs="text",
                     live=True)

iface.launch()


In [None]:
# Upgrade huggingface_hub to the newest available release.
!pip install --upgrade huggingface_hub

In [None]:
# Pin Gradio and huggingface_hub to fixed versions.
# NOTE(review): this replaces whatever the preceding `--upgrade` cell
# installed; confirm which version set is actually intended.
!pip install gradio==3.14.0 huggingface_hub==0.0.15

In [None]:
import gradio as gr
import torch

# Load the model weights and place the model on `device`, the same device that
# predict() moves its input tensors to. map_location lets weights saved on a
# GPU load on a CPU-only host.
model = MultimodalVQAModel()
model.load_state_dict(torch.load("MM1_model.pth", map_location=device))
model.to(device)
model.eval()

# Assumes `collator`, `device`, and `answer_space` were defined by earlier cells.

def predict(input_text, input_image):
    """Return the predicted answer for a question/image pair.

    The pair is collated into a batch, its tensors are moved to ``device``,
    and the global ``model`` is run without gradient tracking.

    Args:
        input_text: The question; handed to ``collator`` under "question".
        input_image: The image; handed to ``collator`` under "image".

    Returns:
        The ``answer_space`` entry at the index of the highest logit.
    """
    collated = collator({"question": input_text, "image": input_image})

    input_ids, token_type_ids, attention_mask, pixel_values = (
        collated[name].to(device)
        for name in ("input_ids", "token_type_ids", "attention_mask", "pixel_values")
    )

    with torch.no_grad():
        logits = model(input_ids, pixel_values, attention_mask, token_type_ids)["logits"]

    return answer_space[logits.argmax(axis=-1).item()]

# Create the Gradio interface: a text box and an image input feeding
# predict(), with a text output. live=True re-runs the prediction whenever an
# input changes. `capture_session` is deliberately not passed: it is a
# deprecated Interface option that only concerned TensorFlow 1.x sessions and
# has no effect on this PyTorch model.
iface = gr.Interface(fn=predict,
                     inputs=["text", "image"],
                     outputs="text",
                     live=True
                     )

# Launch the Gradio interface with a public share link
iface.launch(share=True)


In [None]:
# Install Streamlit, which the next cell imports; no version is pinned here.
!pip install streamlit

In [None]:
import streamlit as st
import torch

# Assuming `collator` is your data collator function as defined earlier
# Assuming `MultimodalVQAModel` is your model class

# Load the model weights and place the model on `device`, the same device that
# predict() moves its input tensors to. map_location lets weights saved on a
# GPU load on a CPU-only host.
model = MultimodalVQAModel()
model.load_state_dict(torch.load("MM2_model.pth", map_location=device))
model.to(device)
model.eval()

# Define the prediction function
def predict(input_text, input_image):
    """Predict the answer to ``input_text`` about ``input_image``.

    Args:
        input_text: The question; handed to ``collator`` under "question".
        input_image: The image; handed to ``collator`` under "image".

    Returns:
        The ``answer_space`` entry at the index of the highest logit produced
        by the global ``model``.
    """
    features = collator({"question": input_text, "image": input_image})

    def on_device(name):
        # Fetch one collated tensor and move it to the target device.
        return features[name].to(device)

    ids = on_device("input_ids")
    segments = on_device("token_type_ids")
    mask = on_device("attention_mask")
    pixels = on_device("pixel_values")

    with torch.no_grad():
        prediction = model(ids, pixels, mask, segments)

    # The position of the largest logit is the index into answer_space.
    winner = prediction["logits"].argmax(axis=-1).item()
    return answer_space[winner]

# Streamlit app
def main():
    """Render the Streamlit question-answering page.

    Shows a question box, an image uploader, and a "Predict" button. When the
    button is pressed, the prediction is displayed if both inputs are present;
    otherwise a warning is shown.
    """
    st.title("Multimodal QA Prediction")

    # Input widgets
    question = st.text_area("Enter Question:")
    upload = st.file_uploader("Upload Image:", type=["jpg", "jpeg", "png"])

    # Nothing else happens until the button is pressed.
    if not st.button("Predict"):
        return

    # Both a question and an image are required for a prediction.
    if not (question and upload):
        st.warning("Please provide both text and image inputs.")
        return

    answer = predict(question, upload)
    st.success(f"Predicted Answer: {answer}")

# Run the Streamlit app, but only when this code executes as the main module
# (i.e. __name__ is "__main__").
if __name__ == "__main__":
    main()


In [None]:
%%writefile app.py
# NOTE(review): as written, app.py only imports Streamlit and passes the source
# text below to st.write() as one string, so the app displays this code instead
# of executing it. The embedded code also refers to MultimodalVQAModel,
# collator, device and answer_space, none of which it defines.
import streamlit as st
st.write("""import streamlit as st
import torch

# Assuming `collator` is your data collator function as defined earlier
# Assuming `MultimodalVQAModel` is your model class

# Load the model
model = MultimodalVQAModel()
model.load_state_dict(torch.load("MM2_model.pth"))
model.eval()

# Define the prediction function
def predict(input_text, input_image):
    # Tokenize input_text and process input_image
    sample = collator({"question": input_text, "image": input_image})

    input_ids = sample["input_ids"].to(device)
    token_type_ids = sample["token_type_ids"].to(device)
    attention_mask = sample["attention_mask"].to(device)
    pixel_values = sample["pixel_values"].to(device)

    with torch.no_grad():
        output = model(input_ids, pixel_values, attention_mask, token_type_ids)

    # Get predicted answer index
    pred_index = output["logits"].argmax(axis=-1).item()

    return answer_space[pred_index]

# Streamlit app
def main():
    st.title("Multimodal QA Prediction")

    # Input components
    input_text = st.text_area("Enter Question:")
    input_image = st.file_uploader("Upload Image:", type=["jpg", "jpeg", "png"])

    # Make prediction on button click
    if st.button("Predict"):
        if input_text and input_image:
            result = predict(input_text, input_image)
            st.success(f"Predicted Answer: {result}")
        else:
            st.warning("Please provide both text and image inputs.")

# Run the Streamlit app
if __name__ == "__main__":
    main()
""")

In [None]:
# Print the response of loca.lt's tunnel-password endpoint (quiet mode, body
# written to stdout).
!wget -q -O - https://loca.lt/mytunnelpassword

In [None]:
# Start Streamlit in the background and expose local port 8502 through
# localtunnel.
# NOTE(review): no port is passed to Streamlit here, and the later launch cell
# tunnels port 8501 instead; confirm which port the app actually listens on.
!streamlit run app.py & npx localtunnel --port 8502

In [None]:
# Install Streamlit (pip) and localtunnel (npm); neither version is pinned.
!pip install streamlit
!npm install localtunnel

In [None]:
# The localtunnel password is this machine's public IP address; query
# ipv4.icanhazip.com to print it.
!curl ipv4.icanhazip.com

In [None]:
# Launch Streamlit in the background with its output sent to ./logs.txt, then
# expose local port 8501 through localtunnel.
!streamlit run app.py &>./logs.txt & npx localtunnel --port 8501

In [None]:
class MultimodalVQAModel(nn.Module):
    """Answer classifier that fuses a pretrained text and image encoder.

    The ``pooler_output`` of both encoders is concatenated, passed through a
    Linear -> ReLU -> Dropout fusion block, and projected onto ``num_labels``
    answer logits.
    """

    def __init__(
            self,
            num_labels: int = len(answer_space),
            intermediate_dim: int = 512,
            pretrained_text_name: str = 'bert-base-uncased',
            pretrained_image_name: str = 'google/vit-base-patch16-224-in21k'):
        """Create the encoders, the fusion block, and the classifier head.

        Args:
            num_labels: Number of answer classes; defaults to the size of
                ``answer_space`` at the time the class is defined.
            intermediate_dim: Width of the fused representation.
            pretrained_text_name: Checkpoint name for the text encoder.
            pretrained_image_name: Checkpoint name for the image encoder.
        """
        super().__init__()
        self.num_labels = num_labels
        self.pretrained_text_name = pretrained_text_name
        self.pretrained_image_name = pretrained_image_name

        # Pretrained backbones, loaded by checkpoint name.
        self.text_encoder = AutoModel.from_pretrained(pretrained_text_name)
        self.image_encoder = AutoModel.from_pretrained(pretrained_image_name)

        # The fusion layer consumes both encoders' hidden vectors side by side.
        combined_width = (
            self.text_encoder.config.hidden_size
            + self.image_encoder.config.hidden_size
        )
        self.fusion = nn.Sequential(
            nn.Linear(combined_width, intermediate_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
        )

        self.classifier = nn.Linear(intermediate_dim, num_labels)

        self.criterion = nn.CrossEntropyLoss()

    def forward(
            self,
            input_ids: torch.LongTensor,
            pixel_values: torch.FloatTensor,
            attention_mask: Optional[torch.LongTensor] = None,
            token_type_ids: Optional[torch.LongTensor] = None,
            labels: Optional[torch.LongTensor] = None):
        """Compute answer logits and, when labels are given, the loss.

        Args:
            input_ids: Token ids for the text encoder.
            pixel_values: Image tensor for the image encoder.
            attention_mask: Optional attention mask for the text encoder.
            token_type_ids: Optional segment ids for the text encoder.
            labels: Optional target class indices.

        Returns:
            A dict with ``"logits"`` and, only if ``labels`` is not None,
            ``"loss"`` (cross-entropy between the logits and ``labels``).
        """
        text_out = self.text_encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            return_dict=True,
        )
        image_out = self.image_encoder(pixel_values=pixel_values, return_dict=True)

        # Join the two pooled vectors along the feature dimension.
        pooled = torch.cat(
            (text_out['pooler_output'], image_out['pooler_output']),
            dim=1,
        )
        logits = self.classifier(self.fusion(pooled))

        if labels is None:
            return {"logits": logits}
        return {"logits": logits, "loss": self.criterion(logits, labels)}