<a href="https://colab.research.google.com/github/sid040703/Fall-Detection-For-Elderly-People/blob/main/Fall_Detection_For_Elderly_People.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; later cells read frames from and save the
# trained model to paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install tensorflow opencv-python



In [None]:
import os
import cv2
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

In [None]:
def extract_frames_from_video(video_path, output_dir, frame_rate=1):
    """
    Extract frames from a video and save them to the specified directory.

    Frames are written as ``frame_0000.jpg``, ``frame_0001.jpg``, ... in the order
    they are read, keeping one frame out of every ``fps // frame_rate``.

    Args:
        video_path (str): Path to the input video.
        output_dir (str): Directory to save the extracted frames (created if missing).
        frame_rate (int): Number of frames to extract per second.
    Raises:
        ValueError: If ``frame_rate`` is not positive.
        IOError: If the video cannot be opened or a frame cannot be written.
    """
    # Validate first so a bad argument fails before any file or directory is touched.
    if frame_rate <= 0:
        raise ValueError(f"frame_rate must be positive, got {frame_rate!r}")

    cap = cv2.VideoCapture(video_path)
    try:
        # An unreadable path used to fall through and report "Extracted 0 frames".
        if not cap.isOpened():
            raise IOError(f"Could not open video: {video_path}")

        os.makedirs(output_dir, exist_ok=True)

        # Some containers report 0 or NaN for FPS; in that case keep every frame.
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_interval = max(1, int(fps) // frame_rate) if fps > 0 else 1
        frame_count = 0
        saved_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_interval == 0:
                frame_filename = os.path.join(output_dir, f"frame_{saved_count:04d}.jpg")
                # imwrite signals failure through its return value, not an exception.
                if not cv2.imwrite(frame_filename, frame):
                    raise IOError(f"Could not write frame: {frame_filename}")
                saved_count += 1

            frame_count += 1
    finally:
        # Release the capture even when reading or writing fails.
        cap.release()

    print(f"Extracted {saved_count} frames from {video_path}.")

# Extract frames from a sample video into the 'fall' class folder on Drive.
# NOTE(review): the recorded output of this cell reports 0 extracted frames, so the
# video was presumably not readable at this path when the cell last ran — confirm
# the file has been uploaded to /content before running.
# NOTE(review): this frame_dir ('My Drive/Colab Notebooks/dataset/ur_fall/frames/fall')
# differs from the directory the loading cell reads
# ('MyDrive/Colab Notebooks/ur_fall/frames'); confirm both point at the same dataset.
video_path = '/content/WhatsApp Video 2024-11-29 at 15.32.47.mp4'
frame_dir = '/content/drive/My Drive/Colab Notebooks/dataset/ur_fall/frames/fall'
extract_frames_from_video(video_path, frame_dir)

Extracted 0 frames from /content/WhatsApp Video 2024-11-29 at 15.32.47.mp4.


In [None]:


def load_frames_and_labels(frame_dir):
    """
    Load paths to frames and their corresponding labels.

    Every sub-directory of ``frame_dir`` is one class. A directory is labelled
    Fall (0) when its name contains "fall" and is not a negated form such as
    "non_fall", "not-fall" or "NoFall"; every other directory is Non-Fall (1).
    Directories and the frames inside them are visited in sorted order so that
    zero-padded frame names keep their temporal order and runs are reproducible.

    Args:
        frame_dir (str): Path to the directory containing fall and non-fall frames.
    Returns:
        list, list: Paths to frames and corresponding labels.
    """
    # Negated spellings once separators are stripped; a plain substring test
    # would classify "non_fall" as Fall because it contains "fall".
    negated_forms = ('nonfall', 'notfall', 'nofall')

    frame_paths, labels = [], []
    for label_dir in sorted(os.listdir(frame_dir)):
        label_path = os.path.join(frame_dir, label_dir)
        if not os.path.isdir(label_path):
            continue  # Stray files next to the class folders are not classes

        compact_name = ''.join(ch for ch in label_dir.lower() if ch.isalnum())
        is_fall = 'fall' in compact_name and not any(
            form in compact_name for form in negated_forms
        )
        label = 0 if is_fall else 1  # 0: Fall, 1: Non-Fall

        for frame in sorted(os.listdir(label_path)):
            frame_paths.append(os.path.join(label_path, frame))
            labels.append(label)
    return frame_paths, labels

# Load frame paths and labels from the per-class sub-directories.
# NOTE(review): this path ('MyDrive/Colab Notebooks/ur_fall/frames') differs from the
# directory the extraction cell writes to
# ('My Drive/Colab Notebooks/dataset/ur_fall/frames/fall'); confirm which location
# actually holds the dataset.
frame_dir = '/content/drive/MyDrive/Colab Notebooks/ur_fall/frames'
frame_paths, labels = load_frames_and_labels(frame_dir)
print(f"Loaded {len(frame_paths)} frames with labels.")




import cv2
import numpy as np

class OpenPoseLight:
    """Stand-in pose detector that fabricates its output instead of analysing the frame."""

    def detect(self, frame):
        """
        Produce a simulated pose result; ``frame`` is accepted but never inspected.

        TODO: replace with a real OpenPose-light implementation.

        Args:
            frame: Input image frame (ignored by this stub).
        Returns:
            dict: ``'keypoints'`` holds an (18, 2) array of random (x, y) values drawn
            by ``np.random.rand``; ``'aspect_ratio'`` is the constant 1.0.
        """
        simulated = {}
        simulated['keypoints'] = np.random.rand(18, 2)  # 18 fake keypoints, (x, y) each
        simulated['aspect_ratio'] = 1.0  # fixed placeholder value
        return simulated

# Initialize pose detector
# NOTE: OpenPoseLight.detect returns random keypoints, so every feature derived from
# this detector is simulated data rather than a real pose estimate.
pose_detector = OpenPoseLight()



def generate_sequences_from_frames(frame_paths, labels, pose_detector, sequence_length=16):
    """
    Generate sequences of pose keypoints from frames.

    A window of the last ``sequence_length`` readable frames slides over the input
    one frame at a time. Each full window becomes one sequence, labelled with the
    label of its most recent frame. Frames that ``cv2.imread`` cannot decode are
    skipped and do not count towards the window.

    NOTE(review): the window is not reset when the label changes, so a sequence can
    mix frames that came from different class folders — confirm whether that is
    intended for directory-labelled data.

    Args:
        frame_paths (list): List of frame paths.
        labels (list): List of corresponding labels.
        pose_detector (object): Pose detection model exposing ``detect(frame)`` that
            returns a dict with a ``'keypoints'`` array.
        sequence_length (int): Number of frames per sequence.
    Returns:
        np.array, np.array: Sequences and labels.
    Raises:
        ValueError: If ``sequence_length`` is smaller than 1.
    """
    # Imported here because the notebook only imports deque in a later cell.
    from collections import deque

    if sequence_length < 1:
        raise ValueError(f"sequence_length must be at least 1, got {sequence_length!r}")

    sequences, sequence_labels = [], []
    # A bounded deque drops the oldest frame automatically once it is full.
    window = deque(maxlen=sequence_length)

    for frame_path, label in zip(frame_paths, labels):
        frame = cv2.imread(frame_path)
        if frame is None:
            continue  # Skip invalid frames

        window.append(pose_detector.detect(frame)['keypoints'].flatten())

        if len(window) == sequence_length:
            sequences.append(list(window))
            sequence_labels.append(label)  # Label of the newest frame in the window

    return np.array(sequences), np.array(sequence_labels)

# Generate sequences
# sequence_length has to equal the time dimension the LSTM is built with (16 frames).
sequence_length = 16
X_seq, y_seq = generate_sequences_from_frames(frame_paths, labels, pose_detector, sequence_length)
print(f"Generated {len(X_seq)} sequences.")

Loaded 2371 frames with labels.
Generated 2356 sequences.


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

def create_lstm_model(sequence_length=16, num_features=36, num_classes=2):
    """
    Create an LSTM model for fall detection.

    The defaults reproduce the original fixed architecture: windows of 16 frames,
    36 features per frame (18 keypoints x 2 coordinates) and 2 output classes.

    Args:
        sequence_length (int): Number of frames in each input window.
        num_features (int): Length of the per-frame feature vector.
        num_classes (int): Number of output classes of the softmax layer.
    Returns:
        keras.Model: Compiled LSTM model.
    """
    model = Sequential([
        # First LSTM returns the full sequence so the second LSTM can consume it.
        LSTM(64, input_shape=(sequence_length, num_features), return_sequences=True),
        LSTM(64),
        Dense(64, activation='relu'),
        Dense(num_classes, activation='softmax'),  # Output: Fall or Non-Fall
    ])
    # Sparse loss: labels are integer class ids, not one-hot vectors.
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

# Train the model
# NOTE(review): X_seq is built from the simulated (random) keypoints returned by
# OpenPoseLight, so the ~100% accuracy logged below cannot come from real pose
# information — presumably y_seq is dominated by a single class; check the label
# distribution before trusting these metrics.
# NOTE(review): adjacent sliding windows share frames, so windows on either side of
# the train/validation boundary are not independent samples.
lstm_model = create_lstm_model()
lstm_model.fit(X_seq, y_seq, epochs=50, batch_size=32, validation_split=0.2)

# Save the trained model to Google Drive as an HDF5 (.h5) file; the inference cells
# load it back from this same path.
lstm_model.save('/content/drive/My Drive/Colab Notebooks/dataset/lstm_fall_detection.h5')
print("Model saved to Google Drive.")







Epoch 1/50
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 50ms/step - accuracy: 0.9927 - loss: 0.1470 - val_accuracy: 1.0000 - val_loss: 2.2175e-05
Epoch 2/50
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 23ms/step - accuracy: 1.0000 - loss: 2.0373e-05 - val_accuracy: 1.0000 - val_loss: 1.6827e-05
Epoch 3/50
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 23ms/step - accuracy: 1.0000 - loss: 1.5948e-05 - val_accuracy: 1.0000 - val_loss: 1.3392e-05
Epoch 4/50
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 22ms/step - accuracy: 1.0000 - loss: 1.2677e-05 - val_accuracy: 1.0000 - val_loss: 1.0690e-05
Epoch 5/50
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 22ms/step - accuracy: 1.0000 - loss: 1.0148e-05 - val_accuracy: 1.0000 - val_loss: 8.6394e-06
Epoch 6/50
[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 21ms/step - accuracy: 1.0000 - loss: 8.2317e-06 - val_accuracy: 1.0000 - val_loss: 7



Model saved to Google Drive.


In [None]:
# Install MediaPipe, used for pose detection by the video-processing cells below
!pip install mediapipe



Collecting mediapipe
  Downloading mediapipe-0.10.20-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.1-py3-none-any.whl.metadata (1.4 kB)
Downloading mediapipe-0.10.20-cp311-cp311-manylinux_2_28_x86_64.whl (35.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.6/35.6 MB[0m [31m30.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.1-py3-none-any.whl (32 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.20 sounddevice-0.5.1


In [None]:
!pip install Mediapipe



In [None]:
from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode
import cv2
import numpy as np
from tensorflow.keras.models import load_model
from collections import deque
from mediapipe.python.solutions.pose import Pose  # Mediapipe for pose detection
from google.colab import files  # For downloading files
import os

def record_video(filename='recorded_video.webm'):
    """
    Inject a start/stop webcam recorder into the cell output and run it.

    The browser-side JavaScript records the webcam stream with ``MediaRecorder``.
    When recording stops it base64-encodes the WebM data and sends it to the kernel
    callback registered under ``'notebook.save_video'``; this function itself does
    not write any file.

    Args:
        filename: Not used by this function.
    """
    js = Javascript('''
    async function recordVideo() {
        const div = document.createElement('div');
        const recordButton = document.createElement('button');
        const stopButton = document.createElement('button');
        const video = document.createElement('video');

        recordButton.textContent = 'Start Recording';
        stopButton.textContent = 'Stop Recording';
        stopButton.style.display = 'none';

        div.appendChild(recordButton);
        div.appendChild(stopButton);
        div.appendChild(video);

        document.body.appendChild(div);

        const stream = await navigator.mediaDevices.getUserMedia({video: true});
        video.srcObject = stream;
        video.style.display = 'block';
        await video.play();

        const recorder = new MediaRecorder(stream);
        const chunks = [];

        recorder.ondataavailable = (e) => chunks.push(e.data);
        recorder.onstop = () => {
            const blob = new Blob(chunks, {type: 'video/webm'});
            const reader = new FileReader();
            reader.readAsDataURL(blob);
            reader.onloadend = () => {
                const base64data = reader.result.split(',')[1];
                google.colab.kernel.invokeFunction('notebook.save_video', [base64data], {});
            };
        };

        recordButton.onclick = () => {
            recorder.start();
            recordButton.style.display = 'none';
            stopButton.style.display = 'inline-block';
        };

        stopButton.onclick = () => {
            recorder.stop();
            stream.getTracks().forEach(track => track.stop());
            div.remove();
        };

        google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);
    }
    ''')
    # Define recordVideo() in the output frame, then invoke it from the kernel.
    display(js)
    eval_js('recordVideo()')


def save_video(data, filename='recorded_video.webm'):
    """
    Decode a base64 payload and write the resulting bytes to ``filename``.

    Args:
        data: Base64-encoded video content.
        filename: Destination path; an existing file is overwritten.
    """
    # Decode before opening so a malformed payload never creates the file.
    video_bytes = b64decode(data)
    with open(filename, 'wb') as video_file:
        video_file.write(video_bytes)
    print(f"Video saved as {filename}")


def process_recorded_video(video_path, lstm_model_path, output_path='/content/processed_video.avi'):
    """
    Processes the recorded video and performs fall detection, saving the output video.

    Each frame is run through MediaPipe Pose. When landmarks are found, a feature
    vector is pushed into a 16-frame rolling buffer; once the buffer is full the LSTM
    classifies the window and the resulting label is drawn on the frame. Every frame,
    annotated or not, is written to ``output_path``.

    Args:
        video_path (str): Path of the video to analyse.
        lstm_model_path (str): Path of the saved Keras model to load.
        output_path (str): Destination of the annotated XVID-encoded video.
    Raises:
        IOError: If ``video_path`` cannot be opened.
    """
    sequence_length = 16  # Frames per LSTM input window
    num_features = 36     # Per-frame feature length the LSTM was built for
    fallback_fps = 30.0   # Used when the container does not report a usable FPS

    # Load LSTM model first so a bad model path fails before any output file is created
    lstm_model = load_model(lstm_model_path)

    # Buffer for storing features for LSTM input
    feature_buffer = deque(maxlen=sequence_length)

    # Open the recorded video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    out = None
    pose_detector = None
    try:
        # Get video properties; a writer created with FPS 0 produces an unusable file
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:  # Also true for NaN
            fps = fallback_fps
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Create VideoWriter to save processed video
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

        # Initialize Mediapipe Pose
        pose_detector = Pose(static_image_mode=False, model_complexity=1, enable_segmentation=False, min_detection_confidence=0.5)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Process frame with Mediapipe Pose (expects RGB, OpenCV delivers BGR)
            results = pose_detector.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if results.pose_landmarks:
                landmarks = results.pose_landmarks.landmark

                # Landmark x/y are already relative to the frame size, so they are used
                # directly instead of scaling to pixels and dividing back.
                normalized_keypoints = np.array([[lm.x, lm.y, lm.visibility] for lm in landmarks])

                # NOTE(review): truncating the flattened (x, y, visibility) triples keeps
                # the first 12 landmarks, whereas training used 18 (x, y) pairs — confirm
                # the feature layout matches what the model was trained on.
                flattened_keypoints = normalized_keypoints.flatten()[:num_features]
                feature_buffer.append(flattened_keypoints)

                # Predict using LSTM if buffer is full
                label = "Not Falling"
                color = (0, 255, 0)  # Green for not falling
                if len(feature_buffer) == feature_buffer.maxlen:
                    input_data = np.expand_dims(np.array(feature_buffer), axis=0)
                    # verbose=0: avoid printing one progress bar per frame
                    prediction = lstm_model.predict(input_data, verbose=0)
                    if prediction[0][0] > 0.5:  # Class index 0 is treated as "fall"
                        label = "Falling"
                        color = (0, 0, 255)  # Red for falling

                # Draw the label on the frame
                cv2.putText(frame, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

            # Write the processed frame to the output video
            out.write(frame)
    finally:
        # Release everything even if pose detection or prediction raises
        cap.release()
        if out is not None:
            out.release()
        if pose_detector is not None:
            pose_detector.close()

    print(f"Processed video saved as {output_path}")


# Kernel-side callback: store the recording, run fall detection on it, download the result
def handle_video(data):
    """
    Handle the base64 video payload sent by the browser recorder.

    Saves the payload as a WebM file, runs the fall-detection pipeline on it, lists
    the contents of /content and finally triggers a browser download of the
    processed video.
    """
    recorded_path = "/content/recorded_video.webm"
    annotated_path = "/content/processed_video.avi"
    model_path = "/content/drive/My Drive/Colab Notebooks/dataset/lstm_fall_detection.h5"

    # Persist the recording, then annotate it with the fall detection model
    save_video(data, recorded_path)
    process_recorded_video(recorded_path, model_path, annotated_path)

    # Show what is on disk so a missing output file is easy to spot
    print("Files in Colab storage:", os.listdir('/content/'))

    # Send the annotated video to the user's machine
    files.download(annotated_path)


# Bind the save function to the notebook: the recorder's JavaScript invokes
# 'notebook.save_video' with the base64 data, which lands in handle_video.
from google.colab import output
output.register_callback('notebook.save_video', handle_video)

# Workflow
try:
    # Step 1: Record the video (processing and download happen in the callback)
    record_video()

except Exception as err:
    # Report the failure instead of letting the cell raise.
    print(f"An error occurred: {err}")


<IPython.core.display.Javascript object>

Video saved as /content/recorded_video.webm


In [None]:
import cv2
import numpy as np
from tensorflow.keras.models import load_model
from collections import deque
from mediapipe.python.solutions.pose import Pose  # Use Mediapipe for pose detection

def calculate_bounding_box(keypoints):
    """
    Compute the box enclosing all confidently detected keypoints.

    Args:
        keypoints: 2-D array whose columns are x, y and confidence.
    Returns:
        ``(min_x, min_y, max_x, max_y)`` as ints truncated towards zero, or ``None``
        when no keypoint has a confidence above 0.5.
    """
    confident = keypoints[:, 2] > 0.5
    if not confident.any():
        return None  # Nothing reliable to draw a box around

    xy = keypoints[confident, :2]
    lower = xy.min(axis=0)
    upper = xy.max(axis=0)
    return (int(lower[0]), int(lower[1]), int(upper[0]), int(upper[1]))

def is_falling(keypoints):
    """
    Pose-based fall heuristic: the head (row 0) lies lower in the image than the
    hips (rows 11 and 12).

    Args:
        keypoints: 2-D array whose columns are x, y and confidence.
    Returns:
        ``False`` when any of the three keypoints has a confidence of 0.5 or less;
        otherwise whether the head's y exceeds the mean y of the two hips.
    """
    # Checked in order so the first unreliable keypoint ends the test early.
    for row in (0, 11, 12):
        if not keypoints[row, 2] > 0.5:
            return False

    hips_y = keypoints[[11, 12], 1].mean()
    # Image y grows downwards, so a larger y means lower in the frame.
    return keypoints[0, 1] > hips_y

def process_video(video_path, output_path, lstm_model_path):
    """
    Run fall detection on a video and write an annotated copy.

    Each frame is run through MediaPipe Pose. When landmarks are found, a feature
    vector is pushed into a 16-frame rolling buffer; once the buffer is full the LSTM
    classifies the window. A frame is labelled "Falling" only when the LSTM's class-0
    score exceeds 0.5 and ``is_falling`` confirms it from the pose. A bounding box
    with the label is drawn around the person, and every frame is written to
    ``output_path``.

    Args:
        video_path (str): Path of the video to analyse.
        output_path (str): Destination of the annotated XVID-encoded video.
        lstm_model_path (str): Path of the saved Keras model to load.
    Raises:
        IOError: If ``video_path`` cannot be opened.
    """
    sequence_length = 16  # Frames per LSTM input window
    num_features = 36     # Per-frame feature length the LSTM was built for
    fallback_fps = 30.0   # Used when the container does not report a usable FPS
    banner_height = 30    # Height in pixels of the filled label strip

    # Load LSTM model first so a bad model path fails before any output file is created
    lstm_model = load_model(lstm_model_path)

    # Buffer for storing features for LSTM input
    feature_buffer = deque(maxlen=sequence_length)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    out = None
    pose_detector = None
    try:
        # Write at the source frame rate so the output plays at the original speed
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:  # Also true for NaN
            fps = fallback_fps
        frame_size = (
            int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        )
        fourcc = cv2.VideoWriter_fourcc(*"XVID")
        out = cv2.VideoWriter(output_path, fourcc, fps, frame_size)

        # Initialize Mediapipe Pose
        pose_detector = Pose(static_image_mode=False, model_complexity=1, enable_segmentation=False, min_detection_confidence=0.5)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Process frame with Mediapipe Pose (expects RGB, OpenCV delivers BGR)
            results = pose_detector.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if results.pose_landmarks:
                landmarks = results.pose_landmarks.landmark
                h, w, _ = frame.shape
                # Pixel-space keypoints feed the pose heuristic and the bounding box
                keypoints = np.array([[lm.x * w, lm.y * h, lm.visibility] for lm in landmarks])

                # Normalize keypoints and append to feature buffer
                normalized_keypoints = keypoints / np.array([w, h, 1])  # Normalize by frame size
                # NOTE(review): truncating the flattened (x, y, visibility) triples keeps
                # the first 12 landmarks, whereas training used 18 (x, y) pairs — confirm
                # the feature layout matches what the model was trained on.
                flattened_keypoints = normalized_keypoints.flatten()[:num_features]
                feature_buffer.append(flattened_keypoints)

                # Predict using LSTM if buffer is full
                label = "Not Falling"
                color = (0, 255, 0)  # Green for not falling
                if len(feature_buffer) == feature_buffer.maxlen:
                    input_data = np.expand_dims(np.array(feature_buffer), axis=0)
                    # verbose=0: avoid printing one progress bar per frame
                    prediction = lstm_model.predict(input_data, verbose=0)
                    # Confirm the model's fall score with the pose heuristic
                    if prediction[0][0] > 0.5 and is_falling(keypoints):
                        label = "Falling"
                        color = (0, 0, 255)  # Red for falling

                # Calculate bounding box around the person
                bbox = calculate_bounding_box(keypoints)
                if bbox:
                    min_x, min_y, max_x, max_y = bbox

                    # Put the banner above the box, or inside its top edge when there
                    # is no room above, so the label stays inside the frame.
                    if min_y >= banner_height:
                        banner_bottom = min_y
                    else:
                        banner_bottom = min_y + banner_height

                    # Draw bounding box and label
                    cv2.rectangle(frame, (min_x, min_y), (max_x, max_y), color, 2)
                    cv2.rectangle(frame, (min_x, banner_bottom - banner_height), (max_x, banner_bottom), color, -1)  # Background for text
                    cv2.putText(frame, label, (min_x + 10, banner_bottom - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

            # Write frame to output video
            out.write(frame)
    finally:
        # Release everything even if pose detection or prediction raises. No GUI
        # window is ever opened here, so there is nothing to destroy.
        cap.release()
        if out is not None:
            out.release()
        if pose_detector is not None:
            pose_detector.close()

# Example usage
# NOTE(review): a 'video.mp4' file is produced by the record-and-convert cell that
# appears further down in this notebook, so on a fresh runtime that cell presumably
# has to run before this one — confirm the intended cell order.
process_video(
    video_path="/content/video.mp4",
    output_path="output_video.avi",
    lstm_model_path="/content/drive/My Drive/Colab Notebooks/dataset/lstm_fall_detection.h5"
)


In [None]:
from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode
import os

def record_video(filename='video.webm'):
    """
    Inject a start/stop webcam recorder into the cell output and run it.

    The browser-side JavaScript records the webcam stream with ``MediaRecorder``.
    When recording stops it base64-encodes the WebM data and sends it to the kernel
    callback registered under ``'notebook.save_video'``; this function itself does
    not write any file.

    Args:
        filename: Not used by this function.
    """
    js = Javascript('''
    async function recordVideo() {
        const div = document.createElement('div');
        const recordButton = document.createElement('button');
        const stopButton = document.createElement('button');
        const video = document.createElement('video');

        recordButton.textContent = 'Start Recording';
        stopButton.textContent = 'Stop Recording';
        stopButton.style.display = 'none';

        div.appendChild(recordButton);
        div.appendChild(stopButton);
        div.appendChild(video);

        document.body.appendChild(div);

        const stream = await navigator.mediaDevices.getUserMedia({video: true});
        video.srcObject = stream;
        video.style.display = 'block';
        await video.play();

        const recorder = new MediaRecorder(stream);
        const chunks = [];

        recorder.ondataavailable = (e) => chunks.push(e.data);
        recorder.onstop = () => {
            const blob = new Blob(chunks, {type: 'video/webm'});
            const reader = new FileReader();
            reader.readAsDataURL(blob);
            reader.onloadend = () => {
                const base64data = reader.result.split(',')[1];
                google.colab.kernel.invokeFunction('notebook.save_video', [base64data], {});
            };
        };

        recordButton.onclick = () => {
            recorder.start();
            recordButton.style.display = 'none';
            stopButton.style.display = 'inline-block';
        };

        stopButton.onclick = () => {
            recorder.stop();
            stream.getTracks().forEach(track => track.stop());
            div.remove();
        };

        google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);
    }
    ''')
    # Define recordVideo() in the output frame, then invoke it from the kernel.
    display(js)
    eval_js('recordVideo()')

def save_video(data, filename='video.webm'):
    """
    Decode a base64 payload and write the resulting bytes to ``filename``.

    Args:
        data: Base64-encoded video content.
        filename: Destination path; an existing file is overwritten.
    """
    # Decode before opening so a malformed payload never creates the file.
    video_bytes = b64decode(data)
    with open(filename, 'wb') as video_file:
        video_file.write(video_bytes)
    print(f"Video saved as {filename}")

def convert_to_mp4(input_path='video.webm', output_path='video.mp4'):
    """
    Convert a webm video to mp4 using ffmpeg.

    ffmpeg is invoked with an argument list rather than a shell string, so paths
    containing spaces or shell metacharacters are passed through unchanged. ``-y``
    lets a re-run overwrite an existing output instead of stopping at ffmpeg's
    overwrite prompt.

    Args:
        input_path (str): Path of the source webm file.
        output_path (str): Path of the mp4 file to create.
    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If the ffmpeg executable is not installed.
    """
    # Imported here because this cell's import block does not include subprocess.
    import subprocess

    command = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-c:v", "libx264", "-preset", "fast", "-crf", "22",
        "-c:a", "aac", "-strict", "experimental",
        output_path,
    ]
    result = subprocess.run(
        command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    if result.returncode != 0:
        # Previously the exit status was ignored and success was printed regardless.
        raise RuntimeError(
            f"ffmpeg failed with exit code {result.returncode}: {result.stderr}"
        )
    print(f"Video converted to {output_path}")

# Kernel-side callback: store the recording and produce an mp4 copy of it
def handle_video(data):
    """
    Handle the base64 video payload sent by the browser recorder.

    Writes the payload to ``video.webm``, converts it to ``video.mp4`` and then
    lists the contents of /content for confirmation.
    """
    source_name, target_name = "video.webm", "video.mp4"

    # Persist the raw recording, then transcode it to mp4
    save_video(data, source_name)
    convert_to_mp4(source_name, target_name)

    # Show what is on disk so a missing file is easy to spot
    print("Files in Colab storage:", os.listdir('/content'))

# Bind the save function to the notebook: the recorder's JavaScript invokes
# 'notebook.save_video' with the base64 data, which lands in handle_video.
# NOTE: this replaces any handler an earlier cell registered under the same name.
from google.colab import output
output.register_callback('notebook.save_video', handle_video)

# Workflow
try:
    # Start the recorder; saving and conversion happen in the callback.
    record_video()
except Exception as err:
    # Report the failure instead of letting the cell raise.
    print(str(err))


<IPython.core.display.Javascript object>

Video saved as video.webm
Video converted to video.mp4
Files in Colab storage: ['.config', 'video.mp4', 'drive', 'video.webm', 'sample_data']


In [None]:
def evaluate_model(model, X_test, y_test):
    """
    Report the accuracy of a trained model on held-out data.

    Calls ``model.evaluate``, which must return exactly a (loss, accuracy) pair, and
    prints the accuracy as a percentage with two decimals. Nothing is returned.

    Args:
        model: Trained model exposing ``evaluate(X, y)``.
        X_test: Test features.
        y_test: Test labels.
    """
    _, test_accuracy = model.evaluate(X_test, y_test)
    accuracy_pct = test_accuracy * 100
    print(f"Test Accuracy: {accuracy_pct:.2f}%")

# Split data for testing (example using 80-20 split)
# NOTE(review): lstm_model was already fitted on the full X_seq with
# validation_split=0.2, so this last-20% slice is not independent of that run —
# presumably it coincides with the validation portion; confirm, and hold out a test
# set before training if an unbiased estimate is needed.
# X_train and y_train are not used by the cells shown after this split.
split_idx = int(0.8 * len(X_seq))
X_train, X_test = X_seq[:split_idx], X_seq[split_idx:]
y_train, y_test = y_seq[:split_idx], y_seq[split_idx:]

# Evaluate the model
evaluate_model(lstm_model, X_test, y_test)

[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 1.0000 - loss: 1.1921e-07
Test Accuracy: 100.00%
