In [None]:
### CSV  ####

import os
import cv2
import mediapipe as mp
import pandas as pd

# Initialize MediaPipe
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=True)
landmark_list = []

# Path to your plank dataset
base_path = r"C:\Users\USER\plank_image_dataset"  # ✅ update if needed

# Process each image and extract landmarks
for label in os.listdir(base_path):
    folder_path = os.path.join(base_path, label)
    for img_name in os.listdir(folder_path):
        img_path = os.path.join(folder_path, img_name)
        img = cv2.imread(img_path)
        if img is None:
            continue

        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        results = pose.process(img_rgb)

        if results.pose_landmarks:
            row = []
            for lm in results.pose_landmarks.landmark:
                row += [lm.x, lm.y, lm.z, lm.visibility]
            row.append(label)
            landmark_list.append(row)

pose.close()

# Column names for 33 landmarks (x, y, z, visibility)
columns = []
for i in range(33):
    columns += [f'x{i}', f'y{i}', f'z{i}', f'v{i}']
columns.append('label')

# Save to CSV
df = pd.DataFrame(landmark_list, columns=columns)
df.to_csv('pose_plank_dataset.csv', index=False)
print("✅ CSV dataset created as 'pose_plank_dataset.csv'")

In [None]:
### TRAINING ###

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import joblib

# Load dataset
df = pd.read_csv("pose_plank_dataset.csv")
X = df.drop('label', axis=1)
y = df['label']

# Split into train/test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
print("✅ Accuracy:", accuracy_score(y_test, y_pred))

# Save model
joblib.dump(model, 'pose_plank_classifier.pkl')
print("✅ Model saved as 'pose_plank_classifier.pkl'")

In [5]:
## TESTING ###

import cv2
import mediapipe as mp
import numpy as np
import joblib
import time
import pyttsx3
import threading

# Load trained model
model = joblib.load("pose_plank_classifier.pkl")

# Voice feedback setup
engine = pyttsx3.init()
def speak(text):
    threading.Thread(target=lambda: engine.say(text) or engine.runAndWait()).start()

# MediaPipe pose
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

# Load reference image and resize
ref_img = cv2.imread("plank_guide.png")  # Must be in same folder
if ref_img is not None:
    ref_img = cv2.resize(ref_img, (150, 150))  # Small corner size

# Video input
cap = cv2.VideoCapture(r"C:\Users\USER\plank_videos\plank_5.mp4")  # Or use 0 for webcam

plank_started = False
prep_start = None
hold_start_time = 0
hold_duration = 0
prep_duration = 10

# For summary
correct_frame_count = 0
total_frame_count = 0
posture_alerts = 0
start_time = time.time()

with mp_pose.Pose(min_detection_confidence=0.7, min_tracking_confidence=0.7) as pose:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame = cv2.resize(frame, (1280, 720))
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = pose.process(image)

        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        total_frame_count += 1
        prediction = "unknown"

        if results.pose_landmarks:
            landmarks = []
            for lm in results.pose_landmarks.landmark:
                landmarks += [lm.x, lm.y, lm.z, lm.visibility]

            prediction = model.predict([landmarks])[0]

            if prediction == "plank":
                correct_frame_count += 1
                if not prep_start:
                    prep_start = time.time()
                elif time.time() - prep_start >= prep_duration:
                    if not plank_started:
                        hold_start_time = time.time()
                        plank_started = True
                    else:
                        hold_duration = int(time.time() - hold_start_time)
                        if hold_duration % 10 == 0 and hold_duration > 0:
                            speak(f"Keep holding. {hold_duration} seconds.")
                else:
                    countdown = prep_duration - int(time.time() - prep_start)
                    cv2.putText(image, f"Get Ready: {countdown}s", (50, 100),
                                cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 255), 3)
            else:
                posture_alerts += 1
                plank_started = False
                prep_start = None
                hold_duration = 0
                speak("Fix your posture")
                cv2.putText(image, "Posture Incorrect", (50, 100),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 0, 255), 3)

            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

        # Overlay guide image on top-right
        if ref_img is not None:
            h, w = ref_img.shape[:2]
            image[10:10 + h, -10 - w:-10] = ref_img

        # Info display
        cv2.putText(image, f"Pose: {prediction}", (50, 140),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (255, 255, 0), 2)
        cv2.putText(image, f"Hold: {hold_duration}s", (50, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 3)

        cv2.imshow("Plank Timer + Posture Guide", image)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

# Session summary
session_duration = int(time.time() - start_time)
accuracy = (correct_frame_count / total_frame_count) * 100 if total_frame_count > 0 else 0

print("\n🧾 Session Summary:")
print(f"🕒 Duration: {session_duration} seconds")
print(f"✅ Correct Plank Frames: {correct_frame_count}/{total_frame_count}")
print(f"📉 Accuracy: {accuracy:.2f}%")
print(f"⚠️ Posture Alerts: {posture_alerts}")
speak("Session complete. Well done!")

Exception in thread Thread-6:
Traceback (most recent call last):
  File "C:\Users\USER\anaconda3\envs\ml_env\lib\threading.py", line 980, in _bootstrap_inner
    self.run()
  File "C:\Users\USER\anaconda3\envs\ml_env\lib\site-packages\ipykernel\ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "C:\Users\USER\anaconda3\envs\ml_env\lib\threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\USER\AppData\Local\Temp\ipykernel_1272\3743843847.py", line 17, in <lambda>
  File "C:\Users\USER\anaconda3\envs\ml_env\lib\site-packages\pyttsx3\engine.py", line 180, in runAndWait
    raise RuntimeError('run loop already started')
RuntimeError: run loop already started
Exception in thread Thread-7:
Traceback (most recent call last):
  File "C:\Users\USER\anaconda3\envs\ml_env\lib\threading.py", line 980, in _bootstrap_inner
    self.run()
  File "C:\Users\USER\anaconda3\envs\ml_env\lib\site-packages\ipykernel\ipkernel.py", line 766, 


🧾 Session Summary:
🕒 Duration: 38 seconds
✅ Correct Plank Frames: 752/752
📉 Accuracy: 100.00%
⚠️ Posture Alerts: 0


