# Experiment Notebook

This notebook documents **experimental work** conducted during the development of the **Elderly Monitoring AI System**.

⚠️ **Important**  
- This notebook is for **research and validation only**.  
- It is **NOT part of the final runtime pipeline**.  
- The production-ready logic used by the system is implemented in `/src/layers/fall_detection`.


# Fall Detection Experiment (MediaPipe Pose)

This notebook explores a **vision-based fall detection approach** using **MediaPipe Pose Landmarker (Live Stream mode)**.

The experiment focuses on:
- body orientation
- vertical motion velocity
- temporal confirmation

Insights from this experiment informed the simplified and safer fall detection logic used in the final system.


## Purpose

- Detect potential falls from live video input
- Analyze pose orientation and motion patterns
- Evaluate temporal stability before raising fall alerts


## Environment

- Python 3.10
- OpenCV
- MediaPipe (Pose Landmarker Tasks API)
- NumPy

See `requirements-experiments.txt` for the full dependency list.


## Model Setup

This experiment uses the **MediaPipe Pose Landmarker** in **LIVE_STREAM** mode.

⚠️ The required model file must be placed locally:

```
pose_landmarker_full.task
```


In [None]:
import cv2
import mediapipe as mp
import time
import math
import numpy as np
from collections import deque
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

print("✅ Libraries imported")

In [None]:
# =========================
# MODEL SETUP
# =========================

base_options = python.BaseOptions(model_asset_path="pose_landmarker_full.task")
current_result = None

def result_callback(result, output_image, timestamp):
    global current_result
    current_result = result

options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    running_mode=vision.RunningMode.LIVE_STREAM,
    num_poses=1,
    min_pose_detection_confidence=0.5,
    result_callback=result_callback
)

landmarker = vision.PoseLandmarker.create_from_options(options)
print("✅ Pose landmarker initialized")

In [None]:
# =========================
# VIDEO SETUP
# =========================

cap = cv2.VideoCapture(0)
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

output_path = "clean_fall_monitor.mp4"
out = cv2.VideoWriter(
    output_path,
    cv2.VideoWriter_fourcc(*'mp4v'),
    fps,
    (frame_width, frame_height)
)

print("✅ Video stream initialized")

In [None]:
# =========================
# FALL DETECTION LOGIC VARIABLES
# =========================

frame_count = 0
center_history = deque(maxlen=15)
angle_history = deque(maxlen=10)
fall_state = "NORMAL"
fall_start_time = None


In [None]:
def draw_styled_overlay(img, status):
    """Draws a clean status overlay."""
    overlay = img.copy()
    color = (80, 175, 76) if status == "NORMAL" else (50, 50, 200)
    label = "SYSTEM: NORMAL" if status == "NORMAL" else "ALERT: FALL DETECTED"

    cv2.rectangle(overlay, (0, 0), (img.shape[1], 50), color, -1)
    cv2.addWeighted(overlay, 0.6, img, 0.4, 0, img)

    font = cv2.FONT_HERSHEY_DUPLEX
    text_size = cv2.getTextSize(label, font, 0.7, 1)[0]
    text_x = (img.shape[1] - text_size[0]) // 2
    cv2.putText(img, label, (text_x, 32), font, 0.7, (255, 255, 255), 1, cv2.LINE_AA)


In [None]:
# =========================
# MAIN LOOP
# =========================

print("Starting fall detection experiment — press ESC to exit")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    timestamp_ms = int(frame_count * (1000 / fps))
    frame_count += 1

    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
    landmarker.detect_async(mp_image, timestamp_ms)
    current_time = time.time()

    if current_result and current_result.pose_landmarks:
        landmarks = current_result.pose_landmarks[0]

        mid_hip_y = (landmarks[23].y + landmarks[24].y) / 2
        center_history.append((mid_hip_y * frame_height, timestamp_ms))

        dx = ((landmarks[23].x + landmarks[24].x)/2) - ((landmarks[11].x + landmarks[12].x)/2)
        dy = ((landmarks[23].y + landmarks[24].y)/2) - ((landmarks[11].y + landmarks[12].y)/2)
        avg_angle = abs(90 - abs(math.degrees(math.atan2(dy, dx))))

        velocity = 0
        if len(center_history) >= 2:
            (y2, t2), (y1, t1) = center_history[-1], center_history[-2]
            dt = max((t2 - t1) / 1000.0, 1e-6)
            velocity = (y2 - y1) / dt

        if fall_state == "NORMAL" and velocity > 400 and avg_angle > 45:
            fall_state = "FALLING"
            fall_start_time = current_time
        elif fall_state == "FALLING" and (current_time - fall_start_time > 0.3):
            fall_state = "FALL"
        elif fall_state == "FALL" and avg_angle < 30:
            fall_state = "NORMAL"

    draw_styled_overlay(frame, fall_state)
    out.write(frame)
    cv2.imshow("Monitoring", frame)

    if cv2.waitKey(1) & 0xFF == 27:
        break

cap.release()
out.release()
cv2.destroyAllWindows()
landmarker.close()
print("Experiment finished")

## Observations & Notes

- Rapid vertical motion combined with body orientation change is a strong fall indicator.
- Temporal confirmation reduces false positives.
- Lighting and camera angle affect stability.
- This experiment motivated the simpler, frame-based fall detection logic used in the final system.


In [None]:
# Verify environment
print(f"OpenCV Version: {cv2.__version__}")
print(f"MediaPipe Version: {mp.__version__}")

model_file = "pose_landmarker_full.task"
if os.path.exists(model_file):
    print(f"✅ Model file '{model_file}' found.")
else:
    print(f"❌ Missing '{model_file}'. Download it from MediaPipe models page.")