In [1]:
# 객체 클래스 탐색

import cv2
import numpy as np
from transformers import AutoImageProcessor, AutoModelForObjectDetection
import torch
from PIL import Image

# --- 설정 변수 ---
VIDEO_INPUT_PATH = "KF_vid3.mp4"
VIDEO_OUTPUT_PATH = "detection_check_output.mp4"
HF_MODEL_NAME = "facebook/detr-resnet-50"
CONFIDENCE_THRESHOLD = 0.7  # 임계값을 조절하여 탐지 결과를 필터링할 수 있습니다.

# --- Hugging Face 모델 및 프로세서 로드 ---
try:
    image_processor = AutoImageProcessor.from_pretrained(HF_MODEL_NAME)
    model = AutoModelForObjectDetection.from_pretrained(HF_MODEL_NAME)
    print(f"Hugging Face 모델 '{HF_MODEL_NAME}' 로드 성공.")
except Exception as e:
    print(f"오류: Hugging Face 모델 '{HF_MODEL_NAME}' 로드 실패. ({e})")
    exit()

def detect_and_draw_all_objects(frame_color_cv):
    """
    프레임 내에서 신뢰도 임계값을 넘는 모든 객체를 탐지하고,
    바운딩 박스와 클래스 이름을 프레임에 직접 그립니다.
    탐지된 모든 클래스 이름의 집합(set)을 반환합니다.
    """
    image_pil = Image.fromarray(cv2.cvtColor(frame_color_cv, cv2.COLOR_BGR2RGB))
    inputs = image_processor(images=image_pil, return_tensors="pt")

    with torch.no_grad():
        outputs = model(**inputs)

    target_sizes = torch.tensor([image_pil.size[::-1]])
    results = image_processor.post_process_object_detection(outputs,
                                                             threshold=CONFIDENCE_THRESHOLD,
                                                             target_sizes=target_sizes)[0]

    detected_classes_in_frame = set()

    # 탐지된 모든 객체에 대해 루프를 돕니다.
    for score, label_id, box in zip(results["scores"], results["labels"], results["boxes"]):
        # 클래스 이름 가져오기
        label_name = model.config.id2label[label_id.item()]
        detected_classes_in_frame.add(label_name)

        # 바운딩 박스 좌표 계산
        startX, startY, endX, endY = map(int, box.tolist())

        # 프레임에 바운딩 박스 그리기 (클래스별로 다른 색상)
        color = np.random.randint(0, 255, size=3).tolist()
        cv2.rectangle(frame_color_cv, (startX, startY), (endX, endY), color, 2)

        # 클래스 이름과 신뢰도 점수 표시
        text = f"{label_name}: {score:.2f}"
        cv2.putText(frame_color_cv, text, (startX, startY - 10), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    return detected_classes_in_frame

# ==============================================================
#  메인 실행 루프
# ==============================================================
cap = cv2.VideoCapture(VIDEO_INPUT_PATH)
if not cap.isOpened():
    print(f"오류: '{VIDEO_INPUT_PATH}' 영상을 열 수 없습니다.")
    exit()

# 비디오 저장 설정
fps = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out_video = cv2.VideoWriter(VIDEO_OUTPUT_PATH, fourcc, fps, (frame_width, frame_height))

print("영상에서 탐지되는 모든 클래스 확인을 시작합니다...")

frame_idx = 0
last_detected_classes = set()

while True:
    ret, frame = cap.read()
    if not ret:
        break

    output_frame = frame.copy()

    # 현재 프레임에서 모든 객체 탐지 및 그리기
    detected_classes = detect_and_draw_all_objects(output_frame)

    # 콘솔에 탐지된 클래스 목록 출력 (이전 프레임과 다를 경우에만)
    if detected_classes and detected_classes != last_detected_classes:
        print(f"[프레임 {frame_idx}] 탐지된 클래스: {sorted(list(detected_classes))}")
        last_detected_classes = detected_classes
    elif not detected_classes and last_detected_classes:
        print(f"[프레임 {frame_idx}] 탐지된 클래스 없음")
        last_detected_classes = set()


    # 화면 좌측 상단에 현재 프레임에서 탐지된 클래스 목록 표시
    y_offset = 30
    cv2.putText(output_frame, "Detected Classes:", (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
    for cls_name in sorted(list(detected_classes)):
        y_offset += 25
        cv2.putText(output_frame, f"- {cls_name}", (15, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2)


    # 결과 비디오에 프레임 쓰기
    out_video.write(output_frame)
    frame_idx += 1

# --- 종료 ---
cap.release()
out_video.release()
cv2.destroyAllWindows()
print("\n작업 완료.")
print(f"탐지 결과가 표시된 비디오가 다음 경로에 저장되었습니다: {VIDEO_OUTPUT_PATH}")

KeyboardInterrupt: 

In [2]:
# 핀홀 카메라 등속도 모델

import cv2
import numpy as np
from transformers import AutoImageProcessor, AutoModelForObjectDetection
import torch
from PIL import Image

# --- 설정 변수 ---
VIDEO_INPUT_PATH = "bowling.mp4" # 분석한 영상 파일명으로 변경
VIDEO_OUTPUT_PATH = "bowling_P_o.mp4"
USER_TARGET_CLASS_NAME = 'sports ball' # ★★★ 타겟 변경
HF_MODEL_NAME = "facebook/detr-resnet-50"
CONFIDENCE_THRESHOLD = 0.5

# --- EKF를 위한 물리 파라미터 ---
FOCAL_LENGTH_PIXELS = 1200
REAL_OBJECT_WIDTH_M = 0.22
REAL_OBJECT_HEIGHT_M = 0.22

# --- 시스템 상태 정의 ---
# <<< [OPTIMIZED] 반응성 개선을 위해 노이즈 파라미터 대폭 수정 >>>
# Q: 프로세스 노이즈. 모델 예측의 불확실성. 값을 높여 반응성을 키움.
q_pos_std = 5.0   # 픽셀 속도 변화의 불확실성 증가 (기존 0.1)
q_depth_std = 10.0 # 깊이 속도 변화의 불확실성 대폭 증가 (기존 1.0)
Q = np.diag([0, 0, 0, q_pos_std**2, q_pos_std**2, q_depth_std**2])

# R: 측정 노이즈. 측정값의 불확실성. 값을 낮춰 측정값을 더 신뢰하게 함.
r_pos_std = 3.0 # 픽셀 측정 정확도 신뢰도 증가 (기존 5.0)
r_size_std = 5.0 # 크기 측정 정확도 신뢰도 증가 (기존 10.0)
R = np.diag([r_pos_std**2, r_pos_std**2, r_size_std**2, r_size_std**2])

# --- Hugging Face 모델 및 프로세서 로드 ---
try:
    image_processor = AutoImageProcessor.from_pretrained(HF_MODEL_NAME)
    model = AutoModelForObjectDetection.from_pretrained(HF_MODEL_NAME)
    print("Hugging Face 모델 로드 성공.")
except Exception as e:
    print(f"오류: Hugging Face 모델 로드 실패. ({e})")
    exit()

# --- EKF 관련 함수 정의 ---
def h_measurement_function(x_state):
    cx, cy, Z, _, _, _ = x_state
    
    # <<< [OPTIMIZED] 필터 안정성 강화를 위한 Z값 하한 설정 >>>
    Z = np.clip(Z, 0.1, 1000) # Z가 최소 10cm 이상, 최대 1km 라고 제한

    w_pred = (FOCAL_LENGTH_PIXELS * REAL_OBJECT_WIDTH_M) / Z
    h_pred = (FOCAL_LENGTH_PIXELS * REAL_OBJECT_HEIGHT_M) / Z
    
    return np.array([cx, cy, w_pred, h_pred])

def calculate_jacobian_Hj(x_eval):
    _, _, Z, _, _, _ = x_eval

    # <<< [OPTIMIZED] 필터 안정성 강화를 위한 Z값 하한 설정 >>>
    Z = np.clip(Z, 0.1, 1000)
    
    H_j = np.zeros((4, 6))
    H_j[0, 0] = 1
    H_j[1, 1] = 1
    H_j[2, 2] = -(FOCAL_LENGTH_PIXELS * REAL_OBJECT_WIDTH_M) / (Z**2)
    H_j[3, 2] = -(FOCAL_LENGTH_PIXELS * REAL_OBJECT_HEIGHT_M) / (Z**2)
    
    return H_j

# (객체 탐지 함수는 변경 없음)
def detect_object_huggingface(frame_color_cv):
    image_pil = Image.fromarray(cv2.cvtColor(frame_color_cv, cv2.COLOR_BGR2RGB))
    inputs = image_processor(images=image_pil, return_tensors="pt")
    with torch.no_grad(): outputs = model(**inputs)
    target_sizes = torch.tensor([image_pil.size[::-1]])
    results = image_processor.post_process_object_detection(outputs, threshold=CONFIDENCE_THRESHOLD, target_sizes=target_sizes)[0]
    best_target_detection = None
    for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
        if model.config.id2label[label.item()].lower() == USER_TARGET_CLASS_NAME:
            box = [round(i, 2) for i in box.tolist()]
            w, h = box[2] - box[0], box[3] - box[1]
            cx, cy = box[0] + w/2, box[1] + h/2
            current_score = score.item()
            if best_target_detection is None or current_score > best_target_detection[0]:
                 best_target_detection = (current_score, np.array([cx, cy, w, h]))
    return best_target_detection[1] if best_target_detection else None

# ==============================================================
# 단계 1: 비디오 및 EKF 초기화
# ==============================================================
print("단계 1: 비디오 및 EKF 초기화 중...")
cap = cv2.VideoCapture(VIDEO_INPUT_PATH)
fps = cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 30
dt = 1.0 / fps
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out_video = cv2.VideoWriter(VIDEO_OUTPUT_PATH, fourcc, fps, (frame_width, frame_height))

A = np.eye(6)
A[0, 3], A[1, 4], A[2, 5] = dt, dt, dt

x_est = np.zeros(6)
P = np.eye(6) * 1000.0
is_initialized = False
print("단계 1 완료.")

# ==============================================================
# 단계 2: 영상 처리 및 EKF 적용 루프
# ==============================================================
print("단계 2: 영상 처리 및 EKF 적용 시작...")
frame_idx = 0
while True:
    ret, frame = cap.read()
    if not ret: break
    output_frame = frame.copy()
    
    measurement_k = detect_object_huggingface(frame)

    if not is_initialized and measurement_k is not None:
        cx_meas, cy_meas, w_meas, h_meas = measurement_k
        if h_meas > 1: # <<< [OPTIMIZED] 너무 작은 박스는 초기화에 사용하지 않음
            initial_Z = (FOCAL_LENGTH_PIXELS * REAL_OBJECT_HEIGHT_M) / h_meas
            x_est[0], x_est[1], x_est[2] = cx_meas, cy_meas, initial_Z
            P = np.eye(6) * 100.0
            is_initialized = True
            print(f"프레임 {frame_idx}에서 EKF 초기화. 추정된 초기 깊이 Z = {initial_Z:.2f} m")

    if is_initialized:
        x_pred = A @ x_est
        P_pred = A @ P @ A.T + Q

        if measurement_k is not None:
            H_j_k = calculate_jacobian_Hj(x_pred)
            z_pred = h_measurement_function(x_pred)
            y = measurement_k - z_pred
            S = H_j_k @ P_pred @ H_j_k.T + R
            K = P_pred @ H_j_k.T @ np.linalg.inv(S)
            x_est = x_pred + K @ y
            P = (np.eye(6) - K @ H_j_k) @ P_pred
        else:
            x_est = x_pred
            P = P_pred

    # ==============================================================
    # 단계 3: 시각화
    # ==============================================================
    if measurement_k is not None:
        cx, cy, w, h = measurement_k
        cv2.rectangle(output_frame, (int(cx-w/2), int(cy-h/2)), (int(cx+w/2), int(cy+h/2)), (0, 255, 0), 2)
        cv2.putText(output_frame, "Detection", (int(cx-w/2), int(cy-h/2)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

    if is_initialized:
        est_box_params = h_measurement_function(x_est)
        cx, cy, w, h = est_box_params
        est_Z = x_est[2]
        cv2.rectangle(output_frame, (int(cx-w/2), int(cy-h/2)), (int(cx+w/2), int(cy+h/2)), (0, 0, 255), 2)
        cv2.putText(output_frame, f"EKF (Z:{est_Z:.1f}m)", (int(cx-w/2), int(cy-h/2)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

    out_video.write(output_frame)
    frame_idx += 1
    if frame_idx % 30 == 0: print(f"프레임 {frame_idx} 처리 중...")

# ==============================================================
# 단계 4: 종료
# ==============================================================
print("단계 4: 처리 완료 및 종료.")
cap.release()
out_video.release()
cv2.destroyAllWindows()
print(f"EKF 처리 완료. 결과 비디오가 다음 경로에 저장되었습니다: {VIDEO_OUTPUT_PATH}")

Some weights of the model checkpoint at facebook/detr-resnet-50 were not used when initializing DetrForObjectDetection: ['model.backbone.conv_encoder.model.layer1.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing DetrForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DetrForObjectDetection from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Hugging Face 모델 로드 성공.
단계 1: 비디오 및 EKF 초기화 중...
단계 1 완료.
단계 2: 영상 처리 및 EKF 적용 시작...
프레임 4에서 EKF 초기화. 추정된 초기 깊이 Z = 3.13 m
프레임 30 처리 중...
프레임 60 처리 중...
프레임 90 처리 중...
프레임 120 처리 중...
프레임 150 처리 중...
프레임 180 처리 중...
프레임 210 처리 중...
프레임 240 처리 중...
프레임 270 처리 중...
프레임 300 처리 중...
프레임 330 처리 중...
프레임 360 처리 중...
프레임 390 처리 중...
단계 4: 처리 완료 및 종료.
EKF 처리 완료. 결과 비디오가 다음 경로에 저장되었습니다: bowling_P_o.mp4


In [None]:
# 핀홀 카메라 등가속도 모델

import cv2
import numpy as np
from transformers import AutoImageProcessor, AutoModelForObjectDetection
import torch
from PIL import Image

# --- 설정 변수 ---
VIDEO_INPUT_PATH = "KF_vid2.mp4" # 분석한 영상 파일명으로 변경
VIDEO_OUTPUT_PATH = "EKF_vid2_o2.mp4"
USER_TARGET_CLASS_NAME = 'sports ball'
HF_MODEL_NAME = "facebook/detr-resnet-50"
CONFIDENCE_THRESHOLD = 0.5

# --- EKF를 위한 물리 파라미터 ---
FOCAL_LENGTH_PIXELS = 1200
REAL_OBJECT_WIDTH_M = 0.22
REAL_OBJECT_HEIGHT_M = 0.22

# --- 시스템 상태 정의 (9-DOF) ---
# 상태 변수 x = [cx, cy, Z, v_cx, v_cy, v_Z, a_cx, a_cy, a_Z]' (9x1)
# 측정값 z = [cx, cy, w, h]' (4x1)

# <<< [9-DOF] Q: 프로세스 노이즈. 가속도의 불확실성(Jerk)에 해당. >>>
# 이 값을 조절하여 가속도 변화에 대한 민감도를 튜닝.
q_accel_pos_std = 50.0  # 픽셀 가속도 변화의 불확실성 (더 높은 값으로 시작)
q_accel_depth_std = 100.0 # 깊이 가속도 변화의 불확실성 (더 높은 값으로 시작)
Q = np.diag([0, 0, 0, 0, 0, 0, q_accel_pos_std**2, q_accel_pos_std**2, q_accel_depth_std**2])

# R: 측정 노이즈. 이전과 동일.
r_pos_std = 3.0
r_size_std = 5.0
R = np.diag([r_pos_std**2, r_pos_std**2, r_size_std**2, r_size_std**2])

# --- Hugging Face 모델 로드 (변경 없음) ---
try:
    image_processor = AutoImageProcessor.from_pretrained(HF_MODEL_NAME)
    model = AutoModelForObjectDetection.from_pretrained(HF_MODEL_NAME)
    print("Hugging Face 모델 로드 성공.")
except Exception as e: exit(f"모델 로드 실패: {e}")

# --- EKF 관련 함수 정의 ---
def h_measurement_function(x_state_9d):
    # <<< [9-DOF] 9차원 상태 벡터를 입력받지만, 위치 항만 사용 >>>
    cx, cy, Z = x_state_9d[0], x_state_9d[1], x_state_9d[2]
    Z = np.clip(Z, 0.1, 1000)
    w_pred = (FOCAL_LENGTH_PIXELS * REAL_OBJECT_WIDTH_M) / Z
    h_pred = (FOCAL_LENGTH_PIXELS * REAL_OBJECT_HEIGHT_M) / Z
    return np.array([cx, cy, w_pred, h_pred])

def calculate_jacobian_Hj(x_eval_9d):
    # <<< [9-DOF] 자코비안 행렬이 4x9 차원으로 확장 >>>
    Z = x_eval_9d[2]
    Z = np.clip(Z, 0.1, 1000)
    H_j = np.zeros((4, 9)) # 4x9 행렬
    H_j[0, 0] = 1
    H_j[1, 1] = 1
    H_j[2, 2] = -(FOCAL_LENGTH_PIXELS * REAL_OBJECT_WIDTH_M) / (Z**2)
    H_j[3, 2] = -(FOCAL_LENGTH_PIXELS * REAL_OBJECT_HEIGHT_M) / (Z**2)
    return H_j

# (객체 탐지 함수는 변경 없음)
def detect_object_huggingface(frame_color_cv):
    image_pil = Image.fromarray(cv2.cvtColor(frame_color_cv, cv2.COLOR_BGR2RGB))
    inputs = image_processor(images=image_pil, return_tensors="pt")
    with torch.no_grad(): outputs = model(**inputs)
    target_sizes = torch.tensor([image_pil.size[::-1]])
    results = image_processor.post_process_object_detection(outputs, threshold=CONFIDENCE_THRESHOLD, target_sizes=target_sizes)[0]
    best_target_detection = None
    for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
        if model.config.id2label[label.item()].lower() == USER_TARGET_CLASS_NAME:
            box = [round(i, 2) for i in box.tolist()]
            w, h = box[2] - box[0], box[3] - box[1]
            cx, cy = box[0] + w/2, box[1] + h/2
            current_score = score.item()
            if best_target_detection is None or current_score > best_target_detection[0]:
                 best_target_detection = (current_score, np.array([cx, cy, w, h]))
    return best_target_detection[1] if best_target_detection else None


# ==============================================================
# 단계 1: 비디오 및 EKF 초기화
# ==============================================================
print("단계 1: 비디오 및 EKF 초기화 중...")
cap = cv2.VideoCapture(VIDEO_INPUT_PATH)
fps = cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 30
dt = 1.0 / fps
frame_width, frame_height = int(cap.get(3)), int(cap.get(4))
out_video = cv2.VideoWriter(VIDEO_OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

# <<< [9-DOF] 상태 전이 행렬 A를 9x9로 정의 >>>
A = np.eye(9)
for i in range(3):
    A[i, i+3] = dt
    A[i, i+6] = 0.5 * dt**2
    A[i+3, i+6] = dt

x_est = np.zeros(9) # 9차원 상태 벡터
P = np.eye(9) * 1000.0 # 9x9 공분산 행렬
is_initialized = False
print("단계 1 완료.")

# ==============================================================
# 단계 2: 영상 처리 및 EKF 적용 루프
# ==============================================================
print("단계 2: 영상 처리 및 EKF 적용 시작...")
frame_idx = 0
while True:
    ret, frame = cap.read()
    if not ret: break
    output_frame = frame.copy()
    
    measurement_k = detect_object_huggingface(frame)

    if not is_initialized and measurement_k is not None:
        cx_meas, cy_meas, w_meas, h_meas = measurement_k
        if h_meas > 1:
            initial_Z = (FOCAL_LENGTH_PIXELS * REAL_OBJECT_HEIGHT_M) / h_meas
            # <<< [9-DOF] 9차원 상태 벡터 초기화 >>>
            x_est[0], x_est[1], x_est[2] = cx_meas, cy_meas, initial_Z
            # 속도, 가속도는 0으로 시작
            P = np.eye(9) * 100.0
            is_initialized = True
            print(f"프레임 {frame_idx}에서 9-DOF EKF 초기화. 추정된 초기 깊이 Z = {initial_Z:.2f} m")

    if is_initialized:
        # EKF 예측 단계
        x_pred = A @ x_est
        P_pred = A @ P @ A.T + Q

        # EKF 갱신 단계
        if measurement_k is not None:
            H_j_k = calculate_jacobian_Hj(x_pred)
            z_pred = h_measurement_function(x_pred)
            y = measurement_k - z_pred
            S = H_j_k @ P_pred @ H_j_k.T + R
            K = P_pred @ H_j_k.T @ np.linalg.inv(S)
            x_est = x_pred + K @ y
            P = (np.eye(9) - K @ H_j_k) @ P_pred
        else:
            x_est = x_pred
            P = P_pred

    # ==============================================================
    # 단계 3: 시각화
    # ==============================================================
    if measurement_k is not None:
        cx, cy, w, h = measurement_k
        cv2.rectangle(output_frame, (int(cx-w/2), int(cy-h/2)), (int(cx+w/2), int(cy+h/2)), (0, 255, 0), 1)
        # cv2.putText(output_frame, "Detection", (int(cx-w/2), int(cy-h/2)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

    if is_initialized:
        est_box_params = h_measurement_function(x_est)
        cx, cy, w, h = est_box_params
        est_Z = x_est[2]
        # <<< [9-DOF] 추정된 가속도 값 시각화 >>>
        est_a_cx, est_a_cy, est_a_Z = x_est[6], x_est[7], x_est[8]
        
        cv2.rectangle(output_frame, (int(cx-w/2), int(cy-h/2)), (int(cx+w/2), int(cy+h/2)), (0, 0, 255), 2)
        info_text = f"EKF Z:{est_Z:.1f}m"
        accel_text = f"a(px/s2):({int(est_a_cx)}, {int(est_a_cy)}) aZ:{est_a_Z:.1f}"
        cv2.putText(output_frame, info_text, (int(cx-w/2), int(cy-h/2)-25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
        cv2.putText(output_frame, accel_text, (int(cx-w/2), int(cy-h/2)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 255), 1)


    out_video.write(output_frame)
    frame_idx += 1
    if frame_idx % 30 == 0: print(f"프레임 {frame_idx} 처리 중...")

# 단계 4: 종료
print("단계 4: 처리 완료 및 종료.")
cap.release(), out_video.release(), cv2.destroyAllWindows()
print(f"9-DOF EKF 처리 완료. 결과: {VIDEO_OUTPUT_PATH}")

In [1]:
# 핀홀 카메라 등가속도 + 공기저항 모델

import cv2
import numpy as np
from transformers import AutoImageProcessor, AutoModelForObjectDetection
import torch
from PIL import Image

# --- 설정 변수 ---
VIDEO_INPUT_PATH = "KF_vid2.mp4"
VIDEO_OUTPUT_PATH = "EKF_vid2_Drag.mp4"
USER_TARGET_CLASS_NAME = 'sports ball'
HF_MODEL_NAME = "facebook/detr-resnet-50"
CONFIDENCE_THRESHOLD = 0.5

# --- EKF를 위한 물리 파라미터 ---
FOCAL_LENGTH_PIXELS = 1200
REAL_OBJECT_WIDTH_M = 0.22
REAL_OBJECT_HEIGHT_M = 0.22
K_DRAG = 0.001  # <<< [DRAG] 공기 저항 계수 (튜닝 필요)

# --- 시스템 상태 정의 (9-DOF) ---
q_accel_pos_std = 50.0
q_accel_depth_std = 100.0
Q = np.diag([0, 0, 0, 0, 0, 0, q_accel_pos_std**2, q_accel_pos_std**2, q_accel_depth_std**2])

r_pos_std = 3.0
r_size_std = 5.0
R = np.diag([r_pos_std**2, r_pos_std**2, r_size_std**2, r_size_std**2])

# --- Hugging Face 모델 로드 (변경 없음) ---
try:
    image_processor = AutoImageProcessor.from_pretrained(HF_MODEL_NAME)
    model = AutoModelForObjectDetection.from_pretrained(HF_MODEL_NAME)
    print("Hugging Face 모델 로드 성공.")
except Exception as e: exit(f"모델 로드 실패: {e}")

# --- EKF 관련 함수 정의 ---
def state_transition_function_f(x_prev, dt, k_d):
    """ <<< [DRAG] 공기 저항을 포함한 비선형 상태 전이 함수 """
    px, py, Z, vx, vy, vZ, ax, ay, aZ = x_prev
    
    # 3D 속도 벡터 및 속력 계산
    v_vec = np.array([vx, vy, vZ])
    speed = np.linalg.norm(v_vec)

    # 공기 저항에 의한 가속도 계산 (a_drag = -k_d * |v| * v)
    a_drag = -k_d * speed * v_vec
    
    # 최종 가속도 = 기본 가속도 + 저항 가속도
    ax_net = ax + a_drag[0]
    ay_net = ay + a_drag[1]
    aZ_net = aZ + a_drag[2]

    # 등가속도 운동 공식 적용
    px_next = px + vx * dt + 0.5 * ax_net * dt**2
    py_next = py + vy * dt + 0.5 * ay_net * dt**2
    Z_next = Z + vZ * dt + 0.5 * aZ_net * dt**2
    
    vx_next = vx + ax_net * dt
    vy_next = vy + ay_net * dt
    vZ_next = vZ + aZ_net * dt
    
    # 기본 가속도는 변하지 않는다고 가정
    ax_next, ay_next, aZ_next = ax, ay, aZ
    
    return np.array([px_next, py_next, Z_next, vx_next, vy_next, vZ_next, ax_next, ay_next, aZ_next])

def calculate_jacobian_F(x_eval, dt, k_d):
    """ <<< [DRAG] 비선형 상태 전이 함수 f(x)의 자코비안 F_k 계산 """
    _, _, _, vx, vy, vZ, _, _, _ = x_eval
    
    F = np.eye(9)
    # 위치, 속도, 가속도 간의 기본 관계 설정
    for i in range(3):
        F[i, i+3] = dt
        F[i, i+6] = 0.5 * dt**2
        F[i+3, i+6] = dt

    # 공기 저항 항의 편미분 추가
    s = np.sqrt(vx**2 + vy**2 + vZ**2)
    v = {'x': vx, 'y': vy, 'z': vZ}
    dims = ['x', 'y', 'z']
    
    epsilon = 1e-6
    if s > epsilon:
        for i, dim1 in enumerate(dims): # a_net_dim1
            for j, dim2 in enumerate(dims): # v_dim2
                # d(a_drag_i)/d(v_j)
                term1 = -k_d * (v[dim1] * v[dim2] / s)
                if i == j:
                    term1 += -k_d * s
                
                # F의 해당 위치에 편미분 값 추가
                # d(p_i)/d(v_j), d(v_i)/d(v_j)
                F[i, j+3] += 0.5 * term1 * dt**2
                F[i+3, j+3] += term1 * dt
    return F

# (h_measurement_function, calculate_jacobian_Hj, detect_object_huggingface 함수는 이전과 동일)
def h_measurement_function(x_state_9d):
    cx, cy, Z = x_state_9d[0], x_state_9d[1], x_state_9d[2]
    Z = np.clip(Z, 0.1, 1000); w_pred = (FOCAL_LENGTH_PIXELS*REAL_OBJECT_WIDTH_M)/Z
    h_pred = (FOCAL_LENGTH_PIXELS*REAL_OBJECT_HEIGHT_M)/Z; return np.array([cx, cy, w_pred, h_pred])
def calculate_jacobian_Hj(x_eval_9d):
    Z = np.clip(x_eval_9d[2], 0.1, 1000); H_j = np.zeros((4, 9)); H_j[0, 0]=1; H_j[1, 1]=1
    H_j[2, 2]=-(FOCAL_LENGTH_PIXELS*REAL_OBJECT_WIDTH_M)/(Z**2)
    H_j[3, 2]=-(FOCAL_LENGTH_PIXELS*REAL_OBJECT_HEIGHT_M)/(Z**2); return H_j
def detect_object_huggingface(frame_color_cv):
    image_pil=Image.fromarray(cv2.cvtColor(frame_color_cv,cv2.COLOR_BGR2RGB)); inputs=image_processor(images=image_pil,return_tensors="pt")
    with torch.no_grad(): outputs=model(**inputs)
    target_sizes=torch.tensor([image_pil.size[::-1]]); results=image_processor.post_process_object_detection(outputs,threshold=CONFIDENCE_THRESHOLD,target_sizes=target_sizes)[0]
    best_target_detection=None
    for s,l,b in zip(results["scores"],results["labels"],results["boxes"]):
        if model.config.id2label[l.item()].lower()==USER_TARGET_CLASS_NAME:
            b=[round(i,2) for i in b.tolist()]; w,h=b[2]-b[0],b[3]-b[1]; cx,cy=b[0]+w/2,b[1]+h/2
            if best_target_detection is None or s.item()>best_target_detection[0]: best_target_detection=(s.item(),np.array([cx,cy,w,h]))
    return best_target_detection[1] if best_target_detection else None

# ==============================================================
# 단계 1: 비디오 및 EKF 초기화
# ==============================================================
print("단계 1: 비디오 및 EKF 초기화 중...")
cap = cv2.VideoCapture(VIDEO_INPUT_PATH)
fps = cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 30
dt = 1.0 / fps
frame_width, frame_height = int(cap.get(3)), int(cap.get(4))
out_video = cv2.VideoWriter(VIDEO_OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

x_est = np.zeros(9); P = np.eye(9) * 1000.0; is_initialized = False
print("단계 1 완료.")

# ==============================================================
# 단계 2: 영상 처리 및 EKF 적용 루프
# ==============================================================
print("단계 2: 영상 처리 및 EKF 적용 시작...")
frame_idx = 0
while True:
    ret, frame = cap.read()
    if not ret: break
    output_frame = frame.copy()
    
    measurement_k = detect_object_huggingface(frame)

    if not is_initialized and measurement_k is not None:
        cx_meas, cy_meas, w_meas, h_meas = measurement_k
        if h_meas > 1:
            initial_Z = (FOCAL_LENGTH_PIXELS * REAL_OBJECT_HEIGHT_M) / h_meas
            x_est[0], x_est[1], x_est[2] = cx_meas, cy_meas, initial_Z
            P = np.eye(9) * 100.0; is_initialized = True
            print(f"프레임 {frame_idx}에서 EKF 초기화. 초기 깊이 Z = {initial_Z:.2f} m")

    if is_initialized:
        # <<< [DRAG] EKF 예측 단계 수정 >>>
        F_k = calculate_jacobian_F(x_est, dt, K_DRAG)
        x_pred = state_transition_function_f(x_est, dt, K_DRAG)
        P_pred = F_k @ P @ F_k.T + Q

        # EKF 갱신 단계 (이전과 동일)
        if measurement_k is not None:
            H_j_k = calculate_jacobian_Hj(x_pred)
            z_pred = h_measurement_function(x_pred)
            y = measurement_k - z_pred
            S = H_j_k @ P_pred @ H_j_k.T + R
            K = P_pred @ H_j_k.T @ np.linalg.inv(S)
            x_est = x_pred + K @ y
            P = (np.eye(9) - K @ H_j_k) @ P_pred
        else:
            x_est = x_pred
            P = P_pred

    # 시각화 (이전과 동일)
    if measurement_k is not None:
        cx,cy,w,h=measurement_k; cv2.rectangle(output_frame,(int(cx-w/2),int(cy-h/2)),(int(cx+w/2),int(cy+h/2)),(0,255,0),1)
    if is_initialized:
        est_box=h_measurement_function(x_est); cx,cy,w,h=est_box; est_Z=x_est[2]; est_a=x_est[6:9]
        cv2.rectangle(output_frame,(int(cx-w/2),int(cy-h/2)),(int(cx+w/2),int(cy+h/2)),(0,0,255),2)
        info1=f"EKF Z:{est_Z:.1f}m"; info2=f"a(px/s2):({int(est_a[0])},{int(est_a[1])}) aZ:{est_a[2]:.1f}"
        cv2.putText(output_frame,info1,(int(cx-w/2),int(cy-h/2)-25),cv2.FONT_HERSHEY_SIMPLEX,0.5,(0,0,255),2)
        cv2.putText(output_frame,info2,(int(cx-w/2),int(cy-h/2)-10),cv2.FONT_HERSHEY_SIMPLEX,0.4,(0,0,255),1)

    out_video.write(output_frame)
    frame_idx += 1
    if frame_idx % 30 == 0: print(f"프레임 {frame_idx} 처리 중...")

# 단계 4: 종료
print("단계 4: 처리 완료 및 종료.")
cap.release(), out_video.release(), cv2.destroyAllWindows()
print(f"공기 저항 모델 EKF 처리 완료. 결과: {VIDEO_OUTPUT_PATH}")

Some weights of the model checkpoint at facebook/detr-resnet-50 were not used when initializing DetrForObjectDetection: ['model.backbone.conv_encoder.model.layer1.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing DetrForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DetrForObjectDetection from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Hugging Face 모델 로드 성공.
단계 1: 비디오 및 EKF 초기화 중...
단계 1 완료.
단계 2: 영상 처리 및 EKF 적용 시작...
프레임 0에서 EKF 초기화. 초기 깊이 Z = 4.49 m
프레임 30 처리 중...
프레임 60 처리 중...
프레임 90 처리 중...
프레임 120 처리 중...
프레임 150 처리 중...
프레임 180 처리 중...
프레임 210 처리 중...
프레임 240 처리 중...
프레임 270 처리 중...
프레임 300 처리 중...
프레임 330 처리 중...
프레임 360 처리 중...
프레임 390 처리 중...
프레임 420 처리 중...
프레임 450 처리 중...
프레임 480 처리 중...
프레임 510 처리 중...
프레임 540 처리 중...
프레임 570 처리 중...
프레임 600 처리 중...
프레임 630 처리 중...
프레임 660 처리 중...
프레임 690 처리 중...
프레임 720 처리 중...
프레임 750 처리 중...
프레임 780 처리 중...
단계 4: 처리 완료 및 종료.
공기 저항 모델 EKF 처리 완료. 결과: EKF_vid2_Drag.mp4


In [4]:
# 선형 바운딩 등가속도 모델

import cv2
import numpy as np
from transformers import AutoImageProcessor, AutoModelForObjectDetection
import torch
from PIL import Image

# --- 설정 변수 ---
VIDEO_INPUT_PATH = "KF_vid2.mp4" # 분석한 영상 파일명으로 변경
VIDEO_OUTPUT_PATH = "EKF_vid2_o2.mp4"
USER_TARGET_CLASS_NAME = 'sports ball'
HF_MODEL_NAME = "facebook/detr-resnet-50"
CONFIDENCE_THRESHOLD = 0.5

# --- 시스템 상태 정의 (12-DOF Linear) ---
# <<< [LINEAR] 상태 변수가 12차원으로 확장: [cx, cy, w, h, v_cx, v_cy, v_w, v_h, a_cx, a_cy, a_w, a_h] >>>
# <<< [LINEAR] 물리 파라미터(초점거리, 실제크기)는 더 이상 필요 없음 >>>

# Q: 프로세스 노이즈. 가속도의 불확실성(Jerk).
q_accel_pos_std = 10.0  # 픽셀 위치 가속도 변화의 불확실성
q_accel_size_std = 10.0 # 픽셀 크기 가속도 변화의 불확실성
Q = np.diag([0,0,0,0, 0,0,0,0, q_accel_pos_std**2, q_accel_pos_std**2, q_accel_size_std**2, q_accel_size_std**2])

# R: 측정 노이즈.
r_pos_std = 3.0
r_size_std = 5.0
R = np.diag([r_pos_std**2, r_pos_std**2, r_size_std**2, r_size_std**2])

# --- Hugging Face 모델 로드 (변경 없음) ---
try:
    image_processor = AutoImageProcessor.from_pretrained(HF_MODEL_NAME)
    model = AutoModelForObjectDetection.from_pretrained(HF_MODEL_NAME)
    print("Hugging Face 모델 로드 성공.")
except Exception as e: exit(f"모델 로드 실패: {e}")

# (객체 탐지 함수는 변경 없음)
def detect_object_huggingface(frame_color_cv):
    image_pil = Image.fromarray(cv2.cvtColor(frame_color_cv, cv2.COLOR_BGR2RGB))
    inputs = image_processor(images=image_pil, return_tensors="pt")
    with torch.no_grad(): outputs = model(**inputs)
    target_sizes = torch.tensor([image_pil.size[::-1]])
    results = image_processor.post_process_object_detection(outputs, threshold=CONFIDENCE_THRESHOLD, target_sizes=target_sizes)[0]
    best_target_detection = None
    for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
        if model.config.id2label[label.item()].lower() == USER_TARGET_CLASS_NAME:
            box = [round(i, 2) for i in box.tolist()]
            w, h = box[2] - box[0], box[3] - box[1]
            cx, cy = box[0] + w/2, box[1] + h/2
            current_score = score.item()
            if best_target_detection is None or current_score > best_target_detection[0]:
                 best_target_detection = (current_score, np.array([cx, cy, w, h]))
    return best_target_detection[1] if best_target_detection else None

# ==============================================================
# 단계 1: 비디오 및 칼만 필터 초기화
# ==============================================================
print("단계 1: 비디오 및 선형 칼만 필터 초기화 중...")
cap = cv2.VideoCapture(VIDEO_INPUT_PATH)
fps = cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 30
dt = 1.0 / fps
frame_width, frame_height = int(cap.get(3)), int(cap.get(4))
out_video = cv2.VideoWriter(VIDEO_OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

# <<< [LINEAR] 상태 전이 행렬 A를 12x12로 정의 >>>
A = np.eye(12)
for i in range(4):
    A[i, i+4] = dt
    A[i, i+8] = 0.5 * dt**2
    A[i+4, i+8] = dt

# <<< [LINEAR] 측정 행렬 H를 4x12 상수로 정의 (자코비안 불필요) >>>
# 측정값 [cx, cy, w, h]는 상태 벡터의 첫 4개 요소에 해당
H = np.zeros((4, 12))
H[0, 0] = 1
H[1, 1] = 1
H[2, 2] = 1
H[3, 3] = 1

x_est = np.zeros(12) # 12차원 상태 벡터
P = np.eye(12) * 1000.0 # 12x12 공분산 행렬
is_initialized = False
print("단계 1 완료.")

# ==============================================================
# 단계 2: 영상 처리 및 선형 KF 적용 루프
# ==============================================================
print("단계 2: 영상 처리 및 선형 KF 적용 시작...")
frame_idx = 0
while True:
    ret, frame = cap.read()
    if not ret: break
    output_frame = frame.copy()
    
    measurement_k = detect_object_huggingface(frame)

    if not is_initialized and measurement_k is not None:
        # <<< [LINEAR] 12차원 상태 벡터 초기화 >>>
        x_est[0:4] = measurement_k # 위치(cx,cy,w,h)는 측정값으로 초기화
        # 속도, 가속도는 0으로 시작
        P = np.eye(12) * 100.0
        is_initialized = True
        print(f"프레임 {frame_idx}에서 선형 KF 초기화.")

    if is_initialized:
        # 예측 단계
        x_pred = A @ x_est
        P_pred = A @ P @ A.T + Q

        # 갱신 단계
        if measurement_k is not None:
            # <<< [LINEAR] 표준 선형 칼만 필터 업데이트 공식 사용 >>>
            y = measurement_k - H @ x_pred
            S = H @ P_pred @ H.T + R
            K = P_pred @ H.T @ np.linalg.inv(S)
            x_est = x_pred + K @ y
            P = (np.eye(12) - K @ H) @ P_pred
        else:
            x_est = x_pred
            P = P_pred

    # ==============================================================
    # 단계 3: 시각화
    # ==============================================================
    if measurement_k is not None:
        cx, cy, w, h = measurement_k
        cv2.rectangle(output_frame, (int(cx-w/2), int(cy-h/2)), (int(cx+w/2), int(cy+h/2)), (0, 255, 0), 1)

    if is_initialized:
        # <<< [LINEAR] 상태 벡터에서 직접 cx,cy,w,h 값을 가져옴 >>>
        cx, cy, w, h = x_est[0], x_est[1], x_est[2], x_est[3]
        cv2.rectangle(output_frame, (int(cx-w/2), int(cy-h/2)), (int(cx+w/2), int(cy+h/2)), (0, 0, 255), 2)
        cv2.putText(output_frame, "Linear KF", (int(cx-w/2), int(cy-h/2)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

    out_video.write(output_frame)
    frame_idx += 1
    if frame_idx % 30 == 0: print(f"프레임 {frame_idx} 처리 중...")

# ==============================================================
# 단계 4: 종료
# ==============================================================
print("단계 4: 처리 완료 및 종료.")
cap.release(), out_video.release(), cv2.destroyAllWindows()
print(f"선형 KF 처리 완료. 결과: {VIDEO_OUTPUT_PATH}")

Some weights of the model checkpoint at facebook/detr-resnet-50 were not used when initializing DetrForObjectDetection: ['model.backbone.conv_encoder.model.layer1.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing DetrForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DetrForObjectDetection from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Hugging Face 모델 로드 성공.
단계 1: 비디오 및 선형 칼만 필터 초기화 중...
단계 1 완료.
단계 2: 영상 처리 및 선형 KF 적용 시작...
프레임 0에서 선형 KF 초기화.
프레임 30 처리 중...
프레임 60 처리 중...
프레임 90 처리 중...
프레임 120 처리 중...
프레임 150 처리 중...
프레임 180 처리 중...
프레임 210 처리 중...
프레임 240 처리 중...
프레임 270 처리 중...
프레임 300 처리 중...
프레임 330 처리 중...
프레임 360 처리 중...
프레임 390 처리 중...
프레임 420 처리 중...
프레임 450 처리 중...
프레임 480 처리 중...
프레임 510 처리 중...
프레임 540 처리 중...
프레임 570 처리 중...
프레임 600 처리 중...
프레임 630 처리 중...
프레임 660 처리 중...
프레임 690 처리 중...
프레임 720 처리 중...
프레임 750 처리 중...
프레임 780 처리 중...
단계 4: 처리 완료 및 종료.
선형 KF 처리 완료. 결과: EKF_vid2_o2.mp4


In [1]:
# 선형 바운딩 등속도 모델

import cv2
import numpy as np
from transformers import AutoImageProcessor, AutoModelForObjectDetection
import torch
from PIL import Image

# --- 설정 변수 ---
VIDEO_INPUT_PATH = "bowling.mp4"
VIDEO_OUTPUT_PATH = "bowling_o.mp4"
USER_TARGET_CLASS_NAME = 'sports ball'
HF_MODEL_NAME = "facebook/detr-resnet-50"
CONFIDENCE_THRESHOLD = 0.5

# --- 시스템 상태 정의 (8-DOF Linear CV) ---
# <<< [CV MODEL] 상태 변수: [cx, cy, w, h, v_cx, v_cy, v_w, v_h] >>>
# <<< [CV MODEL] 물리 파라미터 및 가속도 항 모두 제거 >>>

# Q: 프로세스 노이즈. 등속도 모델의 불확실성 (즉, 모델링되지 않은 가속도).
# 이 값을 키우면 반응성이, 줄이면 안정성이 높아짐.
q_vel_pos_std = 5.0  # 픽셀 위치 속도의 불확실성
q_vel_size_std = 5.0 # 픽셀 크기 속도의 불확실성
Q = np.diag([0, 0, 0, 0, q_vel_pos_std**2, q_vel_pos_std**2, q_vel_size_std**2, q_vel_size_std**2])

# R: 측정 노이즈.
r_pos_std = 3.0
r_size_std = 5.0
R = np.diag([r_pos_std**2, r_pos_std**2, r_size_std**2, r_size_std**2])

# --- Hugging Face 모델 로드 (변경 없음) ---
try:
    image_processor = AutoImageProcessor.from_pretrained(HF_MODEL_NAME)
    model = AutoModelForObjectDetection.from_pretrained(HF_MODEL_NAME)
    print("Hugging Face 모델 로드 성공.")
except Exception as e: exit(f"모델 로드 실패: {e}")

# (객체 탐지 함수는 변경 없음)
def detect_object_huggingface(frame_color_cv):
    image_pil = Image.fromarray(cv2.cvtColor(frame_color_cv, cv2.COLOR_BGR2RGB))
    inputs = image_processor(images=image_pil, return_tensors="pt")
    with torch.no_grad(): outputs = model(**inputs)
    target_sizes = torch.tensor([image_pil.size[::-1]])
    results = image_processor.post_process_object_detection(outputs, threshold=CONFIDENCE_THRESHOLD, target_sizes=target_sizes)[0]
    best_target_detection = None
    for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
        if model.config.id2label[label.item()].lower() == USER_TARGET_CLASS_NAME:
            box = [round(i, 2) for i in box.tolist()]
            w, h = box[2] - box[0], box[3] - box[1]
            cx, cy = box[0] + w/2, box[1] + h/2
            current_score = score.item()
            if best_target_detection is None or current_score > best_target_detection[0]:
                 best_target_detection = (current_score, np.array([cx, cy, w, h]))
    return best_target_detection[1] if best_target_detection else None


# ==============================================================
# 단계 1: 비디오 및 칼만 필터 초기화
# ==============================================================
print("단계 1: 비디오 및 선형 칼만 필터 초기화 중...")
cap = cv2.VideoCapture(VIDEO_INPUT_PATH)
fps = cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 30
dt = 1.0 / fps
frame_width, frame_height = int(cap.get(3)), int(cap.get(4))
out_video = cv2.VideoWriter(VIDEO_OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

# <<< [CV MODEL] 상태 전이 행렬 A를 8x8로 정의 >>>
A = np.eye(8)
for i in range(4):
    A[i, i+4] = dt

# <<< [CV MODEL] 측정 행렬 H를 4x8 상수로 정의 >>>
H = np.zeros((4, 8))
for i in range(4):
    H[i, i] = 1

x_est = np.zeros(8) # 8차원 상태 벡터
P = np.eye(8) * 1000.0 # 8x8 공분산 행렬
is_initialized = False
print("단계 1 완료.")

# ==============================================================
# 단계 2: 영상 처리 및 선형 KF 적용 루프
# ==============================================================
print("단계 2: 영상 처리 및 선형 KF 적용 시작...")
frame_idx = 0
while True:
    ret, frame = cap.read()
    if not ret: break
    output_frame = frame.copy()
    
    measurement_k = detect_object_huggingface(frame)

    if not is_initialized and measurement_k is not None:
        # <<< [CV MODEL] 8차원 상태 벡터 초기화 >>>
        x_est[0:4] = measurement_k # 위치(cx,cy,w,h)는 측정값으로 초기화
        # 속도는 0으로 시작
        P = np.eye(8) * 100.0
        is_initialized = True
        print(f"프레임 {frame_idx}에서 선형 KF 초기화.")

    if is_initialized:
        # 예측 단계
        x_pred = A @ x_est
        P_pred = A @ P @ A.T + Q

        # 갱신 단계
        if measurement_k is not None:
            # <<< [CV MODEL] 표준 선형 칼만 필터 업데이트 공식 사용 >>>
            y = measurement_k - H @ x_pred
            S = H @ P_pred @ H.T + R
            K = P_pred @ H.T @ np.linalg.inv(S)
            x_est = x_pred + K @ y
            P = (np.eye(8) - K @ H) @ P_pred
        else:
            x_est = x_pred
            P = P_pred

    # ==============================================================
    # 단계 3: 시각화
    # ==============================================================
    if measurement_k is not None:
        cx, cy, w, h = measurement_k
        cv2.rectangle(output_frame, (int(cx-w/2), int(cy-h/2)), (int(cx+w/2), int(cy+h/2)), (0, 255, 0), 1)

    if is_initialized:
        cx, cy, w, h = x_est[0], x_est[1], x_est[2], x_est[3]
        cv2.rectangle(output_frame, (int(cx-w/2), int(cy-h/2)), (int(cx+w/2), int(cy+h/2)), (0, 0, 255), 2)
        cv2.putText(output_frame, "Linear CV Model", (int(cx-w/2), int(cy-h/2)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

    out_video.write(output_frame)
    frame_idx += 1
    if frame_idx % 30 == 0: print(f"프레임 {frame_idx} 처리 중...")

# ==============================================================
# 단계 4: 종료
# ==============================================================
print("단계 4: 처리 완료 및 종료.")
cap.release(), out_video.release(), cv2.destroyAllWindows()
print(f"선형 등속도 모델 처리 완료. 결과: {VIDEO_OUTPUT_PATH}")

Some weights of the model checkpoint at facebook/detr-resnet-50 were not used when initializing DetrForObjectDetection: ['model.backbone.conv_encoder.model.layer1.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing DetrForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DetrForObjectDetection from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Hugging Face 모델 로드 성공.
단계 1: 비디오 및 선형 칼만 필터 초기화 중...
단계 1 완료.
단계 2: 영상 처리 및 선형 KF 적용 시작...
프레임 4에서 선형 KF 초기화.
프레임 30 처리 중...
프레임 60 처리 중...
프레임 90 처리 중...
프레임 120 처리 중...
프레임 150 처리 중...
프레임 180 처리 중...
프레임 210 처리 중...
프레임 240 처리 중...
프레임 270 처리 중...
프레임 300 처리 중...
프레임 330 처리 중...
프레임 360 처리 중...
프레임 390 처리 중...
단계 4: 처리 완료 및 종료.
선형 등속도 모델 처리 완료. 결과: bowling_o.mp4


In [5]:
# 핀홀 카메라 등속도 모델 (로그 기록 기능 추가)

import cv2
import numpy as np
from transformers import AutoImageProcessor, AutoModelForObjectDetection
import torch
from PIL import Image
import csv  # <<< [LOG] CSV 라이브러리 추가

# --- 설정 변수 ---
VIDEO_INPUT_PATH = "bowling.mp4"
VIDEO_OUTPUT_PATH = "bowling_P_o_log.mp4"
USER_TARGET_CLASS_NAME = 'sports ball'
HF_MODEL_NAME = "facebook/detr-resnet-50"
CONFIDENCE_THRESHOLD = 0.5

# --- EKF를 위한 물리 파라미터 ---
FOCAL_LENGTH_PIXELS = 1200
REAL_OBJECT_WIDTH_M = 0.22
REAL_OBJECT_HEIGHT_M = 0.22

# --- 시스템 상태 정의 ---
q_pos_std = 5.0
q_depth_std = 5.0
Q = np.diag([0, 0, 0, q_pos_std**2, q_pos_std**2, q_depth_std**2])
r_pos_std = 3.0
r_size_std = 5.0
R = np.diag([r_pos_std**2, r_pos_std**2, r_size_std**2, r_size_std**2])

# --- Hugging Face 모델 로드 ---
try:
    image_processor = AutoImageProcessor.from_pretrained(HF_MODEL_NAME)
    model = AutoModelForObjectDetection.from_pretrained(HF_MODEL_NAME)
    print("Hugging Face 모델 로드 성공.")
except Exception as e: exit(f"모델 로드 실패: {e}")

# --- EKF 관련 함수 정의 (변경 없음) ---
def h_measurement_function(x_state):
    cx, cy, Z, _, _, _ = x_state
    Z = np.clip(Z, 0.1, 1000)
    w_pred = (FOCAL_LENGTH_PIXELS * REAL_OBJECT_WIDTH_M) / Z
    h_pred = (FOCAL_LENGTH_PIXELS * REAL_OBJECT_HEIGHT_M) / Z
    return np.array([cx, cy, w_pred, h_pred])
def calculate_jacobian_Hj(x_eval):
    _, _, Z, _, _, _ = x_eval
    Z = np.clip(Z, 0.1, 1000)
    H_j = np.zeros((4, 6)); H_j[0, 0] = 1; H_j[1, 1] = 1
    H_j[2, 2] = -(FOCAL_LENGTH_PIXELS * REAL_OBJECT_WIDTH_M) / (Z**2)
    H_j[3, 2] = -(FOCAL_LENGTH_PIXELS * REAL_OBJECT_HEIGHT_M) / (Z**2)
    return H_j
def detect_object_huggingface(frame_color_cv):
    image_pil = Image.fromarray(cv2.cvtColor(frame_color_cv, cv2.COLOR_BGR2RGB))
    inputs = image_processor(images=image_pil, return_tensors="pt")
    with torch.no_grad(): outputs = model(**inputs)
    target_sizes = torch.tensor([image_pil.size[::-1]])
    results = image_processor.post_process_object_detection(outputs, threshold=CONFIDENCE_THRESHOLD, target_sizes=target_sizes)[0]
    best_target_detection = None
    for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
        if model.config.id2label[label.item()].lower() == USER_TARGET_CLASS_NAME:
            box=[round(i,2) for i in box.tolist()]; w,h=box[2]-box[0],box[3]-box[1]; cx,cy=box[0]+w/2,box[1]+h/2
            if best_target_detection is None or score.item() > best_target_detection[0]:
                 best_target_detection = (score.item(), np.array([cx, cy, w, h]))
    return best_target_detection[1] if best_target_detection else None

# ==============================================================
# 단계 1: 비디오 및 EKF 초기화
# ==============================================================
print("단계 1: 비디오 및 EKF 초기화 중...")
cap = cv2.VideoCapture(VIDEO_INPUT_PATH)
fps = cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 30
dt = 1.0 / fps
frame_width, frame_height = int(cap.get(3)), int(cap.get(4))
out_video = cv2.VideoWriter(VIDEO_OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

A = np.eye(6); A[0, 3], A[1, 4], A[2, 5] = dt, dt, dt
x_est = np.zeros(6); P = np.eye(6) * 1000.0; is_initialized = False

log_data = [] # <<< [LOG] 로그 데이터를 저장할 리스트 초기화
log_header = [ # <<< [LOG] CSV 파일의 헤더 정의
    'frame', 'is_detected',
    'pos_cx', 'pos_cy', 'pos_Z',
    'vel_cx', 'vel_cy', 'vel_Z',
    'P_diag_cx', 'P_diag_cy', 'P_diag_Z',
    'P_diag_vcx', 'P_diag_vcy', 'P_diag_vZ'
]

print("단계 1 완료.")
# ==============================================================
# 단계 2: 영상 처리 및 EKF 적용 루프
# ==============================================================
print("단계 2: 영상 처리 및 EKF 적용 시작...")
frame_idx = 0
while True:
    ret, frame = cap.read()
    if not ret: break
    output_frame = frame.copy()
    
    measurement_k = detect_object_huggingface(frame)

    if not is_initialized and measurement_k is not None:
        if measurement_k[3] > 1:
            initial_Z = (FOCAL_LENGTH_PIXELS * REAL_OBJECT_HEIGHT_M) / measurement_k[3]
            x_est[0], x_est[1], x_est[2] = measurement_k[0], measurement_k[1], initial_Z
            P = np.eye(6) * 100.0; is_initialized = True
            print(f"프레임 {frame_idx}에서 EKF 초기화. 추정된 초기 깊이 Z = {initial_Z:.2f} m")

    if is_initialized:
        x_pred = A @ x_est
        P_pred = A @ P @ A.T + Q

        if measurement_k is not None:
            H_j_k = calculate_jacobian_Hj(x_pred)
            z_pred = h_measurement_function(x_pred)
            y = measurement_k - z_pred
            S = H_j_k @ P_pred @ H_j_k.T + R
            K = P_pred @ H_j_k.T @ np.linalg.inv(S)
            x_est = x_pred + K @ y
            P = (np.eye(6) - K @ H_j_k) @ P_pred
        else:
            x_est = x_pred
            P = P_pred

    # ==============================================================
    # 단계 3.5: 로그 기록
    # ==============================================================
    if is_initialized:
        P_diag = np.diag(P)
        current_log = {
            'frame': frame_idx,
            'is_detected': 1 if measurement_k is not None else 0,
            'pos_cx': x_est[0], 'pos_cy': x_est[1], 'pos_Z': x_est[2],
            'vel_cx': x_est[3], 'vel_cy': x_est[4], 'vel_Z': x_est[5],
            'P_diag_cx': P_diag[0], 'P_diag_cy': P_diag[1], 'P_diag_Z': P_diag[2],
            'P_diag_vcx': P_diag[3], 'P_diag_vcy': P_diag[4], 'P_diag_vZ': P_diag[5]
        }
        log_data.append(current_log)

    # (시각화 단계는 변경 없음)
    if measurement_k is not None:
        cx,cy,w,h=measurement_k; cv2.rectangle(output_frame,(int(cx-w/2),int(cy-h/2)),(int(cx+w/2),int(cy+h/2)),(0,255,0),2)
        cv2.putText(output_frame,"Detection",(int(cx-w/2),int(cy-h/2)-10),cv2.FONT_HERSHEY_SIMPLEX,0.5,(0,255,0),1)
    if is_initialized:
        est_box=h_measurement_function(x_est); cx,cy,w,h=est_box; est_Z=x_est[2]
        cv2.rectangle(output_frame,(int(cx-w/2),int(cy-h/2)),(int(cx+w/2),int(cy+h/2)),(0,0,255),2)
        cv2.putText(output_frame,f"EKF (Z:{est_Z:.1f}m)",(int(cx-w/2),int(cy-h/2)-10),cv2.FONT_HERSHEY_SIMPLEX,0.5,(0,0,255),2)

    out_video.write(output_frame)
    frame_idx += 1
    if frame_idx % 30 == 0: print(f"프레임 {frame_idx} 처리 중...")

# ==============================================================
# 단계 4: 종료
# ==============================================================
print("단계 4: 처리 완료 및 종료.")
cap.release()
out_video.release()
cv2.destroyAllWindows()

# ==============================================================
# 단계 5: 로그 파일 저장
# ==============================================================
try:
    with open('log.csv', 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=log_header)
        writer.writeheader()
        writer.writerows(log_data)
    print(f"로그 데이터 저장 완료. -> log.csv")
except IOError:
    print("오류: 로그 파일을 저장할 수 없습니다.")

Some weights of the model checkpoint at facebook/detr-resnet-50 were not used when initializing DetrForObjectDetection: ['model.backbone.conv_encoder.model.layer1.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing DetrForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DetrForObjectDetection from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Hugging Face 모델 로드 성공.
단계 1: 비디오 및 EKF 초기화 중...
단계 1 완료.
단계 2: 영상 처리 및 EKF 적용 시작...
프레임 4에서 EKF 초기화. 추정된 초기 깊이 Z = 3.13 m
프레임 30 처리 중...
프레임 60 처리 중...
프레임 90 처리 중...
프레임 120 처리 중...
프레임 150 처리 중...
프레임 180 처리 중...
프레임 210 처리 중...
프레임 240 처리 중...
프레임 270 처리 중...
프레임 300 처리 중...
프레임 330 처리 중...
프레임 360 처리 중...
프레임 390 처리 중...
단계 4: 처리 완료 및 종료.
오류: 로그 파일을 저장할 수 없습니다.
