In [None]:
# Mount Google Drive inside the Colab VM at /content/drive so that the
# dataset stored on Drive can be read through ordinary filesystem paths.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
# Release cached, currently unused GPU memory held by PyTorch's allocator.
# `torch` is imported here because this cell runs before any other cell
# imports it; without the import a fresh kernel fails with NameError.
import torch

torch.cuda.empty_cache()

In [None]:
# List the contents of the uploaded directory
# (sanity check that the Drive mount worked and the dataset files are present).
import os

directory_path = '/content/drive/MyDrive/DAQUAR'
contents = os.listdir(directory_path)
print(contents)

# Access a specific file within the directory
# `data` ends up holding the whole text of answer_space.txt as one string.
# NOTE(review): none of the cells shown here read `data` afterwards.
file_path = os.path.join(directory_path, 'answer_space.txt')
with open(file_path, 'r') as file:
    data = file.read()



['new_data_val.csv', 'new_data_test.csv', 'new_data_train.csv', 'all_qa_pairs.txt', 'answer_space.txt', 'test_images_list.txt', 'train_images_list.txt', 'images']


In [None]:
import os
import pandas as pd
from PIL import Image

# Define the path to the DAQUAR dataset directory
dataset_path = '/content/drive/MyDrive/DAQUAR'

# Load the lists of image file names for training and testing.
# File names are joined relative to dataset_path: passing an absolute path as
# the second argument makes os.path.join silently discard dataset_path.
# squeeze("columns") turns the single-column frame into a Series and keeps it
# a Series even when the file has only one row.
train_images_list = pd.read_csv(
    os.path.join(dataset_path, 'train_images_list.txt'), header=None
).squeeze("columns")
test_images_list = pd.read_csv(
    os.path.join(dataset_path, 'test_images_list.txt'), header=None
).squeeze("columns")

# Load the new structured data for training, validation, and testing in CSV format.
# The test split must come from new_data_test.csv; test_images_list.txt is the
# header-less image list loaded above, not a question/answer table.
new_data_train = pd.read_csv(os.path.join(dataset_path, 'new_data_train.csv'))
new_data_val = pd.read_csv(os.path.join(dataset_path, 'new_data_val.csv'))
new_data_test = pd.read_csv(os.path.join(dataset_path, 'new_data_test.csv'))

# Path to the images directory
images_path = os.path.join(dataset_path, 'images')

# Function to load an image
def load_image(image_file):
    """Load one dataset image by its extension-less name and return it as RGB.

    Args:
        image_file: Image file name without extension. '.png' is appended and
            the result is resolved inside the module-level `images_path`.

    Returns:
        A PIL image in 'RGB' mode.

    Raises:
        FileNotFoundError: If the resulting PNG path does not exist.
    """
    image_path = os.path.join(images_path, image_file + '.png')
    # convert() hands back an independent, fully loaded image, so the
    # underlying file can be closed right away instead of lingering until
    # garbage collection.
    with Image.open(image_path) as source:
        return source.convert('RGB')

# Example usage: Load the first image from the training list
# train_images_list holds image names without extension; load_image appends '.png'.
first_image_file = train_images_list.iloc[0]
first_image = load_image(first_image_file)  # returns an RGB PIL image
# NOTE(review): Image.show() hands the picture to an external viewer; in a hosted
# notebook this may display nothing — consider display(first_image) instead.
first_image.show()





# image preprocessing
import torchvision.transforms as transforms

# Define the image transformation
# Pipeline: resize (shorter side to 256) -> 224x224 centre crop -> float tensor
# -> per-channel normalisation.
# NOTE(review): the mean/std values look like the usual ImageNet statistics
# expected by the pretrained ResNet-50 used later — confirm.
image_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Apply the transformation to an image
# (smoke test on the example image loaded above; the datasets apply the same
# transform per item).
preprocessed_image = image_transform(first_image)  # Assuming 'first_image' is a PIL image

# text preprocessing
from transformers import AutoTokenizer

# Load the tokenizer
# (downloads the tokenizer files for this checkpoint from the Hugging Face hub
# on first use; preprocess_text below reads this module-level name).
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')

# Tokenize and encode the text
def preprocess_text(text, max_length=512):
    """Encode `text` with the module-level `tokenizer`.

    The text is truncated to at most `max_length` tokens, padded up to exactly
    `max_length`, and the tokenizer is asked for PyTorch tensors.

    Args:
        text: The string to encode.
        max_length: Fixed token length of the encoding (default 512).

    Returns:
        Whatever the tokenizer returns for these settings.
    """
    encoding_options = {
        'padding': 'max_length',
        'truncation': True,
        'max_length': max_length,
        'return_tensors': 'pt',  # PyTorch tensors
    }
    return tokenizer(text, **encoding_options)

# Example usage
# Encodes one question with the default max_length, i.e. padded to 512 tokens.
sample_question = "What is shown in the picture?"
encoded_question = preprocess_text(sample_question)


tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

In [None]:
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import torch
from PIL import Image
import os
from transformers import AutoTokenizer

# Build the answer -> class-index lookup from the training split.
# Requires new_data_train (and, further down, images_path / image_transform)
# to be defined already. Indices follow the order in which answers first appear.
unique_answers = new_data_train['answer'].unique()
answer_to_label_mapping = dict(zip(unique_answers, range(len(unique_answers))))

# Tokenizer handed to the datasets created below
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')

class VQADataset(Dataset):
    """Map-style dataset yielding (image, question, answer label) triples.

    Each row of `dataframe` must provide the columns 'image_id', 'question'
    and 'answer'. Images are read from `<images_path>/<image_id>.png`.
    """

    def __init__(self, dataframe, images_path, transform, tokenizer, answer_to_label_mapping):
        """Store the table and the helpers used to build each sample.

        Args:
            dataframe: Table with one question/answer pair per row.
            images_path: Directory containing the PNG images.
            transform: Callable applied to the loaded RGB image, or a falsy
                value to return the image untouched.
            tokenizer: Callable used to encode the question, or a falsy value
                to return an empty dict instead.
            answer_to_label_mapping: Dict from answer text to class index.
        """
        self.answer_to_label_mapping = answer_to_label_mapping
        self.tokenizer = tokenizer
        self.transform = transform
        self.images_path = images_path
        self.dataframe = dataframe

    def __len__(self):
        """Number of question/answer rows."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Build the sample at positional index `idx`.

        Returns:
            (image, question, answer_tensor) where `question` is the tokenizer
            output (padded/truncated to 512 tokens, PyTorch tensors) and
            `answer_tensor` is a scalar long tensor holding the class index.

        Raises:
            FileNotFoundError: If the image file is missing (the path is
                printed before the error is re-raised).
            KeyError: If the answer text is not in the label mapping.
        """
        row = self.dataframe.iloc[idx]

        # Normalise Windows separators so the path is the same on every OS.
        img_name = os.path.join(self.images_path, row['image_id'] + '.png').replace('\\', '/')
        try:
            image = Image.open(img_name).convert('RGB')
        except FileNotFoundError:
            print(f"File not found: {img_name}")
            raise
        if self.transform:
            image = self.transform(image)

        question = {}
        if self.tokenizer:
            question = self.tokenizer(
                row['question'],
                padding='max_length',
                truncation=True,
                max_length=512,
                return_tensors='pt'
            )

        label_index = self.answer_to_label_mapping[row['answer']]
        return image, question, torch.tensor(label_index, dtype=torch.long)

# Instantiate the dataset
# All three splits share the transform, tokenizer and label mapping so that
# inputs and class indices are consistent between training and evaluation.
train_dataset = VQADataset(new_data_train, images_path, image_transform, tokenizer, answer_to_label_mapping)
val_dataset = VQADataset(new_data_val, images_path, image_transform, tokenizer, answer_to_label_mapping)
test_dataset = VQADataset(new_data_test, images_path, image_transform, tokenizer, answer_to_label_mapping)

# DataLoader
# The training cell ran out of GPU memory with batches of 512 images plus
# 512-token questions, so use batch sizes that fit on a single GPU.
TRAIN_BATCH_SIZE = 32
EVAL_BATCH_SIZE = 64
train_dataloader = DataLoader(train_dataset, batch_size=TRAIN_BATCH_SIZE, shuffle=True)
validation_dataloader = DataLoader(val_dataset, batch_size=EVAL_BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=EVAL_BATCH_SIZE, shuffle=False)


In [None]:
from tqdm import tqdm
import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer
from torchvision.models import resnet50
from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize

# Define the VQAModel
class VQAModel(nn.Module):
    """Classification head over concatenated image and question embeddings.

    The two embeddings are concatenated along the feature axis, projected to
    512 features with a ReLU, and mapped to one logit per answer class.
    """

    def __init__(self, visual_embedding_dim, textual_embedding_dim, num_classes):
        """Create the fusion and classifier layers.

        Args:
            visual_embedding_dim: Feature width of the image embedding.
            textual_embedding_dim: Feature width of the question embedding.
            num_classes: Number of answer classes (output logits).
        """
        super().__init__()
        # Linear layer + ReLU reduce the fused vector to 512 features.
        self.fc = nn.Linear(visual_embedding_dim + textual_embedding_dim, 512)
        self.relu = nn.ReLU()
        # Classifier layer
        self.classifier = nn.Linear(512, num_classes)

    def forward(self, visual_embeddings, textual_embeddings):
        """Return class logits for a batch.

        Args:
            visual_embeddings: Tensor of shape (batch, visual_embedding_dim).
            textual_embeddings: Tensor of shape (batch, textual_embedding_dim).

        Returns:
            Tensor of shape (batch, num_classes) with unnormalised scores.
        """
        # No per-call shape printing here: forward runs once per batch, so
        # debug prints would flood the notebook output.
        combined = torch.cat((visual_embeddings, textual_embeddings), dim=1)
        # Dimensionality reduction
        combined = self.relu(self.fc(combined))
        # Classify
        return self.classifier(combined)

# Initialize the VQAModel
from torch.utils.data import DataLoader

visual_embedding_dim = 2048  # width of ResNet-50's pooled feature vector
textual_embedding_dim = 384  # hidden size of all-MiniLM-L6-v2
# One output logit per label produced by answer_to_label_mapping; a hard-coded
# class count makes CrossEntropyLoss fail as soon as a label id exceeds it.
num_classes = len(answer_to_label_mapping)

# Define the device once; every model and every batch is moved to it.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# The trainable head has to live on the same device as its inputs.
model = VQAModel(visual_embedding_dim, textual_embedding_dim, num_classes).to(device)

# Define loss function and optimizer
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Initialize ResNet and SBERT models. Both are used purely as frozen feature
# extractors: eval mode, no gradients, moved to the device a single time.
resnet = resnet50(pretrained=True)
resnet = nn.Sequential(*(list(resnet.children())[:-1]))  # Remove the classification layer
resnet = resnet.to(device).eval()
resnet.requires_grad_(False)

tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
sbert = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
sbert = sbert.to(device).eval()
sbert.requires_grad_(False)


def _embed_batch(images, questions):
    """Return (image_embeddings, question_embeddings) for one collated batch.

    Both backbones run under torch.no_grad(): they are frozen, and letting
    autograd record the ResNet forward pass keeps every activation alive,
    which is what exhausts GPU memory.
    """
    images = images.to(device)
    # Each question was tokenized on its own with return_tensors='pt', so the
    # collated tensors are (batch, 1, seq_len); drop the singleton axis.
    input_ids = questions['input_ids'].squeeze(1).to(device)
    attention_mask = questions['attention_mask'].squeeze(1).to(device)
    # Cut the trailing padding down to the longest question in the batch.
    # Assumes the tokenizer pads on the right (its default); padded positions
    # are masked out anyway, so this only saves memory and time.
    longest = int(attention_mask.sum(dim=1).max())
    input_ids = input_ids[:, :longest]
    attention_mask = attention_mask[:, :longest]
    with torch.no_grad():
        # (batch, 2048, 1, 1) -> (batch, 2048)
        image_embeddings = resnet(images).flatten(start_dim=1)
        question_embeddings = sbert(
            input_ids=input_ids, attention_mask=attention_mask
        ).pooler_output
    return image_embeddings, question_embeddings


def _evaluate(dataloader):
    """Return (accuracy in percent, list of predicted label ids) for `dataloader`."""
    model.eval()
    correct = 0
    total = 0
    predicted_labels = []
    with torch.no_grad():
        for images, questions, answers in dataloader:
            answers = answers.to(device)
            image_embeddings, question_embeddings = _embed_batch(images, questions)
            predicted = model(image_embeddings, question_embeddings).argmax(dim=1)
            total += answers.size(0)
            correct += (predicted == answers).sum().item()
            predicted_labels.extend(predicted.tolist())
    accuracy = 100 * correct / total if total else 0.0
    return accuracy, predicted_labels


# Training Loop
num_epochs = 4  # Define the number of epochs
validation_interval = 1
total_batches = len(train_dataloader)
for epoch in tqdm(range(num_epochs), desc="Epochs"):
    print(f"Epoch {epoch+1}/{num_epochs}")
    model.train()  # _evaluate() leaves the head in eval mode

    for batch_idx, (images, questions, answers) in enumerate(train_dataloader):
        answers = answers.to(device)
        image_embeddings, question_embeddings = _embed_batch(images, questions)

        # Forward pass (only the classification head is trained)
        outputs = model(image_embeddings, question_embeddings)
        loss = loss_function(outputs, answers)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f"Batch [{batch_idx+1}/{total_batches}], Loss: {loss.item()}")

    if epoch % validation_interval == 0:
        validation_accuracy, _ = _evaluate(validation_dataloader)
        print(f'Epoch {epoch+1}/{num_epochs}, Validation Accuracy: {validation_accuracy:.2f}%')

# After training, perform testing
test_accuracy, predicted_label_ids = _evaluate(test_loader)
print(f'Test Accuracy: {test_accuracy:.2f}%')

# Translate predicted class indices back to answer text so the saved file is
# readable without the label mapping.
label_to_answer = {label: answer for answer, label in answer_to_label_mapping.items()}
predictions = [label_to_answer[label] for label in predicted_label_ids]

# Save model checkpoint
torch.save(model.state_dict(), 'vqa_model_checkpoint.pth')

# Save predictions (one predicted answer per line, in test_loader order)
with open('predictions.txt', 'w') as f:
    for item in predictions:
        f.write("%s\n" % item)


Number of items in a batch: 3


Epochs:   0%|          | 0/4 [00:00<?, ?it/s]

Epoch 1/4


Epochs:   0%|          | 0/4 [00:11<?, ?it/s]


OutOfMemoryError: ignored