In [4]:
import os
import random

import numpy as np
import torch


def seed_everything(seed: int = 42) -> None:
    """Seed semua sumber acak untuk hasil yang reprodusibel."""
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


SEED = 42
seed_everything(SEED)
print(f"Reproducibility seed set to {SEED}")

Reproducibility seed set to 42


In [None]:
# ===================================================================
# === 1. IMPORT LIBRARY ===
# ===================================================================

import os
from contextlib import nullcontext
from pathlib import Path

import cv2
import imagehash
import numpy as np
import pandas as pd
import timm
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from sklearn.metrics import classification_report, f1_score
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from tqdm.auto import tqdm

# ===================================================================
# === 2. KONFIGURASI DAN SETUP ===
# ===================================================================

BASE_DIR = os.getcwd()  # Menggunakan current working directory
TRAIN_DIR = os.path.join(BASE_DIR, "dataset fixx", "train")
TEST_DIR = os.path.join(BASE_DIR, "dataset fixx", "test", "test")

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
AMP_ENABLED = DEVICE.type == "cuda"
IMG_SIZE = 224
BATCH_SIZE = 16
NUM_WORKERS = os.cpu_count() or 1

EPOCHS_S1 = 3
LR_S1 = 3e-4
EPOCHS_S2 = 5
LR_S2 = 1e-4
EPOCHS_S3 = 12
LR_S3 = 5e-5

IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]
BEST_MODEL_PATH = "best_swin_model.pth"

print(f"Menggunakan device: {DEVICE}")
print(f"Ukuran gambar: {IMG_SIZE}x{IMG_SIZE}, Batch Size: {BATCH_SIZE}")

autocast_ctx = torch.cuda.amp.autocast if AMP_ENABLED else nullcontext
scaler = torch.cuda.amp.GradScaler(enabled=AMP_ENABLED)

# ===================================================================
# === 3. FUNGSI DAN KELAS HELPER ===
# ===================================================================

def convert_path_to_df(dataset, is_test=False):
    image_dir = Path(dataset)
    filepaths = list(image_dir.glob(r"**/*.*"))
    if is_test:
        filepaths = pd.Series(filepaths, name="Filepath").astype(str)
        return pd.DataFrame({"Filepath": filepaths})
    labels = [p.parts[-2] for p in filepaths]
    filepaths = pd.Series(filepaths, name="Filepath").astype(str)
    labels = pd.Series(labels, name="Label")
    return pd.concat([filepaths, labels], axis=1)


class CustomDataset(Dataset):
    def __init__(self, dataframe, image_column, label_column=None, transform=None):
        self.dataframe = dataframe
        self.image_column = image_column
        self.label_column = label_column
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_path = self.dataframe.iloc[idx][self.image_column]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        if self.label_column is None:
            return image
        label = self.dataframe.iloc[idx][self.label_column]
        return image, torch.tensor(label, dtype=torch.long)


class DualTransformDataset(Dataset):
    def __init__(self, dataframe, image_column, label_column, transform_main, transform_extra):
        self.dataframe = dataframe
        self.image_column = image_column
        self.label_column = label_column
        self.transform_main = transform_main
        self.transform_extra = transform_extra

    def __len__(self):
        return len(self.dataframe) * 2

    def __getitem__(self, idx):
        base_idx = idx // 2
        use_extra = idx % 2 == 1
        row = self.dataframe.iloc[base_idx]
        img_path = row[self.image_column]
        image = Image.open(img_path).convert("RGB")
        image = self.transform_extra(image) if use_extra else self.transform_main(image)
        label = torch.tensor(row[self.label_column], dtype=torch.long)
        return image, label


class CLAHETransform:
    def __init__(self, clip_limit=2.0, tile_grid_size=(8, 8)):
        self.clip_limit = clip_limit
        self.tile_grid_size = tile_grid_size

    def __call__(self, img):
        img_np = np.array(img)
        lab = cv2.cvtColor(img_np, cv2.COLOR_RGB2LAB)
        l, a, b = cv2.split(lab)
        clahe = cv2.createCLAHE(clipLimit=self.clip_limit, tileGridSize=self.tile_grid_size)
        l = clahe.apply(l)
        lab = cv2.merge((l, a, b))
        img_rgb_clahe = cv2.cvtColor(lab, cv2.COLOR_LAB2RGB)
        return Image.fromarray(img_rgb_clahe)


class TestDataset(Dataset):
    def __init__(self, dataframe, image_column, transform=None):
        self.dataframe = dataframe
        self.image_column = image_column
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_path = self.dataframe.iloc[idx][self.image_column]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, img_path


# ===================================================================
# === 4. PERSIAPAN DATA DENGAN STRATEGI "VALIDASI BERSIH" ===
# ===================================================================

def get_phash(filepath):
    try:
        with Image.open(filepath) as img:
            return imagehash.phash(img)
    except Exception:
        return None


train_df = convert_path_to_df(TRAIN_DIR)
print(f"Jumlah data training sebelum filtering manual: {len(train_df)}")

print()
print("=" * 50)
print("Mengidentifikasi kebocoran data (train vs test)...")

test_df = convert_path_to_df(TEST_DIR, is_test=True)
tqdm.pandas(desc="Menghitung Hash Data Test")
test_hashes = set(test_df["Filepath"].progress_apply(get_phash))
test_hashes.discard(None)

tqdm.pandas(desc="Menghitung Hash Data Train")
train_df["hash"] = train_df["Filepath"].progress_apply(get_phash)
train_df["is_leak"] = train_df["hash"].isin(test_hashes)
print(f"Ditemukan {train_df['is_leak'].sum()} gambar di training set yang identik dengan gambar di test set.")

print()
print("Menerapkan strategi 'Validasi Bersih'...")
leaked_df = train_df[train_df["is_leak"]].copy()
clean_df = train_df[~train_df["is_leak"]].copy()

label_mapping = {
    "Ayam Bakar": 0,
    "Ayam Betutu": 1,
    "Ayam Goreng": 2,
    "Ayam Pop": 3,
    "Bakso": 4,
    "Coto Makassar": 5,
    "Gado Gado": 6,
    "Gudeg": 7,
    "Nasi Goreng": 8,
    "Pempek": 9,
    "Rawon": 10,
    "Rendang": 11,
    "Sate Madura": 12,
    "Sate Padang": 13,
    "Soto": 14,
}

clean_df["Label"] = clean_df["Label"].map(label_mapping)
leaked_df["Label"] = leaked_df["Label"].map(label_mapping)

val_split = pd.DataFrame()
if not clean_df.empty:
    try:
        clean_train_split, val_split = train_test_split(
            clean_df, test_size=0.2, random_state=42, stratify=clean_df["Label"]
        )
    except ValueError:
        print("Peringatan: Gagal stratify, menggunakan split biasa.")
        clean_train_split, val_split = train_test_split(clean_df, test_size=0.2, random_state=42)
else:
    clean_train_split = clean_df

train_split = pd.concat([clean_train_split, leaked_df], ignore_index=True)
train_split.drop(columns=["hash", "is_leak"], inplace=True, errors="ignore")

train_split["Label"] = train_split["Label"].astype(int)
if "Label" in val_split.columns:
    val_split["Label"] = val_split["Label"].astype(int)

print(f"Total data train setelah digabung: {len(train_split)}")
print(f"Total data validasi bersih: {len(val_split)}")

# ===================================================================
# === 5. TRANSFORMASI DATA ===
# ===================================================================

train_transform = transforms.Compose([
    CLAHETransform(),
    transforms.RandomResizedCrop(IMG_SIZE, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.2),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomApply([transforms.GaussianBlur(kernel_size=3)], p=0.3),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

train_transform_extra = transforms.Compose([
    CLAHETransform(),
    transforms.RandomRotation(degrees=15),
    transforms.RandomResizedCrop(IMG_SIZE, scale=(0.6, 1.0)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

val_test_transform = transforms.Compose([
    CLAHETransform(),
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

train_dataset = DualTransformDataset(
    train_split,
    image_column="Filepath",
    label_column="Label",
    transform_main=train_transform,
    transform_extra=train_transform_extra,
)

val_dataset = CustomDataset(val_split, "Filepath", "Label", val_test_transform)

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    persistent_workers=NUM_WORKERS > 0,
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    persistent_workers=NUM_WORKERS > 0,
)

class_weights = compute_class_weight(
    "balanced", classes=np.unique(train_split["Label"]), y=train_split["Label"]
)
class_weights = torch.tensor(class_weights, dtype=torch.float32, device=DEVICE)

# ===================================================================
# === 6. DEFINISI MODEL SWIN TRANSFORMER ===
# ===================================================================


class SingleSwinModel(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.backbone = timm.create_model(
            "swin_base_patch4_window7_224", pretrained=True, num_classes=num_classes
        )

    def forward(self, x):
        return self.backbone(x)

    def freeze_all(self):
        for param in self.backbone.parameters():
            param.requires_grad = False

    def enable_head(self):
        for name, param in self.backbone.named_parameters():
            if "head" in name or "norm" in name:
                param.requires_grad = True

    def enable_stages(self, stages):
        for name, param in self.backbone.named_parameters():
            if any(f"layers.{idx}" in name for idx in stages):
                param.requires_grad = True

    def enable_all(self):
        for param in self.backbone.parameters():
            param.requires_grad = True


model = SingleSwinModel(num_classes=len(label_mapping)).to(DEVICE)
model.freeze_all()
model.enable_head()

criterion = nn.CrossEntropyLoss(weight=class_weights)


def train_one_epoch(model, loader, optimizer, criterion, scheduler=None):
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(loader, desc="Train", leave=False):
        images = images.to(DEVICE)
        labels = labels.to(DEVICE)
        optimizer.zero_grad(set_to_none=True)
        with autocast_ctx():
            outputs = model(images)
            loss = criterion(outputs, labels)
        scaler.scale(loss).backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        scaler.step(optimizer)
        scaler.update()
        if scheduler is not None:
            scheduler.step()
        running_loss += loss.item()
    return running_loss / max(1, len(loader))


def evaluate(model, loader, criterion):
    if len(getattr(loader, "dataset", [])) == 0:
        return 0.0, 0.0, "Validasi kosong - dilewati."
    model.eval()
    running_loss = 0.0
    all_preds, all_labels = [], []
    with torch.no_grad():
        for images, labels in tqdm(loader, desc="Val", leave=False):
            images = images.to(DEVICE)
            labels = labels.to(DEVICE)
            with autocast_ctx():
                outputs = model(images)
                loss = criterion(outputs, labels)
            running_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    avg_loss = running_loss / max(1, len(loader))
    f1 = f1_score(all_labels, all_preds, average="macro")
    report = classification_report(all_labels, all_preds, target_names=list(label_mapping.keys()))
    return avg_loss, f1, report


best_f1 = 0.0

print()
print("=" * 50)
print("TAHAP 1: Fine-tuning Head Klasifikasi")
print("=" * 50)
optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=LR_S1, weight_decay=1e-4)
for epoch in range(EPOCHS_S1):
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion)
    val_loss, f1, report = evaluate(model, val_loader, criterion)
    print(f"Epoch [{epoch + 1}/{EPOCHS_S1}] | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Macro F1: {f1:.4f}")
    print("--- Laporan Klasifikasi Validasi ---")
    print(report)
    if f1 > best_f1:
        best_f1 = f1
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        print(f"✅ Model disimpan (F1 terbaik baru: {best_f1:.4f})")

print()
print("=" * 50)
print("TAHAP 2: Membuka Stage Terakhir Swin")
print("=" * 50)
model.enable_stages([3])
optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=LR_S2, weight_decay=1e-4)
for epoch in range(EPOCHS_S2):
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion)
    val_loss, f1, report = evaluate(model, val_loader, criterion)
    print(f"Epoch [{epoch + 1}/{EPOCHS_S2}] | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Macro F1: {f1:.4f}")
    print("--- Laporan Klasifikasi Validasi ---")
    print(report)
    if f1 > best_f1:
        best_f1 = f1
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        print(f"✅ Model disimpan (F1 terbaik baru: {best_f1:.4f})")

print()
print("=" * 50)
print("TAHAP 3: Fine-tuning Seluruh Backbone")
print("=" * 50)
model.enable_all()
optimizer = optim.AdamW(model.parameters(), weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=LR_S3,
    epochs=EPOCHS_S3,
    steps_per_epoch=len(train_loader),
)
for epoch in range(EPOCHS_S3):
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion, scheduler)
    val_loss, f1, report = evaluate(model, val_loader, criterion)
    print(f"Epoch [{epoch + 1}/{EPOCHS_S3}] | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Macro F1: {f1:.4f}")
    print("--- Laporan Klasifikasi Validasi ---")
    print(report)
    if f1 > best_f1:
        best_f1 = f1
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        print(f"✅ Model disimpan (F1 terbaik baru: {best_f1:.4f})")

print()
print("=" * 50)
print("TRAINING SELESAI")
print("=" * 50)
print(f"Model terbaik tersimpan di: {BEST_MODEL_PATH} dengan F1 {best_f1:.4f}")

Menggunakan device: cuda
Ukuran gambar: 224x224, Batch Size: 16


  scaler = torch.cuda.amp.GradScaler(enabled=AMP_ENABLED)


Jumlah data training sebelum filtering manual: 4052

Mengidentifikasi kebocoran data (train vs test)...


Menghitung Hash Data Test:   0%|          | 0/2057 [00:00<?, ?it/s]

In [None]:
###################################################################################
### BLOK C: FINAL INFERENCE & SUBMISSION (Single Swin Transformer) ###
###################################################################################

print("\n" + "#" * 80)
print("### MEMULAI BLOK C: INFERENCE & SUBMISSION (Swin Transformer) ###")
print("#" * 80 + "\n")

model_swin = SingleSwinModel(num_classes=len(label_mapping)).to(DEVICE)
MODEL_PATH = BEST_MODEL_PATH
print(f"Memuat bobot dari: {MODEL_PATH}")
model_swin.load_state_dict(torch.load(MODEL_PATH, map_location=DEVICE))
model_swin.eval()

final_test_transform = transforms.Compose([
    CLAHETransform(),
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

test_df = convert_path_to_df(TEST_DIR, is_test=True)
test_dataset = TestDataset(test_df, "Filepath", transform=final_test_transform)
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    persistent_workers=NUM_WORKERS > 0,
)

reverse_label_mapping = {v: k for k, v in label_mapping.items()}

submission_data = []
with torch.no_grad():
    for images, paths in tqdm(test_loader, desc="Inferensi Swin"):
        images = images.to(DEVICE)
        outputs = model_swin(images)
        preds = torch.argmax(outputs, dim=1)
        for img_path, label_idx in zip(paths, preds.cpu().tolist()):
            img_id = os.path.splitext(os.path.basename(img_path))[0]
            submission_data.append({"id": img_id, "style": reverse_label_mapping[label_idx]})

print("\nMenyimpan hasil prediksi ke submission_swin.csv...")
submission_df = pd.DataFrame(submission_data)
submission_df.sort_values(by="id", inplace=True)
submission_df.to_csv("submission_swin.csv", index=False)

print("✅ File submission_swin.csv berhasil dibuat!")
print("\n" + "=" * 50 + "\nPROSES SELESAI (SWIN TRANSFORMER)\n" + "=" * 50)