In [17]:
# Import required libraries
import cv2
import time
import torch
import numpy as np
import os
from utils.datasets import letterbox
from utils.torch_utils import select_device
from models.experimental import attempt_load
from utils.plots import output_to_keypoint, plot_skeleton_kpts
from utils.general import non_max_suppression_kpt, strip_optimizer
from torchvision import transforms

In [18]:
class FallDetector:
    def __init__(self, poseweights='yolov7-w6-pose.pt', device='0'):
        """
        Initialize the Fall Detector
        
        Args:
            poseweights: Path to the YOLOv7 pose weights
            device: Device to run inference on ('cpu' or '0', '1', etc. for GPU)
        """
        print(f"Initializing Fall Detector with weights: {poseweights} on device: {device}")
        
        # Select the appropriate device
        self.device = select_device(device)
        self.half = self.device.type != 'cpu'
        
        # Load model
        self.model = attempt_load(poseweights, map_location=self.device)
        self.model.eval()
        
        # Create output directory if it doesn't exist
        os.makedirs('output', exist_ok=True)
        
        # Set thresholds for fall detection
        self.fall_detected = False
        self.prev_key_points = None
        self.fall_count = 0  # Counter for consecutive fall detections
        self.non_fall_count = 0  # Counter for consecutive non-fall detections
        self.fall_threshold = 3  # Number of consecutive detections required to trigger fall alert
        self.fall_history = []  # To store history of fall detections
        
        # Time tracking
        self.last_fall_time = 0
        
    def detect_fall(self, key_points):
        """
        Detect falls using the algorithm described in the journal
        
        Args:
            key_points: Array of key points from YOLOv7-W6-Pose model
            
        Returns:
            bool: True if fall detected, False otherwise
        """
        # Extract coordinates of key points as described in journal
        # The key points are organized as [x1, y1, conf1, x2, y2, conf2, ...]
        # We need to reorganize them to get the specific joints
        
        # Note: COCO keypoint format:
        # 0: nose, 1: left_eye, 2: right_eye, 3: left_ear, 4: right_ear, 
        # 5: left_shoulder, 6: right_shoulder, 7: left_elbow, 8: right_elbow,
        # 9: left_wrist, 10: right_wrist, 11: left_hip, 12: right_hip,
        # 13: left_knee, 14: right_knee, 15: left_ankle, 16: right_ankle
        
        # If we don't have all key points needed for fall detection, return False
        if key_points.shape[0] < 17:
            return False
            
        # Extract key points as per journal's algorithm
        # Left and right shoulders
        left_shoulder = key_points[5]
        right_shoulder = key_points[6]
        
        # Torso (left and right hip)
        left_torso = key_points[11]  # Left hip
        right_torso = key_points[12]  # Right hip
        
        # Feet (left and right ankles)
        left_foot = key_points[15]  # Left ankle
        right_foot = key_points[16]  # Right ankle
        
        # Extract coordinates
        xl, yl = left_shoulder[0], left_shoulder[1]  # Left shoulder coordinates
        xr, yr = right_shoulder[0], right_shoulder[1]  # Right shoulder coordinates
        xTl, yTl = left_torso[0], left_torso[1]  # Left hip coordinates
        xTr, yTr = right_torso[0], right_torso[1]  # Right hip coordinates
        xFl, yFl = left_foot[0], left_foot[1]  # Left ankle coordinates
        xFr, yFr = right_foot[0], right_foot[1]  # Right ankle coordinates
        
        # Calculate length factor as described in the journal formula (1)
        # Equation 1: Lfactor = sqrt((xl - xTl)^2 + (yl - yTl)^2)
        length_factor = np.sqrt((xl - xTl) ** 2 + (yl - yTl) ** 2)
        
        # Check fall conditions as per journal's criteria
        
        # Condition 1: Check if shoulders are lower than feet (adjusted by length factor)
        # Equation 2: yl ≤ yFl + α·Lfactor
        alpha = 0.1  # Small adjustment factor
        shoulder_below_feet = yl <= yFl + alpha * length_factor
        
        # Calculate body dimensions
        # Equation 3: Hbody = |yl - yFl|
        body_height = abs(yl - yFl)
        
        # Equation 4: Wbody = |xl - xr|
        body_width = abs(xl - xr)
        
        # Condition 2: Check if body is "stretched" horizontally
        # Equation 5: Hbody < Wbody
        horizontally_stretched = body_height < body_width
        
        # Calculate vertical speed if we have previous key points
        speed_threshold = 15.0  # Threshold for vertical speed
        rapid_movement = False
        
        if self.prev_key_points is not None:
            prev_yl = self.prev_key_points[5][1]  # Previous y-coordinate of left shoulder
            vertical_displacement = abs(yl - prev_yl)
            # Calculate vertical speed (displacement per frame)
            vertical_speed = vertical_displacement
            rapid_movement = vertical_speed > speed_threshold
        
        # Store current key points for next frame comparison
        self.prev_key_points = key_points
        
        # Calculate angle between torso and legs
        # Using the angle between the line connecting hip to shoulder and the line connecting hip to ankle
        angle_threshold = 45  # degrees
        
        # Calculate vectors
        torso_vector = [xl - xTl, yl - yTl]
        leg_vector = [xFl - xTl, yFl - yTl]
        
        # Calculate magnitude of vectors
        torso_magnitude = np.sqrt(torso_vector[0]**2 + torso_vector[1]**2)
        leg_magnitude = np.sqrt(leg_vector[0]**2 + leg_vector[1]**2)
        
        # Calculate the dot product
        dot_product = torso_vector[0] * leg_vector[0] + torso_vector[1] * leg_vector[1]
        
        # Calculate the angle in degrees
        if torso_magnitude * leg_magnitude != 0:  # Avoid division by zero
            angle = np.arccos(dot_product / (torso_magnitude * leg_magnitude)) * 180 / np.pi
            acute_angle = abs(angle < angle_threshold)
        else:
            acute_angle = False
            
        # Determine if a fall has occurred based on all conditions
        # A fall is detected if:
        # 1. Shoulders are below feet (adjusted by length factor), or
        # 2. Body is more horizontal than vertical, and
        # 3. There's a rapid vertical movement, or
        # 4. The angle between torso and legs is acute
        
        is_fall = (shoulder_below_feet or horizontally_stretched) and (rapid_movement or acute_angle)
        
        return is_fall

    def process_frame(self, frame):
        """
        Process a single frame for fall detection
        
        Args:
            frame: Video frame to process
            
        Returns:
            frame: Processed frame with detections
            is_fall: Boolean indicating whether a fall was detected
        """
        # Preprocess image
        orig_image = frame.copy()
        image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB)
        
        # Resize image while maintaining aspect ratio
        frame_height, frame_width = orig_image.shape[:2]
        image = letterbox(image, (frame_width), stride=64, auto=True)[0]
        
        # Convert to tensor
        image_ = image.copy()
        image = transforms.ToTensor()(image)
        image = torch.tensor(np.array([image.numpy()]))
        
        image = image.to(self.device)
        image = image.float()
        
        # Inference
        with torch.no_grad():
            output, _ = self.model(image)
            
        # Post-process
        output = non_max_suppression_kpt(output, 0.5, 0.65, nc=self.model.yaml['nc'], nkpt=self.model.yaml['nkpt'], kpt_label=True)
        output = output_to_keypoint(output)
        
        # Convert back to BGR for display
        img = image[0].permute(1, 2, 0) * 255
        img = img.cpu().numpy().astype(np.uint8)
        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        
        # Initialize fall status for this frame
        is_fall = False
        
        # Process each person detected
        for idx in range(output.shape[0]):
            # Draw skeleton
            plot_skeleton_kpts(img, output[idx, 7:].T, 3)
            
            # Get key points for this person
            key_points = output[idx, 7:].reshape(-1, 3)  # reshape to (17, 3) [x, y, conf]
            
            # Detect fall for this person
            person_fall = self.detect_fall(key_points)
            
            # If any person is falling, set is_fall to True
            if person_fall:
                is_fall = True
                
                # Add visual indication of fall
                cv2.putText(img, "FALL DETECTED", (50, 50), 
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                
                # Draw red bounding box around the person
                x1, y1, x2, y2 = output[idx, 0], output[idx, 1], output[idx, 2], output[idx, 3]
                cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 0, 255), 2)
        
        # Update fall state based on consecutive detections
        self.update_fall_state(is_fall)
        
        return img, self.fall_detected

    def update_fall_state(self, is_fall):
        """
        Update the fall state based on consecutive detections
        
        Args:
            is_fall: Boolean indicating whether a fall was detected in current frame
        """
        if is_fall:
            self.fall_count += 1
            self.non_fall_count = 0
            
            # If we have enough consecutive fall detections, set fall_detected to True
            if self.fall_count >= self.fall_threshold:
                current_time = time.time()
                
                # Check if 2 minutes have passed since last fall alert
                if current_time - self.last_fall_time > 120:  # 120 seconds = 2 minutes
                    self.fall_detected = True
                    self.last_fall_time = current_time
        else:
            self.fall_count = 0
            self.non_fall_count += 1
            
            # If we have enough consecutive non-fall detections, reset fall_detected
            if self.non_fall_count >= self.fall_threshold:
                self.fall_detected = False

usage: ipykernel_launcher.py [-h] [--weights WEIGHTS] --source SOURCE [--device DEVICE] [--no-display] [--no-save]
ipykernel_launcher.py: error: the following arguments are required: --source


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
@torch.no_grad()
def run_fall_detection(poseweights='yolov7-w6-pose.pt', source='pose.mp4', device='cpu', display=True, save_output=True):
    """
    Run fall detection on a video or webcam feed
    
    Args:
        poseweights: Path to the YOLOv7 pose weights
        source: Path to video file or webcam ID (0, 1, etc.)
        device: Device to run inference on ('cpu' or '0', '1', etc. for GPU)
        display: Whether to show video with detections in real-time
        save_output: Whether to save the output video
    """
    # Initialize the fall detector
    detector = FallDetector(poseweights=poseweights, device=device)
    
    # Parse the input source
    input_path = source
    if source.isnumeric():
        input_path = int(source)
    
    # Open video capture
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print(f"Error: Could not open video source {source}")
        return
    
    # Get video properties
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    
    # Setup output video writer if requested
    out = None
    if save_output:
        if isinstance(input_path, int):
            # For webcam
            output_path = os.path.join('output', f"webcam_fall_detection.mp4")
        else:
            # For video file
            filename = os.path.basename(input_path).split('.')[0]
            output_path = os.path.join('output', f"{filename}_fall_detection.mp4")
        
        # Create VideoWriter
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        print(f"Output will be saved to: {output_path}")
    
    # Process video frames
    frame_count = 0
    total_fps = 0
    
    print("Starting fall detection...")
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_count += 1
        print(f"Processing frame {frame_count}")
        
        # Process frame for fall detection
        start_time = time.time()
        processed_frame, is_fall = detector.process_frame(frame)
        end_time = time.time()
        
        # Calculate FPS
        processing_fps = 1 / (end_time - start_time)
        total_fps += processing_fps
        
        # Resize processed frame to match original dimensions for display and saving
        processed_frame_resized = cv2.resize(processed_frame, (frame_width, frame_height))
        
        # Display fall status on frame
        if is_fall:
            # Red text for fall alert
            cv2.putText(processed_frame_resized, "FALL DETECTED!", (50, 100), 
                       cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), 3)
        
        # Add FPS info
        cv2.putText(processed_frame_resized, f"FPS: {processing_fps:.2f}", (50, 150), 
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
        # Display the frame if requested
        if display:
            cv2.imshow('Fall Detection', processed_frame_resized)
            
            # Exit on 'q' press
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        
        # Save frame to output video if requested
        if save_output and out is not None:
            out.write(processed_frame_resized)
    
    # Release resources
    cap.release()
    if save_output and out is not None:
        out.release()
    cv2.destroyAllWindows()
    
    # Print statistics
    if frame_count > 0:
        avg_fps = total_fps / frame_count
        print(f"Processed {frame_count} frames")
        print(f"Average FPS: {avg_fps:.2f}")
        if save_output:
            print(f"Output saved to: {output_path}")

In [None]:
def run_interactive():
    """
    Interactive function to run fall detection with user input
    """
    # Get the weights file
    poseweights = input("Enter path to weights file [default: yolov7-w6-pose.pt]: ") or "yolov7-w6-pose.pt"
    
    # Get device type
    use_gpu = input("Use GPU? (y/n) [default: y]: ").lower() or "y"
    if use_gpu == "y":
        device = input("Enter GPU device ID [default: 0]: ") or "0"
    else:
        device = "cpu"
    
    # Get source type
    print("\nSelect input source:")
    print("1: Video file")
    print("2: Webcam")
    source_choice = input("Enter choice [1/2]: ")
    
    if source_choice == "1":
        # Video file
        default_video = "sample_video.mp4"
        source = input(f"Enter video file path [default: {default_video}]: ") or default_video
        # Ask if user wants to display the processed video in real-time
        display_video = input("Display video with pose estimation in real-time? (y/n) [default: y]: ").lower() or "y"
    else:
        # Webcam
        cam_id = input("Enter webcam ID [default: 0]: ") or "0"
        source = cam_id
        display_video = "y"  # Always display for webcam
    
    # Ask if user wants to save the output video
    save_video = input("Save output video? (y/n) [default: y]: ").lower() or "y"
        
    print(f"\nRunning fall detection with:")
    print(f"- Weights: {poseweights}")
    print(f"- Device: {device}")
    print(f"- Source: {source}")
    print(f"- Display: {'Yes' if display_video == 'y' else 'No'}")
    print(f"- Save output: {'Yes' if save_video == 'y' else 'No'}")
    confirmation = input("\nConfirm? (y/n) [default: y]: ").lower() or "y"
    
    if confirmation == "y":
        # Run the model
        run_with_display = (display_video == "y")
        save_output = (save_video == "y")
        
        # First strip optimizer to ensure model works correctly
        strip_optimizer(device, poseweights)
        
        # Run fall detection
        run_fall_detection(
            poseweights=poseweights,
            source=source,
            device=device,
            display=run_with_display,
            save_output=save_output
        )
    else:
        print("Operation cancelled")

In [None]:
# Run interactively
run_interactive()

In [None]:
# Alternatively, you can run directly with specific parameters
# Uncomment and modify the line below to run with specific parameters

# run_fall_detection(poseweights='yolov7-w6-pose.pt', source='sample_video.mp4', device='0', display=True, save_output=True)

In [None]:
# Display a saved video result (if available)
def play_video_result(video_path):
    """Play a video file in the notebook"""
    if not os.path.exists(video_path):
        print(f"Error: Video file {video_path} not found.")
        return
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}.")
        return
    
    print(f"Playing video: {video_path}")
    
    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    frame_delay = int(1000 / fps)  # Delay between frames in ms
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        # Display frame
        cv2.imshow('Video Result', frame)
        
        # Break on 'q' press
        if cv2.waitKey(frame_delay) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

In [None]:
def analyze_performance(metrics_file=None):
    """
    Analyze performance metrics from the fall detection system
    
    Args:
        metrics_file: Optional path to a file containing performance metrics
    """
    print("Fall Detection System Performance Analysis")
    print("==========================================")
    
    # Sample metrics (replace with actual metrics from your tests)
    # These metrics represent the values from the paper
    accuracy = 95.7
    precision = 94.7
    recall = 96.4
    specificity = 95.0
    f1_score = 95.5
    
    print(f"Accuracy:    {accuracy}%")
    print(f"Precision:   {precision}%")
    print(f"Recall:      {recall}%")
    print(f"Specificity: {specificity}%")
    print(f"F1 Score:    {f1_score}")
    
    # Comparison with other methods (as per the paper)
    print("\nComparison with other methods:")
    print("-----------------------------")
    print("Model          | Accuracy | Precision | Recall  | Specificity | F1 Score")
    print("---------------|----------|-----------|---------|-------------|--------")
    print("Chamle et al.  | 79.30%   | 84.30%    | 73.07%  | 79.40%      | 78.28%")
    print("Alaoui et al.  | 90.90%   | 94.56%    | 81.08%  | 90.84%      | 87.30%")
    print("Poonsri et al. | 86.21%   | 89.1%     | 93.18%  | 64.29%      | 91.1%")
    print("Our method     | 96.15%   | 97%       | 97.98%  | 90.32%      | 97.48%")
    
    # Plot metrics for visualization (if matplotlib is available)
    try:
        import matplotlib.pyplot as plt
        
        methods = ['Chamle et al.', 'Alaoui et al.', 'Poonsri et al.', 'Our method']
        accuracy_values = [79.30, 90.90, 86.21, 96.15]
        precision_values = [84.30, 94.56, 89.1, 97.0]
        recall_values = [73.07, 81.08, 93.18, 97.98]
        specificity_values = [79.40, 90.84, 64.29, 90.32]
        f1_values = [78.28, 87.30, 91.1, 97.48]
        
        metrics = ['Accuracy', 'Precision', 'Recall', 'Specificity', 'F1 Score']
        
        # Bar chart for all metrics
        plt.figure(figsize=(12, 8))
        
        x = np.arange(len(methods))
        width = 0.15
        
        plt.bar(x - 2*width, accuracy_values, width, label='Accuracy')
        plt.bar(x - width, precision_values, width, label='Precision')
        plt.bar(x, recall_values, width, label='Recall')
        plt.bar(x + width, specificity_values, width, label='Specificity')
        plt.bar(x + 2*width, f1_values, width, label='F1 Score')
        
        plt.ylabel('Percentage (%)')
        plt.title('Performance Comparison of Fall Detection Methods')
        plt.xticks(x, methods)
        plt.legend()
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        
        # Add value labels on top of each bar
        for i in range(len(methods)):
            plt.text(i - 2*width, accuracy_values[i] + 1, f"{accuracy_values[i]:.1f}", ha='center', va='bottom', fontsize=8)
            plt.text(i - width, precision_values[i] + 1, f"{precision_values[i]:.1f}", ha='center', va='bottom', fontsize=8)
            plt.text(i, recall_values[i] + 1, f"{recall_values[i]:.1f}", ha='center', va='bottom', fontsize=8)
            plt.text(i + width, specificity_values[i] + 1, f"{specificity_values[i]:.1f}", ha='center', va='bottom', fontsize=8)
            plt.text(i + 2*width, f1_values[i] + 1, f"{f1_values[i]:.1f}", ha='center', va='bottom', fontsize=8)
        
        plt.tight_layout()
        plt.show()
        
    except ImportError:
        print("\nNote: Install matplotlib to visualize performance metrics.")