In [None]:
# Cell 1: Environment setup
!nvidia-smi

# Install core dependencies (PyTorch + tools)
!pip install --quiet torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
!pip install --quiet opencv-python tqdm onnx onnxruntime-gpu
!pip install addict scikit-learn pathspec imagesize ujson tensorboard
!pip install onnxscript

In [None]:
# Cell 2: Clone UFLDv2 repo and move into it
%cd /content
!git clone https://github.com/cfzd/Ultra-Fast-Lane-Detection-v2.git
%cd Ultra-Fast-Lane-Detection-v2

!ls

In [None]:
# Cell 3: Download KITTI raw subset for benchmarking
import os
import zipfile
from pathlib import Path
from tqdm.auto import tqdm
import requests

# All dataset folders hang off ./datasets, relative to the current working directory.
ROOT = Path.cwd()
DATASETS_ROOT = ROOT / "datasets"
DOWNLOADS_DIR = DATASETS_ROOT / "downloads"  # downloaded zip archives
KITTI_RAW_DIR = DATASETS_ROOT / "kitti_raw"  # extraction target

# Create the folder tree up front; folders that already exist are left untouched.
for d in (DATASETS_ROOT, DOWNLOADS_DIR, KITTI_RAW_DIR):
    d.mkdir(parents=True, exist_ok=True)

# Small subset of KITTI raw sequences (append further archive URLs to benchmark on more data)
kitti_urls = [
    "https://s3.eu-central-1.amazonaws.com/avg-kitti/raw_data/2011_09_26_drive_0015/2011_09_26_drive_0015_sync.zip",
    "https://s3.eu-central-1.amazonaws.com/avg-kitti/raw_data/2011_09_26_drive_0027/2011_09_26_drive_0027_sync.zip",
]

def download_file(url, out_dir):
    """Download ``url`` into ``out_dir`` and return the path of the local file.

    The file name is the last component of the URL path (a query string or
    fragment is ignored). If a file with that name already exists the download
    is skipped. Data is streamed into a ``<name>.part`` file that is renamed to
    the final name only after the transfer finished, so an interrupted download
    is never mistaken for a complete one on the next run.

    Args:
        url: HTTP(S) address of the file.
        out_dir: Existing directory (``str`` or ``Path``) that receives the file.

    Returns:
        ``Path`` of the downloaded (or already present) file.

    Raises:
        ValueError: If no file name can be derived from ``url``.
        requests.HTTPError: If the server answers with an error status.
    """
    # Local import keeps the new stdlib dependency inside this function.
    from urllib.parse import urlsplit

    out_dir = Path(out_dir)
    file_name = urlsplit(url).path.rsplit("/", 1)[-1]
    if not file_name:
        raise ValueError(f"Cannot derive a file name from URL: {url}")

    local_path = out_dir / file_name
    if local_path.exists():
        print(f"[SKIP] Already exists: {local_path.name}")
        return local_path

    print(f"\nDownloading: {url}")
    partial_path = local_path.with_name(local_path.name + ".part")
    chunk_size = 1 << 20  # 1 MiB per chunk instead of 1 KiB: far fewer loop iterations
    try:
        # The timeout makes a stalled connection fail instead of hanging forever.
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            total = int(r.headers.get("content-length", 0))
            # Without a content-length header the total is unknown, so pass None.
            with open(partial_path, "wb") as f, tqdm(
                total=total or None, unit="B", unit_scale=True, desc=local_path.name
            ) as pbar:
                for chunk in r.iter_content(chunk_size):
                    f.write(chunk)
                    pbar.update(len(chunk))
        # Publish the file under its final name only once it is complete.
        partial_path.replace(local_path)
    except BaseException:
        # Also covers KeyboardInterrupt (interrupting the cell): drop the partial file.
        if partial_path.exists():
            partial_path.unlink()
        raise
    return local_path

def extract_zip(zip_path, extract_to):
    """Unpack every member of the archive ``zip_path`` into ``extract_to``.

    Args:
        zip_path: Path object of the zip archive (its ``.name`` is printed).
        extract_to: Directory that receives the extracted files.
    """
    print(f"Extracting {zip_path.name} -> {extract_to}")
    with zipfile.ZipFile(zip_path, mode="r") as archive:
        archive.extractall(path=extract_to)

for url in kitti_urls:
    zip_file = download_file(url, DOWNLOADS_DIR)
    extract_zip(zip_file, KITTI_RAW_DIR)

print("\nKITTI raw subset downloaded to:", KITTI_RAW_DIR)
!find datasets/kitti_raw -maxdepth 2 -type d | head


In [None]:
# Cell 4: Create the weights directory (this cell only creates the folder; the Tusimple ResNet18 checkpoint has to be placed in it separately)
from pathlib import Path

# Folder for model files, relative to the current working directory.
WEIGHTS_DIR = Path("weights")
# Only the leaf folder is created; an already existing folder is not an error.
WEIGHTS_DIR.mkdir(parents=False, exist_ok=True)


In [None]:
# Switch to the cloned repository root: the following cells import `configs.*`
# and build their paths from the current working directory.
%cd /content/Ultra-Fast-Lane-Detection-v2

In [None]:
# Cell 5: Preprocess KITTI frames for UFLDv2 benchmark

import cv2
import numpy as np
from pathlib import Path
from tqdm.auto import tqdm

import configs.tusimple_res18 as cfg  # UFLDv2 config for Tusimple ResNet18

# Target resolution is taken from the Tusimple ResNet18 config.
IMG_W = cfg.train_width
IMG_H = cfg.train_height
print("Using input resolution:", IMG_W, "x", IMG_H)

# Input (extracted KITTI recordings of 2011_09_26) and output (resized frames) folders.
ROOT = Path.cwd()
KITTI_RAW_DIR = ROOT / "datasets" / "kitti_raw" / "2011_09_26"
BENCH_IMG_DIR = ROOT / "datasets" / "processed" / "kitti_ufld_benchmark" / "images"
BENCH_IMG_DIR.mkdir(parents=True, exist_ok=True)

# Select drives we downloaded
drive_names = [
    "2011_09_26_drive_0015_sync",
    "2011_09_26_drive_0027_sync",
]

# Gather the PNG frames of every drive, sorted by file name within each drive;
# a missing drive only produces a warning.
all_img_paths = []
for drive in drive_names:
    drive_dir = KITTI_RAW_DIR / drive / "image_02" / "data"
    if drive_dir.exists():
        all_img_paths += sorted(drive_dir.glob("*.png"))
    else:
        print("Warning: drive not found:", drive_dir)

print(f"Total raw KITTI frames found: {len(all_img_paths)}")

# Optionally limit number of frames for faster benchmarking
MAX_FRAMES = 500
all_img_paths = all_img_paths[:MAX_FRAMES]
print(f"Using {len(all_img_paths)} frames for benchmark preprocessing.")

def resize_and_save(src_paths, out_dir, img_w, img_h):
    """Resize every readable image in ``src_paths`` and write it to ``out_dir`` as PNG.

    Output names are built from each source path relative to the deepest
    directory shared by all inputs, with the path components joined by ``_``.
    When all inputs sit in one directory this is just ``<stem>.png``; when they
    come from several directories, files that share a name no longer overwrite
    each other.

    Args:
        src_paths: Iterable of image paths (``str`` or ``Path``).
        out_dir: Existing directory that receives the resized PNG files.
        img_w: Output width in pixels.
        img_h: Output height in pixels.
    """
    # Local import keeps the new stdlib dependency inside this function.
    import os

    src_paths = [Path(p) for p in src_paths]
    out_dir = Path(out_dir)

    common_root = None
    if src_paths:
        try:
            common_root = Path(os.path.commonpath([str(p.parent) for p in src_paths]))
        except ValueError:
            # commonpath rejects e.g. a mix of absolute and relative paths.
            common_root = None

    for idx, p in enumerate(tqdm(src_paths, desc="Resizing KITTI frames")):
        img = cv2.imread(str(p))
        if img is None:
            # Unreadable or missing file: skip it.
            continue
        img_resized = cv2.resize(img, (img_w, img_h))

        out_stem = None
        if common_root is not None:
            try:
                out_stem = "_".join(p.relative_to(common_root).with_suffix("").parts)
            except ValueError:
                out_stem = None
        if not out_stem:
            # Fallback without a usable common root: the index keeps names unique.
            out_stem = f"{idx:06d}_{p.stem}"

        out_path = out_dir / f"{out_stem}.png"
        cv2.imwrite(str(out_path), img_resized)

resize_and_save(all_img_paths, BENCH_IMG_DIR, IMG_W, IMG_H)

print("Preprocessed KITTI benchmark images saved to:", BENCH_IMG_DIR)
!ls datasets/processed/kitti_ufld_benchmark/images | head


In [None]:
# Cell 6: Benchmark PyTorch inference (DALI stub + get_model(cfg))
import time
from pathlib import Path

import cv2
import numpy as np
import torch
from tqdm.auto import tqdm

# Make sure we're in repo root
%cd /content/Ultra-Fast-Lane-Detection-v2

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# -------------------------------------------------------------------
# 1) Build a proper fake nvidia.dali package hierarchy (so imports don't fail)
# -------------------------------------------------------------------
import sys, types


def _ensure_module(name, is_package=False):
    """Return ``sys.modules[name]``, registering an empty stub module if it is absent.

    Args:
        name: Fully qualified module name, e.g. ``"nvidia.dali"``.
        is_package: When True the stub gets an empty ``__path__`` so the import
            system treats it as a package and allows submodule imports.

    Returns:
        The module that was already loaded, or the newly registered stub.
    """
    module = sys.modules.get(name)
    if module is None:
        module = types.ModuleType(name)
        if is_package:
            module.__path__ = []
        sys.modules[name] = module
    return module


class DummyDALIGenericIterator:
    """Placeholder for ``DALIGenericIterator``; accepts and ignores all arguments."""

    def __init__(self, *args, **kwargs):
        pass


class DummyLastBatchPolicy:
    """Placeholder for ``LastBatchPolicy`` exposing the two members this stub defines."""

    DROP = 0
    PARTIAL = 1


# Package chain: nvidia -> nvidia.dali -> nvidia.dali.plugin
nvidia_mod = _ensure_module("nvidia", is_package=True)
dali_mod = _ensure_module("nvidia.dali", is_package=True)
plugin_mod = _ensure_module("nvidia.dali.plugin", is_package=True)

# "nvidia.dali.plugin.pytorch" module: the dummy classes are attached only when
# the module is created here; a module that is already loaded is left untouched.
_pytorch_already_loaded = sys.modules.get("nvidia.dali.plugin.pytorch") is not None
pytorch_mod = _ensure_module("nvidia.dali.plugin.pytorch")
if not _pytorch_already_loaded:
    pytorch_mod.DALIGenericIterator = DummyDALIGenericIterator
    pytorch_mod.LastBatchPolicy = DummyLastBatchPolicy

# "nvidia.dali.types" and "nvidia.dali.fn" stubs (plain modules without content)
types_mod = _ensure_module("nvidia.dali.types")
fn_mod = _ensure_module("nvidia.dali.fn")

# Link every child as an attribute of its parent so that attribute access such
# as `nvidia.dali.plugin.pytorch` works in addition to `import ...` statements.
nvidia_mod.dali = dali_mod
dali_mod.plugin = plugin_mod
dali_mod.types = types_mod
dali_mod.fn = fn_mod
plugin_mod.pytorch = pytorch_mod

# -------------------------------------------------------------------
# 2) Import config and model (use get_model(cfg) instead of parsingNet(...))
# -------------------------------------------------------------------
import configs.tusimple_res18 as cfg
from model import model_tusimple

# Build the network through the repo's factory (rather than calling parsingNet
# directly), move it to the benchmark device and switch it to inference mode.
# NOTE(review): no checkpoint is loaded anywhere in the visible code, so the
# model keeps whatever initialization get_model(cfg) provides. That does not
# matter for timing, but confirm before relying on the predictions or on the
# exported ONNX file.
model = model_tusimple.get_model(cfg).to(device)
model.eval()

# -------------------------------------------------------------------
# 3) Prepare benchmark images
# -------------------------------------------------------------------
# Input resolution comes from the config.
IMG_W = cfg.train_width
IMG_H = cfg.train_height
print("Benchmarking with resolution:", IMG_W, "x", IMG_H)

# Collect the preprocessed PNG frames in file-name order.
BENCH_IMG_DIR = Path("datasets/processed/kitti_ufld_benchmark/images")
img_paths = list(BENCH_IMG_DIR.glob("*.png"))
img_paths.sort()
print("Benchmark frames:", len(img_paths))
# Fail early when the preprocessing cell has not produced any frames.
assert img_paths, "No preprocessed KITTI frames found!"

def preprocess_tensor(path):
    """Load one image and turn it into a ``(1, 3, H, W)`` float32 tensor on ``device``.

    Steps: read with OpenCV, convert BGR to RGB, scale to ``[0, 1]``, move the
    channel axis to the front and add a batch axis.

    Args:
        path: Location of the image file.

    Returns:
        Contiguous float32 tensor of shape ``(1, 3, H, W)`` on the global ``device``.

    Raises:
        FileNotFoundError: If OpenCV cannot read the image.
    """
    img = cv2.imread(str(path))
    if img is None:
        # cv2.imread signals failure by returning None; report the offending
        # path instead of letting cvtColor fail on it.
        raise FileNotFoundError(f"Could not read image: {path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.astype(np.float32) / 255.0
    # np.transpose only returns a strided view of the HWC buffer; copy it so the
    # tensor gets a plain contiguous CHW memory layout.
    img = np.ascontiguousarray(np.transpose(img, (2, 0, 1)))
    tensor = torch.from_numpy(img).unsqueeze(0)
    return tensor.to(device)

# Convert every benchmark frame up front so that image loading is not part of the timing.
tensors = list(map(preprocess_tensor, img_paths))

# -------------------------------------------------------------------
# 4) Warm-up
# -------------------------------------------------------------------
# Run the first (up to) 10 frames untimed, then wait for the GPU to finish
# before the measurement starts.
with torch.no_grad():
    for t in tensors[:10]:
        _ = model(t)
    if device.type == "cuda":
        torch.cuda.synchronize()

# -------------------------------------------------------------------
# 5) Timed loop
# -------------------------------------------------------------------
NUM_PASSES = 3  # repeat passes to smooth noise
total_frames = 0
# perf_counter() is the monotonic, high-resolution clock intended for measuring
# intervals; time.time() is wall-clock time and can jump when the system clock
# is adjusted.
start = time.perf_counter()
with torch.no_grad():
    for _ in range(NUM_PASSES):
        for t in tensors:
            _ = model(t)
            total_frames += 1
    # CUDA kernels run asynchronously: wait for all queued work before stopping the clock.
    if device.type == "cuda":
        torch.cuda.synchronize()
end = time.perf_counter()

total_time = end - start
avg_time = total_time / total_frames  # seconds per frame
fps = 1.0 / avg_time

print(f"Total frames processed: {total_frames}")
print(f"Total time: {total_time:.3f} s")
print(f"Average latency: {avg_time * 1000:.3f} ms/frame")
print(f"Throughput: {fps:.2f} FPS on {device}")


In [None]:
# Cell 7: Export UFLDv2 Tusimple ResNet18 to ONNX for QNN later
import torch
from pathlib import Path

# Export in inference mode.
model.eval()

# Random input with the benchmark resolution, used to drive the export.
dummy_input = torch.randn(1, 3, IMG_H, IMG_W, device=device)

onnx_path = Path("weights") / f"ufldv2_tusimple_res18_{IMG_W}x{IMG_H}.onnx"
print("Exporting ONNX to:", onnx_path)

# Export settings collected in one place: tensor names, opset and constant folding.
export_options = dict(
    input_names=["input"],
    output_names=["output"],
    opset_version=18,
    do_constant_folding=True,
)
torch.onnx.export(model, dummy_input, onnx_path.as_posix(), **export_options)

print("ONNX export complete.")

# Optional: quick sanity check with ONNX Runtime
import onnxruntime as ort
import numpy as np

sess = ort.InferenceSession(onnx_path.as_posix(), providers=["CUDAExecutionProvider", "CPUExecutionProvider"])
# Report which execution providers the session actually uses; the list passed
# above is only a preference order.
print("ONNX Runtime providers:", sess.get_providers())
ort_input = sess.get_inputs()[0]
print("ONNX Runtime inputs:", ort_input.name, ort_input.shape)

# Test one frame
import cv2
# Reuse the benchmark preprocessing so the ONNX model gets the same input as
# the PyTorch model, and hand ONNX Runtime a contiguous host array.
test_img = np.ascontiguousarray(preprocess_tensor(img_paths[0]).cpu().numpy())

# Feed the input under the name the session reports instead of a hard-coded one.
ort_out = sess.run(None, {ort_input.name: test_img})[0]
print("ONNX output shape:", ort_out.shape)
