In [1]:
# Fetch the project repository and switch into its 0923 folder; later cells
# use paths relative to this working directory (./calib_out, ./models, ...).
!git clone https://github.com/jsh112/jsh_triangulation.git
%cd /content/jsh_triangulation/0923

Cloning into 'jsh_triangulation'...
remote: Enumerating objects: 762, done.[K
remote: Counting objects: 100% (51/51), done.[K
remote: Compressing objects: 100% (36/36), done.[K
remote: Total 762 (delta 21), reused 42 (delta 12), pack-reused 711 (from 1)[K
Receiving objects: 100% (762/762), 78.20 MiB | 29.59 MiB/s, done.
Resolving deltas: 100% (84/84), done.
Updating files: 100% (1413/1413), done.
/content/jsh_triangulation/0923


In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): no cell visible in this section reads from /content/drive —
# confirm the mount is still needed.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import cv2
from config import *
from pathlib import Path
import stereo_utils as su

In [None]:
# Stereo calibration archive, given relative to the current working directory.
TMP_NPZ_PATH = Path("./calib_out/stereo_params_scaled.npz")
# P1 and P2 are handed to cv2.triangulatePoints (via triangulate_xy) further down.
# NOTE(review): the remaining values are presumably rectification maps, image
# size, baseline (B) and M — confirm against stereo_utils.load_stereo.
map1x, map1y, map2x, map2y, P1, P2, size, B, M = su.load_stereo(TMP_NPZ_PATH)
# Only detections whose class name equals this value are kept during extraction.
selected_class_name = 'Hold_Green'

In [None]:
# pip install 들어와야할 자리
!pip install ultralytics

In [None]:
def extract_holds_with_indices(frame_bgr, model, selected_class_name=None,
                               mask_thresh=0.7, row_tol=50):
    """Detect holds in one frame and return them ordered row by row.

    Runs ``model`` on ``frame_bgr``, turns every predicted mask into its largest
    external contour and uses the contour centroid as the hold center. The
    resulting holds are ordered and numbered by ``assign_indices``.

    Args:
        frame_bgr: Image array; only ``shape[:2]`` (height, width) is read here
            before the array is handed to ``model``.
        model: Callable whose result's first item exposes ``masks`` and
            ``boxes``; ``model.names`` maps class ids to class names.
        selected_class_name: When not None, detections of any other class are
            dropped.
        mask_thresh: Resized mask values strictly greater than this threshold
            become foreground.
        row_tol: Row-grouping tolerance forwarded to ``assign_indices``.

    Returns:
        A list of hold dicts with the keys ``cx``, ``cy``, ``class_name``,
        ``color``, ``contour``, ``center``, ``conf`` and ``hold_index``.
        Empty when the model produced no masks or nothing survived filtering.
    """
    h, w = frame_bgr.shape[:2]
    res = model(frame_bgr)[0]
    if res.masks is None:
        return []

    masks = res.masks.data
    boxes = res.boxes
    names = model.names
    print(f"[dbg] masks={tuple(masks.shape)} | frame={(h,w)}")

    holds = []
    for i in range(masks.shape[0]):
        cls_id = int(boxes.cls[i].item())
        class_name = names[cls_id]
        # Filter on class first so masks of unwanted classes are never resized
        # or traced; the outcome is the same, only the wasted work is gone.
        if (selected_class_name is not None) and (class_name != selected_class_name):
            continue

        mask = masks[i].cpu().numpy()
        # Scale the mask to the frame size so contour coordinates are frame pixels.
        mask_rs = cv2.resize(mask, (w, h), interpolation=cv2.INTER_NEAREST)
        binary = (mask_rs > mask_thresh).astype(np.uint8) * 255
        contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not contours:
            continue
        contour = max(contours, key=cv2.contourArea)

        Mom = cv2.moments(contour)
        if Mom["m00"] == 0:
            # Zero-area contour: the centroid is undefined, so skip it.
            continue
        cx = int(Mom["m10"] / Mom["m00"])
        cy = int(Mom["m01"] / Mom["m00"])
        conf = float(boxes.conf[i].item())
        holds.append({"class_name": class_name, "color": COLOR_MAP.get(class_name,(255,255,255)),
                      "contour": contour, "center": (cx, cy), "conf": conf})

    if not holds:
        return []
    # Row grouping and numbering is exactly what assign_indices does, so reuse
    # it instead of keeping a second copy of the same loop here.
    return assign_indices(holds, row_tol)

def assign_indices(holds, row_tol=50):
    """Number holds in reading order: rows top to bottom, left to right inside a row.

    Holds are sorted by the y value of ``center``. A hold joins the current row
    while its y differs from the row's first hold by less than ``row_tol``;
    otherwise it opens a new row. Each row is then sorted by x.

    Args:
        holds: Iterable of dicts that each carry a ``"center"`` pair (x, y).
        row_tol: Maximum y distance (exclusive) to the first hold of a row.

    Returns:
        A new list of new dicts (inputs are not modified) extended with
        ``cx``, ``cy`` and a 0-based ``hold_index``. Empty for empty input.
    """
    if not holds:
        return []

    by_height = sorted(
        ({"cx": hold["center"][0], "cy": hold["center"][1], **hold} for hold in holds),
        key=lambda item: item["cy"],
    )

    # The row anchor is always the first (topmost) hold of the row.
    rows = [[by_height[0]]]
    for item in by_height[1:]:
        current_row = rows[-1]
        if abs(item["cy"] - current_row[0]["cy"]) < row_tol:
            current_row.append(item)
        else:
            rows.append([item])

    ordered = [
        item
        for row in rows
        for item in sorted(row, key=lambda entry: entry["cx"])
    ]
    for position, item in enumerate(ordered):
        item["hold_index"] = position
    return ordered


def merge_holds_by_center(holds_lists, merge_dist_px=18):
    """Fuse detections from several frames whose centers nearly coincide.

    Every hold is compared against the representatives collected so far, in
    order. The first representative within ``merge_dist_px`` (Euclidean
    distance between centers, inclusive) absorbs it; if none is close enough
    the hold becomes a new representative.

    Args:
        holds_lists: Iterable of per-frame hold lists; each hold is a dict with
            at least ``"center"`` and ``"contour"``.
        merge_dist_px: Maximum center distance for two holds to be merged.

    Returns:
        List of representative hold dicts (copies, without ``hold_index``).
    """
    representatives = []
    for frame_holds in holds_lists:
        for hold in frame_holds:
            candidate = dict(hold)              # shallow copy; the input stays untouched
            candidate.pop("hold_index", None)   # indices are reassigned at the very end
            for rep in representatives:
                off_x = candidate["center"][0] - rep["center"][0]
                off_y = candidate["center"][1] - rep["center"][1]
                if (off_x*off_x + off_y*off_y) ** 0.5 <= merge_dist_px:
                    # Representative update rule: larger contour area wins;
                    # on (near-)equal area the higher confidence wins.
                    cand_area = cv2.contourArea(candidate["contour"])
                    rep_area = cv2.contourArea(rep["contour"])
                    bigger = cand_area > rep_area
                    tie_better_conf = (
                        abs(cand_area - rep_area) < 1e-6
                        and candidate.get("conf", 0) > rep.get("conf", 0)
                    )
                    if bigger or tie_better_conf:
                        rep.update(candidate)
                    break
            else:
                # No representative was close enough.
                representatives.append(candidate)
    return representatives

In [None]:
from ultralytics import YOLO
from pathlib import Path
# Load the detector weights from a path relative to the working directory.
# Later cells call this model on each frame and read `masks` / `boxes` from its result.
model = YOLO("./models/best_6.pt")

In [None]:
# Folders holding the rectified left / right frames (relative to the working directory).
L_dir, R_dir = Path("rectified_frames_L"), Path("rectified_frames_R")

# Sorted by path so that the n-th left frame is paired with the n-th right frame.
L_images = sorted(L_dir.glob("L_*.png"))
R_images = sorted(R_dir.glob("R_*.png"))

# Per-frame detection lists, filled by the detection loop.
L_sets = []
R_sets = []

In [None]:
# Run hold detection on every left/right frame pair and collect the per-frame results.

# Start from empty accumulators so that re-running this cell does not append
# the same frames a second time.
L_sets, R_sets = [], []

# zip() stops at the shorter list, so make a count mismatch visible.
if len(L_images) != len(R_images):
    print(f"[Warn] frame count mismatch: L={len(L_images)} R={len(R_images)}; "
          "extra frames on the longer side are ignored.")

n_pairs = min(len(L_images), len(R_images))
for k, (L_path, R_path) in enumerate(zip(L_images, R_images)):
    Lr_k = cv2.imread(str(L_path))
    Rr_k = cv2.imread(str(R_path))
    # cv2.imread returns None (it does not raise) when a file cannot be read.
    if Lr_k is None or Rr_k is None:
        print(f"[Warn] could not read frame pair {k+1}/{n_pairs}: {L_path} | {R_path} (skipped)")
        continue

    holdsL_k = extract_holds_with_indices(Lr_k, model, selected_class_name, THRESH_MASK, ROW_TOL_Y)
    holdsR_k = extract_holds_with_indices(Rr_k, model, selected_class_name, THRESH_MASK, ROW_TOL_Y)

    L_sets.append(holdsL_k)
    R_sets.append(holdsR_k)

    print(f"  - frame {k+1}/{n_pairs}: L={len(holdsL_k)}  R={len(holdsR_k)}")


In [None]:
# Fuse the per-frame detections of each view into one hold list, then number
# the fused holds row by row. Left is evaluated first, then right.
holdsL, holdsR = (
    assign_indices(merge_holds_by_center(frame_sets, CENTER_MERGE_PX), ROW_TOL_Y)
    for frame_sets in (L_sets, R_sets)
)

# Warn when at least one view ended up without any hold.
if not (holdsL and holdsR):
    print("[Warn] 한쪽 또는 양쪽에서 홀드가 검출되지 않았습니다.")

In [None]:
# Lookup tables hold_index -> hold for each view, plus the indices present in both.
# hold_index is assigned independently per view, so pairing by equal index
# relies on both views having numbered the same holds in the same order.
idxL = {hold["hold_index"]: hold for hold in holdsL}
idxR = {hold["hold_index"]: hold for hold in holdsR}
common_ids = sorted(idxL.keys() & idxR.keys())

if common_ids:
    print(f"[Info] 매칭된 홀드 쌍 수: {len(common_ids)}")
else:
    print("[Warn] 좌/우 공통 hold_index가 없습니다.")

In [None]:
def yaw_pitch_from_X(X, O, y_up_is_negative=True):
    """Return the yaw and pitch (degrees) of the direction from ``O`` to ``X``.

    Args:
        X: Target point; must support ``X - O`` and indexing of components 0..2.
        O: Reference point the direction is measured from.
        y_up_is_negative: When True the y component is negated before the
            pitch is computed; when False it is used as is.

    Returns:
        ``(yaw, pitch)`` where yaw is ``atan2(dx, dz)`` and pitch is
        ``atan2(±dy, hypot(dx, dz))``, both converted to degrees.
    """
    offset = X - O
    dx = float(offset[0])
    dy = float(offset[1])
    dz = float(offset[2])

    if y_up_is_negative:
        vertical = -dy
    else:
        vertical = dy
    horizontal = np.hypot(dx, dz)

    yaw = np.degrees(np.arctan2(dx, dz))
    pitch = np.degrees(np.arctan2(vertical, horizontal))
    return yaw, pitch

def triangulate_xy(P1, P2, ptL, ptR):
    """Triangulate a single 3D point from one left/right pixel correspondence.

    Args:
        P1: Projection matrix passed to ``cv2.triangulatePoints`` for the left view.
        P2: Projection matrix passed to ``cv2.triangulatePoints`` for the right view.
        ptL: ``(x, y)`` pixel position in the left image.
        ptR: ``(x, y)`` pixel position in the right image.

    Returns:
        Array of shape ``(3,)`` holding ``[X, Y, Z]``: the homogeneous result
        divided by its fourth component. The original comment states the unit
        as mm — presumably inherited from the calibration; confirm.
    """
    # cv2.triangulatePoints is fed 2x1 float64 column vectors.
    left_col = np.asarray(ptL, dtype=np.float64).reshape(2, 1)
    right_col = np.asarray(ptR, dtype=np.float64).reshape(2, 1)
    homog = cv2.triangulatePoints(P1, P2, left_col, right_col)
    euclid = homog[:3] / homog[3]
    return euclid.reshape(3)

def wrap_deg(d):
    """Wrap an angle in degrees into the half-open interval [-180, 180)."""
    shifted = d + 180.0
    return shifted % 360.0 - 180.0

def angle_between(v1, v2):
    """Return the angle between two vectors in degrees.

    Args:
        v1: First vector.
        v2: Second vector.

    Returns:
        The angle in degrees, or ``0.0`` when either vector has zero length.
    """
    len_1 = np.linalg.norm(v1)
    len_2 = np.linalg.norm(v2)
    if len_1 == 0 or len_2 == 0:
        return 0.0
    # Clamp to [-1, 1] so rounding error cannot push arccos out of its domain.
    cosine = np.dot(v1, v2) / (len_1 * len_2)
    cosine = np.clip(cosine, -1.0, 1.0)
    return np.degrees(np.arccos(cosine))

In [None]:
# Precompute per-match results (3D point, distances, angles) — based on the LEFT origin.
# NOTE(review): `L`, `O` and `Y_UP_IS_NEGATIVE` are not defined in the visible
# cells — presumably they come from `from config import *`; confirm.
matched_results = []
for hid in common_ids:
    # Left/right holds that share the same hold_index.
    Lh = idxL[hid]; Rh = idxR[hid]
    # 3D point triangulated from the two hold centers.
    X = triangulate_xy(P1, P2, Lh["center"], Rh["center"])
    d_left  = float(np.linalg.norm(X - L))            # distance relative to LEFT
    # Length of the (Y, Z) components of X.
    d_line  = float(np.hypot(X[1], X[2]))
    yaw_deg, pitch_deg = yaw_pitch_from_X(X, O, Y_UP_IS_NEGATIVE)
    # One record per matched hold; the color is taken from the left detection.
    matched_results.append({
        "hid": hid,
        "Lcx": Lh["center"][0], "Lcy": Lh["center"][1],
        "Rcx": Rh["center"][0], "Rcy": Rh["center"][1],
        "color": Lh["color"],
        "X": X, "d_left": d_left, "d_line": d_line,
        "yaw_deg": yaw_deg, "pitch_deg": pitch_deg,
    })

In [None]:
# Angle differences between consecutive hold indices (informational only).
by_id = {mr["hid"]: mr for mr in matched_results}
max_id = max(by_id) if by_id else -1
angle_deltas = []
for i in range(max_id):
    # Only pairs where both i and i+1 were matched contribute a delta.
    if (i in by_id) and (i+1 in by_id):
        a = by_id[i]; b = by_id[i+1]
        dyaw   = wrap_deg(b["yaw_deg"]   - a["yaw_deg"])
        dpitch = wrap_deg(b["pitch_deg"] - a["pitch_deg"])
        v1 = a["X"] - O; v2 = b["X"] - O
        d3d = angle_between(v1, v2)
        angle_deltas.append((i, i+1, dyaw, dpitch, d3d))

# Report once, after all deltas are collected. Previously this block sat inside
# the loop above and re-printed the header and the growing list on every iteration.
if angle_deltas:
    print("\n[ΔAngles] (i -> i+1):  Δyaw(deg), Δpitch(deg), 3D_angle(deg)")
    for i, j, dyaw, dpitch, d3d in angle_deltas:
        print(f"  {i:>2}→{j:<2} :  {dyaw:+6.2f}°, {dpitch:+6.2f}°, {d3d:6.2f}°")