# **Mount Google Drive**

In [None]:
# Colab-only helper used to attach Google Drive to the runtime's filesystem.
from google.colab import drive
import sys

# Mount Drive at /content/drive; CFG_D.DRIVE_PATH points below this mount point.
drive.mount('/content/drive')

# **Import Necessary Packages**

In [None]:
!pip install -q transformers datasets scikit-learn seaborn

import os
import pandas as pd
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from transformers import CLIPProcessor, CLIPTokenizer, CLIPModel, get_scheduler
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
from tqdm.notebook import tqdm
import glob
import gc
import matplotlib.pyplot as plt
import seaborn as sns

# **Configuration for Subtask D**

In [None]:
class CFG_D:
    """
    Configuration Class for Subtask D: Humor Detection.

    All settings are plain class attributes, so they are read as
    ``CFG_D.<name>`` without creating an instance. Later cells attach two
    more attributes at runtime: ``class_weights`` and ``num_classes``.
    """
    # Root of the project folder on the mounted Google Drive.
    DRIVE_PATH = "/content/drive/MyDrive/case-2025"

    # File & Directory Paths
    # Training images (searched recursively) and the CSV holding their text.
    train_dir = os.path.join(DRIVE_PATH, "SubTaskD/Train")
    train_text_path = os.path.join(DRIVE_PATH, "SubTaskD/Train/STask_D_train.csv")
    # Validation images plus separate CSVs for labels and text (joined on "index").
    val_image_dir = os.path.join(DRIVE_PATH, "SubTaskD/Eval/STask_D_val_img")
    val_labels_path = os.path.join(DRIVE_PATH, "SubTaskD/Eval/STask-D(index,label)val.csv")
    val_text_path = os.path.join(DRIVE_PATH, "SubTaskD/Eval/STask-D(index,text)val.csv")
    # Test images and their text CSV.
    test_image_dir = os.path.join(DRIVE_PATH, "SubTaskD/Test/STask_D_test_img")
    test_csv_path = os.path.join(DRIVE_PATH, "SubTaskD/Test/STask-D(index,text)test.csv")
    # Destination for the best checkpoint and the submission file.
    output_dir = os.path.join(DRIVE_PATH, "output_subtask_d_hyper")

    # Model & Training Parameters
    model_name = 'openai/clip-vit-large-patch14'  # Hugging Face checkpoint id
    image_size = 224           # side length of the placeholder image used when a file is missing
    max_token_len = 77         # tokenizer truncation / padding length
    learning_rate_base = 1e-6  # learning rate for the CLIP backbone parameters
    learning_rate_head = 1e-5  # learning rate for the classification head
    batch_size = 16
    epochs = 8
    num_workers = 2            # DataLoader worker processes
    device = "cuda" if torch.cuda.is_available() else "cpu"

# Create the output folder up front so later saves cannot fail on a missing directory.
os.makedirs(CFG_D.output_dir, exist_ok=True)
print(f" Subtask D Configuration defined. Output will be saved to: {CFG_D.output_dir}")

# **Data Loading**

In [None]:
try:
    # Training Data Loading
    # Labels come from the directory layout: the name of the folder that
    # directly contains an image is its class. Images outside the two known
    # class folders are ignored.
    train_image_paths = glob.glob(os.path.join(CFG_D.train_dir, '**/*.png'), recursive=True)
    train_data = []
    class_folders = ['Humor', 'No Humor']
    for path in train_image_paths:
        label = os.path.basename(os.path.dirname(path))
        if label in class_folders:
            train_data.append({'index': os.path.basename(path), 'label_text': label})

    # An empty list would yield a DataFrame without an "index" column and the
    # merge below would fail with an opaque KeyError, so fail early and clearly.
    if not train_data:
        raise FileNotFoundError(
            f"No .png images found under '{CFG_D.train_dir}' inside the class folders {class_folders}."
        )

    ground_truth_labels_df = pd.DataFrame(train_data)
    text_data_df = pd.read_csv(CFG_D.train_text_path, usecols=['index', 'text'])
    train_df_d = pd.merge(ground_truth_labels_df, text_data_df, on="index")

    # The inner join drops labelled images that have no text row; report it
    # instead of silently shrinking the training set.
    missing_text_mask = ~ground_truth_labels_df['index'].isin(text_data_df['index'])
    if missing_text_mask.any():
        print(f" WARNING: {int(missing_text_mask.sum())} training image(s) have no matching row "
              f"in the text CSV and were dropped.")

    # Validation Data Loading
    val_labels_df = pd.read_csv(CFG_D.val_labels_path)
    val_text_df = pd.read_csv(CFG_D.val_text_path)
    val_df_d = pd.merge(val_text_df, val_labels_df, on="index")

    print("Data loaded successfully.")
    print(f"Training samples:   {len(train_df_d)}")
    print(f"Validation samples: {len(val_df_d)}")

except FileNotFoundError as e:
    print(f"\n ERROR: A data file was not found. Please double-check your paths in CFG_D.")
    print(f" Details: {e}")
    # Re-raise so the notebook stops here with the full traceback; later cells
    # cannot run without these DataFrames.
    raise

# Visualization: Label Distribution

In [None]:
# Bar chart of how many training samples fall into each humor class.
plt.style.use('seaborn-v0_8-whitegrid')
class_order = ['No Humor', 'Humor']  # fixed left-to-right order of the bars
plt.figure(figsize=(8, 5))
label_axes = sns.countplot(
    data=train_df_d,
    x='label_text',
    order=class_order,
    palette='rocket',
)
label_axes.set_title('Subtask D: Training Set Label Distribution')
label_axes.set_xlabel('Humor Class')
label_axes.set_ylabel('Count')
plt.show()

# **DATA PREPARATION**

In [None]:
# Official label-name -> class-id mapping for Subtask D.
official_target_map_d = {'No Humor': 0, 'Humor': 1}

# Encode the labels. Training labels are folder names and go through the
# mapping; validation labels are only renamed to the shared column name and
# cast to int.
train_df_d['label_encoded'] = train_df_d['label_text'].map(official_target_map_d)
val_df_d = val_df_d.rename(columns={'label': 'label_encoded'})
val_df_d['label_encoded'] = val_df_d['label_encoded'].astype(int)

# Class weights for the loss: weight_c = n_samples / (n_classes_seen * count_c),
# so the rarer class receives the larger weight.
print("\n Calculating class weights to handle data imbalance.")
class_counts = train_df_d['label_encoded'].value_counts().sort_index()
print(f"   - Class counts: {class_counts.to_dict()}")
n_train_samples = len(train_df_d)
n_seen_classes = len(class_counts)
weights = n_train_samples / (n_seen_classes * class_counts)
CFG_D.class_weights = torch.tensor(
    weights.to_numpy(), dtype=torch.float, device=CFG_D.device
)
print(f"Calculated weights for loss function: {CFG_D.class_weights}")

# Record the number of output classes on the config for the model cells.
CFG_D.num_classes = len(official_target_map_d)

# **BUILDING THE DATA PIPELINE**

In [None]:
# Load the pretrained CLIP processor that matches CFG_D.model_name. The dataset
# class below calls it once on the text and once on the image of every sample.
processor = CLIPProcessor.from_pretrained(CFG_D.model_name)

# Define the Dataset Class
class HumorDataset(Dataset):
    """Dataset that pairs each row's text with its image for the CLIP classifier.

    Args:
        df: DataFrame with an ``index`` column holding the image file name,
            an optional ``text`` column and, unless ``is_test`` is set, a
            ``label_encoded`` column with the integer class id.
        processor: Callable used both as ``processor(text=...)`` and
            ``processor(images=...)``; must return ``input_ids`` /
            ``attention_mask`` and ``pixel_values`` respectively.
        image_dir: Directory searched recursively for image files.
        image_size: Side length of the black placeholder used for missing images.
        max_token_len: Length every text is truncated / padded to.
        is_test: When True, no ``label`` entry is produced.
    """

    def __init__(self, df, processor, image_dir, image_size, max_token_len, is_test=False):
        self.df = df
        self.processor = processor
        self.image_dir = image_dir
        self.image_size = image_size
        self.max_token_len = max_token_len
        self.is_test = is_test
        # Index every file below image_dir by its base name once, so that
        # __getitem__ is a dictionary lookup instead of a filesystem search.
        self.image_path_map = {
            os.path.basename(p): p
            for p in glob.glob(os.path.join(image_dir, '**/*.*'), recursive=True)
        }

    def __len__(self):
        return len(self.df)

    @staticmethod
    def _clean_text(value):
        """Return ``value`` as a string, mapping missing values (None/NaN) to ''.

        Without this, an empty CSV cell (read by pandas as NaN) would reach
        the tokenizer as the literal word "nan".
        """
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return ''
        return str(value)

    def _load_image(self, image_name):
        """Return the RGB image for ``image_name``, or a black placeholder if it is missing."""
        image_path = self.image_path_map.get(image_name)
        if image_path:
            # The context manager releases the file handle; convert() returns
            # an independent image that stays valid after the file is closed.
            with Image.open(image_path) as raw_image:
                return raw_image.convert("RGB")

        # Keep the original best-effort fallback, but make it visible.
        import warnings
        warnings.warn(
            f"Image '{image_name}' not found under '{self.image_dir}'; using a black placeholder."
        )
        return Image.new('RGB', (self.image_size, self.image_size), 'black')

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        text = self._clean_text(row.get('text', ''))

        tokenized = self.processor(text=text, truncation=True, max_length=self.max_token_len, padding="max_length", return_tensors="pt")
        image = self._load_image(row['index'])
        processed_image = self.processor(images=image, return_tensors="pt")

        # The processor returns batched tensors; drop the leading batch axis so
        # the DataLoader can stack samples itself.
        item = {
            'input_ids': tokenized['input_ids'].squeeze(0),
            'attention_mask': tokenized['attention_mask'].squeeze(0),
            'pixel_values': processed_image['pixel_values'].squeeze(0)
        }
        if not self.is_test:
            item['label'] = torch.tensor(row['label_encoded'], dtype=torch.long)
        return item

# **Data Loaders**

In [None]:
# The test split is read here; its dataset is built with is_test=True so no
# label column is accessed.
test_df_d = pd.read_csv(CFG_D.test_csv_path)

# Arguments shared by all three dataset / loader constructions.
_dataset_common = dict(
    processor=processor,
    image_size=CFG_D.image_size,
    max_token_len=CFG_D.max_token_len,
)
_loader_common = dict(batch_size=CFG_D.batch_size, num_workers=CFG_D.num_workers)

train_dataset_d = HumorDataset(df=train_df_d, image_dir=CFG_D.train_dir, **_dataset_common)
val_dataset_d = HumorDataset(df=val_df_d, image_dir=CFG_D.val_image_dir, **_dataset_common)
test_dataset_d = HumorDataset(
    df=test_df_d, image_dir=CFG_D.test_image_dir, is_test=True, **_dataset_common
)

# Only the training loader shuffles; evaluation order must stay aligned with the CSV rows.
train_loader_d = DataLoader(train_dataset_d, shuffle=True, **_loader_common)
val_loader_d = DataLoader(val_dataset_d, shuffle=False, **_loader_common)
test_loader_d = DataLoader(test_dataset_d, shuffle=False, **_loader_common)

print("DataLoaders created.")
print(f"Training batches:   {len(train_loader_d)}")
print(f"Validation batches: {len(val_loader_d)}")
print(f"Test batches:       {len(test_loader_d)}")

gc.collect()

# **The Main Model Architecture**

In [None]:
class HumorClassifier(nn.Module):
    """CLIP backbone followed by an MLP head over the joined image/text embeddings.

    Args:
        model_name: Checkpoint id passed to ``CLIPModel.from_pretrained``.
        num_classes: Number of output logits.
    """

    def __init__(self, model_name, num_classes):
        super().__init__()
        self.clip = CLIPModel.from_pretrained(model_name)
        embed_dim = self.clip.projection_dim

        # Head input is the image and text embeddings side by side, hence 2 * embed_dim.
        head_layers = [
            nn.Linear(2 * embed_dim, embed_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(embed_dim, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, input_ids, attention_mask, pixel_values):
        """Return the class logits for a batch of tokenized texts and images."""
        clip_out = self.clip(
            input_ids=input_ids,
            attention_mask=attention_mask,
            pixel_values=pixel_values,
        )
        fused = torch.cat((clip_out.image_embeds, clip_out.text_embeds), dim=1)
        return self.classifier(fused)

# **Instantiate the Model for Subtask D**

In [None]:
# Build the classifier, then move its parameters to the configured device.
model_d = HumorClassifier(CFG_D.model_name, CFG_D.num_classes)
model_d = model_d.to(CFG_D.device)
print(f"Model instantiated with {CFG_D.num_classes} output classes.")


# **Defining the Training Engine**

In [None]:
# Loss: cross-entropy weighted by the class weights stored on the config.
criterion = nn.CrossEntropyLoss(weight=CFG_D.class_weights)
print(f"Using weighted CrossEntropyLoss with weights: {CFG_D.class_weights.cpu().numpy()}")

# Two parameter groups so the pretrained backbone is updated with a smaller
# learning rate than the freshly initialised classification head.
backbone_group = {'params': model_d.clip.parameters(), 'lr': CFG_D.learning_rate_base}
head_group = {'params': model_d.classifier.parameters(), 'lr': CFG_D.learning_rate_head}
optimizer = AdamW([backbone_group, head_group], weight_decay=1e-2)

# Linear schedule over all optimizer steps, with the first 10% used for warm-up.
num_training_steps = CFG_D.epochs * len(train_loader_d)
num_warmup_steps = int(0.1 * num_training_steps)
lr_scheduler = get_scheduler(
    name="linear",
    optimizer=optimizer,
    num_warmup_steps=num_warmup_steps,
    num_training_steps=num_training_steps,
)
print("Optimizer and LR Scheduler are ready.")

# **Helper Functions**

In [None]:
def train_one_epoch(model, loader, optimizer, criterion, scheduler, device):
    """Run one training pass over ``loader`` and return the mean batch loss.

    Args:
        model: Module called as ``model(input_ids, attention_mask, pixel_values)``.
        loader: Iterable of batches (dicts with ``pixel_values``, ``input_ids``,
            ``attention_mask`` and ``label`` tensors); must support ``len()``.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(outputs, labels)``.
        scheduler: LR scheduler stepped once per batch, after the optimizer.
        device: Device the batch tensors are moved to.

    Returns:
        Sum of the per-batch losses divided by ``len(loader)``.
    """
    model.train()
    total_loss = 0
    # Gradients accumulate across backward() calls. Clear anything left over
    # from before this call (e.g. an interrupted previous run) so it cannot
    # leak into the first update of the epoch.
    optimizer.zero_grad()
    progress_bar = tqdm(loader, desc="Training", leave=False)
    for batch in progress_bar:
        pixel_values = batch['pixel_values'].to(device)
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)
        outputs = model(input_ids, attention_mask, pixel_values)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()
        # Reset right after the update so the next batch starts from zero.
        optimizer.zero_grad()
        total_loss += loss.item()
        progress_bar.set_postfix(loss=f"{loss.item():.4f}")
    return total_loss / len(loader)

def validate_one_epoch(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without gradient tracking.

    Returns:
        Tuple ``(avg_loss, accuracy, precision, recall, f1)``. The loss is the
        mean over batches; precision, recall and F1 are macro-averaged.
    """
    model.eval()
    running_loss = 0
    predicted, expected = [], []
    with torch.no_grad():
        for batch in tqdm(loader, desc="Validating", leave=False):
            targets = batch['label'].to(device)
            logits = model(
                batch['input_ids'].to(device),
                batch['attention_mask'].to(device),
                batch['pixel_values'].to(device),
            )
            running_loss += criterion(logits, targets).item()
            # Predicted class = index of the largest logit per sample.
            predicted.extend(torch.argmax(logits, dim=1).cpu().numpy())
            expected.extend(targets.cpu().numpy())

    avg_loss = running_loss / len(loader)
    accuracy = accuracy_score(expected, predicted)
    precision, recall, f1, _ = precision_recall_fscore_support(
        expected, predicted, average='macro', zero_division=0
    )
    return avg_loss, accuracy, precision, recall, f1

print("Training and validation helper functions are ready.")

# **Main Training Loop**

In [None]:
# Per-epoch metric history, consumed by the plotting cell.
history_d = {
    metric: []
    for metric in (
        'train_loss', 'val_loss', 'val_accuracy',
        'val_precision', 'val_recall', 'val_f1',
    )
}
best_val_f1 = 0.0
model_path_d = os.path.join(CFG_D.output_dir, 'best_model_subtask_d.pth')

print("\n Starting Subtask D Fine-Tuning: ")
for epoch in range(CFG_D.epochs):
    epoch_number = epoch + 1
    print(f"\n===== Epoch {epoch_number}/{CFG_D.epochs} =====")

    train_loss = train_one_epoch(
        model_d, train_loader_d, optimizer, criterion, lr_scheduler, CFG_D.device
    )
    val_loss, val_acc, val_prec, val_rec, val_f1 = validate_one_epoch(
        model_d, val_loader_d, criterion, CFG_D.device
    )

    epoch_metrics = {
        'train_loss': train_loss,
        'val_loss': val_loss,
        'val_accuracy': val_acc,
        'val_precision': val_prec,
        'val_recall': val_rec,
        'val_f1': val_f1,
    }
    for metric, value in epoch_metrics.items():
        history_d[metric].append(value)

    print(f"Epoch {epoch_number} Summary:")
    print(f"  - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
    print(f"  - Accuracy: {val_acc:.4f}, F1-Score (Macro): {val_f1:.4f}")

    # Checkpoint only on a strict improvement of the validation macro-F1.
    if not val_f1 > best_val_f1:
        print("F1-score did not improve.")
        continue

    best_val_f1 = val_f1
    print(f"New best F1-score! Saving model to {model_path_d}")
    torch.save(model_d.state_dict(), model_path_d)

print("\n Training Finished")
print(f"Best validation F1-Score for Subtask D achieved: {best_val_f1:.4f}")

# **Plotting the Training History**

In [None]:
# Losses on the left axis, validation macro-F1 on a twin right axis.
fig, ax1 = plt.subplots(figsize=(12, 5))
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.plot(history_d['train_loss'], 'r-o', label='Train Loss')
ax1.plot(history_d['val_loss'], 'orange', marker='o', label='Validation Loss')
ax1.legend(loc='upper left')

# Second y-axis sharing the same x-axis, because F1 lives on a different scale.
ax2 = ax1.twinx()
ax2.set_ylabel('F1-Score (Macro)')
ax2.plot(history_d['val_f1'], 'b-x', label='Validation F1-Score (Macro)')
ax2.legend(loc='upper right')

plt.title('Subtask D: Training and Validation History')
plt.show()


# **Prediction on Test Set**

In [None]:
import json

# Prediction Function
def predict_subtask_d(model_path, test_loader, device):
    """Load the saved checkpoint and predict a class id for every test sample.

    Args:
        model_path: Path of the ``state_dict`` file written during training.
        test_loader: Iterable of batches (dicts with ``pixel_values``,
            ``input_ids`` and ``attention_mask`` tensors).
        device: Device the model and the batches are placed on.

    Returns:
        List with one predicted class id per sample, in loader order.
    """
    model = HumorClassifier(
        model_name=CFG_D.model_name,
        num_classes=CFG_D.num_classes
    ).to(device)

    # Load the saved weights from the best model path saved in the folder.
    # map_location remaps the stored tensors onto `device`, so a checkpoint
    # written on a GPU runtime also loads on a CPU-only runtime.
    print(f"--> Loading best model weights from: {model_path}")
    state_dict = torch.load(model_path, map_location=device)
    model.load_state_dict(state_dict)
    model.eval()

    all_preds = []

    with torch.no_grad():
        progress_bar = tqdm(test_loader, desc="Predicting on Test Set")
        for batch in progress_bar:
            pixel_values = batch['pixel_values'].to(device)
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)

            outputs = model(input_ids, attention_mask, pixel_values)
            # Predicted class = index of the largest logit per sample.
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())

    return all_preds

# Predict with the best checkpoint, then write one JSON object per line.
model_path = os.path.join(CFG_D.output_dir, 'best_model_subtask_d.pth')
predictions = predict_subtask_d(model_path, test_loader_d, CFG_D.device)

# The test loader is not shuffled, so predictions line up with the CSV rows.
indices = test_df_d['index'].tolist()

print("\n--> Creating JSON Lines submission file...")
submission_path = os.path.join(CFG_D.output_dir, 'submission.json')

with open(submission_path, 'w') as submission_file:
    submission_file.writelines(
        json.dumps({"index": sample_index, "prediction": int(predicted_class)}) + '\n'
        for sample_index, predicted_class in zip(indices, predictions)
    )

print(f"\n Submission file for Subtask D created successfully at: {submission_path}")
