In [1]:
import cv2
import time
import torch
import numpy as np
import mediapipe as mp
import pickle
from collections import deque
from CNN_LSTM_classification import CNN_LSTM 

In [2]:
# === Restore the trained CNN-LSTM classifier and its label encoder ===
# NOTE: pickle.load can execute code embedded in the file, so only load
# artifacts that come from a trusted training run.
with open("model_config.pkl", "rb") as config_file:
    model_config = pickle.load(config_file)
with open("label_encoder.pkl", "rb") as encoder_file:
    label_encoder = pickle.load(encoder_file)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Rebuild the network from its saved constructor arguments, restore the
# trained weights onto the chosen device, and switch to inference mode.
model = CNN_LSTM(**model_config)
state_dict = torch.load("cnn_lstm_model.pth", map_location=device)
model.load_state_dict(state_dict)
model.to(device)
model.eval()

CNN_LSTM(
  (conv1): Conv1d(10, 64, kernel_size=(5,), stride=(1,), padding=(2,))
  (relu): ReLU()
  (lstm): LSTM(64, 64, num_layers=2, batch_first=True, dropout=0.3)
  (fc): Linear(in_features=64, out_features=5, bias=True)
)

In [3]:
# MediaPipe handles used by the capture loop: the drawing helpers for the
# skeleton overlay and the Pose solution that detects body landmarks.
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose


In [4]:
# Interior angle at the vertex b formed by the points a -> b -> c
def calculate_angle(a, b, c):
    """Return the angle at ``b`` between the rays b->a and b->c, in degrees.

    Each point is an indexable (x, y) pair; only indices 0 and 1 are read.
    The result is folded so that it always lies in the range [0, 180].
    """
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)

    # Direction of each ray measured from the positive x-axis.
    heading_to_c = np.arctan2(c[1] - b[1], c[0] - b[0])
    heading_to_a = np.arctan2(a[1] - b[1], a[0] - b[0])

    angle = np.abs((heading_to_c - heading_to_a) * 180.0 / np.pi)
    if angle > 180:
        # Report the smaller of the two angles between the rays.
        return 360 - angle
    return angle

In [5]:
# Unsigned inclination of the segment pt1 -> pt2 relative to the x-axis
def calculate_ground_angle(pt1, pt2):
    """Return the absolute angle, in degrees, of the vector pt1 -> pt2.

    The angle is measured against the horizontal (x) axis and its sign is
    discarded, so the result lies in the range [0, 180].
    """
    run = pt2[0] - pt1[0]
    rise = pt2[1] - pt1[1]
    degrees = np.arctan2(rise, run) * 180.0 / np.pi
    return abs(degrees)

In [6]:
# === Application settings ===
SEQUENCE_LENGTH = 120  # feature vectors collected before one classification
FPS = 40               # upper bound on feature vectors sampled per second
COOLDOWN = 5           # seconds the prediction stays on screen before re-collecting

# One joint angle plus one ground angle per joint; the order here is the
# order in which features are stacked into each vector of the sequence.
_JOINTS = ["Shoulder", "Elbow", "Hip", "Knee", "Ankle"]
FEATURE_NAMES = _JOINTS + [f"{joint}_Ground" for joint in _JOINTS]

In [7]:
# === Runtime state shared with the capture loop ===
# Collection / display state machine.
collecting = True        # True while frames are being gathered for a sequence
sequence = []            # smoothed feature vectors gathered so far
predicted_label = ""     # most recent class name produced by the model
last_collected = 0       # timestamp of the last vector appended to `sequence`
start_time = time.time() # reference time for the post-prediction cooldown

# Moving-average smoothing: each feature keeps its 5 most recent raw values.
angle_buffers = dict((feature, deque(maxlen=5)) for feature in FEATURE_NAMES)

In [None]:
# Initialize webcam instance and pose model
def _extract_angles(lm, w, h):
    """Compute the raw angle features for one frame from pose landmarks.

    Parameters
    ----------
    lm : sequence of landmarks indexable by ``PoseLandmark.value``; each item
        exposes ``x``, ``y`` and ``visibility``.
    w, h : frame width and height in pixels, used to scale ``x`` / ``y``.

    Returns
    -------
    dict mapping every name in FEATURE_NAMES to an angle in degrees, or
    ``None`` when any required left-side landmark fails the visibility check.
    """
    joints = mp_pose.PoseLandmark
    required = [
        joints.LEFT_SHOULDER,
        joints.LEFT_ELBOW,
        joints.LEFT_WRIST,
        joints.LEFT_HIP,
        joints.LEFT_KNEE,
        joints.LEFT_ANKLE,
        joints.LEFT_FOOT_INDEX,
    ]
    # NOTE(review): a cutoff of 0.0 rejects almost nothing; a stricter
    # threshold may be intended, but changing it alters which frames are
    # collected, so it is left as-is here.
    if not all(lm[j.value].visibility > 0.0 for j in required):
        return None

    def pt(joint):
        # Landmark position scaled to integer pixel coordinates.
        return [int(lm[joint.value].x * w), int(lm[joint.value].y * h)]

    ls, le, lw = pt(joints.LEFT_SHOULDER), pt(joints.LEFT_ELBOW), pt(joints.LEFT_WRIST)
    lh, lk = pt(joints.LEFT_HIP), pt(joints.LEFT_KNEE)
    la, lf = pt(joints.LEFT_ANKLE), pt(joints.LEFT_FOOT_INDEX)

    return {
        # Joint angles (vertex is the middle argument)
        "Shoulder": calculate_angle(le, ls, lh),
        "Elbow": calculate_angle(ls, le, lw),
        "Hip": calculate_angle(ls, lh, lk),
        "Knee": calculate_angle(lh, lk, la),
        "Ankle": calculate_angle(lk, la, lf),
        # Ground angles of each limb segment (e.g. shoulder -> elbow)
        "Shoulder_Ground": calculate_ground_angle(ls, le),
        "Elbow_Ground": calculate_ground_angle(le, lw),
        "Hip_Ground": calculate_ground_angle(lh, lk),
        "Knee_Ground": calculate_ground_angle(lk, la),
        "Ankle_Ground": calculate_ground_angle(la, lf),
    }


cap = cv2.VideoCapture(0)
try:
    if not cap.isOpened():
        # Without this the cell would finish silently with no window at all.
        print("Could not open webcam (device 0).")
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, frame = cap.read()  # Grab frame from the webcam
            if not ret:
                break
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # MediaPipe is fed RGB
            image.flags.writeable = False  # read-only while the pose model runs
            results = pose.process(image)  # Detect the pose landmarks
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # back to BGR for OpenCV drawing
            h, w = image.shape[:2]

            current_time = time.time()

            if results.pose_landmarks:
                # Draw the detected skeleton on the frame
                mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
                angles = _extract_angles(results.pose_landmarks.landmark, w, h)

                if angles is not None:
                    # Smooth each feature with the moving average of its buffer
                    smoothed = []
                    for k in FEATURE_NAMES:
                        angle_buffers[k].append(angles[k])
                        smoothed.append(np.mean(angle_buffers[k]))

                    # Sample at most FPS vectors per second into the sequence
                    if collecting and current_time - last_collected >= 1 / FPS:
                        if len(sequence) < SEQUENCE_LENGTH:
                            sequence.append(smoothed)
                            last_collected = current_time

                    # Draw angle values
                    for i, name in enumerate(FEATURE_NAMES):
                        cv2.putText(image, f"{name}: {int(smoothed[i])}", (20, 40 + i * 30),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

            # === Classification ===
            if collecting:
                text1 = f"Collecting: {len(sequence)}/{SEQUENCE_LENGTH}"
                text_size1 = cv2.getTextSize(text1, cv2.FONT_HERSHEY_SIMPLEX, 1.0, 3)[0]
                cv2.putText(image, text1, (w - text_size1[0] - 20, h - 80),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 255), 3)
                if len(sequence) == SEQUENCE_LENGTH:
                    # Batch of one sequence: (1, SEQUENCE_LENGTH, number of features)
                    X = np.array(sequence).reshape(1, SEQUENCE_LENGTH, len(FEATURE_NAMES))
                    X_tensor = torch.tensor(X, dtype=torch.float32).to(device)
                    with torch.no_grad():  # inference only, no gradients needed
                        logits = model(X_tensor)
                        pred_idx = torch.argmax(logits, dim=1).item()
                        predicted_label = label_encoder.inverse_transform([pred_idx])[0]
                    # Switch to the cooldown phase and start a fresh sequence
                    collecting = False
                    sequence = []
                    start_time = time.time()
            else:
                # Show the prediction for COOLDOWN seconds before collecting again
                cooldown = COOLDOWN - (current_time - start_time)
                text2 = f"Prediction: {predicted_label}"
                text3 = f"Cooldown: {int(max(0, cooldown))}s"

                text_size2 = cv2.getTextSize(text2, cv2.FONT_HERSHEY_SIMPLEX, 1.1, 3)[0]
                text_size3 = cv2.getTextSize(text3, cv2.FONT_HERSHEY_SIMPLEX, 0.9, 2)[0]

                cv2.putText(image, text2, (w - text_size2[0] - 20, h - 80),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.1, (0, 255, 0), 3)
                cv2.putText(image, text3, (w - text_size3[0] - 20, h - 40),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 255), 2)

                if cooldown <= 0:
                    collecting = True
                    last_collected = time.time()

            cv2.imshow("CNN-LSTM Exercise Classifier", image)
            if cv2.waitKey(1) & 0xFF == ord('q'):  # press 'q' to quit
                break
finally:
    # Always free the camera and close the window, even when the loop raises
    # or the kernel is interrupted; otherwise the device stays locked.
    cap.release()
    cv2.destroyAllWindows()