# **Importing All Libraries**

In [None]:
!pip install openai-clip

Collecting openai-clip
  Downloading openai-clip-1.0.1.tar.gz (1.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting ftfy (from openai-clip)
  Downloading ftfy-6.2.0-py3-none-any.whl (54 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.4/54.4 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: openai-clip
  Building wheel for openai-clip (setup.py) ... [?25l[?25hdone
  Created wheel for openai-clip: filename=openai_clip-1.0.1-py3-none-any.whl size=1368605 sha256=be93fb4297f69cc75429ff09500fcd6c51eaded98e230801d5094a98e5ebe7e5
  Stored in directory: /root/.cache/pip/wheels/08/77/8e/8d2f862df6bf7fb4e2007062d2cbaeae49862ec7b56d041229
Successfully built openai-clip
Installing collected packages: ftfy, openai-clip
Successfully installed ftfy-6.2.0 openai-clip-1.0.1


In [None]:
# Essential imports
import os
import zipfile
import urllib.request

import json
from io import open

# PyTorch imports
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, Dataset

# Transformers and CLIP imports
import clip
from transformers import CLIPProcessor, CLIPModel, GPT2Tokenizer, GPT2LMHeadModel, GPT2Config
from torchvision import transforms
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm

# PIL import
from PIL import Image

# Google Colab import
from google.colab import drive

# **Necessary Setup**

In [None]:
# Mount Google Drive; the checkpoint helpers below read from and write to
# /content/drive/MyDrive/VQA_Checkpoints.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
def download_and_unzip(url, extract_to='.'):
    """Download a zip archive, extract it, and delete the downloaded archive.

    The archive is stored temporarily in the current working directory under
    the last path component of ``url``.

    Args:
        url: Location of the zip archive.
        extract_to: Directory the archive contents are extracted into.

    Raises:
        zipfile.BadZipFile: If the downloaded file is not a valid zip archive.
        Any error raised by ``urllib.request.urlretrieve`` is propagated.
    """
    filename = os.path.basename(url)
    print(f"Downloading {filename}...")
    try:
        urllib.request.urlretrieve(url, filename)
        print(f"Downloaded {filename}")

        print(f"Unzipping {filename}...")
        with zipfile.ZipFile(filename, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        print(f"Unzipped {filename}")
    finally:
        # Remove the archive even when the download or extraction failed, so a
        # partial or corrupt multi-gigabyte file is not left behind on disk.
        if os.path.exists(filename):
            print(f"Deleting {filename}...")
            os.remove(filename)
            print(f"Deleted {filename}")

# COCO image archives.
train_images_link = "http://images.cocodataset.org/zips/train2014.zip"
val_images_link = "http://images.cocodataset.org/zips/val2014.zip"
test_images_link = "http://images.cocodataset.org/zips/test2015.zip"

# VQA v2 question archives.
train_questions_link = "https://s3.amazonaws.com/cvmlp/vqa/mscoco/vqa/v2_Questions_Train_mscoco.zip"
val_questions_link = "https://s3.amazonaws.com/cvmlp/vqa/mscoco/vqa/v2_Questions_Val_mscoco.zip"
test_questions_link = "https://s3.amazonaws.com/cvmlp/vqa/mscoco/vqa/v2_Questions_Test_mscoco.zip"

# VQA v2 annotation archives.
train_annotations_link = "https://s3.amazonaws.com/cvmlp/vqa/mscoco/vqa/v2_Annotations_Train_mscoco.zip"
val_annotations_link = "https://s3.amazonaws.com/cvmlp/vqa/mscoco/vqa/v2_Annotations_Val_mscoco.zip"

# (archive URL, extraction directory) pairs, fetched in order. Only the
# training split is downloaded; uncomment an entry to fetch that split too.
downloads = [
    (train_images_link, 'train2014'),
    # (val_images_link, 'val2014'),
    # (test_images_link, 'test2015'),
    (train_questions_link, 'train_questions'),
    # (val_questions_link, 'val_questions'),
    # (test_questions_link, 'test_questions'),
    (train_annotations_link, 'train_annotations'),
    # (val_annotations_link, 'val_annotations'),
]

for archive_url, target_dir in downloads:
    download_and_unzip(archive_url, target_dir)

print("All files downloaded, unzipped, and cleaned up successfully.")

Downloading train2014.zip...
Downloaded train2014.zip
Unzipping train2014.zip...
Unzipped train2014.zip
Deleting train2014.zip...
Deleted train2014.zip
Downloading v2_Questions_Train_mscoco.zip...
Downloaded v2_Questions_Train_mscoco.zip
Unzipping v2_Questions_Train_mscoco.zip...
Unzipped v2_Questions_Train_mscoco.zip
Deleting v2_Questions_Train_mscoco.zip...
Deleted v2_Questions_Train_mscoco.zip
Downloading v2_Annotations_Train_mscoco.zip...
Downloaded v2_Annotations_Train_mscoco.zip
Unzipping v2_Annotations_Train_mscoco.zip...
Unzipped v2_Annotations_Train_mscoco.zip
Deleting v2_Annotations_Train_mscoco.zip...
Deleted v2_Annotations_Train_mscoco.zip
All files downloaded, unzipped, and cleaned up successfully.


In [None]:
# Locations of the extracted data under /content.
# Only the training archives are downloaded by the cell above; the val/test
# paths exist only if the corresponding (commented-out) downloads are enabled.
train_images_path = "/content/train2014/train2014"
val_images_path = "/content/val2014/val2014"
test_images_path = "/content/test2015/test2015"

# VQA v2 open-ended question files.
train_questions_path = "/content/train_questions/v2_OpenEnded_mscoco_train2014_questions.json"
val_questions_path = "/content/val_questions/v2_OpenEnded_mscoco_val2014_questions.json"
test_questions_path = "/content/test_questions/v2_OpenEnded_mscoco_test2015_questions.json"
test_questions_dev_path = "/content/test_questions/v2_OpenEnded_mscoco_test-dev2015_questions.json"

# VQA v2 annotation (answer) files.
train_annotations_path = "/content/train_annotations/v2_mscoco_train2014_annotations.json"
val_annotations_path = "/content/val_annotations/v2_mscoco_val2014_annotations.json"

In [None]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# Load the training annotations (answers) from disk.
# The validation split can be loaded the same way from val_annotations_path
# once its archive has been downloaded.
with open(train_annotations_path, 'r') as annotations_file:
    train_annotations = json.load(annotations_file)

list_train_annotations = train_annotations['annotations']

# Keep only what the dataset needs from each record: the answers given for a
# question, plus the ids linking it to its image and question.
extracted_train_annotations = []
for entry in list_train_annotations:
    extracted_train_annotations.append({
        'answers': entry['answers'],
        'image_id': entry['image_id'],
        'question_id': entry['question_id'],
    })

In [None]:
# Load the training questions from disk.
# The validation/test questions can be loaded the same way from
# val_questions_path / test_questions_path once their archives are downloaded.
with open(train_questions_path, 'r') as questions_file:
    train_questions = json.load(questions_file)

# One record per question: image_id, question text and question_id.
list_train_questions = train_questions['questions']

In [None]:
# Inspect the first extracted annotation to check its structure.
extracted_train_annotations[0]

{'answers': [{'answer': 'net', 'answer_confidence': 'maybe', 'answer_id': 1},
  {'answer': 'net', 'answer_confidence': 'yes', 'answer_id': 2},
  {'answer': 'net', 'answer_confidence': 'yes', 'answer_id': 3},
  {'answer': 'netting', 'answer_confidence': 'yes', 'answer_id': 4},
  {'answer': 'net', 'answer_confidence': 'yes', 'answer_id': 5},
  {'answer': 'net', 'answer_confidence': 'yes', 'answer_id': 6},
  {'answer': 'mesh', 'answer_confidence': 'maybe', 'answer_id': 7},
  {'answer': 'net', 'answer_confidence': 'yes', 'answer_id': 8},
  {'answer': 'net', 'answer_confidence': 'yes', 'answer_id': 9},
  {'answer': 'net', 'answer_confidence': 'yes', 'answer_id': 10}],
 'image_id': 458752,
 'question_id': 458752000}

In [None]:
# Inspect one question record (index 1) to check its structure.
list_train_questions[1]

{'image_id': 458752,
 'question': 'What position is this man playing?',
 'question_id': 458752001}

In [None]:
def idAsDicKey(questions_list):
  """Index question records by their ``question_id``.

  Args:
    questions_list: Iterable of dicts, each with the keys ``"question_id"``,
      ``"question"`` and ``"image_id"``.

  Returns:
    A dict mapping each question id to ``{"question": ..., "image_id": ...}``.
    If an id occurs more than once, the last record wins.
  """
  return {
    record["question_id"]: {"question": record["question"], "image_id": record["image_id"]}
    for record in questions_list
  }

In [None]:
# Index the training questions by question_id so the dataset can look up the
# question (and its image_id) that belongs to an annotation.
train_questions_dict = idAsDicKey(list_train_questions)

# **Dataset and Dataloaders**

In [None]:
# Default number of tokens VQAModel.forward generates per answer; target
# answers are padded/truncated to the same length.
MAX_LENGTH_ANNOTATIONS = 7
# NOTE(review): not referenced anywhere else in the visible notebook — confirm
# whether it is still needed.
MAX_LENGTH_QUESTIONS = 15

In [None]:
class VqaDataset(Dataset):
  """VQA samples encoded on the fly with a CLIP ViT-B/32 model.

  Each item is ``(img_features, question_features, [answer])`` where the two
  feature tensors are the squeezed outputs of CLIP's image and text encoders
  (computed under ``torch.no_grad()`` on the global ``device``) and ``answer``
  is the first answer string listed for the question.

  Args:
    base_img_path: Filename prefix of the images; the zero-padded 12-digit
      image id and ``'.jpg'`` are appended to it.
    num_samples: Maximum number of annotations to expose.
    annotation_list: Sequence of dicts with ``"answers"`` and ``"question_id"``.
    questions_dict: Mapping ``question_id -> {"question", "image_id"}``.
    image_mode: PIL mode images are converted to before CLIP preprocessing.
      Defaults to ``"RGB"`` so colour information is kept; pass ``"L"`` to
      reproduce the previous grayscale inputs (e.g. for an older checkpoint).
  """

  def __init__(self, base_img_path, num_samples, annotation_list, questions_dict, image_mode="RGB"):
    super(VqaDataset, self).__init__()
    self.num_samples = num_samples
    self.annotations = annotation_list
    self.questions_dict = questions_dict
    self.base_img_path = base_img_path
    self.image_mode = image_mode
    # COCO filenames embed the image id zero-padded to this many digits.
    self.total_id_len = len("000000000009")

    self.clip_encoder, self.preprocess = clip.load("ViT-B/32", device=device)

  def __len__(self):
    """
    Returns the number of samples exposed by the dataset.

    Never larger than the number of available annotations, so every valid
    index can actually be served by ``__getitem__``.
    """
    return min(self.num_samples, len(self.annotations))

  def _image_path(self, image_id):
    """Build the image file path for a numeric image id."""
    return self.base_img_path + str(image_id).zfill(self.total_id_len) + '.jpg'

  def __getitem__(self, idx):
    """Return ``(img_features, question_features, [answer])`` for sample ``idx``."""
    sample = self.annotations[idx]
    answer = sample["answers"][0]["answer"]

    question = self.questions_dict[sample["question_id"]]

    # convert() returns a fully loaded copy, so the file handle can be closed
    # as soon as the with-block ends.
    with Image.open(self._image_path(question["image_id"])) as raw_img:
      pil_img = raw_img.convert(self.image_mode)

    img = self.preprocess(pil_img).unsqueeze(0).to(device)
    question_tokens = clip.tokenize(question["question"]).to(device)

    # The CLIP encoder is only used as a fixed feature extractor here.
    with torch.no_grad():
      img_features = self.clip_encoder.encode_image(img).squeeze()
      question_features = self.clip_encoder.encode_text(question_tokens).squeeze()

    return img_features, question_features, [answer]

# **Model Architecture**

In [None]:
class VQAModel(nn.Module):
    """GPT-2 based answer decoder driven by fused CLIP features.

    The fused image/question feature vector is projected to GPT-2's embedding
    size and used as the first input embedding; the answer is then generated
    greedily, one token at a time.

    Args:
        gpt2_model_name: Hugging Face name of the GPT-2 checkpoint to load.
        num_unfrozen_layers: Number of final GPT-2 transformer blocks that are
            left trainable; all other GPT-2 parameters are frozen.
        feature_dim: Size of the fused input feature vector.
    """

    def __init__(self, gpt2_model_name="gpt2", num_unfrozen_layers=5, feature_dim=1024):
        super(VQAModel, self).__init__()

        # Load GPT-2 model and tokenizer
        self.gpt2_model = GPT2LMHeadModel.from_pretrained(gpt2_model_name)
        self.gpt2_tokenizer = GPT2Tokenizer.from_pretrained(gpt2_model_name)

        # GPT-2 has no dedicated padding token; reuse EOS for padding.
        self.gpt2_tokenizer.pad_token = self.gpt2_tokenizer.eos_token

        for param in self.gpt2_model.parameters():
            param.requires_grad = False

        # Unfreeze the last few transformer blocks. GPT-2 keeps its blocks in
        # `transformer.h` (parameter names look like `transformer.h.<i>....`),
        # so they are selected directly instead of by name matching.
        blocks = list(self.gpt2_model.transformer.h)
        num_unfrozen_layers = max(0, min(num_unfrozen_layers, len(blocks)))
        for block in blocks[len(blocks) - num_unfrozen_layers:]:
            for param in block.parameters():
                param.requires_grad = True

        # Linear layer to project the fused features to GPT-2's embedding size
        self.projection = nn.Linear(feature_dim, self.gpt2_model.config.n_embd)

        # Define the loss function
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, combined_features, target_strings=None, max_length=MAX_LENGTH_ANNOTATIONS):
        """Greedily generate answers and optionally compute the training loss.

        Args:
            combined_features: Fused features of shape [batch_size, feature_dim].
            target_strings: Optional batch of reference answer strings. When
                given, exactly ``max_length`` steps are generated so that the
                logits line up position-by-position with the padded targets.
            max_length: Maximum number of tokens to generate per answer.

        Returns:
            ``(decoded_answers, loss)``: a list with one decoded string per
            batch element (cut at the first EOS token) and the cross-entropy
            loss against the tokenized targets, or ``None`` without targets.
        """
        eos_token_id = self.gpt2_tokenizer.eos_token_id
        has_targets = target_strings is not None

        # Project the combined features to match GPT-2's embedding size
        projected_features = self.projection(combined_features)  # Shape: [batch_size, n_embd]

        # Initialize the GPT-2 model input with the projected features
        model_inputs = projected_features.unsqueeze(1)  # Shape: [batch_size, 1, n_embd]

        generated_tokens = []
        all_logits = []
        finished = None  # per-sequence flag: has EOS been produced yet?

        for _ in range(max_length):
            outputs = self.gpt2_model(inputs_embeds=model_inputs)
            logits = outputs.logits[:, -1, :]
            next_token = torch.argmax(logits, dim=-1)
            generated_tokens.append(next_token)
            all_logits.append(logits)

            is_eos = next_token == eos_token_id
            finished = is_eos if finished is None else (finished | is_eos)

            # Stop early only at inference time. With targets, every position
            # needs real logits: padding the missing steps with -inf rows makes
            # the cross-entropy NaN.
            if not has_targets and finished.all():
                break

            # Update model inputs
            next_token_embeds = self.gpt2_model.transformer.wte(next_token)
            model_inputs = torch.cat((model_inputs, next_token_embeds.unsqueeze(1)), dim=1)

        # Stack generated tokens and logits
        generated_tokens = torch.stack(generated_tokens, dim=1)  # Shape: [batch_size, seq_len]
        all_logits = torch.stack(all_logits, dim=1)  # Shape: [batch_size, seq_len, vocab_size]

        # Decode each sequence up to (not including) its first EOS, so tokens
        # generated after a sequence has already ended are not part of the answer.
        decoded_answers = []
        for token_row in generated_tokens.tolist():
            if eos_token_id in token_row:
                token_row = token_row[:token_row.index(eos_token_id)]
            decoded_answers.append(self.gpt2_tokenizer.decode(token_row, skip_special_tokens=True))

        # Calculate the loss if target strings are provided
        if has_targets:
            # Tokenize the target strings, padded/truncated to max_length
            target_tokens = self.gpt2_tokenizer(target_strings, padding='max_length', truncation=True, max_length=max_length, return_tensors="pt")
            target_ids = target_tokens.input_ids.to(all_logits.device)  # Shape: [batch_size, max_length]

            # Flatten the logits and target ids for loss computation
            loss = self.loss_fn(all_logits.reshape(-1, all_logits.size(-1)), target_ids.reshape(-1))
        else:
            loss = None

        return decoded_answers, loss



# **Loss Function**
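*`compute_loss` is not used during training (the model's own cross-entropy loss is used instead). The CLIP text encoder loaded in this section is still needed by `get_acc` in the evaluation section.*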

In [None]:
# Hugging Face CLIP model and processor used for text-to-text similarity in
# compute_loss and get_acc. This is separate from the `clip` package model that
# VqaDataset loads for feature extraction, and it is not moved to `device` here.
clip_model_name="openai/clip-vit-base-patch32"
clip_encoder = CLIPModel.from_pretrained(clip_model_name)
clip_processor = CLIPProcessor.from_pretrained(clip_model_name)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/4.19k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/605M [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/592 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/862k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/525k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.22M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/389 [00:00<?, ?B/s]

In [None]:
def compute_loss(generated_answers, real_answers):
  """Mean CLIP-text cosine distance between generated and reference answers.

  Args:
    generated_answers: Batch of generated answer strings.
    real_answers: Batch of reference answer strings, aligned by position with
      ``generated_answers``.

  Returns:
    The mean of ``1 - cosine_similarity`` over the matched pairs. The value is
    computed outside the autograd graph, so it cannot be back-propagated.
  """
  # Encode generated and real answers using CLIP text encoder
  generated_inputs = clip_processor(text=generated_answers, return_tensors="pt", padding=True, truncation=True)
  real_inputs = clip_processor(text=real_answers, return_tensors="pt", padding=True, truncation=True)

  with torch.no_grad():  # No need to compute gradients for CLIP encoding
    generated_features = clip_encoder.get_text_features(**generated_inputs)
    real_features = clip_encoder.get_text_features(**real_inputs)

  # cosine_similarity returns the full pairwise matrix; only the diagonal
  # compares each generated answer with its own reference answer.
  cosine_sim = cosine_similarity(generated_features, real_features)
  loss = 1 - cosine_sim.diagonal()

  # Return the mean loss over the batch
  return loss.mean()

# **Training**

In [None]:
def save_checkpoint(model, optimizer, loss, path="/content/drive/MyDrive/VQA_Checkpoints/80000_last_checkpoint.pth"):
    """Save model and optimizer state together with the loss record.

    Args:
        model: Module whose ``state_dict()`` is stored.
        optimizer: Optimizer whose ``state_dict()`` is stored.
        loss: Loss value or loss history to store under the ``'loss'`` key.
        path: Destination file. Missing parent directories are created.
    """
    checkpoint = {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss
    }
    # torch.save fails when the target directory does not exist yet
    # (e.g. on the first run against a fresh Drive).
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    torch.save(checkpoint, path)
    print(f"Checkpoint saved to {path}")

In [None]:
from tqdm import tqdm

def train_epoch(dataloader, c_epoch, decoder,decoder_optimizer):
    """Run one optimisation pass over ``dataloader``.

    Args:
        dataloader: Yields ``(image_features, question_features, answers)``
            batches, where ``answers[0]`` holds the batch's answer strings.
        c_epoch: Epoch number, used only for the progress-bar label.
        decoder: Model called as ``decoder(fused_features, answers)`` and
            returning ``(predictions, loss)``.
        decoder_optimizer: Optimizer stepping the decoder's parameters.

    Returns:
        The summed batch loss divided by the number of batches.
    """
    running_loss = 0
    progress = tqdm(dataloader, desc=f"Epoch {c_epoch}")

    for img_feats, question_feats, answers in progress:
        targets = answers[0]

        img_feats = img_feats.to(device)
        question_feats = question_feats.to(device)

        decoder_optimizer.zero_grad()

        # Concatenate image and question features along the feature axis.
        fused = torch.cat((img_feats, question_feats), dim=1)
        fused = fused.to(torch.float32)

        predictions, batch_loss = decoder(fused, targets)
        batch_loss.backward()
        decoder_optimizer.step()

        running_loss += batch_loss.item()

    # Show the last batch of the epoch as a qualitative sanity check.
    print('truth -> ',targets, "preds -> ",predictions)
    print("total loss ",running_loss)

    return running_loss / len(dataloader)

In [None]:
def train(train_dataloader, decoder, n_epochs, learning_rate=0.001,
               print_every=100, plot_every=100,load_checkpoint=False):
    """Train ``decoder`` for ``n_epochs`` epochs, checkpointing after each one.

    Args:
        train_dataloader: Loader passed to ``train_epoch``.
        decoder: Model to train.
        n_epochs: Number of epochs to run.
        learning_rate: Learning rate of the Adam optimizer.
        print_every: Accepted for backward compatibility; currently unused.
        plot_every: Accepted for backward compatibility; currently unused.
        load_checkpoint: When true, restore model, optimizer and loss history
            from ``load_from_checkpoint()`` before training.

    Returns:
        The list of per-epoch average losses (including any history restored
        from the checkpoint).
    """
    plot_losses = []

    decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate)

    if load_checkpoint:
        chkpnt = load_from_checkpoint()

        decoder.load_state_dict(chkpnt["model_state_dict"])
        decoder_optimizer.load_state_dict(chkpnt["optimizer_state_dict"])

        # Older checkpoints may hold a single loss value instead of a history.
        saved_losses = chkpnt["loss"]
        if isinstance(saved_losses, (list, tuple)):
            plot_losses = list(saved_losses)
        else:
            plot_losses = [saved_losses]

    for epoch in range(1, n_epochs + 1):
        loss = train_epoch(train_dataloader, epoch , decoder, decoder_optimizer)
        # Record the epoch loss so the saved checkpoint carries the history.
        plot_losses.append(loss)

        print("loss ",loss)
        save_checkpoint(decoder, decoder_optimizer, plot_losses,path=f"/content/drive/MyDrive/VQA_Checkpoints/80000_last_checkpoint_{epoch}.pth")

    return plot_losses

In [None]:
def load_from_checkpoint(path="/content/drive/MyDrive/VQA_Checkpoints/80000_last_checkpoint.pth", map_location=None):
  """Load a checkpoint written by ``save_checkpoint``.

  Args:
    path: Checkpoint file to read.
    map_location: Forwarded to ``torch.load``. When omitted and no GPU is
      available, tensors are mapped to the CPU so that a checkpoint saved on a
      GPU runtime can still be loaded.

  Returns:
    The checkpoint object stored in the file.
  """
  if map_location is None and not torch.cuda.is_available():
    map_location = torch.device('cpu')
  chkpnt = torch.load(path, map_location=map_location)
  print("loaded from checkpoint ",path)
  return chkpnt

In [None]:
# Expose 80000 training annotations. The first argument is the filename prefix
# that VqaDataset completes with the zero-padded image id and '.jpg'.
dataset = VqaDataset("/content/train2014/train2014/COCO_train2014_",80000,extracted_train_annotations,train_questions_dict)

100%|███████████████████████████████████████| 338M/338M [00:13<00:00, 25.9MiB/s]


In [None]:
# Shuffle the samples every epoch so batches are not always built from the
# same consecutive annotations in the same order.
trainloader = DataLoader(dataset,batch_size=16, shuffle=True)

In [None]:
# Build the answer decoder with its default settings and move it to `device`.
decoder = VQAModel().to(device)

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

In [None]:
# Resume from the default checkpoint and train for 50 epochs; a checkpoint is
# written after every epoch. print_every / plot_every are accepted by train()
# but are not used in its body.
train(trainloader, decoder, 50, print_every=5, plot_every=5,load_checkpoint=True)

loaded from checkpoint  /content/drive/MyDrive/VQA_Checkpoints/80000_last_checkpoint.pth


Epoch 1: 100%|██████████| 5000/5000 [1:01:11<00:00,  1.36it/s]


truth ->  ('up', 'spoon', 'yes', 'back', 'dessert', 'no', 'on table', 'no', 'no', 'no', 'glass', 'pudding', 'cake', 'souffle', 'desert', 'no') preds ->  ['down', 'knife', 'no', 's', 's', 'no', 'on', 'no', 's', 'no', 'sand', 'sand', 'cake', 's', 'break', 'no']
total loss  nan
loss  nan
Checkpoint saved to /content/drive/MyDrive/VQA_Checkpoints/80000_last_checkpoint_1.pth


Epoch 2: 100%|██████████| 5000/5000 [1:02:09<00:00,  1.34it/s]


truth ->  ('up', 'spoon', 'yes', 'back', 'dessert', 'no', 'on table', 'no', 'no', 'no', 'glass', 'pudding', 'cake', 'souffle', 'desert', 'no') preds ->  ['down', 'knife', 'no', 's', 's', 'no', 'plate', 'no', 's', 'no', 'glass', 'sand', 'cake', 's', 'break', 'no']
total loss  nan
loss  nan
Checkpoint saved to /content/drive/MyDrive/VQA_Checkpoints/80000_last_checkpoint_2.pth


Epoch 3:  41%|████      | 2059/5000 [26:23<43:24,  1.13it/s]

# **Evaluation**

In [None]:
def evaluate(eval_loader, decoder):
    """Evaluate ``decoder`` on ``eval_loader`` without gradient tracking.

    Args:
        eval_loader: Yields ``(image_features, question_features, answers)``
            batches, where ``answers[0]`` holds the batch's answer strings.
        decoder: Model called as ``decoder(fused_features, answers)`` and
            returning ``(predictions, loss)``.

    Returns:
        ``(avg_loss, avg_acc)``: batch loss and batch accuracy, each averaged
        over the number of batches.
    """
    decoder.eval()

    accuracy_sum = 0
    loss_sum = 0

    with torch.no_grad():
        for img_feats, question_feats, answers in tqdm(eval_loader, desc="Evaluation"):
            targets = answers[0]  # the batch's reference answer strings

            img_feats = img_feats.to(device)
            question_feats = question_feats.to(device)

            # Concatenate image and question features along the feature axis.
            fused = torch.cat((img_feats, question_feats), dim=1)
            fused = fused.to(torch.float32)

            predictions, batch_loss = decoder(fused, targets)

            accuracy_sum += get_acc(predictions, targets)
            loss_sum += batch_loss.item()

    avg_loss = loss_sum / len(eval_loader)
    avg_acc = accuracy_sum / len(eval_loader)
    print("Average Loss:", avg_loss)
    print("Average Accuracy:", avg_acc)

    return avg_loss, avg_acc

def get_acc(y, y_hat,threshold=0.6):
    """Fraction of answers whose CLIP text embedding matches its reference.

    Args:
        y: Batch of generated answer strings.
        y_hat: Batch of reference answer strings, aligned by position with ``y``.
        threshold: Minimum cosine similarity for a pair to count as correct.

    Returns:
        The number of matched pairs above ``threshold`` divided by
        ``len(y_hat)``; ``0.0`` for an empty batch.
    """
    if len(y_hat) == 0:
        return 0.0

    generated_inputs = clip_processor(text=y, return_tensors="pt", padding=True, truncation=True)
    real_inputs = clip_processor(text=y_hat, return_tensors="pt", padding=True, truncation=True)

    with torch.no_grad():
        generated_features = clip_encoder.get_text_features(**generated_inputs)
        real_features = clip_encoder.get_text_features(**real_inputs)

    # cosine_similarity returns the full pairwise matrix; counting hits over
    # all of it can push the "accuracy" above 1. Only the diagonal compares
    # each generated answer with its own reference.
    cosine_sim = cosine_similarity(generated_features, real_features)
    matched_sim = cosine_sim.diagonal()

    acc = (matched_sim > threshold).sum().item()  # Number of correct pairs in the batch
    return acc / len(y_hat)  # Return the mean accuracy
