In [32]:
import cv2
import tensorflow as tf
import numpy as np
import time
import mediapipe as mp
from collections import deque, Counter
import joblib
from IPython.display import display, clear_output, HTML
from PIL import Image
import io
import matplotlib.pyplot as plt
from matplotlib import patches
import matplotlib
matplotlib.use('TkAgg')

In [31]:
# Initialize MediaPipe
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

In [33]:
# Landmark configuration
LANDMARK_INDICES = [0, 2, 5, 7, 8, 9, 10, 11, 12]  # Key landmarks for posture
POSE_CLASSES = ['leaning_backward', 'leaning_forward', 'leaning_left', 'leaning_right', 'upright']
POSE_COLORS = {
    'leaning_backward': (0, 0, 255),      # Red
    'leaning_forward': (0, 255, 0),       # Green
    'leaning_left': (255, 0, 0),          # Blue
    'leaning_right': (0, 255, 255),       # Yellow
    'upright': (255, 0, 255)              # Magenta
}


In [34]:

class EnhancedPostureDetector:
    def __init__(self, model_path, scaler_path, label_encoder_path):
        """Initialize with trained models matching the training architecture"""
        try:
            # Load preprocessing
            self.scaler = joblib.load(scaler_path)
            self.label_encoder = joblib.load(label_encoder_path)
            
            # Rebuild exact model architecture from training code
            self.model = self._build_identical_model()
            
            # Load weights
            self.model.load_weights(model_path)
            
            # Calibration and smoothing
            self.calibrated_upright = None
            self.calibration_samples = deque(maxlen=30)
            self.prediction_history = deque(maxlen=5)
            
            print("‚úÖ Model loaded with identical architecture to training")
            
        except Exception as e:
            raise Exception(f"Model initialization failed: {str(e)}")
    
    def _build_identical_model(self):
        """Rebuild the EXACT architecture from training code"""
        input_layer = tf.keras.layers.Input(shape=(9, 4), name='input_layer')
        
        # Conv Blocks (exactly as in training)
        x = tf.keras.layers.Conv1D(128, 2, activation='relu')(input_layer)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Conv1D(128, 2, activation='relu')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.MaxPooling1D(2)(x)
        x = tf.keras.layers.Dropout(0.4)(x)
        
        x = tf.keras.layers.Conv1D(64, 2, activation='relu')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Conv1D(64, 2, activation='relu')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Dropout(0.3)(x)
        
        # LSTM Blocks (exactly as in training)
        x = tf.keras.layers.LSTM(100, return_sequences=True, 
                                dropout=0.4, recurrent_dropout=0.4)(x)
        x = tf.keras.layers.LSTM(80, return_sequences=True,
                               dropout=0.4, recurrent_dropout=0.4)(x)
        x = tf.keras.layers.LSTM(50, dropout=0.4, recurrent_dropout=0.4)(x)
        
        # Dense Layers (exactly as in training)
        x = tf.keras.layers.Dense(256, activation='relu')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Dropout(0.5)(x)
        
        x = tf.keras.layers.Dense(128, activation='relu')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Dropout(0.4)(x)
        
        x = tf.keras.layers.Dense(64, activation='relu')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Dropout(0.3)(x)
        
        output_layer = tf.keras.layers.Dense(5, activation='softmax')(x)
        
        model = tf.keras.Model(inputs=input_layer, outputs=output_layer)
        model.compile(optimizer='adam',
                     loss='sparse_categorical_crossentropy',
                     metrics=['accuracy'])
        return model
    
    def extract_landmarks(self, pose_landmarks):
        """Extract and normalize key landmarks (matches training preprocessing)"""
        if not pose_landmarks:
            return None
            
        landmarks = []
        valid_points = 0
        
        for idx in LANDMARK_INDICES:
            if idx < len(pose_landmarks.landmark):
                lm = pose_landmarks.landmark[idx]
                if lm.visibility > 0.3:  # Same threshold as training
                    valid_points += 1
                landmarks.append([lm.x, lm.y, lm.z, lm.visibility])
            else:
                landmarks.append([0.0, 0.0, 0.0, 0.0])
        
        return np.array(landmarks, dtype=np.float32) if valid_points >= 5 else None
    
    def preprocess_input(self, landmarks):
        """Identical preprocessing to training pipeline"""
        if landmarks is None:
            return None
            
        # Flatten and scale exactly like training
        flattened = landmarks.flatten()
        scaled = self.scaler.transform([flattened])
        return scaled.reshape(1, 9, 4)
    
    def predict_posture(self, landmarks):
        """Make prediction with identical processing as training"""
        processed = self.preprocess_input(landmarks)
        if processed is None:
            return "No Pose", 0.0
            
        # Predict with identical model architecture
        predictions = self.model.predict(processed, verbose=0)
        class_idx = np.argmax(predictions[0])
        confidence = np.max(predictions[0])
        posture = self.label_encoder.inverse_transform([class_idx])[0]
        
        # Temporal smoothing
        self.prediction_history.append(posture)
        if len(self.prediction_history) >= 3:
            posture = Counter(self.prediction_history).most_common(1)[0][0]
        
        return posture, confidence

    def save_calibration(self):
        if len(self.calibration_samples) == 0:
            return
        self.calibrated_upright = np.mean(np.array(self.calibration_samples), axis=0)
        self.calibration_samples.clear()
    
    def calibrate_sample(self, landmarks):
        if landmarks is not None:
            self.calibration_samples.append(landmarks)
    
    def delta_from_calibrated(self, landmarks):
        if self.calibrated_upright is None or landmarks is None:
            return landmarks
        return landmarks - self.calibrated_upright


class PostureDetectionApp:
    """Standalone posture detection app with matplotlib display"""
    
    def __init__(self, model_path='enhanced_cnn_lstm_model.h5',
                 scaler_path='enhanced_cnn_lstm_scaler.pkl',
                 label_encoder_path='enhanced_cnn_lstm_label_encoder.pkl'):
        
        print("="*60)
        print("POSTURE DETECTION SYSTEM")
        print("="*60)
        print("\nInitializing...")
        
        try:
            self.detector = EnhancedPostureDetector(
                model_path=model_path,
                scaler_path=scaler_path,
                label_encoder_path=label_encoder_path
            )
        except Exception as e:
            print(f"‚ùå Failed to load model: {e}")
            print("\nPlease check that these files exist:")
            print(f"  - {model_path}")
            print(f"  - {scaler_path}")
            print(f"  - {label_encoder_path}")
            raise
        
        self.pose = mp_pose.Pose(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            model_complexity=1
        )
        
        self.cap = None
        self.is_running = False
        self.is_calibrating = False
        self.calibrated = False
        
        # Create matplotlib figure for results display
        plt.ion()  # Interactive mode
        self.fig, (self.ax_main, self.ax_status) = plt.subplots(1, 2, figsize=(14, 6))
        self.fig.canvas.manager.set_window_title('Posture Detection Results')
        
        self.setup_display()
        
        print("‚úÖ App initialized successfully!")
    
    def setup_display(self):
        """Setup the matplotlib display"""
        # Main video display
        self.ax_main.set_title('Live Feed with Pose Detection', fontsize=14, fontweight='bold')
        self.ax_main.axis('off')
        
        # Status panel
        self.ax_status.set_xlim(0, 10)
        self.ax_status.set_ylim(0, 10)
        self.ax_status.axis('off')
        self.ax_status.set_title('Status & Controls', fontsize=14, fontweight='bold')
        
        plt.tight_layout()
    
    def update_status_panel(self, posture=None, confidence=0.0, fps=0):
        """Update the status panel with current information"""
        self.ax_status.clear()
        self.ax_status.set_xlim(0, 10)
        self.ax_status.set_ylim(0, 10)
        self.ax_status.axis('off')
        
        y_pos = 9.5
        
        # Title
        self.ax_status.text(5, y_pos, 'POSTURE STATUS', ha='center', fontsize=16, 
                           fontweight='bold', color='#2c3e50')
        y_pos -= 1.2
        
        # Calibration status
        cal_color = '#27ae60' if self.calibrated else '#e67e22'
        cal_text = '‚úì CALIBRATED' if self.calibrated else '‚ö† NOT CALIBRATED'
        self.ax_status.text(5, y_pos, cal_text, ha='center', fontsize=12, 
                           fontweight='bold', color=cal_color)
        y_pos -= 1
        
        # Current posture
        if posture and self.calibrated:
            posture_display = posture.replace('_', ' ').upper()
            color_bgr = POSE_COLORS.get(posture, (128, 128, 128))
            color_rgb = (color_bgr[2]/255, color_bgr[1]/255, color_bgr[0]/255)
            
            # Draw colored box behind posture
            rect = patches.Rectangle((1, y_pos-0.5), 8, 1.2, 
                                    linewidth=2, edgecolor=color_rgb, 
                                    facecolor=(*color_rgb, 0.2))
            self.ax_status.add_patch(rect)
            
            self.ax_status.text(5, y_pos, posture_display, ha='center', fontsize=14, 
                               fontweight='bold', color=color_rgb)
            y_pos -= 1.5
            
            # Confidence bar
            self.ax_status.text(2, y_pos, 'Confidence:', ha='left', fontsize=10)
            
            # Background bar
            rect_bg = patches.Rectangle((2, y_pos-0.4), 6, 0.3, 
                                       linewidth=1, edgecolor='gray', facecolor='lightgray')
            self.ax_status.add_patch(rect_bg)
            
            # Confidence bar
            bar_width = 6 * confidence
            rect_conf = patches.Rectangle((2, y_pos-0.4), bar_width, 0.3, 
                                         linewidth=0, facecolor=color_rgb)
            self.ax_status.add_patch(rect_conf)
            
            self.ax_status.text(8.2, y_pos, f'{confidence:.0%}', ha='left', fontsize=10, 
                               fontweight='bold')
            y_pos -= 1.2
        
        # Separator line
        self.ax_status.plot([1, 9], [y_pos, y_pos], 'k-', linewidth=1)
        y_pos -= 0.8
        
        # Controls
        self.ax_status.text(5, y_pos, 'KEYBOARD CONTROLS', ha='center', fontsize=12, 
                           fontweight='bold', color='#2c3e50')
        y_pos -= 0.8
        
        controls = [
            ('SPACE', 'Start/Stop Calibration'),
            ('Q', 'Quit Application'),
            ('R', 'Reset & Recalibrate')
        ]
        
        for key, action in controls:
            self.ax_status.text(2, y_pos, f'[{key}]', ha='left', fontsize=10, 
                               fontweight='bold', color='#3498db')
            self.ax_status.text(3.5, y_pos, action, ha='left', fontsize=10)
            y_pos -= 0.7
        
        # Separator
        y_pos -= 0.3
        self.ax_status.plot([1, 9], [y_pos, y_pos], 'k-', linewidth=1)
        y_pos -= 0.8
        
        # System info
        self.ax_status.text(5, y_pos, f'FPS: {fps:.1f}', ha='center', fontsize=10, 
                           color='#7f8c8d')
        y_pos -= 0.6
        
        # Current state
        if self.is_calibrating:
            self.ax_status.text(5, y_pos, '‚è≥ CALIBRATING...', ha='center', fontsize=11, 
                               fontweight='bold', color='#e67e22')
        elif not self.calibrated:
            self.ax_status.text(5, y_pos, 'Press SPACE to calibrate', ha='center', 
                               fontsize=10, color='#95a5a6', style='italic')
    
    def start_camera(self):
        """Start camera and run detection loop"""
        
        print("\n" + "="*60)
        print("STARTING DETECTION")
        print("="*60)
        print("\nüìπ Initializing camera...")
        
        self.cap = cv2.VideoCapture(0)
        
        if not self.cap.isOpened():
            print("‚ùå Error: Could not open camera!")
            print("\nTroubleshooting:")
            print("1. Check if another app is using the camera")
            print("2. Restart the kernel")
            print("3. Check camera permissions")
            return
        
        print("‚úÖ Camera initialized")
        print("\n" + "="*60)
        print("INSTRUCTIONS:")
        print("="*60)
        print("1. Press SPACE to start calibration")
        print("2. Sit in proper upright posture during calibration")
        print("3. System will calibrate for ~1 second")
        print("4. Press Q to quit at any time")
        print("="*60 + "\n")
        print("üìä Results window opened - check your taskbar!\n")
        
        self.is_running = True
        frame_count = 0
        calibration_frame_count = 0
        last_time = time.time()
        fps = 0
        
        try:
            while self.is_running:
                ret, frame = self.cap.read()
                if not ret:
                    print("‚ùå Could not read frame")
                    break
                
                # Calculate FPS
                current_time = time.time()
                fps = 1.0 / (current_time - last_time) if (current_time - last_time) > 0 else 0
                last_time = current_time
                
                # Flip frame for mirror effect
                frame = cv2.flip(frame, 1)
                
                # Process with MediaPipe
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = self.pose.process(rgb_frame)
                landmarks = self.detector.extract_landmarks(results.pose_landmarks)
                
                # Draw pose landmarks on frame
                if results.pose_landmarks:
                    mp_drawing.draw_landmarks(
                        frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                        mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2),
                        mp_drawing.DrawingSpec(color=(255, 255, 255), thickness=2)
                    )
                
                posture = None
                confidence = 0.0
                
                # Calibration logic
                if self.is_calibrating:
                    cv2.putText(frame, "CALIBRATING: Hold upright posture...", 
                               (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 122, 255), 3)
                    
                    progress = int((calibration_frame_count / 30) * 100)
                    cv2.rectangle(frame, (10, 80), (10 + int(progress * 6), 110), 
                                 (0, 255, 0), -1)
                    cv2.rectangle(frame, (10, 80), (610, 110), (255, 255, 255), 2)
                    
                    self.detector.calibrate_sample(landmarks)
                    calibration_frame_count += 1
                    
                    if calibration_frame_count > 30:  # ~1 second
                        self.is_calibrating = False
                        self.calibrated = True
                        self.detector.save_calibration()
                        calibration_frame_count = 0
                        print("‚úÖ Calibration complete!")
                
                elif not self.calibrated:
                    cv2.putText(frame, "Press SPACE to calibrate", 
                               (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 0), 3)
                
                # Posture detection (after calibration)
                if self.calibrated and landmarks is not None:
                    adj_landmarks = self.detector.delta_from_calibrated(landmarks)
                    posture, confidence = self.detector.predict_posture(adj_landmarks)
                    
                    # Display posture on frame
                    color = POSE_COLORS.get(posture, (255, 255, 255))
                    cv2.putText(frame, f"{posture.upper()}", 
                               (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.5, color, 4)
                    cv2.putText(frame, f"Confidence: {confidence:.0%}", 
                               (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
                
                # Display FPS
                cv2.putText(frame, f"FPS: {fps:.1f}", 
                           (frame.shape[1] - 150, 30), cv2.FONT_HERSHEY_SIMPLEX, 
                           0.7, (0, 255, 0), 2)
                
                # Update matplotlib display
                self.ax_main.clear()
                self.ax_main.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                self.ax_main.axis('off')
                self.ax_main.set_title('Live Feed with Pose Detection', 
                                      fontsize=14, fontweight='bold')
                
                self.update_status_panel(posture, confidence, fps)
                
                plt.draw()
                plt.pause(0.001)
                
                # Handle keyboard input
                if plt.waitforbuttonpress(timeout=0.001):
                    pass
                
                # Check for matplotlib window close
                if not plt.fignum_exists(self.fig.number):
                    print("\nüõë Window closed")
                    break
                
                frame_count += 1
                
        except KeyboardInterrupt:
            print("\nüõë Stopped by user")
        except Exception as e:
            print(f"\n‚ùå Error during detection: {e}")
            import traceback
            traceback.print_exc()
        finally:
            self.cleanup()
    
    def cleanup(self):
        """Release resources"""
        print("\n" + "="*60)
        print("CLEANING UP")
        print("="*60)
        
        if self.cap:
            self.cap.release()
            print("‚úì Camera released")
        
        self.pose.close()
        print("‚úì MediaPipe resources released")
        
        plt.close('all')
        print("‚úì Display windows closed")
        
        print("\n" + "="*60)
        print("SESSION COMPLETE")
        print("="*60)


# Function to handle keyboard events
def on_key_press(event, app):
    """Handle keyboard events"""
    if event.key == ' ':  # Space bar
        if not app.calibrated or not app.is_calibrating:
            print("\nüéØ Starting calibration - sit upright!")
            app.is_calibrating = True
            app.calibrated = False
    elif event.key == 'q':  # Q key
        print("\nüõë Quit requested")
        app.is_running = False
    elif event.key == 'r':  # R key
        print("\nüîÑ Resetting calibration")
        app.calibrated = False
        app.is_calibrating = False
        app.detector.calibrated_upright = None


# 1. Configuration - Set to 17 landmarks to reach 68 features
# Indices 0-16 cover the face, shoulders, elbows, and wrists.
LANDMARK_INDICES = list(range(17)) 
POSE_CLASSES = ['leaning_backward', 'leaning_forward', 'leaning_left', 'leaning_right', 'upright']

def run_posture_detection(model_path, scaler_path, label_encoder_path):
    # Initialize MediaPipe
    mp_pose = mp.solutions.pose
    mp_drawing = mp.solutions.drawing_utils
    pose_engine = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    
    # Load Models
    print("Loading AI models...")
    model = tf.keras.models.load_model(model_path)
    scaler = joblib.load(scaler_path)
    
    # Initialize Time Tracking
    posture_timers = {pose: 0.0 for pose in POSE_CLASSES}
    last_time = time.time()
    
    cap = cv2.VideoCapture(0)
    print("‚úì System Active. Use the 'Stop' button (square icon) in the toolbar to exit.")

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret: break

            # 2. Extract exactly 17 landmarks (68 features)
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose_engine.process(frame_rgb)
            
            current_pose = "Scanning..."
            landmark_canvas = np.zeros_like(frame)

            if results.pose_landmarks:
                features = []
                # Extract 17 landmarks * 4 values (x, y, z, v) = 68 total
                for idx in LANDMARK_INDICES:
                    lm = results.pose_landmarks.landmark[idx]
                    features.extend([lm.x, lm.y, lm.z, lm.visibility])
                
                # 3. Prediction Logic
                features_arr = np.array(features).reshape(1, -1)
                
                # Step A: Scale the 68 features
                scaled_features = scaler.transform(features_arr)
                
                # Step B: Predict using the Model (now receiving the expected 68)
                prediction = model.predict(scaled_features, verbose=0)
                current_pose = POSE_CLASSES[np.argmax(prediction)]
                
                # 4. Time Tracking Update
                now = time.time()
                posture_timers[current_pose] += (now - last_time)
                last_time = now
                
                # Draw skeleton on the landmark canvas
                mp_drawing.draw_landmarks(landmark_canvas, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

            # 5. Create Split View
            combined = np.hstack((frame, landmark_canvas))
            
            # HUD Overlay
            h, w, _ = frame.shape
            cv2.putText(combined, f"Pose: {current_pose.upper()}", (20, 40), 
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
            
            y_offset = 40
            for p_class, sec in posture_timers.items():
                active = (p_class == current_pose)
                color = (0, 255, 0) if active else (200, 200, 200)
                cv2.putText(combined, f"{p_class}: {sec:.1f}s", (w + 20, y_offset), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2 if active else 1)
                y_offset += 30

            # 6. Jupyter-Safe Display
            _, buffer = cv2.imencode('.png', combined)
            display(Image.open(io.BytesIO(buffer)))
            clear_output(wait=True)

    except KeyboardInterrupt:
        pass
    finally:
        cap.release()
        pose_engine.close()
        print("\nSession Summary:")
        for p, s in posture_timers.items():
            print(f"- {p}: {s:.2f} seconds")

In [35]:
# Example usage
if __name__ == "__main__":
    # Run with default paths
    #run_posture_detection()
    
    # Or specify custom paths:
    run_posture_detection(
         model_path='high_accuracy_posture_model_1758965690.h5',
         scaler_path='high_accuracy_scaler_1758965690.pkl',
         label_encoder_path='high_accuracy_encoder_1758965690.pkl'
     )




Session Summary:
- leaning_backward: 45.40 seconds
- leaning_forward: 70.96 seconds
- leaning_left: 17.46 seconds
- leaning_right: 107.67 seconds
- upright: 1.02 seconds
