# Traffic Light Autoencoder - Starter Kit

In [29]:
import os, csv, random, subprocess, sys, zipfile
from pathlib import Path
from functools import lru_cache
from collections import defaultdict
import requests

import numpy as np
from PIL import Image

import torch, torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.transforms import InterpolationMode


## Step 1: Download the zip file

This snippet downloads the training dataset ZIP from the course server and saves it locally as training_dataset.zip. 


In [30]:
url = "http://hadi.cs.virginia.edu:9000/download/train-dataset-hw2"
out = Path("training_dataset.zip")

if out.exists() and zipfile.is_zipfile(out):
    # A readable archive is already on disk; skip the download on re-runs.
    print(f"Dataset archive already present: {out}")
else:
    # Stream into a temporary file and rename only after the download finished,
    # so an interrupted transfer never leaves a truncated training_dataset.zip.
    tmp = out.with_name(out.name + ".part")
    # timeout=(connect, read) in seconds; without it requests may block forever.
    with requests.get(url, stream=True, timeout=(10, 60)) as r:
        r.raise_for_status()
        with tmp.open("wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
    tmp.replace(out)


## Step 2: Provide the config

In [31]:
# ---------- CONFIG ----------
# Filesystem layout
DATA_ROOT  = Path("data")
ZIP_PATH   = Path("training_dataset.zip")             # archive provided to students
TRAIN_DIR  = DATA_ROOT.joinpath("training_dataset")   # directory the archive is unzipped into

# Input / model shape
IMG_SIZE   = 256        # NOT ALLOWED TO CHANGE
GRAYSCALE  = False
LATENT_DIM = 32

# Optimisation
BATCH_SIZE = 64         # drop to 32/16 if OOM
EPOCHS     = 20
LR         = 2e-3
LAMBDA_L1  = 0.5        # optional mixed loss for sharper recon (0..1): 0 = pure MSE, 0.5 = half MSE + half L1
MIN_BOX    = 8

# Reproducibility / hardware
SEED       = 42
DEVICE     = "cuda" if torch.cuda.is_available() else "cpu"

random.seed(SEED)
torch.manual_seed(SEED)

<torch._C.Generator at 0x7f15bbd16910>

## Step 3: Unzip the training set

In [32]:

# ---------- Unzip training dataset ----------
def ensure_unzipped(zip_path: Path, out_dir: Path):
    """Extract ``zip_path`` into ``out_dir`` unless real images are already there.

    macOS metadata stored in the archive (``__MACOSX/`` folders, ``._*``
    resource-fork files and ``.DS_Store``) is neither extracted nor counted
    as "already present" images, so it cannot inflate the dataset.

    Args:
        zip_path: path of the dataset archive.
        out_dir: directory to extract into (created if missing).

    Raises:
        SystemExit: if ``out_dir`` holds no images and ``zip_path`` does not exist.
    """
    image_exts = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"}

    def _is_mac_junk(parts) -> bool:
        # True when any path component is macOS archive metadata.
        return any(
            part == "__MACOSX" or part == ".DS_Store" or part.startswith("._")
            for part in parts
        )

    out_dir.mkdir(parents=True, exist_ok=True)
    # If folder already has (non-junk) images, skip
    has_images = any(
        p.suffix.lower() in image_exts and not _is_mac_junk(p.relative_to(out_dir).parts)
        for p in out_dir.rglob("*")
    )
    if has_images:
        print(f"Training images already present under: {out_dir}")
        return
    if not zip_path.exists():
        raise SystemExit(f"❗️ Zip not found: {zip_path}\n"
                         f"Place training_dataset.zip at {zip_path} and rerun.")
    print(f"Unzipping {zip_path} -> {out_dir} ...")
    with zipfile.ZipFile(zip_path, 'r') as zf:
        # Zip member names always use "/" as the separator.
        members = [name for name in zf.namelist() if not _is_mac_junk(name.split("/"))]
        zf.extractall(out_dir, members=members)
    print("Unzip done.")

# Extract the dataset archive into TRAIN_DIR (a no-op when images are already present there).
ensure_unzipped(ZIP_PATH, TRAIN_DIR)

Training images already present under: data/training_dataset


## Optional Step (Please READ)
This zip file was created on macOS, so extracting it may create a `__MACOSX` folder inside `data/training_dataset`. If it does, run the code below to delete that folder; otherwise the number of image files doubles, which will hurt your model's performance.

In [33]:
import shutil
import stat

# Derive the junk folder from TRAIN_DIR so the cleanup always follows the
# configured unzip target instead of a second hard-coded copy of the path.
target = TRAIN_DIR / "__MACOSX"

if not target.exists():
    print("Nothing to remove.")
elif target.is_dir():
    shutil.rmtree(target)
    print(f"Removed: {target}")
else:
    # A plain file with this name is unexpected; report it rather than delete it.
    print(f"Exists but is not a directory: {target}")


Nothing to remove.


## Step 4: Collect all training images

In [34]:
# ---------- Collect ALL training images (no labels needed) ----------
# Lower-case file suffixes (including the dot) that are treated as images.
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"}


def is_image_file(p: Path) -> bool:
    """Return True when ``p`` is an existing regular file whose suffix is in IMG_EXTS.

    The suffix comparison is case-insensitive.
    """
    if not p.is_file():
        return False
    suffix = p.suffix.lower()
    return suffix in IMG_EXTS

# Sort before shuffling: directory traversal order is not guaranteed, so shuffling
# the raw rglob() result could give a different split on another machine even
# with the same seed.
all_train_imgs = sorted(str(p.resolve()) for p in TRAIN_DIR.rglob("*") if is_image_file(p))
if not all_train_imgs:
    raise SystemExit(f"No images found under {TRAIN_DIR}. Check your zip contents.")

# Use a dedicated RNG seeded with SEED so re-running this cell reproduces the
# same 90/10 split regardless of how much global random state was consumed.
random.Random(SEED).shuffle(all_train_imgs)
n = len(all_train_imgs); n_tr = int(0.9*n)
train_imgs = all_train_imgs[:n_tr]
val_imgs   = all_train_imgs[n_tr:]
print(f"Train/Val (ALL training_dataset/ images): train={len(train_imgs)}  val={len(val_imgs)}")


Train/Val (ALL training_dataset/ images): train=12630  val=1404


## Step 6: Get train_loader and val_loader

In [35]:
# ---------- Transforms / Datasets / Loaders (single-process, spawn-proof) ----------
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.transforms import InterpolationMode
import torch, gc

# --- HARD RESET: drop loaders left over from an earlier run of this cell ---
for _name in ("train_loader", "val_loader"):
    # pop() with a default is a no-op when the name is not defined yet
    globals().pop(_name, None)

# Reclaim Python garbage, then release cached CUDA memory when a GPU is present.
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()

def fullframe_transform(img_size=IMG_SIZE, grayscale=GRAYSCALE):
    """Build the preprocessing pipeline: resize -> (optional grayscale) -> tensor.

    Args:
        img_size: side length of the square output image.
        grayscale: when true, a single-channel Grayscale step is inserted
            between the resize and the tensor conversion.

    Returns:
        A ``transforms.Compose`` applying the steps in that order.
    """
    resize = transforms.Resize((img_size, img_size), interpolation=InterpolationMode.BILINEAR)
    maybe_gray = [transforms.Grayscale(1)] if grayscale else []
    # ToTensor yields values in [0, 1]
    return transforms.Compose([resize, *maybe_gray, transforms.ToTensor()])

# Shared transform instance used by the dataset below.
FF_TF = fullframe_transform()

class FullFrameDS(Dataset):
    """Dataset over a list of image paths that yields full-frame tensors.

    Each item is ``(tensor, 0)``: the image is opened, converted to RGB and
    passed through the module-level ``FF_TF`` transform; the constant ``0`` is
    a placeholder label.
    """

    def __init__(self, img_paths):
        # Sequence of image file paths; kept by reference, not copied.
        self.paths = img_paths

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, i):
        # The context manager closes the file handle once the tensor is built.
        with Image.open(self.paths[i]) as handle:
            tensor = FF_TF(handle.convert("RGB"))
        return tensor, 0

# Pin host memory for the training loader only when running on CUDA.
PIN = DEVICE == "cuda"

# Training loader: reshuffled every epoch, trailing partial batch dropped.
train_loader = DataLoader(
    dataset=FullFrameDS(train_imgs),
    batch_size=BATCH_SIZE,
    num_workers=0,          # single process: no worker subprocesses
    shuffle=True,
    drop_last=True,
    pin_memory=PIN,
)

# Validation loader: fixed order, every sample kept.
val_loader = DataLoader(
    dataset=FullFrameDS(val_imgs),
    batch_size=BATCH_SIZE,
    num_workers=0,          # single process: no worker subprocesses
    shuffle=False,
    drop_last=False,
    pin_memory=False,
)

# Quick sanity: both loaders must be single-process.
assert all(ld.num_workers == 0 for ld in (train_loader, val_loader))


## Step 7: Define Autoencoder and evaluation functions

In [36]:
# ===== Step 7 — Define Autoencoder and evaluation functions (TorchScript-friendly) =====
from typing import Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F

# ---------- UNet-lite Blocks ----------
class DWSeparable(nn.Module):
    """Depthwise-separable conv block: 3x3 depthwise -> 1x1 pointwise -> BatchNorm -> GELU.

    Args:
        in_ch: number of input channels.
        out_ch: number of output channels (set by the pointwise conv).
        stride: stride of the depthwise conv; 2 halves the spatial size.
    """

    def __init__(self, in_ch: int, out_ch: int, stride: int = 1):
        super().__init__()
        # groups=in_ch makes the 3x3 conv depthwise (one filter per input channel).
        self.dw = nn.Conv2d(
            in_channels=in_ch, out_channels=in_ch, kernel_size=3,
            stride=stride, padding=1, groups=in_ch, bias=False,
        )
        # The 1x1 conv mixes channels and sets the output width.
        self.pw = nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=1, bias=False)
        self.bn = nn.BatchNorm2d(out_ch)
        self.act = nn.GELU()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.act(self.bn(self.pw(self.dw(x))))

class Conv1x1(nn.Module):
    """Pointwise (1x1) convolution without bias, followed by BatchNorm and GELU."""

    def __init__(self, in_ch: int, out_ch: int):
        super().__init__()
        self.c = nn.Conv2d(in_ch, out_ch, kernel_size=1, bias=False)
        self.b = nn.BatchNorm2d(out_ch)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        projected = self.c(x)
        normed = self.b(projected)
        return F.gelu(normed)

# ---------- Encoder ----------
class Enc(nn.Module):
    """Encoder: four stride-2 depthwise-separable stages plus a linear projection.

    Each stage halves the spatial size (img_size -> img_size/16). The last
    feature map is flattened and mapped to a ``latent_dim`` vector; all four
    intermediate feature maps are also returned as skip connections.

    Args:
        in_ch: number of input image channels.
        latent_dim: size of the latent vector ``z``.
        img_size: side length of the square input; must be a positive multiple of 16.

    Raises:
        ValueError: if ``img_size`` is not a positive multiple of 16.
    """

    def __init__(self, in_ch: int, latent_dim: int, img_size: int = IMG_SIZE):
        super().__init__()
        # The linear layer is sized with img_size // 16, but each stride-2 conv
        # (kernel 3, padding 1) rounds the size UP, so any other img_size would
        # only fail later, inside forward(), with an opaque shape error.
        if img_size <= 0 or img_size % 16 != 0:
            raise ValueError(
                f"img_size must be a positive multiple of 16 (four stride-2 stages), got {img_size}"
            )
        # 256 -> 128 -> 64 -> 32 -> 16 (for the default img_size of 256)
        self.stem  = DWSeparable(in_ch, 32, stride=2)     # 32 x 128 x 128
        self.b1    = DWSeparable(32, 64, stride=2)        # 64 x 64 x 64
        self.b2    = DWSeparable(64, 96, stride=2)        # 96 x 32 x 32
        self.b3    = DWSeparable(96, 96, stride=2)        # 96 x 16 x 16

        self._feat_shape = (96, img_size // 16, img_size // 16)  # (C, H, W) of the last stage
        c, h, w = self._feat_shape
        self.fc = nn.Linear(c * h * w, latent_dim)
        self.latent_dim = latent_dim  # <-- grader can find this

    def forward(
        self, x: torch.Tensor
    ) -> Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]]:
        # Returns (z, (s0, s1, s2, s3)): the latent vector and the per-stage features.
        s0 = self.stem(x)   # 32x128x128
        s1 = self.b1(s0)    # 64x64x64
        s2 = self.b2(s1)    # 96x32x32
        s3 = self.b3(s2)    # 96x16x16
        c, h, w = self._feat_shape
        # Flatten the last feature map to (batch, C*H*W) for the linear projection.
        z = self.fc(s3.view(s3.size(0), c * h * w))
        return z, (s0, s1, s2, s3)

# ---------- Decoder ----------
class Dec(nn.Module):
    """Decoder: expands the latent vector and fuses encoder skips while upsampling.

    The latent is mapped by a linear layer to a ``96 x (img_size/16) x (img_size/16)``
    feature map. At each of four scales the matching encoder skip is passed
    through a 1x1 conv and added, a depthwise-separable block is applied, and the
    result is upsampled 2x (nearest). A final 1x1 conv plus sigmoid produces the
    output image with values in (0, 1).

    Args:
        out_ch: number of output image channels.
        latent_dim: size of the latent vector ``z``.
        img_size: side length of the square output; must be a positive multiple of 16.

    Raises:
        ValueError: if ``img_size`` is not a positive multiple of 16.
    """

    def __init__(self, out_ch: int, latent_dim: int, img_size: int = IMG_SIZE):
        super().__init__()
        # The seed feature map is img_size // 16 wide and is upsampled exactly
        # four times by 2x; for any other img_size it cannot line up with the
        # skips, which would only surface as a shape error in forward().
        if img_size <= 0 or img_size % 16 != 0:
            raise ValueError(
                f"img_size must be a positive multiple of 16 (four 2x upsampling stages), got {img_size}"
            )
        # latent maps back to 96 x 16 x 16 (for the default img_size of 256)
        self._feat_shape = (96, img_size // 16, img_size // 16)
        c, H, W = self._feat_shape
        self.fc = nn.Linear(latent_dim, c * H * W)

        # match skip channels to h BEFORE each block (add, then DWSeparable may change channels)
        self.match3 = Conv1x1(96, 96)  # s3: 96x16x16 -> 96  (16x16 stage)
        self.match2 = Conv1x1(96, 96)  # s2: 96x32x32 -> 96  (32x32 stage)
        self.match1 = Conv1x1(64, 64)  # s1: 64x64x64 -> 64  (64x64 stage)
        self.match0 = Conv1x1(32, 48)  # s0: 32x128x128 -> 48 (128x128 stage)

        self.u3 = DWSeparable(96, 96)  # at 16x16
        self.u2 = DWSeparable(96, 64)  # at 32x32
        self.u1 = DWSeparable(64, 48)  # at 64x64
        self.u0 = DWSeparable(48, 32)  # at 128x128

        self.head = nn.Conv2d(32, out_ch, 1)

    def forward(
        self,
        z: torch.Tensor,
        skips: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor],
    ) -> torch.Tensor:
        # skips are the encoder features, largest resolution first:
        # s0: 32x128x128, s1: 64x64x64, s2: 96x32x32, s3: 96x16x16 (channels x H x W)
        s0, s1, s2, s3 = skips
        c, H, W = self._feat_shape

        # map z -> 96x16x16
        h = self.fc(z).view(z.size(0), c, H, W)

        # 16x16 stage
        h = self.u3(h + self.match3(s3))

        # 32x32 stage
        h = F.interpolate(h, scale_factor=2.0, mode='nearest')
        h = self.u2(h + self.match2(s2))

        # 64x64 stage
        h = F.interpolate(h, scale_factor=2.0, mode='nearest')
        h = self.u1(h + self.match1(s1))

        # 128x128 stage
        h = F.interpolate(h, scale_factor=2.0, mode='nearest')
        h = self.u0(h + self.match0(s0))

        # 256x256 final: upsample once more, project to out_ch, squash with sigmoid
        h = F.interpolate(h, scale_factor=2.0, mode='nearest')
        return torch.sigmoid(self.head(h))

# ---------- Autoencoder wrapper ----------
class AE(nn.Module):
    """Autoencoder wrapper: runs ``Enc`` then ``Dec`` on an input batch.

    Args:
        c: number of image channels (used for both input and output).
        latent_dim: size of the latent vector.
        img_size: side length of the square images.
    """

    def __init__(self, c: int, latent_dim: int, img_size: int = IMG_SIZE):
        super().__init__()
        self.enc = Enc(c, latent_dim, img_size)
        self.dec = Dec(c, latent_dim, img_size)
        # Exposed at the top level so the grader can read it.
        self.latent_dim = latent_dim

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        latent, feats = self.enc(x)
        recon = self.dec(latent, feats)
        return recon

# Instantiate, optimizer, losses, eval util
# Number of image channels follows the GRAYSCALE switch.
CHANNELS = 3 if not GRAYSCALE else 1

# Build the autoencoder and move it to the selected device.
model = AE(CHANNELS, LATENT_DIM, IMG_SIZE)
model = model.to(DEVICE)

# AdamW with a small weight decay.
opt = torch.optim.AdamW(params=model.parameters(), lr=LR, weight_decay=1e-4)

# Loss criteria shared by training and evaluation.
crit_mse = nn.MSELoss()
crit_l1 = nn.L1Loss()

def recon_loss(y: torch.Tensor, x: torch.Tensor) -> torch.Tensor:
    """Reconstruction loss between output ``y`` and target ``x``.

    Returns plain MSE when ``LAMBDA_L1 <= 0``; otherwise the blend
    ``(1 - LAMBDA_L1) * MSE + LAMBDA_L1 * L1``.
    """
    mse_term = crit_mse(y, x)
    if LAMBDA_L1 <= 0:
        return mse_term
    l1_term = crit_l1(y, x)
    return (1 - LAMBDA_L1) * mse_term + LAMBDA_L1 * l1_term

@torch.no_grad()
def eval_full_mse(model: nn.Module, loader) -> float:
    """Return the per-sample mean of the batch MSE over everything in ``loader``.

    Puts ``model`` in eval mode (and leaves it there). Each batch's mean MSE is
    weighted by its batch size, so a smaller final batch is accounted for
    correctly. Returns 0.0 for an empty loader.
    """
    model.eval()
    weighted_sum = 0.0
    sample_count = 0
    for batch, _ in loader:
        batch = batch.to(DEVICE)
        batch_len = batch.size(0)
        recon = model(batch)
        weighted_sum += crit_mse(recon, batch).item() * batch_len
        sample_count += batch_len
    # max(1, ...) guards against division by zero on an empty loader
    return weighted_sum / max(1, sample_count)

# Lowest validation MSE seen so far; starts at +inf so the first evaluated epoch always counts as an improvement.
best_val = float("inf")
# File the training loop saves the TorchScript model to.
ckpt_path = "model_ts.pt"


## Step 8: Train the model

In [38]:
# ===== Training loop (console tqdm + epoch timing) =====
import time
import torch

try:
    # auto picks notebook/console; falls back gracefully if ipywidgets missing
    from tqdm.auto import tqdm
except Exception:
    from tqdm import tqdm

# Lowest validation MSE for which a checkpoint was successfully written.
best_val = float("inf")
ckpt_path = "model_ts.pt"

for epoch in range(1, EPOCHS + 1):
    epoch_start = time.time()
    model.train()
    running = 0.0   # sum of per-batch loss * batch size
    seen = 0        # number of samples processed this epoch

    pbar = tqdm(
        train_loader,
        desc=f"Train {epoch}/{EPOCHS}",
        leave=False,
        dynamic_ncols=True,
        mininterval=0.5,
    )

    for xb, _ in pbar:
        xb = xb.to(DEVICE, non_blocking=True)
        yb = model(xb)
        loss = recon_loss(yb, xb)

        opt.zero_grad(set_to_none=True)
        loss.backward()
        opt.step()

        running += loss.item() * xb.size(0)
        seen    += xb.size(0)
        pbar.set_postfix(loss=running / max(1, seen))

    pbar.close()

    # ---- validation ----
    val_mse = eval_full_mse(model, val_loader)
    # NOTE: this is the mean training *reconstruction loss* (MSE/L1 blend when
    # LAMBDA_L1 > 0), so it is not directly comparable to val_mse.
    train_mse = running / max(1, seen)
    epoch_time = time.time() - epoch_start

    # clean one-liner per epoch (no tqdm.write)
    print(f"Epoch {epoch:03d} | train_loss={train_mse:.6f} | val_mse={val_mse:.6f} | time={epoch_time:.1f}s")

    # ---- save best + TorchScript ----
    if val_mse < best_val:
        try:
            model.eval()
            with torch.no_grad():
                # Scripting happens on CPU; model.cpu() moves the live model in place.
                scripted = torch.jit.script(model.cpu())
                scripted.save(ckpt_path)
        except Exception as e:
            # If scripting fails mid-training, don't crash the run. best_val is
            # left unchanged so the file on disk always matches best_val.
            print(f"  ⚠️ TorchScript export failed this epoch: {e}")
        else:
            best_val = val_mse
            print(f"  ↳ New best! val_mse={best_val:.6f} — saved TorchScript to {ckpt_path}")
        finally:
            # Always move the model back, even when export raised; otherwise the
            # next epoch would train a CPU model on DEVICE batches.
            model.to(DEVICE)


Train 1/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 001 | train_loss=0.011027 | val_mse=0.001567 | time=155.2s
  ↳ New best! val_mse=0.001567 — saved TorchScript to model_ts.pt


Train 2/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 002 | train_loss=0.010102 | val_mse=0.001494 | time=153.0s
  ↳ New best! val_mse=0.001494 — saved TorchScript to model_ts.pt


Train 3/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 003 | train_loss=0.010152 | val_mse=0.001466 | time=148.3s
  ↳ New best! val_mse=0.001466 — saved TorchScript to model_ts.pt


Train 4/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 004 | train_loss=0.009935 | val_mse=0.001423 | time=148.3s
  ↳ New best! val_mse=0.001423 — saved TorchScript to model_ts.pt


Train 5/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 005 | train_loss=0.009770 | val_mse=0.001426 | time=157.4s


Train 6/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 006 | train_loss=0.009314 | val_mse=0.001405 | time=156.9s
  ↳ New best! val_mse=0.001405 — saved TorchScript to model_ts.pt


Train 7/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 007 | train_loss=0.009315 | val_mse=0.001422 | time=157.5s


Train 8/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 008 | train_loss=0.009149 | val_mse=0.001383 | time=157.8s
  ↳ New best! val_mse=0.001383 — saved TorchScript to model_ts.pt


Train 9/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 009 | train_loss=0.008709 | val_mse=0.001375 | time=156.9s
  ↳ New best! val_mse=0.001375 — saved TorchScript to model_ts.pt


Train 10/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 010 | train_loss=0.008586 | val_mse=0.001387 | time=148.4s


Train 11/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 011 | train_loss=0.008478 | val_mse=0.001359 | time=148.1s
  ↳ New best! val_mse=0.001359 — saved TorchScript to model_ts.pt


Train 12/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 012 | train_loss=0.008458 | val_mse=0.001361 | time=156.9s


Train 13/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 013 | train_loss=0.008254 | val_mse=0.001360 | time=157.1s


Train 14/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 014 | train_loss=0.008157 | val_mse=0.001339 | time=157.0s
  ↳ New best! val_mse=0.001339 — saved TorchScript to model_ts.pt


Train 15/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 015 | train_loss=0.008232 | val_mse=0.001344 | time=157.5s


Train 16/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 016 | train_loss=0.008154 | val_mse=0.001381 | time=157.5s


Train 17/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 017 | train_loss=0.008070 | val_mse=0.001326 | time=157.2s
  ↳ New best! val_mse=0.001326 — saved TorchScript to model_ts.pt


Train 18/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 018 | train_loss=0.008017 | val_mse=0.001327 | time=156.3s


Train 19/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 019 | train_loss=0.008090 | val_mse=0.001323 | time=156.3s
  ↳ New best! val_mse=0.001323 — saved TorchScript to model_ts.pt


Train 20/20:   0%|          | 0/197 [00:00<?, ?it/s]

Epoch 020 | train_loss=0.008011 | val_mse=0.001320 | time=158.0s
  ↳ New best! val_mse=0.001320 — saved TorchScript to model_ts.pt


## Step 9: Check Error on training dataset provided

In [39]:
# Evaluation loader over the entire training_dataset/ (train + val splits), in fixed order.
all_train_loader = DataLoader(
    FullFrameDS(train_imgs + val_imgs),  # entire training_dataset/
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,  # was `nw`, a name never defined in this notebook (NameError on a fresh kernel)
    pin_memory=(DEVICE=="cuda"),
)

In [None]:
# ---- Rebuild all-train eval loader (single-process, spawn-proof) ----
from torch.utils.data import DataLoader

# pick which paths you want in "all": here I use your full training set
# Paths to evaluate on: the training split only.
# (Use train_imgs + val_imgs instead to cover both splits.)
all_imgs = train_imgs

all_ds = FullFrameDS(all_imgs)

# Evaluation batch size: 4x the training batch, capped at 64.
EVAL_BATCH = min(4 * BATCH_SIZE, 64)

# Discard any loader left over from an earlier cell or run.
globals().pop("all_train_loader", None)

all_train_loader = DataLoader(
    dataset=all_ds,
    batch_size=EVAL_BATCH,
    num_workers=0,      # critical: no workers -> no spawn/pickle
    shuffle=False,
    drop_last=False,
    pin_memory=False,
)

# ---- Load the saved TorchScript model and evaluate it ----
model = torch.jit.load(ckpt_path, map_location=DEVICE)
model.eval()

with torch.no_grad():
    all_full_mse = eval_full_mse(model, all_train_loader)

print(f"Full-MSE on all-train set: {all_full_mse:.6f}")


In [None]:
# Reload the saved TorchScript checkpoint (this rebinds `model` to the scripted
# module) and recompute the full-image MSE over all_train_loader.
model = torch.jit.load(ckpt_path, map_location=DEVICE)
model.eval()
all_full_mse   = eval_full_mse(model, all_train_loader)


In [None]:
# Report the full-image MSE held in all_full_mse.
# NOTE(review): the label says "Entire training_dataset", but all_train_loader may have
# been built from train_imgs only — confirm which split was actually evaluated.
print("\n=== Full-image MSE on training data ===")
print(f"Entire training_dataset: {all_full_mse:.6f}")

## Step 10: Submit to the server and check the submission status

In [7]:
import os, glob
# Print the size (in MB, 1 MB = 1e6 bytes) of each candidate model file that exists.
for f in ("model_ts.pt", "model_scripted.pt", "TinyDemiUNet_scripted.pt", "Model_B.pth"):
    if not os.path.exists(f):
        continue
    print(f, os.path.getsize(f)/1e6, "MB")

model_ts.pt 6.76655 MB
model_scripted.pt 0.721842 MB


In [5]:
# -------------------------
# Submit to server
# -------------------------
import requests
def submit_model(token: str, model_path: str, server_url="http://hadi.cs.virginia.edu:9000", timeout=120):
    """Upload a model file to the course server and print the server's reply.

    Args:
        token: team token sent as the ``token`` form field.
        model_path: path of the model file to upload.
        server_url: base URL of the server; the file is POSTed to ``<server_url>/submit``.
        timeout: seconds to wait for the server before ``requests`` raises
            (without a timeout the call could block indefinitely).

    Prints a success line when the JSON reply contains ``"message"``, otherwise a
    failure line with the reply's ``"error"`` value (or the HTTP status when the
    reply is not JSON).
    """
    # Keep the file open only for the duration of the upload.
    with open(model_path, "rb") as f:
        response = requests.post(
            f"{server_url}/submit",
            data={"token": token},
            files={"file": f},
            timeout=timeout,
        )

    try:
        resp_json = response.json()
    except ValueError:
        # Non-JSON body (e.g. an HTML error page): report it instead of crashing.
        print(f"❌ Submission failed: HTTP {response.status_code}: {response.text[:200]}")
        return

    if isinstance(resp_json, dict) and "message" in resp_json:
        print(f"✅ {resp_json['message']}")
    else:
        error = resp_json.get("error") if isinstance(resp_json, dict) else resp_json
        print(f"❌ Submission failed: {error}")


# Provide your team token via the HW2_TEAM_TOKEN environment variable, or type
# it at the prompt. Do not hard-code it here: the token is the only credential
# sent with a submission, and notebooks are easily shared or committed.
import getpass
import os

my_token = os.environ.get("HW2_TEAM_TOKEN") or getpass.getpass("Team token: ")
file_name = "model_ts.pt"
submit_model(my_token, file_name)

✅ Submission received for team 'Charles Oliveira'. Attempt #2.


In [6]:
# -------------------------
#  Check status
# -------------------------
import requests
import time

def check_submission_status(my_token, max_retries=3,
                            server_url="http://hadi.cs.virginia.edu:9000", timeout=30):
    """Print the status of every submission attempt recorded for a team token.

    Args:
        my_token: team token; it is placed in the request URL path.
        max_retries: maximum number of requests made when the server answers 429.
        server_url: base URL of the server.
        timeout: seconds to wait for each response before ``requests`` raises.

    On HTTP 200 each attempt is printed and the function returns. On HTTP 429
    it waits for the delay parsed from the server's error message (15 s if it
    cannot be parsed) and retries. Any other status is printed and the
    function returns.
    """
    url = f"{server_url}/submission-status/{my_token}"

    for attempt in range(max_retries):
        response = requests.get(url, timeout=timeout)

        if response.status_code == 200:
            attempts = response.json()
            for a in attempts:
                model_size = f"{a['model_size']:.4f}" if isinstance(a['model_size'], (float, int)) else "None"

                print(f"Attempt {a['attempt']}: Model size={model_size}, "
                      f"Submitted at={a['submitted_at']}, Status={a['status']}")

            # str() keeps this safe if the server ever sends a null/non-string status.
            if attempts and str(attempts[-1]['status']).lower() == "broken file":
                print("⚠️ Your model on the server is broken!")
            return  # success, exit function

        elif response.status_code == 429:
            # Server says rate limit exceeded
            if attempt == max_retries - 1:
                # No request follows the last attempt, so waiting would be pointless.
                break
            try:
                error_json = response.json()
                # The wait time is taken as the second-to-last word of the error message.
                wait_seconds = int(error_json.get("error", "").split()[-2])
            except Exception:
                wait_seconds = 15  # default fallback
            print(f"⏳ Rate limited. Waiting {wait_seconds} seconds before retry...")
            time.sleep(wait_seconds + 1)  # wait a bit longer to be safe

        else:
            print(f"❌ Error {response.status_code}: {response.text}")
            return

    print("⚠️ Max retries reached. Try again later.")


# Example usage:
check_submission_status(my_token)



Attempt 1: Model size=6.4531, Submitted at=Nov 11, 2025 10:37:38 PM, Status=pending
Attempt 2: Model size=6.4531, Submitted at=Nov 11, 2025 11:54:25 PM, Status=pending
