In [1]:
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision import models
from transformers import BertTokenizer, VisualBertModel
from PIL import Image
import os
import warnings
warnings.filterwarnings("ignore")

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Read the dev and train annotation files into DataFrames
dev_df = pd.read_json("../data/facebook/dev.json")
train_df = pd.read_json("../data/facebook/train.json")

# Preview the first rows of the training annotations
train_df.head()

Unnamed: 0,id,img,text,label
0,42953,train/non_hateful/42953.png,it their charact not their color that matter,0
1,23058,train/non_hateful/23058.png,dont be afraid to love again everyon is not li...,0
2,13894,train/non_hateful/13894.png,put bow on your pet,0
3,37408,train/non_hateful/37408.png,i love everyth and everybodi except for squirr...,0
4,82403,train/non_hateful/82403.png,everybodi love chocol chip cooki even hitler,0


In [7]:
# Notebook-wide configuration

# -- training schedule
EPOCHS = 10
BATCH_SIZE = 64

# -- data location and image geometry
ROOT_PATH = '../data/facebook'
IMAGE_SIZE = 224 * 224  # a single integer (50176), not a (height, width) pair

# -- model dimensions
NUM_CLASSES = 2
VISUAL_DIMENSION = 512
TEXTUAL_DIMENSION = 512

# -- compute device: CUDA when torch reports it as available, CPU otherwise
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [4]:
# Dataset that lazily loads (image, text[, label]) samples listed in a JSON annotation file
class DynamicDataset(Dataset):
    """Map-style dataset backed by a JSON annotation table.

    Each row of the table must provide an ``img`` column (image path relative
    to ``ROOT_PATH``) and a ``text`` column. A ``label`` column is optional;
    when it is absent, samples are returned without a label.

    Args:
        json_path: Path of the JSON file read with ``pd.read_json``.
        transform: Optional callable applied to the RGB ``PIL.Image`` of every
            sample. When ``None`` the PIL image is returned unchanged.
    """

    def __init__(self, json_path, transform = None):
        self.df = pd.read_json(json_path)
        self.transform = transform

    def __len__(self):
        """Return the number of annotated samples."""
        return len(self.df)

    def __getitem__(self, index):
        """Load the sample at positional ``index``.

        Returns:
            ``(image, text, label)`` when the table has a ``label`` column,
            otherwise ``(image, text)``.
        """
        # Positional lookup: a DataLoader sampler yields positions 0..len-1,
        # which only coincide with index *labels* when the frame happens to
        # carry a default 0..n-1 index. `.iloc` is correct in both cases.
        row = self.df.iloc[index]

        img_file = os.path.join(ROOT_PATH, row['img'])
        # `convert` returns a fully loaded copy, so the underlying file can be
        # closed immediately instead of waiting for garbage collection.
        with Image.open(img_file) as raw_image:
            image = raw_image.convert("RGB")
        if self.transform is not None:
            image = self.transform(image)

        text = row['text']
        if 'label' not in self.df.columns:
            return image, text

        # Plain Python int regardless of how pandas boxed the value
        label = int(row['label'])

        return image, text, label

In [5]:
# Define a transform function for image preprocessing.
# Resize comes first so every sample has the same spatial size: the default
# DataLoader collate function stacks the image tensors of a batch, which fails
# when the source images differ in size.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    # Per-channel mean / std applied to the [0, 1] tensor produced by ToTensor
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Create objects of each set of data
train_data = DynamicDataset(os.path.join(ROOT_PATH, 'train.json'), transform = transform)
dev_data = DynamicDataset(os.path.join(ROOT_PATH, 'dev.json'), transform = transform)

# Create a dataloader for each split.
# Only the training split is shuffled; the dev split is iterated in file order
# so that evaluation runs are deterministic and predictions line up with rows.
train_loader = DataLoader(train_data, batch_size = BATCH_SIZE, shuffle = True)
dev_loader = DataLoader(dev_data, batch_size = BATCH_SIZE, shuffle = False)

In [6]:
# Module-level tokenizer loaded from the 'bert-base-uncased' checkpoint;
# Embeddings.textual_features reads this global when encoding text batches.
tokenizer = BertTokenizer.from_pretrained(
    pretrained_model_name_or_path = 'bert-base-uncased',
)

You are using a model of type bert to instantiate a model of type visual_bert. This is not supported for all configurations of models and can yield errors.
Some weights of the model checkpoint at bert-base-uncased were not used when initializing VisualBertModel: ['bert.encoder.layer.1.attention.output.LayerNorm.weight', 'bert.encoder.layer.6.attention.self.key.weight', 'bert.encoder.layer.8.attention.output.LayerNorm.bias', 'bert.encoder.layer.7.intermediate.dense.bias', 'bert.encoder.layer.11.attention.output.LayerNorm.weight', 'bert.encoder.layer.3.attention.output.dense.weight', 'bert.encoder.layer.4.attention.self.key.weight', 'bert.encoder.layer.2.attention.self.value.weight', 'bert.encoder.layer.5.attention.self.key.bias', 'bert.encoder.layer.11.attention.output.dense.weight', 'bert.encoder.layer.9.attention.self.value.bias', 'bert.encoder.layer.6.attention.self.query.bias', 'bert.encoder.layer.3.output.LayerNorm.weight', 'bert.encoder.layer.3.output.dense.weight', 'bert.encoder.

In [8]:
class Embeddings(nn.Module):
    """Feature extractors for the two modalities: text and image.

    * ``textual_features`` encodes a batch of strings with VisualBERT (used
      here on text only, no visual embeddings are passed) and projects the
      first-token hidden state from 768 to 512 dimensions.
    * ``visual_features`` runs images through a ResNet50 with its last child
      module removed, then through two 3x3 convolutions that reduce the
      channel count from 2048 to 512, and flattens the result per sample.

    The text path relies on the module-level ``tokenizer`` and ``DEVICE``.
    """

    def __init__(self):
        super().__init__()

        # Visual Bert for extracting textual features
        self.visual_bert = VisualBertModel.from_pretrained('uclanlp/visualbert-vqa')

        # Dense Layer for reducing the dimension (768 -> 512)
        self.dense_layers = nn.Sequential(
            nn.Linear(768, 512),
            nn.ReLU(),
        )

        # ResNet50 for extracting visual features; `[:-1]` drops the last
        # child module (presumably the fully connected classifier head —
        # verify against the torchvision version in use).
        resnet50 = models.resnet50(weights = models.ResNet50_Weights.DEFAULT)
        self.resnet50 = nn.Sequential(*list(resnet50.children())[:-1])

        # Conv Layer for reducing the dimension (2048 -> 1024 -> 512 channels);
        # padding of 1 with a 3x3 kernel keeps the spatial size unchanged.
        self.convolution_layers = nn.Sequential(
            nn.Conv2d(2048, 1024, kernel_size = (3, 3), stride = (1, 1), padding = (1, 1)),
            nn.ReLU(),
            nn.Conv2d(1024, 512, kernel_size = (3, 3), stride = (1, 1), padding = (1, 1)),
            nn.ReLU(),
        )

    def textual_features(self, texts):
        """Encode a batch of strings.

        Args:
            texts: Sequence of strings (one per sample).

        Returns:
            Tensor with one row per input string and 512 columns.
        """
        # Generate tokens for input ids and attention mask
        inputs = tokenizer.batch_encode_plus(list(texts), padding=True, truncation=True, return_tensors='pt')
        input_ids = inputs['input_ids'].to(DEVICE)
        attention_mask = inputs['attention_mask'].to(DEVICE)

        # The encoder returns an output object, not a tensor: pick the hidden
        # state of the first token *before* feeding the dense projection.
        encoder_output = self.visual_bert(input_ids = input_ids, attention_mask = attention_mask)
        first_token_state = encoder_output.last_hidden_state[:, 0, :]

        return self.dense_layers(first_token_state)

    def visual_features(self, images):
        """Encode a batch of image tensors.

        Args:
            images: Batch of image tensors accepted by the ResNet50 backbone.

        Returns:
            2-D tensor with one flattened feature row per image (512 columns
            when the backbone output is spatially 1x1 — TODO confirm for the
            input size in use).
        """
        # Extract features from images
        feature_maps = self.convolution_layers(self.resnet50(images))

        # Flatten everything except the batch dimension
        return torch.flatten(feature_maps, start_dim = 1)

In [11]:
class Fusion(nn.Module):
    """Late-fusion classifier over concatenated visual and textual features.

    The visual and textual feature vectors produced by ``Embeddings`` are
    concatenated along the feature dimension (1024 values in total), passed
    through a hidden layer of 512 units with a ReLU, and mapped to 2 logits.
    """

    def __init__(self):
        super().__init__()

        # Initialize embeddings
        self.embeddings = Embeddings()

        # Define Fusion layers
        self.fc1 = nn.Linear(1024, 512)
        self.fc2 = nn.Linear(512, 2)

    def forward(self, images, texts):
        """Compute class logits for a batch.

        Args:
            images: Batch passed to ``Embeddings.visual_features``.
            texts: Batch passed to ``Embeddings.textual_features``.

        Returns:
            Tensor of raw (unnormalised) logits with 2 columns per sample.
        """
        # Get visual and textual features
        visual_features = self.embeddings.visual_features(images)
        textual_features = self.embeddings.textual_features(texts)

        # Concatenate visual and textual features
        features = torch.cat((visual_features, textual_features), dim = 1)

        # Apply fusion layers. The ReLU between them matters: two stacked
        # linear layers without an activation are equivalent to a single
        # linear map, so the hidden layer would add no modelling capacity.
        hidden = F.relu(self.fc1(features))
        output = self.fc2(hidden)

        return output

In [12]:
# Build the fusion network and move it to the configured device
model = Fusion()
model = model.to(DEVICE)

# Cross-entropy loss over the network's output logits
criterion = nn.CrossEntropyLoss()

# Adam over every parameter of the model.
# NOTE(review): lr = 0.01 looks high for fine-tuning pretrained backbones — confirm.
optimizer = optim.Adam(params = model.parameters(), lr = 0.01)

Some weights of the model checkpoint at uclanlp/visualbert-vqa were not used when initializing VisualBertModel: ['cls.bias', 'cls.weight']
- This IS expected if you are initializing VisualBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing VisualBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
