<a href="https://colab.research.google.com/github/tntnu/20242R0136COSE41600/blob/main/20242R0136COSE41600.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install open3d

Collecting open3d
  Downloading open3d-0.18.0-cp310-cp310-manylinux_2_27_x86_64.whl.metadata (4.2 kB)
Collecting dash>=2.6.0 (from open3d)
  Downloading dash-2.18.2-py3-none-any.whl.metadata (10 kB)
Collecting configargparse (from open3d)
  Downloading ConfigArgParse-1.7-py3-none-any.whl.metadata (23 kB)
Collecting ipywidgets>=8.0.4 (from open3d)
  Downloading ipywidgets-8.1.5-py3-none-any.whl.metadata (2.3 kB)
Collecting addict (from open3d)
  Downloading addict-2.4.0-py3-none-any.whl.metadata (1.0 kB)
Collecting pyquaternion (from open3d)
  Downloading pyquaternion-0.9.9-py3-none-any.whl.metadata (1.4 kB)
Collecting werkzeug>=2.2.3 (from open3d)
  Downloading werkzeug-3.0.6-py3-none-any.whl.metadata (3.7 kB)
Collecting dash-html-components==2.0.0 (from dash>=2.6.0->open3d)
  Downloading dash_html_components-2.0.0-py3-none-any.whl.metadata (3.8 kB)
Collecting dash-core-components==2.0.0 (from dash>=2.6.0->open3d)
  Downloading dash_core_components-2.0.0-py3-none-any.whl.metadata (2.9 

# 계층적 샘플링

In [None]:
import os
import random
import shutil
from collections import defaultdict

def get_files_by_behavior(folders):
    """
    각 행동(폴더)에서 PCD 파일을 분류하여 반환.
    """
    behavior_files = defaultdict(list)
    for folder in folders:
        behavior_name = os.path.basename(os.path.dirname(folder))  # 폴더 이름을 행동 이름으로 사용
        for file_name in os.listdir(folder):
            if file_name.endswith('.pcd'):
                file_path = os.path.join(folder, file_name)
                behavior_files[behavior_name].append(file_path)
    return behavior_files

def stratified_sampling(behavior_files, behavior_ratios):
    """
    각 행동별로 설정된 비율로 데이터를 샘플링.
    """
    sampled_files = set()  # 중복 방지를 위해 집합(set) 사용
    for behavior, files in behavior_files.items():
        sample_ratio = behavior_ratios.get(behavior, 0.3)  # 비율이 지정되지 않은 경우 기본값 0.3
        sample_size = int(len(files) * sample_ratio)  # 행동별 샘플링 개수
        sampled_behavior_files = random.sample(files, min(sample_size, len(files)))  # 샘플링
        sampled_files.update(sampled_behavior_files)  # 집합에 추가 (중복 방지)
        print(f"[{behavior}] 총 {len(files)}개 중 {len(sampled_behavior_files)}개 샘플링")
    return list(sampled_files)  # 최종 결과를 리스트로 반환

def save_sampled_files(sampled_files, output_folder):
    """
    샘플링된 파일을 지정된 폴더에 저장.
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for file_path in sampled_files:
        file_name = os.path.basename(file_path)
        shutil.copy(file_path, os.path.join(output_folder, file_name))

In [None]:
# 행동별 PCD 파일이 저장된 폴더 리스트
folders = [
    "/content/drive/MyDrive/ColabNotebooks/20242R0136COSE41600/COSE416_HW1_data_v1/data/01_straight_walk/pcd",
    "/content/drive/MyDrive/ColabNotebooks/20242R0136COSE41600/COSE416_HW1_data_v1/data/02_straight_duck_walk/pcd",
    "/content/drive/MyDrive/ColabNotebooks/20242R0136COSE41600/COSE416_HW1_data_v1/data/03_straight_crawl/pcd",
    "/content/drive/MyDrive/ColabNotebooks/20242R0136COSE41600/COSE416_HW1_data_v1/data/04_zigzag_walk/pcd",
    "/content/drive/MyDrive/ColabNotebooks/20242R0136COSE41600/COSE416_HW1_data_v1/data/05_straight_duck_walk/pcd",
    "/content/drive/MyDrive/ColabNotebooks/20242R0136COSE41600/COSE416_HW1_data_v1/data/06_straight_crawl/pcd",
    "/content/drive/MyDrive/ColabNotebooks/20242R0136COSE41600/COSE416_HW1_data_v1/data/07_straight_walk/pcd"
]

# 행동별 샘플링 비율 설정
behavior_ratios = {
    "01_straight_walk": 0.6,
    "02_straight_duck_walk": 0.5,
    "03_straight_crawl": 0.3,
    "04_zigzag_walk": 0.5,
    "05_straight_duck_walk": 0.5,
    "06_straight_crawl": 0.4,
    "07_straight_walk": 0.5
}

In [None]:
# 1. 행동별 파일 리스트 가져오기
behavior_files = get_files_by_behavior(folders)

# 각 행동별 파일 수 출력 (디버깅용)
for behavior, files in behavior_files.items():
    print(f"[{behavior}] 총 파일 수: {len(files)}")

[01_straight_walk] 총 파일 수: 288
[02_straight_duck_walk] 총 파일 수: 549
[03_straight_crawl] 총 파일 수: 1260
[04_zigzag_walk] 총 파일 수: 364
[05_straight_duck_walk] 총 파일 수: 577
[06_straight_crawl] 총 파일 수: 774
[07_straight_walk] 총 파일 수: 443


In [None]:
# 2. 계층적 샘플링 수행
sampled_files = stratified_sampling(behavior_files, behavior_ratios)

print("\n샘플링된 총 파일 수:", len(sampled_files))

[01_straight_walk] 총 288개 중 172개 샘플링
[02_straight_duck_walk] 총 549개 중 274개 샘플링
[03_straight_crawl] 총 1260개 중 378개 샘플링
[04_zigzag_walk] 총 364개 중 182개 샘플링
[05_straight_duck_walk] 총 577개 중 288개 샘플링
[06_straight_crawl] 총 774개 중 309개 샘플링
[07_straight_walk] 총 443개 중 221개 샘플링

샘플링된 총 파일 수: 1824


In [None]:
def save_sampled_files(sampled_files, output_folder):
    """
    샘플링된 파일을 지정된 폴더에 저장하며 파일 이름이 고유하도록 수정.
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for file_path in sampled_files:
        file_name = os.path.basename(file_path)
        behavior_name = os.path.basename(os.path.dirname(file_path))  # 폴더 이름 가져오기
        unique_file_name = f"{behavior_name}_{file_name}"  # 고유 이름 생성
        save_path = os.path.join(output_folder, unique_file_name)
        shutil.copy(file_path, save_path)


In [None]:
# 샘플링된 파일 저장
output_folder = "/content/drive/MyDrive/ColabNotebooks/20242R0136COSE41600/COSE416_HW1_data_v1/sampled_pcd_files"
save_sampled_files(sampled_files, output_folder)

# 저장된 파일 개수 출력
saved_files_count = len(os.listdir(output_folder))
print(f"저장된 파일 개수: {saved_files_count}")


저장된 파일 개수: 1630


# 최적의 파라미터

In [None]:
zip_file_path = '/content/drive/MyDrive/ColabNotebooks/sampled_pcd_files.zip'

# 3. 압축 파일 풀기
import zipfile

# 압축을 풀 디렉토리 설정
extracted_folder = '/content/drive/MyDrive/ColabNotebooks/optimal/'

# 압축 풀기
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extracted_folder)

print(f"압축 해제된 파일이 {extracted_folder}에 저장되었습니다.")

# 4. 압축 풀린 파일 목록 확인
import os
extracted_files = os.listdir(extracted_folder)
print("압축 풀린 파일 목록:")
print(extracted_files)

# 5. 예시로 첫 번째 파일을 Open3D로 읽어보기
import open3d as o3d

# 예시로 첫 번째 .pcd 파일을 읽어봄
for filename in extracted_files:
    if filename.endswith('.pcd'):
        pcd_path = os.path.join(extracted_folder, filename)
        print(f"Processing {pcd_path}")
        pcd = o3d.io.read_point_cloud(pcd_path)
        o3d.visualization.draw_geometries([pcd])
        break  # 첫 번째 파일만 처리 후 종료

In [None]:
import os
import numpy as np
import open3d as o3d

# PCD 파일 로드 및 voxel 다운샘플링 함수
def voxel_down_sample(pcd, voxel_size):
    return pcd.voxel_down_sample(voxel_size)

def find_optimal_voxel_size_for_all(pcd_files, pcd_directory):
    # 테스트할 voxel 크기 범위 정의
    voxel_sizes = np.arange(0.01, 0.1, 0.01)  # 0.01부터 0.1까지 0.01 간격
    voxel_size_scores = []

    # 각 voxel 크기에 대해 모든 PCD 파일을 테스트
    for voxel_size in voxel_sizes:
        total_points_after_downsampling = 0

        for pcd_file in pcd_files:
            pcd_path = os.path.join(pcd_directory, pcd_file)
            pcd = o3d.io.read_point_cloud(pcd_path)

            # 다운샘플링
            downsampled_pcd = voxel_down_sample(pcd, voxel_size)

            # 다운샘플링 후 포인트 개수 누적
            total_points_after_downsampling += len(downsampled_pcd.points)

        # 평균 포인트 개수 계산
        avg_points = total_points_after_downsampling / len(pcd_files)
        voxel_size_scores.append((voxel_size, avg_points))

    # 평균 포인트 개수가 가장 적은 voxel size 선택
    optimal_voxel_size = min(voxel_size_scores, key=lambda x: x[1])[0]
    return optimal_voxel_size

# PCD 파일이 있는 디렉토리 경로
pcd_directory = "/content/drive/MyDrive/ColabNotebooks/optimal/sampled_pcd_files"

# 디렉토리 내 모든 PCD 파일 목록
pcd_files = [f for f in os.listdir(pcd_directory) if f.endswith('.pcd')]

# PCD 파일이 존재하는지 확인
if not pcd_files:
    print("Error: No .pcd files found in the specified directory.")
else:
    # 모든 파일에서 최적의 voxel size 계산
    optimal_voxel_size = find_optimal_voxel_size_for_all(pcd_files, pcd_directory)

    # 최적의 voxel size 출력
    print(f"Final chosen optimal voxel size: {optimal_voxel_size}")

# Final chosen optimal voxel size: 0.09


In [None]:
import os
import numpy as np
import open3d as o3d


def find_optimal_voxel_size(pcd, voxel_sizes):
    """
    주어진 Voxel Size 리스트에서 최적의 값을 찾습니다.
    최적의 기준: 다운샘플링 후 포인트 개수가 가장 작으면서 포인트가 유지되는 값
    """
    results = []
    for voxel_size in voxel_sizes:
        downsample_pcd = pcd.voxel_down_sample(voxel_size)
        results.append((voxel_size, len(downsample_pcd.points)))

    # 포인트 개수가 최소가 되는 voxel_size 반환
    optimal_voxel_size = min(results, key=lambda x: x[1])
    return optimal_voxel_size


def find_optimal_ror(pcd, nb_points_list, radius_list):
    """
    주어진 ROR 파라미터(nb_points, radius) 조합에서 최적의 값을 찾습니다.
    최적의 기준: 필터링 후 남은 포인트 개수가 가장 많은 조합
    """
    results = []
    for nb_points in nb_points_list:
        for radius in radius_list:
            cl, ind = pcd.remove_radius_outlier(nb_points=nb_points, radius=radius)
            filtered_pcd = pcd.select_by_index(ind)
            results.append(((nb_points, radius), len(filtered_pcd.points)))

    # 포인트 개수가 최대가 되는 nb_points, radius 조합 반환
    optimal_ror = max(results, key=lambda x: x[1])
    return optimal_ror


# PCD 파일 경로
pcd_file_path = "/content/drive/MyDrive/ColabNotebooks/optimal/sampled_pcd_files"

# PCD 파일 로드
if os.path.exists(pcd_file_path):
    original_pcd = o3d.io.read_point_cloud(pcd_file_path)
else:
    raise FileNotFoundError(f"PCD file not found: {pcd_file_path}")

# Voxel Size 탐색
voxel_sizes = np.arange(0.01, 0.2, 0.01)  # 테스트할 Voxel Size 리스트
optimal_voxel_size, downsample_points = find_optimal_voxel_size(original_pcd, voxel_sizes)
print(f"Optimal Voxel Size: {optimal_voxel_size:.2f}, Points After Downsampling: {downsample_points}")

# ROR 탐색
nb_points_list = range(5, 20, 2)  # nb_points 범위
radius_list = np.arange(0.5, 2.0, 0.1)  # radius 범위
optimal_ror, filtered_points = find_optimal_ror(original_pcd, nb_points_list, radius_list)
print(f"Optimal ROR Parameters: nb_points={optimal_ror[0]}, radius={optimal_ror[1]:.2f}, Points After Filtering: {filtered_points}")

# Optimal Voxel Size: 0.01, Optimal ROR Parameters: nb_points=5, radius=0.50


# 전처리

In [None]:
import open3d as o3d
import numpy as np
import os

# PCD 파일이 저장된 폴더 경로
pcd_folder = "C:\\Users\\tntnu\\Desktop\\sampled_pcd_files"
output_folder = "C:\\Users\\tntnu\\Desktop\\processed_pcd_results"

# 결과 저장 폴더 생성
os.makedirs(output_folder, exist_ok=True)

# 전처리 및 바운딩 박스 생성 함수
def process_pcd(file_path):
    try:
        # PCD 파일 읽기
        original_pcd = o3d.io.read_point_cloud(file_path)

        # Voxel Downsampling
        voxel_size = 0.2
        downsample_pcd = original_pcd.voxel_down_sample(voxel_size=voxel_size)

        # Radius Outlier Removal (ROR)
        cl, ind = downsample_pcd.remove_radius_outlier(nb_points=6, radius=1.2)
        ror_pcd = downsample_pcd.select_by_index(ind)

        # RANSAC 평면 분리
        plane_model, inliers = ror_pcd.segment_plane(distance_threshold=0.1, ransac_n=3, num_iterations=2000)

        # 평면 제외 포인트 추출
        final_point = ror_pcd.select_by_index(inliers, invert=True)

        # DBSCAN 클러스터링
        labels = np.array(final_point.cluster_dbscan(eps=0.3, min_points=10, print_progress=False))

        # 노이즈 포인트는 검정색, 클러스터 포인트는 파란색으로 설정
        colors = np.zeros((len(labels), 3))  # 기본 검정색
        colors[labels >= 0] = [0, 0, 1]  # 파란색
        final_point.colors = o3d.utility.Vector3dVector(colors)

        # 필터링 기준
        min_points_in_cluster = 5
        max_points_in_cluster = 40
        min_z_value = -1.5
        max_z_value = 2.5
        min_height = 0.5
        max_height = 2.0
        max_distance = 30.0

        # 조건을 만족하는 클러스터의 바운딩 박스 생성
        bboxes = []
        for i in range(labels.max() + 1):
            cluster_indices = np.where(labels == i)[0]
            if min_points_in_cluster <= len(cluster_indices) <= max_points_in_cluster:
                cluster_pcd = final_point.select_by_index(cluster_indices)
                points = np.asarray(cluster_pcd.points)
                z_values = points[:, 2]
                z_min = z_values.min()
                z_max = z_values.max()
                if min_z_value <= z_min and z_max <= max_z_value:
                    height_diff = z_max - z_min
                    if min_height <= height_diff <= max_height:
                        distances = np.linalg.norm(points, axis=1)
                        if distances.max() <= max_distance:
                            bbox = cluster_pcd.get_axis_aligned_bounding_box()
                            bbox.color = (1, 0, 0)
                            bboxes.append(bbox)

        return final_point, bboxes
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None, None


In [None]:
# 모든 PCD 파일 처리
pcd_files = [f for f in os.listdir(pcd_folder) if f.endswith('.pcd')]

for i, pcd_file in enumerate(pcd_files):
    print(f"Processing file {i + 1}/{len(pcd_files)}: {pcd_file}")
    file_path = os.path.join(pcd_folder, pcd_file)

    # 포인트 클라우드와 바운딩 박스 추출
    final_point, bboxes = process_pcd(file_path)

    if final_point is not None and bboxes:
        # 결과 저장
        processed_file_path = os.path.join(output_folder, f"processed_{pcd_file}")
        o3d.io.write_point_cloud(processed_file_path, final_point)

        # 바운딩 박스 저장
        bbox_file_path = os.path.join(output_folder, f"bboxes_{pcd_file.replace('.pcd', '.txt')}")
        with open(bbox_file_path, 'w') as f:
            for bbox in bboxes:
                bbox_points = np.asarray(bbox.get_box_points())
                for point in bbox_points:
                    f.write(f"{point[0]} {point[1]} {point[2]}\n")
                f.write("\n")
    else:
        print(f"No valid clusters found in {pcd_file}")

print("Processing complete. Results are saved in:", output_folder)


In [None]:
zip_file_path = '/content/drive/MyDrive/ColabNotebooks/20242R0136COSE41600/COSE416_HW1_data_v1/processed_pcd_results.zip'

# 3. 압축 파일 풀기
import zipfile

# 압축을 풀 디렉토리 설정
extracted_folder = '/content/drive/MyDrive/ColabNotebooks/20242R0136COSE41600/COSE416_HW1_data_v1/processed_pcd_results/'

# 압축 풀기
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extracted_folder)

print(f"압축 해제된 파일이 {extracted_folder}에 저장되었습니다.")

# 4. 압축 풀린 파일 목록 확인
import os
extracted_files = os.listdir(extracted_folder)
print("압축 풀린 파일 목록:")
print(extracted_files)

# 5. 예시로 첫 번째 파일을 Open3D로 읽어보기
import open3d as o3d

# 예시로 첫 번째 .pcd 파일을 읽어봄
for filename in extracted_files:
    if filename.endswith('.pcd'):
        pcd_path = os.path.join(extracted_folder, filename)
        print(f"Processing {pcd_path}")
        pcd = o3d.io.read_point_cloud(pcd_path)
        o3d.visualization.draw_geometries([pcd])
        break  # 첫 번째 파일만 처리 후 종료

압축 해제된 파일이 /content/drive/MyDrive/ColabNotebooks/20242R0136COSE41600/COSE416_HW1_data_v1/processed_pcd_results/에 저장되었습니다.
압축 풀린 파일 목록:
['processed_pcd_results']


# PointNet

In [None]:
import h5py
import numpy as np
import os
import open3d as o3d

# 전처리된 PCD 데이터 경로
processed_pcd_folder = "C:\\Users\\tntnu\\Desktop\\processed_pcd_results"
output_h5_file = "C:\\Users\\tntnu\\Desktop\\pointnet_human_detection.h5"

# 최대 포인트 수 (PointNet 입력 요구사항에 맞게 조정)
max_points = 1024

# 데이터와 라벨 저장 리스트
data = []
labels = []  # 1: 사람, 0: 비사람 (라벨은 수동 설정 필요)

# 전처리된 PCD 데이터를 불러와서 HDF5 파일로 변환
for file_name in os.listdir(processed_pcd_folder):
    if file_name.endswith(".pcd"):
        file_path = os.path.join(processed_pcd_folder, file_name)

        # Open3D를 사용하여 PCD 파일 로드
        pcd = o3d.io.read_point_cloud(file_path)
        points = np.asarray(pcd.points)

        # 포인트 클라우드 정규화 (중심 이동 및 범위 조정)
        centroid = np.mean(points, axis=0)
        points -= centroid
        max_dist = np.max(np.linalg.norm(points, axis=1))
        points /= max_dist

        # 포인트 클라우드 포인트 개수를 max_points로 조정 (패딩 또는 샘플링)
        if len(points) > max_points:
            indices = np.random.choice(len(points), max_points, replace=False)
            points = points[indices]
        elif len(points) < max_points:
            pad = np.zeros((max_points - len(points), 3))
            points = np.vstack((points, pad))

        # 데이터를 리스트에 추가 (라벨은 수동 설정)
        data.append(points)

        # 라벨 추가 (예: "processed_person_001.pcd"는 사람으로 라벨링)
        label = 1 if "person" in file_name.lower() else 0
        labels.append(label)

In [None]:
# Numpy 배열로 변환
data = np.array(data, dtype=np.float32)
labels = np.array(labels, dtype=np.int64)

# HDF5 파일로 저장
with h5py.File(output_h5_file, "w") as hf:
    hf.create_dataset("data", data=data)
    hf.create_dataset("label", data=labels)

print(f"HDF5 파일 생성 완료: {output_h5_file}")

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class PointNet(nn.Module):
    def __init__(self, num_classes=2):
        super(PointNet, self).__init__()
        self.conv1 = nn.Conv1d(3, 64, 1)
        self.conv2 = nn.Conv1d(64, 128, 1)
        self.conv3 = nn.Conv1d(128, 1024, 1)
        self.fc1 = nn.Linear(1024, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(p=0.3)
        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(1024)
        self.bn4 = nn.BatchNorm1d(512)
        self.bn5 = nn.BatchNorm1d(256)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = torch.max(x, 2, keepdim=True)[0]
        x = x.view(-1, 1024)
        x = F.relu(self.bn4(self.fc1(x)))
        x = F.relu(self.bn5(self.fc2(x)))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

In [None]:
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# HDF5 파일 로드
with h5py.File(output_h5_file, "r") as hf:
    data = hf["data"][:]
    labels = hf["label"][:]

# PyTorch 데이터셋 준비
data = torch.tensor(data, dtype=torch.float32).permute(0, 2, 1)  # (N, 3, 1024)
labels = torch.tensor(labels, dtype=torch.long)
dataset = TensorDataset(data, labels)
train_loader = DataLoader(dataset, batch_size=16, shuffle=True)

In [None]:
# 모델 초기화
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = PointNet(num_classes=2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# 훈련 루프
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader):.4f}")

In [None]:
# 모델 저장
torch.save(model.state_dict(), "pointnet_human_detection.pth")
print("훈련 완료 및 모델 저장 완료!")

In [None]:
# 평가 함수
def evaluate_model(model, dataloader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
    return correct / total

# 평가 데이터 준비
test_loader = DataLoader(dataset, batch_size=16, shuffle=False)

# 모델 평가
accuracy = evaluate_model(model, test_loader)
print(f"Test Accuracy: {accuracy * 100:.2f}%")

In [None]:
# PointNet 모델 로드
class PointNet(nn.Module):
    def __init__(self, num_classes=2):
        super(PointNet, self).__init__()
        self.conv1 = nn.Conv1d(3, 64, 1)
        self.conv2 = nn.Conv1d(64, 128, 1)
        self.conv3 = nn.Conv1d(128, 1024, 1)
        self.fc1 = nn.Linear(1024, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(p=0.3)
        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(1024)
        self.bn4 = nn.BatchNorm1d(512)
        self.bn5 = nn.BatchNorm1d(256)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = torch.max(x, 2, keepdim=True)[0]
        x = x.view(-1, 1024)
        x = F.relu(self.bn4(self.fc1(x)))
        x = F.relu(self.bn5(self.fc2(x)))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

# 모델 로드
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = PointNet(num_classes=2).to(device)
model.load_state_dict(torch.load("pointnet_human_detection.pth", map_location=device))
model.eval()

In [None]:
# 테스트용 PCD 파일 경로
test_pcd_file = "C:\\Users\\tntnu\\Desktop\\COSE416_HW1_data_v1\\data\\01_straight_walk\\pcd\\pcd_000001.pcd"

# 포인트 클라우드 데이터 로드 및 정규화
pcd = o3d.io.read_point_cloud(test_pcd_file)
points = np.asarray(pcd.points)

In [None]:
# 정규화 (훈련 시 데이터와 동일하게 전처리)
centroid = np.mean(points, axis=0)
points -= centroid
max_dist = np.max(np.linalg.norm(points, axis=1))
points /= max_dist

# 포인트 개수 맞추기
max_points = 1024
if len(points) > max_points:
    indices = np.random.choice(len(points), max_points, replace=False)
    points = points[indices]
elif len(points) < max_points:
    pad = np.zeros((max_points - len(points), 3))
    points = np.vstack((points, pad))

In [None]:
# 모델 입력으로 변환
input_tensor = torch.tensor(points, dtype=torch.float32).unsqueeze(0).permute(0, 2, 1).to(device)

# 모델 예측
with torch.no_grad():
    output = model(input_tensor)
    prediction = torch.argmax(output, dim=1).item()

In [None]:
# 결과 출력 및 시각화
print(f"Prediction: {'Human Detected' if prediction == 1 else 'No Human Detected'}")

if prediction == 1:
    bbox = pcd.get_axis_aligned_bounding_box()
    bbox.color = (1, 0, 0)  # 빨간색

    # 헤드리스 시각화 저장
    vis = o3d.visualization.Visualizer()
    vis.create_window(visible=False)
    vis.add_geometry(pcd)
    vis.add_geometry(bbox)
    vis.capture_screen_image("output_image.png")
    vis.destroy_window()
    print("Visualization saved as output_image.png")
else:
    print("No human detected; no visualization saved.")

# 최적화 PointNet(모델 예측 시각화 오류)

In [None]:
import open3d as o3d
import numpy as np
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import h5py

# ========= PCD 처리 및 바운딩 박스 생성 ========= #
def process_pcd(file_path):
    try:
        # PCD 파일 읽기
        original_pcd = o3d.io.read_point_cloud(file_path)

        # Voxel Downsampling
        voxel_size = 0.01
        downsample_pcd = original_pcd.voxel_down_sample(voxel_size=voxel_size)

        # Radius Outlier Removal (ROR)
        cl, ind = downsample_pcd.remove_radius_outlier(nb_points=5, radius=0.5)
        ror_pcd = downsample_pcd.select_by_index(ind)

        # RANSAC 평면 분리
        plane_model, inliers = ror_pcd.segment_plane(distance_threshold=0.1, ransac_n=3, num_iterations=2000)

        # 평면 제외 포인트 추출
        final_point = ror_pcd.select_by_index(inliers, invert=True)

        # DBSCAN 클러스터링
        labels = np.array(final_point.cluster_dbscan(eps=0.3, min_points=10, print_progress=False))

        # 노이즈 포인트는 검정색, 클러스터 포인트는 파란색으로 설정
        colors = np.zeros((len(labels), 3))  # 기본 검정색
        colors[labels >= 0] = [0, 0, 1]  # 파란색
        final_point.colors = o3d.utility.Vector3dVector(colors)

        # 필터링 기준
        min_points_in_cluster = 5
        max_points_in_cluster = 500
        min_z_value = -1.5
        max_z_value = 2.5
        min_height = 0.5
        max_height = 2.0
        max_distance = 30.0

        # 조건을 만족하는 클러스터의 바운딩 박스 생성
        bboxes = []
        for i in range(labels.max() + 1):
            cluster_indices = np.where(labels == i)[0]
            if min_points_in_cluster <= len(cluster_indices) <= max_points_in_cluster:
                cluster_pcd = final_point.select_by_index(cluster_indices)
                points = np.asarray(cluster_pcd.points)
                z_values = points[:, 2]
                z_min = z_values.min()
                z_max = z_values.max()
                if min_z_value <= z_min and z_max <= max_z_value:
                    height_diff = z_max - z_min
                    if min_height <= height_diff <= max_height:
                        distances = np.linalg.norm(points, axis=1)
                        if distances.max() <= max_distance:
                            bbox = cluster_pcd.get_axis_aligned_bounding_box()
                            bbox.color = (1, 0, 0)
                            bboxes.append((bbox, cluster_pcd))

        return final_point, bboxes
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None, None

# ========= PointNet 모델 정의 ========= #
class PointNet(nn.Module):
    def __init__(self, num_classes=2):
        super(PointNet, self).__init__()
        self.conv1 = nn.Conv1d(3, 64, 1)
        self.conv2 = nn.Conv1d(64, 128, 1)
        self.conv3 = nn.Conv1d(128, 1024, 1)
        self.fc1 = nn.Linear(1024, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(p=0.3)
        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(1024)
        self.bn4 = nn.BatchNorm1d(512)
        self.bn5 = nn.BatchNorm1d(256)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = torch.max(x, 2, keepdim=True)[0]
        x = x.view(-1, 1024)
        x = F.relu(self.bn4(self.fc1(x)))
        x = F.relu(self.bn5(self.fc2(x)))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

# ========= 포인트 클라우드 처리 및 PointNet 테스트 ========= #
def detect_humans_in_pcd(file_path, model, device, max_points=1024):
    # PCD 처리
    final_pcd, bboxes = process_pcd(file_path)
    if final_pcd is None or not bboxes:
        print("No valid clusters or bounding boxes detected.")
        return

    # 바운딩 박스 내부 포인트 처리
    for bbox, cluster_pcd in bboxes:
        points = np.asarray(cluster_pcd.points)
        if len(points) == 0:
            continue

        # 정규화
        centroid = np.mean(points, axis=0)
        points -= centroid
        max_dist = np.max(np.linalg.norm(points, axis=1))
        points /= max_dist

        # 샘플링 및 패딩
        if len(points) > max_points:
            indices = np.random.choice(len(points), max_points, replace=False)
            points = points[indices]
        elif len(points) < max_points:
            pad = np.zeros((max_points - len(points), 3))
            points = np.vstack((points, pad))

        # 모델 입력 준비
        input_tensor = torch.tensor(points, dtype=torch.float32).unsqueeze(0).permute(0, 2, 1).to(device)

        # 모델 예측
        with torch.no_grad():
            output = model(input_tensor)
            prediction = torch.argmax(output, dim=1).item()

        # 예측 결과 처리
        if prediction == 1:  # 사람 탐지
            bbox.color = (1, 0, 0)  # 빨간색
            print("Human detected in bounding box.")
        else:
            bbox.color = (0, 1, 0)  # 초록색
            print("No human detected in bounding box.")

    # 결과 시각화
    geometries = [final_pcd] + [bbox[0] for bbox in bboxes]
    o3d.visualization.draw_geometries(geometries)

# ========== mAP 계산 함수 ========== #
def compute_map(h5_file, model_save_path):
    # HDF5 파일 로드
    with h5py.File(h5_file, "r") as hf:
        data = hf["data"][:]
        labels = hf["label"][:]

    # 데이터 준비
    data = torch.tensor(data, dtype=torch.float32).permute(0, 2, 1)
    labels = torch.tensor(labels, dtype=torch.long)
    dataset = TensorDataset(data, labels)
    test_loader = DataLoader(dataset, batch_size=16, shuffle=False)

    # 모델 로드
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = PointNet(num_classes=2).to(device)
    model.load_state_dict(torch.load(model_save_path, map_location=device))
    model.eval()

    # 예측값 및 실제값 저장
    all_targets = []
    all_scores = []

    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            scores = F.softmax(outputs, dim=1)[:, 1]  # "Human" 클래스의 확률

            all_targets.extend(targets.cpu().numpy())
            all_scores.extend(scores.cpu().numpy())

    # Precision-Recall Curve 계산
    precision, recall, _ = precision_recall_curve(all_targets, all_scores)
    ap = average_precision_score(all_targets, all_scores)

    # 그래프 그리기
    plt.figure()
    plt.step(recall, precision, where='post', label=f"AP={ap:.4f}")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Precision-Recall Curve")
    plt.legend(loc="lower left")
    plt.grid()
    plt.show()

    print(f"Average Precision (AP): {ap:.4f}")
    return ap

# ========== mAP 계산 함수 ========== #
def compute_map(h5_file, model_save_path):
    # HDF5 파일 로드
    with h5py.File(h5_file, "r") as hf:
        data = hf["data"][:]
        labels = hf["label"][:]

    # 데이터 준비
    data = torch.tensor(data, dtype=torch.float32).permute(0, 2, 1)
    labels = torch.tensor(labels, dtype=torch.long)
    dataset = TensorDataset(data, labels)
    test_loader = DataLoader(dataset, batch_size=16, shuffle=False)

    # 모델 로드
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = PointNet(num_classes=2).to(device)
    model.load_state_dict(torch.load(model_save_path, map_location=device))
    model.eval()

    # 예측값 및 실제값 저장
    all_targets = []
    all_scores = []

    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            scores = F.softmax(outputs, dim=1)[:, 1]  # "Human" 클래스의 확률

            all_targets.extend(targets.cpu().numpy())
            all_scores.extend(scores.cpu().numpy())

    # Precision-Recall Curve 계산
    precision, recall, _ = precision_recall_curve(all_targets, all_scores)
    ap = average_precision_score(all_targets, all_scores)

    # 그래프 그리기
    plt.figure()
    plt.step(recall, precision, where='post', label=f"AP={ap:.4f}")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Precision-Recall Curve")
    plt.legend(loc="lower left")
    plt.grid()
    plt.show()

    print(f"Average Precision (AP): {ap:.4f}")
    return ap



# ========= 실행 코드 ========= #
if __name__ == "__main__":
    # 파일 경로 설정
    test_pcd_file = "C:\\Users\\tntnu\\Desktop\\COSE416_HW1_data_v1\\data\\01_straight_walk\\pcd\\pcd_000001.pcd"
    model_path = "pointnet_human_detection.pth"

    # 장치 설정
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # 모델 로드
    model = PointNet(num_classes=2).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    # mAP 계산
    compute_map(output_h5_file, model_save_path)

    # PCD 파일 처리 및 탐지 실행
    detect_humans_in_pcd(test_pcd_file, model, device)


# 최적화 PointNet (모델 예측 시각화 오류 해결 시도)

In [None]:
# 최적화 PointNet 전처리 이후부터 테스트 시각화 부분 수정

import os
import h5py
import numpy as np
import open3d as o3d
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# ========== 경로 설정 ========== #
processed_pcd_folder = "C:\\Users\\tntnu\\Desktop\\processed_pcd_results_optimal"  # 전처리된 PCD 파일 경로
output_h5_file = "C:\\Users\\tntnu\\Desktop\\pointnet_human_detection.h5"  # 학습 데이터 저장 파일
model_save_path = "C:\\Users\\tntnu\\Desktop\\pointnet_human_detection.pth"  # 학습된 모델 저장 경로
test_pcd_file = "C:\\Users\\tntnu\\Desktop\\COSE416_HW1_tutorial\\test_data" # 테스트용 PCD 파일 경로

# ========== PointNet 모델 정의 ========== #
class PointNet(nn.Module):
    def __init__(self, num_classes=2):
        super(PointNet, self).__init__()
        self.conv1 = nn.Conv1d(3, 64, 1)
        self.conv2 = nn.Conv1d(64, 128, 1)
        self.conv3 = nn.Conv1d(128, 1024, 1)
        self.fc1 = nn.Linear(1024, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(p=0.3)
        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(1024)
        self.bn4 = nn.BatchNorm1d(512)
        self.bn5 = nn.BatchNorm1d(256)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = torch.max(x, 2, keepdim=True)[0]
        x = x.view(-1, 1024)
        x = F.relu(self.bn4(self.fc1(x)))
        x = F.relu(self.bn5(self.fc2(x)))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

# ========== HDF5 데이터 생성 ========== #
def create_hdf5_from_pcd(processed_pcd_folder, output_h5_file, max_points=1024):
    data = []
    labels = []

    for file_name in os.listdir(processed_pcd_folder):
        if file_name.endswith(".pcd"):
            file_path = os.path.join(processed_pcd_folder, file_name)
            pcd = o3d.io.read_point_cloud(file_path)
            points = np.asarray(pcd.points)

            # 정규화
            centroid = np.mean(points, axis=0)
            points -= centroid
            max_dist = np.max(np.linalg.norm(points, axis=1))
            points /= max_dist

            # 샘플링
            if len(points) > max_points:
                indices = np.random.choice(len(points), max_points, replace=False)
                points = points[indices]
            elif len(points) < max_points:
                pad = np.zeros((max_points - len(points), 3))
                points = np.vstack((points, pad))

            # 라벨링 (파일명에 따라 라벨 결정)
            label = 1 if "person" in file_name.lower() else 0
            data.append(points)
            labels.append(label)

    # Numpy 배열로 변환
    data = np.array(data, dtype=np.float32)
    labels = np.array(labels, dtype=np.int64)

    # HDF5 파일로 저장
    with h5py.File(output_h5_file, "w") as hf:
        hf.create_dataset("data", data=data)
        hf.create_dataset("label", data=labels)
    print(f"HDF5 파일 생성 완료: {output_h5_file}")

# ========== PointNet 학습 ========== #
def train_pointnet(h5_file, model_save_path, num_epochs=30, batch_size=16, learning_rate=0.001):
    # HDF5 파일 로드
    with h5py.File(h5_file, "r") as hf:
        data = hf["data"][:]
        labels = hf["label"][:]

    # 데이터 준비
    data = torch.tensor(data, dtype=torch.float32).permute(0, 2, 1)  # (N, 3, 1024)
    labels = torch.tensor(labels, dtype=torch.long)
    dataset = TensorDataset(data, labels)
    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # 모델 초기화
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = PointNet(num_classes=2).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # 학습 루프
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, targets)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(train_loader):.4f}")

    torch.save(model.state_dict(), model_save_path)
    print(f"모델 저장 완료: {model_save_path}")

# ========== 모델 평가 ========== #
def evaluate_model(h5_file, model_save_path):
    # HDF5 파일 로드
    with h5py.File(h5_file, "r") as hf:
        data = hf["data"][:]
        labels = hf["label"][:]

    # 데이터 준비
    data = torch.tensor(data, dtype=torch.float32).permute(0, 2, 1)
    labels = torch.tensor(labels, dtype=torch.long)
    dataset = TensorDataset(data, labels)
    test_loader = DataLoader(dataset, batch_size=16, shuffle=False)

    # 모델 로드
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = PointNet(num_classes=2).to(device)
    model.load_state_dict(torch.load(model_save_path, map_location=device, weights_only=True))
    model.eval()

    # 정확도 계산
    correct, total = 0, 0
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    accuracy = correct / total
    print(f"모델 정확도: {accuracy * 100:.2f}%")

# ========== 테스트 ========== #
def test_pointnet(model_save_path, test_pcd_file, max_points=1024):
    # PCD 파일이 디렉토리일 경우, 첫 번째 PCD 파일을 선택
    if os.path.isdir(test_pcd_file):
        pcd_files = [f for f in os.listdir(test_pcd_file) if f.endswith('.pcd')]
        if len(pcd_files) == 0:
            print("Error: No PCD files found in the directory.")
            return
        test_pcd_file = os.path.join(test_pcd_file, pcd_files[0])

    # PCD 로드 및 전처리
    pcd = o3d.io.read_point_cloud(test_pcd_file)

    if len(pcd.points) == 0:
        print("Error: The point cloud is empty.")
        return

    points = np.asarray(pcd.points)

    # 정규화
    centroid = np.mean(points, axis=0)
    points -= centroid
    max_dist = np.max(np.linalg.norm(points, axis=1))
    points /= max_dist

    # 샘플링
    if len(points) > max_points:
        indices = np.random.choice(len(points), max_points, replace=False)
        points = points[indices]
    elif len(points) < max_points:
        pad = np.zeros((max_points - len(points), 3))
        points = np.vstack((points, pad))

    # 모델 로드
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = PointNet(num_classes=2).to(device)
    model.load_state_dict(torch.load(model_save_path, map_location=device, weights_only=True))
    model.eval()

    # 모델 예측
    input_tensor = torch.tensor(points, dtype=torch.float32).unsqueeze(0).permute(0, 2, 1).to(device)
    with torch.no_grad():
        output = model(input_tensor)
        prediction = torch.argmax(output, dim=1).item()

    # 예측 결과 출력
    print(f"Prediction: {'Human Detected' if prediction == 1 else 'No Human Detected'}")

    # 포인트 클라우드 시각화
    pcd.points = o3d.utility.Vector3dVector(points)

    if prediction == 1:
        # 사람 감지 시 빨간색으로 시각화
        color = [1, 0, 0]  # Red
    else:
        # 사람 미감지 시 파란색으로 시각화
        color = [0, 0, 1]  # Blue

    pcd.paint_uniform_color(color)  # 전체 포인트 클라우드 색상 변경

    # 시각화
    o3d.visualization.draw_geometries([pcd])

# ========== mAP 계산 함수 ========== #
def compute_map(h5_file, model_save_path):
    # HDF5 파일 로드
    with h5py.File(h5_file, "r") as hf:
        data = hf["data"][:]
        labels = hf["label"][:]

    # 데이터 준비
    data = torch.tensor(data, dtype=torch.float32).permute(0, 2, 1)
    labels = torch.tensor(labels, dtype=torch.long)
    dataset = TensorDataset(data, labels)
    test_loader = DataLoader(dataset, batch_size=16, shuffle=False)

    # 모델 로드
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = PointNet(num_classes=2).to(device)
    model.load_state_dict(torch.load(model_save_path, map_location=device))
    model.eval()

    # 예측값 및 실제값 저장
    all_targets = []
    all_scores = []

    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            scores = F.softmax(outputs, dim=1)[:, 1]  # "Human" 클래스의 확률

            all_targets.extend(targets.cpu().numpy())
            all_scores.extend(scores.cpu().numpy())

    # Precision-Recall Curve 계산
    precision, recall, _ = precision_recall_curve(all_targets, all_scores)
    ap = average_precision_score(all_targets, all_scores)

    # 그래프 그리기
    plt.figure()
    plt.step(recall, precision, where='post', label=f"AP={ap:.4f}")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Precision-Recall Curve")
    plt.legend(loc="lower left")
    plt.grid()
    plt.show()

    print(f"Average Precision (AP): {ap:.4f}")
    return ap


# ========== 실행 코드 ========== #
if __name__ == "__main__":
    # HDF5 데이터 생성
    create_hdf5_from_pcd(processed_pcd_folder, output_h5_file)

    # 모델 학습
    train_pointnet(output_h5_file, model_save_path)

    # 모델 평가
    evaluate_model(output_h5_file, model_save_path)

    # mAP 계산
    compute_map(output_h5_file, model_save_path)

    # 테스트 실행
    test_pointnet(model_save_path, test_pcd_file)

# Transformer

In [None]:
import torch
import torchvision.transforms as T
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np

# 1. DETR 모델 로드 (ResNet-50 기반 pretrained 모델)
model = torch.hub.load('facebookresearch/detr', 'detr_resnet50', pretrained=True)
model.eval()

# COCO 데이터셋 클래스 (DETR이 COCO 데이터셋으로 학습됨)
CLASSES = [
    'N/A', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat',
    'traffic light', 'fire hydrant', 'N/A', 'stop sign', 'parking meter', 'bench', 'bird', 'cat',
    'dog'
]


In [None]:
# 2. 이미지 전처리 함수
transform = T.Compose([
    T.Resize(800),  # 이미지 크기를 800px로 조정
    T.ToTensor(),  # 텐서 변환
    T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # 정규화
])

In [None]:
# 3. 사람 탐지 함수 정의
def detect_person(image_path):
    # 이미지 로드 및 전처리
    image = Image.open(image_path).convert("RGB")
    img_tensor = transform(image).unsqueeze(0)  # 배치 차원 추가

    # 모델 추론
    with torch.no_grad():
        outputs = model(img_tensor)

    # 출력에서 클래스 및 바운딩 박스 정보 추출
    logits = outputs['pred_logits'][0]  # [N, num_classes]
    boxes = outputs['pred_boxes'][0]    # [N, 4]

    # Softmax를 사용해 클래스 확률 계산
    prob = logits.softmax(-1)
    labels = prob[..., :-1].argmax(-1)  # 마지막 클래스는 'no object'
    scores = prob.max(-1).values

    # "person" 클래스에 해당하는 바운딩 박스만 필터링
    person_boxes = []
    for i, label in enumerate(labels):
        if label == CLASSES.index('person') and scores[i] > 0.7:  # 신뢰도 임계값 0.7
            box = boxes[i].cpu().numpy()
            person_boxes.append((box, scores[i].item()))

    return image, person_boxes

In [None]:
# 4. 바운딩 박스 시각화 함수
def plot_results(image, person_boxes):
    # 원본 이미지를 NumPy 배열로 변환
    image_np = np.array(image)
    plt.figure(figsize=(10, 8))
    plt.imshow(image_np)

    # 바운딩 박스 그리기
    for box, score in person_boxes:
        # 바운딩 박스는 정규화된 좌표값으로 제공되므로, 이미지 크기에 맞게 변환
        h, w = image_np.shape[:2]
        xmin, ymin, xmax, ymax = box * np.array([w, h, w, h])

        # 바운딩 박스 그리기
        plt.gca().add_patch(plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
                                          fill=False, edgecolor='red', linewidth=2))
        plt.text(xmin, ymin, f"Person: {score:.2f}", color='white',
                 bbox=dict(facecolor='red', edgecolor='none', pad=2))

    plt.axis('off')
    plt.show()

In [None]:
# 5. 이미지 파일 경로 설정
image_path = "path_to_your_image.jpg"  # 테스트할 도로 주행 이미지 경로

In [None]:
# 6. 사람 탐지 및 결과 시각화
image, person_boxes = detect_person(image_path)
if person_boxes:
    print(f"Detected {len(person_boxes)} person(s) in the image.")
    plot_results(image, person_boxes)
else:
    print("No person detected.")