In [None]:
import os
import warnings

import numpy as np
import pandas as pd
import torch
from PIL import Image
from datasets import list_metrics, load_metric
from sklearn.utils import class_weight
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, VisualBertForPreTraining, AutoTokenizer
from transformers import Swinv2Model,AutoImageProcessor

## Load in Dataset

In [None]:
# Folder that holds the Hateful-Memes metadata CSV.
path = "/home/jramos/Documents/OMSCS/CS-7643 Deep Learning/Project/Hateful-Memes/data/"
df = pd.read_csv(path + 'data.csv')

# Positional train / validation / test split.
# Each split is materialised with .copy() so that columns added to a split
# later are written to an independent frame instead of to a slice of `df`
# (which makes pandas emit SettingWithCopyWarning).
df_train = df[:8500].copy()
df_val = df[8500:9540].copy()
df_test = df[9540:].copy()

In [None]:
# Number of whitespace-separated words in each training caption.
df_train['text_len'] = df_train['text'].str.split().str.len()

# Zero-padded, 5-character string version of the numeric id, added to every split.
for split_df in (df_train, df_val, df_test):
    split_df['idx'] = split_df['id'].astype(str).str.zfill(5)

## Compute Class Weight

In [None]:

# Training labels as a plain Python list.
y_train = df_train["label"].tolist()

# One weight per distinct label, using scikit-learn's 'balanced' mode.
class_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train),
    y=y_train,
)
print(class_weights)

## Load Metrics

In [None]:

# Ask the `datasets` library which metric names it knows about and show them,
# so the names passed to `load_metric` can be checked by eye.
metrics_list = list_metrics()
print(metrics_list)

In [None]:
# Metric objects consumed by `compute_metrics` during evaluation.
acc_metric, f1_metric, precision_metric, recall_metric = (
    load_metric(metric_id)
    for metric_id in ('accuracy', 'f1', 'precision', 'recall')
)

## Load Models

In [None]:

# Checkpoint name of the Swin-V2 backbone used to embed the meme images.
model = "microsoft/swinv2-base-patch4-window12-192-22k"

# Text side: BERT word-piece tokenizer.
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

# Image side: pre-processor plus the backbone itself, placed on the GPU.
feature_extractor = AutoImageProcessor.from_pretrained(model)
feature_model = Swinv2Model.from_pretrained(model)
feature_model = feature_model.to('cuda')

## Create the Dataset class

In [None]:

class DatasetBuilder(Dataset):
    """Map-style dataset pairing tokenized meme text with image embeddings.

    Every item is a dict of tensors: the tokenizer outputs for the caption,
    the image embedding produced by the module-level ``feature_extractor`` /
    ``feature_model`` pair, all-ones visual mask / token-type tensors that
    match the embedding, and the integer label.

    Args:
        df: DataFrame with ``text``, ``label``, ``img`` and ``idx`` columns.
        tokenizer: Tokenizer callable; it is invoked with
            ``padding='max_length'``, ``truncation=True`` and
            ``return_tensors='pt'``.
        sequence_length: Length every caption is padded / truncated to.
        print_text: When True, ``__getitem__`` prints the shape and dtype of
            each returned tensor (debugging aid).
        image_dir: Directory against which the ``img`` column is resolved.
        fallback_visual_shape: ``(num_visual_tokens, embed_dim)`` of the
            all-zero embedding returned when an image cannot be processed.
    """

    def __init__(self, df, tokenizer, sequence_length,
                 print_text=False, image_dir='hateful_memes',
                 fallback_visual_shape=(36, 1024)):
        self.sequence_length = sequence_length
        self.tokenizer = tokenizer
        self.print_text = print_text
        self.image_dir = image_dir
        self.fallback_visual_shape = tuple(fallback_visual_shape)

        # One lightweight record per row; the heavy work happens lazily in
        # __getitem__.
        self.dataset = [
            {"text": text, "label": label, 'idx': idx, 'image': image}
            for text, label, idx, image in zip(
                df["text"].tolist(),
                df["label"].tolist(),
                df["idx"].tolist(),
                df["img"].tolist(),
            )
        ]

    def __len__(self):
        """Return the number of examples."""
        return len(self.dataset)

    def _load_visual_embeds(self, image_name):
        """Return a float32 ``(num_visual_tokens, embed_dim)`` tensor for one image.

        Any failure (missing / corrupt file, processing error) is reported with
        a warning and replaced by an all-zero tensor of
        ``self.fallback_visual_shape`` so that one bad image does not abort
        training.
        """
        try:
            image_path = os.path.join(self.image_dir, image_name)
            # The context manager closes the file handle; convert('RGB')
            # yields three channels for grayscale, palette and RGBA images.
            with Image.open(image_path) as handle:
                pixels = np.array(handle.convert('RGB'))
            inputs = feature_extractor(images=pixels, return_tensors="pt")
            # The backbone is only a frozen feature extractor here, so no
            # autograd graph should be kept per sample.
            with torch.no_grad():
                outputs = feature_model(**inputs.to(feature_model.device))
            # Drop the leading batch dimension of size 1.
            return outputs.last_hidden_state.squeeze(0).cpu()
        except Exception as err:
            warnings.warn(
                f"Could not embed image {image_name!r} ({err!r}); "
                "using zero visual embeddings instead.",
                RuntimeWarning,
            )
            # Same type and dtype as the success path so batches collate.
            return torch.zeros(self.fallback_visual_shape, dtype=torch.float32)

    def tokenize_data(self, example):
        """Turn one record into the dict of tensors consumed by the model.

        Args:
            example: Dict with ``text``, ``label``, ``idx`` and ``image`` keys.

        Returns:
            Dict with ``input_ids``, ``attention_mask``, ``token_type_ids``,
            ``visual_embeds``, ``visual_token_type_ids``,
            ``visual_attention_mask`` and ``label`` tensors, none of which has
            a batch dimension.
        """
        encoded_dict = self.tokenizer(
            example['text'],
            padding='max_length',
            max_length=self.sequence_length,
            truncation=True,
            return_tensors='pt',
        )
        targets = torch.tensor(example['label']).to(torch.int64)

        visual_embeds = self._load_visual_embeds(example['image'])

        # Every visual token is attended to and tagged with token type 1.
        visual_shape = visual_embeds.shape[:-1]
        visual_attention_mask = torch.ones(visual_shape, dtype=torch.int64)
        visual_token_type_ids = torch.ones(visual_shape, dtype=torch.int64)

        # squeeze(0) removes only the tokenizer's batch dimension, so a
        # sequence_length of 1 is not collapsed to a scalar.
        return {
            "input_ids": encoded_dict['input_ids'].squeeze(0),
            "attention_mask": encoded_dict['attention_mask'].squeeze(0),
            "token_type_ids": encoded_dict['token_type_ids'].squeeze(0),
            "visual_embeds": visual_embeds,
            "visual_token_type_ids": visual_token_type_ids,
            "visual_attention_mask": visual_attention_mask,
            "label": targets.squeeze(),
        }

    def __getitem__(self, index):
        """Return the tensor dict for ``index``; optionally print its layout."""
        inputs = self.tokenize_data(self.dataset[index])

        if self.print_text:
            for key, value in inputs.items():
                print(key, value.shape, value.dtype)

        return inputs

In [None]:
# Validation split wrapped with debug printing switched on, for manual inspection.
dataset = DatasetBuilder(df_val, tokenizer, sequence_length=50, print_text=True)

## Tuning using PyTorch Lightning

In [None]:
import pytorch_lightning as pl
from torch.utils.data import Dataset, DataLoader
from pytorch_lightning.loggers import WandbLogger
from datasets import load_metric
import torch.nn.functional as F
from sklearn.metrics import accuracy_score
from transformers import (
    AdamW,
    VisualBertModel,
    get_linear_schedule_with_warmup
)
import logging
import argparse
import time
from torch.nn import CrossEntropyLoss
from sklearn.metrics import roc_auc_score

from transformers import BertTokenizer, VisualBertModel, TrainingArguments, Trainer, VisualBertConfig


In [None]:
# from pytorch_lightning.loggers.wandb import WandbLogger
import os
from pathlib import Path
from string import punctuation
import torch.nn as nn

## VisualBERT Model

In [None]:
from torch.autograd import grad

class VisualBERT(torch.nn.Module):
    """VisualBERT-based binary classifier with an extra adversarial loss term.

    The visual embeddings are passed through a 1024 -> 1024 linear layer, fed
    to a pre-trained ``VisualBertModel`` together with the text inputs, and the
    pooled output is classified into ``num_labels`` logits. While gradients
    are being tracked, a second forward pass on perturbed embeddings adds an
    adversarial cross-entropy term to the loss.
    """

    def __init__(self):
        """Build the backbone, the projection / classification heads and the loss.

        Steps:
           1. Load the pre-trained VisualBERT model.
           2. Create two nn.Linear layers: a 1024 -> 1024 projection for the
              visual embeddings and a 768 -> num_labels classifier.
           3. Create a class-weighted cross-entropy loss whose weights are
              ``1 - n_c / sum(n)`` for the hard-coded per-class counts below.
        """
        super(VisualBERT, self).__init__()
        configuration = VisualBertConfig.from_pretrained(
            'uclanlp/visualbert-nlvr2-coco-pre',
            hidden_dropout_prob=0.1,
            attention_probs_dropout_prob=0.1,
        )
        self.visualbert = VisualBertModel.from_pretrained(
            'uclanlp/visualbert-nlvr2-coco-pre', config=configuration)
        self.embed_cls = nn.Linear(1024, 1024)
        self.num_labels = 2
        self.dropout = nn.Dropout(0.3)
        self.cls = nn.Linear(768, self.num_labels)
        # Copy of the notebook-level `class_weights`; no method of this class
        # reads it -- the loss below uses `normedWeights` instead.
        self.weight = torch.FloatTensor([class_weights])

        # NOTE(review): these counts sum to 8027, not to the size of the
        # training split used elsewhere in the notebook -- confirm intended.
        nSamples = [5178, 2849]
        normedWeights = [1 - (x / sum(nSamples)) for x in nSamples]
        self.loss_fct = CrossEntropyLoss(weight=torch.FloatTensor(normedWeights))

    def forward(self, input_ids, attention_mask, token_type_ids, visual_embeds, visual_attention_mask,
                visual_token_type_ids, labels=None):
        """Classify a batch and, when labels are given, compute the loss.

        Args:
            input_ids, attention_mask, token_type_ids: Text inputs forwarded to
                VisualBERT.
            visual_embeds: Visual embeddings whose last dimension is 1024.
            visual_attention_mask, visual_token_type_ids: Visual-side masks
                forwarded to VisualBERT.
            labels: Optional class indices. When omitted, no loss is computed.

        Returns:
            ``(loss, logits)`` where ``logits`` has shape ``(-1, num_labels)``.
            ``loss`` is ``None`` when ``labels`` is ``None``; otherwise it is
            the weighted cross-entropy, plus the adversarial loss whenever the
            pooled output requires grad.
        """
        visual_embeds_cls = self.embed_cls(visual_embeds)

        outputs = self.visualbert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            visual_embeds=visual_embeds_cls,
            visual_attention_mask=visual_attention_mask,
            visual_token_type_ids=visual_token_type_ids,
        )

        pooled_output = self.dropout(outputs[1])
        logits = self.cls(pooled_output)
        reshaped_logits = logits.view(-1, self.num_labels)

        if labels is None:
            # Inference call: nothing to compare against.
            return None, reshaped_logits

        loss = self.loss_fct(reshaped_logits, labels.view(-1))

        # Under torch.no_grad() the pooled output does not require grad, so the
        # adversarial branch (which needs autograd) is skipped.
        if pooled_output.requires_grad:
            perturbed_sentence = self.adv_attack(outputs.last_hidden_state, loss)
            perturbed_visual = self.adv_attack(visual_embeds_cls, loss)
            adv_loss = self.adversarial_loss(
                (perturbed_sentence, perturbed_visual),
                input_ids,
                visual_embeds_cls,
                attention_mask,
                (attention_mask, visual_attention_mask),
                labels,
                token_type_ids,
                visual_token_type_ids,
            )
            loss = loss + adv_loss

        return loss, reshaped_logits

    def adv_attack(self, emb, loss, epsilon=0.05):
        """Shift ``emb`` by ``epsilon`` along the normalised gradient of ``loss``.

        The gradient is normalised per sample (all dimensions after the first
        are reduced). A zero gradient leaves the sample unperturbed instead of
        producing NaNs from a division by zero.

        Args:
            emb: Tensor that is part of the graph which produced ``loss``.
            loss: Scalar loss to differentiate.
            epsilon: L2 size of the per-sample perturbation.

        Returns:
            Tensor with the same shape as ``emb``.
        """
        # retain_graph=True: the same graph is differentiated again afterwards.
        loss_grad = grad(loss, emb, retain_graph=True)[0]
        per_sample_norm = torch.sqrt(
            torch.sum(loss_grad.flatten(start_dim=1) ** 2, dim=1))
        # Guard against 0 / 0 when a sample's gradient vanishes.
        per_sample_norm = per_sample_norm.clamp_min(
            torch.finfo(loss_grad.dtype).tiny)
        broadcast_shape = (-1,) + (1,) * (loss_grad.dim() - 1)
        return emb + epsilon * (loss_grad / per_sample_norm.view(broadcast_shape))

    def adversarial_loss(self, perturbed, input_ids, visual_embeds_cls, attention_mask, mask, labels, token_type_ids,visual_token_type_ids):
        """Cross-entropy of the classifier on the perturbed embeddings.

        Args:
            perturbed: ``(perturbed_text_states, perturbed_visual_embeds)``.
            input_ids: Original token ids; only their sequence length is used,
                to cut the text part out of ``perturbed[0]``.
            visual_embeds_cls: Unperturbed projected visual embeddings (unused).
            attention_mask: Text attention mask.
            mask: ``(text_attention_mask, visual_attention_mask)``; only the
                visual entry is read.
            labels: Class indices; entries equal to -1 are ignored.
            token_type_ids, visual_token_type_ids: Token-type tensors.

        Returns:
            Scalar (unweighted) cross-entropy loss.
        """
        # The text positions come first in perturbed[0]; take as many as there
        # are input tokens rather than assuming a fixed sequence length.
        text_len = input_ids.shape[1]
        out = self.visualbert(
            inputs_embeds=perturbed[0][:, :text_len, :],
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            visual_embeds=perturbed[1],
            visual_attention_mask=mask[1],
            visual_token_type_ids=visual_token_type_ids,
        )

        encoded_layers_last = self.dropout(out['pooler_output'])
        logits = self.cls(encoded_layers_last)
        loss_fct = torch.nn.CrossEntropyLoss(ignore_index=-1)
        adv_loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
        return adv_loss



In [None]:
# Instantiate the classifier, then move its parameters and buffers to the GPU.
model = VisualBERT()
model = model.to('cuda')

## HuggingFace Trainer 

In [None]:
from transformers import TrainingArguments, Trainer
# Per-device batch size and the caption length handed to the datasets.
batch_size = 24
seq_len = 50

# Key in the dict returned by `compute_metrics` used to pick the best checkpoint.
metric_name = "auroc"

args = TrainingArguments(
    output_dir="OUTPUT_DIRECTORY",
    seed=4,
    # --- optimisation ---
    learning_rate=1e-5,
    weight_decay=0.05,
    num_train_epochs=100,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    gradient_accumulation_steps=2,
    fp16=False,
    # --- evaluation and checkpointing ---
    evaluation_strategy="steps",
    eval_steps=50,
    save_steps=500,
    load_best_model_at_end=True,
    metric_for_best_model=metric_name,
)

def compute_metrics(eval_pred):
    """Compute accuracy, AUROC, F1, precision and recall for one evaluation pass.

    Args:
        eval_pred: ``(logits, labels)`` pair. ``logits`` is expected to be an
            array of shape ``(n_samples, 2)`` (or a tuple whose first element
            is that array); ``labels`` holds the true class indices.

    Returns:
        Dict with ``accuracy``, ``auroc``, ``f1``, ``precision`` and ``recall``.
    """
    logits, labels = eval_pred
    if isinstance(logits, tuple):
        logits = logits[0]
    logits = np.asarray(logits)

    predictions = np.argmax(logits, axis=-1)
    acc = acc_metric.compute(predictions=predictions, references=labels)
    f1 = f1_metric.compute(predictions=predictions, references=labels)
    precision = precision_metric.compute(predictions=predictions, references=labels)
    recall = recall_metric.compute(predictions=predictions, references=labels)

    # AUROC needs a continuous score per sample; hard argmax predictions
    # collapse the ROC curve to a single operating point. The logit margin
    # ranks samples exactly like the softmax probability of class 1.
    positive_scores = logits[:, 1] - logits[:, 0]
    auc_score = roc_auc_score(labels, positive_scores)

    return {
        "accuracy": acc['accuracy'],
        "auroc": auc_score,
        'f1': f1['f1'],
        'precision': precision['precision'],
        'recall': recall['recall'],
    }


# Wrap the training and validation splits; tensors are built lazily per item.
train_dataset = DatasetBuilder(df_train, tokenizer=tokenizer, sequence_length=seq_len)
eval_dataset = DatasetBuilder(df_val, tokenizer=tokenizer, sequence_length=seq_len)

trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

# Checkpoint to continue from; swap the call below for the commented one to resume.
resume_from = 'CHECKPOINT_LOCATION'
#trainer.train(resume_from)
trainer.train()
trainer.evaluate()

In [None]:
# Persist the model currently held by the trainer.
trainer.save_model(output_dir='MODEL_SAVE_LOCATION')