In [1]:
import os
import copy
import numpy as np
import pandas as pd
from PIL import Image, ImageFile

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.model_selection import train_test_split

from transformers import CLIPProcessor, CLIPModel


  from .autonotebook import tqdm as notebook_tqdm


In [2]:

#  We allow truncated images just in case
ImageFile.LOAD_TRUNCATED_IMAGES = True

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", DEVICE)


# 2) Load & Clean Data
CSV_PATH = "./memotion_dataset_7k/labels.csv"
IMAGES_DIR = "./memotion_dataset_7k/images"

df = pd.read_csv(CSV_PATH)
df = df[["image_name", "text_corrected", "offensive"]].dropna(subset=["text_corrected", "offensive"])
df["text_corrected"] = df["text_corrected"].astype(str)
df = df[df["text_corrected"].str.strip() != ""]

# Verify images, skip any fully corrupt
bad_images = 0
valid_indices = []
for i, row in df.iterrows():
    img_path = os.path.join(IMAGES_DIR, row["image_name"])
    try:
        with Image.open(img_path) as im:
            im.verify()
        valid_indices.append(i)
    except:
        bad_images += 1
df = df.loc[valid_indices].reset_index(drop=True)
print(f"Skipped {bad_images} corrupt images. Valid images: {len(df)}")

Using device: cuda
Skipped 1 corrupt images. Valid images: 6986


In [3]:
df.head()

Unnamed: 0,image_name,text_corrected,offensive
0,image_1.jpg,LOOK THERE MY FRIEND LIGHTYEAR NOW ALL SOHALIK...,not_offensive
1,image_2.jpeg,The best of #10 YearChallenge! Completed in le...,not_offensive
2,image_3.JPG,Sam Thorne @Strippin ( Follow Follow Saw every...,not_offensive
3,image_4.png,10 Year Challenge - Sweet Dee Edition,very_offensive
4,image_5.png,10 YEAR CHALLENGE WITH NO FILTER 47 Hilarious ...,very_offensive


In [4]:
# 3) Merge 4 Original Classes into2 (offensive, not_offensive)

def merge_offensive(label):
    if label in ["slight", "very_offensive",'hateful_offensive']:
        return "offensive"
    else:
        return "not_offensive"
# 合并标签
df["merge_offensive"] = df["offensive"].apply(merge_offensive)
label2id = {"not_offensive": 0, "offensive": 1}
df["label"] = df["merge_offensive"].map(label2id)
print("Merged distribution:")
print(df["merge_offensive"].value_counts(normalize=True))


Merged distribution:
merge_offensive
offensive        0.612081
not_offensive    0.387919
Name: proportion, dtype: float64


In [5]:
# 4) Stratified Split

train_df, test_df = train_test_split(
    df, test_size=0.2, stratify=df["label"], random_state=42
)
val_df, test_df = train_test_split(
    test_df, test_size=0.5, stratify=test_df["label"], random_state=42
)

print("Train/Val/Test sizes:", len(train_df), len(val_df), len(test_df))

Train/Val/Test sizes: 5588 699 699


In [6]:
# 下载模型
from huggingface_hub import snapshot_download

clip_model_name = "openai/clip-vit-base-patch32"
local_dir = "D:/clip-vit-base-patch32"  
snapshot_download(repo_id=clip_model_name, local_dir=local_dir)

Fetching 12 files: 100%|██████████| 12/12 [06:25<00:00, 32.09s/it]


'D:\\clip-vit-base-patch32'

In [7]:
# 5) CLIP Processor

# We'll use the openai/clip-vit-base-patch32 variant 
 
processor = CLIPProcessor.from_pretrained(local_dir)
# This processor will handle image transforms + text tokenization automatically
# We'll apply it inside our custom dataset.


In [8]:
###############################################################################
# 6) Custom Dataset
###############################################################################
class CLIPMemotionDataset(Dataset):
    """
    Each item returns:
      - pixel_values: The transformed image tensor
      - input_ids, attention_mask: tokenized text
      - label: the sentiment class
    """
    def __init__(self, dataframe, images_dir, processor, max_length=77):
        self.df = dataframe.reset_index(drop=True)
        self.images_dir = images_dir
        self.processor = processor
        self.max_length = max_length

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = os.path.join(self.images_dir, row["image_name"])
        image = Image.open(img_path).convert("RGB")
        text = row["text_corrected"]
        label = torch.tensor(row["label"], dtype=torch.long)

        # The CLIP processor can handle both images & text in a single call,
        # but we'll call it separately for clarity. We'll do them in one go:
        encoded = self.processor(
            text=[text],
            images=[image],
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )
        # encoded is a dict with keys: pixel_values, input_ids, attention_mask
        # shape: (batch=1, channels/HW or tokens)

        # We'll squeeze out batch=1 dimension so we can return plain tensors
        pixel_values = encoded["pixel_values"].squeeze(0)       # [3, 224, 224]
        input_ids = encoded["input_ids"].squeeze(0)             # [max_length]
        attention_mask = encoded["attention_mask"].squeeze(0)   # [max_length]

        return {
            "pixel_values": pixel_values,
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "label": label
        }

train_dataset = CLIPMemotionDataset(train_df, IMAGES_DIR, processor)
val_dataset   = CLIPMemotionDataset(val_df,   IMAGES_DIR, processor)
test_dataset  = CLIPMemotionDataset(test_df,  IMAGES_DIR, processor)

from torch.utils.data import DataLoader

BATCH_SIZE = 8
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader   = DataLoader(val_dataset,   batch_size=BATCH_SIZE, shuffle=False)
test_loader  = DataLoader(test_dataset,  batch_size=BATCH_SIZE, shuffle=False)


In [10]:
###############################################################################
# 7) CLIP Classification Model
###############################################################################
class CLIPClassifier(nn.Module):
    """
    Wraps a CLIPModel and adds a small classifier for 2-class.
    We'll:
      - get image_embeds from model outputs
      - get text_embeds from model outputs
      - combine them, then pass through a small feedforward
    """
    def __init__(self, model_name, num_labels=2, freeze_clip=False):
        super().__init__()
        self.clip_model = CLIPModel.from_pretrained(model_name)
        self.num_labels = num_labels

        # Optionally freeze entire CLIP to reduce memory usage & avoid large updates
        if freeze_clip:
            for param in self.clip_model.parameters():
                param.requires_grad = False

        embed_dim = self.clip_model.config.projection_dim * 2  # e.g., 512 + 512 = 1024
        # Add a small classifier head
        self.classifier = nn.Sequential(
            nn.Linear(embed_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_labels)
        )

    def forward(self, pixel_values, input_ids, attention_mask):
        # The CLIP forward pass:
        # returns image_embeds, text_embeds, etc.
        outputs = self.clip_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            pixel_values=pixel_values
        )
        # outputs.image_embeds: [batch_size, projection_dim] default=512
        # outputs.text_embeds:  [batch_size, projection_dim]

        # By default, CLIPModel output embeddings are already normalized (unit sphere)
        image_embeds = outputs.image_embeds
        text_embeds = outputs.text_embeds

        # Concatenate them for classification
        fused = torch.cat([image_embeds, text_embeds], dim=1)  # shape: [B, 1024]
        logits = self.classifier(fused)
        return logits

num_labels = 2
model = CLIPClassifier(
    model_name=clip_model_name, 
    num_labels=num_labels, 
    freeze_clip=False  
).to(DEVICE)

In [11]:
# 8) Optimizer / Scheduler

# Fine-tuning CLIP can be costly. We do a small LR.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
# Tried both simple scheduler and ReduceLROnPlateau
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=1, verbose=True)
criterion = nn.CrossEntropyLoss()



In [12]:
# 9) Training / Evaluation Functions
def epoch_step(model, dataloader, is_train=False):
    if is_train:
        model.train()
    else:
        model.eval()

    total_loss = 0
    all_preds = []
    all_labels = []

    for batch in dataloader:
        pixel_values = batch["pixel_values"].to(DEVICE)
        input_ids = batch["input_ids"].to(DEVICE)
        attention_mask = batch["attention_mask"].to(DEVICE)
        labels = batch["label"].to(DEVICE)

        if is_train:
            optimizer.zero_grad()

        logits = model(
            pixel_values=pixel_values,
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        loss = criterion(logits, labels)
        total_loss += loss.item() * labels.size(0)

        preds = logits.argmax(dim=1).detach().cpu().numpy()
        all_preds.append(preds)
        all_labels.append(labels.detach().cpu().numpy())

        if is_train:
            loss.backward()
            optimizer.step()

    avg_loss = total_loss / len(dataloader.dataset)
    all_preds = np.concatenate(all_preds)
    all_labels = np.concatenate(all_labels)
    acc = accuracy_score(all_labels, all_preds)
    prec, rec, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average="macro")
    return avg_loss, acc, prec, rec, f1

In [13]:
# 10) Training Loop (with early stopping)

EPOCHS = 2
patience = 2
best_val_loss = float("inf")
no_improve = 0
best_state = None

for epoch in range(1, EPOCHS + 1):
    train_loss, train_acc, train_prec, train_rec, train_f1 = epoch_step(model, train_loader, is_train=True)
    val_loss, val_acc, val_prec, val_rec, val_f1 = epoch_step(model, val_loader, is_train=False)

    # Step scheduler on val_loss
    scheduler.step(val_loss)

    print(f"\nEpoch {epoch}/{EPOCHS}")
    print(f"  Train Loss: {train_loss:.4f} | Acc: {train_acc:.4f} | P: {train_prec:.4f} | R: {train_rec:.4f} | F1: {train_f1:.4f}")
    print(f"  Val   Loss: {val_loss:.4f}   | Acc: {val_acc:.4f}   | P: {val_prec:.4f} | R: {val_rec:.4f} | F1: {val_f1:.4f}")

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        no_improve = 0
        best_state = copy.deepcopy(model.state_dict())
    else:
        no_improve += 1
        if no_improve >= patience:
            print("Early stopping triggered.")
            break


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))



Epoch 1/2
  Train Loss: 0.6696 | Acc: 0.6120 | P: 0.3060 | R: 0.5000 | F1: 0.3797
  Val   Loss: 0.6686   | Acc: 0.6123   | P: 0.3062 | R: 0.5000 | F1: 0.3798


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))



Epoch 2/2
  Train Loss: 0.6682 | Acc: 0.6120 | P: 0.3060 | R: 0.5000 | F1: 0.3797
  Val   Loss: 0.6677   | Acc: 0.6123   | P: 0.3062 | R: 0.5000 | F1: 0.3798


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [14]:
if best_state:
    model.load_state_dict(best_state)

# Final Test

test_loss, test_acc, test_prec, test_rec, test_f1 = epoch_step(model, test_loader, is_train=False)
print("\n--- CLIP Test Results ---")
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Acc:  {test_acc:.4f}")
print(f"Macro Precision: {test_prec:.4f}")
print(f"Macro Recall:    {test_rec:.4f}")
print(f"Macro F1:        {test_f1:.4f}")


--- CLIP Test Results ---
Test Loss: 0.6686
Test Acc:  0.6123
Macro Precision: 0.3062
Macro Recall:    0.5000
Macro F1:        0.3798


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
