In [3]:
import timm
from transformers import DistilBertModel, DistilBertConfig, DistilBertTokenizer
import torch
from torch import nn
import torch.nn.functional as F
from torchvision import models,transforms
# import torch_geometric

In [4]:
if torch.cuda.is_available():
    device = 'cuda' 
    n_gpus = torch.cuda.device_count()
elif torch.backends.mps.is_available():
    device = 'mps'
else:
    device = 'cpu'

In [5]:
from Dataset.HatefulMemeDataset import HatefulMemeDataset

dataset = HatefulMemeDataset('./data','train') 


In [6]:
class ImageEncoder(nn.Module):
    """
    Encode images to a fixed size vector
    """

    def __init__(
        self, model_name='resnet50', pretrained=True, trainable=True
    ):
        super().__init__()
        self.model = timm.create_model(
            model_name, pretrained, num_classes=0, global_pool="avg"
        )
        for p in self.model.parameters():
            p.requires_grad = trainable

    def forward(self, x):
        return self.model(x)

In [7]:
class TextEncoder(nn.Module):
    def __init__(self, model_name='distilbert-base-uncased', pretrained=True, trainable=True):
        super().__init__()
        if pretrained:
            self.model = DistilBertModel.from_pretrained(model_name)
        else:
            self.model = DistilBertModel(config=DistilBertConfig())
            
        for p in self.model.parameters():
            p.requires_grad = trainable

        # we are using the CLS token hidden representation as the sentence's embedding
        self.target_token_idx = 0

    def forward(self, input_ids, attention_mask):
        output = self.model(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = output.last_hidden_state
        return last_hidden_state[:, self.target_token_idx, :]

In [8]:
class ProjectionHead(nn.Module):
    def __init__(
        self,
        embedding_dim,
        projection_dim=256,
        dropout=0.1
    ):
        super().__init__()
        self.projection = nn.Linear(embedding_dim, projection_dim)
        self.gelu = nn.GELU()
        self.fc = nn.Linear(projection_dim, projection_dim)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(projection_dim)
    
    def forward(self, x):
        projected = self.projection(x)
        x = self.gelu(projected)
        x = self.fc(x)
        x = self.dropout(x)
        x = x + projected
        x = self.layer_norm(x)
        return x

In [9]:
device = 'cpu'

In [10]:
image,text,label = dataset[0]
imgTensor = transforms.ToTensor()(image).unsqueeze(0).to(device)

./data/img/42953.png


In [106]:
image1,text1,label1 = dataset[1]


./data/img/23058.png


In [11]:
imgModel = ImageEncoder()
image_features = imgModel(imgTensor)

In [12]:
image_projection = ProjectionHead(2048)

In [113]:
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
encoded_query = tokenizer([text])

In [65]:
txtEncoder = TextEncoder()

Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertModel: ['vocab_transform.bias', 'vocab_projector.bias', 'vocab_layer_norm.weight', 'vocab_transform.weight', 'vocab_layer_norm.bias', 'vocab_projector.weight']
- This IS expected if you are initializing DistilBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [16]:
text_projection = ProjectionHead(768)

In [114]:
encoded_query

{'input_ids': [[101, 2049, 2037, 2839, 2025, 2037, 3609, 2008, 5609, 102]], 'attention_mask': [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]}

In [110]:
encoded_query

{'input_ids': [[101, 2049, 2037, 2839, 2025, 2037, 3609, 2008, 5609, 102], [101, 2123, 1005, 1056, 2022, 4452, 2000, 2293, 2153, 3071, 2003, 2025, 2066, 2115, 4654, 102]], 'attention_mask': [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]}

In [111]:
batch = {
        key: torch.tensor(values).to(device)
        for key, values in encoded_query.items()
    }

ValueError: expected sequence of length 10 at dim 1 (got 16)

In [19]:
batch

{'input_ids': tensor([[ 101, 2049, 2037, 2839, 2025, 2037, 3609, 2008, 5609,  102]]),
 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}

In [20]:
text_features = txtEncoder(
        input_ids=batch["input_ids"], attention_mask=batch["attention_mask"]
    )

In [21]:
text_features.shape

torch.Size([1, 768])

In [22]:
text_embeddings = text_projection(text_features)
text_embeddings.shape


torch.Size([1, 256])

In [23]:
image_embeddings = image_projection(image_features)
image_embeddings.shape

torch.Size([1, 256])

In [24]:
# image_embeddings @ image_embeddings.T
# text_embeddings @ text_embeddings.T
# text_embeddings @ image_embeddings.T

In [25]:
imgTensor.shape

torch.Size([1, 3, 400, 265])

In [82]:
from transformers import (
    AutoConfig,
    AutoModel,
    AutoTokenizer,
    MMBTConfig,
    MMBTModel,
    MMBTForClassification,
    get_linear_schedule_with_warmup,
)
import clip

In [83]:
clip_model, preprocess = clip.load("ViT-B/32", device=device)
# clip_model, preprocess = clip.load("RN50x4", device=device, jit=False)

In [84]:
for p in clip_model.parameters():
    p.requires_grad = False


In [85]:
num_image_embeds = 4
image_features_size = 640

In [86]:
class ClipEncoderMulti(nn.Module):
    def __init__(self, num_embeds, num_features=image_features_size):
        super().__init__()        
        self.model = clip_model
        self.num_embeds = num_embeds
        self.num_features = num_features

    def forward(self, x):
        # 4x3x288x288 -> 1x4x640
        out = self.model.encode_image(x.view(-1,3,288,288))
        out = out.view(-1, self.num_embeds, self.num_features).float()
        return out  # Bx4x640

In [87]:
model_name = 'Hate-speech-CNERG/bert-base-uncased-hatexplain'
transformer_config = AutoConfig.from_pretrained(model_name) 
transformer = AutoModel.from_pretrained(model_name, config=transformer_config)
img_encoder = ClipEncoderMulti(num_image_embeds)


tokenizer = AutoTokenizer.from_pretrained(model_name, do_lower_case=True)

Some weights of the model checkpoint at Hate-speech-CNERG/bert-base-uncased-hatexplain were not used when initializing BertModel: ['classifier.bias', 'classifier.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [98]:
imgTensor.shape

torch.Size([1, 3, 400, 265])

In [99]:
config = MMBTConfig(transformer_config, num_labels=1, modal_hidden_size=400)
model = MMBTForClassification(config, transformer, img_encoder)

In [None]:
model.to(device)

In [101]:
encoded_query = tokenizer([text])
batch = {
        key: torch.tensor(values).to(device)
        for key, values in encoded_query.items()
    }

In [102]:
batch

{'input_ids': tensor([[ 101, 2049, 2037, 2839, 2025, 2037, 3609, 2008, 5609,  102]]),
 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]),
 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}

In [103]:
# sentence = torch.LongTensor(tokenizer.encode(text, add_special_tokens=True))
# sentence

In [104]:
imgTensor.shape

torch.Size([1, 3, 400, 265])

In [None]:
inputs = {
    "input_ids": batch['input_ids'],
    "input_modal": imgTensor,
    "attention_mask": batch['attention_mask'],
    "return_dict": False
}
outputs = model(**inputs)


In [57]:
from transformers import CLIPProcessor, CLIPModel

model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")

processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")



inputs = processor(text=[text], images=image, return_tensors="pt", padding=True)

outputs = model(**inputs)

Downloading: 100%|██████████| 4.19k/4.19k [00:00<00:00, 3.21MB/s]
Downloading: 100%|██████████| 605M/605M [00:13<00:00, 45.2MB/s] 
Downloading: 100%|██████████| 316/316 [00:00<00:00, 298kB/s]
Downloading: 100%|██████████| 568/568 [00:00<00:00, 543kB/s]
Downloading: 100%|██████████| 862k/862k [00:00<00:00, 10.3MB/s]
Downloading: 100%|██████████| 525k/525k [00:00<00:00, 7.37MB/s]
Downloading: 100%|██████████| 2.22M/2.22M [00:00<00:00, 14.0MB/s]
Downloading: 100%|██████████| 389/389 [00:00<00:00, 91.8kB/s]


In [62]:
(outputs.image_embeds).shape

torch.Size([1, 512])

In [61]:
(outputs.text_embeds).shape

torch.Size([1, 512])

In [None]:
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
encoded_query = tokenizer([text])