In [11]:
import os
import glob
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# ============================================================
# 1. IoU 계산 (2D 바운딩 박스)
# ============================================================
def iou_2d(boxA, boxB):
    """
    boxA, boxB: [x1,y1,x2,y2]
    """
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])

    interW = max(0, xB - xA)
    interH = max(0, yB - yA)
    interArea = interW * interH

    boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

    iou = interArea / float(boxAArea + boxBArea - interArea + 1e-6)
    return iou

# ============================================================
# 2. KITTI 파일 로더 (prediction & label 공통)
# ============================================================
def load_kitti_file(path, with_score=True):
    """
    path: KITTI label or prediction txt
    return: list of dicts [{'box2d','box3d','score'}]
    """
    dets = []
    with open(path, 'r') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 15:  # invalid line
                continue
            cls = parts[0]
            if cls != 'Car':  # Car만 처리
                continue
            x1, y1, x2, y2 = map(float, parts[4:8])
            h, w, l = map(float, parts[8:11])
            x, y, z, ry = map(float, parts[11:15])
            score = float(parts[15]) if with_score and len(parts) > 15 else 1.0
            dets.append({
                "box2d":[x1,y1,x2,y2],
                "box3d":[x,y,z,w,h,l,ry],
                "score":score
            })
    return dets


# ============================================================
# 3. Keypoint set 로더 (예시: json or npy → KITTI와 동일 구조로 변환했다고 가정)
# ============================================================
import json
def load_keypoint_json(json_path, image_id):
    """
    json_path: keypoints_with_theta_pred_train.json
    image_id:  "000123" (확장자 제거)
    return: [{'box2d','keypoints','ry'}]
    """
    with open(json_path, 'r') as f:
        data = json.load(f)

    kp_sets = []
    for obj in data:
        # image_id 매칭
        if obj["image_id"].split('.')[0] != image_id:
            continue
        kp_sets.append({
            "box2d": obj["crop_bbox"],                # [x1,y1,x2,y2]
            "keypoints": np.array(obj["keypoints"]),  # (K,2)
            "ry": obj["theta"]
        })
    return kp_sets

# ============================================================
# 4. 매칭 함수 no 3d박스 iou
# ============================================================
# def build_samples(output_dir, kp_json_path, label_dir, iou_thresh=0.8):
#     samples = []
#     out_files = sorted(glob.glob(os.path.join(output_dir, "*.txt")))

#     for out_path in out_files:
#         image_id = os.path.splitext(os.path.basename(out_path))[0]
#         label_path = os.path.join(label_dir, image_id + ".txt")

#         if not os.path.exists(label_path):
#             continue

#         outputs = load_kitti_file(out_path, with_score=True)
#         kp_sets = load_keypoint_json(kp_json_path, image_id)  # JSON에서 해당 image_id 가져오기
#         labels  = load_kitti_file(label_path, with_score=False)

#         for out in outputs:
#             for kp in kp_sets:
#                 iou = iou_2d(out["box2d"], kp["box2d"])
#                 if iou >= iou_thresh:
#                     # label에서 가장 IoU 큰 것 선택
#                     gt = max(labels, key=lambda g: iou_2d(out["box2d"], g["box2d"]))
#                     samples.append({
#                         "init_3d": np.array(out["box3d"], dtype=np.float32),
#                         "keypoints": np.array(kp["keypoints"], dtype=np.float32).flatten(),
#                         "ry_keypoint": np.array([kp["ry"]], dtype=np.float32),
#                         "gt_3d": np.array(gt["box3d"], dtype=np.float32),
#                     })
#     return samples



def bev_iou(box1, box2):
    """두 3D box의 BEV IoU 계산 (KITTI x,z 평면 기준)."""
    from shapely.geometry import Polygon
    x, y, z, w, h, l, ry = box1
    cosa, sina = np.cos(ry), np.sin(ry)
    dx, dz = w/2, l/2
    corners = np.array([
        [ dx,  dz],
        [ dx, -dz],
        [-dx, -dz],
        [-dx,  dz]
    ])
    rot = np.array([[cosa, -sina],[sina, cosa]])
    corners = corners @ rot.T
    corners += np.array([x, z])
    poly1 = Polygon(corners)

    x, y, z, w, h, l, ry = box2
    cosa, sina = np.cos(ry), np.sin(ry)
    dx, dz = w/2, l/2
    corners = np.array([
        [ dx,  dz],
        [ dx, -dz],
        [-dx, -dz],
        [-dx,  dz]
    ])
    rot = np.array([[cosa, -sina],[sina, cosa]])
    corners = corners @ rot.T
    corners += np.array([x, z])
    poly2 = Polygon(corners)

    inter = poly1.intersection(poly2).area
    union = poly1.union(poly2).area
    return inter / (union + 1e-6)


def build_samples(output_dir, kp_json_path, label_dir, iou_thresh=0.8, iou3d_thresh=0.3):
    samples = []
    out_files = sorted(glob.glob(os.path.join(output_dir, "*.txt")))

    for out_path in out_files:
        image_id = os.path.splitext(os.path.basename(out_path))[0]
        label_path = os.path.join(label_dir, image_id + ".txt")
        if not os.path.exists(label_path):
            continue

        outputs = load_kitti_file(out_path, with_score=True)
        kp_sets = load_keypoint_json(kp_json_path, image_id)
        labels  = load_kitti_file(label_path, with_score=False)

        for out in outputs:
            for kp in kp_sets:
                iou2d = iou_2d(out["box2d"], kp["box2d"])
                if iou2d >= iou_thresh:
                    # label 중에서 3D IoU ≥ iou3d_thresh 조건 만족하는 것만 고려
                    valid_gts = [g for g in labels if bev_iou(out["box3d"], g["box3d"]) >= iou3d_thresh]
                    if len(valid_gts) == 0:
                        continue  # 3D IoU ≥ 0.3 만족하는 GT 없으면 skip
                    gt = max(valid_gts, key=lambda g: bev_iou(out["box3d"], g["box3d"]))
                    samples.append({
                        "init_3d": np.array(out["box3d"], dtype=np.float32),
                        "keypoints": np.array(kp["keypoints"], dtype=np.float32).flatten(),
                        "ry_keypoint": np.array([kp["ry"]], dtype=np.float32),
                        "gt_3d": np.array(gt["box3d"], dtype=np.float32),
                    })
    return samples










# ============================================================
# 5. Dataset & Model (앞에서 정의한 그대로)
# ============================================================
class RefineDataset(torch.utils.data.Dataset):
    def __init__(self, samples):
        self.samples = samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        s = self.samples[idx]
        return {
            "init_3d": torch.tensor(s["init_3d"]),
            "keypoints": torch.tensor(s["keypoints"]),
            "ry_keypoint": torch.tensor(s["ry_keypoint"]),
            "gt_3d": torch.tensor(s["gt_3d"]),
        }

class OutputKeypointRegressor(nn.Module):
    def __init__(self, kp_dim, hidden_dim=128):
        super().__init__()
        input_dim = 7 + kp_dim + 1
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc_out = nn.Linear(hidden_dim, 7)

    def forward(self, init_3d, keypoints, yaw):
        x = torch.cat([init_3d, keypoints, yaw], dim=-1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        delta = self.fc_out(x)
        return delta

def train_regressor(samples, kp_dim=24, epochs=10, batch_size=16, lr=1e-3,
                    save_path="../outputs/regressor_best.pth"):
    dataset = RefineDataset(samples)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
    model = OutputKeypointRegressor(kp_dim=kp_dim)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    best_loss = float("inf")

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for batch in dataloader:
            init_3d = batch["init_3d"]
            keypoints = batch["keypoints"]
            ry_keypoint = batch["ry_keypoint"]
            gt_3d = batch["gt_3d"]

            delta_gt = gt_3d - init_3d
            delta_pred = model(init_3d, keypoints, ry_keypoint)

            loss = F.smooth_l1_loss(delta_pred, delta_gt)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"[Epoch {epoch+1}/{epochs}] Loss: {avg_loss:.4f}")

        # ✅ Best 모델 저장
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), save_path)
            print(f"  ↳ 새로운 best 모델 저장됨: {save_path} (loss={best_loss:.4f})")

    print("학습 완료. Best Loss =", best_loss)
    return model



def resume_training(samples, kp_dim, epochs=10, batch_size=16, lr=1e-3,
                    resume_path="../outputs/regressor_best.pth",
                    save_path="../outputs/regressor_best.pth"):
    dataset = RefineDataset(samples)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
    model = OutputKeypointRegressor(kp_dim=kp_dim)

    # ✅ 기존 checkpoint 불러오기
    if os.path.exists(resume_path):
        model.load_state_dict(torch.load(resume_path))
        print(f"기존 모델 로드 완료: {resume_path}")
    else:
        print("⚠️ resume_path 에 파일이 없어 새로 학습을 시작합니다.")

    optimizer = optim.Adam(model.parameters(), lr=lr)
    best_loss = float("inf")

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for batch in dataloader:
            init_3d = batch["init_3d"]
            keypoints = batch["keypoints"]
            ry_keypoint = batch["ry_keypoint"]
            gt_3d = batch["gt_3d"]

            delta_gt = gt_3d - init_3d
            delta_pred = model(init_3d, keypoints, ry_keypoint)

            loss = F.smooth_l1_loss(delta_pred, delta_gt)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(dataloader)
        print(f"[Resume Epoch {epoch+1}/{epochs}] Loss: {avg_loss:.4f}")

        # ✅ best 저장
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), save_path)
            print(f"  ↳ 새로운 best 모델 저장됨: {save_path} (loss={best_loss:.4f})")

    print("추가 학습 완료. Best Loss =", best_loss)
    return model




# 실행 예시
output_dir = "../dataset/merge_output_train"         # detector 결과 (KITTI txt)
kp_json_path = "../dataset/keypoints_with_theta_pred_train.json"  # keypoint + theta JSON
label_dir    = "../dataset/label_2_train"         # KITTI GT labels

samples = build_samples(output_dir, kp_json_path, label_dir, iou_thresh=0.75)
print(f"총 {len(samples)} 개의 학습 샘플 생성됨")

총 1127 개의 학습 샘플 생성됨


In [12]:
#학습 & best.pth 저장
model = train_regressor(
    samples,
    kp_dim=len(samples[0]["keypoints"]),
    epochs=10,
    save_path="../outputs/regressor_best.pth"
)





# # 기존 모델 이어서 학습
# model = resume_training(
#     samples,
#     kp_dim=len(samples[0]["keypoints"]),
#     epochs=500,   # 추가 학습 epoch
#     resume_path="../outputs/regressor_best.pth",
#     save_path="../outputs/regressor_best.pth"
# )



# def refine_outputs(model, output_dir, kp_json_path, save_dir, iou_thresh=0.5):
#     os.makedirs(save_dir, exist_ok=True)
#     out_files = sorted(glob.glob(os.path.join(output_dir, "*.txt")))

#     for out_path in out_files:
#         image_id = os.path.splitext(os.path.basename(out_path))[0]

#         # 원래 detection 결과
#         outputs = load_kitti_file(out_path, with_score=True)
#         if len(outputs) == 0:
#             # 비어있으면 빈 txt 저장
#             open(os.path.join(save_dir, image_id + ".txt"), "w").close()
#             continue

#         # keypoints set
#         kp_sets = load_keypoint_json(kp_json_path, image_id)

#         refined_dets = []
#         for out in outputs:
#             best_match = None
#             best_iou = 0.0
#             for kp in kp_sets:
#                 iou = iou_2d(out["box2d"], kp["box2d"])
#                 if iou > best_iou:
#                     best_iou = iou
#                     best_match = kp

#             if best_match is not None and best_iou >= iou_thresh:
#                 # refine 적용
#                 init_3d = torch.tensor(out["box3d"], dtype=torch.float32).unsqueeze(0)
#                 keypoints = torch.tensor(best_match["keypoints"], dtype=torch.float32).flatten().unsqueeze(0)
#                 ry_keypoint = torch.tensor([best_match["ry"]], dtype=torch.float32).unsqueeze(0)

#                 delta = model(init_3d, keypoints, ry_keypoint).detach().cpu().numpy()[0]
#                 refined_box = init_3d.numpy()[0] + delta*0.5
#             else:
#                 # 매칭 안되면 그대로 사용
#                 refined_box = out["box3d"]

#             refined_dets.append({
#                 "cls": "Car",
#                 "box2d": out["box2d"],
#                 "box3d": refined_box,
#                 "score": out["score"]
#             })

#         # KITTI 형식으로 저장
#         save_path = os.path.join(save_dir, image_id + ".txt")
#         with open(save_path, "w") as f:
#             for det in refined_dets:
#                 x1, y1, x2, y2 = det["box2d"]
#                 x, y, z, w, h, l, ry = det["box3d"]
#                 score = det["score"]
#                 line = f"Car 0.00 0 -1.67 {x1:.2f} {y1:.2f} {x2:.2f} {y2:.2f} " \
#                        f"{h:.2f} {w:.2f} {l:.2f} {x:.2f} {y:.2f} {z:.2f} {ry:.2f} {score:.3f}\n"
                # f.write(line)

    # print(f"✅ Refined 결과 저장 완료: {save_dir}")


[Epoch 1/10] Loss: 2.6477
  ↳ 새로운 best 모델 저장됨: ../outputs/regressor_best.pth (loss=2.6477)
[Epoch 2/10] Loss: 0.6659
  ↳ 새로운 best 모델 저장됨: ../outputs/regressor_best.pth (loss=0.6659)
[Epoch 3/10] Loss: 0.5232
  ↳ 새로운 best 모델 저장됨: ../outputs/regressor_best.pth (loss=0.5232)
[Epoch 4/10] Loss: 0.3914
  ↳ 새로운 best 모델 저장됨: ../outputs/regressor_best.pth (loss=0.3914)
[Epoch 5/10] Loss: 0.2278
  ↳ 새로운 best 모델 저장됨: ../outputs/regressor_best.pth (loss=0.2278)
[Epoch 6/10] Loss: 0.3674
[Epoch 7/10] Loss: 0.3153
[Epoch 8/10] Loss: 0.2824
[Epoch 9/10] Loss: 0.1967
  ↳ 새로운 best 모델 저장됨: ../outputs/regressor_best.pth (loss=0.1967)
[Epoch 10/10] Loss: 0.1931
  ↳ 새로운 best 모델 저장됨: ../outputs/regressor_best.pth (loss=0.1931)
학습 완료. Best Loss = 0.19308086054425844


In [16]:
def refine_outputs(model, output_dir, kp_json_path, save_dir, iou_thresh=0.5):
    os.makedirs(save_dir, exist_ok=True)
    out_files = sorted(glob.glob(os.path.join(output_dir, "*.txt")))
    total_boxes = 0 #
    matched_boxes = 0 #
    unmatched_boxes = 0 #


    for out_path in out_files:
        image_id = os.path.splitext(os.path.basename(out_path))[0]

        # 원래 detection 결과
        outputs = load_kitti_file(out_path, with_score=True)

        total_boxes += len(outputs) #


        if len(outputs) == 0:
            # 비어있으면 빈 txt 저장
            open(os.path.join(save_dir, image_id + ".txt"), "w").close()
            continue

        # keypoints set
        kp_sets = load_keypoint_json(kp_json_path, image_id)

        refined_dets = []
        for out in outputs:
            best_match = None
            best_iou = 0.0
            for kp in kp_sets:
                iou = iou_2d(out["box2d"], kp["box2d"])
                if iou > best_iou:
                    best_iou = iou
                    best_match = kp

            if best_match is not None and best_iou >= iou_thresh:
                # refine 적용
                matched_boxes += 1 #
                init_3d = torch.tensor(out["box3d"], dtype=torch.float32).unsqueeze(0)
                keypoints = torch.tensor(best_match["keypoints"], dtype=torch.float32).flatten().unsqueeze(0)
                ry_keypoint = torch.tensor([best_match["ry"]], dtype=torch.float32).unsqueeze(0)

                delta = model(init_3d, keypoints, ry_keypoint).detach().cpu().numpy()[0]
                refined_box = init_3d.numpy()[0] + delta*0.3
            else:
                # 매칭 안되면 그대로 사용
                unmatched_boxes += 1 #
                refined_box = out["box3d"]

            refined_dets.append({
                "cls": "Car",
                "box2d": out["box2d"],
                "box3d": refined_box,
                "score": out["score"]
            })

        # KITTI 형식으로 저장
        save_path = os.path.join(save_dir, image_id + ".txt")
        with open(save_path, "w") as f:
            for det in refined_dets:
                x1, y1, x2, y2 = det["box2d"]
                x, y, z, w, h, l, ry = det["box3d"]
                score = det["score"]
                line = f"Car 0.00 0 -1.67 {x1:.2f} {y1:.2f} {x2:.2f} {y2:.2f} " \
                       f"{h:.2f} {w:.2f} {l:.2f} {x:.2f} {y:.2f} {z:.2f} {ry:.2f} {score:.3f}\n"
                f.write(line)

    print(f"✅ Refined 결과 저장 완료: {save_dir}")



In [17]:
# 1. 모델 불러오기
model = OutputKeypointRegressor(kp_dim=24)
model.load_state_dict(torch.load("../outputs/regressor_best.pth"))
model.eval()

# 2. refinement 실행
refine_outputs(
    model,
    output_dir="../dataset/merge_output_val",
    kp_json_path="../dataset/keypoints_with_theta_pred_val.json",
    save_dir="../dataset/keypoint_refined_output_val",
    iou_thresh=0.5
)



# ✅ 최종 통계 출력
print("====== Refinement 매칭 통계 ======")
print(f"총 박스 수: {total_boxes}")
print(f"매칭 성공: {matched_boxes}  ({matched_boxes/total_boxes*100:.2f}%)")
print(f"매칭 실패: {unmatched_boxes}  ({unmatched_boxes/total_boxes*100:.2f}%)")
print("================================")


✅ Refined 결과 저장 완료: ../dataset/keypoint_refined_output_val


NameError: name 'total_boxes' is not defined

In [1]:
## theta 교체

import os
import glob
import json
import numpy as np

# ------------------------
# IoU 계산 함수 (2D)
# ------------------------
def iou_2d(boxA, boxB):
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])

    interW = max(0, xB - xA)
    interH = max(0, yB - yA)
    interArea = interW * interH

    boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

    return interArea / float(boxAArea + boxBArea - interArea + 1e-6)

# ------------------------
# KITTI prediction 파일 로더/저장
# ------------------------
def load_kitti_pred_file(path):
    dets = []
    with open(path, "r") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 15:  # 잘못된 줄 skip
                continue
            cls = parts[0]
            if cls != "Car":  # Car만 처리
                continue
            x1, y1, x2, y2 = map(float, parts[4:8])
            h, w, l = map(float, parts[8:11])
            x, y, z, ry = map(float, parts[11:15])
            score = float(parts[15]) if len(parts) > 15 else 1.0
            dets.append({
                "cls": cls,
                "box2d": [x1, y1, x2, y2],
                "box3d": [x, y, z, w, h, l, ry],
                "score": score
            })
    return dets

def save_kitti_pred_file(path, detections):
    with open(path, "w") as f:
        for det in detections:
            x1, y1, x2, y2 = det["box2d"]
            x, y, z, w, h, l, ry = det["box3d"]
            score = det["score"]
            cls = det["cls"]
            truncated, occluded, alpha = 0.00, 0, -1.67
            line = f"{cls} {truncated:.2f} {occluded} {alpha:.2f} " \
                   f"{x1:.2f} {y1:.2f} {x2:.2f} {y2:.2f} " \
                   f"{h:.2f} {w:.2f} {l:.2f} " \
                   f"{x:.2f} {y:.2f} {z:.2f} {ry:.2f} {score:.4f}\n"
            f.write(line)

# ------------------------
# Keypoint JSON 로더
# ------------------------
def load_keypoints(json_path, image_id):
    with open(json_path, "r") as f:
        data = json.load(f)
    kp_sets = []
    for obj in data:
        if obj["image_id"].split(".")[0] != image_id:
            continue
        kp_sets.append({
            "box2d": obj["crop_bbox"],  # [x1,y1,x2,y2]
            "ry": obj["theta"]
        })
    return kp_sets

# ------------------------
# yaw 교체 함수
# ------------------------
def replace_yaw(output_dir, kp_json_path, save_dir, iou_thresh=0.75):
    os.makedirs(save_dir, exist_ok=True)
    out_files = sorted(glob.glob(os.path.join(output_dir, "*.txt")))

    for out_path in out_files:
        image_id = os.path.splitext(os.path.basename(out_path))[0]
        detections = load_kitti_pred_file(out_path)
        kp_sets = load_keypoints(kp_json_path, image_id)

        for det in detections:
            best_match = None
            best_iou = 0.0
            for kp in kp_sets:
                iou = iou_2d(det["box2d"], kp["box2d"])
                if iou > best_iou:
                    best_iou = iou
                    best_match = kp

            if best_match is not None and best_iou >= iou_thresh:
                # ✅ yaw 교체
                det["box3d"][-1] = best_match["ry"]

        save_path = os.path.join(save_dir, image_id + ".txt")
        save_kitti_pred_file(save_path, detections)

    print(f"✅ yaw 교체 완료: {save_dir}")

# ------------------------
# 실행 예시
# ------------------------
if __name__ == "__main__":
    output_dir = "../dataset/merge_output_val"   # MonoDGP 예측 txt
    kp_json_path = "../dataset/keypoints_with_theta_pred_val.json"
    save_dir = "../dataset/yaw_replaced_output_val"

    replace_yaw(output_dir, kp_json_path, save_dir, iou_thresh=0.85)
    print('done')

✅ yaw 교체 완료: ../dataset/yaw_replaced_output_val
done
