In [1]:
import os
import math
import random
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Dict, Any, List

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from PIL import Image
import torchvision
import torchvision.transforms as T

def set_seed(seed: int = 42):
    """Seed every random number generator this notebook relies on.

    Args:
        seed: Value handed, in order, to Python's ``random`` module, NumPy's
            global generator, PyTorch's default generator and all CUDA
            generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

set_seed(42)

# Pick the compute device: CUDA first, then MPS, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# Record the environment next to the results for later reference.
print("device:", device)
print("torch:", torch.__version__)
print("torchvision:", torchvision.__version__)

device: mps
torch: 2.9.1
torchvision: 0.24.1


In [2]:
# Inputs: the label table and the folder holding the image files it references.
CSV_PATH = "final_data_new_labels.csv"
IMAGES_ROOT = "processed_data"

# Outputs: checkpoints, result tables and per-epoch logs all live under OUT_DIR.
OUT_DIR = Path("out_methods_bonus")
os.makedirs(OUT_DIR / "checkpoints", exist_ok=True)
os.makedirs(OUT_DIR / "tables", exist_ok=True)
os.makedirs(OUT_DIR / "logs", exist_ok=True)

# Fail fast when the label file or the image folder is missing.
assert Path(CSV_PATH).is_file()
assert Path(IMAGES_ROOT).is_dir()

print("OUT_DIR:", OUT_DIR.resolve())

OUT_DIR: /Users/lisawang/Cornell/25Fall/AML/final/Vision-Based-Safety-Assessment-for-Pedestrian-Street-Crossing/4-dataset_and_training/out_methods_bonus


In [3]:
class PedXingDataset(Dataset):
    """Images and per-image labels for one subset of the label CSV.

    The table at ``csv_path`` is read and restricted to the rows whose
    ``subset`` column equals ``subset``. Each item is a pair ``(x, y)``:
    ``x`` is the transformed image tensor and ``y`` maps task names to scalar
    ``torch.long`` tensors. ``"safe_to_cross"`` is always present; every other
    task is emitted only when its column exists in the CSV.

    Args:
        csv_path: Path of the label CSV (must contain a ``subset`` column).
        images_root: Directory that the file names in the CSV are relative to.
        subset: Value of the ``subset`` column to keep (e.g. ``"train"``).
        image_size: Side length passed to the resize / crop transforms.
        preprocess_mode: ``"norm"`` applies mean/std normalisation after
            ``ToTensor``; ``"no_norm"`` skips it.
        augment_mode: ``"none"`` (resize + centre crop), ``"basic"`` (random
            resized crop + horizontal flip) or ``"strong"`` (basic with a
            wider crop scale plus colour jitter).
        roadway_bin_size_m: Width of one roadway-width bin; must be > 0.

    Raises:
        ValueError: If ``roadway_bin_size_m`` is not positive, if neither
            ``safe_to_walk`` nor ``safe_to_cross`` is a column, or if
            ``preprocess_mode`` / ``augment_mode`` is not a supported value.
    """

    def __init__(self, csv_path, images_root, subset,
                 image_size=224, preprocess_mode="norm",
                 augment_mode="none", roadway_bin_size_m=5.0):
        super().__init__()
        # A zero or negative bin size would divide by zero or yield negative
        # class indices later on, so reject it up front (also catches NaN).
        if not roadway_bin_size_m > 0:
            raise ValueError("roadway_bin_size_m must be > 0")

        table = pd.read_csv(csv_path)
        self.df = table[table["subset"] == subset].reset_index(drop=True)

        self.images_root = images_root
        self.image_size = image_size
        self.preprocess_mode = preprocess_mode
        self.augment_mode = augment_mode
        self.roadway_bin_size_m = roadway_bin_size_m

        cols = self.df.columns
        self.col_filename = "new_filename" if "new_filename" in cols else "filename"

        # The safety label is mandatory and may be stored under either name.
        self.col_safe = self._first_present(cols, "safe_to_walk", "safe_to_cross")
        if self.col_safe is None:
            raise ValueError("Missing safe label")

        # Optional label columns: None means "this task is absent from the CSV".
        self.col_weather = self._first_present(cols, "weather")
        self.col_tlight = self._first_present(cols, "traffic_light")
        self.col_psignal = self._first_present(cols, "crosswalk_signal", "pedestrian_signal")
        self.col_roadway = self._first_present(cols, "roadway_width")

        self.col_crosswalk = self._first_present(cols, "crosswalk")
        self.col_car = self._first_present(cols, "car")
        self.col_scooter = self._first_present(cols, "scooter")
        self.col_bike = self._first_present(cols, "bike")
        self.col_obstacles = self._first_present(cols, "other_obstacles")

        self.transform = self._build_transform()

    @staticmethod
    def _first_present(columns, *candidates):
        """Return the first name in ``candidates`` found in ``columns``, else None."""
        for name in candidates:
            if name in columns:
                return name
        return None

    def _build_transform(self):
        """Compose the image pipeline: spatial/colour ops, ToTensor, optional Normalize."""
        if self.augment_mode == "none":
            spatial = [
                T.Resize(self.image_size, antialias=True),
                T.CenterCrop(self.image_size),
            ]
        elif self.augment_mode == "basic":
            spatial = [
                T.RandomResizedCrop(self.image_size, scale=(0.85, 1.0), antialias=True),
                T.RandomHorizontalFlip(p=0.5),
            ]
        elif self.augment_mode == "strong":
            spatial = [
                T.RandomResizedCrop(self.image_size, scale=(0.75, 1.0), antialias=True),
                T.RandomHorizontalFlip(p=0.5),
                T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.15, hue=0.02),
            ]
        else:
            raise ValueError("augment_mode must be none/basic/strong")

        if self.preprocess_mode == "norm":
            norm = [T.Normalize(mean=[0.485, 0.456, 0.406],
                                std=[0.229, 0.224, 0.225])]
        elif self.preprocess_mode == "no_norm":
            norm = []
        else:
            raise ValueError("preprocess_mode must be norm/no_norm")

        return T.Compose(spatial + [T.ToTensor()] + norm)

    def _roadway_width_to_bin(self, w):
        """Map a roadway width to its integer bin index.

        Returns ``floor(w / roadway_bin_size_m)``, or ``-1`` when ``w`` cannot
        be converted to a float, is not finite, or is negative.
        """
        try:
            w = float(w)
        except Exception:
            # Deliberately best-effort: anything non-numeric is "unknown".
            return -1
        if not np.isfinite(w) or w < 0:
            return -1
        return int(math.floor(w / self.roadway_bin_size_m))

    def __len__(self):
        """Number of rows in the selected subset."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load image ``idx`` and return ``(image_tensor, label_dict)``."""
        row = self.df.iloc[idx]
        path = os.path.join(self.images_root, str(row[self.col_filename]))
        # Close the file as soon as the pixels are decoded: convert() returns
        # an independent in-memory image, so nothing needs the handle after it.
        with Image.open(path) as raw:
            img = raw.convert("RGB")
        x = self.transform(img)

        def as_label(value):
            return torch.tensor(int(value), dtype=torch.long)

        y = {"safe_to_cross": as_label(row[self.col_safe])}

        for key, col in (("weather", self.col_weather),
                         ("pedestrian_signal", self.col_psignal),
                         ("traffic_light", self.col_tlight)):
            if col is not None:
                y[key] = as_label(row[col])

        if self.col_roadway is not None:
            # Invalid widths become -1 so the loss can mask them out.
            y["roadway_width_bin"] = as_label(self._roadway_width_to_bin(row[self.col_roadway]))

        for key, col in (("crosswalk", self.col_crosswalk),
                         ("car", self.col_car),
                         ("scooter", self.col_scooter),
                         ("bike", self.col_bike),
                         ("other_obstacles", self.col_obstacles)):
            if col is not None:
                y[key] = as_label(row[col])

        return x, y

In [4]:
def make_loader(subset, image_size, preprocess_mode, augment_mode, batch_size=16, shuffle=True):
    """Build a DataLoader over one subset of the labelled image table.

    Args:
        subset: Value of the CSV ``subset`` column to load.
        image_size: Image side length forwarded to the dataset.
        preprocess_mode: Forwarded to ``PedXingDataset``.
        augment_mode: Forwarded to ``PedXingDataset``.
        batch_size: Samples per batch.
        shuffle: Whether the loader reshuffles every epoch.

    Returns:
        A single-process ``DataLoader`` (``num_workers=0``) reading from
        ``CSV_PATH`` / ``IMAGES_ROOT``.
    """
    dataset = PedXingDataset(
        CSV_PATH,
        IMAGES_ROOT,
        subset,
        image_size=image_size,
        preprocess_mode=preprocess_mode,
        augment_mode=augment_mode,
    )
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=0,
        # Pinned host memory is only requested when CUDA is available.
        pin_memory=torch.cuda.is_available(),
    )

def infer_roadway_num_classes(train_ds: "PedXingDataset") -> int:
    """Return how many roadway-width classes the training data needs.

    The count is ``max valid bin index + 1``, where bins come from
    ``train_ds._roadway_width_to_bin`` and negative bins mark invalid widths.

    Args:
        train_ds: Dataset exposing ``col_roadway``, ``df`` and
            ``_roadway_width_to_bin``.

    Returns:
        The number of classes, or ``1`` when the dataset has no roadway
        column or no row with a valid width.
    """
    if train_ds.col_roadway is None:
        return 1
    # Walk the column itself instead of materialising a full row per sample.
    widths = train_ds.df[train_ds.col_roadway]
    highest = max((train_ds._roadway_width_to_bin(w) for w in widths), default=-1)
    return int(highest + 1) if highest >= 0 else 1

In [5]:
class ResNetBackbone(nn.Module):
    """ResNet-18 with its last child module removed, used as a feature extractor.

    Args:
        pretrained: When true, load ``ResNet18_Weights.DEFAULT``; otherwise
            start from random initialisation.

    Attributes:
        features: Every child of the torchvision model except the last one.
        feat_dim: Feature width advertised to the heads (fixed at 512).
    """

    def __init__(self, pretrained=True):
        super().__init__()
        if pretrained:
            weights = torchvision.models.ResNet18_Weights.DEFAULT
        else:
            weights = None
        resnet = torchvision.models.resnet18(weights=weights, progress=False)
        kept_layers = list(resnet.children())[:-1]
        self.features = nn.Sequential(*kept_layers)
        self.feat_dim = 512

    def forward(self, x):
        """Return the backbone output flattened from dimension 1 onwards."""
        feats = self.features(x)
        return feats.flatten(1)

class MLPHead(nn.Module):
    """Classification head: two hidden Linear-ReLU-Dropout stages, then a Linear.

    Args:
        in_dim: Width of the incoming feature vector.
        out_dim: Number of output logits.
        hidden_dim: Width of both hidden layers.
        dropout: Dropout probability applied after each hidden activation.
    """

    def __init__(self, in_dim, out_dim, hidden_dim=256, dropout=0.2):
        super().__init__()
        layers = []
        width_in = in_dim
        for _ in range(2):
            layers += [nn.Linear(width_in, hidden_dim), nn.ReLU(), nn.Dropout(dropout)]
            width_in = hidden_dim
        layers.append(nn.Linear(hidden_dim, out_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, z):
        """Map features ``z`` to logits."""
        logits = self.net(z)
        return logits

class MultiTaskResNet(nn.Module):
    """Shared ResNet backbone feeding one ``MLPHead`` per prediction task.

    Args:
        roadway_num_classes: Output size of the ``roadway_width_bin`` head.
        pretrained: Forwarded to ``ResNetBackbone``.
    """

    def __init__(self, roadway_num_classes, pretrained=True):
        super().__init__()
        self.backbone = ResNetBackbone(pretrained=pretrained)
        in_dim = self.backbone.feat_dim
        # (task name, number of classes), in the order the heads are registered.
        task_classes = [
            ("safe_to_cross", 2),
            ("weather", 3),
            ("pedestrian_signal", 3),
            ("traffic_light", 3),
            ("roadway_width_bin", roadway_num_classes),
            ("crosswalk", 2),
            ("car", 2),
            ("scooter", 2),
            ("bike", 2),
            ("other_obstacles", 2),
        ]
        self.heads = nn.ModuleDict(
            {task: MLPHead(in_dim, n_classes) for task, n_classes in task_classes}
        )

    def forward(self, x):
        """Return a dict mapping each task name to that head's logits."""
        feats = self.backbone(x)
        outputs = {}
        for task, head in self.heads.items():
            outputs[task] = head(feats)
        return outputs

class SmallCNN(nn.Module):
    """Three-convolution baseline with one linear head per prediction task.

    Args:
        roadway_num_classes: Output size of the ``roadway_width_bin`` head.
    """

    def __init__(self, roadway_num_classes):
        super().__init__()
        conv_stack = [
            nn.Conv2d(3, 32, 5, stride=2, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            # Global average pooling leaves one value per channel.
            nn.AdaptiveAvgPool2d((1, 1)),
        ]
        self.features = nn.Sequential(*conv_stack)

        feat_dim = 128
        # (task name, number of classes), in the order the heads are registered.
        task_classes = [
            ("safe_to_cross", 2),
            ("weather", 3),
            ("pedestrian_signal", 3),
            ("traffic_light", 3),
            ("roadway_width_bin", roadway_num_classes),
            ("crosswalk", 2),
            ("car", 2),
            ("scooter", 2),
            ("bike", 2),
            ("other_obstacles", 2),
        ]
        self.heads = nn.ModuleDict(
            {task: nn.Linear(feat_dim, n_classes) for task, n_classes in task_classes}
        )

    def forward(self, x):
        """Return a dict mapping each task name to that head's logits."""
        feats = self.features(x).flatten(1)
        outputs = {}
        for task, head in self.heads.items():
            outputs[task] = head(feats)
        return outputs

In [6]:
def accuracy_from_logits(logits, y):
    """Return the fraction of rows whose arg-max class equals the label.

    Args:
        logits: Score tensor whose dimension 1 indexes the classes.
        y: Integer class labels, one per row of ``logits``.

    Returns:
        The accuracy as a Python float.
    """
    predicted = torch.argmax(logits, dim=1)
    hits = predicted.eq(y).float()
    return hits.mean().item()

@torch.no_grad()
def evaluate_safe(model, loader):
    """Compute ``safe_to_cross`` accuracy of ``model`` over every sample in ``loader``.

    Accuracy is the number of correct predictions divided by the number of
    samples seen. Averaging per-batch accuracies instead would over-weight a
    smaller final batch, so hits and samples are counted explicitly.

    Args:
        model: Module whose forward pass returns a dict containing
            ``"safe_to_cross"`` logits. It is switched to eval mode.
        loader: Iterable of ``(x, y)`` batches where ``y["safe_to_cross"]``
            holds the integer labels.

    Returns:
        The accuracy as a float, or ``nan`` when the loader yields no samples.
    """
    model.eval()
    try:
        # Evaluate wherever the model's weights live.
        run_device = next(model.parameters()).device
    except StopIteration:
        # Parameter-free model: fall back to the notebook-wide device.
        run_device = device

    n_correct = 0
    n_seen = 0
    for x, y in loader:
        target = y["safe_to_cross"].to(run_device)
        logits = model(x.to(run_device))["safe_to_cross"]
        n_correct += int((logits.argmax(1) == target).sum().item())
        n_seen += int(target.numel())
    return n_correct / n_seen if n_seen else float("nan")

def compute_loss(out, y, weights):
    """Weighted sum of per-task cross-entropy losses.

    Args:
        out: Dict mapping task name to logits.
        y: Dict mapping task name to integer targets.
        weights: Dict mapping task name to its loss weight; a task without
            an entry contributes with weight ``0.0``.

    Returns:
        The accumulated loss. Tasks missing from ``out`` or ``y`` are skipped,
        and so is ``roadway_width_bin`` when none of its targets is >= 0;
        if every task is skipped the plain float ``0.0`` is returned.
    """
    task_order = (
        "safe_to_cross", "weather", "pedestrian_signal", "traffic_light", "roadway_width_bin",
        "crosswalk", "car", "scooter", "bike", "other_obstacles",
    )
    total = 0.0
    for task in task_order:
        if not (task in out and task in y):
            continue
        logits, target = out[task], y[task]
        if task == "roadway_width_bin":
            # Negative bins mark rows without a usable width; leave them out.
            valid = target >= 0
            if valid.sum().item() == 0:
                continue
            logits, target = logits[valid], target[valid]
        total = total + weights.get(task, 0.0) * F.cross_entropy(logits, target)
    return total

In [7]:
@dataclass
class ExpConfig:
    """Settings for one training experiment consumed by ``run_one``.

    ``exp_name`` doubles as the file stem of the saved checkpoint and of the
    per-epoch history CSV, so it should be unique per experiment.
    """
    exp_name: str              # label used in log lines and output file names
    model_family: str          # "resnet" or "smallcnn"
    preprocess_mode: str       # "norm" or "no_norm" (see PedXingDataset)
    augment_mode: str          # training augmentation: "none", "basic" or "strong"
    pretrained: bool = True    # only read when model_family == "resnet"
    image_size: int = 224      # image side length passed to the data loaders
    epochs: int = 20           # training epochs; also the cosine schedule's T_max
    lr: float = 1e-3           # initial AdamW learning rate
    weight_decay: float = 1e-4 # AdamW weight decay

def run_one(cfg: ExpConfig) -> Dict[str, Any]:
    """Train one experiment, keep its best checkpoint and score it on the test set.

    The model is trained for ``cfg.epochs`` epochs with AdamW and a cosine
    learning-rate schedule. After every epoch the ``safe_to_cross``
    validation accuracy is measured with ``evaluate_safe``; whenever it
    strictly improves, the weights are saved to
    ``OUT_DIR/checkpoints/<exp_name>.pt``. The best checkpoint is reloaded
    for the final test evaluation, and the per-epoch history is written to
    ``OUT_DIR/logs/<exp_name>_history.csv``.

    Args:
        cfg: Experiment settings.

    Returns:
        The config fields plus ``roadway_num_classes``, ``best_val_safe_acc``,
        ``test_safe_acc``, ``ckpt_path`` and ``history_csv``.

    Raises:
        ValueError: If ``cfg.model_family`` is not ``"resnet"`` or ``"smallcnn"``.
        RuntimeError: If no epoch produced a checkpoint (e.g. ``cfg.epochs``
            is 0 or the validation accuracy was never comparable).
    """
    train_loader = make_loader("train", cfg.image_size, cfg.preprocess_mode, cfg.augment_mode, shuffle=True)
    # Validation and test data are never augmented.
    val_loader = make_loader("val", cfg.image_size, cfg.preprocess_mode, "none", shuffle=False)
    test_loader = make_loader("test", cfg.image_size, cfg.preprocess_mode, "none", shuffle=False)

    roadway_num_classes = infer_roadway_num_classes(train_loader.dataset)

    if cfg.model_family == "resnet":
        model = MultiTaskResNet(roadway_num_classes, pretrained=cfg.pretrained).to(device)
    elif cfg.model_family == "smallcnn":
        model = SmallCNN(roadway_num_classes).to(device)
    else:
        raise ValueError("bad model_family")

    opt = torch.optim.AdamW(model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)
    sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=cfg.epochs)

    # Per-task loss weights; the main task dominates the auxiliary ones.
    weights = {
        "safe_to_cross": 1.0, "weather": 0.3, "pedestrian_signal": 0.5, "traffic_light": 0.5,
        "roadway_width_bin": 0.3, "crosswalk": 0.2, "car": 0.2, "scooter": 0.2, "bike": 0.2, "other_obstacles": 0.2,
    }

    best = -1.0
    ckpt_path = OUT_DIR / "checkpoints" / f"{cfg.exp_name}.pt"
    saved_checkpoint = False
    hist = []

    for epoch in range(cfg.epochs):
        model.train()
        # Read the learning rate BEFORE the scheduler advances so the history
        # records the rate this epoch was actually trained with.
        epoch_lr = float(sched.get_last_lr()[0])
        losses = []
        for x, y in train_loader:
            x = x.to(device)
            y = {k: v.to(device) for k, v in y.items()}
            out = model(x)
            L = compute_loss(out, y, weights)
            opt.zero_grad()
            L.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 5.0)
            opt.step()
            losses.append(float(L.item()))
        sched.step()

        val_acc = evaluate_safe(model, val_loader)
        hist.append({"exp_name": cfg.exp_name, "epoch": epoch, "train_loss": float(np.mean(losses)),
                     "val_safe_acc": float(val_acc), "lr": epoch_lr})

        if val_acc > best:
            best = val_acc
            torch.save({"cfg": asdict(cfg), "state_dict": model.state_dict(),
                        "roadway_num_classes": roadway_num_classes}, ckpt_path)
            saved_checkpoint = True

        print(f"[{cfg.exp_name}] epoch={epoch} val_safe_acc={val_acc:.4f} best={best:.4f}")

    if not saved_checkpoint:
        # Without this guard a checkpoint left over from an earlier run with
        # the same name could be loaded and reported as this run's result.
        raise RuntimeError(
            f"[{cfg.exp_name}] no checkpoint was saved during training; "
            "check cfg.epochs and that the validation subset is not empty"
        )

    ckpt = torch.load(ckpt_path, map_location=device)
    model.load_state_dict(ckpt["state_dict"])
    test_acc = evaluate_safe(model, test_loader)

    hist_df = pd.DataFrame(hist)
    hist_csv = OUT_DIR / "logs" / f"{cfg.exp_name}_history.csv"
    hist_df.to_csv(hist_csv, index=False)

    return {**asdict(cfg),
            "roadway_num_classes": int(ckpt["roadway_num_classes"]),
            "best_val_safe_acc": float(best),
            "test_safe_acc": float(test_acc),
            "ckpt_path": str(ckpt_path),
            "history_csv": str(hist_csv)}

In [8]:
# The two configurations compared in this notebook.
experiments = [
    ExpConfig(
        exp_name="B2_finetune_norm_strongaug",
        model_family="resnet",
        preprocess_mode="norm",
        augment_mode="strong",
        pretrained=True,
    ),
    ExpConfig(
        exp_name="D1_smallcnn_norm_basicaug",
        model_family="smallcnn",
        preprocess_mode="norm",
        augment_mode="basic",
        pretrained=False,
    ),
]

# Train and evaluate each configuration in turn; one result row per experiment.
all_results = list(map(run_one, experiments))
results_df = pd.DataFrame(all_results)

results_csv = OUT_DIR / "tables" / "results_summary_bonus.csv"
results_df.to_csv(results_csv, index=False)
print("Saved:", results_csv)

# Compact summary, best test accuracy first (last expression, so the notebook renders it).
results_df.loc[
    :, ["exp_name", "model_family", "augment_mode", "epochs", "lr", "best_val_safe_acc", "test_safe_acc"]
].sort_values(by="test_safe_acc", ascending=False)

[B2_finetune_norm_strongaug] epoch=0 val_safe_acc=0.6597 best=0.6597
[B2_finetune_norm_strongaug] epoch=1 val_safe_acc=0.5851 best=0.6597
[B2_finetune_norm_strongaug] epoch=2 val_safe_acc=0.6007 best=0.6597
[B2_finetune_norm_strongaug] epoch=3 val_safe_acc=0.5729 best=0.6597
[B2_finetune_norm_strongaug] epoch=4 val_safe_acc=0.7222 best=0.7222
[B2_finetune_norm_strongaug] epoch=5 val_safe_acc=0.6354 best=0.7222
[B2_finetune_norm_strongaug] epoch=6 val_safe_acc=0.6632 best=0.7222
[B2_finetune_norm_strongaug] epoch=7 val_safe_acc=0.6441 best=0.7222
[B2_finetune_norm_strongaug] epoch=8 val_safe_acc=0.6944 best=0.7222
[B2_finetune_norm_strongaug] epoch=9 val_safe_acc=0.6788 best=0.7222
[B2_finetune_norm_strongaug] epoch=10 val_safe_acc=0.7222 best=0.7222
[B2_finetune_norm_strongaug] epoch=11 val_safe_acc=0.6944 best=0.7222
[B2_finetune_norm_strongaug] epoch=12 val_safe_acc=0.6858 best=0.7222
[B2_finetune_norm_strongaug] epoch=13 val_safe_acc=0.7691 best=0.7691
[B2_finetune_norm_strongaug] e

Unnamed: 0,exp_name,model_family,augment_mode,epochs,lr,best_val_safe_acc,test_safe_acc
0,B2_finetune_norm_strongaug,resnet,strong,20,0.001,0.769097,0.800347
1,D1_smallcnn_norm_basicaug,smallcnn,basic,20,0.001,0.600694,0.595486
