In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
from transformers import AutoModel , AutoImageProcessor , AutoTokenizer 
from datasets import load_dataset , Dataset
from PIL import Image
import torch
from torch import nn

torch.cuda.empty_cache()

2024-05-14 17:56:14.064439: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def training_img_address(input):
    IMAGE_DIR = "./train2014_3d"
    input["image_id"] = f"{IMAGE_DIR}/{input['image_id']}"
    return input

In [3]:
SAMPLES = 10

df_acc = pd.read_pickle("./vqa_v2_acc.pkl")
dataset = Dataset.from_pandas(df_acc)
dataset = dataset.remove_columns(['__index_level_0__'])
dataset = dataset.select(range(0,SAMPLES))
dataset = dataset.map(training_img_address)

dataset

Map: 100%|██████████| 10/10 [00:00<00:00, 1112.13 examples/s]


Dataset({
    features: ['question', 'question_type', 'question_id', 'image_id', 'answer_type', 'label'],
    num_rows: 10
})

In [4]:
Image.open(dataset[0]["image_id"]).size

(640, 480)

In [5]:
X_img = np.array([np.array(Image.open(i).resize((640 , 480))) for i in dataset["image_id"]])
X_text = np.array(dataset["question"])
X_img.shape , X_text.shape

((10, 480, 640, 3), (10,))

Creating Labels

In [6]:
import itertools

labels = [item['ids'] for item in dataset['label']]
flattened_labels = list(itertools.chain(*labels))
unique_labels = list(set(flattened_labels))

label2id = {label: idx for idx, label in enumerate(unique_labels)}
id2label = {idx: label for label, idx in label2id.items()} 

In [7]:
label2id

{'red and white': 0,
 'red': 1,
 'flying disc': 2,
 'skiing': 3,
 'black': 4,
 'red & white': 5,
 'no': 6,
 'pitcher': 7,
 'mesh': 8,
 'frisbie': 9,
 'white': 10,
 'white frisbee': 11,
 'net': 12,
 'frisbee': 13,
 'catcher': 14,
 'yes': 15,
 'netting': 16,
 'orange': 17}

In [8]:
def replace_ids(inputs):
    '''Converting everything to one-hot-encoding'''
    h = [0 for i in id2label]
    for i in range(len(inputs["label"]["ids"])):
        t = inputs["label"]["ids"][i]
        w = inputs["label"]["weights"][i]
        if w > 0.5: w = 1
        else: w = 0.3
        # print(t , w)
        # print(label2id.get(t , 0))
        h[label2id.get(t , 0)] = w
    inputs["label"] = h
    return inputs


flat_dataset = dataset.map(replace_ids)
# flat_dataset = dataset.flatten()
NUM_CLASSES = np.array(flat_dataset["label"]).shape[1]
print("Number of Classes : " , NUM_CLASSES)

Map: 100%|██████████| 10/10 [00:00<00:00, 2096.00 examples/s]

Number of Classes :  18





Testing BERT and ViT

In [9]:
DEVICE = "cuda"

vit_processor = AutoImageProcessor.from_pretrained("google/vit-base-patch16-224")
# model = AutoModel.from_pretrained("google/vit-base-patch16-224").to(DEVICE)
vit_model = AutoModel.from_pretrained("google/vit-base-patch16-224")

Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['vit.pooler.dense.bias', 'vit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [10]:
from transformers import BertTokenizer , BertModel

bert_processor = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')



In [11]:
ip = vit_processor(X_img , return_tensors="pt")

In [12]:
# outp = vit_model(**ip)
outp = vit_model(ip["pixel_values"])
outp.keys()

odict_keys(['last_hidden_state', 'pooler_output'])

Testing Concatentation

In [13]:
x = outp.pooler_output


batch_sentences = dataset["question"]
encoded_input = bert_processor(batch_sentences, padding=True, truncation=True, return_tensors="pt")
input_ids = encoded_input["input_ids"]
attention_mask = encoded_input["attention_mask"]
y = bert_model(input_ids , attention_mask=attention_mask).pooler_output

x.shape , y.shape

torch.cat([x,y] , axis=1).shape

torch.Size([10, 1536])

In [85]:
class CustomVITModel(nn.Module):
    def __init__(self):
        super(CustomVITModel, self).__init__()
        self.vit = AutoModel.from_pretrained("google/vit-base-patch16-224")
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        ### New layers:
        self.linear1 = nn.Linear(768 * 2, 256)
        self.linear2 = nn.Linear(256, NUM_CLASSES) ## 3 is the number of classes in this example
        self.sigmoid = nn.Softmax()

    def forward(self, pixel_values , input_ids , attention_mask):
        vit_outp = self.vit(pixel_values)
        vit_last_hidden_state, vit_pooled_output = vit_outp.last_hidden_state , vit_outp.pooler_output
        
        bert_outp = self.bert(input_ids , attention_mask=attention_mask)
        bert_last_hidden_state, bert_pooled_output = bert_outp.last_hidden_state , bert_outp.pooler_output
        
        # last_hidden_state = last_hidden_state.cuda()
        # pooled_output = pooled_output.cuda()

        # sequence_output has the following shape: (batch_size, sequence_length, 768)
        
        combined_input = torch.cat([vit_pooled_output,bert_pooled_output] , axis=1)
        
        # linear1_output = self.linear1(last_hidden_state[:,0,:].view(-1,768)) ## extract the 1st token's embeddings
        linear1_output = self.linear1(combined_input)

        linear2_output = self.linear2(linear1_output)
        
        pred = linear2_output
        # pred = self.sigmoid(linear2_output)

        return pred

# tokenizer = AutoTokenizer.from_pretrained("dbmdz/bert-base-italian-xxl-cased") 
model_custom = CustomVITModel() # You can pass the parameters if required to have more flexible model
# model_custom.to("cuda") ## can be gpu
criterion = nn.CrossEntropyLoss() ## If required define your own criterion
# criterion = nn.BCELoss() ## If required define your own criterion
optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model_custom.parameters()))

Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['vit.pooler.dense.bias', 'vit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [86]:
EPOCHS = 2
batch_size = 16
Y = torch.tensor(flat_dataset["label"])

for epoch in range(EPOCHS):
    for i in range(0 , len(X_img) , batch_size):

        data_img = X_img[i:i + batch_size]
        data_img = torch.from_numpy(data_img)
        data_text = X_text[i:i + batch_size]
        # targets = np.array([[0],[1]])
        targets = Y[i : i + batch_size]
        targets=targets.to(torch.float)
        # data = data.cuda()
        # targets = targets.cuda()
        
        optimizer.zero_grad()   
        # encoding = tokenizer.batch_encode_plus(data, return_tensors='pt', padding=True, truncation=True,max_length=50, add_special_tokens = True)
        pixel_values = vit_processor(data_img , return_tensors="pt")["pixel_values"]
        bertop = bert_processor(data_text.tolist(), padding=True, truncation=True, return_tensors="pt")
        input_ids , attention_mask = bertop["input_ids"] , bertop["attention_mask"]
        outputs = model_custom(pixel_values , input_ids , attention_mask)

        loss = criterion(outputs, targets)
        print(loss)
        loss.backward()
        optimizer.step()

tensor(7.2198, grad_fn=<DivBackward1>)
tensor(6.4206, grad_fn=<DivBackward1>)
tensor(6.9298, grad_fn=<DivBackward1>)
tensor(8.4136, grad_fn=<DivBackward1>)
tensor(11.1243, grad_fn=<DivBackward1>)
tensor(6.2315, grad_fn=<DivBackward1>)
tensor(6.1549, grad_fn=<DivBackward1>)
tensor(6.6992, grad_fn=<DivBackward1>)
tensor(4.6314, grad_fn=<DivBackward1>)
tensor(6.4963, grad_fn=<DivBackward1>)
tensor(7.4857, grad_fn=<DivBackward1>)
tensor(7.3287, grad_fn=<DivBackward1>)
tensor(5.6075, grad_fn=<DivBackward1>)
tensor(5.0505, grad_fn=<DivBackward1>)


In [45]:
ip1 = processor(X[0] , return_tensors="pt")
logits = model_custom(**ip1)
pred_probab = nn.Sigmoid()(logits)
pred_probab

tensor([[0.6383]], grad_fn=<SigmoidBackward0>)

### Testing PyTorch Training

In [31]:
class ClassificationHead(nn.Module):
    def __init__(self, input_size, num_classes):
        super(ClassificationHead, self).__init__()
        # self.fc1 = nn.Linear(input_size, 128)
        # self.fc2 = nn.Linear(128, 64)
        # self.fc3 = nn.Linear(64, num_classes)
        # self.relu = nn.ReLU()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(input_size, 256),
            nn.ReLU(),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes),
        )
        
    
    def forward(self, x):
        # x = self.fc1(x)
        # x = self.relu(x)
        # x = self.fc2(x)
        # x = self.relu(x)
        # x = self.fc3(x)
        # # x = self.sigmoid(x)
        # return x
        
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define your number of classes
num_classes = NUM_CLASSES  # Number of classes in your classification task

classification_head = ClassificationHead(vit_model.config.hidden_size, num_classes)


loss_fn = nn.CrossEntropyLoss() ## If required define your own criterion
# optimizer = torch.optim.Adam(vit_model.parameters(), lr=0.0001)
optimizer = torch.optim.Adam(classification_head.parameters(), lr=0.0001)

# Create the classification head

EPOCHS = 2
batch_size = 2
Y = torch.tensor(flat_dataset["label"])

print("Training:")
# Training Loop
running_loss = 0.0
batch = 0
for epoch in range(EPOCHS):
    classification_head.train()
    for i in range(0 , len(X_img) , batch_size):
        batch += 1
        inputs = X_img[i:i+batch_size]
        labels = Y[i:i+batch_size]
        labels=labels.to(torch.float)
        
        ip = vit_processor(inputs , return_tensors="pt")

        # Forward pass
        outputs = vit_model(**ip).last_hidden_state[:, 0, :]
        logits = classification_head(outputs)
        
        # Calculate loss
        loss = loss_fn(logits, labels)
        running_loss += loss.item()
        
        # print(f"EPOCH : {epoch} | BATCH : {i} | LOSS : {running_loss}")
        # print(loss)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        
        loss, current = loss.item(), batch * batch_size + len(inputs)
        print(f"loss: {loss:>7f} ")

# print("Inference:")
# # Inference
# with torch.no_grad():
#     inputs = X
#     ip = processor(inputs , return_tensors="pt")
#     outputs = model(**ip).last_hidden_state[:, 0, :]
#     logits = classification_head(outputs)
#     ans = nn.functional.sigmoid(logits)
#     print(ans)

Training:
loss: 4.144501 
loss: 3.394263 
loss: 2.881017 
loss: 5.493414 
loss: 2.792670 
loss: 4.001240 
loss: 3.337624 
loss: 2.815248 
loss: 5.356466 
loss: 2.645538 


In [26]:
print("Inference:")
# Inference
with torch.no_grad():
    inputs = X_img
    ip = vit_processor(inputs , return_tensors="pt")
    outputs = vit_model(**ip).last_hidden_state[:, 0, :]
    logits = classification_head(outputs)
    logits = nn.Softmax(dim=1)(logits)
    print(logits)

Inference:
tensor([[0.0591, 0.0525, 0.0507, 0.0482, 0.0511, 0.0474, 0.0651, 0.0681, 0.0618,
         0.0517, 0.0544, 0.0456, 0.0711, 0.0489, 0.0517, 0.0619, 0.0478, 0.0628],
        [0.0591, 0.0525, 0.0507, 0.0482, 0.0511, 0.0474, 0.0651, 0.0681, 0.0618,
         0.0517, 0.0544, 0.0456, 0.0711, 0.0489, 0.0517, 0.0619, 0.0478, 0.0628],
        [0.0591, 0.0525, 0.0507, 0.0482, 0.0511, 0.0474, 0.0651, 0.0681, 0.0618,
         0.0517, 0.0544, 0.0456, 0.0711, 0.0489, 0.0517, 0.0619, 0.0478, 0.0628],
        [0.0591, 0.0525, 0.0507, 0.0482, 0.0511, 0.0474, 0.0651, 0.0681, 0.0618,
         0.0517, 0.0544, 0.0456, 0.0711, 0.0489, 0.0517, 0.0619, 0.0478, 0.0628],
        [0.0494, 0.0695, 0.0510, 0.0574, 0.0531, 0.0528, 0.0647, 0.0473, 0.0524,
         0.0598, 0.0624, 0.0550, 0.0618, 0.0528, 0.0587, 0.0551, 0.0496, 0.0472],
        [0.0494, 0.0695, 0.0510, 0.0574, 0.0531, 0.0528, 0.0647, 0.0473, 0.0524,
         0.0598, 0.0624, 0.0550, 0.0618, 0.0528, 0.0587, 0.0551, 0.0496, 0.0472],
        [0.

In [None]:
Training:
tensor(4.2284, grad_fn=<DivBackward1>)
tensor(3.3829, grad_fn=<DivBackward1>)
tensor(2.9335, grad_fn=<DivBackward1>)
tensor(5.4293, grad_fn=<DivBackward1>)
tensor(2.7480, grad_fn=<DivBackward1>)
tensor(4.1977, grad_fn=<DivBackward1>)
tensor(3.3491, grad_fn=<DivBackward1>)
tensor(2.8715, grad_fn=<DivBackward1>)
tensor(5.2210, grad_fn=<DivBackward1>)
tensor(2.5404, grad_fn=<DivBackward1>)

### TensorFlow

In [61]:
tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-cased")
tokenized_data = tokenizer(dataset["question"], return_tensors="np", padding=True)
# Tokenizer returns a BatchEncoding, but we convert that to a dict for Keras
tokenized_data = dict(tokenized_data)

labels = Y.detach().cpu().numpy()

In [74]:
tokenized_data

{'input_ids': array([[ 101, 1327, 1110, ...,    0,    0,    0],
        [ 101, 1327, 1700, ...,    0,    0,    0],
        [ 101, 1327, 2942, ...,    0,    0,    0],
        ...,
        [ 101, 1731, 1242, ...,    0,    0,    0],
        [ 101, 2181, 1175, ...,    0,    0,    0],
        [ 101, 1731, 1242, ...,    0,    0,    0]]),
 'token_type_ids': array([[0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        ...,
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0]]),
 'attention_mask': array([[1, 1, 1, ..., 0, 0, 0],
        [1, 1, 1, ..., 0, 0, 0],
        [1, 1, 1, ..., 0, 0, 0],
        ...,
        [1, 1, 1, ..., 0, 0, 0],
        [1, 1, 1, ..., 0, 0, 0],
        [1, 1, 1, ..., 0, 0, 0]])}

In [72]:
x = tf.keras.Input(shape=(None,))
tokenizer(x, return_tensors="np", padding=True)

ValueError: text input must be of type `str` (single example), `List[str]` (batch or single pretokenized example) or `List[List[str]]` (batch of pretokenized examples).

In [82]:
from transformers import TFAutoModelForSequenceClassification , TFAutoModel , TFAutoModelForImageClassification
from tensorflow.keras.optimizers import Adam

# Load and compile our model
# tf_model = tf.keras.models.Sequential(
#     [
#         TFAutoModelForSequenceClassification.from_pretrained("google-bert/bert-base-cased"),
#         tf.keras.layers.Dense(1 , input_shape=(768,) , activation='sigmoid')
#     ]
# )

# x = tf.keras.Input(shape=)
# tf_model = TFAutoModelForSequenceClassification.from_pretrained("google-bert/bert-base-cased")

proc = AutoImageProcessor.from_pretrained("google/vit-base-patch16-224")
tf_model = TFAutoModel.from_pretrained("google/vit-base-patch16-224")(**tf_model)
# tf_model = tf.keras.layers.Dense(1 , input_shape=(768,1) , activation='sigmoid')(tf_model.logits)

# # Lower learning rates are often better for fine-tuning transformers
# tf_model.compile(optimizer=Adam(3e-5))  # No loss argument!

tf_model.fit(X, labels)
# tf_model


All PyTorch model weights were used when initializing TFViTForImageClassification.

All the weights of TFViTForImageClassification were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFViTForImageClassification for predictions without further training.


ValueError: Exception encountered when calling layer 'patch_embeddings' (type TFViTPatchEmbeddings).

Make sure that the channel dimension of the pixel values match with the one set in the configuration.

Call arguments received by layer 'patch_embeddings' (type TFViTPatchEmbeddings):
  • pixel_values=tf.Tensor(shape=(100, 480, 640, 3), dtype=uint8)
  • interpolate_pos_encoding=None
  • training=False