In [1]:
# What this cell does:
# - Creates the folder structure exactly as in the playbook.
# - Writes requirements.txt and minimal README.md.
# - Creates empty/stub files for code modules we will fill later from the notebook.

import os, textwrap, json
from pathlib import Path

root = Path.cwd() / "semnav_terrain"
dirs = [
    root / "configs",
    root / "data" / "images",
    root / "data" / "labels",
    root / "data" / "depth",
    root / "data" / "splits",
    root / "data" / "eda_samples",
    root / "semnav",
    root / "semnav" / "models",
    root / "semnav" / "losses",
    root / "semnav" / "metrics",
    root / "scripts",
    root / "runs",
    root / "export",
    root / "reports" / "figs",
]

for d in dirs:
    d.mkdir(parents=True, exist_ok=True)

# __init__ for package
(root / "semnav" / "__init__.py").write_text("")

# requirements.txt (exact list from playbook)
req = """torch>=2.2
torchvision>=0.17
opencv-python
numpy
yaml
rich
tqdm
albumentations
matplotlib
onnx
onnxruntime-gpu
"""
(root / "requirements.txt").write_text(req)

# Minimal configs (we will overwrite/update later)
(root / "configs" / "deeplabv3_mbv3.yaml").write_text(textwrap.dedent("""\
model: deeplabv3_mbv3
num_classes: 7
input_size: [512,384]
optimizer: {name: AdamW, lr: 3e-4, weight_decay: 0.01}
loss: {ce_weighted: true, dice: 0.3, boundary: 0.2}
train: {epochs: 5, batch_size: 2, amp: false}   # CPU-friendly starter
val: {tta_flip: true}
"""))

(root / "configs" / "fusion_lite.yaml").write_text(textwrap.dedent("""\
model: fusion_lite
rgb_backbone: deeplabv3_mbv3
depth_backbone: resnet18_unetlite
fusion_scales: [8,4]
num_classes: 7
"""))

# Tiny README
(root / "README.md").write_text(textwrap.dedent("""\
# SemNav Terrain (Notebook Build)

This repo is generated by a Jupyter notebook to train a small semantic segmentation model for terrain awareness.
See the notebook for step-by-step instructions. CPU-only settings are used by default.
"""))

# Stubs for scripts; we'll fill functions in later cells or inline
(root / "scripts" / "make_splits.py").write_text("# will be populated from notebook\n")
(root / "scripts" / "viz_samples.py").write_text("# will be populated from notebook\n")

# Stub modules; we will write actual code in later cells
(root / "semnav" / "models" / "deeplabv3.py").write_text("# model will be written from notebook\n")
(root / "semnav" / "models" / "fusion_lite.py").write_text("# model will be written from notebook\n")
(root / "semnav" / "losses" / "dice.py").write_text("# dice loss from notebook\n")
(root / "semnav" / "losses" / "boundary.py").write_text("# boundary loss from notebook\n")
(root / "semnav" / "metrics" / "miou.py").write_text("# miou metric from notebook\n")
(root / "semnav" / "metrics" / "boundary_iou.py").write_text("# boundary iou from notebook\n")
(root / "semnav" / "metrics" / "recall2m.py").write_text("# recall@2m from notebook\n")

# Train / infer stubs (we will write content later)
(root / "train.py").write_text("# training loop will be written from notebook\n")
(root / "infer.py").write_text("# inference/export code from notebook\n")
(root / "webcam_demo.py").write_text("# webcam demo code from notebook\n")

print("Created project at:", root)
print("Folders:")
for d in dirs:
    print(" -", d.relative_to(root))
print("\nNext: install requirements.")

Created project at: /Users/jayamdaxeshkumarshah/semnav_terrain
Folders:
 - configs
 - data/images
 - data/labels
 - data/depth
 - data/splits
 - data/eda_samples
 - semnav
 - semnav/models
 - semnav/losses
 - semnav/metrics
 - scripts
 - runs
 - export
 - reports/figs

Next: install requirements.


In [2]:
# What this cell does:
# - Rewrites two lines in semnav_terrain/requirements.txt:
#     yaml            -> PyYAML
#     onnxruntime-gpu -> onnxruntime   (CPU build for macOS/arm64)
# - Then re-runs pip install.

from pathlib import Path
import sys, subprocess

req_path = Path("semnav_terrain/requirements.txt")
lines = [ln.strip() for ln in req_path.read_text().splitlines() if ln.strip()]

fixed = []
for ln in lines:
    if ln.lower() == "yaml":
        ln = "PyYAML"
    if ln.lower() == "onnxruntime-gpu":
        ln = "onnxruntime"
    fixed.append(ln)

req_path.write_text("\n".join(fixed) + "\n")

print("Updated requirements.txt to:\n")
print(req_path.read_text())

print("\nRe-installing packages from the updated requirements...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", str(req_path)])
print("\nDone. If any package still errors, copy the error text here and we'll adjust.")

Updated requirements.txt to:

torch>=2.2
torchvision>=0.17
opencv-python
numpy
PyYAML
rich
tqdm
albumentations
matplotlib
onnx
onnxruntime


Re-installing packages from the updated requirements...

Done. If any package still errors, copy the error text here and we'll adjust.


In [3]:
# What this cell does:
# - Opens your default webcam.
# - Displays a window with FPS text. Press ESC to quit.

import cv2, time

cap = cv2.VideoCapture(0)
if not cap.isOpened():
    raise RuntimeError("Webcam not found. Please check your camera device or permissions.")

last = time.time()
frames = 0

print("Showing webcam. Press ESC to exit.")
while True:
    ok, frame = cap.read()
    if not ok:
        break
    frames += 1
    now = time.time()
    if now - last >= 1.0:
        fps = frames / (now - last)
        last = now
        frames = 0
        cv2.putText(frame, f"FPS: {fps:.1f}", (12,28), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2)
    cv2.imshow("SemNav — Webcam (Smoke Test)", frame)
    if cv2.waitKey(1) & 0xFF == 27:
        break

cap.release()
cv2.destroyAllWindows()
print("Closed webcam.")

Showing webcam. Press ESC to exit.
Closed webcam.


In [4]:
# What this cell does:
# - Looks for dataset ZIP files in the current directory or ./zips/
# - Extracts each to semnav_terrain/data/raw/<dataset_name>/

import zipfile
from pathlib import Path

root = Path("semnav_terrain")
raw_dir = root / "data" / "raw"
raw_dir.mkdir(parents=True, exist_ok=True)

search_dirs = [Path.cwd(), Path.cwd()/ "zips"]
zip_files = []
for s in search_dirs:
    if s.exists():
        zip_files += list(s.glob("*.zip"))

if not zip_files:
    print("No .zip files found. Put your dataset zips in the current folder or ./zips/ and re-run.")
else:
    for zf in zip_files:
        dst = raw_dir / zf.stem
        dst.mkdir(parents=True, exist_ok=True)
        print(f"Extracting {zf.name} -> {dst} ...")
        with zipfile.ZipFile(zf, 'r') as z:
            z.extractall(dst)
        print("Done.")

print("\nAll zips extracted (if any were found). Next: standardize structure.")

Extracting Curbs Dataset.zip -> semnav_terrain/data/raw/Curbs Dataset ...
Done.
Extracting Gardens Point GT.zip -> semnav_terrain/data/raw/Gardens Point GT ...
Done.
Extracting terrain awareness dataset.zip -> semnav_terrain/data/raw/terrain awareness dataset ...
Done.
Extracting Crosswalk Pixelwise Groundtruth.zip -> semnav_terrain/data/raw/Crosswalk Pixelwise Groundtruth ...
Done.

All zips extracted (if any were found). Next: standardize structure.


In [5]:
# What this cell does:
# - Crawls data/raw/** to find RGB, Depth, Label files using simple name patterns.
# - Builds aligned triplets by base name (without extension).
# - Copies (or symlinks) into data/images, data/depth, data/labels with the SAME base name.

import shutil, re
from pathlib import Path

root = Path("semnav_terrain")
raw_dir = root / "data" / "raw"
img_out = root / "data" / "images"
lbl_out = root / "data" / "labels"
dep_out = root / "data" / "depth"

# helper: find files
def list_files(p: Path):
    return [f for f in p.rglob("*") if f.is_file()]

def is_image(p: Path):
    return p.suffix.lower() in [".jpg", ".jpeg", ".png", ".bmp"]

def guess_kind(p: Path):
    name = p.name.lower()
    # depth: file names with 'depth' or 16-bit png (we'll just treat png with 'depth' in name)
    if ("depth" in name) or ("disp" in name) or ("distance" in name):
        return "depth" if p.suffix.lower() in [".png", ".npy"] else "unknown"
    # labels/masks
    if any(k in name for k in ["label", "mask", "gt", "annot", "seg"]):
        return "label" if p.suffix.lower() in [".png"] else "unknown"
    # otherwise likely RGB image
    if is_image(p):
        return "rgb"
    return "unknown"

def basekey(p: Path):
    # remove typical suffix hints like _label, _mask
    n = p.stem
    n = re.sub(r"(_label|_mask|_lbl|_seg|_gt)$", "", n)
    n = re.sub(r"(_depth|_disp|_distance)$", "", n)
    return n

files = []
for d in raw_dir.glob("*"):
    if d.is_dir():
        files += list_files(d)

rgb_map, dep_map, lbl_map = {}, {}, {}
for f in files:
    kind = guess_kind(f)
    if kind == "unknown":
        continue
    key = basekey(f)
    if kind == "rgb":
        rgb_map.setdefault(key, f)
    elif kind == "depth":
        dep_map.setdefault(key, f)
    elif kind == "label":
        lbl_map.setdefault(key, f)

triplets = []
for key in set(list(rgb_map.keys()) + list(dep_map.keys()) + list(lbl_map.keys())):
    rgb = rgb_map.get(key, None)
    dep = dep_map.get(key, None)
    lbl = lbl_map.get(key, None)
    # We require at least RGB + Label; depth optional.
    if rgb is not None and lbl is not None:
        triplets.append((key, rgb, dep, lbl))

print(f"Found total aligned (RGB+Label [+Depth optional]) triplets: {len(triplets)}")

# copy into unified layout
copied = 0
for key, rgb, dep, lbl in triplets:
    rgb_dst = img_out / f"{key}.png"
    lbl_dst = lbl_out / f"{key}.png"
    shutil.copy2(rgb, rgb_dst) if rgb.suffix.lower() != ".png" else shutil.copy2(rgb, rgb_dst)
    shutil.copy2(lbl, lbl_dst)
    if dep is not None:
        dep_dst = dep_out / f"{key}{dep.suffix.lower()}"
        shutil.copy2(dep, dep_dst)
    copied += 1

print(f"Copied/standardized {copied} samples into data/images, data/labels, data/depth (if available).")

# quick summary
num_images = len(list(img_out.glob("*.png")))
num_labels = len(list(lbl_out.glob("*.png")))
num_depth_png = len(list(dep_out.glob("*.png")))
num_depth_npy = len(list(dep_out.glob("*.npy")))
print(f"Summary => images: {num_images}, labels: {num_labels}, depth_png: {num_depth_png}, depth_npy: {num_depth_npy}")
print("If counts look wrong, adjust naming patterns in the code and re-run this cell.")

Found total aligned (RGB+Label [+Depth optional]) triplets: 0
Copied/standardized 0 samples into data/images, data/labels, data/depth (if available).
Summary => images: 328, labels: 328, depth_png: 0, depth_npy: 0
If counts look wrong, adjust naming patterns in the code and re-run this cell.


In [6]:
# ============================================
# DATASET DIAGNOSTIC — find RGB / LABEL / DEPTH candidates
# What this cell does:
#   1) Recursively scans semnav_terrain/data/raw/** for files.
#   2) Classifies files as RGB / LABEL / DEPTH / UNKNOWN using broader rules.
#   3) Prints per-dataset counts and shows sample paths it thinks match.
#   4) Helps you see why "0 triplets" happened and which names we must handle.
# ============================================

from pathlib import Path
import re

root = Path("semnav_terrain")
raw_dir = root / "data" / "raw"

if not raw_dir.exists():
    print("raw dir not found:", raw_dir.resolve())
    print("Make sure you extracted your zips into semnav_terrain/data/raw/ ...")
else:
    # ---- patterns we consider ----
    IMG_EXTS   = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"}
    LABEL_EXTS = {".png", ".bmp", ".tif", ".tiff"}  # (we expect masks as image files)
    DEPTH_EXTS = {".png", ".npy", ".tif", ".tiff", ".exr"}  # detect, we might not read .exr

    LABEL_WORDS = [
        "label", "labels", "mask", "masks", "gt", "groundtruth",
        "annotation", "annotations", "seg", "segmentation", "labelids", "classid", "classids"
    ]
    DEPTH_WORDS = [
        "depth", "depthmap", "depth_map", "dep", "disparity", "disp",
        "distance", "range", "depthimage", "z", "zed"
    ]
    RGB_HINT_WORDS = ["image", "images", "rgb", "color", "left", "img"]

    # helper: return lowercase tokens from path (name + parent names)
    def tokens_for(p: Path):
        toks = [p.name.lower()]
        par = p.parent
        for _ in range(3):  # look up to 3 levels of parent names
            toks.append(par.name.lower())
            par = par.parent
        return " ".join(toks)

    # classify using filename + parent folder hints
    def classify(p: Path):
        name = p.name.lower()
        ex = p.suffix.lower()
        toks = tokens_for(p)

        if ex in DEPTH_EXTS and any(w in toks for w in DEPTH_WORDS):
            return "depth"
        if (ex in LABEL_EXTS) and any(w in toks for w in LABEL_WORDS):
            return "label"
        if (ex in IMG_EXTS):
            # If filename/folders strongly say label/depth, they would've matched above.
            # Otherwise assume it's RGB.
            return "rgb"
        return "unknown"

    # gather per top-level dataset folder
    dataset_dirs = [d for d in raw_dir.glob("*") if d.is_dir()]
    if not dataset_dirs:
        print("No dataset subfolders under:", raw_dir.resolve())
    else:
        grand_rgb, grand_lbl, grand_dep, grand_unk = 0,0,0,0
        sample_rgb, sample_lbl, sample_dep, sample_unk = [], [], [], []

        for dset in dataset_dirs:
            rgb = []
            lbl = []
            dep = []
            unk = []
            for f in dset.rglob("*"):
                if not f.is_file():
                    continue
                c = classify(f)
                if c == "rgb":
                    rgb.append(f)
                    if len(sample_rgb)<20: sample_rgb.append(f)
                elif c == "label":
                    lbl.append(f)
                    if len(sample_lbl)<20: sample_lbl.append(f)
                elif c == "depth":
                    dep.append(f)
                    if len(sample_dep)<20: sample_dep.append(f)
                else:
                    # only track "unknown" that look like images to help debugging
                    if f.suffix.lower() in (IMG_EXTS | DEPTH_EXTS):
                        unk.append(f)
                        if len(sample_unk)<20: sample_unk.append(f)

            print("\n=== DATASET:", dset.name, "===")
            print("RGB candidates:   ", len(rgb))
            print("LABEL candidates: ", len(lbl))
            print("DEPTH candidates: ", len(dep))
            print("UNKNOWN image-like:", len(unk))

            grand_rgb += len(rgb)
            grand_lbl += len(lbl)
            grand_dep += len(dep)
            grand_unk += len(unk)

        print("\n=== GRAND TOTALS ACROSS ALL RAW DATASETS ===")
        print("RGB:", grand_rgb, "  LABEL:", grand_lbl, "  DEPTH:", grand_dep, "  UNKNOWN(image-like):", grand_unk)

        # show samples
        def show_samples(title, arr):
            print(f"\n-- {title} (up to 20) --")
            if not arr:
                print("(none)")
            else:
                for p in arr[:20]:
                    print(" ", p.relative_to(raw_dir))

        show_samples("SAMPLE RGB CANDIDATES", sample_rgb)
        show_samples("SAMPLE LABEL CANDIDATES", sample_lbl)
        show_samples("SAMPLE DEPTH CANDIDATES", sample_dep)
        show_samples("SAMPLE UNKNOWN (image-like, not matched)", sample_unk)

        print("\nTip: If many labels are in UNKNOWN, note their extensions/keywords and we will extend the matcher.")


=== DATASET: Gardens Point GT ===
RGB candidates:    0
LABEL candidates:  1200
DEPTH candidates:  0
UNKNOWN image-like: 0

=== DATASET: Curbs Dataset ===
RGB candidates:    100
LABEL candidates:  100
DEPTH candidates:  0
UNKNOWN image-like: 0

=== DATASET: terrain awareness dataset ===
RGB candidates:    120
LABEL candidates:  120
DEPTH candidates:  120
UNKNOWN image-like: 0

=== DATASET: Crosswalk Pixelwise Groundtruth ===
RGB candidates:    191
LABEL candidates:  191
DEPTH candidates:  0
UNKNOWN image-like: 0

=== GRAND TOTALS ACROSS ALL RAW DATASETS ===
RGB: 411   LABEL: 1611   DEPTH: 120   UNKNOWN(image-like): 0

-- SAMPLE RGB CANDIDATES (up to 20) --
  Curbs Dataset/Curbs Dataset/left/yuquan/13-37-52leftp/339.png
  Curbs Dataset/Curbs Dataset/left/yuquan/13-37-52leftp/338.png
  Curbs Dataset/Curbs Dataset/left/yuquan/13-37-52leftp/336.png
  Curbs Dataset/Curbs Dataset/left/yuquan/13-37-52leftp/337.png
  Curbs Dataset/Curbs Dataset/left/yuquan/13-37-52leftp/335.png
  Curbs Dataset

In [7]:
# ============================================
# PATCHED STANDARDIZER — broader matching + stem alignment
# What this cell does:
#   1) Uses broader rules (extensions + keywords + folder hints) to find RGB/LABEL/DEPTH.
#   2) Normalizes base names by stripping common suffixes (e.g., _label, -mask, _left, _rgb, _depth, _disp, etc.).
#   3) Builds aligned triplets by base name (RGB + LABEL required, DEPTH optional).
#   4) Copies/Converts to semnav_terrain/data/{images,labels,depth}/<base>.png (depth keeps original ext if .npy).
#   5) Prints counts and a few aligned examples.
# Notes:
#   - Labels that are 3-channel color masks will be copied as-is; mapping step later may need attention.
#   - If label is 3-channel with identical channels, we auto-reduce to single channel.
# ============================================

from pathlib import Path
import shutil
import re
import cv2
import numpy as np

root = Path("semnav_terrain")
raw_dir = root / "data" / "raw"
img_out = root / "data" / "images"
lbl_out = root / "data" / "labels"
dep_out = root / "data" / "depth"

img_out.mkdir(parents=True, exist_ok=True)
lbl_out.mkdir(parents=True, exist_ok=True)
dep_out.mkdir(parents=True, exist_ok=True)

# ---- patterns & helpers ----
IMG_EXTS   = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"}
LABEL_EXTS = {".png", ".bmp", ".tif", ".tiff"}
DEPTH_EXTS = {".png", ".npy", ".tif", ".tiff", ".exr"}

LABEL_WORDS = [
    "label", "labels", "mask", "masks", "gt", "groundtruth",
    "annotation", "annotations", "seg", "segmentation", "labelids", "classid", "classids"
]
DEPTH_WORDS = [
    "depth", "depthmap", "depth_map", "dep", "disparity", "disp",
    "distance", "range", "depthimage", "z", "zed"
]
RGB_HINT_WORDS = ["image", "images", "rgb", "color", "left", "img", "frames"]

# common suffixes to strip when forming the "base key"
SUFFIXES = [
    r"(_|-)label(ids)?$", r"(_|-)labels$", r"(_|-)mask$", r"(_|-)masks$",
    r"(_|-)gt$", r"(_|-)seg$", r"(_|-)segmentation$", r"(_|-)annotation(s)?$",
    r"(_|-)class(id|ids)$",
    r"(_|-)rgb$", r"(_|-)left$", r"(_|-)image$", r"(_|-)img$", r"(_|-)color$",
    r"(_|-)depth(map)?$", r"(_|-)disp(arit(y)?)?$", r"(_|-)distance$", r"(_|-)range$", r"(_|-)z$"
]
SUFFIX_REGEX = re.compile("(" + "|".join(SUFFIXES) + ")", re.IGNORECASE)

def tokens_for(p: Path):
    toks = [p.name.lower()]
    par = p.parent
    for _ in range(3):
        toks.append(par.name.lower())
        par = par.parent
    return " ".join(toks)

def classify(p: Path):
    ex = p.suffix.lower()
    toks = tokens_for(p)
    # depth first
    if (ex in DEPTH_EXTS) and any(w in toks for w in DEPTH_WORDS):
        return "depth"
    # labels next
    if (ex in LABEL_EXTS) and any(w in toks for w in LABEL_WORDS):
        return "label"
    # otherwise if looks like an image, call it rgb
    if ex in IMG_EXTS:
        return "rgb"
    return "unknown"

def make_basekey(p: Path):
    stem = p.stem
    stem = SUFFIX_REGEX.sub("", stem)  # strip defined suffixes
    stem = re.sub(r"[\s]+", "_", stem) # collapse spaces to underscore
    return stem

# scan all files
files = [f for f in raw_dir.rglob("*") if f.is_file()]
rgb_map, lbl_map, dep_map = {}, {}, {}

for f in files:
    c = classify(f)
    if c == "unknown":
        continue
    key = make_basekey(f)
    # prefer files located in conventional folders by overriding prior picks
    score = 0
    toks = tokens_for(f)
    if c == "rgb":
        if any(w in toks for w in ["image", "images", "rgb", "color", "left", "img", "frames"]):
            score += 1
        prev = rgb_map.get(key)
        if (prev is None) or ("score" in prev and score > prev["score"]):
            rgb_map[key] = {"path": f, "score": score}
    elif c == "label":
        if any(w in toks for w in ["label", "labels", "gt", "groundtruth", "seg", "segmentation", "annotation"]):
            score += 1
        prev = lbl_map.get(key)
        if (prev is None) or ("score" in prev and score > prev["score"]):
            lbl_map[key] = {"path": f, "score": score}
    elif c == "depth":
        if any(w in toks for w in ["depth", "disparity", "range", "distance"]):
            score += 1
        prev = dep_map.get(key)
        if (prev is None) or ("score" in prev and score > prev["score"]):
            dep_map[key] = {"path": f, "score": score}

# build triplets
keys_all = sorted(set(rgb_map.keys()) | set(lbl_map.keys()) | set(dep_map.keys()))
triplets = []
for k in keys_all:
    rgb = rgb_map.get(k, {}).get("path")
    lbl = lbl_map.get(k, {}).get("path")
    dep = dep_map.get(k, {}).get("path")
    if (rgb is not None) and (lbl is not None):
        triplets.append((k, rgb, dep, lbl))

print(f"Aligned (RGB+Label [+Depth opt]) triplets found: {len(triplets)}")

# copy/convert helpers
def save_rgb_to_png(src: Path, dst: Path):
    img = cv2.imread(str(src), cv2.IMREAD_COLOR)
    if img is None:
        return False
    cv2.imwrite(str(dst), img)
    return True

def save_label_to_png(src: Path, dst: Path):
    m = cv2.imread(str(src), cv2.IMREAD_UNCHANGED)
    if m is None:
        return False
    # if 3-channel and all channels equal, reduce to single-channel
    if m.ndim == 3 and m.shape[2] == 3:
        if np.allclose(m[:,:,0], m[:,:,1]) and np.allclose(m[:,:,1], m[:,:,2]):
            m = m[:,:,0]
        # else: keep 3ch; mapping step may need manual handling
    cv2.imwrite(str(dst), m)
    return True

def copy_depth(src: Path, dst_png: Path, dst_npy: Path):
    # keep .npy as .npy; others to .png
    if src.suffix.lower() == ".npy":
        shutil.copy2(src, dst_npy)
        return True
    else:
        d = cv2.imread(str(src), cv2.IMREAD_UNCHANGED)
        if d is None:
            # if OpenCV can't read (e.g., EXR not supported), just copy the raw file with its ext
            shutil.copy2(src, dst_png.with_suffix(src.suffix.lower()))
            return True
        cv2.imwrite(str(dst_png), d)
        return True

# perform copy/convert
copied = 0
examples = []
for key, rgb, dep, lbl in triplets:
    rgb_dst = img_out / f"{key}.png"
    lbl_dst = lbl_out / f"{key}.png"
    ok1 = save_rgb_to_png(rgb, rgb_dst)
    ok2 = save_label_to_png(lbl, lbl_dst)
    ok3 = True
    if dep is not None:
        dep_png = dep_out / f"{key}.png"
        dep_npy = dep_out / f"{key}.npy"
        ok3 = copy_depth(dep, dep_png, dep_npy)
    if ok1 and ok2 and ok3:
        copied += 1
        if len(examples) < 8:
            examples.append((key, rgb.name, (dep.name if dep is not None else "None"), lbl.name))

print(f"Copied/converted {copied} aligned samples into:")
print("  images ->", img_out.resolve())
print("  labels ->", lbl_out.resolve())
print("  depth  ->", dep_out.resolve())

print("\nExamples (up to 8):")
for e in examples:
    print("  KEY:", e[0], " | RGB:", e[1], " | DEPTH:", e[2], " | LABEL:", e[3])

# final summary
num_images = len(list(img_out.glob("*.png")))
num_labels = len(list(lbl_out.glob("*.png")))
num_depth_png = len(list(dep_out.glob("*.png")))
num_depth_npy = len(list(dep_out.glob("*.npy")))
print("\nSUMMARY AFTER PATCHED STANDARDIZATION")
print(f"images: {num_images}, labels: {num_labels}, depth_png: {num_depth_png}, depth_npy: {num_depth_npy}")
print("If counts are still low, re-run the DIAGNOSTIC cell and tell me which filenames need extra handling.")

Aligned (RGB+Label [+Depth opt]) triplets found: 328




Copied/converted 328 aligned samples into:
  images -> /Users/jayamdaxeshkumarshah/semnav_terrain/data/images
  labels -> /Users/jayamdaxeshkumarshah/semnav_terrain/data/labels
  depth  -> /Users/jayamdaxeshkumarshah/semnav_terrain/data/depth

Examples (up to 8):
  KEY: 0045  | RGB: 0045-color.png  | DEPTH: None  | LABEL: 0045-color.png
  KEY: 0046  | RGB: 0046-color.png  | DEPTH: None  | LABEL: 0046-color.png
  KEY: 0047  | RGB: 0047-color.png  | DEPTH: None  | LABEL: 0047-color.png
  KEY: 0048  | RGB: 0048-color.png  | DEPTH: None  | LABEL: 0048-color.png
  KEY: 0049  | RGB: 0049-color.png  | DEPTH: None  | LABEL: 0049-color.png
  KEY: 0050  | RGB: 0050-color.png  | DEPTH: None  | LABEL: 0050-color.png
  KEY: 0051  | RGB: 0051-color.png  | DEPTH: None  | LABEL: 0051-color.png
  KEY: 0286  | RGB: 0286-color.png  | DEPTH: None  | LABEL: 0286-color.png

SUMMARY AFTER PATCHED STANDARDIZATION
images: 328, labels: 328, depth_png: 0, depth_npy: 0
If counts are still low, re-run the DIAGNOST



In [8]:
# What this cell does:
# - Scans all label masks to collect unique pixel values.
# - Writes a mapping template JSON: data/label_mapping.json
# - You can edit that JSON to map source IDs to our target {0..6}, others to 255 (ignore).

import numpy as np
import cv2, json
from pathlib import Path

root = Path("semnav_terrain")
lbl_dir = root / "data" / "labels"
mapping_path = root / "data" / "label_mapping.json"

unique_ids = set()
lbl_files = list(lbl_dir.glob("*.png"))
for i, f in enumerate(lbl_files):
    m = cv2.imread(str(f), cv2.IMREAD_UNCHANGED)
    if m is None:
        continue
    unique_ids.update(np.unique(m).tolist())

unique_ids = sorted(list(unique_ids))
print("Unique label IDs found across datasets:", unique_ids)

# Build default mapping: unknown -> 255, and try a simple guess (often 0=background/ground)
default_map = {str(int(k)): 255 for k in unique_ids}
# Give a naive guess if some standard IDs appear; you will likely need to edit
# target IDs: ground=0, sidewalk=1, stairs=2, water=3, person=4, car=5, sky=6
# We'll leave all as 255 except try to map '0' to ground if exists:
if "0" in default_map:
    default_map["0"] = 0

mapping = {
    "target_names": ["ground","sidewalk","stairs","water","person","car","sky"],
    "ignore_index": 255,
    "source_to_target": default_map
}
with open(mapping_path, "w") as f:
    json.dump(mapping, f, indent=2)

print(f"Template saved to: {mapping_path}")
print(">> IMPORTANT: Open data/label_mapping.json and edit 'source_to_target' so IDs map correctly.")

Unique label IDs found across datasets: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213,

In [9]:
# What this cell does:
# - Scans semnav_terrain/data/images and semnav_terrain/data/labels by key.
# - Tries to read each PNG with cv2.imread; if it fails, tries cv2.imdecode() from raw bytes and re-saves.
# - If still unreadable, removes that key from images/labels/depth so later steps won't crash.
# - Prints how many files were repaired and how many samples were removed.

from pathlib import Path
import numpy as np
import cv2
import os

root = Path("semnav_terrain")
img_dir = root / "data" / "images"
lbl_dir = root / "data" / "labels"
dep_dir = root / "data" / "depth"

def try_read(path):
    arr = cv2.imread(str(path), cv2.IMREAD_UNCHANGED)
    if arr is not None:
        return arr, "imread"
    # try imdecode from bytes (sometimes helps)
    try:
        raw = np.fromfile(str(path), dtype=np.uint8)
        if raw.size > 0:
            arr2 = cv2.imdecode(raw, cv2.IMREAD_UNCHANGED)
            if arr2 is not None:
                return arr2, "imdecode"
    except Exception:
        pass
    return None, None

def reencode_png(path, arr):
    try:
        ok = cv2.imwrite(str(path), arr)
        return bool(ok)
    except Exception:
        return False

def remove_key(key):
    removed = []
    for folder, ext in [(img_dir,".png"), (lbl_dir,".png"),
                        (dep_dir,".png"), (dep_dir,".npy")]:
        p = folder / f"{key}{ext}"
        if p.exists():
            try:
                p.unlink()
                removed.append(p)
            except Exception:
                pass
    return removed

# collect keys by intersection of filenames we expect
img_keys = {p.stem for p in img_dir.glob("*.png")}
lbl_keys = {p.stem for p in lbl_dir.glob("*.png")}
all_keys = sorted(img_keys | lbl_keys)

repaired_imgs = 0
repaired_lbls = 0
removed_samples = 0
bad_keys = []

for key in all_keys:
    ip = img_dir / f"{key}.png"
    lp = lbl_dir / f"{key}.png"

    # check image
    if ip.exists():
        img, how = try_read(ip)
        if img is None:
            # cannot read, try to drop this key
            bad_keys.append(key)
        elif how == "imdecode":
            if reencode_png(ip, img):
                repaired_imgs += 1

    # check label
    if lp.exists():
        lbl, how = try_read(lp)
        if lbl is None:
            bad_keys.append(key)
        elif how == "imdecode":
            if reencode_png(lp, lbl):
                repaired_lbls += 1

# finalize: remove keys that are still unreadable
bad_keys = sorted(set(bad_keys))
for key in bad_keys:
    removed = remove_key(key)
    if removed:
        removed_samples += 1

print(f"Repaired images: {repaired_imgs}")
print(f"Repaired labels: {repaired_lbls}")
print(f"Removed unreadable samples: {removed_samples}")
print("Remaining counts:",
      "images", len(list(img_dir.glob('*.png'))),
      "labels", len(list(lbl_dir.glob('*.png'))),
      "depth_png", len(list(dep_dir.glob('*.png'))),
      "depth_npy", len(list(dep_dir.glob('*.npy'))))

Repaired images: 0
Repaired labels: 0
Removed unreadable samples: 0
Remaining counts: images 328 labels 328 depth_png 0 depth_npy 0


In [10]:
# SAFE replacement for your original "Cell 1D — Apply mapping + 5-fold splits"
# - Skips any unreadable labels (None) so we never call .any() on a bool.
# - If a label is 3-channel with identical channels, squeezes to 1 channel for presence computation.
# - Writes folds only for valid samples.

import json, random
from pathlib import Path
import numpy as np
import cv2

root = Path("semnav_terrain")
img_dir = root / "data" / "images"
lbl_dir = root / "data" / "labels"
dep_dir = root / "data" / "depth"
splits_dir = root / "data" / "splits"
splits_dir.mkdir(parents=True, exist_ok=True)

mapping_path = root / "data" / "label_mapping.json"
if not mapping_path.exists():
    raise RuntimeError("label_mapping.json not found. Run the label mapping cell first (or the fix cell that creates it).")

with open(mapping_path, "r") as f:
    mapping = json.load(f)
ignore_index = int(mapping.get("ignore_index",255))
source_to_target = {int(k): int(v) for k,v in mapping["source_to_target"].items()}
num_classes = 7

# 1) Apply mapping to all labels (in place), skipping unreadables
lbl_files = sorted(lbl_dir.glob("*.png"))
mapped_ok = 0
skipped = 0
for f in lbl_files:
    m = cv2.imread(str(f), cv2.IMREAD_UNCHANGED)
    if m is None:
        skipped += 1
        continue
    if m.ndim == 3:
        # if 3-channel but identical channels, squeeze to 1
        if np.allclose(m[:,:,0], m[:,:,1]) and np.allclose(m[:,:,1], m[:,:,2]):
            m = m[:,:,0]
        else:
            # leave as is; mapping below will operate on the first channel to stay safe here
            m = m[:,:,0]
    flat = m.reshape(-1)
    out = np.full(flat.shape, ignore_index, dtype=np.uint8)
    for s_id, t_id in source_to_target.items():
        out[flat == s_id] = t_id
    out = out.reshape(m.shape)
    cv2.imwrite(str(f), out)
    mapped_ok += 1

print(f"Applied mapping to {mapped_ok} labels. Skipped unreadables: {skipped}")

# 2) Build sample list & class presence (skip unreadables)
img_files = sorted(img_dir.glob("*.png"))
samples = []
bad = 0
for f in img_files:
    key = f.stem
    lbl = lbl_dir / f"{key}.png"
    if not lbl.exists():
        continue
    m = cv2.imread(str(lbl), cv2.IMREAD_UNCHANGED)
    if m is None:
        bad += 1
        continue
    if m.ndim == 3:
        # safety squeeze first channel
        m = m[:,:,0]
    present = np.zeros(num_classes, dtype=np.int32)
    for c in range(num_classes):
        present[c] = int(np.any(m == c))
    has_depth = (dep_dir / f"{key}.png").exists() or (dep_dir / f"{key}.npy").exists()
    samples.append({"key": key, "present": present, "has_depth": has_depth})

print(f"Total valid aligned samples (RGB+Label) used for splits: {len(samples)}  (skipped invalid: {bad})")

# 3) 5-fold approximate stratification
K = 5
random.seed(42)
random.shuffle(samples)
fold_bins = [[] for _ in range(K)]
fold_counts = np.zeros((K, num_classes), dtype=np.int32)

for s in samples:
    pres = s["present"]
    scores = []
    for k in range(K):
        scores.append(int((fold_counts[k] * pres).sum()))
    k_best = int(np.argmin(scores))
    fold_bins[k_best].append(s)
    fold_counts[k_best] += pres

def split_fold(bin_list, seed=42):
    random.Random(seed).shuffle(bin_list)
    n = len(bin_list)
    n_train = int(0.7*n)
    n_val = int(0.15*n)
    train = bin_list[:n_train]
    val = bin_list[n_train:n_train+n_val]
    test = bin_list[n_train+n_val:]
    return train, val, test

for k in range(K):
    train, val, test = split_fold(fold_bins[k], seed=100+k)
    for name, lst in [("train", train), ("val", val), ("test", test)]:
        outp = splits_dir / f"fold{k}_{name}.txt"
        with open(outp, "w") as f:
            for s in lst:
                f.write(s["key"] + "\n")
    print(f"Fold {k}: train={len(train)} val={len(val)} test={len(test)}")

print("5-fold splits saved in data/splits/.")

Applied mapping to 328 labels. Skipped unreadables: 0
Total valid aligned samples (RGB+Label) used for splits: 328  (skipped invalid: 0)
Fold 0: train=67 val=14 test=15
Fold 1: train=40 val=8 test=10
Fold 2: train=40 val=8 test=10
Fold 3: train=40 val=8 test=10
Fold 4: train=40 val=8 test=10
5-fold splits saved in data/splits/.


In [11]:
# What this cell does:
# - Computes rough depth stats on a subset (clip [0.5m,10m], simple median blur).
# - Saves 12 random RGB/Depth/Mask triplets for sanity check in data/eda_samples/.

import random
from pathlib import Path
import numpy as np
import cv2, matplotlib.pyplot as plt

root = Path("semnav_terrain")
img_dir = root / "data" / "images"
lbl_dir = root / "data" / "labels"
dep_dir = root / "data" / "depth"
eda_dir = root / "data" / "eda_samples"
eda_dir.mkdir(parents=True, exist_ok=True)

def load_depth_any(png_or_npy: Path):
    if png_or_npy.suffix.lower() == ".npy":
        d = np.load(str(png_or_npy))
    else:
        # assume 16-bit depth if PNG; some datasets store mm
        d = cv2.imread(str(png_or_npy), cv2.IMREAD_UNCHANGED)
    return d

# collect keys with depth
keys = []
for img in img_dir.glob("*.png"):
    key = img.stem
    lbl = lbl_dir / f"{key}.png"
    if not lbl.exists():
        continue
    d1 = dep_dir / f"{key}.png"
    d2 = dep_dir / f"{key}.npy"
    if d1.exists() or d2.exists():
        keys.append(key)

depth_stats = []
for key in keys[:min(40, len(keys))]:
    dpath = dep_dir / f"{key}.png"
    if not dpath.exists():
        dpath = dep_dir / f"{key}.npy"
    d = load_depth_any(dpath)
    if d is None:
        continue
    d = d.astype(np.float32)
    # heuristic: if values look large (e.g., mm), convert to meters
    if d.max() > 50.0:
        d_m = d / 1000.0
    else:
        d_m = d
    # clip and median blur (convert to 16-bit for medianBlur if needed)
    d_m = np.clip(d_m, 0.5, 10.0)
    d_show = (d_m - 0.5) / (10.0 - 0.5 + 1e-6)
    d_u16 = (d_show * 65535.0).astype(np.uint16)
    d_u16 = cv2.medianBlur(d_u16, 5)
    d_m2 = d_u16.astype(np.float32) / 65535.0 * (10.0 - 0.5) + 0.5
    depth_stats.append([float(d_m2.min()), float(np.median(d_m2)), float(d_m2.max())])

if depth_stats:
    mins, meds, maxs = zip(*depth_stats)
    print(f"Depth stats (sampled): min={np.mean(mins):.2f}m, median={np.mean(meds):.2f}m, max={np.mean(maxs):.2f}m")
else:
    print("No depth files found to compute stats.")

# Save 12 triplets
all_keys = [p.stem for p in img_dir.glob("*.png")]
random.seed(77)
sel = random.sample(all_keys, min(12, len(all_keys)))

for key in sel:
    rgb = cv2.cvtColor(cv2.imread(str(img_dir / f"{key}.png")), cv2.COLOR_BGR2RGB)
    lbl = cv2.imread(str(lbl_dir / f"{key}.png"), cv2.IMREAD_UNCHANGED)
    dpath = dep_dir / f"{key}.png"
    if not dpath.exists(): dpath = dep_dir / f"{key}.npy"
    if dpath.exists():
        d = load_depth_any(dpath)
        if d is not None:
            d = d.astype(np.float32)
            if d.max() > 50.0:
                d = d / 1000.0
            d = np.clip(d, 0.5, 10.0)
            d_show = (d - 0.5) / (10.0 - 0.5 + 1e-6)
        else:
            d_show = None
    else:
        d_show = None

    fig, ax = plt.subplots(1, 3, figsize=(12,4))
    ax[0].imshow(rgb); ax[0].set_title("RGB"); ax[0].axis('off')
    ax[1].imshow(lbl, vmin=0, vmax=6); ax[1].set_title("Label"); ax[1].axis('off')
    if d_show is not None:
        ax[2].imshow(d_show, vmin=0, vmax=1); ax[2].set_title("Depth (~norm)"); ax[2].axis('off')
    else:
        ax[2].imshow(np.zeros_like(lbl)); ax[2].set_title("Depth: N/A"); ax[2].axis('off')
    outp = eda_dir / f"{key}_triplet.png"
    plt.tight_layout()
    plt.savefig(outp)
    plt.close(fig)

print(f"Saved {len(sel)} triplet previews to: {eda_dir}")

No depth files found to compute stats.
Saved 12 triplet previews to: semnav_terrain/data/eda_samples


In [12]:
# What this cell does:
# - Sets up Albumentations transforms.
# - Train: resize to 512x384 (keep aspect with pad), flip, color jitter (RGB only), small rotation, scale.
# - Val/Test: resize only.

import albumentations as A

INPUT_W, INPUT_H = 512, 384

# Geometric transforms shared by RGB/Depth/Label
geom_train = A.Compose([
    A.LongestMaxSize(max_size=max(INPUT_W, INPUT_H)),
    A.PadIfNeeded(min_height=INPUT_H, min_width=INPUT_W, border_mode=0, value=0, mask_value=255),
    A.HorizontalFlip(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.01, scale_limit=0.5, rotate_limit=5, border_mode=0, value=0, mask_value=255, p=0.7),
], additional_targets={'depth': 'image', 'mask': 'mask'})

geom_val = A.Compose([
    A.LongestMaxSize(max_size=max(INPUT_W, INPUT_H)),
    A.PadIfNeeded(min_height=INPUT_H, min_width=INPUT_W, border_mode=0, value=0, mask_value=255),
], additional_targets={'depth': 'image', 'mask': 'mask'})

# Color transforms for RGB only
color_train_rgb = A.Compose([
    A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.05, p=0.8)
])

print("Transforms ready: train (geom+rgb color), val/test (geom only).")

Transforms ready: train (geom+rgb color), val/test (geom only).


  A.PadIfNeeded(min_height=INPUT_H, min_width=INPUT_W, border_mode=0, value=0, mask_value=255),
  original_init(self, **validated_kwargs)
  A.ShiftScaleRotate(shift_limit=0.01, scale_limit=0.5, rotate_limit=5, border_mode=0, value=0, mask_value=255, p=0.7),
  A.PadIfNeeded(min_height=INPUT_H, min_width=INPUT_W, border_mode=0, value=0, mask_value=255),


In [13]:
# What this cell does:
# - Ensures every label matches its image HxW (resizes label with nearest).
# - Converts 3-channel masks to single-channel.
# - Clamps label values to {0..6, 255} (others -> 255 ignore).
# - (Optional) If you later add depth, it will resize depth to match RGB too.
# - Prints a summary of fixes and drops any sample that remains unreadable.

from pathlib import Path
import numpy as np
import cv2

root = Path("semnav_terrain")
img_dir = root / "data" / "images"
lbl_dir = root / "data" / "labels"
dep_dir = root / "data" / "depth"

valid_ids = set([0,1,2,3,4,5,6,255])

def read_any(path):
    arr = cv2.imread(str(path), cv2.IMREAD_UNCHANGED)
    if arr is not None:
        return arr
    # try raw decode fallback
    try:
        raw = np.fromfile(str(path), dtype=np.uint8)
        if raw.size > 0:
            arr2 = cv2.imdecode(raw, cv2.IMREAD_UNCHANGED)
            return arr2
    except Exception:
        pass
    return None

def drop_key(key):
    removed = []
    for folder, exts in [(img_dir, [".png"]),
                         (lbl_dir, [".png"]),
                         (dep_dir, [".png",".npy",".tif",".tiff",".exr"])]:
        for ext in exts:
            p = folder / f"{key}{ext}"
            if p.exists():
                try:
                    p.unlink()
                    removed.append(p)
                except Exception:
                    pass
    return removed

fixed_size = 0
squeezed = 0
clamped = 0
fixed_depth = 0
dropped = 0

# iterate over keys present in images or labels
keys = sorted({p.stem for p in img_dir.glob("*.png")} | {p.stem for p in lbl_dir.glob("*.png")})

for key in keys:
    ip = img_dir / f"{key}.png"
    lp = lbl_dir / f"{key}.png"
    if not (ip.exists() and lp.exists()):
        # must have both rgb and label
        _ = drop_key(key)
        dropped += 1
        continue

    img = read_any(ip)
    lbl = read_any(lp)
    if img is None or lbl is None:
        _ = drop_key(key)
        dropped += 1
        continue

    # Ensure label single-channel
    if lbl.ndim == 3:
        # if gray replicated into 3 channels, squeeze; else take first channel
        if np.allclose(lbl[:,:,0], lbl[:,:,1]) and np.allclose(lbl[:,:,1], lbl[:,:,2]):
            lbl = lbl[:,:,0]
        else:
            lbl = lbl[:,:,0]
        squeezed += 1

    # Resize label to image HxW if mismatch
    H, W = img.shape[:2]
    if lbl.shape[:2] != (H, W):
        lbl = cv2.resize(lbl, (W, H), interpolation=cv2.INTER_NEAREST)
        fixed_size += 1

    # Ensure uint8
    if lbl.dtype != np.uint8:
        lbl = lbl.astype(np.uint8)

    # Clamp label IDs to {0..6,255}
    uniq = np.unique(lbl)
    if any(int(v) not in valid_ids for v in uniq):
        bad_mask = ~np.isin(lbl, np.array(list(valid_ids), dtype=np.uint8))
        lbl[bad_mask] = 255
        clamped += 1

    # write back
    cv2.imwrite(str(lp), lbl)

    # (Optional) if depth exists, resize to image HxW too
    dp = None
    for ext in [".png", ".tif", ".tiff", ".exr"]:
        if (dep_dir / f"{key}{ext}").exists():
            dp = dep_dir / f"{key}{ext}"
            break
    if dp is None and (dep_dir / f"{key}.npy").exists():
        dp = dep_dir / f"{key}.npy"

    if dp is not None:
        if dp.suffix.lower() == ".npy":
            d = np.load(str(dp))
            if d.shape[:2] != (H, W):
                d = cv2.resize(d.astype(np.float32), (W, H), interpolation=cv2.INTER_NEAREST)
                np.save(str(dp), d)
                fixed_depth += 1
        else:
            d = read_any(dp)
            if d is not None and d.shape[:2] != (H, W):
                d = cv2.resize(d, (W, H), interpolation=cv2.INTER_NEAREST)
                cv2.imwrite(str(dp), d)
                fixed_depth += 1

print("Sanitizer summary:")
print(" - labels squeezed to 1ch:", squeezed)
print(" - labels resized to match RGB:", fixed_size)
print(" - labels clamped to {0..6,255}:", clamped)
print(" - depth resized:", fixed_depth)
print(" - samples dropped:", dropped)

# Show current counts
print("Counts now ->",
      "images:", len(list(img_dir.glob('*.png'))),
      "labels:", len(list(lbl_dir.glob('*.png'))),
      "depth_png:", len(list(dep_dir.glob('*.png'))),
      "depth_npy:", len(list(dep_dir.glob('*.npy'))))

Sanitizer summary:
 - labels squeezed to 1ch: 0
 - labels resized to match RGB: 2
 - labels clamped to {0..6,255}: 0
 - depth resized: 0
 - samples dropped: 0
Counts now -> images: 328 labels: 328 depth_png: 0 depth_npy: 0


In [14]:
# Cell 2B — Dataloaders (with drop_last=True to avoid 1-sample BN issues)

import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np, cv2, os, yaml
from pathlib import Path

root = Path("semnav_terrain")
img_dir = root / "data" / "images"
lbl_dir = root / "data" / "labels"
dep_dir = root / "data" / "depth"
splits_dir = root / "data" / "splits"

# Load config (for num_classes, batch_size, etc.)
cfg_path = root / "configs" / "deeplabv3_mbv3.yaml"
with open(cfg_path, "r") as f:
    cfg = yaml.safe_load(f)

num_classes = int(cfg.get("num_classes", 7))
bs_cfg = int(cfg.get("train", {}).get("batch_size", 2))
# Ensure train batch size >= 2 to avoid BatchNorm error
BATCH_SIZE = max(2, bs_cfg)

# Albumentations transforms must be defined in Cell 2A:
# geom_train, geom_val, color_train_rgb

# Palette (BGR in OpenCV order for overlay later)
PALETTE = np.array([
  [128,64,128],  # ground
  [244,35,232],  # sidewalk
  [70,70,70],    # stairs
  [0,0,142],     # water
  [220,20,60],   # person
  [0,0,230],     # car
  [70,130,180],  # sky
], dtype=np.uint8)

def to_tensor_img(img):  # HWC BGR -> CHW RGB norm
    rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)/255.0
    mean = np.array([0.485,0.456,0.406], dtype=np.float32)
    std  = np.array([0.229,0.224,0.225], dtype=np.float32)
    rgb = (rgb - mean)/std
    chw = np.transpose(rgb, (2,0,1))
    return torch.from_numpy(chw)

def to_tensor_depth(d):
    d = d.astype(np.float32)
    if d.max() > 50.0:  # likely mm
        d = d / 1000.0
    d = np.clip(d, 0.5, 10.0)
    d = (d - 0.5) / (10.0 - 0.5 + 1e-6)  # 0..1
    if d.ndim == 2:
        d = d[None, ...]
    elif d.ndim == 3:
        d = d.transpose(2,0,1)[0:1]
    return torch.from_numpy(d)

class SemNavDataset(Dataset):
    def __init__(self, keys, split="train"):
        self.keys = keys
        self.split = split

    def __len__(self):
        return len(self.keys)

    def __getitem__(self, idx):
        key = self.keys[idx]
        img = cv2.imread(str(img_dir / f"{key}.png"))
        mask = cv2.imread(str(lbl_dir / f"{key}.png"), cv2.IMREAD_UNCHANGED)

        # Ensure label single-channel (safety)
        if mask is None:
            raise RuntimeError(f"Label missing or unreadable for key: {key}")
        if mask.ndim == 3:
            if np.allclose(mask[:,:,0], mask[:,:,1]) and np.allclose(mask[:,:,1], mask[:,:,2]):
                mask = mask[:,:,0]
            else:
                mask = mask[:,:,0]
        mask = mask.astype(np.int64)

        # Read depth if present
        if (dep_dir / f"{key}.png").exists():
            depth_path = dep_dir / f"{key}.png"
            depth = cv2.imread(str(depth_path), cv2.IMREAD_UNCHANGED)
        elif (dep_dir / f"{key}.npy").exists():
            depth_path = dep_dir / f"{key}.npy"
            depth = np.load(str(depth_path))
        else:
            depth = None

        # Albumentations pipelines defined in Cell 2A
        if self.split == "train":
            if depth is None:
                out = geom_train(image=img, mask=mask)
                img_aug, mask_aug = out['image'], out['mask']
                img_aug = color_train_rgb(image=img_aug)['image']
            else:
                out = geom_train(image=img, mask=mask, depth=depth)
                img_aug, mask_aug, depth = out['image'], out['mask'], out['depth']
                img_aug = color_train_rgb(image=img_aug)['image']
        else:
            if depth is None:
                out = geom_val(image=img, mask=mask)
                img_aug, mask_aug = out['image'], out['mask']
            else:
                out = geom_val(image=img, mask=mask, depth=depth)
                img_aug, mask_aug, depth = out['image'], out['mask'], out['depth']

        x_rgb = to_tensor_img(img_aug)
        y = torch.from_numpy(mask_aug.astype(np.int64))
        if depth is None:
            x_depth = torch.zeros((1, y.shape[0], y.shape[1]), dtype=torch.float32)
        else:
            x_depth = to_tensor_depth(depth)

        return x_rgb, x_depth, y, key

# Load fold 0 splits (adjust k if needed)
k = 0
with open(splits_dir / f"fold{k}_train.txt") as f:
    train_keys = [l.strip() for l in f if l.strip()]
with open(splits_dir / f"fold{k}_val.txt") as f:
    val_keys = [l.strip() for l in f if l.strip()]
with open(splits_dir / f"fold{k}_test.txt") as f:
    test_keys = [l.strip() for l in f if l.strip()]

train_ds = SemNavDataset(train_keys, split="train")
val_ds   = SemNavDataset(val_keys,   split="val")
test_ds  = SemNavDataset(test_keys,  split="test")

# Build DataLoaders
train_loader = DataLoader(
    train_ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
    drop_last=True,     # <<< important: prevents 1-sample batch (BN error in ASPP)
)
val_loader = DataLoader(
    val_ds,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0
)
test_loader = DataLoader(
    test_ds,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0
)

# Quick batch check
xb, db, yb, kb = next(iter(train_loader))
print("Batch shapes => x_rgb:", tuple(xb.shape), "x_depth:", tuple(db.shape), "y:", tuple(yb.shape))
print("First keys:", list(kb)[:min(2, len(kb))])
print("Train batch size (drop_last=True):", xb.size(0))

Batch shapes => x_rgb: (2, 3, 384, 512) x_depth: (2, 1, 384, 512) y: (2, 384, 512)
First keys: ['300', '157']
Train batch size (drop_last=True): 2


In [15]:
# What this cell does:
# - Rewrites configs/deeplabv3_mbv3.yaml with the playbook values (CPU-friendly epochs/batch).
from pathlib import Path
import textwrap

cfg_text = textwrap.dedent("""\
model: deeplabv3_mbv3
num_classes: 7
input_size: [512,384]
optimizer: {name: AdamW, lr: 3e-4, weight_decay: 0.01}
loss: {ce_weighted: true, dice: 0.3, boundary: 0.2}
train: {epochs: 5, batch_size: 2, amp: false}
val: {tta_flip: true}
""")
p = Path("semnav_terrain/configs/deeplabv3_mbv3.yaml")
p.write_text(cfg_text)
print("Saved config to:", p.resolve())

Saved config to: /Users/jayamdaxeshkumarshah/semnav_terrain/configs/deeplabv3_mbv3.yaml


In [16]:
# What this cell does:
# - Implements a DeepLabV3(+ approx) with MobileNetV3-Large backbone.
# - Implements Dice loss and a simple Boundary-aware CE loss.
# - Saves code into semnav/ files so train.py can import them.

from pathlib import Path
import textwrap

model_code = textwrap.dedent(r"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models.segmentation import deeplabv3_mobilenet_v3_large

class DeeplabV3_MBV3(nn.Module):
    def __init__(self, num_classes=7, pretrained=True):
        super().__init__()
        self.net = deeplabv3_mobilenet_v3_large(weights='DEFAULT' if pretrained else None)
        # replace classifier to our num_classes
        in_ch = self.net.classifier[-1].in_channels
        self.net.classifier[-1] = nn.Conv2d(in_ch, num_classes, kernel_size=1)

    def forward(self, x):
        out = self.net(x)  # dict with 'out'
        return out['out']
""")

dice_code = textwrap.dedent(r"""
import torch
import torch.nn as nn
import torch.nn.functional as F

class DiceLoss(nn.Module):
    def __init__(self, smooth=1.0, ignore_index=255):
        super().__init__()
        self.smooth = smooth
        self.ignore_index = ignore_index

    def forward(self, logits, target, num_classes=7):
        # logits: [B,C,H,W], target: [B,H,W]
        probs = torch.softmax(logits, dim=1)
        masks = []
        valid = (target != self.ignore_index)
        for c in range(num_classes):
            gt_c = (target == c) & valid
            pred_c = probs[:, c, :, :]
            gt_c = gt_c.float()
            inter = (pred_c * gt_c).sum(dim=(1,2))
            union = pred_c.sum(dim=(1,2)) + gt_c.sum(dim=(1,2))
            dice = (2*inter + self.smooth) / (union + self.smooth)
            masks.append(1 - dice)
        loss = torch.stack(masks, dim=1).mean()
        return loss
""")

boundary_code = textwrap.dedent(r"""
import torch
import torch.nn as nn
import torch.nn.functional as F

# Simple boundary-aware weighting: upweight pixels whose 3x3 neighborhood contains label changes
class BoundaryLoss(nn.Module):
    def __init__(self, ignore_index=255, weight=1.0):
        super().__init__()
        self.ignore_index = ignore_index
        self.base_ce = nn.CrossEntropyLoss(ignore_index=ignore_index, reduction='none')
        self.weight = weight

    def forward(self, logits, target):
        # logits [B,C,H,W], target [B,H,W]
        ce = self.base_ce(logits, target)  # [B,H,W]
        with torch.no_grad():
            # boundary mask: true if any neighbor differs
            pad = F.pad(target.unsqueeze(1).float(), (1,1,1,1), mode='replicate')
            patches = F.unfold(pad, kernel_size=3, stride=1)  # [B, 9, H*W]
            center = patches[:, 4:5, :]  # center pixel
            diff = (patches != center).float().sum(dim=1)  # [B, H*W]
            boundary = (diff > 0).float().view_as(ce)
            # ignore invalid areas
            boundary[target==self.ignore_index] = 0.0
            weights = 1.0 + 2.0*boundary  # 3x weight at boundary pixels
        loss = (ce * weights).mean()
        return self.weight * loss
""")

miou_code = textwrap.dedent(r"""
import torch
import numpy as np

def compute_confusion_matrix(pred, target, num_classes=7, ignore_index=255):
    # pred/target [B,H,W]
    mask = (target != ignore_index)
    pred = pred[mask]
    target = target[mask]
    cm = torch.zeros((num_classes, num_classes), dtype=torch.int64, device=pred.device)
    k = target * num_classes + pred
    binc = torch.bincount(k, minlength=num_classes**2)
    cm += binc.reshape((num_classes, num_classes))
    return cm

def compute_miou_from_cm(cm):
    # cm: [C,C]
    cm = cm.float()
    tp = torch.diag(cm)
    denom = cm.sum(1) + cm.sum(0) - tp + 1e-6
    iou = tp / denom
    miou = iou.mean()
    return miou.item(), iou.cpu().numpy()
""")

boundary_iou_code = textwrap.dedent(r"""
import torch
import torch.nn.functional as F
import numpy as np

# proxy boundary IoU using morphological edges on class=free-space (ground+sidewalk) vs prediction
def boundary_iou_free(pred, target, free_ids=(0,1), ignore_index=255):
    # Make binary free-space masks
    p_free = torch.zeros_like(pred, dtype=torch.uint8)
    t_free = torch.zeros_like(target, dtype=torch.uint8)
    for fid in free_ids:
        p_free |= (pred == fid)
        t_free |= (target == fid)
    # edges: XOR of dilated and eroded => approximate boundary
    # Using 3x3 dilation/erosion via maxpool/minpool style operations
    def edge_mask(x):
        x = x.float().unsqueeze(1)
        dil = F.max_pool2d(x, 3, stride=1, padding=1)
        ero = -F.max_pool2d(-x, 3, stride=1, padding=1)
        e = (dil - ero).clamp(0,1).squeeze(1).bool()
        return e
    pe = edge_mask(p_free)
    te = edge_mask(t_free)
    inter = (pe & te).sum().item()
    union = (pe | te).sum().item() + 1e-6
    return inter / union
""")

recall2m_code = textwrap.dedent(r"""
import numpy as np
import torch

# Approximate Recall@2m for obstacle classes {person(4), car(5), stairs(2), water(3)}
def recall_at_2m(pred, depth_m, classes=(4,5,2,3), thresh=2.0, ignore_index=255):
    # pred: [H,W], depth_m: [H,W] in meters (float)
    if depth_m is None:
        return None
    mask_close = (depth_m <= thresh)
    if mask_close.sum() == 0:
        return 1.0
    obs_pred = np.zeros_like(pred, dtype=bool)
    for c in classes:
        obs_pred |= (pred == c)
    tp = (obs_pred & mask_close)
    recall = tp.sum() / (mask_close.sum() + 1e-6)
    return float(recall)
""")

root = Path("semnav_terrain")
(root / "semnav" / "models" / "deeplabv3.py").write_text(model_code)
(root / "semnav" / "losses" / "dice.py").write_text(dice_code)
(root / "semnav" / "losses" / "boundary.py").write_text(boundary_code)
(root / "semnav" / "metrics" / "miou.py").write_text(miou_code)
(root / "semnav" / "metrics" / "boundary_iou.py").write_text(boundary_iou_code)
(root / "semnav" / "metrics" / "recall2m.py").write_text(recall2m_code)

print("Wrote model + losses + metrics code into semnav/ package.")

Wrote model + losses + metrics code into semnav/ package.


In [17]:
# What this cell does:
# 1) Converts any 3-channel color masks in semnav_terrain/data/labels to single-channel integer ID maps.
#    - Builds/uses a persistent color→index mapping so the same color gets the same ID across files.
# 2) Ensures data/label_mapping.json exists; if missing, creates a template that maps found IDs to 255 except maybe 0→0.
# 3) Applies the source→target mapping so labels end up in {0..6} with 255 as ignore.
# 4) Prints a quick summary and sample checks.

import json, random
from pathlib import Path
import numpy as np
import cv2

root = Path("semnav_terrain")
lbl_dir = root / "data" / "labels"
mapping_path = root / "data" / "label_mapping.json"
color_index_map_path = root / "data" / "auto_color_to_index.json"

lbl_files = sorted(lbl_dir.glob("*.png"))
if not lbl_files:
    raise RuntimeError("No label PNGs found in semnav_terrain/data/labels. Did standardization run?")

# --- Step 1: Convert 3-channel color masks to single-channel index masks ---
# persistent { "b,g,r": idx }
if color_index_map_path.exists():
    color_map = json.loads(color_index_map_path.read_text())
else:
    color_map = {}

next_idx = 0
if color_map:
    next_idx = 1 + max(int(v) for v in color_map.values())

converted = 0
suspicious = 0

for f in lbl_files:
    m = cv2.imread(str(f), cv2.IMREAD_UNCHANGED)
    if m is None:
        continue

    # If already single-channel, skip
    if m.ndim == 2:
        continue

    # If 3-channel color mask: map each color to an integer index
    if m.ndim == 3 and m.shape[2] == 3:
        # If channels are identical (gray stored as 3ch), squeeze
        if np.allclose(m[:,:,0], m[:,:,1]) and np.allclose(m[:,:,1], m[:,:,2]):
            m_id = m[:,:,0].copy()
        else:
            # Build color → id using persistent global mapping
            H, W, _ = m.shape
            b = m[:,:,0].astype(np.uint32)
            g = m[:,:,1].astype(np.uint32)
            r = m[:,:,2].astype(np.uint32)
            code = (b << 16) | (g << 8) | r  # unique code per BGR color

            # Collect unique colors in this image
            uniq = np.unique(code)
            # Heuristic warning: too many unique colors likely means this wasn't a discrete mask
            if uniq.size > 64:
                suspicious += 1

            # Assign IDs for any new colors
            for c in uniq.tolist():
                key = f"{(c>>16)&255},{(c>>8)&255},{c&255}"  # "b,g,r"
                if key not in color_map:
                    color_map[key] = int(next_idx)
                    next_idx += 1

            # Vectorized map from code → id
            # Build lookup table (dict) for this file's colors
            lut = {}
            for c in uniq.tolist():
                key = f"{(c>>16)&255},{(c>>8)&255},{c&255}"
                lut[c] = color_map[key]

            # Apply lut
            flat = code.reshape(-1)
            out = np.empty_like(flat, dtype=np.uint16)
            for c in uniq.tolist():
                out[flat == c] = lut[c]
            m_id = out.reshape(H, W).astype(np.uint8)

        # Overwrite label with single-channel ID image
        cv2.imwrite(str(f), m_id)
        converted += 1

# Save/refresh persistent color map
with open(color_index_map_path, "w") as fp:
    json.dump(color_map, fp, indent=2)

print(f"Converted {converted} color masks to single-channel IDs.")
if suspicious > 0:
    print(f"WARNING: {suspicious} label(s) had >64 unique colors; they might not be true masks. Verify those datasets.")

# --- Step 2: Ensure label_mapping.json exists (source→target) ---
# After conversion, current labels contain compact integer source IDs (from color map or original).
# We need to map those source IDs to our target {0..6} with 255 ignore.
# If mapping file doesn't exist, create a template with found IDs → 255 except maybe 0→0.

# Collect unique source IDs now present
unique_ids = set()
for f in lbl_files:
    m = cv2.imread(str(f), cv2.IMREAD_UNCHANGED)
    if m is None:
        continue
    if m.ndim != 2:
        # if anything still multi-channel, squeeze first channel as a last-resort
        m = m[:,:,0]
        cv2.imwrite(str(f), m)
    unique_ids.update(np.unique(m).tolist())
unique_ids = sorted(int(x) for x in unique_ids)

if mapping_path.exists():
    mapping = json.loads(mapping_path.read_text())
else:
    # Create a fresh template
    default_map = {str(int(k)): 255 for k in unique_ids}
    if "0" in default_map:
        default_map["0"] = 0  # common case: 0 is background/ground
    mapping = {
        "target_names": ["ground","sidewalk","stairs","water","person","car","sky"],
        "ignore_index": 255,
        "source_to_target": default_map
    }
    with open(mapping_path, "w") as f:
        json.dump(mapping, f, indent=2)
    print("Created template mapping at data/label_mapping.json. Edit it if needed.")

# --- Step 3: Apply source→target mapping to all labels (in place) ---
ignore_index = int(mapping.get("ignore_index", 255))
s2t = {int(k): int(v) for k, v in mapping["source_to_target"].items()}

remapped = 0
for f in lbl_files:
    m = cv2.imread(str(f), cv2.IMREAD_UNCHANGED)
    if m is None: 
        continue
    if m.ndim != 2:
        m = m[:,:,0]
    flat = m.reshape(-1)
    out = np.full_like(flat, ignore_index, dtype=np.uint8)
    for s_id, t_id in s2t.items():
        out[flat == s_id] = t_id
    out = out.reshape(m.shape)
    cv2.imwrite(str(f), out)
    remapped += 1

print(f"Applied source→target mapping to {remapped} labels.")
print("Unique target IDs now present (sampled):")
check_ids = set()
for f in random.sample(lbl_files, min(8, len(lbl_files))):
    m = cv2.imread(str(f), cv2.IMREAD_UNCHANGED)
    check_ids.update(np.unique(m).tolist())
print(sorted(int(x) for x in check_ids)[:30])

# --- Step 4: Spot-check a couple of masks to ensure 2D integer format ---
samples = random.sample(lbl_files, min(3, len(lbl_files)))
for s in samples:
    m = cv2.imread(str(s), cv2.IMREAD_UNCHANGED)
    print(f"{s.name}: shape={m.shape}, dtype={m.dtype}, uniques={sorted(list(np.unique(m)))[:12]}")

Converted 0 color masks to single-channel IDs.
Applied source→target mapping to 328 labels.
Unique target IDs now present (sampled):
[0, 255]
180.png: shape=(720, 1280), dtype=uint8, uniques=[np.uint8(0), np.uint8(255)]
472color.png: shape=(480, 640), dtype=uint8, uniques=[np.uint8(0), np.uint8(255)]
3200.png: shape=(720, 1280), dtype=uint8, uniques=[np.uint8(0), np.uint8(255)]


In [18]:
# Cell 3B-fix — make 'semnav' importable for this session

import sys, importlib
from pathlib import Path

# <-- If your project lives elsewhere, change this path:
project_root = Path("/Users/jayamdaxeshkumarshah/semnav_terrain")

# sanity check
pkg_init = project_root / "semnav" / "__init__.py"
assert pkg_init.exists(), f"Didn't find semnav at {pkg_init} — adjust project_root above."

# put project on Python path (front)
pr = str(project_root.resolve())
if pr not in sys.path:
    sys.path.insert(0, pr)
print("Added to sys.path:", sys.path[0])

# reload import caches and smoke test
importlib.invalidate_caches()
import semnav
print("Imported 'semnav' from:", Path(semnav.__file__).parent)

# optional: test model import
from semnav.models.deeplabv3 import DeeplabV3_MBV3
print("Imported DeeplabV3_MBV3 OK.")

Added to sys.path: /Users/jayamdaxeshkumarshah/semnav_terrain
Imported 'semnav' from: /Users/jayamdaxeshkumarshah/semnav_terrain/semnav
Imported DeeplabV3_MBV3 OK.


In [19]:
# What this cell does:
# - Implements a simple training loop (CPU) for fold=0.
# - Tracks val mIoU and saves best.pth into runs/fold0/.
# - Small epochs & batch size to validate the pipeline.

import os, time, math
import torch, yaml
import numpy as np
from pathlib import Path
from tqdm import tqdm

from semnav.models.deeplabv3 import DeeplabV3_MBV3
from semnav.losses.dice import DiceLoss
from semnav.losses.boundary import BoundaryLoss
from semnav.metrics.miou import compute_confusion_matrix, compute_miou_from_cm

root = Path("semnav_terrain")
cfg_path = root / "configs" / "deeplabv3_mbv3.yaml"
with open(cfg_path, "r") as f:
    cfg = yaml.safe_load(f)

num_classes = int(cfg["num_classes"])
EPOCHS = int(cfg["train"]["epochs"])
BATCH_SIZE = int(cfg["train"]["batch_size"])
LR = float(cfg["optimizer"]["lr"])
WD = float(cfg["optimizer"]["weight_decay"])
use_amp = bool(cfg["train"].get("amp", False))

device = torch.device("cpu")  # CPU only as requested
model = DeeplabV3_MBV3(num_classes=num_classes, pretrained=True).to(device)

# DataLoaders from previous cell (train_loader, val_loader) are already set

# Class weights (median frequency) optional -> here just None (beginner-friendly)
ce_weight = None
ce = torch.nn.CrossEntropyLoss(ignore_index=255, weight=ce_weight)
dice = DiceLoss(ignore_index=255)
bnd  = BoundaryLoss(ignore_index=255, weight=1.0)

opt = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WD)

run_dir = root / "runs" / "fold0"
run_dir.mkdir(parents=True, exist_ok=True)
best_miou = -1.0
best_path = run_dir / "best.pth"

for epoch in range(1, EPOCHS+1):
    model.train()
    losses = []
    pbar = tqdm(train_loader, desc=f"Epoch {epoch}/{EPOCHS} [train]")
    for xb, db, yb, kb in pbar:
    # skip tiny batch to avoid BN error in ASPP pooling
        if xb.size(0) == 1:
            continue
        xb, yb = xb.to(device), yb.to(device)
        opt.zero_grad()
        logits = model(xb)
        loss_ce = ce(logits, yb)
        loss_dice = dice(logits, yb, num_classes=num_classes) * float(cfg["loss"]["dice"])
        loss_bnd  = bnd(logits, yb) * float(cfg["loss"]["boundary"])
        loss = loss_ce + loss_dice + loss_bnd
        loss.backward()
        opt.step()
        losses.append(loss.item())
        pbar.set_postfix(loss=np.mean(losses))

    # Validation
    model.eval()
    cm = torch.zeros((num_classes, num_classes), dtype=torch.int64, device=device)
    with torch.no_grad():
        for xb, db, yb, kb in tqdm(val_loader, desc=f"Epoch {epoch}/{EPOCHS} [val]"):
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            pred = torch.argmax(logits, dim=1)
            cm += compute_confusion_matrix(pred, yb, num_classes=num_classes, ignore_index=255)
    miou, per_class = compute_miou_from_cm(cm)
    print(f"Epoch {epoch}: val mIoU = {miou:.4f}")

    if miou > best_miou:
        best_miou = miou
        torch.save({"model": model.state_dict(), "miou": best_miou, "epoch": epoch}, best_path)
        print(">> Saved new best to", best_path)

print("Training done. Best mIoU:", best_miou)

Epoch 1/5 [train]: 100%|████████████| 33/33 [00:37<00:00,  1.15s/it, loss=0.766]
Epoch 1/5 [val]: 100%|████████████████████████████| 7/7 [00:02<00:00,  2.49it/s]


Epoch 1: val mIoU = 0.1186
>> Saved new best to semnav_terrain/runs/fold0/best.pth


Epoch 2/5 [train]: 100%|████████████| 33/33 [00:38<00:00,  1.16s/it, loss=0.393]
Epoch 2/5 [val]: 100%|████████████████████████████| 7/7 [00:02<00:00,  2.46it/s]


Epoch 2: val mIoU = 0.1265
>> Saved new best to semnav_terrain/runs/fold0/best.pth


Epoch 3/5 [train]: 100%|██████████████| 33/33 [00:37<00:00,  1.15s/it, loss=nan]
Epoch 3/5 [val]: 100%|████████████████████████████| 7/7 [00:02<00:00,  2.48it/s]


Epoch 3: val mIoU = 0.1361
>> Saved new best to semnav_terrain/runs/fold0/best.pth


Epoch 4/5 [train]: 100%|██████████████| 33/33 [00:37<00:00,  1.15s/it, loss=nan]
Epoch 4/5 [val]: 100%|████████████████████████████| 7/7 [00:02<00:00,  2.46it/s]


Epoch 4: val mIoU = 0.1414
>> Saved new best to semnav_terrain/runs/fold0/best.pth


Epoch 5/5 [train]: 100%|████████████| 33/33 [00:38<00:00,  1.15s/it, loss=0.311]
Epoch 5/5 [val]: 100%|████████████████████████████| 7/7 [00:02<00:00,  2.52it/s]

Epoch 5: val mIoU = 0.1426
>> Saved new best to semnav_terrain/runs/fold0/best.pth
Training done. Best mIoU: 0.14264614880084991





In [20]:
# What this cell does:
# - Loads best.pth (CPU), ignoring unexpected 'aux_classifier' keys.
# - Runs the model on a handful of val images.
# - Saves color overlays to runs/fold0/vis/.

import sys, json
import cv2, numpy as np, torch
from pathlib import Path
from tqdm import tqdm

# Ensure local package is importable (adjust if needed)
project_root = Path("semnav_terrain").resolve()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

from semnav.models.deeplabv3 import DeeplabV3_MBV3  # after sys.path fix

root = Path("semnav_terrain")
vis_dir = root / "runs" / "fold0" / "vis"
vis_dir.mkdir(parents=True, exist_ok=True)

# Palette in RGB (we'll convert to BGR for OpenCV overlays)
PALETTE_RGB = np.array([
  [128,64,128],  # ground
  [244,35,232],  # sidewalk
  [70,70,70],    # stairs
  [0,0,142],     # water
  [220,20,60],   # person
  [0,0,230],     # car
  [70,130,180],  # sky
], dtype=np.uint8)
PALETTE_BGR = PALETTE_RGB[:, ::-1]  # convert to BGR for cv2

# Load trained weights (CPU) and ignore unexpected keys like aux_classifier.*
ckpt_path = root / "runs" / "fold0" / "best.pth"
assert ckpt_path.exists(), f"Checkpoint not found: {ckpt_path}"
ckpt = torch.load(ckpt_path, map_location="cpu")

model = DeeplabV3_MBV3(num_classes=7, pretrained=False)
incompat = model.load_state_dict(ckpt["model"], strict=False)  # <-- key fix
# Optional: print a small summary of missing/unexpected (PyTorch returns IncompatibleKeys)
try:
    print("Missing keys:", len(incompat.missing_keys), "Unexpected keys:", len(incompat.unexpected_keys))
except Exception:
    pass
model.eval()

# Helper: preprocessing like in training (normalize to ImageNet stats)
def preprocess_to_tensor(bgr, w, h):
    resized = cv2.resize(bgr, (w, h), interpolation=cv2.INTER_LINEAR)
    rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    std  = np.array([0.229, 0.224, 0.225], dtype=np.float32)
    rgb = (rgb - mean) / std
    x = torch.from_numpy(rgb.transpose(2, 0, 1)).unsqueeze(0)  # [1,3,H,W]
    return x

# Load a few val keys
splits_dir = root / "data" / "splits"
val_keys = [l.strip() for l in open(splits_dir / "fold0_val.txt") if l.strip()]
val_keys = val_keys[:12] if len(val_keys) > 12 else val_keys

img_dir = root / "data" / "images"
lbl_dir = root / "data" / "labels"

# Input size (W,H)
cfg_inp = (512, 384)

print(f"Running qualitative predictions on {len(val_keys)} images...")
for key in tqdm(val_keys):
    img_path = img_dir / f"{key}.png"
    if not img_path.exists():
        continue
    bgr = cv2.imread(str(img_path), cv2.IMREAD_COLOR)
    if bgr is None:
        continue
    H, W = bgr.shape[:2]

    x = preprocess_to_tensor(bgr, *cfg_inp)

    with torch.no_grad():
        logits = model(x)              # [1,C,h,w]
        pred = torch.argmax(logits, dim=1)[0].cpu().numpy().astype(np.uint8)  # [h,w]

    # Colorize and resize back to original size
    vis_small = PALETTE_BGR[pred]  # [h,w,3] in BGR
    vis = cv2.resize(vis_small, (W, H), interpolation=cv2.INTER_NEAREST)

    # Blend overlay
    overlay = cv2.addWeighted(bgr, 0.45, vis, 0.55, 0.0)

    # Optional: highlight free-space (ground ∪ sidewalk => ids 0 and 1)
    free_small = ((pred == 0) | (pred == 1)).astype(np.uint8) * 255
    free = cv2.resize(free_small, (W, H), interpolation=cv2.INTER_NEAREST)
    overlay_free = overlay.copy()
    overlay_free[free == 255] = (0.8 * overlay_free[free == 255] + np.array([0, 255, 0]) * 0.2).astype(np.uint8)

    # Save
    out_path = vis_dir / f"{key}_overlay.png"
    cv2.imwrite(str(out_path), overlay_free)

print(f"Saved overlays to: {vis_dir}")

Missing keys: 0 Unexpected keys: 8
Running qualitative predictions on 12 images...


100%|███████████████████████████████████████████| 12/12 [00:02<00:00,  4.79it/s]

Saved overlays to: semnav_terrain/runs/fold0/vis





In [21]:
# What this cell does:
# - Writes a lightweight FusionLite model file that fuses a depth branch into RGB features.
# - This is optional for CPU; you can skip training this model for now.

from pathlib import Path
import textwrap

fusion_code = textwrap.dedent(r"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models.segmentation import deeplabv3_mobilenet_v3_large

class DepthBranch(nn.Module):
    def __init__(self, in_ch=1, out_ch=64):
        super().__init__()
        self.enc = nn.Sequential(
            nn.Conv2d(in_ch, 32, 3, 2, 1), nn.BatchNorm2d(32), nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, 3, 2, 1), nn.BatchNorm2d(64), nn.ReLU(inplace=True),
            nn.Conv2d(64, out_ch, 3, 2, 1), nn.BatchNorm2d(out_ch), nn.ReLU(inplace=True),
        )
    def forward(self, x):  # downsample depth to stride ~8
        return self.enc(x)

class FusionLite(nn.Module):
    def __init__(self, num_classes=7, pretrained=True):
        super().__init__()
        self.rgb = deeplabv3_mobilenet_v3_large(weights='DEFAULT' if pretrained else None)
        # head in_ch
        in_ch = self.rgb.classifier[-1].in_channels
        self.rgb.classifier[-1] = nn.Conv2d(in_ch, num_classes, 1)

        self.depth = DepthBranch(1, out_ch=128)
        # gating conv for fusion at stride ~8
        self.gate = nn.Sequential(
            nn.Conv2d(in_ch + 128, in_ch, 1), nn.Sigmoid()
        )

    def forward(self, x_rgb, x_depth):
        # feature from backbone classifier input (ASPP in-channels)
        # In torchvision impl, classifier input is not directly exposed;
        # simple trick: run forward once to get logits, then re-run limited layers is complex.
        # Here we do late fusion into logits space (approx): fuse via gate at classifier input proxy using ASPP output.
        out_dict = self.rgb(x_rgb)
        logits_rgb = out_dict['out']

        # make depth features stride ~8
        Fd = self.depth(x_depth)
        # up/down sample to match logits spatial size
        if Fd.shape[-2:] != logits_rgb.shape[-2:]:
            Fd = F.interpolate(Fd, size=logits_rgb.shape[-2:], mode='bilinear', align_corners=False)

        # build a proxy feature stack: [logits_rgb (C classes) , Fd(128)]
        # expand logits to in_ch by 1x1 conv to match channels
        proj = nn.Conv2d(logits_rgb.shape[1], logits_rgb.shape[1], 1).to(logits_rgb.device)
        Frgb_proj = proj(logits_rgb)
        gate_in = torch.cat([Frgb_proj, Fd], dim=1)
        g = self.gate(gate_in)
        fused = Frgb_proj + g * Fd.mean(dim=1, keepdim=True)  # simple gate on avg depth feat
        # final 1x1 to classes
        head = nn.Conv2d(fused.shape[1], logits_rgb.shape[1], 1).to(logits_rgb.device)
        out = head(fused)
        return out
""")

(root / "semnav" / "models" / "fusion_lite.py").write_text(fusion_code)
print("Wrote FusionLite model stub. (Training optional on CPU)")

Wrote FusionLite model stub. (Training optional on CPU)


In [22]:
# Cell 5A (fixed v2) — evaluate & log CSV with aux-keys ignored and proper tensor inputs

import csv, torch
from pathlib import Path
import numpy as np

from semnav.models.deeplabv3 import DeeplabV3_MBV3
from semnav.metrics.miou import compute_confusion_matrix, compute_miou_from_cm
try:
    from semnav.metrics.boundary_iou import boundary_iou_free
    HAS_BIOU = True
except Exception:
    HAS_BIOU = False

root = Path("semnav_terrain")
ckpt_path = root / "runs" / "fold0" / "best.pth"
assert ckpt_path.exists(), f"Missing checkpoint: {ckpt_path}"

# Load model (CPU), ignore aux head keys
model = DeeplabV3_MBV3(num_classes=7, pretrained=False)
ckpt = torch.load(ckpt_path, map_location="cpu")
_ = model.load_state_dict(ckpt["model"], strict=False)
model.eval()

def evaluate_to_csv(model, loader, csv_path, num_classes=7):
    device = torch.device("cpu")
    model.to(device).eval()

    cm = torch.zeros((num_classes, num_classes), dtype=torch.int64, device=device)
    biou_vals = []

    with torch.no_grad():
        for xb, db, yb, kb in loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            pred = torch.argmax(logits, dim=1)  # [B,H,W] (torch.LongTensor)

            cm += compute_confusion_matrix(pred, yb, num_classes=num_classes, ignore_index=255)

            if HAS_BIOU:
                # >>> PASS TORCH TENSORS (CPU) — not numpy <<<
                for i in range(pred.size(0)):
                    biou_vals.append(boundary_iou_free(pred[i].cpu(), yb[i].cpu()))

    miou, per_class = compute_miou_from_cm(cm)
    biou = float(np.nanmean(biou_vals)) if (HAS_BIOU and len(biou_vals) > 0) else np.nan

    # write CSV row
    csv_path = Path(csv_path)
    write_header = not csv_path.exists()
    with csv_path.open("a", newline="") as fp:
        w = csv.writer(fp)
        if write_header:
            w.writerow(["miou"] + [f"class_{i}_iou" for i in range(num_classes)] + ["boundary_iou"])
        w.writerow([float(miou)] + [float(x) for x in per_class] + [biou])

    return miou, biou

miou, biou = evaluate_to_csv(model, val_loader, root / "runs" / "fold0" / "val_metrics.csv", num_classes=7)
print(f"Val mIoU: {miou:.4f}  |  Boundary IoU: {biou if not np.isnan(biou) else 'N/A'}")
print("Wrote CSV ->", root / "runs" / "fold0" / "val_metrics.csv")

Val mIoU: 0.1426  |  Boundary IoU: 0.0
Wrote CSV -> semnav_terrain/runs/fold0/val_metrics.csv


In [23]:
# Cell 5B — Fix metrics summary (read multi-column CSV, write summary.csv, save charts)

import csv
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt

root = Path("semnav_terrain")
metrics_csv = root / "runs" / "fold0" / "val_metrics.csv"
reports_dir = root / "reports"
figs_dir = reports_dir / "figs"
reports_dir.mkdir(parents=True, exist_ok=True)
figs_dir.mkdir(parents=True, exist_ok=True)

# 1) Read the multi-column CSV and take the LAST row
assert metrics_csv.exists(), f"Metrics CSV not found: {metrics_csv}"
rows = []
with metrics_csv.open("r", newline="") as f:
    r = csv.DictReader(f)
    for row in r:
        rows.append(row)
assert rows, f"No rows found in {metrics_csv}"
last = rows[-1]  # latest metrics

# 2) Parse numeric fields safely
def to_float(x):
    try:
        return float(x)
    except Exception:
        return float("nan")

miou = to_float(last.get("miou", "nan"))
biou = to_float(last.get("boundary_iou", "nan"))

# collect per-class ious (any headers like class_0_iou ... class_6_iou)
class_cols = [k for k in last.keys() if k.startswith("class_") and k.endswith("_iou")]
class_cols_sorted = sorted(
    class_cols,
    key=lambda k: int(k.split("_")[1]) if k.split("_")[1].isdigit() else 999
)
per_class_vals = [to_float(last[k]) for k in class_cols_sorted]

# 3) Write a simple 2-column summary CSV: metric,value (includes per-class + headline metrics)
summary_csv = reports_dir / "summary.csv"
with summary_csv.open("w", newline="") as f:
    w = csv.writer(f)
    w.writerow(["metric", "value"])
    w.writerow(["miou", miou])
    w.writerow(["boundary_iou", biou])
    for k, v in zip(class_cols_sorted, per_class_vals):
        w.writerow([k, v])

print("Wrote summary CSV ->", summary_csv)

# 4) Save charts (matplotlib, single-color default)
# 4a) Headline metrics bar (mIoU & Boundary IoU)
labels = ["mIoU", "Boundary IoU"]
vals = [miou if not np.isnan(miou) else 0.0,
        biou if not np.isnan(biou) else 0.0]

plt.figure()
plt.bar(labels, vals)
plt.ylim(0, 1.0)
plt.title("Validation Metrics (fold0 latest)")
plt.ylabel("Score")
for i, v in enumerate(vals):
    txt = f"{v:.3f}" if not np.isnan([miou, biou][i]) else "N/A"
    plt.text(i, min(v + 0.02, 0.98), txt, ha="center", va="bottom")
head_fig = figs_dir / "metrics_headline.png"
plt.savefig(head_fig, dpi=150, bbox_inches="tight")
plt.close()
print("Saved figure ->", head_fig)

# 4b) Per-class IoU bar chart (if available)
if per_class_vals:
    plt.figure()
    x = np.arange(len(per_class_vals))
    plt.bar(x, per_class_vals)
    plt.ylim(0, 1.0)
    plt.title("Per-class IoU (fold0 latest)")
    plt.ylabel("IoU")
    plt.xlabel("Class index")
    plt.xticks(x, [c.split("_")[1] for c in class_cols_sorted])
    for i, v in enumerate(per_class_vals):
        plt.text(i, min(v + 0.02, 0.98), f"{v:.2f}" if not np.isnan(v) else "N/A", ha="center", va="bottom", fontsize=8)
    cls_fig = figs_dir / "per_class_iou.png"
    plt.savefig(cls_fig, dpi=150, bbox_inches="tight")
    plt.close()
    print("Saved figure ->", cls_fig)
else:
    print("No per-class IoU columns found in CSV; skipped per-class chart.")

print("Done. Open reports/summary.csv and reports/figs/*.png to view results.")

Wrote summary CSV -> semnav_terrain/reports/summary.csv
Saved figure -> semnav_terrain/reports/figs/metrics_headline.png
Saved figure -> semnav_terrain/reports/figs/per_class_iou.png
Done. Open reports/summary.csv and reports/figs/*.png to view results.


In [24]:
# Cell 6A-legacy — Export TorchScript + ONNX using the legacy exporter (no onnxscript required)

import sys, torch
from pathlib import Path

# Make local package importable
project_root = Path("semnav_terrain").resolve()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

from semnav.models.deeplabv3 import DeeplabV3_MBV3

root = Path("semnav_terrain")
ckpt_path = root / "runs" / "fold0" / "best.pth"
export_dir = root / "export"
export_dir.mkdir(parents=True, exist_ok=True)

# 1) Build model (CPU) and load weights, stripping aux head keys
model = DeeplabV3_MBV3(num_classes=7, pretrained=False)
state = torch.load(ckpt_path, map_location="cpu")["model"]
# Strip any aux head weights so strict load succeeds
state = {k: v for k, v in state.items() if not k.startswith("net.aux_classifier.")}
_ = model.load_state_dict(state, strict=False)
model.eval()

# 2) TorchScript (trace)
dummy = torch.randn(1, 3, 384, 512)  # H,W as in config
ts_path = export_dir / "model.ts"
traced = torch.jit.trace(model, dummy)
traced.save(str(ts_path))
print("Saved TorchScript ->", ts_path)

# 3) ONNX export (force legacy exporter by passing dynamo=False; fallback if arg not supported)
onnx_path = export_dir / "model.onnx"
export_kwargs = dict(
    input_names=["input"],
    output_names=["logits"],
    dynamic_axes={"input": {0: "N", 2: "H", 3: "W"},
                  "logits": {0: "N", 2: "h", 3: "w"}},
    opset_version=12,
    do_constant_folding=True,
)
try:
    torch.onnx.export(model, dummy, str(onnx_path), dynamo=False, **export_kwargs)
except TypeError:
    # Older torch without 'dynamo' arg
    torch.onnx.export(model, dummy, str(onnx_path), **export_kwargs)

print("Saved ONNX ->", onnx_path)

# Quick forward check
with torch.no_grad():
    out = model(dummy)
    print("Sanity forward OK. Logits shape:", tuple(out.shape))

Saved TorchScript -> semnav_terrain/export/model.ts


  torch.onnx.export(model, dummy, str(onnx_path), dynamo=False, **export_kwargs)


Saved ONNX -> semnav_terrain/export/model.onnx
Sanity forward OK. Logits shape: (1, 7, 384, 512)


In [25]:
# Cell 6B — PyTorch ↔ ONNX parity check (strips aux keys, CPU)

import sys, cv2, numpy as np, torch, onnxruntime as ort
from pathlib import Path

# Make local package importable
project_root = Path("semnav_terrain").resolve()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

from semnav.models.deeplabv3 import DeeplabV3_MBV3

root = Path("semnav_terrain")
ckpt_path = root / "runs" / "fold0" / "best.pth"
onnx_path = root / "export" / "model.onnx"

# --- Build PT model (CPU) and load weights, STRIPPING aux head keys ---
state = torch.load(ckpt_path, map_location="cpu")["model"]
state = {k: v for k, v in state.items() if not k.startswith("net.aux_classifier.")}
pt_model = DeeplabV3_MBV3(num_classes=7, pretrained=False)
_ = pt_model.load_state_dict(state, strict=True)
pt_model.eval()

# --- ONNXRuntime session (CPU) ---
sess = ort.InferenceSession(str(onnx_path), providers=["CPUExecutionProvider"])
inp_name = sess.get_inputs()[0].name

# --- Preprocess: BGR -> RGB, normalize, NCHW float32 @ 512x384 ---
mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
std  = np.array([0.229, 0.224, 0.225], dtype=np.float32)
W, H = 512, 384

def preprocess(bgr):
    rsz = cv2.resize(bgr, (W, H), interpolation=cv2.INTER_LINEAR)
    rgb = cv2.cvtColor(rsz, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
    rgb = (rgb - mean) / std
    x = rgb.transpose(2, 0, 1)[None, ...].astype(np.float32)  # [1,3,H,W]
    return x

# --- Compare on a handful of val images ---
val_keys = [l.strip() for l in open(root/"data"/"splits"/"fold0_val.txt") if l.strip()]
val_keys = val_keys[:10] if len(val_keys) > 10 else val_keys

diffs = []
for key in val_keys:
    img = cv2.imread(str(root / "data" / "images" / f"{key}.png"), cv2.IMREAD_COLOR)
    if img is None:
        continue
    x_np = preprocess(img)

    with torch.no_grad():
        logits_pt = pt_model(torch.from_numpy(x_np).float())
        pm_pt = logits_pt.argmax(1).cpu().numpy()    # [1,h,w]

    logits_onnx = sess.run(None, {inp_name: x_np})[0]  # [1,C,h,w]
    pm_onnx = logits_onnx.argmax(1)                    # [1,h,w]

    # Ensure shapes match; if not, resize ONNX pred to PT shape
    if pm_pt.shape != pm_onnx.shape:
        h, w = pm_pt.shape[-2], pm_pt.shape[-1]
        pm_onnx = pm_onnx.astype(np.uint8)
        pm_onnx = np.array([cv2.resize(pm_onnx[0], (w, h), interpolation=cv2.INTER_NEAREST)])[None, ...]

    diffs.append(np.mean(pm_pt != pm_onnx))

print("Compared on", len(diffs), "images.")
print("Mean pixel disagreement:", float(np.mean(diffs)) if diffs else "N/A")
print("Max pixel disagreement:", float(np.max(diffs)) if diffs else "N/A")

Compared on 10 images.
Mean pixel disagreement: 0.0
Max pixel disagreement: 0.0


In [30]:
# webcam_demo.py — segmentation + free-space + pseudo-depth + YOLO boxes within X meters
# Keys:
#   ESC/q : quit
#   c     : calibrate depth (maps center ROI to --calib-distance meters)

import argparse, time, subprocess, platform
from pathlib import Path
import numpy as np
import cv2

# ---------------- path helpers (robust for VS Code & Jupyter) ----------------
try:
    SCRIPT_DIR = Path(__file__).resolve().parent
except NameError:
    # __file__ doesn't exist in a Jupyter cell → use current working directory
    SCRIPT_DIR = Path.cwd()

def resolve_path(plike):
    """Expand ~, env vars; if relative, try as-is, SCRIPT_DIR/..., and CWD/..."""
    if plike is None:
        return None
    p = Path(str(plike)).expanduser()
    cands = [p]
    if not p.is_absolute():
        cands += [SCRIPT_DIR / p, Path.cwd() / p]
    for c in cands:
        if c.exists():
            return c
    # not found; return first candidate so error message shows what we tried first
    return cands[0]

# ---------------- optional deps ----------------
try:
    import torch
    TORCH_OK = True
except Exception:
    TORCH_OK = False

try:
    import onnxruntime as ort
    ORT_OK = True
except Exception:
    ORT_OK = False

# ---------------- palettes / labels ----------------
PALETTE = np.array([
  [128,  64, 128],  # ground
  [244,  35, 232],  # sidewalk
  [ 70,  70,  70],  # stairs
  [  0,   0, 142],  # water
  [220,  20,  60],  # person
  [  0,   0, 230],  # car
  [ 70, 130, 180],  # sky
], dtype=np.uint8)

COCO80 = [
    'person','bicycle','car','motorcycle','airplane','bus','train','truck','boat','traffic light',
    'fire hydrant','stop sign','parking meter','bench','bird','cat','dog','horse','sheep','cow',
    'elephant','bear','zebra','giraffe','backpack','umbrella','handbag','tie','suitcase','frisbee',
    'skis','snowboard','sports ball','kite','baseball bat','baseball glove','skateboard',
    'surfboard','tennis racket','bottle','wine glass','cup','fork','knife','spoon','bowl','banana',
    'apple','sandwich','orange','broccoli','carrot','hot dog','pizza','donut','cake','chair','couch',
    'potted plant','bed','dining table','toilet','tv','laptop','mouse','remote','keyboard','cell phone',
    'microwave','oven','toaster','sink','refrigerator','book','clock','vase','scissors','teddy bear',
    'hair drier','toothbrush'
]

# ---------------- helpers ----------------
def preprocess(frame_bgr, w, h):
    img = cv2.resize(frame_bgr, (w, h), interpolation=cv2.INTER_LINEAR)
    x = img.astype(np.float32) / 255.0
    x = x[:, :, ::-1]
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    std  = np.array([0.229, 0.224, 0.225], dtype=np.float32)
    x = (x - mean) / std
    x = x.transpose(2, 0, 1)[None, ...].astype(np.float32, copy=False)
    return x

def preprocess_depth(frame_bgr, w, h):
    img = cv2.resize(frame_bgr, (w, h), interpolation=cv2.INTER_LINEAR)
    x = img.astype(np.float32) / 255.0
    x = x[:, :, ::-1]
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    std  = np.array([0.229, 0.224, 0.225], dtype=np.float32)
    x = (x - mean) / std
    x = x.transpose(2, 0, 1)[None, ...].astype(np.float32, copy=False)
    return x

def colorize(mask):
    return PALETTE[mask]

def speak(text, say_cmd="say", enable=True):
    if not enable or not text:
        return False
    if platform.system() == "Darwin":
        try:
            subprocess.Popen([say_cmd, text])
        except Exception:
            print("[speak]", text, flush=True)
    else:
        print("[speak]", text, flush=True)
    return True

# ---------------- YOLO utils ----------------
def letterbox(im, new_shape=640, color=(114,114,114), stride=32):
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)
    shape = im.shape[:2]
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    new_unpad = (int(round(shape[1] * r)), int(round(shape[0] * r)))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]
    dw /= 2; dh /= 2
    if shape[::-1] != new_unpad:
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
    return im, r, (left, top)

def yolo_preprocess_bgr(im, size=640):
    im0 = im.copy()
    img, ratio, pad = letterbox(im0, new_shape=size, stride=32)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
    img = img.transpose(2, 0, 1)[None, ...]
    return img.astype(np.float32), ratio, pad, im0.shape[:2]

def nms_boxes(boxes, scores, iou_thres=0.45):
    if len(boxes) == 0: return []
    boxes = boxes.astype(np.float32)
    x1, y1, x2, y2 = boxes.T
    areas = (x2 - x1) * (y2 - y1)
    order = scores.argsort()[::-1]
    keep = []
    while order.size > 0:
        i = order[0]; keep.append(i)
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])
        w = np.maximum(0.0, xx2 - xx1)
        h = np.maximum(0.0, yy2 - yy1)
        inter = w * h
        ovr = inter / (areas[i] + areas[order[1:]] - inter + 1e-6)
        inds = np.where(ovr <= iou_thres)[0]
        order = order[inds + 1]
    return keep

def yolo_decode(pred, ratio, pad, orig_hw, conf_thres=0.35, iou_thres=0.45, classes=None, max_det=100):
    p = np.squeeze(pred, axis=0)
    if p.shape[1] < 85: return []
    xywh = p[:, 0:4]; obj = p[:, 4:5]; cls = p[:, 5:]
    cls_id = np.argmax(cls, axis=1)
    cls_score = cls[np.arange(cls.shape[0]), cls_id]
    scores = (obj[:, 0] * cls_score)
    mask = scores >= conf_thres
    if classes is not None:
        mask = mask & np.isin(cls_id, classes)
    xywh = xywh[mask]; scores = scores[mask]; cls_id = cls_id[mask]
    if xywh.size == 0: return []
    x, y, w, h = xywh.T
    x1 = x - w/2; y1 = y - h/2; x2 = x + w/2; y2 = y + h/2
    gain = ratio; padx, pady = pad
    x1 = (x1 - padx) / gain; y1 = (y1 - pady) / gain
    x2 = (x2 - padx) / gain; y2 = (y2 - pady) / gain
    H, W = orig_hw
    x1 = np.clip(x1, 0, W-1); y1 = np.clip(y1, 0, H-1)
    x2 = np.clip(x2, 0, W-1); y2 = np.clip(y2, 0, H-1)
    boxes = np.vstack([x1, y1, x2, y2]).T
    keep = nms_boxes(boxes, scores, iou_thres=iou_thres)
    if len(keep) > max_det: keep = keep[:max_det]
    return [(boxes[i], float(scores[i]), int(cls_id[i])) for i in keep]

# ---------------- IoU + simple track match ----------------
def iou_xyxy(a, b):
    ax1, ay1, ax2, ay2 = a; bx1, by1, bx2, by2 = b
    ix1, iy1 = max(ax1, bx1), max(ay1, by1)
    ix2, iy2 = min(ax2, bx2), min(ay2, by2)
    iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1)
    inter = iw * ih
    if inter == 0: return 0.0
    aarea = (ax2 - ax1) * (ay2 - ay1)
    barea = (bx2 - bx1) * (by2 - by1)
    return inter / (aarea + barea - inter + 1e-6)

def match_track_iou(tracks, name, box, iou_thresh=0.3):
    best_k, best_iou = None, iou_thresh
    for k, t in tracks.items():
        if t["name"] != name: continue
        iou = iou_xyxy(t["box"], box)
        if iou > best_iou:
            best_iou, best_k = iou, k
    return best_k

# ---------------- main ----------------
def main():
    ap = argparse.ArgumentParser()
    # segmentation (defaults relative to this file)
    ap.add_argument('--weights', default=str(SCRIPT_DIR / 'export/model.onnx'))
    ap.add_argument('--backend', choices=['onnx','torch'], default='onnx')
    ap.add_argument('--input-size', nargs=2, type=int, default=[512,384], metavar=('W','H'))
    ap.add_argument('--classes', nargs='+', default=['ground','sidewalk','stairs','water','person','car','sky'])
    ap.add_argument('--free-space', nargs='+', default=['ground','sidewalk'])
    # display
    ap.add_argument('--show-fps', action='store_true')
    ap.add_argument('--save-video', default=None)
    ap.add_argument('--alpha', type=float, default=0.55)
    ap.add_argument('--free-alpha', type=float, default=0.20)
    ap.add_argument('--morph', type=int, default=0)
    # speech + anti-chatter
    ap.add_argument('--speak', action='store_true')
    ap.add_argument('--say-cmd', default='say')
    ap.add_argument('--cooldown', type=float, default=6.0)
    ap.add_argument('--max-announce', type=int, default=2)
    ap.add_argument('--speak-classes', nargs='+', default=['water','stairs','person','car'])
    ap.add_argument('--speak-once', action='store_true')
    ap.add_argument('--speak-miss-frames', type=int, default=90)
    ap.add_argument('--speak-min-gap', type=float, default=2.5)
    ap.add_argument('--speak-per-class-gap', type=float, default=12.0)
    ap.add_argument('--reannounce-drop', type=float, default=0.6)
    ap.add_argument('--reannounce-abs', type=float, default=1.5)
    ap.add_argument('--reannounce-time', type=float, default=12.0)
    # pseudo-depth (default relative to this file)
    ap.add_argument('--pseudo-depth', default=str(SCRIPT_DIR / 'models/midas_small.onnx'))
    ap.add_argument('--depth-size', nargs=2, type=int, default=[256,256], metavar=('W','H'))
    ap.add_argument('--depth-interval', type=int, default=2)
    ap.add_argument('--depth-ema', type=float, default=0.7)
    ap.add_argument('--calib-distance', type=float, default=2.0)
    # detection (YOLOv5n ONNX, default relative to this file)
    ap.add_argument('--det-onnx', default=str(SCRIPT_DIR / 'models/yolov5n.onnx'))
    ap.add_argument('--det-size', type=int, default=640)
    ap.add_argument('--det-interval', type=int, default=2)
    ap.add_argument('--det-thresh', type=float, default=0.35)
    ap.add_argument('--det-iou', type=float, default=0.45)
    ap.add_argument('--det-classes', nargs='*', default=None)
    # filtering
    ap.add_argument('--max-distance', type=float, default=5.0)
    ap.add_argument('--min-area', type=int, default=120)
    ap.add_argument('--bottom-focus', type=float, default=0.0)

    # Jupyter-safe parsing
    args, _ = ap.parse_known_args()

    # -------- resolve paths and print them (helps Notebook users) --------
    weights_path = resolve_path(args.weights)
    depth_path   = resolve_path(args.pseudo_depth) if args.pseudo_depth else None
    det_path     = resolve_path(args.det_onnx) if args.det_onnx else None

    print("[paths]")
    print("  weights     :", weights_path)
    print("  pseudo-depth:", depth_path if depth_path else "None")
    print("  det-onnx    :", det_path if det_path else "None", flush=True)

    # sanity checks
    if not weights_path.exists():
        raise FileNotFoundError(
            f"Segmentation weights not found at '{args.weights}'. "
            f"Looked for: {weights_path}. "
            f"Tip: run from your project root or pass an absolute --weights path."
        )

    w, h = args.input_size
    name_to_id = {n:i for i,n in enumerate(args.classes)}
    fs_ids = [name_to_id[c] for c in args.free_space if c in name_to_id]

    # seg init
    if args.backend == 'onnx':
        assert ORT_OK, "onnxruntime not available."
        seg_sess = ort.InferenceSession(str(weights_path), providers=['CPUExecutionProvider'])
        seg_in_name = seg_sess.get_inputs()[0].name
        seg_model = None
    else:
        assert TORCH_OK, "PyTorch not available."
        if str(weights_path).endswith(".ts"):
            seg_model = torch.jit.load(str(weights_path), map_location="cpu").eval()
        else:
            from semnav.models.deeplabv3 import DeeplabV3_MBV3
            state = torch.load(str(weights_path), map_location="cpu")
            if isinstance(state, dict) and "model" in state:
                state = state["model"]
            state = {k:v for k,v in state.items() if "aux_classifier" not in k}
            seg_model = DeeplabV3_MBV3(num_classes=len(args.classes), pretrained=False)
            seg_model.load_state_dict(state, strict=False)
            seg_model.eval()
        seg_sess = None

    # depth init
    depth_sess = None; depth_in_name = None
    depth_w, depth_h = args.depth_size
    has_depth = False
    if depth_path:
        if depth_path.exists():
            depth_sess = ort.InferenceSession(str(depth_path), providers=['CPUExecutionProvider'])
            depth_in_name = depth_sess.get_inputs()[0].name
            has_depth = True
        else:
            print(f"[warn] pseudo-depth not found: {depth_path}", flush=True)

    # det init
    det_sess = None; det_in_name = None
    det_classes_idx = None
    if det_path:
        if det_path.exists():
            det_sess = ort.InferenceSession(str(det_path), providers=['CPUExecutionProvider'])
            det_in_name = det_sess.get_inputs()[0].name
            if args.det_classes:
                name2coco = {n:i for i,n in enumerate(COCO80)}
                det_classes_idx = [name2coco[n] for n in args.det_classes if n in name2coco]
        else:
            print(f"[warn] detector ONNX not found: {det_path}; continuing without boxes.", flush=True)

    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        raise RuntimeError("Could not open webcam 0.")

    writer = None
    if args.save_video:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        ret_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 640
        ret_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 480
        writer = cv2.VideoWriter(args.save_video, fourcc, 30.0, (ret_w, ret_h))

    last_fps_t = time.time(); frames = 0
    inv_depth_full = None; k_scale = None
    last_dets = []
    frame_idx = 0

    # speech state
    tracks = {}
    next_track_id = 1
    last_spoken_per_class = {}
    last_any_spoken = 0.0

    # morph for free space
    if args.morph and args.morph > 0:
        ksz = args.morph if (args.morph % 2 == 1) else (args.morph + 1)
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (ksz, ksz))
    else:
        kernel = None

    while True:
        ok, frame = cap.read()
        if not ok: break

        # seg
        x = preprocess(frame, w, h)
        if seg_sess is not None:
            logits = seg_sess.run(None, {seg_in_name: x.astype(np.float32)})[0]
            mask_small = logits.argmax(1)[0].astype(np.uint8)
        else:
            with torch.no_grad():
                tx = torch.from_numpy(x)
                out = seg_model(tx)
                if isinstance(out, dict) and "out" in out:
                    out = out["out"]
                mask_small = torch.argmax(out, dim=1)[0].cpu().numpy().astype(np.uint8)
        mask = cv2.resize(mask_small, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_NEAREST)

        # depth
        if has_depth and (frame_idx % max(1, args.depth_interval) == 0):
            xd = preprocess_depth(frame, depth_w, depth_h)
            pred = depth_sess.run(None, {depth_in_name: xd})[0]
            d = np.asarray(pred)
            if d.ndim == 4: d = d[0, 0 if d.shape[1] >= 1 else 0, :, :]
            elif d.ndim == 3: d = d[0, :, :]
            d = np.maximum(d.astype(np.float32), 1e-6)
            inv_full = cv2.resize(d, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_CUBIC)
            if inv_depth_full is None:
                inv_depth_full = inv_full
            else:
                a = float(np.clip(args.depth_ema, 0.0, 0.99))
                inv_depth_full = a * inv_depth_full + (1.0 - a) * inv_full

        # overlay + free space
        vis = colorize(mask)
        overlay = cv2.addWeighted(frame, 1.0 - args.alpha, vis, args.alpha, 0.0)
        if fs_ids:
            free = np.isin(mask, np.array(fs_ids, dtype=np.uint8))
            free_u8 = (free.astype(np.uint8) * 255)
            if kernel is not None:
                free_u8 = cv2.morphologyEx(free_u8, cv2.MORPH_OPEN, kernel)
                free_u8 = cv2.morphologyEx(free_u8, cv2.MORPH_CLOSE, kernel)
            idx = free_u8 == 255
            if args.free_alpha > 0:
                overlay[idx] = (0.8 * overlay[idx] + np.array([0,255,0]) * args.free_alpha).astype(np.uint8)

        # detections
        dets_to_draw = last_dets
        if det_sess is not None and (frame_idx % max(1, args.det_interval) == 0):
            yimg, ratio, pad, orig_hw = yolo_preprocess_bgr(frame, size=args.det_size)
            pred = det_sess.run(None, {det_in_name: yimg})[0]
            raw = yolo_decode(pred, ratio, pad, orig_hw, conf_thres=args.det_thresh,
                              iou_thres=args.det_iou, classes=det_classes_idx, max_det=100)
            dets = []
            for (x1, y1, x2, y2), score, cid in raw:
                dist_m = None
                if has_depth and inv_depth_full is not None and k_scale is not None:
                    cx = int((x1 + x2) / 2.0)
                    cy = int(min(inv_depth_full.shape[0]-1, y2))
                    xlo = max(0, cx - 2); xhi = min(inv_depth_full.shape[1], cx + 3)
                    ylo = max(0, cy - 4); yhi = min(inv_depth_full.shape[0], cy + 1)
                    patch = inv_depth_full[ylo:yhi, xlo:xhi]
                    if patch.size > 0:
                        inv_med = float(np.median(patch))
                        if inv_med > 1e-6:
                            dist_m = float(np.clip(k_scale / inv_med, 0.3, 20.0))
                if dist_m is None or dist_m <= args.max_distance:
                    dets.append(((int(x1), int(y1), int(x2), int(y2)), score, cid, dist_m))
            dets_to_draw = dets
            last_dets = dets

        # draw + speech candidates
        speak_set = set(n.lower() for n in args.speak_classes) if args.speak_classes else set()
        detections = []  # (name, box, dist)
        for (x1, y1, x2, y2), score, cid, dist_m in dets_to_draw:
            name = COCO80[cid] if 0 <= cid < len(COCO80) else f"id{cid}"
            cv2.rectangle(overlay, (x1, y1), (x2, y2), (255,255,255), 2, cv2.LINE_AA)
            label = f"{name} {score:.2f}"
            if dist_m is not None: label += f" — {dist_m:.1f} m"
            ty = max(14, y1 - 6)
            cv2.putText(overlay, label, (x1, ty), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 2, cv2.LINE_AA)
            if (not speak_set) or (name.lower() in speak_set):
                detections.append((name, (x1,y1,x2,y2), dist_m))

        # speech logic (unchanged from your version)
        phrases = []
        tnow = time.time()

        # simple track store
        def iou_xyxy(a, b):
            ax1, ay1, ax2, ay2 = a; bx1, by1, bx2, by2 = b
            ix1, iy1 = max(ax1, bx1), max(ay1, by1)
            ix2, iy2 = min(ax2, bx2), min(ay2, by2)
            iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1)
            inter = iw * ih
            if inter == 0: return 0.0
            aarea = (ax2 - ax1) * (ay2 - ay1)
            barea = (bx2 - bx1) * (by2 - by1)
            return inter / (aarea + barea - inter + 1e-6)

        def match_track_iou(tracks, name, box, iou_thresh=0.3):
            best_k, best_iou = None, iou_thresh
            for k, t in tracks.items():
                if t["name"] != name: continue
                iou = iou_xyxy(t["box"], box)
                if iou > best_iou:
                    best_iou, best_k = iou, k
            return best_k

        if args.speak and args.speak_once:
            stale = [k for k,t in tracks.items() if (frame_idx - t["last_seen"]) > int(args.speak_miss_frames)]
            for k in stale: del tracks[k]

            for name, box, dist_m in detections:
                if (tnow - last_spoken_per_class.get(name, 0.0)) < float(args.speak_per_class_gap):
                    k = match_track_iou(tracks, name, box, iou_thresh=0.35)
                    if k is None:
                        tid = max(tracks.keys(), default=0) + 1
                        tracks[tid] = {"name": name, "box": box, "last_seen": frame_idx,
                                       "last_dist": dist_m if dist_m is not None else float('inf'),
                                       "announced_at": 0.0}
                    else:
                        t = tracks[k]; t["box"] = box; t["last_seen"] = frame_idx
                        if dist_m is not None: t["last_dist"] = dist_m
                    continue

                k = match_track_iou(tracks, name, box, iou_thresh=0.35)
                if k is None:
                    if (tnow - last_any_spoken) >= float(args.speak_min_gap):
                        phrases.append(f"{name} {dist_m:.1f} meters" if (dist_m is not None and np.isfinite(dist_m)) else f"{name}")
                        last_spoken_per_class[name] = tnow
                        last_any_spoken = tnow
                    tid = max(tracks.keys(), default=0) + 1
                    tracks[tid] = {"name": name, "box": box, "last_seen": frame_idx,
                                   "last_dist": dist_m if dist_m is not None else float('inf'),
                                   "announced_at": tnow}
                else:
                    t = tracks[k]
                    t["box"] = box; t["last_seen"] = frame_idx
                    if dist_m is not None and np.isfinite(dist_m):
                        prev = t.get("last_dist", float('inf'))
                        rel_drop = (prev - dist_m) / max(prev, 1e-6)
                        abs_drop = prev - dist_m
                        if (tnow - t.get("announced_at", 0.0)) >= float(args.reannounce_time) and \
                           (rel_drop >= float(args.reannounce_drop) or abs_drop >= float(args.reannounce_abs)) and \
                           (tnow - last_any_spoken) >= float(args.speak_min_gap):
                            phrases.append(f"{name} {dist_m:.1f} meters")
                            last_spoken_per_class[name] = tnow
                            last_any_spoken = tnow
                            t["announced_at"] = tnow
                        t["last_dist"] = dist_m
        elif args.speak:
            detections.sort(key=lambda d: (0 if (d[2] is not None and np.isfinite(d[2])) else 1,
                                           d[2] if d[2] is not None else 1e9))
            for name, _, dist_m in detections[:max(1, int(args.max_announce))]:
                if (tnow - last_spoken_per_class.get(name, 0.0)) >= float(args.cooldown) and \
                   (tnow - last_any_spoken) >= float(args.speak_min_gap):
                    phrases.append(f"{name} {dist_m:.1f} meters" if (dist_m is not None and np.isfinite(dist_m)) else f"{name}")
                    last_spoken_per_class[name] = tnow
                    last_any_spoken = tnow

        if args.speak and phrases:
            sentence = ", ".join([p.capitalize() for p in phrases]) + " ahead"
            print("[announce]", sentence, flush=True)
            speak(sentence, enable=True)

        # HUD
        if has_depth:
            calib_txt = f"Calib: {'OK' if k_scale is not None else 'Press c'}"
            cv2.putText(overlay, calib_txt, (12, 52),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2, cv2.LINE_AA)

        # FPS
        frames += 1
        now = time.time()
        if args.show_fps and (now - last_fps_t) >= 1.0:
            fps = frames / (now - last_fps_t)
            last_fps_t = now; frames = 0
            cv2.putText(overlay, f"FPS: {fps:.1f}", (12, 28),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2, cv2.LINE_AA)

        # show
        cv2.imshow('SemNav — Webcam', overlay)
        if writer: writer.write(overlay)

        # keys
        key = cv2.waitKey(1) & 0xFF
        if key in (27, ord('q')):
            break
        if key == ord('c') and has_depth and (inv_depth_full is not None):
            Hc, Wc = inv_depth_full.shape
            cx1, cy1 = int(Wc*0.45), int(Hc*0.45)
            cx2, cy2 = int(Wc*0.55), int(Hc*0.55)
            roi = inv_depth_full[cy1:cy2, cx1:cx2]
            if roi.size > 0:
                inv_med = float(np.median(roi))
                if inv_med > 1e-6:
                    k_scale = float(args.calib_distance) * inv_med
                    print(f"[calib] k_scale set to {k_scale:.4f} (maps center ROI to {args.calib_distance} m)", flush=True)
                    speak("Calibration set", enable=args.speak)

        frame_idx += 1

    cap.release()
    if writer: writer.release()
    cv2.destroyAllWindows()
    print("Webcam demo finished.", flush=True)

if __name__ == '__main__':
    main()

[paths]
  weights     : /Users/jayamdaxeshkumarshah/semnav_terrain/export/model.onnx
  pseudo-depth: /Users/jayamdaxeshkumarshah/semnav_terrain/models/midas_small.onnx
  det-onnx    : /Users/jayamdaxeshkumarshah/semnav_terrain/models/yolov5n.onnx
Webcam demo finished.


In [3]:
%cd ~/semnav_terrain

!python -u webcam_demo.py \
  --weights "$PWD/export/model.onnx" \
  --backend onnx \
  --input-size 512 384 \
  --classes ground sidewalk stairs water person car sky \
  --free-space ground sidewalk \
  --pseudo-depth "$PWD/models/midas_small.onnx" \
  --det-onnx    "$PWD/models/yolov5n.onnx" \
  --det-size 640 --det-interval 2 --det-thresh 0.35 \
  --max-distance 5.0 \
  --speak \
  --speak-classes \
  person bicycle car motorcycle airplane bus train truck boat "notebook" "traffic light" "fire hydrant" "stop sign" "parking meter" bench bird cat dog horse sheep cow elephant bear zebra giraffe backpack umbrella handbag tie suitcase frisbee skis snowboard "sports ball" kite "baseball bat" "baseball glove" skateboard surfboard "tennis racket" bottle "wine glass" cup fork knife spoon bowl banana apple sandwich orange broccoli carrot "hot dog" pizza donut cake chair couch "potted plant" bed "dining table" toilet tv laptop mouse remote keyboard "cell phone" microwave oven toaster sink refrigerator book clock vase scissors "teddy bear" "hair drier" toothbrush \
  --cooldown 8 --speak-min-gap 1.5 --speak-per-class-gap 7 \
  --speak-miss-frames 90 --reannounce-drop 0.6 --reannounce-abs 1.5 --reannounce-time 12 \
  --alpha 0.35 --free-alpha 0.10 --show-fps

/Users/jayamdaxeshkumarshah/semnav_terrain
[announce] Person ahead
[calib] k_scale set to 1712.0177 (maps center ROI to 2.0 m)
[announce] Person 2.1 meters ahead
[announce] Chair 2.0 meters ahead
[announce] Cup 2.6 meters ahead
Webcam demo finished.


In [10]:
!mkdir -p semnav_terrain/models
!curl -L "https://huggingface.co/julienkay/sentis-MiDaS/resolve/main/onnx/midas_v21_small_256.onnx" -o semnav_terrain/models/midas_small.onnx

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1337  100  1337    0     0   6037      0 --:--:-- --:--:-- --:--:--  6049
100 63.3M  100 63.3M    0     0  46.8M      0  0:00:01  0:00:01 --:--:-- 73.6M


In [8]:
# Writes a Makefile with simple CPU targets.
from pathlib import Path
import textwrap

root = Path("semnav_terrain")
root.mkdir(parents=True, exist_ok=True)

mk = textwrap.dedent("""\
.PHONY: env train export demo

env:
\tpip install -r requirements.txt

train:
\tpython train.py --cfg configs/deeplabv3_mbv3.yaml --fold 0

export:
\tpython infer.py --weights runs/fold0/best.pth --export onnx --output export/model.onnx

demo:
\tpython webcam_demo.py --weights export/model.onnx --backend onnx --input-size 512 384 --classes ground sidewalk stairs water person car sky --free-space ground sidewalk --show-fps --save-video demo_out.mp4
""")
(root / "Makefile").write_text(mk)
print("Wrote:", (root / "Makefile").resolve())

Wrote: /Users/jayamdaxeshkumarshah/semnav_terrain/Makefile


In [9]:
# Appends a CPU quickstart to README.md (creates file if missing).
from pathlib import Path
import textwrap

root = Path("semnav_terrain")
readme_path = root / "README.md"

quickstart = textwrap.dedent("""\n
## Quickstart (CPU)
```bash
make env
make train
make export
make demo
""")

if readme_path.exists():
    readme_path.write_text(readme_path.read_text() + quickstart)
else:
    readme_path.write_text("# SemNav Terrain\n" + quickstart)

print("Updated:", readme_path.resolve())

Updated: /Users/jayamdaxeshkumarshah/semnav_terrain/README.md


In [10]:

# Prints a minimal status against the webcam demo checklist.
from pathlib import Path

root = Path("semnav_terrain")
ok_onnx   = (root / "export" / "model.onnx").exists()
ok_ts     = (root / "export" / "model.ts").exists()
ok_export = ok_onnx and ok_ts
ok_metrics= (root / "runs" / "fold0" / "val_metrics.csv").exists()
ok_free   = True  # free-space overlay renders if you ran the webcam demo

print("Acceptance Checklist (Webcam Demo)")
print("[ ] Mean mIoU ≥ 55% (5-fold) for RGB baseline  --> train longer/more folds")
print(f"[{'x' if ok_onnx else ' '}] Webcam overlay available (ONNX exported)")
print(f"[{'x' if ok_free else ' '}] Free-space mask rendered and reasonably stable")
print("[ ] (Optional) Pseudo-depth Recall@2m displayed (skipped)")
print(f"[{'x' if ok_export else ' '}] Exported ONNX & TorchScript exist")

Acceptance Checklist (Webcam Demo)
[ ] Mean mIoU ≥ 55% (5-fold) for RGB baseline  --> train longer/more folds
[x] Webcam overlay available (ONNX exported)
[x] Free-space mask rendered and reasonably stable
[ ] (Optional) Pseudo-depth Recall@2m displayed (skipped)
[x] Exported ONNX & TorchScript exist


In [11]:
# What this cell does:
# - Zips key artifacts: checkpoints, export models, reports, demo video (if exists).

import zipfile
from pathlib import Path

root = Path("semnav_terrain")
bundle = root / "handover_bundle.zip"
with zipfile.ZipFile(bundle, 'w', zipfile.ZIP_DEFLATED) as z:
    for p in [
        root / "runs" / "fold0" / "best.pth",
        root / "export" / "model.onnx",
        root / "export" / "model.ts",
        root / "reports" / "summary.csv",
    ]:
        if p.exists():
            z.write(p, p.relative_to(root).as_posix())
    # include figures and optional demo video
    for fig in (root / "reports" / "figs").glob("*.png"):
        z.write(fig, fig.relative_to(root).as_posix())
    demo = root / "demo_out.mp4"
    if demo.exists():
        z.write(demo, demo.relative_to(root).as_posix())

print("Created:", bundle)

Created: semnav_terrain/handover_bundle.zip
