# Import libraries

In [1]:
import pandas as pd
import json
from collections import Counter
from transformers import ViltConfig, AutoImageProcessor, ViTModel,  BertTokenizer, BertModel
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torch.utils.data import DataLoader
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import f1_score
from sklearn.datasets import make_classification
import os
from PIL import Image
import numpy as np
from vqa import VQA
import random
import skimage.io as io
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, TensorDataset, DataLoader
from torchvision.transforms.functional import to_pil_image
from tqdm import tqdm


  from .autonotebook import tqdm as notebook_tqdm


# Prepare datasets

In [2]:
# Load the multiple-choice questions of the abstract-scenes training split.
# The path uses forward slashes so it works on every OS and contains no
# backslash sequences that Python would try to interpret as string escapes.
with open('./datasets/Questions_Train_abstract_v002/MultipleChoice_abstract_v002_train2015_questions.json', encoding='utf-8') as json_file:
    data_questions = json.load(json_file)

# json.load returns the top-level JSON object; confirm it is a dict.
print("Type: ", type(data_questions))

# Peek at one entry to see the fields available for each question.
data_questions['questions'][48]


Type:  <class 'dict'>


{'image_id': 15704,
 'question': "Does the woman's dentist recommend this dessert?",
 'multiple_choices': ['deer and squirrel',
  'ground',
  'closet',
  'sitting',
  'being carried',
  '2',
  'red',
  'blue',
  'office supplies',
  'oval',
  '3',
  '1',
  '4',
  'no',
  'yes',
  'white',
  'yellow',
  'floor'],
 'question_id': 157040}

In [3]:
# Load the ground-truth annotations of the same training split.
with open('./datasets/Annotations_Train_abstract_v002/abstract_v002_train2015_annotations.json', encoding='utf-8') as json_file:
    data_annotations = json.load(json_file)

print("Type: ", type(data_annotations))

# Scratch list from an earlier look at 'does the' questions; nothing in this
# cell fills it, it is only kept so the name stays defined.
does = []

# Gather the question type of every annotation to inspect the class balance.
question_types = [
    annotation['question_type'] for annotation in data_annotations['annotations']
]

Counter(question_types)

Type:  <class 'dict'>


Counter({'how many': 5956,
         'what color is the': 4184,
         'is the': 3436,
         'where is the': 2621,
         'what': 2329,
         'what is': 2114,
         'are the': 1759,
         'what is the': 1758,
         'is there a': 1417,
         'does the': 1406,
         'none of the above': 1387,
         'is the woman': 1341,
         'is the man': 1273,
         'what is on the': 1237,
         'is it': 930,
         'is the girl': 904,
         'is the boy': 850,
         'is the dog': 845,
         'are they': 834,
         'who is': 775,
         'what kind of': 759,
         'what color are the': 757,
         'what is in the': 742,
         'what is the man': 724,
         'is there': 696,
         'what is the woman': 684,
         'what are the': 627,
         'what is the boy': 597,
         'are there': 577,
         'what is the girl': 556,
         'is this': 547,
         'how': 533,
         'which': 524,
         'how many people are': 511,
         'i

In [4]:
def load_image(folder):
    """Load every readable image stored directly inside ``folder``.

    Args:
        folder: Path of the directory to scan (sub-directories are not
            descended into).

    Returns:
        list: In-memory copies of the images, ordered by file name. Entries
        that cannot be opened are reported on stdout and skipped.
    """
    images = []
    # os.listdir() yields entries in arbitrary order; sorting makes the
    # returned list reproducible from run to run.
    for filename in sorted(os.listdir(folder)):
        img_path = os.path.join(folder, filename)
        try:
            with Image.open(img_path) as img:
                # copy() gives an image that stays usable after the file closes.
                images.append(img.copy())
        except OSError:
            # IOError is an alias of OSError and FileNotFoundError a subclass,
            # so this covers the same failures as listing them separately.
            print(f"Error opening {filename}")
    return images

    

In [5]:
# Path of one sample scene, used to try out the image pipeline below.
# Forward slashes keep the path valid on every OS and match the image
# directory string used when the datasets are built.
image = './datasets/images/scene_img_abstract_v002_train2015/abstract_v002_train2015_000000015704.png'


In [6]:
# Open the sample scene to check that the path resolves to a readable image.
# NOTE(review): the file is opened without a `with` block or an explicit
# close(); confirm whether the handle should be released after inspection.
img = Image.open(image)


# ViT model

In [7]:
def img_to_tensor(img: str = "pali.png", img_size: int = 224):
    """Load an image file and convert it into a normalized tensor.

    Args:
        img: Path of the image file to open.
        img_size: Side length, in pixels, of the square the image is resized to.

    Returns:
        The tensor produced by ``ToTensor`` followed by ``Normalize`` for the
        RGB version of the image, resized to ``img_size`` x ``img_size``.
    """
    transform = transforms.Compose(
        [
            # Force three channels so palette / RGBA / grayscale files all work.
            transforms.Lambda(lambda image: image.convert("RGB")),
            # Resize to a square of img_size x img_size pixels.
            transforms.Resize((img_size, img_size)),
            transforms.ToTensor(),
            # NOTE(review): these are the usual ImageNet statistics; confirm
            # they match the preprocessing expected by the ViT checkpoint
            # used in this notebook.
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
            ),
        ]
    )

    # Open inside a context manager so the file handle is released as soon as
    # the pixels have been converted; this function runs once per sample.
    with Image.open(img) as image:
        return transform(image)

In [8]:
# Load the pretrained ViT encoder and preprocess the sample image.
img_model = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')

# Named `image_tensor` (not `input`) so the builtin input() is not shadowed.
image_tensor = img_to_tensor(image, img_size=224)

# Optional sanity check, left disabled. A single image needs a leading batch
# dimension before it is passed to the encoder.
# with torch.no_grad():
#     img_output = img_model(image_tensor.unsqueeze(0))
# img_output.last_hidden_state.shape

# Text model 

In [9]:
# The tokenizer and the text encoder must come from the same checkpoint, so
# the checkpoint name is written once and shared by both loaders.
TEXT_CHECKPOINT = 'bert-large-uncased-whole-word-masking-finetuned-squad'

tokenizer = BertTokenizer.from_pretrained(TEXT_CHECKPOINT)
text_model = BertModel.from_pretrained(TEXT_CHECKPOINT)

In [10]:
# Join the candidate answers of sample question 2 into a single string,
# separated by " , ". str.join also copes with an empty list of choices.
text = " , ".join(data_questions['questions'][2]['multiple_choices'])

text

'away , yes , blue , 1 , 2 , mouse , couch , no , yellow , it belongs , 4 , red , on chair , his friend is coming over , 3 , white , bench , chair'

In [11]:
# `text` holds the candidate answers of sample question 2, so the question
# itself is taken from that same entry to keep question and choices paired.
sample_question = data_questions['questions'][2]

text_embedding = tokenizer(
    text=f"{sample_question['question']} multiple choices: {text}",
    padding='max_length',
    truncation=True,  # keep the encoded length at exactly max_length
    max_length=60,
    return_tensors='pt'
    )

# Inference only: no gradients are needed to inspect the encoder output.
with torch.no_grad():
    text_output = text_model(**text_embedding)

text_output.last_hidden_state.shape




torch.Size([1, 60, 1024])

In [12]:
class MultimodalModel(nn.Module):
    """Fuse image and text sequence embeddings and classify the result.

    The image embedding is projected to ``img_dim`` features and pooled along
    its sequence axis to ``seq_len_text`` positions; the text embedding is
    projected to ``img_dim`` features. Both are multiplied element-wise and the
    flattened product is fed to an MLP that emits ``output_dim`` logits.

    Args:
        img_dim: Feature size shared by both modalities after projection.
        text_dim: Feature size of the incoming text embedding.
        seq_len_img: Sequence length of the image embedding. Kept for interface
            compatibility; it is not used because the adaptive pooling accepts
            any input length.
        seq_len_text: Sequence length of the text embedding, and the length the
            image sequence is pooled to.
        output_dim: Number of output logits.
        img_input_dim: Feature size of the incoming image embedding
            (defaults to 768).
    """

    def __init__(self, img_dim, text_dim, seq_len_img, seq_len_text, output_dim, img_input_dim=768):
        super().__init__()

        # Project text features to the shared feature size.
        self.text_dim_reducer = nn.Linear(text_dim, img_dim)
        # Resample the image sequence to the text sequence length.
        self.adaptive_pool = nn.AdaptiveMaxPool1d(seq_len_text)

        # Project image features to the shared feature size. The target is
        # img_dim (not a fixed 1024) so the product in forward() is always
        # shape-compatible with the projected text.
        self.img_dim_matcher = nn.Linear(img_input_dim, img_dim)

        # Classification head on the flattened fused sequence.
        self.mlp = nn.Sequential(
            nn.Flatten(),
            nn.Linear(img_dim * seq_len_text, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, output_dim)
        )

    def forward(self, img_embedding, text_embedding, labels=None):
        """Compute class logits for a batch.

        Args:
            img_embedding: Tensor of shape (batch, image_seq, img_input_dim).
            text_embedding: Tensor of shape (batch, seq_len_text, text_dim).
            labels: Unused; accepted so existing call sites keep working.

        Returns:
            Tensor of shape (batch, output_dim) holding unnormalized logits.
        """
        # (batch, image_seq, img_input_dim) -> (batch, image_seq, img_dim)
        img_embedding_adjusted = self.img_dim_matcher(img_embedding)
        # Pooling acts on the last axis, so move the sequence axis there and back.
        img_embedding_pooled = self.adaptive_pool(
            img_embedding_adjusted.transpose(1, 2)
        ).transpose(1, 2)

        # (batch, seq_len_text, text_dim) -> (batch, seq_len_text, img_dim)
        text_embedding_reduced = self.text_dim_reducer(text_embedding)

        # Element-wise fusion, then classification.
        combined_embedding = img_embedding_pooled * text_embedding_reduced
        return self.mlp(combined_embedding)

# Model setup, dataset and training loop

In [13]:
# Hyper-parameters of the fusion model.
image_dim = 1024      # shared feature size after projection
text_dim = 1024       # feature size of the text encoder output
seq_len_image = 197   # presumably the ViT token count for a 224x224 input - TODO confirm
seq_len_text = 512    # matches the tokenizer max_length used by the dataset
output_dim = 18       # one logit per multiple-choice candidate

model = MultimodalModel(image_dim, text_dim, seq_len_image, seq_len_text, output_dim)

# Put the pretrained encoders on the first GPU when one is available; fall
# back to the CPU so this cell also runs on machines without CUDA.
encoder_device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
img_model.to(encoder_device)
text_model.to(encoder_device)

class CustomDataset(Dataset):
    """Dataset pairing each question with its image and answer index.

    Questions and annotations are matched by position: item ``idx`` uses
    ``questions[idx]`` and ``annotations[idx]``, so both sequences must be
    aligned.

    Args:
        questions: Sequence of dicts with the keys ``'question'``,
            ``'multiple_choices'`` and ``'image_id'``.
        annotations: Sequence of dicts with the key
            ``'multiple_choice_answer'``.
        tokenizer: Callable accepting ``text``, ``padding``, ``truncation``,
            ``max_length`` and ``return_tensors`` keyword arguments.
        img_transform: Callable mapping an image path to the model input.
        img_dir: Directory containing the scene images.
        max_length: Token length every encoded text is padded or truncated to.
    """

    def __init__(self, questions, annotations, tokenizer, img_transform, img_dir, max_length=512):
        self.questions = questions
        self.annotations = annotations
        self.tokenizer = tokenizer
        self.img_transform = img_transform
        self.img_dir = img_dir
        self.max_length = max_length

    def __len__(self):
        """Return the number of questions."""
        return len(self.questions)

    def __getitem__(self, idx):
        """Return ``(text_encoding, image_input, label)`` for item ``idx``.

        Raises:
            ValueError: If the annotated answer is not among the choices.
        """
        question_data = self.questions[idx]
        annotation_data = self.annotations[idx]
        choices = question_data['multiple_choices']

        text = ", ".join(choices)
        text_embedding = self.tokenizer(
            text=f"{question_data['question']} multiple choices: {text}",
            padding='max_length',
            # Without truncation an over-long sample would come out longer
            # than max_length and could no longer be batched with the others.
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )

        # The label is the position of the correct answer within the choices.
        label = torch.tensor(
            choices.index(annotation_data['multiple_choice_answer']),
            dtype=torch.long,
        )

        # Image files are named after the zero-padded 12-digit image id.
        image_num = question_data['image_id']
        image_path = f'{self.img_dir}/abstract_v002_train2015_{image_num:012}.png'
        image_input = self.img_transform(image_path)

        return text_embedding, image_input, label



def _run_epoch(loader, loss_fn, device, desc, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_loss, macro_f1)``.

    The fusion model is trained when ``optimizer`` is given; otherwise it is
    evaluated with gradient tracking disabled. The loss is the mean of the
    per-batch losses; the macro-F1 is computed once over all predictions of
    the pass.
    """
    is_training = optimizer is not None
    model.train(is_training)

    total_loss = 0.0
    seen_labels, seen_preds = [], []
    pbar = tqdm(loader, total=len(loader), desc=desc)

    for text_embeddings, image_inputs, labels in pbar:
        # Each sample is tokenized with return_tensors='pt', which adds a
        # leading axis of size 1; squeeze(1) removes it after batching.
        text_embeddings = {k: v.squeeze(1).to(device) for k, v in text_embeddings.items()}
        labels = labels.to(device)
        image_inputs = image_inputs.to(device)

        # The pretrained encoders are used as frozen feature extractors.
        with torch.no_grad():
            text_output = text_model(**text_embeddings).last_hidden_state
            img_output = img_model(image_inputs).last_hidden_state

        with torch.set_grad_enabled(is_training):
            output = model(img_output, text_output, labels)
            loss = loss_fn(output, labels)

        if is_training:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # argmax over the logits equals argmax over their softmax.
        batch_labels = labels.cpu().tolist()
        batch_preds = output.argmax(dim=1).cpu().tolist()
        batch_f1 = f1_score(batch_labels, batch_preds, average='macro', zero_division=0)

        seen_labels.extend(batch_labels)
        seen_preds.extend(batch_preds)
        total_loss += loss.item()
        pbar.set_postfix({"loss": loss.item(), "f1": batch_f1})

    if not seen_labels:
        # Empty loader: nothing to average.
        return 0.0, 0.0

    mean_loss = total_loss / len(loader)
    # Macro-F1 over the whole pass; averaging per-batch macro-F1 values is
    # distorted by classes that are absent from individual batches.
    epoch_f1 = f1_score(seen_labels, seen_preds, average='macro', zero_division=0)
    return mean_loss, epoch_f1


def train(epoch, batch_size=32, device=None):
    """Train the fusion model and keep the checkpoint with the best dev F1.

    Args:
        epoch: Number of epochs to run.
        batch_size: Batch size of both the training and validation loaders.
        device: Device to run on. Defaults to the device the text encoder
            currently lives on.

    Side effects:
        Writes the best model's ``state_dict`` to ``./trained_model`` and
        prints per-epoch metrics plus a final summary.
    """
    if device is None:
        device = next(text_model.parameters()).device

    model.to(device)
    text_model.to(device)
    img_model.to(device)
    # Make sure layers such as dropout are inactive in the frozen encoders.
    text_model.eval()
    img_model.eval()

    best = 0
    loss_list = []
    f1_list = []
    dev_loss_list = []
    dev_f1_list = []

    PATH = './trained_model'
    optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5, eps=1e-8)
    # Created once so that validation never depends on the training loop
    # having executed at least one batch.
    loss_cf = nn.CrossEntropyLoss()

    # Create training and validation datasets and dataloaders
    img_dir = './datasets/images/scene_img_abstract_v002_train2015'
    train_dataset = CustomDataset(data_questions['questions'][:20000], data_annotations['annotations'][:20000], tokenizer, img_to_tensor, img_dir)
    dev_dataset = CustomDataset(data_questions['questions'][20000:22000], data_annotations['annotations'][20000:22000], tokenizer, img_to_tensor, img_dir)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    dev_loader = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False)

    for epoch_num in range(epoch):
        avg_loss, avg_f1 = _run_epoch(
            train_loader, loss_cf, device, f"Training Epoch {epoch_num+1}", optimizer=optimizer
        )
        print(f"Epoch: {epoch_num+1}, Loss: {avg_loss}, f1: {avg_f1}")
        loss_list.append(avg_loss)
        f1_list.append(avg_f1)

        dev_loss, dev_f1 = _run_epoch(
            dev_loader, loss_cf, device, f"Validation Epoch {epoch_num+1}"
        )
        dev_loss_list.append(dev_loss)
        dev_f1_list.append(dev_f1)

        print(f"Epoch: {epoch_num+1}, dev loss: {dev_loss}, dev f1: {dev_f1}")
        if dev_f1 > best:
            best = dev_f1
            print('Save model....')
            torch.save(model.state_dict(), PATH)

    for i in range(len(loss_list)):
        print(f'epoch {i+1}: \n\ttrain_loss: {loss_list[i]}, train_f1:{f1_list[i]}\n\tdev_loss: {dev_loss_list[i]}, dev_f1:{dev_f1_list[i]}')

In [15]:
# Run training for 30 epochs with the default batch size.
train(30)

Training Epoch 1:   0%|          | 1/625 [00:09<1:42:37,  9.87s/it, loss=2.92, f1=0.0131]


KeyboardInterrupt: 

In [None]:
# Same values as in the training cell: the model rebuilt for testing must have
# the architecture of the saved checkpoint for load_state_dict to succeed.
image_dim = 1024
text_dim = 1024
seq_len_image = 197  
seq_len_text = 512
output_dim = 18

def test(device=None):
    """Evaluate the saved checkpoint on the held-out slice of the data.

    Rebuilds the fusion model, loads ``./trained_model`` into it and prints
    the mean batch loss and the macro-F1 over questions 40000-41999.

    Args:
        device: Device to run on. Defaults to the device the text encoder
            currently lives on.
    """
    if device is None:
        device = next(text_model.parameters()).device

    test_model = MultimodalModel(image_dim, text_dim, seq_len_image, seq_len_text, output_dim)
    # map_location lets a checkpoint saved on one device load on another.
    # NOTE: torch.load unpickles the file - only load checkpoints you trust.
    test_model.load_state_dict(torch.load('./trained_model', map_location=device))
    test_model.to(device)
    # Evaluation mode is set once, before the loop.
    test_model.eval()

    test_dataset = CustomDataset(data_questions['questions'][40000:42000], data_annotations['annotations'][40000:42000], tokenizer, img_to_tensor, './datasets/images/scene_img_abstract_v002_train2015')
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    total_loss = 0
    seen_labels, seen_preds = [], []
    loss_cf = nn.CrossEntropyLoss()

    with torch.no_grad():
        test_pbar = tqdm(test_loader, total=len(test_loader), desc="Testing")

        for text_embeddings, image_inputs, labels in test_pbar:
            # Drop the size-1 axis added by the per-sample tokenizer call.
            text_embeddings = {k: v.squeeze(1).to(device) for k, v in text_embeddings.items()}
            labels = labels.to(device)
            image_inputs = image_inputs.to(device)

            text_output = text_model(**text_embeddings).last_hidden_state
            img_output = img_model(image_inputs).last_hidden_state

            output = test_model(img_output, text_output, labels)
            loss = loss_cf(output, labels)

            # argmax over the logits equals argmax over their softmax.
            batch_labels = labels.cpu().tolist()
            batch_preds = output.argmax(dim=1).cpu().tolist()
            f1 = f1_score(batch_labels, batch_preds, average='macro', zero_division=0)

            seen_labels.extend(batch_labels)
            seen_preds.extend(batch_preds)
            total_loss += loss.item()

            test_pbar.set_postfix({"loss": loss.item(), "f1": f1})

    if seen_labels:
        test_loss = total_loss / len(test_loader)
        # Macro-F1 over the whole test slice rather than a mean of per-batch
        # scores, which is distorted by classes missing from single batches.
        test_f1 = f1_score(seen_labels, seen_preds, average='macro', zero_division=0)
    else:
        # Empty loader: nothing to average.
        test_loss, test_f1 = 0.0, 0.0
    print(f"Loss: {test_loss}, f1: {test_f1}")

# Evaluate the checkpoint saved during training on the held-out slice.
test()


Testing:   0%|          | 0/63 [00:15<?, ?it/s]


KeyboardInterrupt: 