In [None]:
import cv2
import mediapipe as mp
import csv
import numpy as np
import os
import time

def initialize_pose_model():
    """初始化MediaPipe Pose模型"""
    mp_pose = mp.solutions.pose
    return mp_pose.Pose(static_image_mode=False, 
                      min_detection_confidence=0.5,
                      min_tracking_confidence=0.5)

def analyze_posture(landmarks):
    """核心姿势分析逻辑"""
    # 定义关键点ID
    LEFT_SHOULDER_ID = 11
    RIGHT_SHOULDER_ID = 12
    LEFT_HIP_ID = 23
    RIGHT_HIP_ID = 24
    NOSE_ID = 0
    LEFT_WRIST_ID = 15
    RIGHT_WRIST_ID = 16

    # 计算中点
    shoulder_mid = (landmarks[LEFT_SHOULDER_ID] + landmarks[RIGHT_SHOULDER_ID]) / 2
    hip_mid = (landmarks[LEFT_HIP_ID] + landmarks[RIGHT_HIP_ID]) / 2
  
    # 计算向量
    spine_vector = hip_mid - shoulder_mid
    shoulder_vector = landmarks[RIGHT_SHOULDER_ID] - landmarks[LEFT_SHOULDER_ID]
    hip_vector = landmarks[RIGHT_HIP_ID] - landmarks[LEFT_HIP_ID]

    # 手腕位置判断
    left_wrist_vec = landmarks[LEFT_WRIST_ID] - shoulder_mid
    right_wrist_vec = landmarks[RIGHT_WRIST_ID] - shoulder_mid
  
    # 判断手部是否同侧
    cross_left = np.cross(left_wrist_vec, spine_vector)
    cross_right = np.cross(right_wrist_vec, spine_vector)
  
    if (cross_left > 0 and cross_right > 0) or (cross_left < 0 and cross_right < 0):
        avg_hand_vec = (left_wrist_vec + right_wrist_vec) / 2
        dot_product = np.dot(avg_hand_vec, spine_vector)
        angle = np.arccos(dot_product / (np.linalg.norm(avg_hand_vec) * np.linalg.norm(spine_vector) + 1e-6)) * 180/np.pi
      
        cross_product = np.cross(avg_hand_vec, spine_vector)
        if cross_product > 0:
            return "左侧睡" if angle < 180 else "右侧睡"
        else:
            return "右侧睡" if angle < 180 else "左侧睡"
    else:
        # 头部姿势判断
        nose_vec = landmarks[NOSE_ID] - shoulder_mid
        dot_nose = np.dot(nose_vec, spine_vector)
        angle_nose = np.arccos(dot_nose / (np.linalg.norm(nose_vec) * np.linalg.norm(spine_vector) + 1e-6)) * 180/np.pi
      
        cross_nose = np.cross(nose_vec, spine_vector)
        if cross_nose > 0:
            if angle_nose > 10:
                return "左偏头仰睡"
            else:
                return "正常仰睡"
        else:
            if angle_nose > 10:
                return "右偏头仰睡"
            else:
                return "正常仰睡"

def process_video(video_path, output_csv, output_video=None, process_fps=5):
    """视频处理主函数"""
    # 初始化模型和工具
    pose = initialize_pose_model()
    mp_drawing = mp.solutions.drawing_utils
  
    # 打开视频文件
    cap = cv2.VideoCapture(video_path)
    original_fps = cap.get(cv2.CAP_PROP_FPS)
    frame_interval = int(round(original_fps / process_fps))
  
    # 准备输出视频
    if output_video:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        frame_size = (int(cap.get(3)), int(cap.get(4)))
        out = cv2.VideoWriter(output_video, fourcc, process_fps, frame_size)
  
    # 创建CSV文件
    with open(output_csv, 'w', newline='') as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow(['Timestamp', 'Posture'])
      
        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
              
            frame_count += 1
            if frame_count % frame_interval != 0:
                continue
          
            # 计算时间戳
            timestamp = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000
          
            # 姿势检测
            results = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
          
            if results.pose_landmarks:
                # 提取关键点坐标
                landmarks = {}
                for idx, lm in enumerate(results.pose_landmarks.landmark):
                    landmarks[idx] = np.array([lm.x, lm.y])
              
                # 姿势分析
                posture = analyze_posture(landmarks)
              
                # 写入CSV
                csv_writer.writerow([f"{timestamp:.2f}", posture])
              
                # 在画面上标注
                mp_drawing.draw_landmarks(
                    frame, results.pose_landmarks, mp.solutions.pose.POSE_CONNECTIONS)
                cv2.putText(frame, posture, (20, 50), 
                          cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
              
                # 写入输出视频
                if output_video:
                    out.write(frame)
          
            # 实时显示（可选）
            cv2.imshow('Posture Analysis', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    # 释放资源
    cap.release()
    if output_video:
        out.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    # 使用示例
    video_path = "input_video.mp4"
    output_csv = "posture_timeline.csv"
    output_video = "annotated_output.mp4"  # 设为None则不输出视频
  
    process_video(
        video_path=video_path,
        output_csv=output_csv,
        output_video=output_video,
        process_fps=5  # 处理帧率
    )