# 공통 모델 등 세팅

In [1]:
import cv2
import numpy as np
import mediapipe as mp
import torch
import torch.nn.functional as F
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv, global_mean_pool
from sklearn.preprocessing import LabelEncoder
import os
import torch.nn as nn

# --- GCN 모델 클래스 ---
class GCNNet(nn.Module):
    def __init__(self, in_channels=3, hidden_channels=128, num_classes=3):
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.lin = nn.Linear(hidden_channels, num_classes)

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        x = F.relu(self.conv1(x, edge_index))
        x = F.relu(self.conv2(x, edge_index))
        x = F.relu(self.conv3(x, edge_index))
        x = global_mean_pool(x, batch)
        out = self.lin(x)
        return out

# --- MediaPipe 스켈레톤 엣지 (33개 노드) ---
edge_index = torch.tensor([
    [0,1, 1,2, 2,3, 3,7, 7,9, 9,11, 11,13, 13,15,
     2,4, 4,6, 6,8, 8,10, 10,12, 12,14, 14,16,
     2,17, 17,19, 19,21, 21,23, 23,25, 25,27,
     2,18, 18,20, 20,22, 22,24, 24,26, 26,28],
    [1,0, 2,1, 3,2, 7,3, 9,7, 11,9, 13,11, 15,13,
     4,2, 6,4, 8,6, 10,8, 12,10, 14,12, 16,14,
     17,2, 19,17, 21,19, 23,21, 25,23, 27,25,
     18,2, 20,18, 22,20, 24,22, 26,24, 28,26]
], dtype=torch.long)

edge_index_vis = torch.tensor([
    [1,2], [2,3], [3,7], [4,5], [5,6], [6,8], # 얼굴
    [1,9], [11,13], [13,15], [15,17], [15,19], [15,21], # 왼팔
    [4,10], [12,14], [14,16], [16,18], [16,20], [16,22], # 오른팔
    [23,25], [25,27], [27,29], [27,31], [29,31], # 왼다리
    [24,26], [26,28], [28,30], [28,32], [30,32], # 오른다리
    [11,12], [23,24], [11,23], [12,24]
], dtype=torch.long).t()

In [None]:
# --- 관절 완전성 체크 ---
VISIBILITY_THRESHOLD = 0.6
LEFT_LINE = [11, 13, 15, 23, 25, 27]
RIGHT_LINE = [12, 14, 16, 24, 26, 28]

def is_full_body(landmarks):
    def check_line(line):
        return all(landmarks[i].visibility >= VISIBILITY_THRESHOLD for i in line)
    return check_line(LEFT_LINE) or check_line(RIGHT_LINE)

def draw_skeleton(image, landmarks, edges):
    h, w, _ = image.shape
    for edge in edges.t().tolist():
        start_idx, end_idx = edge
        start = landmarks[start_idx]
        end = landmarks[end_idx]
        x1, y1 = int(start.x * w), int(start.y * h)
        x2, y2 = int(end.x * w), int(end.y * h)
        cv2.line(image, (x1, y1), (x2, y2), (255, 255, 0), 2)

# --- 모델 및 레이블 인코더 로드 ---
model_path = "./best_gcn_model.pth"
label_encoder_path = "./label_encoder.pkl"

model = GCNNet()  # 반드시 정의/임포트 필요
model.load_state_dict(torch.load(model_path, map_location='cpu'))
model.eval()

classes = np.load(label_encoder_path, allow_pickle=True)
label_encoder = LabelEncoder()
label_encoder.classes_ = classes

mp_pose = mp.solutions.pose.Pose(
    static_image_mode=False,
    model_complexity=2,
    min_detection_confidence=0.7,
    min_tracking_confidence=0.7
)

I0000 00:00:1754197651.037724  272493 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.4), renderer: Apple M3


W0000 00:00:1754197651.187561  319665 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1754197651.241055  319669 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [56]:
# --- 각도 계산 함수 (z 이진값 가중치 반영, use_z 플래그 포함) ---
def calculate_angle_weighted(a, b, c, z_binary_a=1, z_binary_b=1, z_binary_c=1, use_z=True):
    if not use_z:
        # 2D 벡터 (Z축 무시)
        ba = np.array([a.x - b.x, a.y - b.y])
        bc = np.array([c.x - b.x, c.y - b.y])
    else:
        # 3D 벡터 (Z축 포함)
        ba = np.array([
            a.x - b.x,
            a.y - b.y,
            (a.z - b.z) * ((z_binary_a + z_binary_b) / 2)
        ])
        bc = np.array([
            c.x - b.x,
            c.y - b.y,
            (c.z - b.z) * ((z_binary_c + z_binary_b) / 2)
        ])

    norm_ba = np.linalg.norm(ba)
    norm_bc = np.linalg.norm(bc)
    if norm_ba < 1e-6 or norm_bc < 1e-6:
        return 0.0

    cosine_angle = np.dot(ba, bc) / (norm_ba * norm_bc)
    return np.degrees(np.arccos(np.clip(cosine_angle, -1.0, 1.0)))

def calculate_neck_tilt_weighted(landmarks, z_binary, use_z):
    ear = landmarks[7] if landmarks[7].visibility > landmarks[8].visibility else landmarks[8]
    shoulder = landmarks[11] if landmarks[11].visibility > landmarks[12].visibility else landmarks[12]
    hip = landmarks[23] if landmarks[23].visibility > landmarks[24].visibility else landmarks[24]
    return calculate_angle_weighted(
        ear, shoulder, hip,
        z_binary[7], z_binary[11], z_binary[23],
        use_z
    )

def calculate_waist_angle_weighted(landmarks, z_binary, use_z):
    shoulder = landmarks[11] if landmarks[11].visibility > landmarks[12].visibility else landmarks[12]
    hip = landmarks[23] if landmarks[23].visibility > landmarks[24].visibility else landmarks[24]
    knee = landmarks[25] if landmarks[25].visibility > landmarks[26].visibility else landmarks[26]
    return calculate_angle_weighted(
        shoulder, hip, knee,
        z_binary[11], z_binary[23], z_binary[25],
        use_z
    )

def binary_z_values(landmarks, threshold=0.0):
    return [1 if lm.z > threshold else 0 for lm in landmarks]

POSTURE_ANGLE_THRESHOLDS = {
    'stand': {
        'front': {'neck_min': 165, 'neck_max': 180, 'waist_min': 165, 'waist_max': 180},
        'side':  {'neck_min': 170, 'neck_max': 180, 'waist_min': 170, 'waist_max': 180}
    },
    'sit': {
        'front': {'neck_min': 165, 'neck_max': 180, 'waist_min': 100, 'waist_max': 125},
        'side':  {'neck_min': 170, 'neck_max': 180, 'waist_min': 100, 'waist_max': 110}
    }
}

def check_posture_angles(posture, neck, waist, direction):
    t = POSTURE_ANGLE_THRESHOLDS.get(posture, {}).get(direction)
    if not t:
        return False
    return (
        t['neck_min'] <= neck <= t['neck_max'] and
        t['waist_min'] <= waist <= t['waist_max']
    )

# --- 프레임 처리 함수 ---
def process_frame(image, model, label_encoder, edge_index, mp_pose, threshold_z=0.0):
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = mp_pose.process(image_rgb)
    if not results.pose_landmarks:
        return None, image

    landmarks = results.pose_landmarks.landmark
    z_binary = binary_z_values(landmarks, threshold=threshold_z)

    left_pattern = [z_binary[i] for i in LEFT_LINE]
    right_pattern = [z_binary[i] for i in RIGHT_LINE]
    left_consistent = all(v == left_pattern[0] for v in left_pattern)
    right_consistent = all(v == right_pattern[0] for v in right_pattern)
    is_side_pose = (left_consistent and right_consistent and (left_pattern[0] != right_pattern[0]))
    
    use_z = not is_side_pose  # 정면일 때만 z 반영, 측면은 False

    angle_text = ""
    if not is_full_body(landmarks):
        pred_class = 'etc'
        posture_ok = None
    else:
        data = Data(
            x=torch.tensor([[lm.x, lm.y, lm.z] for lm in landmarks], dtype=torch.float),
            edge_index=edge_index
        )
        data.batch = torch.zeros(data.x.size(0), dtype=torch.long)
        with torch.no_grad():
            out = model(data)
            probs = F.softmax(out, dim=1).cpu().numpy()[0]
            pred_index = probs.argmax()
            pred_class = label_encoder.inverse_transform([pred_index])[0]

        if pred_class == 'etc':
            posture_ok = None  # etc는 평가 제외
        else:
            neck_angle = calculate_neck_tilt_weighted(landmarks, z_binary, use_z)
            waist_angle = calculate_waist_angle_weighted(landmarks, z_binary, use_z)

            # print(f"[DEBUG] pred_class: {pred_class}, neck_angle: {neck_angle}, waist_angle: {waist_angle}")

            posture_ok = check_posture_angles(pred_class, neck_angle, waist_angle, 'side' if is_side_pose else 'front')

            # print(f"[DEBUG] posture_ok: {posture_ok}")

    h, w, _ = image.shape

    if pred_class == 'etc':
        posture_ok = None
        color = (0, 255, 255)
        status_text = "Class: etc (Not Evaluated)"
        pose_direction_text = ""
        # angle_mode_text = ""
    else:
        if posture_ok == True or posture_ok is True or bool(posture_ok) == True:
            color = (0, 255, 0)
            status_desc = 'Good'
        else:
            color = (0, 0, 255)
            status_desc = 'Bad'
        status_text = f"Class: {pred_class} ({status_desc})"
        angle_text = f"Neck: {neck_angle:.1f}°, Waist: {waist_angle:.1f}°"
        pose_direction_text = f"Pose: {'Side' if is_side_pose else 'Front'}"
        # angle_mode_text = f"Angle Mode: {'2D' if not use_z else '3D'}"

    draw_skeleton(image, landmarks, edge_index)
    # print(f"[DEBUG] status_text: {status_text}, color: {color}, posture_ok: {posture_ok}")

    cv2.putText(image, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
    if angle_text:
        cv2.putText(image, angle_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
    cv2.putText(image, pose_direction_text, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    # cv2.putText(image, angle_mode_text, (10, 115), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    return posture_ok, image

# 웹캠 실시간 처리 및 녹화 저장

In [3]:
# --- 웹캠 실시간 처리 및 녹화 저장 함수 ---
def run_on_webcam_save(model, label_encoder, output_path="webcam_result.mp4"):
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("웹캠을 열 수 없습니다.")
        return

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps == 0 or fps != fps:  # fps가 0이나 NaN인 경우 기본값 지정
        fps = 30.0

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    print("웹캠 자세 인식 및 녹화 시작. 'q' 키를 눌러 종료하세요.")

    while True:
        ret, frame = cap.read()
        if not ret:
            print("프레임을 읽을 수 없습니다.")
            break

        posture_ok, annotated = process_frame(frame, model, label_encoder, edge_index_vis, mp_pose)
        if annotated is not None:
            out.write(annotated)
            cv2.imshow("Webcam Posture Detection", annotated)
        else:
            out.write(frame)
            cv2.imshow("Webcam Posture Detection", frame)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    out.release()
    cv2.destroyAllWindows()
    print(f"녹화가 완료되었습니다: {output_path}")

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1754194150.680810  273490 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1754194150.719537  273497 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [58]:
if __name__ == "__main__":
    run_on_webcam_save(model, label_encoder)

웹캠 자세 인식 및 녹화 시작. 'q' 키를 눌러 종료하세요.
녹화가 완료되었습니다: webcam_result.mp4


# 맥 화면 실시간 자세 분석

In [None]:
from mss import mss
import time

# QuickTime 영상 위치와 크기 (수동 입력 예시)
monitor = {"top": 20, "left": 0, "width": 800, "height": 500}  # 필요 시 수정

def run_on_screen_capture(model, label_encoder):
    sct = mss()
    print("QuickTime 화면에서 실시간 자세 분석 중... (ESC or Ctrl+C 로 종료)")

    while True:
        start_time = time.time()

        # 화면 캡처
        sct_img = sct.grab(monitor)
        frame = np.array(sct_img)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)

        # 자세 분석
        _, annotated = process_frame(frame, model, label_encoder)

        # 결과 출력
        cv2.imshow("iPhone Camera Posture Detection", annotated)

        # ESC 누르면 종료
        if cv2.waitKey(1) == 27:
            break

        # FPS 제한
        elapsed = time.time() - start_time
        time.sleep(max(1.0 / 30 - elapsed, 0))  # 30fps로 제한

    cv2.destroyAllWindows()

In [None]:
if __name__ == "__main__":
    run_on_screen_capture(model, label_encoder)