In [1]:


import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from dataclasses import dataclass
from torchtune.modules import RMSNorm
from tokenizers import Tokenizer
from pathlib import Path
from transformers import RobertaTokenizer, RobertaModel
from torchvision.transforms import Compose, Resize, CenterCrop, Normalize, ToTensor
from torchvision.transforms.v2 import RGB

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
from torch.utils.data import random_split
from PIL import Image
from transformers import ViTImageProcessor, ViTForImageClassification, ViTFeatureExtractor

In [4]:

@dataclass
class ModelArgs:
    #Hyperparameters
    img_size = (224, 224)
    block_size = 77
    batch_size = 32
    embeddings_dims = 768
    projection_dims = 768
    attn_dropout = 0.1
    no_of_heads = 12 #IMP needs to be thoroughly calculated
    dropout = 0.1
    epochs = 100
    lr = 4e-4
    no_of_decoder_layers = 12 #IMP needs to be thoroughly calculated
    weight_decay_optim = 0.2
    beta_1 = 0.9
    beta_2 = 0.98
    epsilon = 1e-6
    device = 'cuda'
    vocab_size = 2000
    head_lr = 1e-3
    image_encoder_lr = 1e-4
    text_encoder_lr = 1e-5

In [5]:
class Normalization(nn.Module):
    def __init__(
        self,
        embeddings_dims: int = ModelArgs.embeddings_dims
    ):  
        super().__init__()
        self.layernorm_layer = torch.nn.LayerNorm(normalized_shape=embeddings_dims)
        
        
    def forward(self, x):
        
        x = self.layernorm_layer(x)
        return x
        

In [6]:
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
model = RobertaModel.from_pretrained('roberta-base')

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [7]:
class TextModel(nn.Module):
    def __init__(self):
        super().__init__()
            
        
        
        self.layer_norm = Normalization()
        self.model = RobertaModel.from_pretrained('roberta-base')
        self.tokenizer = tokenizer
        self.multimodalTextLayerProjector = nn.Linear(in_features=ModelArgs.embeddings_dims, out_features=ModelArgs.projection_dims, device=ModelArgs.device)
        
        for p in self.model.parameters():
            p.requires_grad = True
        self.model.train()
    def forward(self, x):
        # print("Problemetic x shape: ", x['input_ids'].shape)
        # print("Problemetic x shape: ", x['attention_mask'].shape)
        x['input_ids'] = x['input_ids'].squeeze(1)
        x['attention_mask'] = x['attention_mask'].squeeze(1) 
        x = self.model(input_ids = x['input_ids'], attention_mask = x['attention_mask'])['last_hidden_state'][:, 0, :] 
        # print(x)
        x = self.layer_norm(x)
        return self.multimodalTextLayerProjector(x)

In [8]:
class VisionModel(nn.Module):
    def __init__(self):
        super().__init__()
            

        self.multimodalVisionLayerProjector = nn.Linear(in_features=151296, out_features=ModelArgs.projection_dims, device=ModelArgs.device)
        # self.processor = ViTImageProcessor.from_pretrained('google/vit-base-patch16-224')
        self.model = ViTForImageClassification.from_pretrained('google/vit-base-patch16-224', output_hidden_states=True)
        # Initialize the feature extractor
        self.feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224-in21k')

        self.main = nn.Sequential(
            nn.Flatten()
        )
        for p in self.model.parameters():
            p.requires_grad = True
        self.model.train()    
        
        
    def forward(self, x):
        x = self.feature_extractor(x['image'],return_tensors="pt")
        x = x.to(ModelArgs.device)
        
        
        
        with torch.no_grad():
            x = self.model(**x)
            x = x.hidden_states[-1]
            x = self.main(x)
            # print(x)
            # print(x.shape)
            return self.multimodalVisionLayerProjector(x)
        
        

In [15]:
class CLiP(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.vision = VisionModel()
        self.text = TextModel()
        # self.tokenizer = tokenizer
        self.multimodelTextLayerPorjector = nn.Linear(in_features=ModelArgs.embeddings_dims, out_features=ModelArgs.projection_dims, device=ModelArgs.device)
        self.multimodalVisionLayerProjector = nn.Linear(in_features=ModelArgs.embeddings_dims, out_features=ModelArgs.projection_dims, device=ModelArgs.device)
        # self.temperature = nn.Parameter(torch.ones(size=(ModelArgs.batch_size,), device=ModelArgs.device), requires_grad=True)
        self.temperature = nn.Parameter(torch.ones([], requires_grad=True), requires_grad=True)

    def forward(self, x):
        
        embeds_text = self.text(x)
        # print("Inside CLiP text: ", embeds_text.shape)
        proj_txt = torch.nn.functional.normalize(self.multimodelTextLayerPorjector(embeds_text))
        embeds_img = self.vision(x)
        # print("Inside ViT: ", embeds_img.shape)
        proj_img = torch.nn.functional.normalize(self.multimodalVisionLayerProjector(embeds_img))
        # print(proj_txt.shape)
        # print(proj_img.shape)
        logits = (proj_txt @ proj_img.T) * torch.exp(self.temperature)
        # print("Inside CLiP logits shape: ", logits.shape)
        return logits

In [16]:
clip = CLiP()

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [17]:
#Config
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2

train_transforms = A.Compose(
    [   
        A.Resize(height=224, width=224),
        A.CenterCrop(height=224, width=224),
        # A.Normalize(mean=[0.48145466, 0.4578275, 0.40821073], std=[0.26862954, 0.26130258, 0.27577711], max_pixel_value=224.0,),
        # A.ToFloat(max_value=224),
        ToTensorV2(),
    ]
)

test_tyransforms = A.Compose(
    [
        # A.Normalize(mean=[0.48145466, 0.4578275, 0.40821073], std=[0.26862954, 0.26130258, 0.27577711], max_pixel_value=224.0,),
        # A.ToFloat(max_value=224),
        ToTensorV2(),
    ]
)

In [18]:
import pandas as pd

df = pd.read_csv('data/flickr8000/captions.txt', sep=',')
df

Unnamed: 0,image,caption
0,1000268201_693b08cb0e.jpg,A child in a pink dress is climbing up a set o...
1,1000268201_693b08cb0e.jpg,A girl going into a wooden building .
2,1000268201_693b08cb0e.jpg,A little girl climbing into a wooden playhouse .
3,1000268201_693b08cb0e.jpg,A little girl climbing the stairs to her playh...
4,1000268201_693b08cb0e.jpg,A little girl in a pink dress going into a woo...
...,...,...
40450,997722733_0cb5439472.jpg,A man in a pink shirt climbs a rock face
40451,997722733_0cb5439472.jpg,A man is rock climbing high in the air .
40452,997722733_0cb5439472.jpg,A person in a red shirt climbing up a rock fac...
40453,997722733_0cb5439472.jpg,A rock climber in a red shirt .


In [19]:
df_sampled = df.sample(frac=0.01, random_state=42)
df_sampled

Unnamed: 0,image,caption
17775,2973269132_252bfd0160.jpg,A large wild cat is pursuing a horse across a ...
13506,270263570_3160f360d3.jpg,Two brown dogs fight on the leafy ground .
4325,2053006423_6adf69ca67.jpg,A man in shorts is standing on a rock looking ...
37870,512101751_05a6d93e19.jpg,a muzzled white dog is running on the grass .
21321,3156406419_38fbd52007.jpg,A person skiing downhill .
...,...,...
35640,391020801_aaaae1e42b.jpg,A man gesticulates .
12364,2629027962_9cc3b46527.jpg,With a barn in the background a child puts her...
17672,2966552760_e65b22cd26.jpg,A smiling child sits against a wall on a blank...
24614,3290105461_7590f23371.jpg,Cricket player with red cap hits the ball outd...


In [20]:
import os
import numpy as np
class CLiPDatatset(Dataset):
    def __init__(self, path):
        self.tokenizer = tokenizer
        self.path = path
        # self.dir = os.listdir(self.path)        
    def __len__(self):
        
        return df_sampled.shape[0]
        
    def __getitem__(self, idx):
        
        text, img = df_sampled.iloc[idx][1], df_sampled.iloc[idx][0]
        # print(text)
        # print(img)
        img_path = os.path.join(self.path, img) 
        # print(img_path)
        img = np.array(Image.open(img_path))

        input_transformed = train_transforms(image = img)['image']
        
        text_tokenized = self.tokenizer(text, return_tensors='pt', padding='max_length', truncation=True, max_length=ModelArgs.block_size)
        
        # print(text_tokenized)
        encoded_items = {
            
            key: torch.tensor(values)
            for key, values in text_tokenized.items()
            
        }
        encoded_items['image'] = input_transformed
        return encoded_items

In [21]:
dir = 'data/flickr8000/images'
dataset = CLiPDatatset(dir)

# Assuming 'dataset' is already created
# Split the dataset into training and validation sets
train_size = int(0.9 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])


#Creating dataloaders

trainloader = DataLoader(train_dataset, batch_size=ModelArgs.batch_size, shuffle=True)
valloader = DataLoader(val_dataset, batch_size=ModelArgs.batch_size, shuffle=False)


In [24]:
import itertools
params = [
        {"params": clip.vision.parameters(), "lr": ModelArgs.image_encoder_lr},
        {"params": clip.text.parameters(), "lr": ModelArgs.text_encoder_lr},
        {"params": itertools.chain(
            clip.multimodalVisionLayerProjector.parameters(), clip.multimodelTextLayerPorjector.parameters(), [clip.temperature]
        ), "lr": ModelArgs.head_lr, "weight_decay": ModelArgs.weight_decay_optim}
    ]

optimizer = torch.optim.Adam(lr=ModelArgs.lr, params=params, eps=ModelArgs.epsilon)
loss_fn = nn.CrossEntropyLoss()

# def cross_entropy(pred=None, targets=None, dim=None):
#     # print("Targets shape is: ",targets.shape)
#     # print("Predictions shape is :", pred.shape)
    
#     preds = nn.functional.log_softmax(pred, dim=-1)

#     l = (-targets * preds).sum(1).mean()
#     return l

In [25]:
from going_modular import engine

In [26]:
results = engine.train(model=clip,
                       writer=None,
                       train_dataloader=trainloader,
                       test_dataloader=valloader,
                       optimizer=optimizer,
                       loss_fn=loss_fn,
                       epochs=30,
                       device=ModelArgs.device)

  0%|          | 0/30 [00:00<?, ?it/s]

  text, img = df_sampled.iloc[idx][1], df_sampled.iloc[idx][0]
  key: torch.tensor(values)
  loss_t = torch.nn.functional.cross_entropy(y_pred.T, labels.T)


Epoch: 1 | train_loss: 3.3857 | test_loss: 2.8319 
Epoch: 2 | train_loss: 3.3842 | test_loss: 2.8363 
Epoch: 3 | train_loss: 3.3898 | test_loss: 2.8558 
Epoch: 4 | train_loss: 3.2075 | test_loss: 2.5720 
Epoch: 5 | train_loss: 2.7535 | test_loss: 2.3091 
Epoch: 6 | train_loss: 2.4067 | test_loss: 2.0960 
Epoch: 7 | train_loss: 2.2730 | test_loss: 2.2074 
Epoch: 8 | train_loss: 2.2631 | test_loss: 2.1497 
Epoch: 9 | train_loss: 2.2433 | test_loss: 2.1711 
Epoch: 10 | train_loss: 2.1443 | test_loss: 2.0108 
Epoch: 11 | train_loss: 2.1279 | test_loss: 2.1695 
Epoch: 12 | train_loss: 2.0682 | test_loss: 2.1878 
Epoch: 13 | train_loss: 2.0128 | test_loss: 2.0808 
Epoch: 14 | train_loss: 1.9314 | test_loss: 2.1472 
Epoch: 15 | train_loss: 1.9334 | test_loss: 2.0239 
Epoch: 16 | train_loss: 1.8983 | test_loss: 2.0606 
Epoch: 17 | train_loss: 1.7936 | test_loss: 2.0642 
Epoch: 18 | train_loss: 1.8178 | test_loss: 2.1490 
Epoch: 19 | train_loss: 1.8154 | test_loss: 2.1520 
Epoch: 20 | train_los