# Deadlift Form Analysis System
## Real-Time Pose Estimation + Biomechanical Assessment

**Goals:**
1. Extract 12 body keypoints from YOLOv8n-pose (shoulders, elbows, wrists, hips, knees, ankles)
2. Apply Chaffin's biomechanical model to assess deadlift form
3. Identify specific form errors
4. Quantize the model for deployment on a Raspberry Pi 5 with a Hailo HAT
5. Deploy to edge device with Camera Module 3

**Output:** Real-time video analysis with form feedback, plus a deployment-ready quantized model

# Preparation

In [1]:
# Install required packages
# NOTE(review): the captured output of this cell shows pip selecting openvino
# 2025.4.0 and then backtracking through several older openvino-dev releases,
# which suggests the two unpinned packages do not resolve to a matching pair.
# Consider pinning both to the same release — TODO confirm compatible versions.
%pip install ultralytics opencv-python numpy matplotlib openvino openvino-dev

Collecting openvino
  Downloading openvino-2025.4.0-20398-cp310-cp310-win_amd64.whl.metadata (12 kB)
Collecting openvino-dev
  Downloading openvino_dev-2024.6.0-17404-py3-none-any.whl.metadata (15 kB)
Collecting openvino-telemetry>=2023.2.1 (from openvino)
  Downloading openvino_telemetry-2025.2.0-py3-none-any.whl.metadata (2.3 kB)
Collecting defusedxml>=0.7.1 (from openvino-dev)
  Using cached defusedxml-0.7.1-py2.py3-none-any.whl.metadata (32 kB)
Collecting networkx<=3.1.0 (from openvino-dev)
  Downloading networkx-3.1-py3-none-any.whl.metadata (5.3 kB)
INFO: pip is looking at multiple versions of openvino-dev to determine which version is compatible with other requirements. This could take a while.
Collecting openvino-dev
  Downloading openvino_dev-2024.5.0-17288-py3-none-any.whl.metadata (15 kB)
  Downloading openvino_dev-2024.4.0-16579-py3-none-any.whl.metadata (16 kB)
  Downloading openvino_dev-2024.3.0-16041-py3-none-any.whl.metadata (16 kB)
  Downloading openvino_dev-2024.2.0-1

In [2]:
import cv2
import numpy as np
from ultralytics import YOLO
import matplotlib.pyplot as plt
from pathlib import Path
import json
from datetime import datetime

# Setup paths
MODEL_PATH = 'yolov8n-pose.pt'
OUTPUT_DIR = Path('deadlift_analysis_outputs')
OUTPUT_DIR.mkdir(exist_ok=True)

# COCO keypoint names, listed in index order 0-16. Only indices 5-16 (the body
# joints) are used below; the five face keypoints are ignored.
COCO_KEYPOINT_NAMES = [
    'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
    'left_shoulder', 'right_shoulder',      # 5, 6
    'left_elbow', 'right_elbow',            # 7, 8
    'left_wrist', 'right_wrist',            # 9, 10
    'left_hip', 'right_hip',                # 11, 12
    'left_knee', 'right_knee',              # 13, 14
    'left_ankle', 'right_ankle'             # 15, 16
]

# Map COCO indices to our 12-point skeleton. Position i in BODY_INDICES /
# BODY_NAMES is the index used everywhere else for the re-mapped keypoints:
#   0/1 shoulders, 2/3 elbows, 4/5 wrists, 6/7 hips, 8/9 knees, 10/11 ankles
#   (even = left, odd = right)
BODY_INDICES = [5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]  # shoulders through ankles
BODY_NAMES = [COCO_KEYPOINT_NAMES[i] for i in BODY_INDICES]

print(f"Tracking {len(BODY_INDICES)} keypoints: {BODY_NAMES}")

# Skeleton connections for visualization, as pairs of indices into the
# 12-point skeleton above.
SKELETON_CONNECTIONS = [
    (0, 1),                            # Shoulder line
    (0, 2), (2, 4),                    # Left arm: shoulder -> elbow -> wrist
    (1, 3), (3, 5),                    # Right arm: shoulder -> elbow -> wrist
    (0, 6), (1, 7), (6, 7),            # Torso
    (6, 8), (7, 9), (8, 10), (9, 11),  # Legs
]

Tracking 12 keypoints: ['left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow', 'left_wrist', 'right_wrist', 'left_hip', 'right_hip', 'left_knee', 'right_knee', 'left_ankle', 'right_ankle']


In [3]:
# Load pretrained YOLOv8n-pose model
model = YOLO(MODEL_PATH)

print(f"Model loaded: {MODEL_PATH}")

# Report the on-disk size of the weights file. The file may not sit at
# MODEL_PATH relative to the working directory (e.g. if the weights were
# resolved from another location), so fall back to an explicit "unknown".
_weights_file = Path(MODEL_PATH)
if _weights_file.is_file():
    print(f"Model size: {_weights_file.stat().st_size / (1024 * 1024):.1f} MB")
else:
    print(f"Model size: unknown ({MODEL_PATH} not found in the working directory)")

Model loaded: yolov8n-pose.pt
Model size: yolov8n-pose.pt


# Initialize Chaffin's Biomechanical Model for Deadlift Analysis

In [4]:
class ChaffinDeadliftAnalyzer:
    """
    Rule-based deadlift form checker operating on 2D pose keypoints.

    Described by its author as implementing Chaffin's biomechanical guidelines
    (Chaffin et al., "Occupational Biomechanics" model for lifting).

    Each call to ``analyze_frame`` runs four independent checks on a single
    frame and collects human-readable failure messages in ``self.errors``.
    All coordinates are image coordinates, so a smaller y means higher up in
    the frame.
    """

    # Maximum allowed deviation (degrees) of the hip->shoulder line from vertical.
    MAX_SPINE_ANGLE_DEG = 20
    # Minimum interior elbow angle (degrees) for an arm to count as straight.
    MIN_ELBOW_ANGLE_DEG = 150

    def __init__(self, frame_width, frame_height):
        """Store the frame dimensions and start with an empty error list.

        The frame dimensions are stored but not used by any check in this class.
        """
        self.frame_width = frame_width
        self.frame_height = frame_height
        self.errors = []

    def analyze_frame(self, keypoints):
        """
        Analyze a single frame of keypoints.

        keypoints: array of shape (12, 2) with [x, y] for each of 12 points,
            ordered left/right shoulder, elbow, wrist, hip, knee, ankle.

        Returns: dict with form assessment:
            {"status": "ERROR", "reason": ...} when fewer than 12 points are given,
            {"status": "INCORRECT", "errors": [...]} when any check fails,
            {"status": "CORRECT", "message": ...} otherwise.
        """
        # A fresh list per frame: the returned dict keeps a reference to it.
        self.errors = []

        if keypoints is None or len(keypoints) < 12:
            return {"status": "ERROR", "reason": "Insufficient keypoints"}

        # Extract individual joints (re-mapped to 0-11)
        l_shoulder, r_shoulder = keypoints[0], keypoints[1]
        l_elbow, r_elbow = keypoints[2], keypoints[3]
        l_wrist, r_wrist = keypoints[4], keypoints[5]
        l_hip, r_hip = keypoints[6], keypoints[7]
        l_knee, r_knee = keypoints[8], keypoints[9]

        # 1. CHECK: Bar is above knee level
        self._check_bar_position(l_shoulder, r_shoulder, l_knee, r_knee)

        # 2. CHECK: Back is straight (spine angle)
        self._check_back_straightness(l_shoulder, r_shoulder, l_hip, r_hip)

        # 3. CHECK: Arms are loose and straight (no elbow bend)
        self._check_arm_straightness(l_shoulder, r_shoulder, l_elbow, r_elbow, l_wrist, r_wrist)

        # 4. CHECK: Hips move backward, not downward
        self._check_hip_movement(l_hip, r_hip, l_knee, r_knee)

        # Return assessment
        if self.errors:
            return {
                "status": "INCORRECT",
                "errors": self.errors
            }
        return {
            "status": "CORRECT",
            "message": "Excellent deadlift form!"
        }

    def _check_bar_position(self, l_shoulder, r_shoulder, l_knee, r_knee):
        """Flag the frame when the shoulder midpoint is lower in the image than the knee midpoint.

        The shoulder height is used as the stand-in for the bar.
        NOTE(review): the wrists look like a closer proxy for the bar position
        than the shoulders — confirm the intended check before changing it.
        """
        shoulder_y = (l_shoulder[1] + r_shoulder[1]) / 2
        knee_y = (l_knee[1] + r_knee[1]) / 2

        if shoulder_y > knee_y:  # Lower y = higher in image
            self.errors.append("Bar is above knee: FAIL - Bar appears to be below knee level")

    def _check_back_straightness(self, l_shoulder, r_shoulder, l_hip, r_hip):
        """Flag the frame when the hip->shoulder line leans too far from vertical.

        The angle is measured between the line joining the hip midpoint to the
        shoulder midpoint and the vertical image axis (0 = perfectly vertical).
        NOTE(review): this measures torso inclination, not spinal curvature;
        verify that the threshold suits the camera angle being used.
        """
        import math

        hip_x = (l_hip[0] + r_hip[0]) / 2
        hip_y = (l_hip[1] + r_hip[1]) / 2
        shoulder_x = (l_shoulder[0] + r_shoulder[0]) / 2
        shoulder_y = (l_shoulder[1] + r_shoulder[1]) / 2

        # Vector from hip to shoulder
        dx = shoulder_x - hip_x
        dy = shoulder_y - hip_y

        # abs(dy) makes the result independent of which point is higher in the
        # image; the outer abs() drops the left/right lean direction.
        spine_angle = abs(math.degrees(math.atan2(dx, abs(dy))))

        if spine_angle > self.MAX_SPINE_ANGLE_DEG:
            self.errors.append(
                f"Back is not straight: FAIL - Spine angle is {spine_angle:.1f}° "
                f"(should be <{self.MAX_SPINE_ANGLE_DEG}°)"
            )

    @staticmethod
    def _elbow_angle(shoulder, elbow, wrist):
        """Return the interior angle at the elbow in degrees (180 = fully straight).

        Returns None when the upper arm or forearm has zero length, because the
        angle is undefined when two of the keypoints coincide.
        """
        import math

        upper_arm = math.hypot(elbow[0] - shoulder[0], elbow[1] - shoulder[1])
        forearm = math.hypot(wrist[0] - elbow[0], wrist[1] - elbow[1])
        shoulder_to_wrist = math.hypot(wrist[0] - shoulder[0], wrist[1] - shoulder[1])

        if upper_arm == 0 or forearm == 0:
            return None

        # Law of cosines: the elbow angle is the one opposite the
        # shoulder-to-wrist side, so that side is the one subtracted.
        cos_angle = (upper_arm**2 + forearm**2 - shoulder_to_wrist**2) / (2 * upper_arm * forearm)
        cos_angle = max(-1.0, min(1.0, cos_angle))  # Clamp away rounding error
        return math.degrees(math.acos(cos_angle))

    def _check_arm_straightness(self, l_shoulder, r_shoulder, l_elbow, r_elbow, l_wrist, r_wrist):
        """Flag each arm whose elbow angle is below the straight-arm threshold.

        An arm whose elbow angle cannot be computed (coincident keypoints) is
        skipped rather than reported with a made-up angle.
        """
        arms = (
            ("Left", l_shoulder, l_elbow, l_wrist),
            ("Right", r_shoulder, r_elbow, r_wrist),
        )
        for side, shoulder, elbow, wrist in arms:
            elbow_angle = self._elbow_angle(shoulder, elbow, wrist)
            if elbow_angle is not None and elbow_angle < self.MIN_ELBOW_ANGLE_DEG:
                self.errors.append(
                    f"{side} arm not straight: FAIL - Elbow angle is {elbow_angle:.1f}° "
                    f"(should be >{self.MIN_ELBOW_ANGLE_DEG}°)"
                )

    def _check_hip_movement(self, l_hip, r_hip, l_knee, r_knee):
        """Flag the frame when the hip midpoint is lower in the image than the knee midpoint.

        This is a simplified single-frame stand-in for "hips move backward, not
        downward": no motion across frames is tracked.
        """
        hip_y = (l_hip[1] + r_hip[1]) / 2
        knee_y = (l_knee[1] + r_knee[1]) / 2

        # If hips descend below knees, it's likely a squat, not a deadlift
        if hip_y > knee_y:
            self.errors.append("Hips move downward instead of backward: FAIL - Hips are below knee level")

# Build a module-level analyzer for a 640x480 frame and confirm it in the cell output
analyzer = ChaffinDeadliftAnalyzer(frame_width=640, frame_height=480)
print("Chaffin's biomechanical analyzer initialized")

Chaffin's biomechanical analyzer initialized


# Real-Time Inference Function (Video/Webcam)

In [5]:
def analyze_deadlift_video(input_source, output_video_path=None, smoothing_alpha=0.6):
    """
    Analyze deadlift form from video file or webcam, frame by frame.

    For every frame the module-level YOLO ``model`` is run, the detection with
    the largest bounding box is selected, its 12 body keypoints are smoothed
    with an exponential moving average and passed to ``ChaffinDeadliftAnalyzer``.
    Frames without a detection are written to the output video unannotated and
    produce no entry in the returned list.

    Args:
        input_source: Path to video file or 0 for webcam
        output_video_path: Path to save annotated video (optional)
        smoothing_alpha: EMA smoothing factor (0-1); the weight given to the
            current frame's keypoints

    Returns:
        List of dicts ``{"frame": <1-based frame number>, "analysis": <dict>}``.
        An empty list is returned when the video source cannot be opened.
    """

    # Open video source
    cap = cv2.VideoCapture(input_source)
    if not cap.isOpened():
        print("ERROR: Could not open video source")
        cap.release()
        # Keep the documented return type so callers can iterate the result.
        return []

    out = None
    frame_analyses = []

    try:
        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Setup video writer if output path specified
        if output_video_path:
            # Some sources (typically live cameras) report 0 fps, which is not
            # a usable rate for the writer; fall back to a nominal 30 fps.
            writer_fps = fps if fps > 0 else 30.0
            fourcc = cv2.VideoWriter_fourcc(*'avc1')
            out = cv2.VideoWriter(output_video_path, fourcc, writer_fps, (width, height))
            if not out.isOpened():
                # Without this check a failed writer silently drops every frame.
                print(f"WARNING: Could not open video writer for {output_video_path}; "
                      "annotated video will not be saved")
                out.release()
                out = None

        # Initialize analyzer and smoothing
        frame_analyzer = ChaffinDeadliftAnalyzer(width, height)
        prev_kpts = None
        frame_count = 0

        print(f"Processing video: {width}x{height} @ {fps:.0f}fps, {total_frames} frames")

        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                break

            frame_count += 1

            # 1. Run YOLOv8n-pose inference
            results = model(frame, verbose=False, conf=0.6)[0]  # changed from 0.5 to 0.6

            # 2. Extract and filter keypoints
            annotated_frame = frame.copy()

            if results.boxes and results.keypoints is not None:
                # Get largest person (closest to camera)
                boxes = results.boxes.xyxy.cpu().numpy()
                areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
                primary_idx = np.argmax(areas)

                # Extract 12 body keypoints from COCO 17
                raw_kpts = results.keypoints.xy.cpu().numpy()[primary_idx]
                body_kpts = raw_kpts[BODY_INDICES]

                # 3. Apply smoothing (EMA against the previous smoothed frame)
                # NOTE(review): the drawing code below treats non-positive
                # coordinates as "not detected"; such points are still blended
                # into the average here — confirm whether they should be
                # excluded from smoothing and analysis.
                if prev_kpts is None:
                    prev_kpts = body_kpts
                smooth_kpts = (smoothing_alpha * body_kpts) + ((1 - smoothing_alpha) * prev_kpts)
                prev_kpts = smooth_kpts

                # 4. Analyze form
                analysis = frame_analyzer.analyze_frame(smooth_kpts)
                frame_analyses.append({
                    "frame": frame_count,
                    "analysis": analysis
                })

                # 5. Draw visualization
                # Draw keypoints
                for i, (x, y) in enumerate(smooth_kpts):
                    if x > 0 and y > 0:
                        cv2.circle(annotated_frame, (int(x), int(y)), 6, (0, 255, 255), -1)
                        cv2.putText(annotated_frame, BODY_NAMES[i], (int(x), int(y) - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)

                # Draw skeleton
                for start_i, end_i in SKELETON_CONNECTIONS:
                    if start_i < len(smooth_kpts) and end_i < len(smooth_kpts):
                        pt1 = smooth_kpts[start_i]
                        pt2 = smooth_kpts[end_i]
                        if pt1[0] > 0 and pt2[0] > 0:
                            cv2.line(annotated_frame, (int(pt1[0]), int(pt1[1])),
                                     (int(pt2[0]), int(pt2[1])), (255, 0, 255), 2)

                # Draw analysis result (green for CORRECT, red otherwise)
                status_color = (0, 255, 0) if analysis["status"] == "CORRECT" else (0, 0, 255)
                cv2.putText(annotated_frame, analysis["status"], (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, status_color, 2)

                if "errors" in analysis:
                    for i, error in enumerate(analysis["errors"]):
                        cv2.putText(annotated_frame, error, (10, 60 + i*25),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)

            # Write output
            if out:
                out.write(annotated_frame)

            # Progress
            if frame_count % 30 == 0:
                print(f"  Frame {frame_count}/{total_frames}")
    finally:
        # Release the capture and writer even if inference or drawing raised,
        # so the camera is freed and the output file is finalized.
        cap.release()
        if out:
            out.release()

    return frame_analyses

# Cell feedback: printed once the definition above has been executed
print("analyze_deadlift_video() function defined")

analyze_deadlift_video() function defined


In [6]:
# Example: Analyze video
from collections import Counter

video_input = "Test Video/nghia.MOV"  # Replace with your video path
output_video = str(OUTPUT_DIR / "deadlift_analysis_annotated.mp4")

# Run analysis. A failed run is normalised to an empty list so the summary
# below reports "nothing analyzed" instead of raising.
analyses = analyze_deadlift_video(video_input, output_video_path=output_video) or []

# Print summary
print("\n" + "="*60)
print("ANALYSIS SUMMARY")
print("="*60)

total_analyzed = len(analyses)
correct_frames = sum(1 for a in analyses if a["analysis"]["status"] == "CORRECT")
incorrect_frames = total_analyzed - correct_frames

print(f"Total frames analyzed: {total_analyzed}")
if total_analyzed:
    print(f"Correct form: {correct_frames} ({correct_frames/total_analyzed*100:.1f}%)")
    print(f"Incorrect form: {incorrect_frames} ({incorrect_frames/total_analyzed*100:.1f}%)")
else:
    # Percentages are undefined without any analyzed frame.
    print("No frames were analyzed; check the video path and that a person is visible")

# Show unique errors, most frequent first. Messages that embed a measured
# angle are counted per exact message text.
all_errors = []
for a in analyses:
    all_errors.extend(a["analysis"].get("errors", []))

print("\nDetected form errors:")
for error, count in Counter(all_errors).most_common():
    print(f"  - {error} ({count} frames)")

Processing video: 1080x1920 @ 30fps, 1564 frames
  Frame 30/1564
  Frame 60/1564
  Frame 90/1564
  Frame 120/1564
  Frame 150/1564
  Frame 180/1564
  Frame 210/1564
  Frame 240/1564
  Frame 270/1564
  Frame 300/1564
  Frame 330/1564
  Frame 360/1564
  Frame 390/1564
  Frame 420/1564
  Frame 450/1564
  Frame 480/1564
  Frame 510/1564
  Frame 540/1564
  Frame 570/1564
  Frame 600/1564
  Frame 630/1564
  Frame 660/1564
  Frame 690/1564
  Frame 720/1564
  Frame 750/1564
  Frame 780/1564
  Frame 810/1564
  Frame 840/1564
  Frame 870/1564
  Frame 900/1564
  Frame 930/1564
  Frame 960/1564
  Frame 990/1564
  Frame 1020/1564
  Frame 1050/1564
  Frame 1080/1564
  Frame 1110/1564
  Frame 1140/1564
  Frame 1170/1564
  Frame 1200/1564
  Frame 1230/1564
  Frame 1260/1564
  Frame 1290/1564
  Frame 1320/1564
  Frame 1350/1564
  Frame 1380/1564
  Frame 1410/1564
  Frame 1440/1564
  Frame 1470/1564
  Frame 1500/1564
  Frame 1530/1564
  Frame 1560/1564

ANALYSIS SUMMARY
Total frames analyzed: 1564
Corre

In [None]:
# Save detailed analysis to JSON
results_file = OUTPUT_DIR / "deadlift_analysis_results.json"

results = {
    "timestamp": datetime.now().isoformat(),
    "video_file": video_input,
    "total_frames": len(analyses),
    "correct_frames": correct_frames,
    "incorrect_frames": incorrect_frames,
    # Fraction of analyzed frames rated CORRECT; 0.0 when nothing was analyzed
    # so an empty run still produces a valid results file.
    "accuracy": correct_frames / len(analyses) if analyses else 0.0,
    "frame_analyses": analyses
}

# Explicit encoding keeps the file identical across platforms.
with open(results_file, 'w', encoding='utf-8') as f:
    json.dump(results, f, indent=2)

print(f"Results saved to {results_file}")

In [None]:
import subprocess
import sys


def _size_on_disk_mb(*paths):
    """Return the combined size of *paths* as 'X.X MB', or None if any file is missing."""
    files = [Path(p) for p in paths]
    if not all(f.is_file() for f in files):
        return None
    return f"{sum(f.stat().st_size for f in files) / (1024 * 1024):.1f} MB"


# 1. Export YOLOv8n-pose to ONNX (required for quantization)
print("Exporting YOLOv8n-pose to ONNX format...")
model_onnx = model.export(format='onnx', imgsz=640, opset=12)
print(f"ONNX model exported: {model_onnx}")

# 2. Convert ONNX to OpenVINO IR (intermediate representation)
# NOTE(review): FP16 compression is not integer quantization, and the Hailo
# toolchain is believed to take the ONNX model rather than OpenVINO IR as its
# input — confirm against the Hailo documentation before relying on this step.
print("\nConverting ONNX to OpenVINO IR format...")
onnx_path = Path(model_onnx)
ir_path = OUTPUT_DIR / "yolov8n_pose_ir"
ir_model_name = "yolov8n_pose_quantized"
ir_converted = False

# Use OpenVINO's model optimizer, run as a module of the current interpreter
cmd = [
    sys.executable, "-m", "openvino.tools.mo",
    "--input_model", str(onnx_path),
    "--output_dir", str(ir_path),
    "--model_name", ir_model_name,
    "--compress_to_fp16"  # store weights in 16-bit floating point
]
try:
    subprocess.run(cmd, check=True)
except (subprocess.CalledProcessError, OSError) as e:
    # Best effort: a missing or failing converter must not abort the notebook,
    # but only converter failures are swallowed here.
    print(f"OpenVINO conversion note: {e}")
    print("You can still use the ONNX model, but OpenVINO is recommended for Hailo")
else:
    ir_converted = True
    print(f"OpenVINO IR model saved to {ir_path}")

# 3. For Hailo Hat specifically, you need Hailo's quantization toolkit
# NOTE(review): the package name and command line printed below should be
# verified against the current Hailo Model Zoo documentation.
print("\n" + "="*60)
print("HAILO QUANTIZATION INSTRUCTIONS")
print("="*60)
print("""
To fully quantize for Hailo Hat:

1. Install Hailo Model Zoo:
   pip install hailo-model-zoo

2. Use Hailo's quantization script:
   hailomz quantize --model-name yolov8n-pose \\
     --calibration-data /path/to/calibration/images \\
     --output-path ./yolov8n_pose_hailo.hef

3. The .hef file (Hailo Executable Format) is deployment-ready for Raspberry Pi + Hailo Hat

For now, we'll save the OpenVINO IR model which is also edge-optimized.
""")

# Measure the artifacts that actually exist instead of recording fixed numbers.
original_size = _size_on_disk_mb(MODEL_PATH)
ir_size = None
if ir_converted:
    # Assumes the converter writes <model_name>.xml and <model_name>.bin into
    # the output directory; if they are absent the size is reported as unavailable.
    ir_size = _size_on_disk_mb(
        ir_path / f"{ir_model_name}.xml",
        ir_path / f"{ir_model_name}.bin",
    )

# Save model paths for deployment
model_info = {
    "original_model": MODEL_PATH,
    "onnx_model": str(onnx_path),
    "openvino_ir_path": str(ir_path),
    # Lets consumers tell whether the IR path above points at real output.
    "openvino_ir_converted": ir_converted,
    "quantization_format": "fp16 (OpenVINO) - For Hailo, use .hef format",
    "model_size_original": original_size or "unknown",
    "model_size_quantized": ir_size or "unavailable (OpenVINO IR files not found)"
}

model_info_file = OUTPUT_DIR / "model_deployment_info.json"
with open(model_info_file, 'w', encoding='utf-8') as f:
    json.dump(model_info, f, indent=2)

print(f"\nModel deployment info saved to {model_info_file}")