In [1]:
import os
import cv2
import numpy as np
import mediapipe as mp
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.models import Sequential
from tqdm import tqdm

2025-04-07 12:29:37.832061: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-04-07 12:29:37.836024: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-04-07 12:29:37.844752: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1744009177.860069   29796 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1744009177.864556   29796 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1744009177.875553   29796 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linkin

In [2]:
# Process-wide environment settings, applied in one call. The variable names
# target TensorFlow's native logging / thread pools, OpenMP, and CUDA device
# visibility.
# NOTE(review): TensorFlow is already imported by the first cell when this
# runs, so any variable the library reads at import time may have no effect
# here — confirm, and consider setting these before the imports.
os.environ.update({
    'TF_CPP_MIN_LOG_LEVEL': '2',
    "OMP_NUM_THREADS": "1",
    "TF_NUM_INTRAOP_THREADS": "1",
    "TF_NUM_INTEROP_THREADS": "1",
    "CUDA_VISIBLE_DEVICES": "-1",
})


In [3]:
# Shared configuration for the extraction, dataset and inference cells.

# Frames kept per sample (0.5s clips at 30fps).
SEQUENCE_LENGTH = 15

# Landmarks per frame (MediaPipe pose points); each contributes three values.
NUM_KEYPOINTS = 33

# Shot labels. A label's position in this list is its integer class index.
CLASSES = [
    'backhand_drive',
    'backhand_net_shot',
    'forehand_clear',
    'forehand_drive',
    'forehand_lift',
    'forehand_net_shot',
]

In [4]:
# Alias for MediaPipe's pose solution module. Nothing is instantiated here;
# the cells below create `mp_pose.Pose(...)` objects when they need one.
mp_pose = mp.solutions.pose

In [5]:
def process_keypoints(keypoints, reference_index=23):
    """Translate each frame's keypoints so a reference landmark sits at the origin.

    Every row is one frame laid out as consecutive triplets
    ``(x, y, third value)``; the callers in this notebook store the MediaPipe
    ``visibility`` score in the third slot. For each frame, the x and y of
    landmark ``reference_index`` are subtracted from every x and every y in
    that frame. The third value of each triplet is left untouched.

    Note: only the single landmark ``reference_index`` is used as the origin
    (index 23 by default), not the midpoint between two hip landmarks.

    Args:
        keypoints: Sequence of frames (list of lists or 2-D array), each frame
            holding ``3 * n_landmarks`` numbers.
        reference_index: Landmark whose (x, y) becomes the origin. Defaults to
            23, the value this notebook has always used.

    Returns:
        A new ``numpy.ndarray`` with the same shape as the input; the caller's
        data is never modified. Empty input yields an empty 2-D array instead
        of raising.
    """
    # np.array always copies, so the in-place arithmetic below cannot leak
    # into the caller's buffer.
    keypoints = np.array(keypoints)

    # No frames at all (e.g. an unreadable video): nothing to normalise.
    if keypoints.size == 0:
        if keypoints.ndim == 2:
            return keypoints
        return np.empty((0, 0), dtype=keypoints.dtype)

    start = reference_index * 3

    # Take an explicit copy of the reference (x, y). A plain slice would be a
    # view into `keypoints`, and the reference x column is itself one of the
    # columns rewritten by the first subtraction below.
    reference_xy = keypoints[:, start:start + 2].copy()

    # Column vectors of shape (n_frames, 1) broadcast across all landmarks.
    keypoints[:, 0::3] -= reference_xy[:, 0:1]  # X coordinates
    keypoints[:, 1::3] -= reference_xy[:, 1:2]  # Y coordinates

    return keypoints

In [6]:
def extract_keypoints(video_path):
    """Extract a normalised pose-keypoint sequence from the start of a video.

    Frames are read in order and passed through MediaPipe Pose. Each frame
    becomes one row of ``x, y, visibility`` per landmark; a frame with no
    detected pose becomes a row of ``NUM_KEYPOINTS * 3`` zeros. Reading stops
    once ``SEQUENCE_LENGTH`` rows have been collected, because later frames
    were never part of the returned value.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        The output of ``process_keypoints`` for at most ``SEQUENCE_LENGTH``
        frames, or an empty list when the video cannot be opened, contains no
        frames, or any error occurs (the error is printed, not raised).
    """
    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return []
        keypoint_sequence = []

        with mp_pose.Pose(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5) as pose:

            # Only the first SEQUENCE_LENGTH frames are ever returned, so do
            # not spend time decoding and tracking the rest of the video.
            while cap.isOpened() and len(keypoint_sequence) < SEQUENCE_LENGTH:
                ret, frame = cap.read()
                if not ret:
                    break

                # MediaPipe expects RGB; OpenCV decodes to BGR.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = pose.process(frame_rgb)

                if results.pose_landmarks:
                    kps = []
                    for landmark in results.pose_landmarks.landmark:
                        kps.extend([landmark.x, landmark.y, landmark.visibility])
                    keypoint_sequence.append(kps)
                else:
                    keypoint_sequence.append([0]*(NUM_KEYPOINTS*3))  # Zero-pad missing frames

        # A video that yields no frames has nothing to normalise.
        if not keypoint_sequence:
            return []

        # Normalize and align sequence
        sequence = process_keypoints(keypoint_sequence)
        return sequence[:SEQUENCE_LENGTH]

    except Exception as e:
        # Best effort: one bad file must not abort a whole dataset build.
        print(f"Error processing {video_path}: {str(e)}")
        return []

    finally:
        # Release the capture on every path, including errors raised above.
        if cap is not None:
            cap.release()

In [7]:
def build_dataset(dataset_path):
    """Build feature and label arrays from a directory of per-class video folders.

    ``dataset_path`` is expected to contain one sub-directory per entry of
    ``CLASSES``. Every video in a class folder is passed to
    ``extract_keypoints``; only results with exactly ``SEQUENCE_LENGTH``
    frames are kept.

    Args:
        dataset_path: Root directory holding the class sub-directories.

    Returns:
        ``(X, y)`` as numpy arrays: ``X`` stacks the kept sequences and ``y``
        holds the matching class indices (positions in ``CLASSES``).
    """
    X, y = [], []
    # Compared against the lower-cased file name so '.MP4', '.mov', etc. are
    # picked up as well as the spellings listed here.
    video_suffixes = ('.mp4', '.avi', '.mov')

    for class_idx, class_name in enumerate(CLASSES):
        class_dir = os.path.join(dataset_path, class_name)
        # isdir (not exists): a stray file with a class name is not a dataset folder.
        if not os.path.isdir(class_dir):
            print(f"Missing directory: {class_dir}")
            continue

        # os.listdir order is arbitrary; sort so the dataset order is reproducible.
        videos = sorted(v for v in os.listdir(class_dir)
                        if v.lower().endswith(video_suffixes))

        skipped = 0
        for video in tqdm(videos, desc=f'Processing {class_name}'):
            video_path = os.path.join(class_dir, video)
            sequence = extract_keypoints(video_path)

            # Only keep valid sequences
            if len(sequence) == SEQUENCE_LENGTH:
                X.append(sequence)
                y.append(class_idx)
            else:
                skipped += 1

        # Make silent data loss visible to whoever is building the dataset.
        if skipped:
            print(f"Skipped {skipped} of {len(videos)} videos in {class_name}: "
                  f"did not yield {SEQUENCE_LENGTH} frames")

    return np.array(X), np.array(y)

In [8]:
# # Build dataset
# X, y = build_dataset('/kaggle/input/data-set-updated/data set updated')

# # Train-test split
# X_train, X_test, y_train, y_test = train_test_split(
#     X, y, test_size=0.2, stratify=y)

# # Convert labels to one-hot
# y_train = tf.keras.utils.to_categorical(y_train)
# y_test = tf.keras.utils.to_categorical(y_test)

In [9]:
# # Build LSTM model
# model = Sequential([
#     LSTM(64, input_shape=(SEQUENCE_LENGTH, NUM_KEYPOINTS*3), return_sequences=True),
#     Dropout(0.3),
#     LSTM(32),
#     Dense(32, activation='relu'),
#     Dense(len(CLASSES), activation='softmax')
# ])

In [10]:
# model.compile(
#     optimizer='adam',
#     loss='categorical_crossentropy',
#     metrics=['accuracy']
# )

In [11]:
# with tf.device('/GPU:0'):
#     history = model.fit(
#     X_train, y_train,
#     validation_data=(X_test, y_test),
#     epochs=40,
#     batch_size=32,
#     callbacks=[
#         tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
#         tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3),
#         tf.keras.callbacks.ModelCheckpoint(
#             'pose_model.keras',
#             save_best_only=True,
#             monitor='val_accuracy'
#         )
#     ]
# )

In [12]:
# # Evaluation
# loss, accuracy = model.evaluate(X_test, y_test)
# print(f"Test Accuracy: {accuracy*100:.2f}%")

In [None]:
class PoseClassifier:
    """Classify a stream of video frames with a saved Keras model.

    Frames are fed one at a time to ``classify``. Each frame is converted to
    a row of pose keypoints and appended to a rolling buffer; once the buffer
    holds ``sequence_length`` rows, every call runs the model on the buffered
    window and returns the top class.

    The instance owns a MediaPipe ``Pose`` object. Call ``close()`` (or use
    the instance as a context manager) to release it deterministically;
    ``__del__`` remains as a fallback.
    """

    def __init__(self, model_path, sequence_length=15):
        """Load the model and create the pose estimator.

        Args:
            model_path: Path passed to ``tf.keras.models.load_model``.
            sequence_length: Number of most recent frames fed to the model.

        Raises:
            ValueError: If ``sequence_length`` is smaller than 1.
            Exception: Whatever ``load_model`` raises; it is printed and
                re-raised unchanged.
        """
        # Assigned before anything that can fail, so close()/__del__ are safe
        # on a partially constructed instance.
        self.pose = None
        self.sequence = []

        if sequence_length < 1:
            raise ValueError("sequence_length must be at least 1")
        self.sequence_length = sequence_length

        try:
            self.model = tf.keras.models.load_model(model_path)
        except Exception as e:
            print(f"Error loading model: {str(e)}")
            raise

        self.pose = mp_pose.Pose(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )

    @staticmethod
    def _landmarks_to_row(results):
        """Flatten one MediaPipe result into x, y, visibility per landmark.

        Returns a row of ``NUM_KEYPOINTS * 3`` zeros when no pose was detected.
        """
        if not results.pose_landmarks:
            return [0]*(NUM_KEYPOINTS*3)

        row = []
        for landmark in results.pose_landmarks.landmark:
            row.extend([landmark.x, landmark.y, landmark.visibility])
        return row

    def classify(self, frame):
        """Add one BGR frame to the buffer and classify the current window.

        Args:
            frame: Image in OpenCV's BGR channel order.

        Returns:
            ``(label, confidence)``. Until ``sequence_length`` frames have
            been seen the result is ``("", 0.0)``.

        Raises:
            RuntimeError: If the classifier has already been closed.
        """
        if getattr(self, "pose", None) is None:
            raise RuntimeError("PoseClassifier is closed")

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.pose.process(frame_rgb)
        self.sequence.append(self._landmarks_to_row(results))

        # Maintain fixed sequence length
        if len(self.sequence) > self.sequence_length:
            self.sequence = self.sequence[-self.sequence_length:]

        if len(self.sequence) == self.sequence_length:
            processed = process_keypoints(np.array(self.sequence))
            processed = np.expand_dims(processed, axis=0)  # add batch axis

            prediction = self.model.predict(processed, verbose=0)[0]
            return CLASSES[np.argmax(prediction)], float(np.max(prediction))

        return "", 0.0

    def close(self):
        """Release the pose estimator. Safe to call more than once."""
        pose = getattr(self, "pose", None)
        # Clear the attribute first so a second call never closes twice.
        self.pose = None
        if pose is not None:
            pose.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def __del__(self):
        # Fallback only. Exceptions cannot propagate out of a finalizer, so
        # suppress them instead of emitting "Exception ignored in" noise.
        try:
            self.close()
        except Exception:
            pass

: 

In [None]:
# Usage: live webcam demo. Press 'q' in the preview window to stop.
classifier = PoseClassifier('pose_model_large_dataset.keras')

cap = cv2.VideoCapture(0)
try:
    if not cap.isOpened():
        # Report and fall through to cleanup. Calling exit() in a notebook
        # cell would act on the kernel itself, not just stop this cell.
        print("Error: Could not open camera")
    else:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            pose_label, confidence = classifier.classify(frame)

            cv2.putText(frame, f"{pose_label} ({confidence:.2f})",
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
            cv2.imshow('Badminton Pose Classification', frame)

            # waitKey also pumps the GUI event loop; mask to the low byte
            # before comparing with the key code.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Runs even when the loop raises, so the camera and preview window are
    # not left open after an error.
    try:
        cap.release()
        cv2.destroyAllWindows()
    except Exception as e:
        print("Cleanup error:", e)

