
# Kaggle Image Recognition (PyTorch) — Notebook
This notebook trains an image classifier on a Kaggle-style dataset, automatically creating **train / val / test** splits and saving a best-model checkpoint. It also supports **your own custom model**.

**Highlights**
- Works with folder-per-class *or* CSV (`filepath,label`).
- Stratified splitting (if scikit-learn is installed), otherwise random.
- Transfer learning (ResNet50 / EfficientNet-B0) or your own `nn.Module`.
- Mixed precision, cosine LR schedule, early stopping, gradient clipping.
- Artifacts: `best_model.pt`, `class_to_idx.json`, `training_metrics.csv`, `split_*.csv`.
- Inference on a single image or a directory of images.


In [None]:

# ==== CONFIG ====
# Edit the values below to match your environment and dataset.

# --- Data locations ---
# Dataset root: a folder holding one subfolder per class, or the base
# directory that relative CSV paths are resolved against.
DATA_ROOT = '/path/to/dataset_root'
# Optional labels file, e.g. '/path/to/labels.csv', with columns
# `filepath,label` (paths relative to DATA_ROOT or absolute).
# Leave as None to use folder mode.
CSV_PATH = None
# Directory that receives the artifacts written by this notebook.
OUTPUT_DIR = 'runs/notebook_run'

# --- Reproducibility ---
SEED = 42

# --- Split ratios (fractions of the full dataset) ---
TEST_SIZE = 0.10
VAL_SIZE = 0.10

# --- Training hyper-parameters ---
EPOCHS = 10
BATCH_SIZE = 32
IMG_SIZE = 224
LR = 3e-4
WEIGHT_DECAY = 1e-4
GRAD_CLIP = 1.0
PATIENCE = 5             # early stopping: epochs without val-loss improvement
AUG = True               # enable training-time augmentation
AMP = True               # mixed precision
WORKERS = 4              # DataLoader worker processes
FREEZE_BACKBONE = False

# --- Model selection ---
# Built-in architectures: 'resnet50' or 'efficientnet_b0'
MODEL = 'resnet50'
PRETRAINED = True

# Optional user-supplied model, given as "module.path:ClassName"
# (the module must be importable, i.e. on PYTHONPATH). None disables it.
CUSTOM_MODEL = None          # e.g. 'my_models.vision:TinyNet'
CUSTOM_WEIGHTS = None        # optional path to a state_dict or checkpoint

# --- Inference (consumed by the inference cell at the bottom) ---
INFER_INPUT = '/path/to/image_or_dir'  # image file or directory
INFER_CHECKPOINT = None      # set after training, e.g. f"{OUTPUT_DIR}/best_model.pt"
INFER_CLASS_INDEX = None     # optional path to an overriding class-index .json
TOPK = 5


In [None]:

import os, json, math, random, time, csv
from pathlib import Path
from typing import List, Tuple, Dict, Optional

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import DataLoader, Dataset
from torchvision import models, transforms
from PIL import Image

try:
    from sklearn.model_selection import StratifiedShuffleSplit
    HAVE_SKLEARN = True
except Exception:
    HAVE_SKLEARN = False

import matplotlib.pyplot as plt

def set_seed(seed: int = 42):
    """Seed the Python and PyTorch random number generators.

    Passing ``None`` or a negative value leaves every generator untouched.
    Otherwise ``random``, ``torch`` and all CUDA devices are seeded with
    ``seed`` and cuDNN is switched to deterministic mode with benchmarking
    turned off.
    """
    seeding_requested = seed is not None and seed >= 0
    if seeding_requested:
        for seeder in (random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
            seeder(seed)
        cudnn = torch.backends.cudnn
        cudnn.deterministic = True
        cudnn.benchmark = False

def timestamp():
    """Return the current local time formatted as ``YYYYMMDD-HHMMSS``."""
    layout = "%Y%m%d-%H%M%S"
    return time.strftime(layout, time.localtime())


In [None]:

class CSVDataset(Dataset):
    """Image dataset backed by a list of ``(filepath, label)`` rows.

    Args:
        rows: ``(filepath, label)`` pairs. Relative file paths are resolved
            against ``data_root``; absolute paths are used as-is.
        data_root: Base directory for relative paths.
        transform: Optional callable applied to the loaded RGB PIL image.
        class_to_idx: Optional label -> index mapping. When omitted, one is
            built from the sorted set of labels found in ``rows``.
    """

    def __init__(self, rows: List[Tuple[str, str]], data_root: str, transform=None, class_to_idx: Optional[Dict[str, int]] = None):
        self.data_root = Path(data_root)
        self.rows = rows
        self.transform = transform
        if class_to_idx is None:
            # Only derive the mapping when the caller did not supply one.
            labels = sorted({r[1] for r in rows})
            class_to_idx = {c: i for i, c in enumerate(labels)}
        self.class_to_idx = class_to_idx

    def __len__(self):
        return len(self.rows)

    def __getitem__(self, idx):
        """Return ``(image, target_index)`` for row ``idx``.

        Raises:
            KeyError: if the row's label is not present in ``class_to_idx``.
        """
        path_str, label = self.rows[idx]
        img_path = Path(path_str)
        if not img_path.is_absolute():
            img_path = self.data_root / img_path
        # The context manager closes the underlying file as soon as the pixel
        # data has been converted, instead of leaving that to garbage collection.
        with Image.open(img_path) as handle:
            img = handle.convert("RGB")
        if self.transform is not None:
            img = self.transform(img)
        return img, self.class_to_idx[label]

def read_csv_labels(csv_path: str) -> List[Tuple[str, str]]:
    """Read ``(filepath, label)`` pairs from a CSV file with a header row.

    Args:
        csv_path: Path to a CSV file that has at least the columns
            ``filepath`` and ``label``; any other columns are ignored.

    Returns:
        One ``(filepath, label)`` tuple per data row, in file order.

    Raises:
        ValueError: if the header is missing or lacks a required column.
    """
    required = ('filepath', 'label')
    with open(csv_path, newline='') as f:
        reader = csv.DictReader(f)
        # ``fieldnames`` is None for an empty file; treat that as "no columns".
        header = reader.fieldnames or []
        if any(col not in header for col in required):
            raise ValueError("CSV must contain 'filepath' and 'label' columns")
        return [(r['filepath'], r['label']) for r in reader]

def collect_folder_images(root: str) -> List[Tuple[str, str]]:
    """Collect ``(absolute_path, label)`` pairs from a folder-per-class tree.

    Every immediate subdirectory of ``root`` is treated as a class whose name
    is the label; image files are searched recursively below it.

    Paths are returned absolute so they stay valid when later combined with a
    relative data root, and both class folders and files are visited in sorted
    order so the result (and any seeded split built on it) does not depend on
    filesystem enumeration order.

    Raises:
        RuntimeError: if no image file is found under ``root``.
    """
    image_suffixes = {'.jpg', '.jpeg', '.png', '.bmp', '.tif', '.tiff', '.webp'}
    root = Path(root).resolve()
    rows = []
    for class_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        label = class_dir.name
        rows.extend(
            (str(img_path), label)
            for img_path in sorted(class_dir.rglob('*'))
            # is_file() skips directories that merely have an image-like name.
            if img_path.suffix.lower() in image_suffixes and img_path.is_file()
        )
    if not rows:
        raise RuntimeError(f"No images found under {root}. Expected folder-per-class structure.")
    return rows

def stratified_split(rows: List[Tuple[str, str]], val_size: float, test_size: float, seed: int):
    """Split row indices into train / val / test index lists.

    With scikit-learn available (``HAVE_SKLEARN``), the split is stratified by
    label using two ``StratifiedShuffleSplit`` passes; otherwise the indices
    are shuffled with ``random.Random(seed)`` and sliced.

    Args:
        rows: ``(filepath, label)`` pairs; only the label is used.
        val_size: Fraction of all rows assigned to validation (>= 0).
        test_size: Fraction of all rows assigned to test (>= 0).
        seed: Seed for the shuffle. For the scikit-learn path a ``None`` or
            negative seed means "unseeded", because scikit-learn rejects
            negative seeds.

    Returns:
        ``(train_idx, val_idx, test_idx)`` as lists of plain ``int`` indices
        into ``rows``.

    Raises:
        ValueError: if a size is negative or ``val_size + test_size >= 1``
            (which would leave no training data).
    """
    if val_size < 0 or test_size < 0 or val_size + test_size >= 1:
        raise ValueError(
            f"val_size and test_size must be >= 0 and sum to less than 1 "
            f"(got val_size={val_size}, test_size={test_size})"
        )

    labels = [r[1] for r in rows]
    n = len(rows)
    idx = list(range(n))
    if HAVE_SKLEARN:
        random_state = seed if seed is not None and seed >= 0 else None

        # First pass: carve out the test set (skipped when none is requested,
        # since scikit-learn refuses an empty split).
        if test_size > 0:
            sss1 = StratifiedShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
            trainval_pos, test_pos = next(sss1.split(idx, labels))
            trainval_idx = [int(i) for i in trainval_pos]
            test_idx = [int(i) for i in test_pos]
        else:
            trainval_idx, test_idx = idx, []

        # Second pass: split the remainder into train and val. The fraction is
        # rescaled because it is now relative to the train+val pool.
        if val_size > 0:
            labels_trainval = [labels[i] for i in trainval_idx]
            val_relative = val_size / (1 - test_size)
            sss2 = StratifiedShuffleSplit(n_splits=1, test_size=val_relative, random_state=random_state)
            train_pos, val_pos = next(sss2.split(trainval_idx, labels_trainval))
            # Positions refer to trainval_idx; map them back to row indices.
            train_idx = [trainval_idx[i] for i in train_pos]
            val_idx = [trainval_idx[i] for i in val_pos]
        else:
            train_idx, val_idx = trainval_idx, []
    else:
        random.Random(seed).shuffle(idx)
        n_test = int(math.floor(n * test_size))
        n_val = int(math.floor(n * val_size))
        test_idx = idx[:n_test]
        val_idx = idx[n_test:n_test + n_val]
        train_idx = idx[n_test + n_val:]
    return train_idx, val_idx, test_idx


In [None]:

def build_transforms(img_size: int, aug: bool):
    """Create the training and evaluation preprocessing pipelines.

    Both pipelines resize to ``img_size`` x ``img_size``, convert to a tensor
    and normalize per channel. When ``aug`` is true the training pipeline also
    applies a random horizontal flip, a color jitter (with probability 0.5)
    and a small random affine transform between the resize and the tensor
    conversion.

    Returns:
        ``(train_transform, eval_transform)`` as ``transforms.Compose`` objects.
    """
    target_hw = (img_size, img_size)
    norm_mean = [0.485, 0.456, 0.406]
    norm_std = [0.229, 0.224, 0.225]

    def _to_normalized_tensor():
        # Fresh transform objects per pipeline.
        return [
            transforms.ToTensor(),
            transforms.Normalize(mean=norm_mean, std=norm_std),
        ]

    augmentations = []
    if aug:
        jitter = transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.05)
        augmentations = [
            transforms.RandomHorizontalFlip(),
            transforms.RandomApply([jitter], p=0.5),
            transforms.RandomAffine(degrees=10, translate=(0.05, 0.05), scale=(0.95, 1.05)),
        ]

    train_pipeline = transforms.Compose(
        [transforms.Resize(target_hw), *augmentations, *_to_normalized_tensor()]
    )
    eval_pipeline = transforms.Compose(
        [transforms.Resize(target_hw), *_to_normalized_tensor()]
    )
    return train_pipeline, eval_pipeline


In [None]:

def build_model(num_classes: int, arch: str = 'resnet50', pretrained: bool = True, 
                freeze_backbone: bool = False, custom_model: Optional[str] = None, custom_weights: Optional[str] = None):
    """Build a classifier with ``num_classes`` outputs.

    Args:
        num_classes: Size of the final classification layer.
        arch: Built-in architecture, ``'resnet50'`` or ``'efficientnet_b0'``
            (case-insensitive). Ignored when ``custom_model`` is given.
        pretrained: Load ImageNet weights for built-in architectures; also
            forwarded to a custom model constructor that accepts it.
        freeze_backbone: For built-in architectures, make only the
            classification head trainable. Not applied to custom models,
            whose head layout is unknown.
        custom_model: Optional ``"module.path:ClassName"`` to import and
            instantiate instead of a built-in architecture.
        custom_weights: Optional path to a state dict, or to a checkpoint dict
            storing one under ``'state_dict'`` or ``'model_state'``. Only used
            together with ``custom_model``.

    Returns:
        The constructed ``nn.Module``.

    Raises:
        ValueError: if the custom model cannot be imported or ``arch`` is not
            supported.
    """
    # Custom model path "module.path:ClassName"
    if custom_model:
        import importlib
        import warnings
        try:
            module_name, class_name = custom_model.split(":")
            Mod = importlib.import_module(module_name)
            ModelClass = getattr(Mod, class_name)
        except Exception as e:
            # Chain the cause so the original traceback is not lost.
            raise ValueError(f"Failed to import custom model '{custom_model}': {e}") from e
        # Try common constructor signatures
        try:
            model = ModelClass(num_classes=num_classes, pretrained=pretrained)
        except TypeError:
            try:
                model = ModelClass(num_classes=num_classes)
            except TypeError:
                model = ModelClass()
                # Heuristic: replace final layer if possible
                if hasattr(model, 'fc') and isinstance(model.fc, nn.Linear):
                    in_features = model.fc.in_features
                    model.fc = nn.Linear(in_features, num_classes)
                elif hasattr(model, 'classifier'):
                    if isinstance(model.classifier, nn.Linear):
                        in_features = model.classifier.in_features
                        model.classifier = nn.Linear(in_features, num_classes)
                    elif isinstance(model.classifier, nn.Sequential) and isinstance(model.classifier[-1], nn.Linear):
                        in_features = model.classifier[-1].in_features
                        model.classifier[-1] = nn.Linear(in_features, num_classes)
        if custom_weights:
            # NOTE(security): torch.load unpickles the file; only load weights
            # from sources you trust.
            sd = torch.load(custom_weights, map_location='cpu')
            if isinstance(sd, dict):
                # Unwrap checkpoint containers. 'model_state' is the key this
                # notebook's own training cell writes.
                for key in ('state_dict', 'model_state'):
                    if isinstance(sd.get(key), dict):
                        sd = sd[key]
                        break
            result = model.load_state_dict(sd, strict=False)
            if result.missing_keys or result.unexpected_keys:
                # strict=False tolerates mismatches; make them visible so a
                # silently un-initialized model does not go unnoticed.
                warnings.warn(
                    f"custom_weights loaded with {len(result.missing_keys)} missing "
                    f"and {len(result.unexpected_keys)} unexpected keys"
                )
        return model

    arch = arch.lower()
    if arch == 'resnet50':
        weights = models.ResNet50_Weights.IMAGENET1K_V2 if pretrained else None
        model = models.resnet50(weights=weights)
        in_features = model.fc.in_features
        model.fc = nn.Linear(in_features, num_classes)
    elif arch == 'efficientnet_b0':
        weights = models.EfficientNet_B0_Weights.IMAGENET1K_V1 if pretrained else None
        model = models.efficientnet_b0(weights=weights)
        in_features = model.classifier[-1].in_features
        model.classifier[-1] = nn.Linear(in_features, num_classes)
    else:
        raise ValueError(f"Unsupported model arch: {arch}")

    if freeze_backbone:
        # Match on the top-level module name only: a plain substring test for
        # 'fc' would also hit backbone layers such as the 'fc1'/'fc2'
        # squeeze-excitation convolutions and leave them trainable.
        head_prefixes = ('fc.', 'classifier.')
        for name, p in model.named_parameters():
            p.requires_grad = name.startswith(head_prefixes)
    return model


In [None]:

def accuracy(output, target, topk=(1,)):
    """Compute top-k accuracy percentages for a batch.

    Args:
        output: Score tensor indexed as ``(sample, class)``.
        target: Tensor of true class indices, one per sample.
        topk: Iterable of ``k`` values to evaluate.

    Returns:
        A list of floats (percentages in ``[0, 100]``), one per entry of
        ``topk``. A ``k`` larger than the number of classes is clamped to the
        class count instead of raising. An empty batch yields ``0.0`` for
        every ``k``.
    """
    with torch.no_grad():
        batch_size = target.size(0)
        if batch_size == 0:
            return [0.0 for _ in topk]
        # torch.topk raises when k exceeds the size of the class dimension.
        maxk = min(max(topk), output.size(1))
        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))
        res = []
        for k in topk:
            correct_k = correct[:min(k, maxk)].reshape(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size).item())
        return res

def save_json(obj: Dict, path: Path):
    """Serialize ``obj`` as JSON with two-space indentation and write it to ``path``."""
    serialized = json.dumps(obj, indent=2)
    with open(path, 'w') as handle:
        handle.write(serialized)

def train_one_epoch(model, loader, optimizer, scaler, device, grad_clip):
    """Run one training pass over ``loader`` with cross-entropy loss.

    Args:
        model: Network to train; it is switched to train mode.
        loader: Iterable of ``(images, targets)`` batches exposing ``.dataset``.
        optimizer: Optimizer stepped once per batch.
        scaler: Gradient scaler for mixed precision, or ``None``. Mixed
            precision is used only when a scaler is given AND it reports
            itself as enabled.
        device: Device the batches are moved to.
        grad_clip: Maximum gradient norm; ``None`` or a value <= 0 disables
            clipping.

    Returns:
        ``(mean_loss, mean_top1_percent)`` averaged over ``len(loader.dataset)``.

    Raises:
        ValueError: if the loader's dataset is empty.
    """
    n_samples = len(loader.dataset)
    if n_samples == 0:
        raise ValueError("train_one_epoch received an empty dataset")

    model.train()
    loss_fn = nn.CrossEntropyLoss()
    # A scaler constructed with enabled=False is still "not None"; consult
    # is_enabled() so that turning AMP off really turns autocast off.
    use_amp = scaler is not None and scaler.is_enabled()
    clip = grad_clip is not None and grad_clip > 0
    running_loss, running_top1 = 0.0, 0.0
    for images, targets in loader:
        images = images.to(device, non_blocking=True)
        targets = targets.to(device, non_blocking=True)
        optimizer.zero_grad(set_to_none=True)
        with torch.cuda.amp.autocast(enabled=use_amp):
            outputs = model(images)
            loss = loss_fn(outputs, targets)
        if use_amp:
            scaler.scale(loss).backward()
            if clip:
                # Gradients must be unscaled before their norm is measured.
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            if clip:
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
            optimizer.step()

        top1, = accuracy(outputs, targets, topk=(1,))
        # Weight by batch size so a smaller final batch is averaged correctly.
        running_loss += loss.item() * images.size(0)
        running_top1 += top1 * images.size(0)
    return running_loss / n_samples, running_top1 / n_samples

def evaluate(model, loader, device):
    """Evaluate ``model`` on ``loader`` with cross-entropy loss.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        loader: Iterable of ``(images, targets)`` batches exposing ``.dataset``.
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, top1_percent, top5_percent)`` averaged over
        ``len(loader.dataset)``. When the model has fewer than five outputs,
        the "top-5" figure is computed over all classes.

    Raises:
        ValueError: if the loader's dataset is empty.
    """
    n = len(loader.dataset)
    if n == 0:
        raise ValueError("evaluate received an empty dataset")

    model.eval()
    loss_fn = nn.CrossEntropyLoss()
    total_loss, total_top1, total_top5 = 0.0, 0.0, 0.0
    with torch.no_grad():
        for images, targets in loader:
            images = images.to(device, non_blocking=True)
            targets = targets.to(device, non_blocking=True)
            outputs = model(images)
            loss = loss_fn(outputs, targets)
            # Never ask for more ranks than there are classes: topk would raise.
            k5 = min(5, outputs.size(1))
            top1, top5 = accuracy(outputs, targets, topk=(1, k5))
            total_loss += loss.item() * images.size(0)
            total_top1 += top1 * images.size(0)
            total_top5 += top5 * images.size(0)
    return total_loss / n, total_top1 / n, total_top5 / n


In [None]:

def build_datasets(data_root: str, csv_path: Optional[str], img_size: int, aug: bool, 
                   val_size: float, test_size: float, seed: int,
                   output_dir: Optional[str] = None):
    """Load the labeled rows, split them, and build train / val / test datasets.

    Rows come from ``csv_path`` when given, otherwise from the folder-per-class
    tree at ``data_root``. The three splits are also written to
    ``split_train.csv``, ``split_val.csv`` and ``split_test.csv``.

    Args:
        data_root: Dataset root (class folders, or base for relative CSV paths).
        csv_path: Optional labels CSV; falsy selects folder mode.
        img_size: Side length images are resized to.
        aug: Whether the training split uses augmentation.
        val_size: Fraction of rows used for validation.
        test_size: Fraction of rows used for testing.
        seed: Seed for the split.
        output_dir: Directory for the split CSV files. Defaults to the
            module-level ``OUTPUT_DIR`` when omitted.

    Returns:
        ``(ds_train, ds_val, ds_test, class_to_idx)``; all three datasets share
        the same ``class_to_idx`` built from every label in the data.

    Raises:
        ValueError: if the split leaves no training samples.
    """
    rows = read_csv_labels(csv_path) if csv_path else collect_folder_images(data_root)

    train_idx, val_idx, test_idx = stratified_split(rows, val_size, test_size, seed)
    if len(train_idx) == 0:
        raise ValueError("The train split is empty; reduce val_size/test_size or add data.")

    out = Path(OUTPUT_DIR if output_dir is None else output_dir)
    out.mkdir(parents=True, exist_ok=True)
    for name, indices in (('train', train_idx), ('val', val_idx), ('test', test_idx)):
        with open(out / f"split_{name}.csv", 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['filepath', 'label'])
            writer.writerows(rows[i] for i in indices)

    train_tfms, eval_tfms = build_transforms(img_size, aug)
    # Build the mapping from ALL rows so every split agrees on class indices,
    # even if a class is absent from one of them.
    labels_sorted = sorted({r[1] for r in rows})
    class_to_idx = {c: i for i, c in enumerate(labels_sorted)}

    def subset(indices, train=False):
        subrows = [rows[i] for i in indices]
        tfms = train_tfms if train else eval_tfms
        return CSVDataset(subrows, data_root, transform=tfms, class_to_idx=class_to_idx)

    ds_train = subset(train_idx, train=True)
    ds_val = subset(val_idx, train=False)
    ds_test = subset(test_idx, train=False)
    return ds_train, ds_val, ds_test, class_to_idx


In [None]:

# ==== TRAIN ====
# Trains the model, logs per-epoch metrics to CSV, keeps the checkpoint with
# the lowest validation loss, stops early after PATIENCE epochs without
# improvement, and finally scores that best checkpoint on the test split.
set_seed(SEED)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
use_cuda = device.type == 'cuda'

ds_train, ds_val, ds_test, class_to_idx = build_datasets(DATA_ROOT, CSV_PATH, IMG_SIZE, AUG, VAL_SIZE, TEST_SIZE, SEED)
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
save_json(class_to_idx, Path(OUTPUT_DIR) / 'class_to_idx.json')

model = build_model(num_classes=len(class_to_idx), arch=MODEL, pretrained=PRETRAINED, 
                    freeze_backbone=FREEZE_BACKBONE, custom_model=CUSTOM_MODEL, custom_weights=CUSTOM_WEIGHTS)
model.to(device)

# Only parameters left trainable (e.g. after freezing) go to the optimizer.
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.AdamW(params, lr=LR, weight_decay=WEIGHT_DECAY)
# Hand the training loop a scaler only when mixed precision is really active;
# passing None otherwise guarantees that AMP = False disables autocast.
scaler = torch.cuda.amp.GradScaler() if (AMP and use_cuda) else None

# Pinned host memory only helps host-to-GPU copies; skip it on CPU.
train_loader = DataLoader(ds_train, batch_size=BATCH_SIZE, shuffle=True, num_workers=WORKERS, pin_memory=use_cuda)
val_loader   = DataLoader(ds_val,   batch_size=BATCH_SIZE, shuffle=False, num_workers=WORKERS, pin_memory=use_cuda)
test_loader  = DataLoader(ds_test,  batch_size=BATCH_SIZE, shuffle=False, num_workers=WORKERS, pin_memory=use_cuda)

steps_per_epoch = max(1, len(train_loader))
# scheduler.step() is called once per EPOCH below, so the cosine period must
# be measured in epochs; a period in optimizer steps would leave the learning
# rate almost constant for the whole run.
scheduler = CosineAnnealingLR(optimizer, T_max=max(1, EPOCHS))

best_val = float('inf')
best_path = Path(OUTPUT_DIR) / 'best_model.pt'
history_path = Path(OUTPUT_DIR) / 'training_metrics.csv'

with open(history_path, 'w') as f:
    f.write('epoch,train_loss,train_top1,val_loss,val_top1,val_top5\n')

patience_counter = 0
hist = {'epoch': [], 'train_loss': [], 'train_top1': [], 'val_loss': [], 'val_top1': [], 'val_top5': []}

for epoch in range(1, EPOCHS + 1):
    t0 = time.time()
    train_loss, train_top1 = train_one_epoch(model, train_loader, optimizer, scaler, device, GRAD_CLIP)
    val_loss, val_top1, val_top5 = evaluate(model, val_loader, device)

    scheduler.step()

    with open(history_path, 'a') as f:
        f.write(f"{epoch},{train_loss:.6f},{train_top1:.2f},{val_loss:.6f},{val_top1:.2f},{val_top5:.2f}\n")

    hist['epoch'].append(epoch)
    hist['train_loss'].append(train_loss)
    hist['train_top1'].append(train_top1)
    hist['val_loss'].append(val_loss)
    hist['val_top1'].append(val_top1)
    hist['val_top5'].append(val_top5)

    print(f"Epoch {epoch:03d} | train loss {train_loss:.4f} acc@1 {train_top1:.2f}% | val loss {val_loss:.4f} acc@1 {val_top1:.2f}% acc@5 {val_top5:.2f}% | {time.time()-t0:.1f}s")

    if val_loss < best_val:
        best_val = val_loss
        torch.save({'model_state': model.state_dict(),
                    'arch': MODEL,
                    'custom_model': CUSTOM_MODEL,
                    'num_classes': len(class_to_idx),
                    'class_to_idx': class_to_idx,
                    'img_size': IMG_SIZE}, best_path)
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= PATIENCE:
            print(f"Early stopping triggered at epoch {epoch}.")
            break

# Final test on best ckpt
if best_val == float('inf'):
    # No epoch ever improved on the initial value (e.g. EPOCHS == 0 or a
    # non-finite validation loss), so this run wrote no checkpoint. Fail
    # clearly instead of loading a missing or stale file.
    raise RuntimeError("No checkpoint was saved during training; cannot run the final test.")

ckpt = torch.load(best_path, map_location=device)
rebuild_model = build_model(ckpt.get('num_classes', len(ckpt['class_to_idx'])),
                            arch=ckpt.get('arch','resnet50'),
                            pretrained=False,
                            custom_model=ckpt.get('custom_model'))
rebuild_model.load_state_dict(ckpt['model_state'], strict=False)
rebuild_model.to(device)

test_loss, test_top1, test_top5 = evaluate(rebuild_model, test_loader, device)
print(f"Test: loss {test_loss:.4f} acc@1 {test_top1:.2f}% acc@5 {test_top5:.2f}%")


In [None]:

# ==== PLOT METRICS ====
# One figure per metric family, each on a single axes with matplotlib's
# default colors and line styles. Nothing is drawn if no epoch was recorded.
if hist['epoch']:
    epochs = hist['epoch']
    # (title, y-axis label, [(history key, legend label), ...])
    panels = [
        ('Loss vs Epoch', 'loss',
         [('train_loss', 'train_loss'), ('val_loss', 'val_loss')]),
        ('Accuracy vs Epoch', 'acc@1 (%)',
         [('train_top1', 'train_acc1'), ('val_top1', 'val_acc1')]),
    ]
    for title, ylabel, series in panels:
        plt.figure()
        for key, legend_name in series:
            plt.plot(epochs, hist[key], label=legend_name)
        plt.xlabel('epoch')
        plt.ylabel(ylabel)
        plt.legend()
        plt.title(title)
        plt.show()


In [None]:

def load_checkpoint(checkpoint: str, device: torch.device):
    """Rebuild a model from a checkpoint file and prepare it for inference.

    Args:
        checkpoint: Path to a checkpoint dict containing ``'model_state'`` and
            ``'class_to_idx'``, and optionally ``'arch'``, ``'custom_model'``,
            ``'num_classes'`` and ``'img_size'``.
        device: Device the checkpoint tensors and the model are placed on.

    Returns:
        ``(model, class_to_idx, img_size)``, with the model in eval mode.
        ``img_size`` falls back to 224 when the checkpoint does not store it.
    """
    # NOTE(security): torch.load unpickles the file; only open checkpoints
    # from sources you trust.
    ckpt = torch.load(checkpoint, map_location=device)
    arch = ckpt.get('arch', 'resnet50')
    custom_model = ckpt.get('custom_model', None)
    num_classes = ckpt.get('num_classes', len(ckpt['class_to_idx']))
    # pretrained=False: the weights come from the checkpoint right below.
    model = build_model(num_classes, arch=arch, pretrained=False, custom_model=custom_model)
    model.load_state_dict(ckpt['model_state'], strict=False)
    model.to(device)
    model.eval()
    return model, ckpt['class_to_idx'], ckpt.get('img_size', 224)

def infer_on_path(model, class_to_idx: Dict[str, int], img_path: Path, img_size: int, device: torch.device, topk: int = 5):
    """Classify one image and return its highest-probability classes.

    Args:
        model: Classifier producing one row of class scores per input image.
        class_to_idx: Label -> index mapping used to name the predictions.
        img_path: Image file to classify.
        img_size: Side length the image is resized to.
        device: Device inference runs on.
        topk: Number of predictions wanted; clamped to the number of classes
            the model outputs.

    Returns:
        A list of ``(label, probability)`` tuples, most probable first.
    """
    inv_map = {v: k for k, v in class_to_idx.items()}
    tfm = transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    # Close the file handle as soon as the pixels have been converted.
    with Image.open(img_path) as handle:
        img = handle.convert("RGB")
    x = tfm(img).unsqueeze(0).to(device)
    with torch.no_grad():
        logits = model(x)
        probs = torch.softmax(logits, dim=1)
        # torch.topk raises when k exceeds the number of classes.
        k = min(topk, probs.size(1))
        top_probs, top_idx = probs.topk(k, dim=1)
    preds = [(inv_map[i.item()], float(top_probs[0, j].item())) for j, i in enumerate(top_idx[0])]
    return preds


In [None]:

# ==== INFERENCE ====
# Loads a checkpoint and prints the top-K predictions for one image or for
# every image found (recursively) under a directory.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
ckpt_path = INFER_CHECKPOINT or (Path(OUTPUT_DIR) / 'best_model.pt')
# Explicit exceptions instead of assert: asserts are stripped under `python -O`.
if not os.path.exists(ckpt_path):
    raise FileNotFoundError(f"Checkpoint not found: {ckpt_path}")

model, class_to_idx, img_size = load_checkpoint(str(ckpt_path), device)

if INFER_CLASS_INDEX:
    if os.path.exists(INFER_CLASS_INDEX):
        with open(INFER_CLASS_INDEX) as f:
            class_to_idx = json.load(f)
    else:
        # Keep the checkpoint's mapping, but say so rather than ignoring the
        # requested override silently.
        print(f"Class index file not found, using checkpoint mapping: {INFER_CLASS_INDEX}")

target = Path(INFER_INPUT)
if not target.exists():
    raise FileNotFoundError("INFER_INPUT must be a valid file or directory")

image_suffixes = {'.jpg', '.jpeg', '.png', '.bmp', '.tif', '.tiff', '.webp'}
if target.is_dir():
    # is_file() skips sub-directories that merely carry an image-like suffix.
    paths = [
        p for p in sorted(target.rglob('*'))
        if p.suffix.lower() in image_suffixes and p.is_file()
    ]
else:
    paths = [target]

for p in paths:
    preds = infer_on_path(model, class_to_idx, p, img_size, device, topk=TOPK)
    print(f"{p} ->")
    for cls, prob in preds:
        print(f"  {cls:>20s}: {prob*100:.2f}%")
