In [None]:
!pip install opendatasets

In [None]:
import opendatasets as od

od.download('https://www.kaggle.com/datasets/ingbiodanielh/vizwiz')

In [3]:
!ls /content/vizwiz/data/Images | wc -l

31173


In [None]:
!pip install ftfy regex tqdm
!pip install git+https://github.com/openai/CLIP.git

In [5]:
import clip
import json
import numpy as np
import torch
import torchtext
from PIL import Image
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
img = Image.open('/content/vizwiz/data/Images/VizWiz_train_000000000000.jpg')

In [None]:
img.show()

In [6]:
data = None

with open('/content/vizwiz/data/Annotations/train.json') as f:
  data = json.load(f)

In [100]:
tokenizer = torchtext.data.utils.get_tokenizer('basic_english')
tokenized_answers = np.array([tokenizer(answer) for answer in answers[:3000]], dtype=object)
print(tokenized_answers[10])
vocab = torchtext.vocab.build_vocab_from_iterator(tokenized_answers, min_freq=1)
vocab.insert_token('<unk>', 0)
vocab.insert_token('<eos>', 1)
vocab.set_default_index(vocab['<unk>'])
print(len(vocab))
print(vocab.get_itos()[:10])

7119


In [7]:
len(data)

20000

In [8]:
d = {'img': 0, 'answerable': 1, 'question': 2, 'answer_type': 3, 'answer': 4}

In [9]:
df = []

for entry in data:
  df.append([entry['image'], entry['answerable'], entry['question'], entry['answer_type'], entry['answers'][4]['answer']])

df = np.array(df)

In [10]:
df.shape

(20000, 5)

In [11]:
y = np.array([(entry[3], entry[4]) for entry in df])

In [12]:
answers = np.array([entry[4] for entry in df])

In [13]:
max_len = len(max(answers,key=len))

In [14]:
answer_types = np.array([entry[3] for entry in df])

In [8]:
y.shape

(20000, 2)

In [9]:
y[10]

array(['other', 'samsung phone'], dtype='<U92')

In [15]:
model, preprocess = clip.load('ViT-B/32', device=device)

100%|████████████████████████████████████████| 338M/338M [00:03<00:00, 115MiB/s]


In [17]:
X = []

with torch.no_grad():
  for i in range(3000):
    encoded_img = model.encode_image(preprocess(Image.open('/content/vizwiz/data/Images/'+df[i][0])).unsqueeze(0).to(device))
    encoded_q = model.encode_text(clip.tokenize(df[i][2]).to(device))

    X.append(torch.hstack([encoded_img, encoded_q]))

X = torch.vstack(X)

In [33]:
X.size()

torch.Size([3000, 1024])

In [34]:
tokenizer = torchtext.data.utils.get_tokenizer('basic_english')
tokenized_answers = np.array([tokenizer(answer) for answer in answers[:3000]], dtype=object)
print(tokenized_answers[10])

['samsung', 'phone']


In [35]:
tokenized_answers.shape

(3000,)

In [36]:
vocab = torchtext.vocab.build_vocab_from_iterator(tokenized_answers, min_freq=1)
vocab.insert_token('<unk>', 0)
vocab.insert_token('<eos>', 1)
vocab.set_default_index(vocab['<unk>'])
print(len(vocab))
print(vocab.get_itos()[:10])

1900
['<unk>', '<eos>', 'unanswerable', 'unsuitable', 'no', 'white', 'black', 'blue', 'yes', 'grey']


In [39]:
fin_answers = []

for answer in tokenized_answers:
  answer.append('<eos>')
  tokens = np.array([vocab[token] for token in answer])
  fin_answers.append(np.pad(tokens, (0,max_len-len(tokens))))

fin_answers = np.array(fin_answers)
print(fin_answers[10])

[540  14   1   1   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
   0   0]


In [40]:
fin_answers.shape

(3000, 92)

In [41]:
le = preprocessing.LabelEncoder()

encoded_answer_types = le.fit_transform(answer_types)

print(encoded_answer_types[10])

1


In [42]:
le.classes_

array(['number', 'other', 'unanswerable', 'yes/no'], dtype='<U12')

In [75]:
y = [(a_type, a) for a_type, a in zip(encoded_answer_types, fin_answers)]
# y = np.array(y, dtype=object)

In [45]:
y.shape

(3000, 2)

In [76]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42)

In [77]:
from torch.utils.data import Dataset, DataLoader

# Custom Dataset class
class VQADataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.qtypes = []
        self.answers = []
        for qtype, ans in labels:
          self.qtypes.append(qtype)
          self.answers.append(ans)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        x = torch.Tensor(self.data[index])
        qy, ay = self.qtypes[index], torch.Tensor(self.answers[index])

        return x, qy, ay

In [78]:
train_set = DataLoader(VQADataset(X_train, y_train), batch_size=32)
test_set = DataLoader(VQADataset(X_test, y_test), batch_size=32)

In [93]:
class VQA_Network(nn.Module):
    def __init__(self, num_classes, seq_len, hidden_dim, embedding_dim):
        super(VQA_Network, self).__init__()

        self.fc = nn.Linear(embedding_dim, hidden_dim)
        self.fc2_answers = nn.Linear(hidden_dim, seq_len)

        self.fc2_aux = nn.Linear(hidden_dim, num_classes)
        self.fc3_aux = nn.Linear(num_classes, seq_len)

        self.norm = nn.LayerNorm(hidden_dim)
        self.dropout = nn.Dropout()
        self.softmax = nn.Softmax(dim=0)

    def forward(self, x):
        x = x.to(dtype=torch.float32)
        x = self.fc(x)
        x = self.norm(x)
        x = self.dropout(x)

        qtype = self.fc2_aux(x)
        aux = self.fc3_aux(qtype)

        answers = self.fc2_answers(x)

        answers = answers * self.softmax(aux)

        # answers = answers.to(dtype=torch.int64)

        return answers, qtype


In [None]:
num_classes = 4
seq_len = 92
hidden_dim = 512
embedding_dim = 1024

num_epochs = 200

# Instantiate the model
model = VQA_Network(num_classes, seq_len, hidden_dim, embedding_dim).to(device)

# Define loss function
criterion = nn.CrossEntropyLoss()

# Define optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss_a = 0.0
    running_loss_q = 0.0

    for xs, qtypeset, answerset in train_set:
        # Move data to the device
        xs = xs.to(device)
        qtypeset = qtypeset.to(device)
        answerset = answerset.to(device)

        # Clear gradients
        optimizer.zero_grad()

        # Forward pass
        answers, qtypes = model(xs)
        # print(xs.size())
        # print(labelset.size())
        # Compute loss
        answer_loss = criterion(answers, answerset)

        # Backward pass
        answer_loss.backward(retain_graph=True)

        # Compute loss
        qtype_loss = criterion(qtypes, qtypeset)

        # Backward pass
        qtype_loss.backward()

        # Update weights
        optimizer.step()

        # Update running loss
        running_loss_a += answer_loss.item()
        running_loss_q += qtype_loss.item()

    # Calculate average loss for the epoch
    average_loss_a = running_loss_a / len(train_set)
    average_loss_q = running_loss_q / len(train_set)

    # Print progress
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss_a: {average_loss_a:.4f}, Loss_q: {average_loss_q:.4f}")

# Training complete


In [None]:
X_test[1]

In [None]:
y_test[3]

In [None]:
print(model(X_test[3]))