# Preparation



In [None]:
# --- Persist models/checkpoints on Google Drive ---
import os

from google.colab import drive

# Attach the user's Drive at /content/drive.
drive.mount("/content/drive")

# Folder on Drive that receives the outputs of this notebook.
SAVE_DIR = "/content/drive/MyDrive/model_comparison"
os.makedirs(SAVE_DIR, exist_ok=True)
print("SAVE_DIR:", SAVE_DIR)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
SAVE_DIR: /content/drive/MyDrive/model_comparison


In [None]:
# Install the Kaggle client libraries (quiet mode).
!pip -q install kaggle kagglehub

# Ask the user to upload their Kaggle API token through the Colab file picker.
from google.colab import files
print("👉 Upload your kaggle.json (create from https://www.kaggle.com/settings/account)")
files.upload()  # select kaggle.json

# Copy the uploaded token to ~/.kaggle/ and restrict it to owner read/write.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Download the RTK dataset; rtk_path is the local directory holding the files.
import kagglehub
rtk_path = kagglehub.dataset_download("tallwinkingstan/road-traversing-knowledge-rtk-dataset")
print("Path to dataset files:", rtk_path)

👉 Upload your kaggle.json (create from https://www.kaggle.com/settings/account)


Saving kaggle.json to kaggle.json
Downloading from https://www.kaggle.com/api/v1/datasets/download/tallwinkingstan/road-traversing-knowledge-rtk-dataset?dataset_version_number=1...


100%|██████████| 723M/723M [00:20<00:00, 36.9MB/s]


Extracting files...
Path to dataset files: /root/.cache/kagglehub/datasets/tallwinkingstan/road-traversing-knowledge-rtk-dataset/versions/1


In [None]:
import os, shutil, random
from pathlib import Path
random.seed(42)

# Known spellings / nestings of the RTK root folder, tried in this order.
candidates = [
    "RTK_Dataset", "RTK_dataset", "RTK_dataset/",
    "RTK_Dataset/", "RTK_dataset/RTK_Dataset", "RTK_Dataset/RTK_dataset"
]
_known_paths = (os.path.join(rtk_path, rel) for rel in candidates)
DATASET_DIR = next((path for path in _known_paths if os.path.isdir(path)), None)

# Fallback: take the first direct child directory whose name contains "rtk"
# (case-insensitive).
if DATASET_DIR is None:
    _children = ((entry, os.path.join(rtk_path, entry)) for entry in os.listdir(rtk_path))
    DATASET_DIR = next(
        (path for entry, path in _children
         if os.path.isdir(path) and "rtk" in entry.lower()),
        None,
    )

print("Detected DATASET_DIR:", DATASET_DIR)
assert DATASET_DIR and os.path.isdir(DATASET_DIR), "Could not find RTK dataset folder. Inspect rtk_path printed above."

# Build a tolerant mapping for source → target class
def first_existing(*parts):
    """Pick the earliest candidate in ``parts`` that is a directory under DATASET_DIR.

    Each candidate is a path relative to DATASET_DIR. The matching relative
    path is returned unchanged; ``None`` is returned when no candidate is an
    existing directory.
    """
    return next(
        (rel for rel in parts if os.path.isdir(os.path.join(DATASET_DIR, rel))),
        None,
    )

# Target class → candidate source folders (relative to DATASET_DIR), tried in
# the order listed. The "upaved/..." spellings handle the misspelled folder name.
_class_sources = [
    ("asphalt_good",    ("asphalt/asphaltGood", "asphalt/Good", "asphalt/good")),
    ("asphalt_regular", ("asphalt/asphaltRegular", "asphalt/Regular", "asphalt/regular")),
    ("asphalt_bad",     ("asphalt/asphaltBad", "asphalt/Bad", "asphalt/bad")),
    ("paved_regular",   ("paved/pavedRegular", "paved/Regular", "paved/regular")),
    ("paved_bad",       ("paved/pavedBad", "paved/Bad", "paved/bad")),
    ("unpaved_regular", ("unpaved/unpavedRegular", "upaved/unpavedRegular", "unpaved/Regular", "upaved/Regular", "unpaved/regular")),
    ("unpaved_bad",     ("unpaved/unpavedBad", "upaved/unpavedBad", "unpaved/Bad", "upaved/Bad", "unpaved/bad")),
]

# Map each resolved source folder to its target class; classes whose folder
# could not be found are left out and caught by the assertion below.
label_map = {}
for _target, _options in _class_sources:
    _found = first_existing(*_options)
    if _found is not None:
        label_map[_found] = _target

print("Resolved source → target classes:")
for source_rel, target_cls in label_map.items():
    print(" ", source_rel, "→", target_cls)

expected_targets = {
    "asphalt_good","asphalt_regular","asphalt_bad",
    "paved_regular","paved_bad",
    "unpaved_regular","unpaved_bad"
}
assert set(label_map.values()) == expected_targets, f"Missing classes. Got {set(label_map.values())}"

# Share of each class assigned to train / val. test_ratio is declared for
# completeness only: the test split receives whatever remains after the first two.
train_ratio = 0.70
val_ratio = 0.15
test_ratio = 0.15

# Create the <OUTPUT_DIR>/<split>/<class> folders before any file is copied.
OUTPUT_DIR = "/content/prepared_dataset"
for split in ("train", "val", "test"):
    for cls in expected_targets:
        class_dir = Path(f"{OUTPUT_DIR}/{split}/{cls}")
        class_dir.mkdir(parents=True, exist_ok=True)

def split_and_copy(src_dir, dst_label):
    """Randomly split the files of one source folder into train/val/test copies.

    Args:
        src_dir: Directory holding the images of a single source class.
        dst_label: Class folder name to copy into under ``OUTPUT_DIR/<split>/``.

    The train and val sizes are ``int(n * train_ratio)`` and
    ``int(n * val_ratio)``; the test split receives every remaining file.
    Reads the module-level ``OUTPUT_DIR``, ``train_ratio`` and ``val_ratio``
    and expects the destination folders to exist already.
    """
    # Keep regular, non-hidden files only, so sub-directories do not count
    # towards the split sizes. Sorting matters: os.listdir returns entries in
    # arbitrary order, so without it the seeded shuffle below would not yield
    # the same split on every run (and a re-run could place the same image in
    # two different splits).
    names = sorted(
        f for f in os.listdir(src_dir)
        if not f.startswith(".") and os.path.isfile(os.path.join(src_dir, f))
    )
    random.shuffle(names)

    n = len(names)
    n_train = int(n * train_ratio)
    n_val = int(n * val_ratio)
    splits = {
        "train": names[:n_train],
        "val":   names[n_train:n_train + n_val],
        "test":  names[n_train + n_val:],
    }
    for split, split_files in splits.items():
        dst_dir = os.path.join(OUTPUT_DIR, split, dst_label)
        for f in split_files:
            shutil.copy(os.path.join(src_dir, f), os.path.join(dst_dir, f))

# Copy every resolved source folder into its target class, split three ways.
for source_rel, target_cls in label_map.items():
    source_dir = os.path.join(DATASET_DIR, source_rel)
    split_and_copy(source_dir, target_cls)

print("✅ Prepared at:", OUTPUT_DIR)

# Show quick counts
from collections import Counter
def count_images(root):
    """Count the non-hidden entries of every ``<root>/<split>/<class>`` folder.

    Returns a dict keyed by split name ("train", "val", "test"); each value
    maps every class in ``expected_targets`` to the number of entries found.
    """
    def visible_entries(folder):
        # Entries starting with "." are ignored.
        return sum(1 for entry in os.listdir(folder) if not entry.startswith("."))

    return {
        split: {
            cls: visible_entries(os.path.join(root, split, cls))
            for cls in expected_targets
        }
        for split in ("train", "val", "test")
    }

counts = count_images(OUTPUT_DIR)
counts


Detected DATASET_DIR: /root/.cache/kagglehub/datasets/tallwinkingstan/road-traversing-knowledge-rtk-dataset/versions/1/RTK_Dataset
Resolved source → target classes:
  asphalt/asphaltGood → asphalt_good
  asphalt/asphaltRegular → asphalt_regular
  asphalt/asphaltBad → asphalt_bad
  paved/pavedRegular → paved_regular
  paved/pavedBad → paved_bad
  upaved/unpavedRegular → unpaved_regular
  upaved/unpavedBad → unpaved_bad
✅ Prepared at: /content/prepared_dataset


{'train': {'unpaved_regular': 557,
  'asphalt_good': 1384,
  'asphalt_regular': 587,
  'paved_regular': 226,
  'asphalt_bad': 324,
  'unpaved_bad': 415,
  'paved_bad': 86},
 'val': {'unpaved_regular': 119,
  'asphalt_good': 296,
  'asphalt_regular': 125,
  'paved_regular': 48,
  'asphalt_bad': 69,
  'unpaved_bad': 88,
  'paved_bad': 18},
 'test': {'unpaved_regular': 120,
  'asphalt_good': 298,
  'asphalt_regular': 127,
  'paved_regular': 50,
  'asphalt_bad': 71,
  'unpaved_bad': 90,
  'paved_bad': 20}}

In [None]:
import os, shutil, random
from pathlib import Path

# Seed the RNG used by the per-class subsampling.
random.seed(42)

# Imbalanced input tree and the balanced copy that is derived from it.
SRC_ROOT = "/content/prepared_dataset"
DST_ROOT = "/content/prepared_dataset_balanced"

# Class names: entries of the training split folder, in sorted order.
CLASSES = os.listdir(os.path.join(SRC_ROOT, "train"))
CLASSES.sort()

def balance_split(split):
    """Copy the same number of images per class from SRC_ROOT/<split> to DST_ROOT/<split>.

    Every class in ``CLASSES`` is down-sampled to the size of the smallest
    class in this split; which files are kept is decided by a shuffle of the
    module-level ``random`` generator.

    Args:
        split: Name of the split sub-folder to balance ("train", "val" or "test").
    """
    per_class_files = {}
    for cls in CLASSES:
        d = os.path.join(SRC_ROOT, split, cls)
        # Regular, non-hidden files only. The listing is sorted because
        # os.listdir returns entries in arbitrary order; without a stable
        # starting order the seeded shuffle would choose different files on
        # different runs and a re-run could push a class above the target.
        files = sorted(
            path
            for path in (os.path.join(d, f) for f in os.listdir(d) if not f.startswith("."))
            if os.path.isfile(path)
        )
        random.shuffle(files)
        per_class_files[cls] = files

    # target = min class count
    min_n = min(len(v) for v in per_class_files.values())
    print(f"{split}: balancing to {min_n} per class")

    # make destination dirs
    for cls in CLASSES:
        Path(os.path.join(DST_ROOT, split, cls)).mkdir(parents=True, exist_ok=True)

    # copy the first min_n shuffled files of every class
    for cls, files in per_class_files.items():
        dst_dir = os.path.join(DST_ROOT, split, cls)
        for src in files[:min_n]:
            shutil.copy(src, os.path.join(dst_dir, os.path.basename(src)))

def count_split(root, split):
    """Return ``{class: entry count}`` for ``<root>/<split>/<class>`` over all CLASSES.

    Entries whose name starts with "." are not counted, matching the filter
    ``balance_split`` applies when it selects files, so the reported numbers
    are comparable with the balancing target.
    """
    return {
        cls: sum(
            1 for f in os.listdir(os.path.join(root, split, cls))
            if not f.startswith(".")
        )
        for cls in CLASSES
    }

# Balance each split on its own and print the resulting per-class counts.
for split_name in ("train", "val", "test"):
    balance_split(split_name)
    print(split_name, count_split(DST_ROOT, split_name))

print("✅ Balanced dataset saved to:", DST_ROOT)

train: balancing to 86 per class
train {'asphalt_bad': 86, 'asphalt_good': 86, 'asphalt_regular': 86, 'paved_bad': 86, 'paved_regular': 86, 'unpaved_bad': 86, 'unpaved_regular': 86}
val: balancing to 18 per class
val {'asphalt_bad': 18, 'asphalt_good': 18, 'asphalt_regular': 18, 'paved_bad': 18, 'paved_regular': 18, 'unpaved_bad': 18, 'unpaved_regular': 18}
test: balancing to 20 per class
test {'asphalt_bad': 20, 'asphalt_good': 20, 'asphalt_regular': 20, 'paved_bad': 20, 'paved_regular': 20, 'unpaved_bad': 20, 'unpaved_regular': 20}
✅ Balanced dataset saved to: /content/prepared_dataset_balanced


# Model Training & Comparison

In [None]:
!pip -q install pandas scikit-learn

import os, time, json, math, copy, random
from pathlib import Path

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms, models

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score

In [None]:
# =========================================================
# Train 5 models on your balanced RTK dataset and save all artifacts
# =========================================================
!pip -q install torch torchvision torchaudio --upgrade
!pip -q install pandas scikit-learn matplotlib

import os, time, json, math, copy, random
from pathlib import Path
from collections import defaultdict

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt

# ---------------- Config ----------------
# Seed the Python, NumPy and PyTorch random generators.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Use the GPU when one is available, otherwise the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Balanced dataset written by the balancing step.
DATA_ROOT = Path("/content/prepared_dataset_balanced")
assert (DATA_ROOT/"train").exists(), "Balanced dataset not found at /content/prepared_dataset_balanced"

# Training hyper-parameters shared by every model.
EPOCHS = 10
BATCH_SIZE = 32
LR = 1e-3
WEIGHT_DECAY = 1e-4
NUM_WORKERS = 2

# Architectures to train; shorten the list to run a subset.
RUN_MODELS = ["CNN", "RESNET18", "MOBILENETV2", "VGG16", "INCEPTIONV3"]

# SAVE_DIR comes from the Drive-mount cell; wrap it in a Path for "/" joins.
SAVE_DIR = Path(SAVE_DIR)
SAVE_DIR.mkdir(parents=True, exist_ok=True)

print("Using device:", DEVICE)
print("Saving to:", SAVE_DIR)

# ---------------- Transforms ----------------
# ImageNet channel statistics. They are used for BOTH input sizes: torchvision's
# pretrained Inception v3 is created with transform_input=True and rescales
# ImageNet-normalised inputs internally, so (per the torchvision weight docs) it
# expects the same mean/std as the 224-px models. Feeding it 0.5/0.5-normalised
# images would apply a second, mismatched normalisation on top.
_IMAGENET_MEAN = [0.485, 0.456, 0.406]
_IMAGENET_STD = [0.229, 0.224, 0.225]


def _train_transform(crop_size):
    """Augmenting pipeline: random resized crop to ``crop_size``, flip, colour jitter, normalise."""
    return transforms.Compose([
        transforms.RandomResizedCrop(crop_size, scale=(0.8, 1.0)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(0.2, 0.2, 0.2, 0.1),
        transforms.ToTensor(),
        transforms.Normalize(_IMAGENET_MEAN, _IMAGENET_STD),
    ])


def _eval_transform(resize_size, crop_size):
    """Deterministic pipeline: resize to ``resize_size``, centre-crop to ``crop_size``, normalise."""
    return transforms.Compose([
        transforms.Resize(resize_size), transforms.CenterCrop(crop_size),
        transforms.ToTensor(),
        transforms.Normalize(_IMAGENET_MEAN, _IMAGENET_STD),
    ])


# 224-px pipelines (CNN, ResNet18, MobileNetV2, VGG16).
train_tf_224 = _train_transform(224)
val_tf_224 = _eval_transform(256, 224)

# 299-px pipelines (Inception v3).
train_tf_299 = _train_transform(299)
val_tf_299 = _eval_transform(342, 299)

def make_dataloaders(root: Path, image_size: int):
    """Build the train/val/test DataLoaders for the dataset stored under ``root``.

    Args:
        root: Directory containing ``train`` and ``val`` (and optionally
            ``test``) sub-folders in ImageFolder layout.
        image_size: 224 selects the 224-px transforms; any other value selects
            the 299-px transforms.

    Returns:
        ``(train_loader, val_loader, test_loader, classes)`` where
        ``test_loader`` is ``None`` when ``root/"test"`` does not exist and
        ``classes`` is the class list of the training split.

    Raises:
        ValueError: If the val or test split does not have exactly the same
            class folders as the training split.
    """
    tr_tf, va_tf = (train_tf_224, val_tf_224) if image_size == 224 else (train_tf_299, val_tf_299)
    train_ds = datasets.ImageFolder(root/"train", transform=tr_tf)
    val_ds   = datasets.ImageFolder(root/"val",   transform=va_tf)
    test_ds  = datasets.ImageFolder(root/"test",  transform=va_tf) if (root/"test").exists() else None

    # ImageFolder derives label indices from each split's own folder names, so
    # splits with different class folders would silently disagree on what a
    # label index means. Fail loudly instead.
    for split_name, ds in (("val", val_ds), ("test", test_ds)):
        if ds is not None and ds.classes != train_ds.classes:
            raise ValueError(
                f"Class folders of '{split_name}' {ds.classes} differ from 'train' {train_ds.classes}"
            )

    # Pinned host memory only helps host-to-GPU copies; requesting it without
    # CUDA just triggers a warning.
    pin = torch.cuda.is_available()

    def _loader(ds, shuffle):
        return DataLoader(ds, batch_size=BATCH_SIZE, shuffle=shuffle,
                          num_workers=NUM_WORKERS, pin_memory=pin)

    train_loader = _loader(train_ds, True)
    val_loader   = _loader(val_ds, False)
    test_loader  = _loader(test_ds, False) if test_ds is not None else None
    return train_loader, val_loader, test_loader, train_ds.classes

# ---------------- Models ----------------
class SmallCNN(nn.Module):
    """Small convolutional classifier.

    Three conv/ReLU/max-pool stages (32, 64 and 128 channels) are followed by
    global average pooling and a two-layer fully connected head with dropout
    that outputs ``num_classes`` logits.
    """

    @staticmethod
    def _stage(in_ch, out_ch):
        """Layers of one feature stage: 3x3 conv (padding 1), ReLU, 2x2 max-pool."""
        return [
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        ]

    def __init__(self, num_classes: int):
        super().__init__()
        widths = (3, 32, 64, 128)

        # Stages are appended in order so the Sequential indices (and hence
        # the state_dict keys) stay features.0 / features.3 / features.6.
        stage_layers = []
        for in_ch, out_ch in zip(widths, widths[1:]):
            stage_layers.extend(self._stage(in_ch, out_ch))
        self.features = nn.Sequential(*stage_layers)

        head_layers = [
            nn.AdaptiveAvgPool2d(output_size=1),
            nn.Flatten(),
            nn.Linear(widths[-1], 128),
            nn.ReLU(),
            nn.Dropout(p=0.3),
            nn.Linear(128, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        feature_maps = self.features(x)
        return self.classifier(feature_maps)

def build_model(name: str, num_classes: int):
    """Create the architecture called ``name`` with a ``num_classes``-way output layer.

    ``name`` is matched case-insensitively against "CNN", "RESNET18",
    "MOBILENETV2", "VGG16" and "INCEPTIONV3". The torchvision architectures
    are created with their default pretrained weights and only their final
    layer(s) are replaced.

    Returns:
        ``(model, image_size, parameter_count)``; ``image_size`` is 299 for
        Inception v3 and 224 for everything else.

    Raises:
        ValueError: If ``name`` is not one of the supported architectures.
    """
    key = name.upper()
    image_size = 224  # overridden below for Inception v3 only

    if key == "CNN":
        net = SmallCNN(num_classes)
    elif key == "RESNET18":
        net = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        net.fc = nn.Linear(net.fc.in_features, num_classes)
    elif key == "MOBILENETV2":
        net = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)
        last = net.classifier[-1]
        net.classifier[-1] = nn.Linear(last.in_features, num_classes)
    elif key == "VGG16":
        net = models.vgg16_bn(weights=models.VGG16_BN_Weights.DEFAULT)
        last = net.classifier[-1]
        net.classifier[-1] = nn.Linear(last.in_features, num_classes)
    elif key == "INCEPTIONV3":
        net = models.inception_v3(weights=models.Inception_V3_Weights.DEFAULT, aux_logits=True)
        net.fc = nn.Linear(net.fc.in_features, num_classes)
        # The auxiliary classifier needs the same number of outputs as the main head.
        if net.aux_logits:
            net.AuxLogits.fc = nn.Linear(net.AuxLogits.fc.in_features, num_classes)
        image_size = 299
    else:
        raise ValueError(name)

    n_params = sum(p.numel() for p in net.parameters())
    return net, image_size, n_params

# ---------------- Train/Eval helpers ----------------
def train_one_epoch(model, loader, criterion, optimizer, scaler=None, use_inception=False):
    """Run one optimisation pass over ``loader``.

    Args:
        model: Network to train; it is switched to training mode.
        loader: Iterable of ``(inputs, targets)`` batches.
        criterion: Loss called as ``criterion(logits, targets)``.
        optimizer: Optimizer stepped once per batch.
        scaler: Optional gradient scaler; when given, the forward pass runs
            under CUDA autocast and the backward/step go through the scaler.
        use_inception: When true and the model returns a tuple, the second
            element is treated as auxiliary logits and its loss is added with
            weight 0.4.

    Returns:
        ``(mean_loss, accuracy)`` averaged over all samples seen.
    """
    def forward_and_loss(inputs, targets):
        # Returns (main logits, total loss) for one batch.
        output = model(inputs)
        if use_inception and isinstance(output, tuple):
            main_logits, aux_logits = output
            total = criterion(main_logits, targets) + 0.4*criterion(aux_logits, targets)
            return main_logits, total
        return output, criterion(output, targets)

    model.train()
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    for x, y in loader:
        x = x.to(DEVICE)
        y = y.to(DEVICE)
        optimizer.zero_grad(set_to_none=True)

        if scaler:
            with torch.cuda.amp.autocast():
                logits, loss = forward_and_loss(x, y)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            logits, loss = forward_and_loss(x, y)
            loss.backward()
            optimizer.step()

        batch_size = x.size(0)
        loss_sum += loss.item()*batch_size
        n_seen += batch_size
        n_correct += (logits.argmax(1) == y).sum().item()

    return loss_sum/n_seen, n_correct/n_seen

@torch.no_grad()
def evaluate(model, loader, criterion):
    """Evaluate ``model`` on every batch of ``loader`` without tracking gradients.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        loader: Iterable of ``(inputs, targets)`` batches.
        criterion: Loss called as ``criterion(logits, targets)``.

    Returns:
        ``(mean_loss, accuracy, y_true, y_pred)``. The two arrays hold the
        targets and the arg-max predictions of all samples. When ``loader``
        yields no batch, loss and accuracy are ``nan`` and both arrays are
        empty, so callers can test ``y_true.size`` instead of hitting a
        division by zero.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    y_all, p_all = [], []
    for x, y in loader:
        x, y = x.to(DEVICE), y.to(DEVICE)
        out = model(x)
        # Some models return a tuple; the first element is the main logits.
        if isinstance(out, tuple):
            out = out[0]
        loss = criterion(out, y)
        loss_sum += loss.item()*x.size(0)
        p = out.argmax(1)
        n_seen += x.size(0)
        n_correct += (p == y).sum().item()
        y_all.append(y.detach().cpu().numpy())
        p_all.append(p.detach().cpu().numpy())

    if n_seen == 0:
        # Nothing was evaluated: report "no value" rather than dividing by zero.
        return float("nan"), float("nan"), np.array([]), np.array([])

    return loss_sum/n_seen, n_correct/n_seen, np.concatenate(y_all), np.concatenate(p_all)

@torch.no_grad()
def measure_inference_time(model, image_size=224, repeats=30):
    """Return the mean forward-pass latency in milliseconds for one random image.

    Args:
        model: Network to time; it is switched to eval mode.
        image_size: Height and width of the square 3-channel dummy input.
        repeats: Number of timed forward passes that are averaged.

    Five untimed warm-up passes run first. On CUDA the device is synchronised
    before the clock is started and again before it is stopped.
    """
    model.eval()
    dummy = torch.randn(1, 3, image_size, image_size, device=DEVICE)
    for _ in range(5):
        _ = model(dummy)
    if DEVICE.type == "cuda":
        torch.cuda.synchronize()

    # perf_counter is the monotonic, high-resolution clock meant for measuring
    # intervals; time.time() can jump when the system clock is adjusted.
    t0 = time.perf_counter()
    for _ in range(repeats):
        _ = model(dummy)
    if DEVICE.type == "cuda":
        torch.cuda.synchronize()
    return (time.perf_counter() - t0)*1000.0/repeats

def plot_confusion(cm, classes, out_png):
    """Draw the confusion matrix ``cm`` with per-cell counts and save it to ``out_png``.

    Rows are labelled "True" and columns "Predicted"; ``classes`` supplies the
    tick labels of both axes. The figure is closed after saving.
    """
    tick_positions = range(len(classes))

    plt.figure(figsize=(7,6))
    plt.imshow(cm, interpolation='nearest')
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.xticks(tick_positions, classes, rotation=45, ha="right")
    plt.yticks(tick_positions, classes)

    # Write each count at the centre of its cell (x = column, y = row).
    for (row, col), count in np.ndenumerate(cm):
        plt.text(col, row, str(count), ha="center", va="center")

    plt.tight_layout()
    plt.savefig(out_png, dpi=140)
    plt.close()

def plot_curves(history, out_png):
    """Plot accuracy and loss per epoch for train and val, and save to ``out_png``.

    ``history`` maps "train_acc", "val_acc", "train_loss" and "val_loss" to
    per-epoch value lists; all four series share one axis. The figure is
    closed after saving.
    """
    n_epochs = len(history["train_acc"])
    epochs = range(1, n_epochs + 1)

    plt.figure(figsize=(7,5))
    for series in ("train_acc", "val_acc", "train_loss", "val_loss"):
        plt.plot(epochs, history[series], label=series)
    plt.legend()
    plt.xlabel("epoch")
    plt.ylabel("value")
    plt.title("Learning Curves")
    plt.tight_layout()
    plt.savefig(out_png, dpi=140)
    plt.close()

# ---------------- Run all models ----------------
# For every architecture in RUN_MODELS: train for EPOCHS epochs, keep the
# weights with the best validation accuracy, save checkpoint / plots / reports
# under SAVE_DIR/<model>, and append one summary row to `results`.
results = []
all_class_names = sorted(os.listdir(DATA_ROOT/"train"))
print("Classes:", all_class_names)

for model_name in RUN_MODELS:
    print("\n"+"="*72)
    print(f"Training: {model_name}")

    # Build the model once: it also tells us which input size the loaders
    # must produce, so no throw-away second copy of the weights is created.
    model, img_size, n_params = build_model(model_name, num_classes=len(all_class_names))
    train_loader, val_loader, test_loader, class_names = make_dataloaders(DATA_ROOT, img_size)
    assert class_names == all_class_names, "Class order mismatch between splits."
    model = model.to(DEVICE)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
    # `verbose` is not accepted by current PyTorch releases (passing it raises
    # TypeError), so learning-rate changes are reported manually below.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="max", factor=0.5, patience=2)
    scaler = torch.cuda.amp.GradScaler() if DEVICE.type=="cuda" else None
    use_inception = (model_name.upper()=="INCEPTIONV3")

    history = {"train_loss":[], "val_loss":[], "train_acc":[], "val_acc":[]}
    best_va, best_state = 0.0, None

    for ep in range(1, EPOCHS+1):
        tr_loss, tr_acc = train_one_epoch(model, train_loader, criterion, optimizer, scaler, use_inception)
        va_loss, va_acc, _, _ = evaluate(model, val_loader, criterion)

        # Halve the LR when validation accuracy stops improving, and say so.
        lr_before = optimizer.param_groups[0]["lr"]
        scheduler.step(va_acc)
        lr_after = optimizer.param_groups[0]["lr"]
        if lr_after != lr_before:
            print(f"[{model_name}] LR reduced: {lr_before:.2e} -> {lr_after:.2e}")

        history["train_loss"].append(tr_loss); history["train_acc"].append(tr_acc)
        history["val_loss"].append(va_loss);   history["val_acc"].append(va_acc)
        print(f"[{model_name}] Ep {ep:02d}/{EPOCHS} | Train {tr_acc:.4f}/{tr_loss:.4f} | Val {va_acc:.4f}/{va_loss:.4f}")
        # Snapshot the weights whenever validation accuracy improves.
        if va_acc > best_va:
            best_va = va_acc; best_state = copy.deepcopy(model.state_dict())

    # Continue with the best snapshot rather than the last-epoch weights.
    if best_state is not None:
        model.load_state_dict(best_state)

    # Save artifacts
    out_dir = SAVE_DIR / model_name
    out_dir.mkdir(parents=True, exist_ok=True)
    torch.save({
        "model_state": model.state_dict(),
        "classes": class_names,
        "image_size": img_size,
        "best_val_acc": float(best_va)
    }, out_dir / f"{model_name}_best.pth")
    with open(out_dir / "classes.json", "w") as f:
        json.dump(class_names, f, indent=2)
    plot_curves(history, out_dir / "learning_curves.png")

    # Validation CM + report
    _, _, y_true, y_pred = evaluate(model, val_loader, criterion)
    if y_true.size and y_pred.size:
        cm_val = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))
        np.savetxt(out_dir / "confusion_val.csv", cm_val, fmt="%d", delimiter=",")
        with open(out_dir / "classification_report_val.txt","w") as f:
            f.write(classification_report(y_true, y_pred, target_names=class_names, digits=3))
        plot_confusion(cm_val, class_names, out_dir / "confusion_val.png")

    # Optional test
    test_acc = None
    if test_loader is not None:
        _, test_acc, y_t, y_p = evaluate(model, test_loader, criterion)
        if y_t.size and y_p.size:
            cm_test = confusion_matrix(y_t, y_p, labels=list(range(len(class_names))))
            np.savetxt(out_dir / "confusion_test.csv", cm_test, fmt="%d", delimiter=",")
            plot_confusion(cm_test, class_names, out_dir / "confusion_test.png")
        print(f"[{model_name}] Test Acc:", None if test_acc is None else f"{test_acc:.4f}")

    # Inference speed
    inf_ms = measure_inference_time(model, image_size=img_size, repeats=30)

    results.append({
        "Model": model_name,
        "Params (M)": round(n_params/1e6, 2),
        "Val Acc": round(float(best_va), 4),
        "Test Acc": round(float(test_acc), 4) if test_acc is not None else None,
        "Image Size": img_size,
        "Per-Image Inference (ms)": round(float(inf_ms), 2),
        "Checkpoint": str(out_dir / f"{model_name}_best.pth")
    })

# ---------------- Summary table ----------------
# One row per trained model, highest validation accuracy first.
df = pd.DataFrame(results)
df = df.sort_values(by="Val Acc", ascending=False)
display(df)

# Store the table next to the per-model artifact folders.
df_path = SAVE_DIR / "comparison_results.csv"
df.to_csv(df_path, index=False)
print("✅ Saved comparison table to:", df_path)
print("✅ Per-model artifacts saved under:", SAVE_DIR)


Using device: cpu
Saving to: /content/drive/MyDrive/model_comparison
Classes: ['asphalt_bad', 'asphalt_good', 'asphalt_regular', 'paved_bad', 'paved_regular', 'unpaved_bad', 'unpaved_regular']

Training: CNN


TypeError: ReduceLROnPlateau.__init__() got an unexpected keyword argument 'verbose'