# CPE342 - Karena Task4 Swin Transformer

In [2]:
import os
import time
import random
import warnings
import numpy as np
import pandas as pd
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from sklearn.model_selection import StratifiedKFold
from sklearn.utils.class_weight import compute_class_weight
import timm
from torchvision import transforms

# Suppress every Python warning for the remainder of the session.
warnings.filterwarnings('ignore')

# Check the GPU right away and report what was found.
if not torch.cuda.is_available():
    print("GPU not detected. Please check your PyTorch installation.")
else:
    gpu_name = torch.cuda.get_device_name(0)
    print(f"GPU is available: {gpu_name}")
    allocated_gb = torch.cuda.memory_allocated() / 1024**3
    print(f"Memory Usage: {allocated_gb:.2f} GB")

GPU is available: NVIDIA GeForce RTX 3050 Laptop GPU
Memory Usage: 0.00 GB


In [3]:
class CFG:
    """Central configuration for data paths, model choice and training hyper-parameters.

    The class is used as a plain namespace (never instantiated). Evaluating the
    class body creates OUTPUT_DIR on disk as a side effect.
    """

    # Dataset layout: CSV files and image folders are joined onto DATA_ROOT.
    DATA_ROOT = "Dataset/task4"
    TRAIN_CSV = "train.csv"
    VAL_CSV = "val.csv"
    TEST_CSV = "test_refined.csv"

    TRAIN_DIR = "train"
    VAL_DIR = "val"
    TEST_DIR = "test"

    # Fold checkpoints are written here. exist_ok avoids the check-then-create
    # race of a separate os.path.exists() test.
    OUTPUT_DIR = "output_swin_gpu"
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # timm model identifier and the square input resolution fed to it.
    MODEL_NAME = "swin_small_patch4_window7_224"
    IMG_SIZE = 224

    BATCH_SIZE = 32

    NUM_EPOCHS = 5
    # Number of consecutive epochs without validation-accuracy improvement
    # tolerated before a fold stops early.
    EARLY_STOPPING = 2
    N_FOLDS = 5

    # AdamW learning rate / weight decay and the global random seed.
    LR = 1e-4
    WEIGHT_DECAY = 1e-4
    SEED = 42

    NUM_WORKERS = 0
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

## Utility Functions

In [4]:
def set_seed(seed=42, deterministic=True):
    """Seed Python, NumPy and PyTorch RNGs and configure cuDNN consistently.

    Args:
        seed: Value used for every random number generator.
        deterministic: When True (default) cuDNN is restricted to deterministic
            algorithms and the benchmark auto-tuner is disabled, because the
            auto-tuner may pick different kernels from run to run. Pass False
            to trade reproducibility for the faster auto-tuned kernels.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible CUDA device, not only the current one.
    torch.cuda.manual_seed_all(seed)
    # The two cuDNN flags pull in opposite directions, so keep them in sync
    # instead of enabling both at once.
    torch.backends.cudnn.deterministic = bool(deterministic)
    torch.backends.cudnn.benchmark = not deterministic

def get_label_mapping(df):
    """Work out how the label column of *df* maps onto class indices.

    The label column is "label" when present, otherwise the second column.

    Returns:
        (label2idx, num_classes, label_col) where label2idx is a dict from
        label value to index (labels sorted ascending) for non-numeric labels,
        or None when the labels are already numeric, in which case
        num_classes is max(label) + 1.
    """
    label_col = "label" if "label" in df.columns else df.columns[1]
    labels = df[label_col]
    # Any non-numeric dtype (object, pandas string, categorical) needs a
    # mapping; comparing against 'O' alone misses the newer string dtypes.
    if not pd.api.types.is_numeric_dtype(labels):
        uniq = sorted(labels.unique())
        return {name: idx for idx, name in enumerate(uniq)}, len(uniq), label_col
    return None, int(labels.max()) + 1, label_col

## Data Augmentation

In [5]:
def get_transforms(data):
    """Build the image preprocessing pipeline for a dataset split.

    'train' gives a randomised augmentation pipeline, 'valid' gives a plain
    resize; both end with tensor conversion and channel normalisation.
    Any other value returns None.
    """
    side = CFG.IMG_SIZE

    def _to_model_input():
        # Shared tail of both pipelines (the mean/std values match the usual
        # ImageNet channel statistics).
        return [
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]

    if data == 'valid':
        steps = [transforms.Resize((side, side))]
        return transforms.Compose(steps + _to_model_input())

    if data != 'train':
        return None

    # Resize slightly larger than the target, then take a random crop back
    # down to the model's input size before the colour/geometry jitter.
    augmentations = [
        transforms.Resize((side + 32, side + 32)),
        transforms.RandomResizedCrop(side, scale=(0.8, 1.0)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(0.2, 0.2, 0.2, 0.1),
    ]
    return transforms.Compose(augmentations + _to_model_input())

## Dataset Class

In [6]:
class GameDataset(Dataset):
    """Image dataset backed by a DataFrame of file names (and, optionally, labels).

    Args:
        df: Table with one row per image. Its index is reset internally.
        img_dir: Directory that the image names are resolved against.
        transform: Optional callable applied to the loaded PIL image.
        label2idx: Optional dict mapping raw label values to class indices.
        is_test: When True, items are (image, image_name) instead of
            (image, label_index).
        label_col: Name of the label column in df.
    """

    # Column names tried, in order, when looking for the image-name column.
    _IMG_COLUMNS = ("file_name", "filename", "image", "id")
    # Extensions tried when the bare name does not exist on disk.
    _EXTENSIONS = ('.jpg', '.jpeg', '.png', '.JPG')

    def __init__(self, df, img_dir, transform=None, label2idx=None, is_test=False, label_col="label"):
        self.df = df.reset_index(drop=True)
        self.img_dir = img_dir
        self.transform = transform
        self.is_test = is_test
        self.label2idx = label2idx
        self.label_col = label_col

        # Detect the image column; fall back to the first column.
        self.img_col = next((c for c in self._IMG_COLUMNS if c in df.columns), df.columns[0])

    def __len__(self):
        return len(self.df)

    def _resolve_path(self, img_name):
        """Return the on-disk path for img_name, probing known extensions."""
        img_path = os.path.join(self.img_dir, img_name)
        if os.path.exists(img_path):
            return img_path
        for ext in self._EXTENSIONS:
            if os.path.exists(img_path + ext):
                return img_path + ext
        return img_path + '.jpg'  # Fallback when nothing matched

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_name = str(row[self.img_col])
        img_path = self._resolve_path(img_name)

        try:
            # The context manager closes the file handle deterministically;
            # convert() returns an independent, fully loaded copy.
            with Image.open(img_path) as raw:
                image = raw.convert('RGB')
        except Exception:
            # Best effort: substitute a black image for unreadable/missing
            # files. `Exception` (not a bare except) so that Ctrl-C and
            # SystemExit still propagate.
            image = Image.new('RGB', (CFG.IMG_SIZE, CFG.IMG_SIZE))

        if self.transform:
            image = self.transform(image)

        if self.is_test:
            return image, img_name

        label = row[self.label_col]
        if self.label2idx:
            label = self.label2idx[label]
        return image, int(label)

## Build Model

In [7]:
def create_model(num_classes):
    """Create the timm model named by CFG.MODEL_NAME with pretrained weights
    and a num_classes-way head, and move it to CFG.DEVICE."""
    network = timm.create_model(
        CFG.MODEL_NAME,
        pretrained=True,
        num_classes=num_classes,
    )
    network = network.to(CFG.DEVICE)
    return network

## Training Engine

In [8]:
from tqdm.auto import tqdm  # progress bar used by train_epoch

def train_epoch(model, loader, criterion, optimizer, scheduler, device, scaler=None):
    """Run one mixed-precision training pass over *loader*.

    Args:
        model: Network to train (put into train mode here).
        loader: Iterable yielding (images, labels) batches.
        criterion: Loss callable taking (outputs, labels).
        optimizer: Optimizer updating the model parameters.
        scheduler: Optional LR scheduler stepped once per batch; may be None.
        device: Target device for the batches (string or torch.device).
        scaler: Optional GradScaler to reuse across epochs so the loss scale
            is not reset every epoch. A fresh one is created when omitted.

    Returns:
        (mean_loss, accuracy) over all samples seen; (0.0, 0.0) if the loader
        yielded no samples.
    """
    model.train()
    # Mixed precision only applies on CUDA; be explicit instead of relying on
    # the AMP helpers disabling themselves on CPU.
    use_amp = torch.device(device).type == "cuda"
    if scaler is None:
        scaler = torch.cuda.amp.GradScaler(enabled=use_amp)
    running_loss, correct, total = 0.0, 0, 0

    # Wrap the loader with tqdm to get a progress bar.
    pbar = tqdm(loader, desc="Training", leave=False)

    for images, labels in pbar:
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        optimizer.zero_grad()

        with torch.cuda.amp.autocast(enabled=use_amp):
            outputs = model(images)
            loss = criterion(outputs, labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scale_before = scaler.get_scale()
        scaler.update()

        # The scaler lowers its scale when it found inf/NaN gradients and
        # skipped optimizer.step(); do not advance the LR schedule then.
        step_was_skipped = scaler.get_scale() < scale_before
        if scheduler and not step_was_skipped:
            scheduler.step()

        batch_loss = loss.item()  # single host/device sync per batch
        running_loss += batch_loss * images.size(0)
        correct += (outputs.argmax(1) == labels).sum().item()
        total += labels.size(0)

        # Update the running figures shown at the end of the progress bar.
        current_acc = correct / total
        pbar.set_postfix({'loss': batch_loss, 'acc': current_acc})

    if total == 0:
        return 0.0, 0.0
    return running_loss / total, correct / total

@torch.no_grad()
def valid_epoch(model, loader, criterion, device):
    """Evaluate *model* on *loader* without tracking gradients.

    Args:
        model: Network to evaluate (put into eval mode here).
        loader: Iterable yielding (images, labels) batches.
        criterion: Loss callable taking (outputs, labels).
        device: Target device for the batches.

    Returns:
        (mean_loss, accuracy) over all samples; (0.0, 0.0) if the loader
        yielded no samples.
    """
    model.eval()
    running_loss, correct, total = 0.0, 0, 0

    for images, labels in loader:
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Weight the batch-mean loss by batch size so the final figure is a
        # per-sample mean even when the last batch is smaller.
        running_loss += loss.item() * images.size(0)
        correct += (outputs.argmax(1) == labels).sum().item()
        total += labels.size(0)

    if total == 0:
        return 0.0, 0.0
    return running_loss / total, correct / total

## Main Loop

In [9]:
def run_training():
    """Train one model per stratified fold and save each fold's best checkpoint.

    Reads the train/test CSVs named in CFG, splits the training table into
    CFG.N_FOLDS stratified folds, and for every fold trains a fresh model
    with early stopping on validation accuracy.

    Returns:
        (fold_models, label2idx): the checkpoint path of every fold and the
        label-to-index mapping (None when labels are already numeric).
        When the CSV files cannot be read, ([], None) is returned so callers
        can still unpack the result.
    """
    set_seed(CFG.SEED)

    # Load Data
    try:
        train_df = pd.read_csv(os.path.join(CFG.DATA_ROOT, CFG.TRAIN_CSV))
        test_df = pd.read_csv(os.path.join(CFG.DATA_ROOT, CFG.TEST_CSV))
    except (OSError, ValueError) as err:
        # OSError covers missing/unreadable files; pandas parsing errors
        # derive from ValueError.
        print("Error loading CSV. Please check paths in CFG.")
        print(f"Cause: {err!r}")
        return [], None
    print(f"Data Loaded: Train {train_df.shape}, Test {test_df.shape}")

    label2idx, num_classes, label_col = get_label_mapping(train_df)

    # K-Fold
    skf = StratifiedKFold(n_splits=CFG.N_FOLDS, shuffle=True, random_state=CFG.SEED)
    if label2idx:
        y_labels = train_df[label_col].map(label2idx).values
    else:
        y_labels = train_df[label_col].values
    y_labels = np.asarray(y_labels).astype(int)

    os.makedirs(CFG.OUTPUT_DIR, exist_ok=True)
    fold_models = []

    for fold, (train_idx, val_idx) in enumerate(skf.split(train_df, y_labels)):
        print(f"\n{'='*20} Fold {fold+1}/{CFG.N_FOLDS} {'='*20}")
        ckpt_path = os.path.join(CFG.OUTPUT_DIR, f"swin_fold{fold}.pth")

        # Data Splitting
        df_train = train_df.iloc[train_idx].reset_index(drop=True)
        df_val = train_df.iloc[val_idx].reset_index(drop=True)

        # Class weights, laid out by class index with one slot per class so
        # the vector matches the model head even if a class is absent from
        # this fold (absent classes keep a neutral weight of 1).
        y_train = y_labels[train_idx]
        present = np.unique(y_train)
        class_w = np.ones(num_classes, dtype=np.float64)
        class_w[present] = compute_class_weight('balanced', classes=present, y=y_train)
        weights_tensor = torch.tensor(class_w, dtype=torch.float32).to(CFG.DEVICE)

        # Weighted Sampler
        # NOTE(review): imbalance is compensated twice - by this sampler and
        # by the weighted loss below. Confirm that is intended; usually one
        # of the two is enough.
        samples_weight = class_w[y_train]
        sampler = WeightedRandomSampler(samples_weight, len(samples_weight))

        # Datasets & Loaders (validation rows also live in the train folder)
        train_img_dir = os.path.join(CFG.DATA_ROOT, CFG.TRAIN_DIR)
        ds_train = GameDataset(df_train, train_img_dir, get_transforms('train'), label2idx, label_col=label_col)
        ds_val = GameDataset(df_val, train_img_dir, get_transforms('valid'), label2idx, label_col=label_col)

        dl_train = DataLoader(ds_train, batch_size=CFG.BATCH_SIZE, sampler=sampler,
                              num_workers=CFG.NUM_WORKERS, pin_memory=True)  # pin_memory for GPU
        dl_val = DataLoader(ds_val, batch_size=CFG.BATCH_SIZE, shuffle=False,
                            num_workers=CFG.NUM_WORKERS, pin_memory=True)

        # Init Model
        model = create_model(num_classes)
        optimizer = optim.AdamW(model.parameters(), lr=CFG.LR, weight_decay=CFG.WEIGHT_DECAY)
        criterion = nn.CrossEntropyLoss(weight=weights_tensor)
        # The scheduler is stepped per batch, hence T_max in optimizer steps.
        scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CFG.NUM_EPOCHS*len(dl_train))

        # Train Loop
        # Start below any reachable accuracy so the first epoch always
        # writes a checkpoint and the returned path is guaranteed to exist.
        best_acc = -1.0
        patience = 0

        for epoch in range(1, CFG.NUM_EPOCHS+1):
            start = time.time()
            loss_t, acc_t = train_epoch(model, dl_train, criterion, optimizer, scheduler, CFG.DEVICE)
            loss_v, acc_v = valid_epoch(model, dl_val, criterion, CFG.DEVICE)

            print(f"Ep {epoch}: Train Loss {loss_t:.4f} Acc {acc_t:.4f} | Val Acc {acc_v:.4f} | {time.time()-start:.1f}s")

            if acc_v > best_acc:
                best_acc = acc_v
                torch.save(model.state_dict(), ckpt_path)
                patience = 0
            else:
                patience += 1
                if patience >= CFG.EARLY_STOPPING:
                    print(f"Early stopping at epoch {epoch}")
                    break

        fold_models.append(ckpt_path)

        # Release this fold's model before the next one is built so GPU
        # memory does not accumulate across folds.
        del model, optimizer, scheduler, criterion
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    return fold_models, label2idx

# Run! Executes the full cross-validation training and keeps the returned
# checkpoint paths and label mapping for the inference step.
trained_models, label_map = run_training()

Data Loaded: Train (31546, 3), Test (25889, 3)



Training:   0%|          | 0/789 [00:00<?, ?it/s]

Ep 1: Train Loss 0.2590 Acc 0.8958 | Val Acc 0.9380 | 34696.2s


Training:   0%|          | 0/789 [00:00<?, ?it/s]

Ep 2: Train Loss 0.0828 Acc 0.9666 | Val Acc 0.9681 | 2719.8s


Training:   0%|          | 0/789 [00:00<?, ?it/s]

Ep 3: Train Loss 0.0484 Acc 0.9802 | Val Acc 0.9791 | 3019.4s


Training:   0%|          | 0/789 [00:00<?, ?it/s]

Ep 4: Train Loss 0.0240 Acc 0.9893 | Val Acc 0.9834 | 2743.1s


Training:   0%|          | 0/789 [00:00<?, ?it/s]

Ep 5: Train Loss 0.0172 Acc 0.9932 | Val Acc 0.9854 | 2648.2s



Training:   0%|          | 0/789 [00:00<?, ?it/s]

Ep 1: Train Loss 0.2559 Acc 0.8963 | Val Acc 0.9442 | 2454.0s


Training:   0%|          | 0/789 [00:00<?, ?it/s]

Ep 2: Train Loss 0.0825 Acc 0.9681 | Val Acc 0.9729 | 2468.6s


Training:   0%|          | 0/789 [00:00<?, ?it/s]

Ep 3: Train Loss 0.0453 Acc 0.9829 | Val Acc 0.9738 | 3242.0s


Training:   0%|          | 0/789 [00:00<?, ?it/s]

Ep 4: Train Loss 0.0213 Acc 0.9911 | Val Acc 0.9822 | 3246.3s


Training:   0%|          | 0/789 [00:00<?, ?it/s]

Ep 5: Train Loss 0.0149 Acc 0.9934 | Val Acc 0.9846 | 3250.0s



Training:   0%|          | 0/789 [00:00<?, ?it/s]

Ep 1: Train Loss 0.2501 Acc 0.8946 | Val Acc 0.9697 | 3141.4s


Training:   0%|          | 0/789 [00:00<?, ?it/s]

KeyboardInterrupt: 

## Inference

In [None]:
def inference(models_path, label2idx, num_classes=None):
    """Predict test-set labels by averaging the softmax outputs of several checkpoints.

    Args:
        models_path: Iterable of checkpoint paths (state dicts) to ensemble.
        label2idx: Mapping from raw label to class index used in training,
            or None when labels were already numeric.
        num_classes: Optional size of the classifier head. When omitted it is
            taken from label2idx, else from the test CSV's 'label' column if
            present, else defaults to 5.

    Returns:
        List of predictions, one per row of the test CSV in file order:
        raw label values when label2idx is given, class indices otherwise.

    Raises:
        ValueError: If models_path is empty.
    """
    models_path = list(models_path)
    if not models_path:
        raise ValueError("inference() needs at least one checkpoint path")

    test_df = pd.read_csv(os.path.join(CFG.DATA_ROOT, CFG.TEST_CSV))
    ds_test = GameDataset(test_df, os.path.join(CFG.DATA_ROOT, CFG.TEST_DIR), get_transforms('valid'), is_test=True)
    dl_test = DataLoader(ds_test, batch_size=CFG.BATCH_SIZE, shuffle=False, num_workers=CFG.NUM_WORKERS, pin_memory=True)

    if num_classes is None:
        if label2idx:
            num_classes = len(label2idx)
        elif 'label' in test_df.columns:
            num_classes = test_df['label'].nunique()
    if not num_classes:
        num_classes = 5  # Default fallback

    all_probs = []

    for path in models_path:
        print(f"Predicting with {path}...")
        # The checkpoint overwrites every weight, so skip loading (and
        # possibly downloading) the pretrained weights first.
        model = timm.create_model(CFG.MODEL_NAME, pretrained=False, num_classes=num_classes).to(CFG.DEVICE)
        # map_location lets GPU-trained checkpoints load on a CPU-only host.
        model.load_state_dict(torch.load(path, map_location=CFG.DEVICE))
        model.eval()

        probs = []
        with torch.no_grad():
            for img, _ in dl_test:
                out = model(img.to(CFG.DEVICE))
                probs.append(torch.softmax(out, dim=1).cpu().numpy())
        all_probs.append(np.concatenate(probs))
        del model  # free the weights before the next checkpoint is loaded

    # Average Predictions
    avg_probs = np.mean(all_probs, axis=0)
    preds = avg_probs.argmax(1)

    if label2idx:
        inv_map = {v: k for k, v in label2idx.items()}
        final_preds = [inv_map[p] for p in preds]
    else:
        final_preds = preds.tolist()

    return final_preds

# Predict only when the training cell completed and left a non-empty list of
# checkpoint paths behind.
if globals().get('trained_models'):
    inference(trained_models, label_map)


--- Unfreezing model for Fine-Tuning ---
