In [2]:
# ==== KITTI Tracking → 카메라 전용 3D 시각화 영상 만들기 ====
import os, cv2, math, torch, numpy as np
from math import atan2
from pathlib import Path

# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# ------------------------ Path configuration ------------------------
TRACK_ROOT = "/home/jinjinjara1022/AutonomousDriving/datasets/KITTI_Tracking"
TRACK_TRAIN = TRACK_ROOT + "/training"
TRACK_TEST = TRACK_ROOT + "/testing"

# Optional 2D detector checkpoint (our 9-class head); used only if the file exists.
CKPT_2D = Path("./checkpoints/kitti2d_frcnn_all.pt")
# 3D head checkpoint; must exist (requested path).
CKPT_3D = "./models/mono3d_baseline.pt"

# ------------------------ 3D 유틸 ------------------------
def compute_box_3d_fixed(dim, loc, ry):
    """Return the 8 corners of a 3D box as a (3, 8) float32 array.

    Args:
        dim: sequence whose first three entries are (height, width, length).
        loc: 3-element position added to every corner. The first four
            corners lie at this height (local y = 0), the last four at
            local y = -height.
        ry: rotation angle in radians about the y axis.

    Returns:
        Array with rows (x, y, z) and one column per corner.
    """
    height, width, length = float(dim[0]), float(dim[1]), float(dim[2])
    hw, hl = width / 2, length / 2

    # Corners in the box-local frame: columns 0-3 at y = 0, columns 4-7 at y = -height.
    corners = np.array(
        [
            [hw, hw, -hw, -hw, hw, hw, -hw, -hw],
            [0, 0, 0, 0, -height, -height, -height, -height],
            [hl, -hl, -hl, hl, hl, -hl, -hl, hl],
        ],
        dtype=np.float32,
    )

    cos_ry = math.cos(ry)
    sin_ry = math.sin(ry)
    rotation = np.array(
        [
            [cos_ry, 0, sin_ry],
            [0, 1, 0],
            [-sin_ry, 0, cos_ry],
        ],
        np.float32,
    )

    offset = np.array(loc, np.float32).reshape(3, 1)
    return rotation @ corners + offset

def project_to_image(pts3d, P):
    """Project 3D points with projection matrix `P` and return pixel coordinates.

    Args:
        pts3d: (3, N) array of points, one point per column.
        P: matrix applied to the homogeneous (4, N) points.

    Returns:
        (2, N) array: the first two projected rows divided by the third,
        with the divisor clamped to at least 1e-6 to avoid division by zero.
    """
    num_pts = pts3d.shape[1]
    ones_row = np.ones((1, num_pts), np.float32)
    homogeneous = np.concatenate((pts3d, ones_row), axis=0)
    projected = np.matmul(P, homogeneous)
    depth = np.clip(projected[2:], 1e-6, None)
    return projected[:2] / depth

def uvz_to_xyz(uv, Z, P2):
    """Back-project pixel `uv` at depth `Z` to a 3D point.

    Uses only the focal lengths P2[0,0], P2[1,1] and the principal point
    P2[0,2], P2[1,2]; the fourth column of P2 is not read.

    Returns:
        float32 array [X, Y, Z].
    """
    pixel_u, pixel_v = uv
    focal_x = P2[0, 0]
    focal_y = P2[1, 1]
    center_x = P2[0, 2]
    center_y = P2[1, 2]
    return np.array(
        [
            (pixel_u - center_x) * Z / focal_x,
            (pixel_v - center_y) * Z / focal_y,
            Z,
        ],
        np.float32,
    )

def draw_projected_box3d(img, qs, color=(0,165,255), thickness=2):
    """Draw the 12 edges of a projected 3D box onto `img` and return `img`.

    Args:
        img: image drawn on in place via cv2.line.
        qs: array that is transposed and truncated to int; after the
            transpose rows 0-7 are used as the (u, v) corner pixels.
        color: line colour passed to cv2.line.
        thickness: line thickness passed to cv2.line.
    """
    corners = qs.T.astype(int)

    # Same edge order as before: ring 0-3, ring 4-7, then the four connecting edges.
    first_ring = [(k, (k + 1) % 4) for k in range(4)]
    second_ring = [(k + 4, (k + 1) % 4 + 4) for k in range(4)]
    connectors = [(k, k + 4) for k in range(4)]

    for start, end in first_ring + second_ring + connectors:
        cv2.line(img, tuple(corners[start]), tuple(corners[end]), color, thickness)
    return img

# ------------------------ BEV 유틸 ------------------------
def bev_canvas(W=600,H=600,bg=255):
    """Return a new (H, W, 3) uint8 image with every value set to `bg`."""
    return np.full(shape=(H, W, 3), fill_value=bg, dtype=np.uint8)
def world_to_bev(x,z,xr=(-20,20),zr=(0,60),W=600,H=600):
    """Map a world (x, z) position to integer (u, v) pixel coordinates on a BEV canvas.

    `xr` maps linearly onto columns 0..W-1. `zr` maps onto rows H-1..0, so
    a larger z gives a smaller row index. Values outside the ranges are not
    clamped.
    """
    x_min, x_max = xr[0], xr[1]
    z_min, z_max = zr[0], zr[1]
    col = (x - x_min) / (x_max - x_min) * (W - 1)
    row = H - 1 - (z - z_min) / (z_max - z_min) * (H - 1)
    return int(round(col)), int(round(row))
def draw_rot_bev_rect(bev, xz, color, xr=(-20,20), zr=(0,60)):
    """Draw a closed polygon on `bev` through the world-space (x, z) points in `xz`.

    Each point is converted with world_to_bev using the canvas size taken
    from `bev.shape`; the outline is drawn in place with thickness 2.
    """
    canvas_h, canvas_w = bev.shape[0], bev.shape[1]
    vertices = np.array(
        [world_to_bev(wx, wz, xr, zr, canvas_w, canvas_h) for wx, wz in xz],
        dtype=np.int32,
    )
    cv2.polylines(bev, [vertices], True, color, 2)
def get_bottom_rect_xz(dims, loc, ry):
    """Return the (x, z) pairs of the first four box corners, or None.

    The corners come from compute_box_3d_fixed. If any of the eight corners
    has z <= 0 the box is rejected and None is returned.
    """
    corners = compute_box_3d_fixed(dims, loc, ry)
    depth = corners[2]
    if np.any(depth <= 0):
        return None
    return [(corners[0, k], corners[2, k]) for k in range(4)]
def estimate_half_fov_x(P2, img_w):
    """Estimate the horizontal half field of view in radians.

    Takes |(0 - cx) / fx| and |(img_w - cx) / fx| for the left and right
    image borders, averages them, and returns the arctangent of the mean.
    """
    focal_x = P2[0, 0]
    center_x = P2[0, 2]
    tan_left = abs((0 - center_x) / focal_x)
    tan_right = abs((img_w - center_x) / focal_x)
    return math.atan(0.5 * (tan_left + tan_right))
def draw_fov(bev, P2, img_w, xr=(-20,20), zr=(0,60), color=(180,180,180)):
    """Draw the camera's horizontal field-of-view wedge on the BEV canvas.

    Two anti-aliased lines run from world (0, 0) to the points at depth
    zr[1] on either side, at the half angle from estimate_half_fov_x. A
    filled circle (BGR (0, 0, 255), radius 4) marks the origin.
    """
    canvas_h, canvas_w = bev.shape[:2]
    half_angle = estimate_half_fov_x(P2, img_w)
    origin = world_to_bev(0, 0, xr, zr, canvas_w, canvas_h)

    far_z = zr[1]
    lateral = far_z * math.tan(half_angle)
    for sign in (-1, 1):
        far_pt = world_to_bev(sign * lateral, far_z, xr, zr, canvas_w, canvas_h)
        cv2.line(bev, origin, far_pt, color, 2, cv2.LINE_AA)

    cv2.circle(bev, origin, 4, (0,0,255), -1)

# ------------------------ Tracking calib (시퀀스별 P2) ------------------------
def read_tracking_calib(seq_txt):
    """Read the P2 projection matrix from a per-sequence calibration text file.

    The first line starting with "P2:" or "P2 " is used: its
    whitespace-separated values after the tag are parsed as floats and
    reshaped to (3, 4).

    Args:
        seq_txt: path of the calibration file.

    Returns:
        (3, 4) float32 array.

    Raises:
        AssertionError: if no P2 line is present.
        ValueError: if the P2 line does not hold exactly 12 numeric values.
    """
    with open(seq_txt,'r') as f:
        for ln in f:
            ln = ln.strip()
            # Accept both the "P2: v0 v1 ..." and the "P2 v0 v1 ..." form.
            if ln.startswith(("P2:", "P2 ")):
                vals = [float(x) for x in ln.split()[1:]]
                return np.array(vals, np.float32).reshape(3, 4)
    # Raised explicitly rather than via `assert` so the check is not stripped
    # under `python -O`; the exception type and message are unchanged.
    raise AssertionError(f"P2 not found in {seq_txt}")

# ------------------------ ROI 전처리 ------------------------
# Per-channel mean/std used to normalise ROI crops after scaling pixel values by 1/255.
IMAGENET_MEAN = np.asarray((0.485, 0.456, 0.406), dtype=np.float32)
IMAGENET_STD = np.asarray((0.229, 0.224, 0.225), dtype=np.float32)
# Default output side length (pixels) of crop_resize.
IMG_SIZE = 128
# Default enlargement factor applied to the longer bbox side in crop_resize.
PAD_SCALE = 1.2

def crop_resize(img, bbox, out=IMG_SIZE, scale=PAD_SCALE):
    """Cut a square window around `bbox` and resize it to (out, out).

    The window is centred on the bbox centre and its side is the longer
    bbox side times `scale`. Where the window leaves the image, the image
    is first padded with black so the crop stays inside it.

    Args:
        img: image array; height and width are read from img.shape[:2].
        bbox: (left, top, right, bottom).
        out: side length of the returned crop.
        scale: enlargement factor for the window side.

    Returns:
        The crop resized with bilinear interpolation.
    """
    img_h, img_w = img.shape[:2]
    left, top, right, bottom = bbox

    center_x = (left + right) / 2
    center_y = (top + bottom) / 2
    side = max(right - left, bottom - top) * scale

    x_lo = int(center_x - side / 2)
    y_lo = int(center_y - side / 2)
    x_hi = int(center_x + side / 2)
    y_hi = int(center_y + side / 2)

    # Amount by which the window sticks out on each side of the image.
    pad_left = max(0, -x_lo)
    pad_top = max(0, -y_lo)
    pad_right = max(0, x_hi - img_w)
    pad_bottom = max(0, y_hi - img_h)

    if pad_left or pad_top or pad_right or pad_bottom:
        img = cv2.copyMakeBorder(
            img, pad_top, pad_bottom, pad_left, pad_right,
            cv2.BORDER_CONSTANT, value=(0,0,0),
        )
        # Shift the window into the coordinate frame of the padded image.
        x_lo += pad_left
        x_hi += pad_left
        y_lo += pad_top
        y_hi += pad_top

    window = img[y_lo:y_hi, x_lo:x_hi]
    return cv2.resize(window, (out, out), interpolation=cv2.INTER_LINEAR)

# ------------------------ 2D Detector (있으면 우리 ckpt, 없으면 COCO) ------------------------
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def get_kitti_frcnn(num_classes):
    """Build a torchvision Faster R-CNN (ResNet-50 FPN) with a new box predictor.

    The detector is created with weights="DEFAULT" and its
    `roi_heads.box_predictor` is replaced by a FastRCNNPredictor with
    `num_classes` outputs (the count includes the background class).
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")
    predictor_in = detector.roi_heads.box_predictor.cls_score.in_features
    # num_classes includes the background class.
    detector.roi_heads.box_predictor = FastRCNNPredictor(predictor_in, num_classes)
    return detector

# Class mapping and per-class dimension priors (3 classes).
KITTI3 = ["Car", "Pedestrian", "Cyclist"]
class_to_idx = {name: idx for idx, name in enumerate(KITTI3)}
# One row per class, indexed by class_to_idx; the predicted dimension
# residuals are added to the matching row.
priors_arr = np.array(
    [
        [1.52, 1.63, 3.88],  # Car
        [1.73, 0.60, 0.80],  # Pedestrian
        [1.73, 0.60, 1.76],  # Cyclist
    ],
    dtype=np.float32,
)

# Model setup: pick the 2D detector depending on whether our checkpoint exists.
# NOTE: CLASSES_ALL is defined only in the first branch and COCO_TO_KITTI only
# in the second; run_detector_rois uses USE_COCO to decide which one to read.
if CKPT_2D.exists():
    # Our 9-class head version (background + 8 KITTI classes)
    CLASSES_ALL = ["__background__","Car","Van","Truck","Pedestrian","Person_sitting","Cyclist","Tram","Misc"]
    model2d = get_kitti_frcnn(num_classes=len(CLASSES_ALL)).to(device)
    # NOTE(review): torch.load unpickles the file — only load checkpoints from a trusted source.
    model2d.load_state_dict(torch.load(CKPT_2D, map_location=device))
    model2d.eval()
    USE_COCO = False
else:
    # COCO-pretrained detector (fallback)
    model2d = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT").to(device).eval()
    USE_COCO = True
    # COCO category id -> one of our 3 classes
    # NOTE(review): presumably ids are 1=person, 2=bicycle, 3=car, 4=motorcycle,
    # 6=bus, 8=truck — confirm against torchvision's COCO category list.
    COCO_TO_KITTI = {1:"Pedestrian", 2:"Cyclist", 3:"Car", 4:"Cyclist", 6:"Car", 8:"Car"}

@torch.no_grad()
def run_detector_rois(img_rgb, score_thresh=0.55, max_dets=60):
    """Run the 2D detector on one image and return filtered ROIs.

    Args:
        img_rgb: (H, W, 3) image array; it is scaled by 1/255 before being
            passed to `model2d`.
        score_thresh: detections scoring below this are dropped.
        max_dets: collection stops once this many ROIs are gathered.

    Returns:
        List of dicts {"bbox": [l, t, r, b], "cls": name, "score": float},
        in the order the detector produced them. Boxes are truncated to
        int, clamped to the image, and dropped when narrower than 12 px or
        shorter than 16 px. Only Car / Pedestrian / Cyclist are kept.
    """
    inp = torch.from_numpy(img_rgb).permute(2, 0, 1).float() / 255.0
    pred = model2d([inp.to(device)])[0]
    det_boxes = pred["boxes"].detach().cpu().numpy().astype(int)
    det_labels = pred["labels"].detach().cpu().numpy().astype(int)
    det_scores = pred["scores"].detach().cpu().numpy()

    img_h, img_w, _ = img_rgb.shape

    # Choose how a detector label id becomes one of our class names (or None).
    if USE_COCO:
        # COCO-pretrained fallback: map COCO category ids onto our 3 classes.
        def to_kitti_name(label_id):
            name = COCO_TO_KITTI.get(label_id)
            return name if name in class_to_idx else None
    else:
        # Our 9-class head: keep only the 3 classes we handle.
        id2name = dict(enumerate(CLASSES_ALL))

        def to_kitti_name(label_id):
            name = id2name.get(label_id, "")
            return name if name in KITTI3 else None

    rois = []
    for box, label, score in zip(det_boxes, det_labels, det_scores):
        if score < score_thresh:
            continue
        cls_name = to_kitti_name(int(label))
        if cls_name is None:
            continue

        left, top, right, bottom = box.tolist()
        left = max(0, left)
        top = max(0, top)
        right = min(img_w - 1, right)
        bottom = min(img_h - 1, bottom)
        # Skip boxes too small to give a useful crop.
        if right - left < 12 or bottom - top < 16:
            continue

        rois.append({"bbox": [left, top, right, bottom], "cls": cls_name, "score": float(score)})
        if len(rois) >= max_dets:
            break
    return rois

# ------------------------ 3D Head 정의 & 체크포인트 로드 ------------------------
import torchvision.models as tvm
import torch.nn as nn

class Mono3DHead(nn.Module):
    """Three-layer MLP head: feat_dim -> 256 -> 128 -> out_dim with ReLU in between."""

    def __init__(self, feat_dim=512, out_dim=6):
        super().__init__()
        layers = [
            nn.Linear(feat_dim, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, out_dim),
        ]
        # The attribute name `fc` and the layer order determine the
        # state_dict keys (fc.0.*, fc.2.*, fc.4.*) that checkpoints rely on.
        self.fc = nn.Sequential(*layers)

    def forward(self, f):
        """Apply the MLP to feature tensor `f` and return the raw outputs."""
        return self.fc(f)

class Mono3DNet(nn.Module):
    """ResNet-18 feature extractor followed by a Mono3DHead with 6 outputs."""

    def __init__(self):
        super().__init__()
        resnet = tvm.resnet18(weights=tvm.ResNet18_Weights.IMAGENET1K_V1)
        # Drop the final child module of the ResNet; the remaining stack
        # yields features of shape (B,512,1,1).
        trunk = list(resnet.children())[:-1]
        self.backbone = nn.Sequential(*trunk)
        self.head = Mono3DHead(512, 6)

    def forward(self, x):
        """Return (dims_res, logz, yaw) for an input batch `x`.

        The 6 head outputs are split column-wise into 3 + 1 + 2 values.
        The last two are divided by their L2 norm (plus 1e-6) so `yaw` is
        approximately a unit 2-vector.
        """
        feats = torch.flatten(self.backbone(x), 1)
        raw = self.head(feats)
        dims_res, logz, yaw_raw = torch.split(raw, [3, 1, 2], dim=1)
        yaw_norm = torch.linalg.norm(yaw_raw, dim=1, keepdim=True)
        yaw = yaw_raw / (yaw_norm + 1e-6)
        return dims_res, logz, yaw

# Build the model and load the checkpoint (handles both a state_dict and a whole pickled model).
model3d = Mono3DNet().to(device)
# NOTE(review): torch.load unpickles the file — only load checkpoints from a trusted source.
_state = torch.load(CKPT_3D, map_location=device)
if isinstance(_state, nn.Module):
    # The checkpoint holds the whole model object: use it directly.
    model3d = _state.to(device)
else:
    # The checkpoint holds a state_dict. A key or shape mismatch now surfaces
    # as the real load_state_dict error instead of being swallowed and
    # replaced by an unrelated AttributeError from `_state.to(...)`.
    model3d.load_state_dict(_state)
model3d.eval()
print(f"[✓] 3D head loaded from {CKPT_3D}")

# ------------------------ 트래킹 시퀀스 → MP4 ------------------------
@torch.no_grad()
def make_kitti_tracking_video(
    seq_id="0000",                  # sequence folder name (4-digit string)
    split="training",               # "training" or "testing"
    out_mp4="./track_0000.mp4",
    score_thresh=0.55, max_dets=60,
    x_range=(-20,20), z_range=(0,60),
    fps=10
):
    """Render one KITTI tracking sequence to an MP4 with 3D boxes and a BEV panel.

    Each frame goes through the 2D detector. Every kept ROI is cropped and
    normalised, then passed to `model3d`, which predicts dimension
    residuals, log-depth and a yaw vector. The resulting 3D box is drawn on
    the camera image and its footprint on a bird's-eye-view canvas. The
    output frame is the camera image with the BEV (resized to the image
    height) concatenated on the right.

    Args:
        seq_id: sequence folder name under `image_02/`, also the stem of
            the calibration file.
        split: "training" selects TRACK_TRAIN; any other value selects TRACK_TEST.
        out_mp4: output video path.
        score_thresh, max_dets: forwarded to run_detector_rois.
        x_range, z_range: world extents covered by the BEV canvas.
        fps: frame rate of the written video.

    Returns:
        `out_mp4`.

    Raises:
        AssertionError: if the image folder, the calibration file or the
            frames are missing.
        IOError: if a frame cannot be decoded or the video writer cannot
            be opened.
    """
    base = TRACK_TRAIN if split=="training" else TRACK_TEST
    img_dir = Path(base)/"image_02"/seq_id
    calib_txt = Path(base)/"calib"/f"{seq_id}.txt"   # tracking has one P2 per sequence
    assert img_dir.exists(), f"no frames at {img_dir}"
    assert calib_txt.exists(), f"no calib at {calib_txt}"

    P2 = read_tracking_calib(str(calib_txt))
    frames = sorted(img_dir.glob("*.png"))
    assert frames, f"no images in {img_dir}"

    # Determine the video resolution from the first frame.
    sample = cv2.imread(str(frames[0]))
    if sample is None:
        raise IOError(f"cannot read image {frames[0]}")
    H,W = sample.shape[:2]
    bev_side = H  # BEV panel is scaled to the image height
    out_size = (W + bev_side, H)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(out_mp4, fourcc, fps, out_size)

    # try/finally guarantees the writer is released even if a frame fails.
    try:
        if not writer.isOpened():
            # Without this check an unopened writer drops every frame silently.
            raise IOError(f"cannot open video writer for {out_mp4}")

        for i,fp in enumerate(frames):
            img_bgr = cv2.imread(str(fp))
            if img_bgr is None:
                raise IOError(f"cannot read image {fp}")
            img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

            # Fresh BEV canvas with the FOV wedge
            bev = bev_canvas(600,600,255); draw_fov(bev, P2, W, x_range, z_range)

            # 2D detector -> ROIs (drawn in blue)
            rois = run_detector_rois(img_rgb, score_thresh=score_thresh, max_dets=max_dets)

            for r in rois:
                l,t,rgt,b = r["bbox"]; cls = r["cls"]
                # 2D box
                cv2.rectangle(img_bgr,(l,t),(rgt,b),(255,0,0),2)

                # ROI preprocessing -> 3D inference (crop comes from the untouched RGB copy)
                crop = crop_resize(img_rgb, [l,t,rgt,b], out=IMG_SIZE, scale=PAD_SCALE)
                x = torch.from_numpy(((crop/255.0 - IMAGENET_MEAN)/IMAGENET_STD)).permute(2,0,1).unsqueeze(0).float().to(device)
                dr, lz, yv = model3d(x)

                prior = priors_arr[class_to_idx.get(cls, 0)]
                dims  = (dr.squeeze(0).cpu().numpy() + prior)
                Z     = float(torch.exp(lz).item())      # the network predicts log-depth
                yaw   = yv.squeeze(0).cpu().numpy()
                ry    = float(atan2(yaw[0], yaw[1]))

                # Recover (X, Y, Z) from the bottom-centre pixel of the 2D box
                uv  = ((l+rgt)/2.0, b)
                loc = uvz_to_xyz(uv, Z, P2)

                # 3D box (image + BEV); skipped unless every corner has z > 0
                C = compute_box_3d_fixed(dims, loc, ry)
                if (C[2] > 0).all():
                    img_bgr = draw_projected_box3d(img_bgr, project_to_image(C, P2), (0,165,255), 2)
                    rect = get_bottom_rect_xz(dims, loc, ry)
                    if rect: draw_rot_bev_rect(bev, rect, (0,165,255), x_range, z_range)

            bev_res = cv2.resize(bev, (bev_side, bev_side))
            combo = cv2.hconcat([img_bgr, bev_res])
            writer.write(combo)

            if (i+1) % 50 == 0:
                print(f"{seq_id}: {i+1}/{len(frames)} frames")
    finally:
        writer.release()

    print(f"[✓] saved video: {out_mp4}")
    return out_mp4

[✓] 3D head loaded from ./models/mono3d_baseline.pt


In [7]:
import imageio

@torch.no_grad()
def make_kitti_tracking_gif(
    seq_id="0000",                  # sequence folder name (4-digit string)
    split="training",               # "training" or "testing"
    out_gif="./track_0000.gif",
    score_thresh=0.55, max_dets=60,
    x_range=(-20,20), z_range=(0,60),
    fps=10
):
    """Render one KITTI tracking sequence to an animated GIF with 3D boxes and a BEV panel.

    Each frame goes through the 2D detector. Every kept ROI is cropped and
    normalised, then passed to `model3d`. The resulting 3D box is drawn on
    the camera image and its footprint on a bird's-eye-view canvas. Unlike
    the MP4 variant, the 2D detection rectangles are not drawn. All
    rendered frames are held in memory and written in a single
    `imageio.mimsave` call.

    Args:
        seq_id: sequence folder name under `image_02/`, also the stem of
            the calibration file.
        split: "training" selects TRACK_TRAIN; any other value selects TRACK_TEST.
        out_gif: output GIF path.
        score_thresh, max_dets: forwarded to run_detector_rois.
        x_range, z_range: world extents covered by the BEV canvas.
        fps: forwarded to imageio.mimsave.

    Returns:
        `out_gif`.

    Raises:
        AssertionError: if the image folder, the calibration file or the
            frames are missing.
        IOError: if a frame cannot be decoded.
    """
    base = TRACK_TRAIN if split=="training" else TRACK_TEST
    img_dir = Path(base)/"image_02"/seq_id
    calib_txt = Path(base)/"calib"/f"{seq_id}.txt"
    # Separate checks with messages, so the failure says which path is missing.
    assert img_dir.exists(), f"no frames at {img_dir}"
    assert calib_txt.exists(), f"no calib at {calib_txt}"

    P2 = read_tracking_calib(str(calib_txt))
    frames = sorted(img_dir.glob("*.png"))
    assert frames, f"no images in {img_dir}"

    # Rendered frames (RGB), collected for the final GIF write
    gif_frames = []

    for i,fp in enumerate(frames):
        img_bgr = cv2.imread(str(fp))
        if img_bgr is None:
            # cv2.imread returns None instead of raising on unreadable files.
            raise IOError(f"cannot read image {fp}")
        img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

        # Fresh BEV canvas with the FOV wedge
        bev = bev_canvas(600,600,255); draw_fov(bev, P2, img_rgb.shape[1], x_range, z_range)

        # 2D detector -> ROIs (the 2D rectangles are intentionally not drawn in the GIF)
        rois = run_detector_rois(img_rgb, score_thresh=score_thresh, max_dets=max_dets)

        for r in rois:
            l,t,rgt,b = r["bbox"]; cls = r["cls"]

            # ROI preprocessing -> 3D inference
            crop = crop_resize(img_rgb, [l,t,rgt,b], out=IMG_SIZE, scale=PAD_SCALE)
            x = torch.from_numpy(((crop/255.0 - IMAGENET_MEAN)/IMAGENET_STD)).permute(2,0,1).unsqueeze(0).float().to(device)
            dr, lz, yv = model3d(x)

            prior = priors_arr[class_to_idx.get(cls, 0)]
            dims  = (dr.squeeze(0).cpu().numpy() + prior)
            Z     = float(torch.exp(lz).item())      # the network predicts log-depth
            yaw   = yv.squeeze(0).cpu().numpy()
            ry    = float(atan2(yaw[0], yaw[1]))

            # Recover (X, Y, Z) from the bottom-centre pixel of the 2D box
            uv  = ((l+rgt)/2.0, b)
            loc = uvz_to_xyz(uv, Z, P2)

            # 3D box (image + BEV); skipped unless every corner has z > 0
            C = compute_box_3d_fixed(dims, loc, ry)
            if (C[2] > 0).all():
                img_bgr = draw_projected_box3d(img_bgr, project_to_image(C, P2), (0,165,255), 2)
                rect = get_bottom_rect_xz(dims, loc, ry)
                if rect: draw_rot_bev_rect(bev, rect, (0,165,255), x_range, z_range)

        bev_res = cv2.resize(bev, (img_bgr.shape[0], img_bgr.shape[0]))
        combo = cv2.hconcat([img_bgr, bev_res])

        # Convert BGR -> RGB before appending the GIF frame
        gif_frames.append(cv2.cvtColor(combo, cv2.COLOR_BGR2RGB))

        if (i+1) % 50 == 0:
            print(f"{seq_id}: {i+1}/{len(frames)} frames")

    # Write the GIF
    imageio.mimsave(out_gif, gif_frames, fps=fps, loop=0)
    print(f"[✓] saved gif: {out_gif}")
    return out_gif


# Usage example: render training sequence 0003 to ./track_0003.gif at 10 fps.
make_kitti_tracking_gif(seq_id="0003", split="training", out_gif="./track_0003.gif", fps=10)


0003: 50/144 frames
0003: 100/144 frames
[✓] saved gif: ./track_0003.gif


'./track_0003.gif'