In [5]:
import os, json, math
import numpy as np
from ultralytics import YOLO
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import mean_squared_error

In [6]:
# =====================================================
# 1. 데이터 준비 함수
# =====================================================
def pixel_distance(p1, p2):
    return math.sqrt((p1[0]-p2[0])**2 + (p1[1]-p2[1])**2)

def extract_features(json_path, img_path, model):
    with open(json_path, "r") as f:
        data = json.load(f)

    features, labels = [], []
    # 메타데이터
    for _, v in data.items():
        res   = float(v["file_attributes"]["img_resolution"])
        roll  = float(v["file_attributes"]["img_roll_tilt"])
        pitch = float(v["file_attributes"]["img_pitch_tilt"])
        h     = float(v["file_attributes"]["img_height"])
        w     = float(v["file_attributes"]["img_width"])

        # YOLO 예측
        results = model.predict(img_path, verbose=False)
        kpts_all = results[0].keypoints.xy.cpu().numpy()  # (N, K, 2)

        for i, region in enumerate(v["regions"]):
            xs = region["shape_attributes"]["all_points_x"]
            ys = region["shape_attributes"]["all_points_y"]
            h_true = float(region["region_attributes"]["chi_height_m"])

            if kpts_all.shape[0] > i:  # YOLO가 해당 굴뚝을 탐지했다고 가정
                kp = kpts_all[i]
                if kp.shape[0] >= 2:
                    x1, y1 = kp[0][0], kp[0][1]
                    x2, y2 = kp[1][0], kp[1][1]
                    pred_pixel_len = pixel_distance((x1,y1),(x2,y2))

                    # feature = [pred_pixel_len, res, roll, pitch]
                    features.append([pred_pixel_len, res, roll, pitch])
                    labels.append(h_true)

    return features, labels


In [7]:
# =====================================================
# 2. Dataset 클래스
# =====================================================
class ChimneyDataset(Dataset):
    def __init__(self, img_dir, json_dir, model):
        self.X, self.y = [], []
        for img_name in os.listdir(img_dir):
            if not img_name.endswith(".jpg"):
                continue
            img_path = os.path.join(img_dir, img_name)
            json_path = os.path.join(json_dir, img_name.replace(".jpg",".json"))
            feats, labels = extract_features(json_path, img_path, model)
            self.X.extend(feats)
            self.y.extend(labels)

        self.X = torch.tensor(self.X, dtype=torch.float32)
        self.y = torch.tensor(self.y, dtype=torch.float32).view(-1,1)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [8]:
# =====================================================
# 3. 모델 정의 (MLP)
# =====================================================
class HeightRegressor(nn.Module):
    def __init__(self, in_dim=4, hidden=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1)
        )

    def forward(self, x):
        return self.net(x)


In [9]:
# =====================================================
# 4. 학습 루프
# =====================================================
VAL_IMG_DIR = "../valid/images"
VAL_JSON_DIR = "../valid/json"

# best.pt 불러오기
model_yolo = YOLO("C:/Users/USER/Desktop/personal/dcc/mission2/mission2_yolo2/runs/chimney_pose/weights/best.pt")

dataset = ChimneyDataset(VAL_IMG_DIR, VAL_JSON_DIR, model_yolo)
train_loader = DataLoader(dataset, batch_size=16, shuffle=True)

device = "cuda" if torch.cuda.is_available() else "cpu"
regressor = HeightRegressor().to(device)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(regressor.parameters(), lr=1e-3)

EPOCHS = 30
for epoch in range(EPOCHS):
    regressor.train()
    epoch_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        pred = regressor(X_batch)
        loss = criterion(pred, y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {epoch_loss/len(train_loader):.4f}")


Epoch 1/30, Loss: 6873.4937
Epoch 2/30, Loss: 1369.9614
Epoch 3/30, Loss: 1337.2332
Epoch 4/30, Loss: 1337.2065
Epoch 5/30, Loss: 1296.4686
Epoch 6/30, Loss: 1271.9895
Epoch 7/30, Loss: 1277.1972
Epoch 8/30, Loss: 1255.6275
Epoch 9/30, Loss: 1240.7415
Epoch 10/30, Loss: 1238.1536
Epoch 11/30, Loss: 1222.3348
Epoch 12/30, Loss: 1232.2620
Epoch 13/30, Loss: 1264.3118
Epoch 14/30, Loss: 1212.1470
Epoch 15/30, Loss: 1172.3432
Epoch 16/30, Loss: 1179.6570
Epoch 17/30, Loss: 1169.5710
Epoch 18/30, Loss: 1159.9460
Epoch 19/30, Loss: 1156.7954
Epoch 20/30, Loss: 1156.3407
Epoch 21/30, Loss: 1138.8713
Epoch 22/30, Loss: 1131.7105
Epoch 23/30, Loss: 1114.9402
Epoch 24/30, Loss: 1114.0730
Epoch 25/30, Loss: 1097.4155
Epoch 26/30, Loss: 1109.2543
Epoch 27/30, Loss: 1092.4640
Epoch 28/30, Loss: 1071.5891
Epoch 29/30, Loss: 1056.8135
Epoch 30/30, Loss: 1038.8655


In [10]:
# =====================================================
# 5. RMSE 평가
# =====================================================
regressor.eval()
with torch.no_grad():
    preds = regressor(dataset.X.to(device)).cpu().numpy().flatten()
    y_true = dataset.y.numpy().flatten()
    rmse = math.sqrt(mean_squared_error(y_true, preds))

print("최종 RMSE:", rmse)

최종 RMSE: 31.909612794138933


In [13]:
import random
import cv2
import matplotlib.pyplot as plt

# =====================================================
# 50개 랜덤 샘플 시각화 및 저장
# =====================================================
def visualize_batch(img_dir, json_dir, model_yolo, regressor, save_dir="vis_results", device="cpu", num_samples=50):
    os.makedirs(save_dir, exist_ok=True)

    img_files = [f for f in os.listdir(img_dir) if f.endswith(".jpg")]
    random.shuffle(img_files)
    img_files = img_files[:num_samples]

    for idx, img_name in enumerate(img_files):
        img_path = os.path.join(img_dir, img_name)
        json_path = os.path.join(json_dir, img_name.replace(".jpg",".json"))

        # --- GT 불러오기 ---
        with open(json_path, "r") as f:
            data = json.load(f)
        h_true = float(data[list(data.keys())[0]]["regions"][0]["region_attributes"]["chi_height_m"])

        # --- YOLO 예측 ---
        results = model_yolo.predict(img_path, verbose=False)
        if len(results[0].keypoints.xy) == 0:
            continue

        img = cv2.imread(img_path)
        kpts = results[0].keypoints.xy.cpu().numpy()[0]  # 첫 번째 객체만 예시
        x1, y1 = int(kpts[0][0]), int(kpts[0][1])
        x2, y2 = int(kpts[1][0]), int(kpts[1][1])

        # --- 메타데이터 로드 ---
        v = list(data.values())[0]
        res   = float(v["file_attributes"]["img_resolution"])
        roll  = float(v["file_attributes"]["img_roll_tilt"])
        pitch = float(v["file_attributes"]["img_pitch_tilt"])

        # --- 회귀 예측 ---
        pred_pixel_len = math.sqrt((x2-x1)**2 + (y2-y1)**2)
        feat = torch.tensor([[pred_pixel_len, res, roll, pitch]], dtype=torch.float32).to(device)
        h_pred = regressor(feat).item()

        # --- 시각화 ---
        cv2.circle(img, (x1,y1), 5, (0,0,255), -1)
        cv2.circle(img, (x2,y2), 5, (0,255,0), -1)
        cv2.line(img, (x1,y1), (x2,y2), (255,0,0), 2)

        text = f"Pred: {h_pred:.2f} m | GT: {h_true:.2f} m"
        cv2.putText(img, text, (30,30), cv2.FONT_HERSHEY_SIMPLEX, 
                    0.8, (255,255,255), 2, cv2.LINE_AA)

        # 저장
        save_path = os.path.join(save_dir, f"vis_{idx}_{img_name}")
        cv2.imwrite(save_path, img)

    print(f"[완료] {len(img_files)}개 샘플 시각화 → {save_dir} 폴더에 저장됨")


In [14]:
visualize_batch(
    img_dir="../valid/images",
    json_dir="../valid/json",
    model_yolo=model_yolo,
    regressor=regressor,
    save_dir="vis_results",
    device=device,
    num_samples=50
)


[완료] 50개 샘플 시각화 → vis_results 폴더에 저장됨
