In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Kaggle notebook
This is the link to our original Kaggle notebook, where you can find every training run we performed together with its logs and outputs: https://www.kaggle.com/code/thanhduycao/ediss-ds-lab3.

To view the results, click "Show versions", right next to the "Save Version" section, and you will find them there.

## Dataset Import
First, you need to import the dataset and its info file from Kaggle Datasets.
The dataset and the info file are available at the following links:
- https://www.kaggle.com/datasets/duycaothanh/harmeme/data
- https://www.kaggle.com/datasets/duycaothanh/harmeme-info-revised/data

Upload this notebook to Kaggle and search for the datasets above in the Input section; Kaggle will then download them for you automatically.

In [None]:
# Clone MemeCLIP into the exact directory the later %%writefile cells target.
# The clone is skipped when the checkout already exists, so re-running this cell does not fail.
![ -d /kaggle/working/MemeCLIP ] || git clone https://github.com/SiddhantBikram/MemeCLIP.git /kaggle/working/MemeCLIP

In [None]:
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install pytorch_lightning openai-clip yacs

In [None]:
%%writefile /kaggle/working/MemeCLIP/code/MemeCLIP.py
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torchmetrics
from clip import clip
from tqdm import tqdm
import os
from functools import partial
import torch.nn.functional as F
from transformers import AutoTokenizer
torch.set_default_dtype(torch.float32)
from models import LinearClassifier, CosineClassifier, LinearProjection, CLIP_Text, Adapter

class MemeCLIP(pl.LightningModule):
    """LightningModule that classifies memes from pre-extracted CLIP features.

    Batches are expected to already contain ``image_features``,
    ``text_features`` and ``labels`` (see ``common_step``). The CLIP backbone
    loaded here is explicitly frozen; it is only used to initialise the
    classifier head from text prompts built out of ``cfg.class_names``.
    """

    def __init__(self, cfg):
        """Build the metrics, the frozen CLIP backbone and the task heads.

        Args:
            cfg: Configuration node. Fields read here: ``num_classes``,
                ``clip_variant``, ``device``, ``map_dim``, ``unmapped_dim``,
                ``num_mapping_layers``, ``num_pre_output_layers`` and
                ``drop_probs``. ``class_names`` is read by
                ``init_head_text_feat``.
        """
        super().__init__()
        self.cfg = cfg

        self.acc = torchmetrics.Accuracy(task='multiclass', num_classes = cfg.num_classes)
        self.auroc = torchmetrics.AUROC(task='multiclass', num_classes = cfg.num_classes)
        self.f1 = torchmetrics.F1Score(task='multiclass', num_classes = cfg.num_classes, average='macro')

        # Load on cfg.device instead of a hard-coded "cuda": init_head_text_feat
        # moves its prompts to cfg.device, so both must agree.
        self.clip_model, _ = clip.load(self.cfg.clip_variant, device=self.cfg.device, jit=False)
        self.clip_model.float()

        map_dim = self.cfg.map_dim

        self.classifier = CosineClassifier(feat_dim = map_dim, num_classes=cfg.num_classes, dtype=self.clip_model.dtype)
        self.init_head_text_feat()
        self.text_encoder =  CLIP_Text(self.clip_model)
        self.img_adapter = Adapter(map_dim, 4).to(self.clip_model.dtype)
        self.text_adapter = Adapter(map_dim, 4).to(self.clip_model.dtype)
        # init_head_text_feat() above still needs visual.proj, so it may only
        # be dropped after that call.
        self.clip_model.visual.proj = None

        # Freeze the whole CLIP backbone ...
        for _, p in self.clip_model.named_parameters():
            p.requires_grad_(False)

        # ... and make sure the classifier head is trainable.
        for _, param in self.classifier.named_parameters():
            param.requires_grad_(True)

        self.image_map = LinearProjection(self.cfg.unmapped_dim, map_dim,
                                          self.cfg.num_mapping_layers, self.cfg.drop_probs)
        self.text_map = LinearProjection(self.cfg.unmapped_dim, map_dim,
                                         self.cfg.num_mapping_layers, self.cfg.drop_probs)

        self.soft = nn.Softmax(dim=1)

        # Pre-output MLP: an initial dropout followed by
        # cfg.num_pre_output_layers blocks of Linear -> ReLU -> Dropout.
        pre_output_layers = [nn.Dropout(p=cfg.drop_probs[1])]
        for _ in range(self.cfg.num_pre_output_layers):
            pre_output_layers.extend(
                [nn.Linear(map_dim, map_dim), nn.ReLU(), nn.Dropout(p=cfg.drop_probs[2])])

        self.pre_output = nn.Sequential(*pre_output_layers)
        self.cross_entropy_loss = torch.nn.CrossEntropyLoss(reduction='mean')

    def forward(self, batch):
        """Intentionally empty: the step methods call ``common_step`` directly."""
        pass

    def init_head_text_feat(self):
        """Initialise the classifier weights from CLIP text embeddings.

        One prompt ``"a photo of a {class}."`` is built per entry of
        ``cfg.class_names`` (underscores replaced by spaces), encoded with the
        CLIP text encoder, normalised, multiplied by the transpose of the
        visual projection matrix, normalised again and handed to
        ``self.classifier.apply_weight``.
        """
        print("Initialize head with text features")
        template = "a photo of a {}."
        prompts = [template.format(c.replace("_", " ")) for c in self.cfg.class_names]
        tokens = clip.tokenize(prompts, context_length=77, truncate=True).to(self.cfg.device)
        text_features = self.clip_model.encode_text(tokens)
        text_features = F.normalize(text_features, dim=-1)
        text_features = text_features @ self.clip_model.visual.proj.t()
        text_features = F.normalize(text_features, dim=-1)
        self.classifier.apply_weight(text_features)

    def common_step(self, batch):
        """Run the model on one batch and compute the loss and metrics.

        Args:
            batch: Mapping with ``image_features``, ``text_features`` and
                ``labels``.

        Returns:
            dict with the keys ``loss``, ``accuracy``, ``auroc`` and ``f1``,
            each computed on this batch.
        """
        image_projection = self.image_map(batch['image_features'])
        txt_projection = self.text_map(batch['text_features'])

        # Blend each adapter output with its input, weighted by cfg.ratio.
        ratio = self.cfg.ratio
        image_features = ratio * self.img_adapter(image_projection) + (1 - ratio) * image_projection
        text_features = ratio * self.text_adapter(txt_projection) + (1 - ratio) * txt_projection

        # L2-normalise both modalities, then fuse them element-wise.
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)
        features = torch.mul(image_features, text_features)

        logits = self.classifier(self.pre_output(features)).squeeze(dim=1)
        # The AUROC metric is configured as multiclass, so it is given softmax
        # class probabilities. An element-wise sigmoid does not produce a
        # distribution over the classes.
        class_probs = self.soft(logits)
        preds = logits.argmax(dim=1)

        labels = batch['labels']
        return {
            'loss': self.cross_entropy_loss(logits, labels),
            'accuracy': self.acc(preds, labels),
            'auroc': self.auroc(class_probs, labels),
            'f1': self.f1(preds, labels),
        }

    def training_step(self, batch, batch_idx):
        """Log the training loss/metrics for one batch and return the loss."""
        output = self.common_step(batch)

        total_loss = output['loss']

        self.log('train/total_loss', total_loss)
        self.log('train/loss', output['loss'])
        self.log('train/accuracy', output['accuracy'])
        self.log('train/auroc', output['auroc'], on_step=False, on_epoch=True, prog_bar=True)

        return total_loss

    def validation_step(self, batch, batch_idx):
        """Log the validation loss/metrics for one batch and return the loss."""
        output = self.common_step(batch)

        total_loss = output['loss']

        self.log('val/total_loss', total_loss)
        self.log('val/loss', output['loss'])
        # NOTE(review): the logged values are per-batch metric results; with
        # on_epoch=True Lightning aggregates them over the epoch.
        for metric_name in ('accuracy', 'auroc', 'f1'):
            self.log(f'val/{metric_name}', output[metric_name],
                     on_step=False, on_epoch=True, prog_bar=True)

        return total_loss

    def test_step(self, batch, batch_idx):
        """Log the test metrics for one batch and return the full output dict."""
        output = self.common_step(batch)
        self.log('test/accuracy', output['accuracy'])
        self.log('test/auroc', output['auroc'])
        self.log('test/f1', output['f1'])

        return output

    def _reset_metrics(self):
        """Clear the accumulated state of the three shared metric objects."""
        self.acc.reset()
        self.auroc.reset()
        self.f1.reset()

    def on_train_epoch_end(self):
        """Reset the metrics at the end of every training epoch."""
        self._reset_metrics()

    def on_validation_epoch_end(self):
        """Reset the metrics at the end of every validation epoch."""
        self._reset_metrics()

    def on_test_epoch_end(self):
        """Reset the metrics at the end of every test epoch."""
        self._reset_metrics()

    def configure_optimizers(self):
        """Return an AdamW optimizer over every parameter that requires grad.

        The learning rate and weight decay come from ``cfg.lr`` and
        ``cfg.weight_decay``.
        """
        trainable = [p for _, p in self.named_parameters() if p.requires_grad]
        return torch.optim.AdamW([{"params": trainable}],
                                 lr=self.cfg.lr, weight_decay=self.cfg.weight_decay)

def create_model(cfg):
    """Factory: return a freshly constructed ``MemeCLIP`` for ``cfg``."""
    return MemeCLIP(cfg)


In [None]:
%%writefile /kaggle/working/MemeCLIP/code/datasets.py
import os
import pandas as pd
import torch
import clip

from PIL import Image
from torch.utils.data import Dataset
from configs import cfg
from transformers import AutoProcessor, CLIPVisionModel


os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
torch.set_default_dtype(torch.float32)

from configs import cfg

class Custom_Dataset(Dataset):
    """Meme dataset backed by the CSV at ``cfg.info_file``.

    Only the rows whose ``split`` column equals ``split`` are kept. When
    ``label`` is ``'target'`` the rows are further restricted to those with
    ``harm >= 1``. Missing values in float columns are replaced by ``-1`` and
    those columns are cast to the nullable ``Int64`` dtype.
    """

    def __init__(self, cfg, root_folder, dataset, label, split='train', image_size=224):
        """Load and filter the info file.

        Args:
            cfg: Configuration; ``info_file`` is read here and ``img_folder``
                in ``__getitem__``.
            root_folder: Stored on the instance; not used by this class.
            dataset: Stored on the instance; not used by this class.
            label: Name of the CSV column returned as ``'label'``.
            split: Value of the ``split`` column to keep.
            image_size: Side length (pixels) images are resized to.
        """
        super(Custom_Dataset, self).__init__()
        self.cfg = cfg
        self.root_folder = root_folder
        self.dataset = dataset
        self.split = split
        self.label = label

        self.image_size = image_size

        self.info_file = cfg.info_file
        self.df = pd.read_csv(self.info_file)
        self.df = self.df[self.df['split'] == self.split].reset_index(drop=True)

        if self.label == 'target':
            self.df = self.df[self.df['harm'] >= 1].reset_index(drop=True)

        float_cols = self.df.select_dtypes(float).columns
        self.df[float_cols] = self.df[float_cols].fillna(-1).astype('Int64')

    def __len__(self):
        """Number of rows left after filtering."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return the sample at position ``idx``.

        Returns:
            dict with ``image`` (RGB PIL image resized to
            ``image_size`` x ``image_size``), ``text``, ``label``,
            ``idx_meme`` (the file name) and ``origin_text`` (same string as
            ``text``).
        """
        row = self.df.iloc[idx]

        raw_text = row['text']
        # pandas reads an empty text cell as NaN (recent versions presumably
        # do the same for the literal string "None" - verify against the
        # installed pandas). Both cases are mapped to the placeholder caption,
        # so a float never reaches the tokenizer.
        if pd.isna(raw_text) or raw_text == 'None':
            txt = 'null'
        else:
            txt = raw_text

        image_fn = row['name']
        # The context manager closes the underlying file as soon as the pixel
        # data has been converted, instead of leaving it to garbage collection.
        with Image.open(f"{self.cfg.img_folder}/{image_fn}") as img:
            image = img.convert('RGB').resize((self.image_size, self.image_size))

        item = {
            'image': image,
            'text': txt,
            'label': row[self.label],
            'idx_meme': row['name'],
            'origin_text': txt
        }

        return item

class Custom_Collator(object):
    """Collate function that converts dataset items into CLIP feature batches.

    Each item must provide ``image``, ``text``, ``label`` and ``idx_meme``.
    The returned batch holds the labels, the meme ids and the image/text
    features produced by ``compute_CLIP_features_without_proj``, moved back to
    the CPU.
    """

    def __init__(self, cfg):
        """Load the CLIP model and its preprocessing transform.

        Args:
            cfg: Configuration; ``clip_variant`` and ``device`` are read.
        """
        self.cfg = cfg
        # A single clip.load call returns both the model and the matching
        # preprocess transform, so the checkpoint is only loaded once. It is
        # placed on cfg.device, the device __call__ moves its inputs to.
        self.clip_model, self.clip_preprocess = clip.load(
            self.cfg.clip_variant, device=self.cfg.device, jit=False)
        self.clip_model.float().eval()

    def __call__(self, batch):
        """Collate a list of dataset items into one feature batch.

        Args:
            batch: List of item dicts as described on the class.

        Returns:
            dict with ``labels`` (LongTensor), ``idx_memes`` (list),
            ``image_features`` and ``text_features`` (CPU tensors with one row
            per item).
        """
        labels = torch.LongTensor([item['label'] for item in batch])
        idx_memes = [item['idx_meme'] for item in batch]

        # Preprocess and tokenize the whole batch so CLIP runs one forward
        # pass per batch instead of one per sample.
        pixel_values = torch.stack([self.clip_preprocess(item['image']) for item in batch])
        tokens = clip.tokenize([item['text'] for item in batch], context_length=77, truncate=True)

        # The features are only consumed as fixed inputs, so no autograd graph
        # is needed while computing them.
        with torch.no_grad():
            image_features, text_features = self.compute_CLIP_features_without_proj(
                self.clip_model,
                pixel_values.to(self.cfg.device),
                tokens.to(self.cfg.device))

        return {
            'labels': labels,
            'idx_memes': idx_memes,
            'image_features': image_features.cpu(),
            'text_features': text_features.cpu(),
        }

    def compute_CLIP_features_without_proj(self, clip_model, img_input, text_input):
        """Compute image and text features with the given CLIP model.

        The text branch applies the token embedding, positional embedding,
        transformer and final layer norm, but no text projection afterwards.

        Args:
            clip_model: Object exposing ``visual``, ``token_embedding``,
                ``positional_embedding``, ``transformer``, ``ln_final`` and
                ``dtype``.
            img_input: Image tensor passed to ``clip_model.visual``.
            text_input: Integer token ids, one row per text.

        Returns:
            Tuple ``(image_features, text_features)``. For each row the text
            feature is taken at the position of that row's largest token id.
        """
        image_features = clip_model.visual(img_input.type(clip_model.dtype))

        x = clip_model.token_embedding(text_input).type(clip_model.dtype)
        x = x + clip_model.positional_embedding.type(clip_model.dtype)
        # The transformer is fed with the first two axes swapped; swap them
        # back afterwards.
        x = clip_model.transformer(x.permute(1, 0, 2)).permute(1, 0, 2)
        x = clip_model.ln_final(x).type(clip_model.dtype)
        text_features = x[torch.arange(x.shape[0]), text_input.argmax(dim=-1)]

        return image_features, text_features


def load_dataset(cfg, split):
    """Build the ``Custom_Dataset`` for ``split`` from the settings in ``cfg``.

    ``cfg.root_dir``, ``cfg.dataset_name``, ``cfg.label`` and
    ``cfg.image_size`` are forwarded to the dataset constructor.
    """
    return Custom_Dataset(
        cfg=cfg,
        root_folder=cfg.root_dir,
        dataset=cfg.dataset_name,
        label=cfg.label,
        split=split,
        image_size=cfg.image_size,
    )


## Harm Classification Seed 42

In [None]:
%%writefile /kaggle/working/MemeCLIP/code/configs.py
import os
from yacs.config import CfgNode 

cfg = CfgNode()

# --- Paths -------------------------------------------------------------------
cfg.root_dir = '/kaggle/working/'
cfg.img_folder = '/kaggle/input/harmeme/harmeme/images'
cfg.info_file = '/kaggle/input/harmeme-info-revised/merged_split_labels_numeric.csv'
cfg.checkpoint_path = os.path.join(cfg.root_dir, 'checkpoints_harm_42')
cfg.checkpoint_file = os.path.join(cfg.checkpoint_path,'model.ckpt')

# --- Run identity: harm classification, seed 42 ------------------------------
cfg.clip_variant = "ViT-L/14"  # passed to clip.load
cfg.dataset_name = 'Harm'
cfg.name = 'MemeCLIP'
cfg.label = 'harm'  # CSV column used as the label
cfg.seed = 42
cfg.test_only = False
cfg.device = 'cuda'
cfg.gpus = [0]

# --- Class names for each supported label column -----------------------------
# A label that is not listed here leaves cfg.class_names unset.
_CLASS_NAMES_BY_LABEL = {
    'hate': ['Benign Meme', 'Harmful Meme'],
    'humour': ['No Humour', 'Humour'],
    'target': ['Society', 'Individual', 'Community', 'Organization'],
    'stance': ['Neutral', 'Support', 'Oppose'],
    'harm': ['Not Harm', 'Somewhat Harmful', 'Very Harmful'],
}
if cfg.label in _CLASS_NAMES_BY_LABEL:
    cfg.class_names = _CLASS_NAMES_BY_LABEL[cfg.label]

# --- Model and optimisation hyper-parameters ---------------------------------
cfg.batch_size = 16
cfg.image_size = 224  # side length images are resized to by the dataset
cfg.reproduce = False
cfg.num_mapping_layers = 1
cfg.unmapped_dim = 768  # input width of the image/text projections
cfg.map_dim = 1024  # output width of the projections; classifier feature size
cfg.num_pre_output_layers = 1
cfg.drop_probs = [0.1, 0.4, 0.2]
cfg.lr = 1e-4 # 1e-5, 3e-4, 1e-3
cfg.max_epochs = 10
cfg.ratio = 0.2  # weight of the adapter output when blended with its input
cfg.weight_decay = 1e-4
cfg.num_classes = len(cfg.class_names)
cfg.scale = 30
cfg.print_model = True


In [None]:
# Launch the cloned repository's entry script for the harm / seed 42 setup.
# NOTE(review): main.py is not shown in this notebook; presumably it reads the
# configs.py written by the previous cell - confirm.
!python /kaggle/working/MemeCLIP/code/main.py

## Harm Classification Seed 100

In [None]:
%%writefile /kaggle/working/MemeCLIP/code/configs.py
import os
from yacs.config import CfgNode 

cfg = CfgNode()

# --- Paths -------------------------------------------------------------------
cfg.root_dir = '/kaggle/working/'
cfg.img_folder = '/kaggle/input/harmeme/harmeme/images'
cfg.info_file = '/kaggle/input/harmeme-info-revised/merged_split_labels_numeric.csv'
cfg.checkpoint_path = os.path.join(cfg.root_dir, 'checkpoints_harm_100')
cfg.checkpoint_file = os.path.join(cfg.checkpoint_path,'model.ckpt')

# --- Run identity: harm classification, seed 100 -----------------------------
cfg.clip_variant = "ViT-L/14"  # passed to clip.load
cfg.dataset_name = 'Harm'
cfg.name = 'MemeCLIP'
cfg.label = 'harm'  # CSV column used as the label
cfg.seed = 100
cfg.test_only = False
cfg.device = 'cuda'
cfg.gpus = [0]

# --- Class names for each supported label column -----------------------------
# A label that is not listed here leaves cfg.class_names unset.
_CLASS_NAMES_BY_LABEL = {
    'hate': ['Benign Meme', 'Harmful Meme'],
    'humour': ['No Humour', 'Humour'],
    'target': ['Society', 'Individual', 'Community', 'Organization'],
    'stance': ['Neutral', 'Support', 'Oppose'],
    'harm': ['Not Harm', 'Somewhat Harmful', 'Very Harmful'],
}
if cfg.label in _CLASS_NAMES_BY_LABEL:
    cfg.class_names = _CLASS_NAMES_BY_LABEL[cfg.label]

# --- Model and optimisation hyper-parameters ---------------------------------
cfg.batch_size = 16
cfg.image_size = 224  # side length images are resized to by the dataset
cfg.reproduce = False
cfg.num_mapping_layers = 1
cfg.unmapped_dim = 768  # input width of the image/text projections
cfg.map_dim = 1024  # output width of the projections; classifier feature size
cfg.num_pre_output_layers = 1
cfg.drop_probs = [0.1, 0.4, 0.2]
cfg.lr = 1e-4 # 1e-5, 3e-4, 1e-3
cfg.max_epochs = 10
cfg.ratio = 0.2  # weight of the adapter output when blended with its input
cfg.weight_decay = 1e-4
cfg.num_classes = len(cfg.class_names)
cfg.scale = 30
cfg.print_model = True

In [None]:
# Launch the cloned repository's entry script for the harm / seed 100 setup.
# NOTE(review): main.py is not shown in this notebook; presumably it reads the
# configs.py written by the previous cell - confirm.
!python /kaggle/working/MemeCLIP/code/main.py

## Harm Classification Seed 510

In [None]:
%%writefile /kaggle/working/MemeCLIP/code/configs.py
import os
from yacs.config import CfgNode 

cfg = CfgNode()

# --- Paths -------------------------------------------------------------------
cfg.root_dir = '/kaggle/working/'
cfg.img_folder = '/kaggle/input/harmeme/harmeme/images'
cfg.info_file = '/kaggle/input/harmeme-info-revised/merged_split_labels_numeric.csv'
cfg.checkpoint_path = os.path.join(cfg.root_dir, 'checkpoints_harm_510')
cfg.checkpoint_file = os.path.join(cfg.checkpoint_path,'model.ckpt')

# --- Run identity: harm classification, seed 510 -----------------------------
cfg.clip_variant = "ViT-L/14"  # passed to clip.load
cfg.dataset_name = 'Harm'
cfg.name = 'MemeCLIP'
cfg.label = 'harm'  # CSV column used as the label
cfg.seed = 510
cfg.test_only = False
cfg.device = 'cuda'
cfg.gpus = [0]

# --- Class names for each supported label column -----------------------------
# A label that is not listed here leaves cfg.class_names unset.
_CLASS_NAMES_BY_LABEL = {
    'hate': ['Benign Meme', 'Harmful Meme'],
    'humour': ['No Humour', 'Humour'],
    'target': ['Society', 'Individual', 'Community', 'Organization'],
    'stance': ['Neutral', 'Support', 'Oppose'],
    'harm': ['Not Harm', 'Somewhat Harmful', 'Very Harmful'],
}
if cfg.label in _CLASS_NAMES_BY_LABEL:
    cfg.class_names = _CLASS_NAMES_BY_LABEL[cfg.label]

# --- Model and optimisation hyper-parameters ---------------------------------
cfg.batch_size = 16
cfg.image_size = 224  # side length images are resized to by the dataset
cfg.reproduce = False
cfg.num_mapping_layers = 1
cfg.unmapped_dim = 768  # input width of the image/text projections
cfg.map_dim = 1024  # output width of the projections; classifier feature size
cfg.num_pre_output_layers = 1
cfg.drop_probs = [0.1, 0.4, 0.2]
cfg.lr = 1e-4 # 1e-5, 3e-4, 1e-3
cfg.max_epochs = 10
cfg.ratio = 0.2  # weight of the adapter output when blended with its input
cfg.weight_decay = 1e-4
cfg.num_classes = len(cfg.class_names)
cfg.scale = 30
cfg.print_model = True

In [None]:
# Launch the cloned repository's entry script for the harm / seed 510 setup.
# NOTE(review): main.py is not shown in this notebook; presumably it reads the
# configs.py written by the previous cell - confirm.
!python /kaggle/working/MemeCLIP/code/main.py

## Target Classification Seed 42

In [None]:
%%writefile /kaggle/working/MemeCLIP/code/configs.py
import os
from yacs.config import CfgNode 

cfg = CfgNode()

# --- Paths -------------------------------------------------------------------
cfg.root_dir = '/kaggle/working/'
cfg.img_folder = '/kaggle/input/harmeme/harmeme/images'
cfg.info_file = '/kaggle/input/harmeme-info-revised/merged_split_labels_numeric.csv'
cfg.checkpoint_path = os.path.join(cfg.root_dir, 'checkpoints_target_42')
cfg.checkpoint_file = os.path.join(cfg.checkpoint_path,'model.ckpt')

# --- Run identity: target classification, seed 42 ----------------------------
cfg.clip_variant = "ViT-L/14"  # passed to clip.load
cfg.dataset_name = 'Harm'
cfg.name = 'MemeCLIP'
cfg.label = 'target'  # CSV column used as the label
cfg.seed = 42
cfg.test_only = False
cfg.device = 'cuda'
cfg.gpus = [0]

# --- Class names for each supported label column -----------------------------
# A label that is not listed here leaves cfg.class_names unset.
_CLASS_NAMES_BY_LABEL = {
    'hate': ['Benign Meme', 'Harmful Meme'],
    'humour': ['No Humour', 'Humour'],
    'target': ['Society', 'Individual', 'Community', 'Organization'],
    'stance': ['Neutral', 'Support', 'Oppose'],
    'harm': ['Not Harm', 'Somewhat Harmful', 'Very Harmful'],
}
if cfg.label in _CLASS_NAMES_BY_LABEL:
    cfg.class_names = _CLASS_NAMES_BY_LABEL[cfg.label]

# --- Model and optimisation hyper-parameters ---------------------------------
cfg.batch_size = 16
cfg.image_size = 224  # side length images are resized to by the dataset
cfg.reproduce = False
cfg.num_mapping_layers = 1
cfg.unmapped_dim = 768  # input width of the image/text projections
cfg.map_dim = 1024  # output width of the projections; classifier feature size
cfg.num_pre_output_layers = 1
cfg.drop_probs = [0.1, 0.4, 0.2]
cfg.lr = 1e-4 # 1e-5, 3e-4, 1e-3
cfg.max_epochs = 10
cfg.ratio = 0.2  # weight of the adapter output when blended with its input
cfg.weight_decay = 1e-4
cfg.num_classes = len(cfg.class_names)
cfg.scale = 30
cfg.print_model = True

In [None]:
# Launch the cloned repository's entry script for the target / seed 42 setup.
# NOTE(review): main.py is not shown in this notebook; presumably it reads the
# configs.py written by the previous cell - confirm.
!python /kaggle/working/MemeCLIP/code/main.py

## Target Classification Seed 100

In [None]:
%%writefile /kaggle/working/MemeCLIP/code/configs.py
import os
from yacs.config import CfgNode 

cfg = CfgNode()

# --- Paths -------------------------------------------------------------------
cfg.root_dir = '/kaggle/working/'
cfg.img_folder = '/kaggle/input/harmeme/harmeme/images'
cfg.info_file = '/kaggle/input/harmeme-info-revised/merged_split_labels_numeric.csv'
cfg.checkpoint_path = os.path.join(cfg.root_dir, 'checkpoints_target_100')
cfg.checkpoint_file = os.path.join(cfg.checkpoint_path,'model.ckpt')

# --- Run identity: target classification, seed 100 ---------------------------
cfg.clip_variant = "ViT-L/14"  # passed to clip.load
cfg.dataset_name = 'Harm'
cfg.name = 'MemeCLIP'
cfg.label = 'target'  # CSV column used as the label
cfg.seed = 100
cfg.test_only = False
cfg.device = 'cuda'
cfg.gpus = [0]

# --- Class names for each supported label column -----------------------------
# A label that is not listed here leaves cfg.class_names unset.
_CLASS_NAMES_BY_LABEL = {
    'hate': ['Benign Meme', 'Harmful Meme'],
    'humour': ['No Humour', 'Humour'],
    'target': ['Society', 'Individual', 'Community', 'Organization'],
    'stance': ['Neutral', 'Support', 'Oppose'],
    'harm': ['Not Harm', 'Somewhat Harmful', 'Very Harmful'],
}
if cfg.label in _CLASS_NAMES_BY_LABEL:
    cfg.class_names = _CLASS_NAMES_BY_LABEL[cfg.label]

# --- Model and optimisation hyper-parameters ---------------------------------
cfg.batch_size = 16
cfg.image_size = 224  # side length images are resized to by the dataset
cfg.reproduce = False
cfg.num_mapping_layers = 1
cfg.unmapped_dim = 768  # input width of the image/text projections
cfg.map_dim = 1024  # output width of the projections; classifier feature size
cfg.num_pre_output_layers = 1
cfg.drop_probs = [0.1, 0.4, 0.2]
cfg.lr = 1e-4 # 1e-5, 3e-4, 1e-3
cfg.max_epochs = 10
cfg.ratio = 0.2  # weight of the adapter output when blended with its input
cfg.weight_decay = 1e-4
cfg.num_classes = len(cfg.class_names)
cfg.scale = 30
cfg.print_model = True

In [None]:
# Launch the cloned repository's entry script for the target / seed 100 setup.
# NOTE(review): main.py is not shown in this notebook; presumably it reads the
# configs.py written by the previous cell - confirm.
!python /kaggle/working/MemeCLIP/code/main.py

## Target Classification Seed 510

In [None]:
%%writefile /kaggle/working/MemeCLIP/code/configs.py
import os
from yacs.config import CfgNode 

cfg = CfgNode()

# --- Paths -------------------------------------------------------------------
cfg.root_dir = '/kaggle/working/'
cfg.img_folder = '/kaggle/input/harmeme/harmeme/images'
cfg.info_file = '/kaggle/input/harmeme-info-revised/merged_split_labels_numeric.csv'
cfg.checkpoint_path = os.path.join(cfg.root_dir, 'checkpoints_target_510')
cfg.checkpoint_file = os.path.join(cfg.checkpoint_path,'model.ckpt')

# --- Run identity: target classification, seed 510 ---------------------------
cfg.clip_variant = "ViT-L/14"  # passed to clip.load
cfg.dataset_name = 'Harm'
cfg.name = 'MemeCLIP'
cfg.label = 'target'  # CSV column used as the label
cfg.seed = 510
cfg.test_only = False
cfg.device = 'cuda'
cfg.gpus = [0]

# --- Class names for each supported label column -----------------------------
# A label that is not listed here leaves cfg.class_names unset.
_CLASS_NAMES_BY_LABEL = {
    'hate': ['Benign Meme', 'Harmful Meme'],
    'humour': ['No Humour', 'Humour'],
    'target': ['Society', 'Individual', 'Community', 'Organization'],
    'stance': ['Neutral', 'Support', 'Oppose'],
    'harm': ['Not Harm', 'Somewhat Harmful', 'Very Harmful'],
}
if cfg.label in _CLASS_NAMES_BY_LABEL:
    cfg.class_names = _CLASS_NAMES_BY_LABEL[cfg.label]

# --- Model and optimisation hyper-parameters ---------------------------------
cfg.batch_size = 16
cfg.image_size = 224  # side length images are resized to by the dataset
cfg.reproduce = False
cfg.num_mapping_layers = 1
cfg.unmapped_dim = 768  # input width of the image/text projections
cfg.map_dim = 1024  # output width of the projections; classifier feature size
cfg.num_pre_output_layers = 1
cfg.drop_probs = [0.1, 0.4, 0.2]
cfg.lr = 1e-4 # 1e-5, 3e-4, 1e-3
cfg.max_epochs = 10
cfg.ratio = 0.2  # weight of the adapter output when blended with its input
cfg.weight_decay = 1e-4
cfg.num_classes = len(cfg.class_names)
cfg.scale = 30
cfg.print_model = True

In [None]:
# Launch the cloned repository's entry script for the target / seed 510 setup.
# NOTE(review): main.py is not shown in this notebook; presumably it reads the
# configs.py written by the previous cell - confirm.
!python /kaggle/working/MemeCLIP/code/main.py