Required libraries

In [1]:
import numpy as np
import cv2
import mediapipe as mp
import simpleaudio as sa
from langchain_groq import ChatGroq
from dotenv import load_dotenv
import os
from langchain_core.prompts import ChatPromptTemplate
from feedbackbot import bot
from relevancy_checker import check_relevancy
from followup_bot import run_chatbot
import subprocess
import threading

  from .autonotebook import tqdm as notebook_tqdm


Importing model

In [2]:
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
pose = mp_pose.Pose(model_complexity=1)

Caluculate Angle

In [3]:
def calculate_angle(a, b, c):
    """Calculate the angle between three points."""
    a, b, c = np.array(a), np.array(b), np.array(c)
    
    ba = a - b
    bc = c - b

    mag_ba = np.linalg.norm(ba)
    mag_bc = np.linalg.norm(bc)

    if mag_ba * mag_bc < 1e-6:
        return 0

    dot_product = np.dot(ba, bc)
    angle_rad = np.arccos(np.clip(dot_product / (mag_ba * mag_bc), -1.0, 1.0))
    return np.degrees(angle_rad)

Evaluate Pushups

In [4]:
class PushupAnalyzer:
    def __init__(self):
        self.prev_elbow_angle = 180  # Start at top position
        self.pushup_count = 0
        self.min_elbow_angle = 180
        self.lockout_threshold = 160
        self.bottom_threshold = 90
        self.is_bottom_reached = False  # Track state internally

    def process_frame(self, shoulder, elbow, wrist, hip):
        pushup_completed = False
        feedback = ""
        elbow_angle = calculate_angle(shoulder, elbow, wrist)
        torso_angle = calculate_angle(shoulder, hip, [hip[0], shoulder[1]])

        # Phase 1: Descending (check for bottom)
        if elbow_angle < self.prev_elbow_angle and not self.is_bottom_reached:
            if elbow_angle <= self.bottom_threshold:
                self.is_bottom_reached = True
                self.min_elbow_angle = elbow_angle
                feedback = "Perfect depth!"
            else:
                feedback = f"Go lower! Current: {elbow_angle}°"

        # Phase 2: Ascending (check for top)
        elif elbow_angle > self.prev_elbow_angle and self.is_bottom_reached:
            if elbow_angle >= self.lockout_threshold:
                pushup_completed = True
                self.pushup_count += 1
                self.is_bottom_reached = False
                feedback = "Good rep! " if self.min_elbow_angle <= 90 else "Shallow rep! "
                feedback += "Perfect lockout!" if elbow_angle >= 175 else "Fully extend arms!"
            else:
                feedback = "Keep pushing up!"

        # Torso stability check
        if abs(torso_angle) > 20:
            feedback += " Hips too high!" if torso_angle > 0 else " Hips sagging!"

        self.prev_elbow_angle = elbow_angle
        return pushup_completed, feedback.strip(), elbow_angle, torso_angle

Text to speech for feedback

In [5]:

#from playsound import playsound  

class PiperTTS:
    def __init__(self):
        self.piper_path = r"C:\Users\tejat\Desktop\FitnessApp\piper\piper.exe"
        self.model_path = r"C:\Users\tejat\Desktop\FitnessApp\piper\en_US-kathleen-low.onnx"
        
    def speak(self, text):
        """Non-blocking speech synthesis"""
        def _run():
            output = "feedback.wav"
            cmd = f'echo "{text}" | "{self.piper_path}" --model "{self.model_path}" --output_file {output}'
            subprocess.run(cmd, shell=True, stdout=subprocess.PIPE)
            #playsound(output)
            
        thread = threading.Thread(target=_run)
        thread.start()

# Usage
tts = PiperTTS()

In [6]:
def play_audio_async():
    wave_obj = sa.WaveObject.from_wave_file("feedback.wav")
    play_obj = wave_obj.play()
    play_obj.wait_done()

Drawing Pose

In [7]:
def draw_pose(frame, landmarks):
    """Draw landmarks on the frame."""
    mp_drawing.draw_landmarks(
        frame,
        landmarks,
        mp_pose.POSE_CONNECTIONS,
        mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2),
        mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2),
    )


Display Information

In [8]:
def display_info(frame, pushup_count, feedback, elbow_angle, torso_angle):
    """Display pushup count, angles, and feedback."""
    cv2.putText(frame, f"Pushups: {pushup_count}", (50, 50), 
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.putText(frame, f"Elbow: {elbow_angle:.1f}°", (50, 100), 
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    cv2.putText(frame, f"Torso: {torso_angle:.1f}°", (50, 140), 
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    if feedback:
        color = (0, 255, 0) if "Perfect" in feedback else (0, 0, 255)
        cv2.putText(frame, feedback, (50, 180), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

# Main

In [9]:
#video_path = r"C:\Users\tejat\Desktop\FitnessApp\wrong_pushups2.mp4"
cap = cv2.VideoCapture(0)
analyzer=PushupAnalyzer()

# Check if video is opened
if not cap.isOpened():
    print("Error: Could not open video file!")
    exit()

pushup_count = 0
is_bottom_reached = False
current_feedback = ""
feedback_display_frames = 0
feedbacks=[]
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)

    if results.pose_landmarks:
        draw_pose(frame, results.pose_landmarks)

        landmarks = results.pose_landmarks.landmark
        h, w, _ = frame.shape

        # Extract key landmarks
        try:
            shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER].x * w,
                        landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER].y * h]

            elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW].x * w,
                     landmarks[mp_pose.PoseLandmark.LEFT_ELBOW].y * h]

            wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST].x * w,
                     landmarks[mp_pose.PoseLandmark.LEFT_WRIST].y * h]

            hip = [landmarks[mp_pose.PoseLandmark.LEFT_HIP].x * w,
                   landmarks[mp_pose.PoseLandmark.LEFT_HIP].y * h]

            # Evaluate pushup
            completed, feedback, elbow_angle, torso_angle = analyzer.process_frame(shoulder, elbow, wrist, hip)

            if completed:
                pushup_count =analyzer.pushup_count
                current_feedback = feedback
                feedbacks.append(current_feedback)
                feedback_display_frames = 30  
                tts.speak(current_feedback)
                threading.Thread(target=play_audio_async).start()

            display_info(frame, pushup_count, 
                        current_feedback if feedback_display_frames > 0 else "",
                        elbow_angle, torso_angle)

            if feedback_display_frames > 0:
                feedback_display_frames -= 1
        except IndexError:
            print("Error: Landmark detection failed!")
            
            

    cv2.imshow('Pushup Counter', frame)
    
    if cv2.waitKey(10) == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()


In [10]:
feedbacks=set(feedbacks)

In [11]:
load_dotenv()
api_key = os.environ.get("GROQ_API_KEY")

Langchain Bro

In [12]:
if feedbacks:
    analysis = bot(feedbacks, exercise="pushups")
    for i, response in enumerate(analysis):
        print(f"Feedback {i+1}:")
        print(response)
        print("-" * 50)
else:
    print("No feedback was generated from the video.")

No feedback was generated from the video.


In [13]:
analysis="".join(analysis)
text1=""
text1=text1+analysis

NameError: name 'analysis' is not defined

In [None]:
while True:
    text2=input("You Query:" )
    relevancy=check_relevancy(text1,text2)
    if relevancy!="Relevant":
        print("Ask relevant questions")
        continue
    chatbot_reply=run_chatbot(text2,text1)
    text1=text1+chatbot_reply
    print(text2)
    print(chatbot_reply)