In [1]:
# 第一块：抽帧 + YOLO检测写入文件（自包含，不导入 PS_yolov3）
import os, json, subprocess, shutil, tempfile
from pathlib import Path
from typing import Dict, List, Tuple

from PIL import Image as PILImage
import numpy as np

# 路径配置（如需，请按你的设备环境调整）
DARKNET_BIN = "/opt/darknet_ab/darknet"                 # Darknet 可执行文件
SRC_DIR     = "."      # 图片/视频与结果目录根
MODELS_DIR  = "models"   # 模型目录

# YOLOv3-tiny 模型配置
CFG_PATH     = os.path.join(MODELS_DIR, "yolov3-tiny.cfg")
WEIGHTS_PATH = os.path.join(MODELS_DIR, "yolov3-tiny.weights")
NAMES_PATH   = os.path.join(MODELS_DIR, "coco.names")
DATA_PATH    = os.path.join(MODELS_DIR, "coco.data")    # 自动生成，绑定 names
CONF_THRESH  = 0.55                                     # 置信度阈值

def _write_coco_data(data_path: str, names_path: str, classes: int = 80):
    lines = []
    with open(names_path, "r", encoding="utf-8") as f:
        for t in f:
            t = t.strip()
            if t:
                lines.append(t)
    classes = len(lines) if lines else classes
    content = f"""classes={classes}
names={names_path}
train=ignored.txt
valid=ignored.txt
backup=backup/
"""
    with open(data_path, "w", encoding="utf-8") as f:
        f.write(content)

def run_darknet_v3_inmem(image_path: str, thresh: float = CONF_THRESH):
    exec_dir = Path(DARKNET_BIN).parent
    assert os.path.exists(DARKNET_BIN), f"未找到 Darknet: {DARKNET_BIN}"
    assert os.path.exists(CFG_PATH),     f"未找到 cfg: {CFG_PATH}"
    assert os.path.exists(WEIGHTS_PATH), f"未找到 weights: {WEIGHTS_PATH}"
    assert os.path.exists(NAMES_PATH),   f"未找到 coco.names: {NAMES_PATH}"
    assert os.path.exists(image_path),   f"未找到图片: {image_path}"
    _write_coco_data(DATA_PATH, NAMES_PATH)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as tf:
        out_json_path = tf.name
    cmd = [
        DARKNET_BIN, "detector", "test",
        DATA_PATH, CFG_PATH, WEIGHTS_PATH, image_path,
        "-thresh", str(thresh), "-dont_show", "-ext_output", "-out", str(out_json_path)
    ]
    print("Running:", " ".join(cmd))
    proc = subprocess.run(cmd, cwd=str(exec_dir), stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    print(proc.stdout)
    if proc.returncode != 0:
        print(proc.stderr or proc.stdout)
        print(f"Darknet 执行失败：{image_path}")
        try: os.path.exists(out_json_path) and os.unlink(out_json_path)
        except Exception: pass
        return False, None

    if not os.path.exists(out_json_path):
        print(f"未生成 JSON：{out_json_path}")
        return False, None

    try:
        with open(out_json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    finally:
        try: os.unlink(out_json_path)
        except Exception: pass
    return True, data

def list_videos(dir_path: str, exts=("mp4", "avi", "mov", "mkv")) -> List[str]:
    p = Path(dir_path)
    files = []
    for ext in exts:
        files += sorted([str(x) for x in p.glob(f"*.{ext}")])
    return files

def extract_video_frames(video_path: str, out_dir: str, sample_every: int = 60, resize_to: tuple = (416, 416)) -> Tuple[List[str], float]:
    import cv2
    cap = cv2.VideoCapture(video_path)
    assert cap.isOpened(), f"无法打开视频: {video_path}"
    fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    saved = []; idx = 0; frame_id = 0
    while True:
        ret, frame = cap.read()
        if not ret: break
        if idx % sample_every != 0:
            idx += 1; continue
        if resize_to:
            frame = cv2.resize(frame, resize_to, interpolation=cv2.INTER_LINEAR)
        fname = Path(out_dir) / f"frame_{frame_id:06d}.jpg"
        cv2.imwrite(str(fname), frame)
        saved.append(str(fname))
        frame_id += 1; idx += 1
    cap.release()
    return saved, float(fps)

def _result_root_for_video(video_path: str) -> Path:
    stem = Path(video_path).stem
    return Path(SRC_DIR) / "result" / stem

def _ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)

def _write_json(path: Path, obj):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False, indent=2)

def yolo_stage_dump(
    video_dir: str,
    sample_every: int = 10,
    resize_to: tuple = (416, 416),
    thresh: float = CONF_THRESH,
) -> Dict[str, Dict]:
    """
    抽帧到 SRC/result/<stem>/frames；每帧YOLO推理写入 SRC/result/<stem>/detections；
    生成 index.json + meta.json。
    返回：{ video_path: {\"result_dir\": str, \"frames\": int, \"fps\": float } }
    """
    session = {}
    videos = list_videos(video_dir)
    if not videos:
        print(f"目录内未找到视频：{video_dir}")
        return session

    for v in videos:
        result_root = _result_root_for_video(v)
        frames_dir = result_root / "frames"
        dets_dir = result_root / "detections"
        _ensure_dir(frames_dir); _ensure_dir(dets_dir)

        # 抽帧
        frames, fps = extract_video_frames(v, str(frames_dir), sample_every=sample_every, resize_to=resize_to)
        if not frames:
            print(f"跳过（未抽到帧）：{v}")
            session[v] = {"result_dir": str(result_root), "frames": 0, "fps": fps}
            continue

        # 推理并落盘 JSON
        index_entries = []
        for idx, fpath in enumerate(frames):
            ok, data_obj = run_darknet_v3_inmem(fpath, thresh=thresh)
            if not ok: continue
            det_file = dets_dir / f"frame_{idx:06d}.json"
            _write_json(det_file, data_obj)
            index_entries.append({
                "frame_id": idx,
                "frame_file": os.path.relpath(fpath, str(result_root)),
                "det_file": os.path.relpath(str(det_file), str(result_root)),
            })

        fps_out = max(1.0, float(fps) / max(1, float(sample_every)))
        _write_json(result_root / "meta.json", {
            "video_path": v,
            "sample_every": sample_every,
            "resize_to": list(resize_to) if resize_to else None,
            "fps_in": fps,
            "fps_out": fps_out,
            "frames_total": len(frames),
            "frames_detected": len(index_entries),
        })
        _write_json(result_root / "index.json", index_entries)

        print(f"阶段一完成：{v} → 结果目录 {result_root}（写入 {len(index_entries)} 帧的检测JSON）")
        session[v] = {"result_dir": str(result_root), "frames": len(index_entries), "fps": fps}
    return session

# 示例调用：把视频放在 SRC/videos 下
video_dir = SRC_DIR
s1 = yolo_stage_dump(video_dir, sample_every=10, resize_to=(416, 416), thresh=CONF_THRESH)
print("阶段一结果：", {vp: info.get("frames", 0) for vp, info in s1.items()})

Running: /opt/darknet_ab/darknet detector test /home/xilinx/jupyter_notebooks/models/coco.data /home/xilinx/jupyter_notebooks/models/yolov3-tiny.cfg /home/xilinx/jupyter_notebooks/models/yolov3-tiny.weights /home/xilinx/jupyter_notebooks/src/result/bird/frames/frame_000000.jpg -thresh 0.55 -dont_show -ext_output -out /tmp/tmpqohl3nud.json
 GPU isn't used 
mini_batch = 1, batch = 1, time_steps = 1, train = 0 

 seen 64, trained: 32013 K-images (500 Kilo-batches_64) 
 Detection layer: 16 - type = 28 
 Detection layer: 23 - type = 28 
/home/xilinx/jupyter_notebooks/src/result/bird/frames/frame_000000.jpg: Predicted in 64292.552000 milli-seconds.
bird: 72%	(left_x:  228   top_y:  252   width:   70   height:  127)

Running: /opt/darknet_ab/darknet detector test /home/xilinx/jupyter_notebooks/models/coco.data /home/xilinx/jupyter_notebooks/models/yolov3-tiny.cfg /home/xilinx/jupyter_notebooks/models/yolov3-tiny.weights /home/xilinx/jupyter_notebooks/src/result/bird/frames/frame_000001.jpg -t

 GPU isn't used 
mini_batch = 1, batch = 1, time_steps = 1, train = 0 

 seen 64, trained: 32013 K-images (500 Kilo-batches_64) 
 Detection layer: 16 - type = 28 
 Detection layer: 23 - type = 28 
/home/xilinx/jupyter_notebooks/src/result/bird/frames/frame_000011.jpg: Predicted in 64183.739000 milli-seconds.

Running: /opt/darknet_ab/darknet detector test /home/xilinx/jupyter_notebooks/models/coco.data /home/xilinx/jupyter_notebooks/models/yolov3-tiny.cfg /home/xilinx/jupyter_notebooks/models/yolov3-tiny.weights /home/xilinx/jupyter_notebooks/src/result/bird/frames/frame_000012.jpg -thresh 0.55 -dont_show -ext_output -out /tmp/tmp9x8n0jmb.json
 GPU isn't used 
mini_batch = 1, batch = 1, time_steps = 1, train = 0 

 seen 64, trained: 32013 K-images (500 Kilo-batches_64) 
 Detection layer: 16 - type = 28 
 Detection layer: 23 - type = 28 
/home/xilinx/jupyter_notebooks/src/result/bird/frames/frame_000012.jpg: Predicted in 64217.584000 milli-seconds.

Running: /opt/darknet_ab/darknet dete

In [5]:
import os
import re
import json
import struct
import time
from pathlib import Path

import cv2
from PIL import Image as PILImage, ImageDraw, ImageFont

# 路径/参数配置
SRC_DIR = "."
TARGET_CLASSES = None  # 例如 ["person"]; None 表示不过滤

# 输出视频编码尝试
def _try_open_video_writer(out_base: str, frame_size: tuple, fps: float):
    w, h = int(frame_size[0]), int(frame_size[1])
    attempts = [
        ("mp4v", out_base + ".mp4"),
        ("XVID", out_base + ".avi"),
        ("MJPG", out_base + ".avi"),
    ]
    for fourcc_name, path in attempts:
        fourcc = cv2.VideoWriter_fourcc(*fourcc_name)
        writer = cv2.VideoWriter(path, fourcc, float(max(1.0, fps)), (w, h))
        if writer is not None and writer.isOpened():
            print(f"视频写出已打开：{path}（编码器 {fourcc_name}）")
            return writer, path
    return None, None

# 绘图兼容
def _draw_rect_compat(draw, x0, y0, x1, y1, color=(0, 255, 0), thickness=2):
    try:
        draw.rectangle([x0, y0, x1, y1], outline=color, width=thickness)
    except TypeError:
        for t in range(thickness):
            draw.rectangle([x0 - t, y0 - t, x1 + t, y1 + t], outline=color)

def _draw_circle_compat(draw, cx, cy, radius, color=(255, 0, 0), thickness=2):
    try:
        draw.ellipse([(cx - radius, cy - radius), (cx + radius, cy + radius)], outline=color, width=thickness)
    except TypeError:
        for t in range(thickness):
            r = radius + t
            draw.ellipse([(cx - r, cy - r), (cx + r, cy + r)], outline=color)

# 结果目录工具
def _result_root_for_video(video_path: str) -> Path:
    stem = Path(video_path).stem
    return Path(SRC_DIR) / "result" / stem

def _ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)

def _read_json(path: Path):
    try:
        with open(str(path), "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return None

# 渲染一帧（检测/UKF）
def _render_frame_bgr(image_path: str, data_obj, ukf_x=None, target_classes=None):
    img = PILImage.open(image_path).convert("RGB")
    w, h = img.size
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.load_default()
    except Exception:
        font = None

    data = data_obj
    objs = []
    if isinstance(data, list):
        for item in data:
            if isinstance(item, dict):
                if "objects" in item and isinstance(item["objects"], list):
                    objs.extend(item["objects"])
                else:
                    objs.append(item)
    elif isinstance(data, dict):
        if "objects" in data and isinstance(data["objects"], list):
            objs = data["objects"]
        elif "detections" in data and isinstance(data["detections"], list):
            objs = data["detections"]

    for obj in objs:
        name = obj.get("name") or obj.get("class") or "obj"
        conf = float(obj.get("confidence", 0.0))
        rc = obj.get("relative_coordinates") or obj.get("bbox") or {}
        if target_classes is not None and name not in target_classes:
            continue
        cx = float(rc.get("center_x", 0.5)); cy = float(rc.get("center_y", 0.5))
        bw = float(rc.get("width", 0.0));    bh = float(rc.get("height", 0.0))
        cxp = int(cx * w); cyp = int(cy * h)
        bwp = int(bw * w); bhp = int(bh * h)
        x0 = max(0, cxp - bwp // 2); y0 = max(0, cyp - bhp // 2)
        x1 = min(w - 1, cxp + bwp // 2); y1 = min(h - 1, cyp + bhp // 2)
        _draw_rect_compat(draw, x0, y0, x1, y1, (0, 255, 0), thickness=2)
        label = f"{name} {conf:.2f}"
        if font:
            draw.text((x0 + 2, y0 + 2), label, fill=(255, 0, 0), font=font)
        else:
            draw.text((x0 + 2, y0 + 2), label, fill=(255, 0, 0))

    if ukf_x is not None and len(ukf_x) >= 2:
        cxp = int(float(ukf_x[0]) * w)
        cyp = int(float(ukf_x[1]) * h)
        _draw_circle_compat(draw, cxp, cyp, radius=4, color=(255, 0, 0), thickness=2)

    arr = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
    return img, arr

# 归一化检测 + 测量选择
import numpy as np

def _normalize_detections(data_obj):
    objs = []
    data = data_obj
    if isinstance(data, list):
        for item in data:
            if isinstance(item, dict) and "objects" in item and isinstance(item["objects"], list):
                objs.extend(item["objects"])
            elif isinstance(item, dict):
                objs.append(item)
    elif isinstance(data, dict):
        if "objects" in data and isinstance(data["objects"], list):
            objs = data["objects"]
        elif "detections" in data and isinstance(data["detections"], list):
            objs = data["detections"]

    norm = []
    for obj in objs:
        name = obj.get("name") or obj.get("class") or "obj"
        conf = float(obj.get("confidence", 0.0))
        rc = obj.get("relative_coordinates") or obj.get("bbox") or {}
        cx = float(rc.get("center_x", rc.get("cx", 0.5)))
        cy = float(rc.get("center_y", rc.get("cy", 0.5)))
        w  = float(rc.get("width", rc.get("w", 0.0)))
        h  = float(rc.get("height", rc.get("h", 0.0)))
        cx = max(0.0, min(1.0, cx))
        cy = max(0.0, min(1.0, cy))
        w  = max(0.0, min(1.0, w))
        h  = max(0.0, min(1.0, h))
        norm.append({"class": name, "confidence": conf, "cx": cx, "cy": cy, "w": w, "h": h})
    return norm

def _pick_measurement(norm_list, target_classes=None):
    if not norm_list:
        return None
    candidates = norm_list
    if target_classes:
        s = set(target_classes)
        candidates = [o for o in norm_list if o.get("class") in s] or norm_list
    best = None
    for o in candidates:
        if best is None or float(o.get("confidence", 0.0)) > float(best.get("confidence", 0.0)):
            best = o
    if best is None:
        return None
    return [float(best.get("cx", 0.5)), float(best.get("cy", 0.5))]

# UKF（PL 端）硬件接口
BITSTREAM_PATH = "design_1.bit"
UKF_INSTANCE_HINTS = ("ukf0", "ukf0_0", "ukf_accel_step_0")

UKF_ADDR_AP_CTRL   = 0x00
UKF_ADDR_Q         = 0x18
UKF_ADDR_R         = 0x20
UKF_ADDR_Z_BASE    = 0x10
UKF_ADDR_XIN_BASE  = 0x30
UKF_ADDR_SIN_BASE  = 0x40
UKF_ADDR_XOUT_BASE = 0x80
UKF_ADDR_SOUT_BASE = 0xC0

UKF_MMIO_BASE  = 0x40000000
UKF_MMIO_RANGE = 0x10000  # 64KB

def _float_to_u32(f: float):
    return struct.unpack("<I", struct.pack("<f", float(f)))[0]

def _u32_to_float(u: int):
    return struct.unpack("<f", struct.pack("<I", int(u)))[0]

def _find_ukf_ip(overlay):
    keys = list(getattr(overlay, "ip_dict", {}).keys())
    for k in keys:
        info = overlay.ip_dict.get(k, {})
        t = info.get("type") or ""
        if "ukf_accel_step" in t:
            return getattr(overlay, k)
    for hint in UKF_INSTANCE_HINTS:
        if hasattr(overlay, hint):
            return getattr(overlay, hint)
    if keys:
        return getattr(overlay, keys[0])
    raise RuntimeError("未找到 UKF IP 实例")

def _prepare_ukf_ip_or_mmio(bitstream_path: str = BITSTREAM_PATH):
    try:
        from pynq import Overlay
        ol = Overlay(bitstream_path)
        ol.download()
        try:
            ip = _find_ukf_ip(ol)
            print("UKF IP 通过 HWH 识别并加载")
            return ol, ip
        except Exception:
            from pynq import MMIO
            ip = MMIO(UKF_MMIO_BASE, UKF_MMIO_RANGE)
            print("UKF MMIO 已准备（HWH 未提供 IP 映射）")
            return ol, ip
    except Exception as e:
        from pynq import Bitstream, MMIO
        bit = Bitstream(bitstream_path)
        bit.download()
        ip = MMIO(UKF_MMIO_BASE, UKF_MMIO_RANGE)
        print(f"Overlay 加载失败，已回退到 MMIO：{e}")
        return None, ip

def ukf_step_hw(ip, z, q, r, x_in, S_in):
    ip.write(UKF_ADDR_Q, _float_to_u32(q))
    ip.write(UKF_ADDR_R, _float_to_u32(r))
    for i in range(2):
        ip.write(UKF_ADDR_Z_BASE + 4 * i, _float_to_u32(z[i]))
    for i in range(4):
        ip.write(UKF_ADDR_XIN_BASE + 4 * i, _float_to_u32(x_in[i]))
    base = UKF_ADDR_SIN_BASE
    for i in range(4):
        for j in range(4):
            ip.write(base + 4 * (i * 4 + j), _float_to_u32(S_in[i][j]))
    ip.write(UKF_ADDR_AP_CTRL, 1)
    while (ip.read(UKF_ADDR_AP_CTRL) & 0x2) == 0:
        pass
    x_out = [_u32_to_float(ip.read(UKF_ADDR_XOUT_BASE + 4 * i)) for i in range(4)]
    S_out = [[_u32_to_float(ip.read(UKF_ADDR_SOUT_BASE + 4 * (i * 4 + j))) for j in range(4)] for i in range(4)]
    return x_out, S_out

# 阶段二主函数（自包含）
def stage2_consume_hw(
    video_dir: str,
    save_video: bool = True,
    use_ukf: bool = False,
    bitstream_path: str = BITSTREAM_PATH,
    ukf_q: float = 0.05,
    ukf_r: float = 0.05,
    target_classes=None,
):
    """
    阶段二（消费阶段）：读取阶段一生成的 frames + detections JSON，
    叠加（可选）UKF 点并写出视频到 SRC/result/<stem>/out，文件名区分为 video_result_yolov3.*。
    """
    import cv2  # 确保依赖存在
    session = {}

    # UKF 耗时统计（跨所有视频）
    ukf_time_sum_total = 0.0
    ukf_calls_total = 0
    ukf_skipped_no_meas = 0

    print(f"[Stage2] use_ukf={use_ukf}, bitstream='{bitstream_path}'", flush=True)

    ol = None
    ip = None
    # x_state 与 S_state 已移至循环内初始化
    if use_ukf:
        try:
            ol, ip = _prepare_ukf_ip_or_mmio(bitstream_path)
            print("UKF 接口就绪", flush=True)
        except Exception as e:
            print(f"UKF 初始化失败：{e}，将禁用 UKF", flush=True)
            use_ukf = False
    else:
        print("UKF 未启用，跳过耗时统计", flush=True)

    # 自包含枚举视频文件
    video_dir_p = Path(video_dir)
    try:
        videos = sorted([str(p) for p in video_dir_p.iterdir() if p.is_file() and p.suffix.lower() in (".mp4", ".avi", ".mov", ".mkv")])
    except Exception:
        videos = []
    
    if not videos:
        print(f"目录内未找到视频：{video_dir}", flush=True)
        return session

    for v in videos:
        # 在每个视频开始前重置 UKF 状态，防止视频间状态混淆
        x_state = None
        S_state = [[1.0, 0.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0], [0.0, 0.0, 1.0, 0.0], [0.0, 0.0, 0.0, 1.0]]
        
        stem = Path(v).stem
        result_root = _result_root_for_video(v)
        frames_dir = result_root / "frames"
        dets_dir = result_root / "detections"
        if not frames_dir.exists():
            print(f"跳过 {v}：缺少 frames 目录 {frames_dir}", flush=True)
            session[v] = {"result_dir": str(result_root), "frames": 0}
            continue

        meta_path = result_root / "meta.json"
        index_path = result_root / "index.json"
        fps_out = 10.0
        if meta_path.exists():
            meta = _read_json(meta_path) or {}
            try:
                fps_out = float(meta.get("fps_out", fps_out))
            except Exception:
                pass
        fps_out = max(1.0, fps_out)

        entries = []
        if index_path.exists():
            entries = _read_json(index_path) or []
        if not entries:
            # 回退扫描 frames/detections
            frame_files = sorted([p for p in frames_dir.iterdir() if p.suffix.lower() in (".jpg", ".jpeg", ".png")])
            import re
            pat = re.compile(r"^frame_(\d+)\.(jpg|jpeg|png)$", re.IGNORECASE)
            for p in frame_files:
                m = pat.match(p.name)
                idx = int(m.group(1)) if m else None
                det_rel = None
                if idx is not None:
                    cand = dets_dir / f"frame_{idx:06d}.json"
                    if cand.exists():
                        det_rel = str(cand.relative_to(result_root))
                entry = {"frame_file": str(p.relative_to(result_root))}
                if det_rel:
                    entry["det_file"] = det_rel
                entries.append(entry)
        if not entries:
            print(f"跳过 {v}：未找到 index.json 且 frames 为空", flush=True)
            session[v] = {"result_dir": str(result_root), "frames": 0}
            continue

        writer = None
        out_vid_path = None
        frames_count = 0
        for idx, entry in enumerate(entries):
            frame_rel = entry.get("frame_file")
            det_rel = entry.get("det_file")
            frame_path = str(result_root / frame_rel)
            det_path = str(result_root / det_rel) if det_rel else None

            # 读取检测 JSON（若存在）
            if det_path and os.path.exists(det_path):
                data_obj = _read_json(det_path) or []
            else:
                data_obj = []

            # UKF 更新（基于检测）
            if use_ukf:
                try:
                    norm = _normalize_detections(data_obj)
                except Exception:
                    norm = []
                z = _pick_measurement(norm, target_classes or TARGET_CLASSES)
                if z is not None:
                    if x_state is None:
                        x_state = [z[0], z[1], 0.0, 0.0]
                    try:
                        t0 = time.perf_counter()
                        x_out, S_out = ukf_step_hw(ip, z, ukf_q, ukf_r, x_state, S_state)
                        t1 = time.perf_counter()
                        dt_ms = (t1 - t0) * 1000.0
                        ukf_time_sum_total += (t1 - t0)
                        ukf_calls_total += 1
                        print(f"UKF 调用耗时：{dt_ms:.3f} ms", flush=True)
                        x_state, S_state = x_out, S_out
                    except Exception as e:
                        print(f"UKF 调用失败：{e}", flush=True)
                else:
                    ukf_skipped_no_meas += 1

            # 渲染叠加
            try:
                img_pil, frame_bgr = _render_frame_bgr(
                    frame_path, data_obj, ukf_x=(x_state if use_ukf else None), target_classes=(target_classes or TARGET_CLASSES)
                )
            except Exception as e:
                print(f"渲染失败：{e}", flush=True)
                img_pil, frame_bgr = None, None

            # 视频写出
            if save_video and frame_bgr is not None:
                if writer is None:
                    w_out, h_out = (img_pil.size if img_pil is not None else (None, None))
                    if w_out is None:
                        try:
                            _img = cv2.imread(frame_path, cv2.IMREAD_COLOR)
                            if _img is not None:
                                h_out, w_out = _img.shape[:2]
                            else:
                                w_out, h_out = 416, 416
                        except Exception:
                            w_out, h_out = 416, 416
                    out_dir = result_root / "out"
                    _ensure_dir(out_dir)
                    out_base = str(out_dir / "video_result_yolov3")
                    writer, out_vid_path = _try_open_video_writer(out_base, (w_out, h_out), fps_out)
                    if writer is None:
                        print("视频写出不可用（编码器打开失败），将跳过保存视频", flush=True)
                        save_video = False
                if writer is not None:
                    try:
                        writer.write(frame_bgr)
                        frames_count += 1
                    except Exception as e:
                        print(f"视频写出失败：{e}", flush=True)

        # 释放资源与总结
        if writer is not None:
            try:
                writer.release()
            except Exception:
                pass
            print(f"{v} 输出视频已保存：{out_vid_path}，帧率约 {fps_out:.2f} fps", flush=True)
        print(f"{v} 完成：消费 {frames_count} 帧", flush=True)
        session[v] = {"result_dir": str(result_root), "frames": frames_count}

    # 打印 UKF 总结（若启用）
    if use_ukf:
        if ukf_calls_total > 0:
            avg_ms = (ukf_time_sum_total / ukf_calls_total) * 1000.0
            print(f"UKF 平均耗时：{avg_ms:.3f} ms（{ukf_calls_total} 次调用；无测量跳过 {ukf_skipped_no_meas} 帧）", flush=True)
        else:
            print(f"UKF 未产生有效调用（可能无检测或过滤导致；无测量跳过 {ukf_skipped_no_meas} 帧）", flush=True)

    return session

if __name__ == "__main__":
    video_dir = SRC_DIR
    print("运行阶段二：从阶段一的产物读取进行渲染与视频写出（硬件UKF可选）")
    res = stage2_consume_hw(
        video_dir,
        save_video=True,
        use_ukf=True,          # 若设备具备 PYNQ 与 bitstream，可改 True
        bitstream_path=BITSTREAM_PATH,
        ukf_q=0.05,
        ukf_r=0.05,
    )
    print("阶段二结果:", res)

运行阶段二：从阶段一的产物读取进行渲染与视频写出（硬件UKF可选）
[Stage2] use_ukf=True, bitstream='/home/xilinx/jupyter_notebooks/design_1.bit'
UKF IP 通过 HWH 识别并加载
UKF 接口就绪
UKF 调用耗时：2.424 ms
视频写出已打开：/home/xilinx/jupyter_notebooks/src/result/bird/out/video_result_yolov3.mp4（编码器 mp4v）
UKF 调用耗时：2.357 ms
UKF 调用耗时：2.369 ms
UKF 调用耗时：2.460 ms
UKF 调用耗时：2.373 ms
UKF 调用耗时：2.372 ms
UKF 调用耗时：2.375 ms
UKF 调用耗时：2.473 ms
/home/xilinx/jupyter_notebooks/src/videos4/bird.mp4 输出视频已保存：/home/xilinx/jupyter_notebooks/src/result/bird/out/video_result_yolov3.mp4，帧率约 2.40 fps
/home/xilinx/jupyter_notebooks/src/videos4/bird.mp4 完成：消费 14 帧
UKF 平均耗时：2.400 ms（8 次调用；无测量跳过 6 帧）
阶段二结果: {'/home/xilinx/jupyter_notebooks/src/videos4/bird.mp4': {'result_dir': '/home/xilinx/jupyter_notebooks/src/result/bird', 'frames': 14}}
