In [9]:
import pandas as pd
import numpy as np
from PIL import Image
from typing import Optional
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
import torch.nn as nn
import torch.nn.functional as F
from transformers import BertTokenizer, BertModel
import open_clip

In [10]:
class MisogynyDataset(Dataset):
    def __init__(self, data, label_map, transform=None):
        self.data = data.reset_index(drop=True)
        self.label_map = label_map

        if transform is None:
            self.transform = transforms.Compose([
                transforms.Resize((224, 224)),
                transforms.ToTensor()
            ])
        else:
            self.transform = transforms.Compose([
                transforms.Resize((224, 224)),
                transform,
                transforms.ToTensor()
            ])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        image = Image.open(row["image_path"]).convert("RGB")
        image = self.transform(image)
        label = self.label_map[row["image_label"]]
        caption = row["image_caption"]
        return image, caption, label

In [11]:
class MisogynyDataLoader:
    def __init__(self, csv_file="data_csv.csv", batch_size=16, test_size=0.2, random_state=42,
                 train_transform=None, test_transform=None, num_workers=0, pin_memory=False):
        data = pd.read_csv(csv_file)
        label_map = {"kitchen":0, "shopping":1, "working":2, "leadership":3}

        train_df, test_df = train_test_split(
            data,
            test_size=test_size,
            random_state=random_state,
            shuffle=True,
            stratify=data["image_label"]
        )

        self.train_dataset = MisogynyDataset(train_df, label_map, transform=train_transform)
        self.test_dataset = MisogynyDataset(test_df, label_map, transform=test_transform)

        self.train_loader = DataLoader(self.train_dataset, batch_size=batch_size, shuffle=True,
                                       num_workers=num_workers, pin_memory=pin_memory)
        self.test_loader = DataLoader(self.test_dataset, batch_size=batch_size, shuffle=False,
                                      num_workers=num_workers, pin_memory=pin_memory)

In [12]:
class BERTEmbedder(nn.Module):
    def __init__(self):
        super().__init__()
        self.tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
        self.model_bert = BertModel.from_pretrained("bert-base-uncased")
        self.model_bert.eval()

    def forward(self, input_text):
        inputs = self.tokenizer(input_text, return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            outputs = self.model_bert(**inputs)
        token_embeddings = outputs.last_hidden_state
        attention_mask = inputs["attention_mask"]
        mask = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        sentence_embeddings = (token_embeddings * mask).sum(dim=1) / mask.sum(dim=1)
        embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
        return embeddings

In [13]:
class OpenClipVitEmbedder(nn.Module):
    def __init__(self, device=None):
        super().__init__()
        self.model, _, self.preprocess = open_clip.create_model_and_transforms(
            model_name="ViT-B-32", pretrained="openai"
        )
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self.model.to(self.device)
        self.model.eval()
        for p in self.model.parameters():
            p.requires_grad = False

    def forward(self, image_tensor):
        image_tensor = image_tensor.to(self.device)
        with torch.no_grad():
            image_features = self.model.encode_image(image_tensor)
        image_features = F.normalize(image_features, p=2, dim=-1)
        return image_features

In [14]:
class PCALayer(nn.Module):
    def __init__(self, mean, components):
        super().__init__()
        mean = torch.tensor(mean, dtype=torch.float32)
        components = torch.tensor(components, dtype=torch.float32)
        self.register_buffer("mean", mean)
        self.register_buffer("weight", components)

    def forward(self, x):
        # x: (B, D)
        x = x - self.mean
        return torch.matmul(x, self.weight.T)  # (B, K)

In [15]:
def collect_embeddings(dataloader, text_model, image_model, device):
    text_embeddings = []
    image_embeddings = []

    for images, captions, _ in dataloader:
        captions = list(captions)
        with torch.no_grad():
            text_emb = text_model(captions).to("cpu")
            image_emb = image_model(images).to("cpu")
        text_embeddings.append(text_emb.numpy())
        image_embeddings.append(image_emb.numpy())

    text_embeddings = np.vstack(text_embeddings)
    image_embeddings = np.vstack(image_embeddings)
    return text_embeddings, image_embeddings

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

dataloaders = MisogynyDataLoader()
train_loader = dataloaders.train_loader

text_model = BERTEmbedder().to(device)
image_model = OpenClipVitEmbedder(device=device)

text_train_emb, image_train_emb = collect_embeddings(train_loader, text_model, image_model, device)

Loading weights: 100%|██████████| 199/199 [00:00<00:00, 2201.86it/s, Materializing param=pooler.dense.weight]                               
BertModel LOAD REPORT from: bert-base-uncased
Key                                        | Status     |  | 
-------------------------------------------+------------+--+-
cls.predictions.transform.dense.weight     | UNEXPECTED |  | 
cls.predictions.transform.LayerNorm.weight | UNEXPECTED |  | 
cls.predictions.bias                       | UNEXPECTED |  | 
cls.seq_relationship.weight                | UNEXPECTED |  | 
cls.predictions.transform.dense.bias       | UNEXPECTED |  | 
cls.predictions.transform.LayerNorm.bias   | UNEXPECTED |  | 
cls.seq_relationship.bias                  | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


In [None]:
pca_text = PCA(n_components=400, svd_solver="full")
pca_text.fit(text_train_emb)
pca_image = PCA(n_components=400, svd_solver="full")
pca_image.fit(image_train_emb)

np.save("bert_pca_mean.npy", pca_text.mean_)
np.save("bert_pca_components.npy", pca_text.components_)
np.save("clip_pca_mean.npy", pca_image.mean_)
np.save("clip_pca_components.npy", pca_image.components_)

In [20]:
bert_pca_layer = PCALayer(pca_text.mean_, pca_text.components_)
clip_pca_layer = PCALayer(pca_image.mean_, pca_image.components_)

In [21]:
for images, captions, _ in train_loader:
    text_emb = text_model(captions)
    text_emb_pca = bert_pca_layer(text_emb)

    image_emb = image_model(images)
    image_emb_pca = clip_pca_layer(image_emb)

    print("Text PCA shape:", text_emb_pca.shape)     # (B, 400)
    print("Image PCA shape:", image_emb_pca.shape)   # (B, 400)
    break

Text PCA shape: torch.Size([16, 400])
Image PCA shape: torch.Size([16, 400])
