In [1]:
import numpy as np
import cv2
import torch
from torchvision import transforms
from utils.datasets import letterbox
from utils.general import non_max_suppression_kpt
from utils.plots import output_to_keypoint, plot_skeleton_kpts
from models.yolo import Model
import math
import time

# Add the custom class to the safe globals list
torch.serialization.add_safe_globals([Model])

# Initialize device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load YOLOv7-pose model
weights = torch.load('yolov7-w6-pose.pt', map_location=device, weights_only=False)
model = weights['model']
_ = model.float().eval()

if torch.cuda.is_available():
    model.half().to(device)

# Constants from the journal
ALPHA = 0.5  # Adjustment factor for height threshold
SPEED_THRESHOLD = 0.5  # Vertical speed threshold for fall detection (pixels/frame)
ANGLE_THRESHOLD = 45  # Degrees threshold between torso and legs
TARGET_FPS = 25  # Target frames per second for processing

# Previous frame data for speed calculation
prev_shoulder_y = None
frame_count = 0

def calculate_length_factor(shoulder, torso):
    return np.sqrt((shoulder[0] - torso[0])**2 + (shoulder[1] - torso[1])**2)

def calculate_angle(a, b, c):
    """Calculate angle between points a, b, c in degrees"""
    ba = np.array(a) - np.array(b)
    bc = np.array(c) - np.array(b)
    
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    angle = np.arccos(cosine_angle)
    return np.degrees(angle)

def detect_fall(keypoints, threshold=0.5):
    global prev_shoulder_y, frame_count
    
    LEFT_SHOULDER, RIGHT_SHOULDER, LEFT_HIP, RIGHT_HIP, LEFT_KNEE, RIGHT_KNEE, LEFT_ANKLE, RIGHT_ANKLE = 5, 6, 11, 12, 13, 14, 15, 16
    
    keypoints_conf = [keypoints[i * 3: (i + 1) * 3] for i in [LEFT_SHOULDER, RIGHT_SHOULDER, LEFT_HIP, RIGHT_HIP, LEFT_KNEE, RIGHT_KNEE, LEFT_ANKLE, RIGHT_ANKLE]]
    
    # Check if keypoints are detected with sufficient confidence
    if any(kp[2] < threshold for kp in keypoints_conf[:4] + keypoints_conf[6:8]):
        return False
    
    # Current shoulder y position (average of left and right)
    current_shoulder_y = (keypoints_conf[0][1] + keypoints_conf[1][1]) / 2
    
    # Calculate vertical speed if we have previous data
    vertical_speed = 0
    if prev_shoulder_y is not None and frame_count > 0:
        vertical_speed = abs(current_shoulder_y - prev_shoulder_y) / frame_count
    
    prev_shoulder_y = current_shoulder_y
    frame_count = 0 
    
    # Calculate length factor (from shoulder to hip)
    length_factor = calculate_length_factor(keypoints_conf[0][:2], keypoints_conf[2][:2])
    
    # Check shoulder height relative to feet (with length factor adjustment)
    shoulder_low = (keypoints_conf[0][1] <= keypoints_conf[6][1] + ALPHA * length_factor and
                    keypoints_conf[1][1] <= keypoints_conf[7][1] + ALPHA * length_factor)
    
    # Calculate body dimensions
    body_height = abs(keypoints_conf[0][1] - keypoints_conf[6][1])
    body_width = abs(keypoints_conf[0][0] - keypoints_conf[1][0])
    horizontal_position = body_height < body_width
    
    # Calculate angle between hip, knee and ankle (for left and right legs)
    left_leg_angle = calculate_angle(keypoints_conf[2][:2], keypoints_conf[4][:2], keypoints_conf[6][:2])
    right_leg_angle = calculate_angle(keypoints_conf[3][:2], keypoints_conf[5][:2], keypoints_conf[7][:2])
    leg_angle = min(left_leg_angle, right_leg_angle)
    
    # Fall conditions from journal:
    # 1. Shoulders low relative to feet (tapi ini berdasarkan bahu kanan dan kiri kalau lebih rendah dari vertikal dari kaki kanan dan kaki kiri)
    # 2. Body in horizontal position (perbedaan lebar dan tinggi badan, algoritmanya kalkulasi kalau si height 
    # ini menjadi lebih kecil dibandingkan dengan width, ini menandakan kalau orangnya tuh jatuh)
    # 3. Either rapid movement (speed) or acute angle between torso and legs
    if shoulder_low and horizontal_position:
        if vertical_speed > SPEED_THRESHOLD or leg_angle < ANGLE_THRESHOLD:
            return True
    
    return False

# Path to video file
video_path = "C:\\Users\\LENOVO\\Documents\\A Skripsi\\datasets\\FallDataset\\Dataset\\Coffee_room_01\\Videos\\video (20).avi"
cap = cv2.VideoCapture(video_path)

# Get video properties
original_fps = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Calculate skip frames to achieve 25 FPS
if original_fps > 0:
    skip_frames = max(1, int(round(original_fps / TARGET_FPS)))
else:
    skip_frames = 1  # Default if FPS info not available

print(f"Original FPS: {original_fps}, Processing at {TARGET_FPS} FPS, skipping {skip_frames-1} frames between processed frames")

frame_counter = 0
start_time = time.time()
processed_frames = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    
    frame_counter += 1
    
    # Skip frames to achieve target FPS
    if frame_counter % skip_frames != 0:
        continue
    
    processed_frames += 1
    frame_count += 1
    
    image = letterbox(frame, 640, stride=64, auto=True)[0]
    image_ = image.copy()
    image = transforms.ToTensor()(image)
    image = torch.tensor(np.array([image.numpy()]))

    if torch.cuda.is_available():
        image = image.half().to(device)

    with torch.no_grad():
        output, _ = model(image)
        output = non_max_suppression_kpt(output, 0.25, 0.65, nc=model.yaml['nc'], nkpt=model.yaml['nkpt'], kpt_label=True)
        output = output_to_keypoint(output)

    nimg = image[0].permute(1, 2, 0) * 255
    nimg = nimg.cpu().numpy().astype(np.uint8)
    nimg = cv2.cvtColor(nimg, cv2.COLOR_RGB2BGR)

    for idx in range(output.shape[0]):
        keypoints = output[idx, 7:].T
        plot_skeleton_kpts(nimg, keypoints, 3)

        if detect_fall(keypoints):
            cv2.putText(nimg, "Fall Detected!", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

    # Calculate and display actual processing FPS
    elapsed_time = time.time() - start_time
    current_fps = processed_frames / elapsed_time if elapsed_time > 0 else 0
    cv2.putText(nimg, f"FPS: {current_fps:.1f}", (50, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
    
    cv2.imshow("Fall Detection", nimg)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

    if torch.cuda.is_available():
        torch.cuda.empty_cache()

# Calculate final processing statistics
total_time = time.time() - start_time
actual_fps = processed_frames / total_time if total_time > 0 else 0
print(f"Processing complete. Actual FPS: {actual_fps:.2f}")

cap.release()
cv2.destroyAllWindows()

Original FPS: 25.0, Processing at 25 FPS, skipping 0 frames between processed frames


  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


Processing complete. Actual FPS: 19.24


In [3]:
import numpy as np
import cv2
import torch
from torchvision import transforms
from utils.datasets import letterbox
from utils.general import non_max_suppression_kpt
from utils.plots import output_to_keypoint, plot_skeleton_kpts
from models.yolo import Model
import math
import time

# Add the custom class to the safe globals list
torch.serialization.add_safe_globals([Model])

# Initialize device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load YOLOv7-pose model
weights = torch.load('yolov7-w6-pose.pt', map_location=device, weights_only=False)
model = weights['model']
_ = model.float().eval()

if torch.cuda.is_available():
    model.half().to(device)

# Constants from the journal
ALPHA = 0.5  # Adjustment factor for height threshold
SPEED_THRESHOLD = 0.5  # Vertical speed threshold for fall detection (pixels/frame)
ANGLE_THRESHOLD = 45  # Degrees threshold between torso and legs
TARGET_FPS = 25  # Target frames per second for processing

# Previous frame data for speed calculation
prev_shoulder_y = None
frame_count = 0

def calculate_length_factor(shoulder, torso):
    return np.sqrt((shoulder[0] - torso[0])**2 + (shoulder[1] - torso[1])**2)

def calculate_angle(a, b, c):
    # Calculate angle between points a, b, c in degrees
    ba = np.array(a) - np.array(b)
    bc = np.array(c) - np.array(b)
    
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    angle = np.arccos(cosine_angle)
    return np.degrees(angle)

def detect_fall(keypoints, threshold=0.5):
    global prev_shoulder_y, prev_frame_time, fall_start_time
    
    # Keypoint indices
    NOSE = 0
    LEFT_SHOULDER, RIGHT_SHOULDER = 5, 6
    LEFT_HIP, RIGHT_HIP = 11, 12
    LEFT_ANKLE, RIGHT_ANKLE = 15, 16
    LEFT_KNEE, RIGHT_KNEE = 13, 14

    # Initialize default return values
    is_fall = False
    current_state = "normal"
    conditions = []
    
    try:
        # Extract keypoints with confidence check
        kp = {}
        for name, idx in [('nose', NOSE), ('left_shoulder', LEFT_SHOULDER),
                         ('right_shoulder', RIGHT_SHOULDER), ('left_hip', LEFT_HIP),
                         ('right_hip', RIGHT_HIP), ('left_knee', LEFT_KNEE),
                         ('right_knee', RIGHT_KNEE), ('left_ankle', LEFT_ANKLE),
                         ('right_ankle', RIGHT_ANKLE)]:
            kp[name] = keypoints[idx*3:(idx+1)*3]
            if kp[name][2] < threshold:
                return False, "low_confidence", ["Low confidence in keypoints"]

        # 1. Calculate critical distances
        torso_length = math.sqrt((kp['left_shoulder'][0]-kp['left_hip'][0])**2 + 
                              (kp['left_shoulder'][1]-kp['left_hip'][1])**2)
        shoulder_height = (kp['left_shoulder'][1] + kp['right_shoulder'][1]) / 2
        feet_height = (kp['left_ankle'][1] + kp['right_ankle'][1]) / 2
        head_height = kp['nose'][1]

        # 2. Normalized height ratios
        head_to_feet = abs(head_height - feet_height)
        shoulder_to_feet = abs(shoulder_height - feet_height)
        normalized_ratio = shoulder_to_feet / (head_to_feet + 1e-5)

        # 3. Vertical speed calculation
        current_time = time.time()
        vertical_speed = 0
        if prev_shoulder_y is not None and prev_frame_time is not None:
            time_elapsed = current_time - prev_frame_time
            if time_elapsed > 0:
                vertical_speed = (shoulder_height - prev_shoulder_y) / time_elapsed
        
        # 4. Body orientation
        body_width = abs(kp['left_shoulder'][0] - kp['right_shoulder'][0])
        body_height = head_to_feet
        orientation_ratio = body_width / (body_height + 1e-5)

        # 5. Leg angles
        def calculate_angle(a, b, c):
            ba = np.array(a[:2]) - np.array(b[:2])
            bc = np.array(c[:2]) - np.array(b[:2])
            cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
            return np.degrees(np.arccos(np.clip(cosine_angle, -1, 1)))

        left_leg_angle = calculate_angle(kp['left_hip'], kp['left_knee'], kp['left_ankle'])
        right_leg_angle = calculate_angle(kp['right_hip'], kp['right_knee'], kp['right_ankle'])
        min_leg_angle = min(left_leg_angle, right_leg_angle)

        # Fall Detection Conditions
        conditions = []
        
        # Shoulders near feet
        shoulder_foot_threshold = 0.8 * torso_length
        if shoulder_height > feet_height - shoulder_foot_threshold:
            conditions.append("shoulders_near_feet")
        
        # Rapid downward movement
        if vertical_speed > SPEED_THRESHOLD:
            conditions.append(f"rapid_downward_{vertical_speed:.1f}px/s")
        
        # Body orientation
        if orientation_ratio > 1.2:
            conditions.append("horizontal_posture")
        
        # Leg collapse
        if min_leg_angle < ANGLE_THRESHOLD:
            conditions.append(f"legs_collapsed_{min_leg_angle:.0f}deg")

        # Fall state determination
        if "shoulders_near_feet" in conditions:
            if any("rapid_downward" in cond for cond in conditions):
                current_state = "falling"
                fall_start_time = current_time
            elif "horizontal_posture" in conditions and current_time - fall_start_time < 1.0:
                current_state = "fallen"
        
        # Final decision
        if current_state == "fallen":
            is_fall = True
        elif len(conditions) >= 2 and "shoulders_near_feet" in conditions:
            is_fall = True
        
        # Update tracking variables
        prev_shoulder_y = shoulder_height
        prev_frame_time = current_time
        
    except Exception as e:
        conditions = [f"Error: {str(e)}"]
        return False, "error", conditions
    
    return is_fall, current_state, conditions

# Path to video file
video_path = "C:\\Users\\LENOVO\\Documents\\A Skripsi\\datasets\\FallDataset\\Dataset\\Coffee_room_01\\Videos\\video (1).avi"
cap = cv2.VideoCapture(video_path)

# Get video properties
original_fps = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Calculate skip frames to achieve 25 FPS
if original_fps > 0:
    skip_frames = max(1, int(round(original_fps / TARGET_FPS)))
else:
    skip_frames = 1  # Default if FPS info not available

print(f"Original FPS: {original_fps}, Processing at {TARGET_FPS} FPS, skipping {skip_frames-1} frames between processed frames")

# Create video writer to maintain 25 FPS output
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('output.avi', fourcc, TARGET_FPS, (640, 640))

frame_counter = 0
start_time = time.time()
processed_frames = 0
last_frame_time = time.time()

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    
    frame_counter += 1
    
    # Skip frames to achieve target FPS
    if frame_counter % skip_frames != 0:
        continue
    
    # Calculate time to sleep to maintain 25 FPS
    current_time = time.time()
    elapsed = current_time - last_frame_time
    sleep_time = max(0, (1.0/TARGET_FPS) - elapsed)
    time.sleep(sleep_time)
    last_frame_time = time.time()
    
    processed_frames += 1
    frame_count += 1
    
    image = letterbox(frame, 640, stride=64, auto=True)[0]
    image_ = image.copy()
    image = transforms.ToTensor()(image)
    image = torch.tensor(np.array([image.numpy()]))

    if torch.cuda.is_available():
        image = image.half().to(device)

    with torch.no_grad():
        output, _ = model(image)
        output = non_max_suppression_kpt(output, 0.25, 0.65, nc=model.yaml['nc'], nkpt=model.yaml['nkpt'], kpt_label=True)
        output = output_to_keypoint(output)

    nimg = image[0].permute(1, 2, 0) * 255
    nimg = nimg.cpu().numpy().astype(np.uint8)
    nimg = cv2.cvtColor(nimg, cv2.COLOR_RGB2BGR)

    for idx in range(output.shape[0]):
        keypoints = output[idx, 7:].T
        plot_skeleton_kpts(nimg, keypoints, 3)
    
        # Updated to handle 3 return values
        fall_detected, state, conditions = detect_fall(keypoints)
        
        if fall_detected:
            cv2.putText(nimg, f"FALL: {state}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            # Display all triggered conditions
            for i, condition in enumerate(conditions):
                cv2.putText(nimg, condition, (50, 90 + i*30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        else:
            cv2.putText(nimg, f"State: {state}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            if conditions:  # Show conditions even when no fall
                cv2.putText(nimg, f"Conditions: {', '.join(conditions)}", (50, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
    
    # Calculate and display actual processing FPS
    elapsed_time = time.time() - start_time
    current_fps = processed_frames / elapsed_time if elapsed_time > 0 else 0
    cv2.putText(nimg, f"Processing FPS: {current_fps:.1f}", (50, 110), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
    
    # Write frame to output video
    out.write(nimg)
    
    cv2.imshow("Fall Detection", nimg)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

    if torch.cuda.is_available():
        torch.cuda.empty_cache()

# Calculate final processing statistics
total_time = time.time() - start_time
actual_fps = processed_frames / total_time if total_time > 0 else 0
print(f"Processing complete. Actual FPS: {actual_fps:.2f}")

cap.release()
out.release()
cv2.destroyAllWindows()

Original FPS: 25.0, Processing at 25 FPS, skipping 0 frames between processed frames
Processing complete. Actual FPS: 20.65


In [9]:
import os
import numpy as np
import cv2
import torch
from torchvision import transforms
from utils.datasets import letterbox
from utils.general import non_max_suppression_kpt
from utils.plots import output_to_keypoint, plot_skeleton_kpts
from models.yolo import Model
import math
import time
from collections import defaultdict

# Initialize device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load YOLOv7-pose model
weights = torch.load('yolov7-w6-pose.pt', map_location=device, weights_only=False)
model = weights['model'].float().eval().to(device)
if torch.cuda.is_available():
    model = model.half()

# Constants
SPEED_THRESHOLD = 0.5
ANGLE_THRESHOLD = 45
TARGET_FPS = 25

# ====== FIXED PATHS FOR YOUR DATASET ======
BASE_PATH = r"C:\Users\LENOVO\Documents\A Skripsi\datasets\FallDataset\Datasets sudah Grouping\A Gelap"
LABELS_PATH = os.path.join(BASE_PATH, "labels")
VIDEOS_PATH = os.path.join(BASE_PATH, "video")

# ================== Evaluation Metrics ==================
class FallDetectionMetrics:
    def __init__(self):
        self.reset()
    
    def reset(self):
        self.true_positives = 0
        self.false_positives = 0
        self.true_negatives = 0
        self.false_negatives = 0
        self.frame_results = []
    
    def update(self, predicted, actual, frame_info=None):
        if predicted and actual:
            self.true_positives += 1
        elif predicted and not actual:
            self.false_positives += 1
        elif not predicted and not actual:
            self.true_negatives += 1
        elif not predicted and actual:
            self.false_negatives += 1
        
        if frame_info:
            self.frame_results.append(frame_info)
    
    def calculate_metrics(self):
        precision = self.true_positives / (self.true_positives + self.false_positives + 1e-9)
        recall = self.true_positives / (self.true_positives + self.false_negatives + 1e-9)
        f1 = 2 * (precision * recall) / (precision + recall + 1e-9)
        accuracy = (self.true_positives + self.true_negatives) / (self.true_positives + self.false_positives + self.true_negatives + self.false_negatives + 1e-9)
        
        return {
            'precision': precision,
            'recall': recall,
            'f1_score': f1,
            'accuracy': accuracy,
            'true_positives': self.true_positives,
            'false_positives': self.false_positives,
            'true_negatives': self.true_negatives,
            'false_negatives': self.false_negatives
        }
    
    def save_results(self, output_dir="results"):
        os.makedirs(output_dir, exist_ok=True)
        
        with open(os.path.join(output_dir, "metrics.txt"), "w") as f:
            metrics = self.calculate_metrics()
            for k, v in metrics.items():
                f.write(f"{k}: {v:.4f}\n")
        
        with open(os.path.join(output_dir, "frame_results.csv"), "w") as f:
            f.write("video,frame,predicted,actual,state,conditions\n")
            for res in self.frame_results:
                f.write(f"{res['video']},{res['frame_num']},{res['predicted']},{res['actual']},{res['state']},\"{'|'.join(res['conditions'])}\"\n")

# ================== Annotation Loader ==================
def load_annotations(annotation_dir):
    annotations = {}
    print(f"Loading annotations from: {annotation_dir}")
    
    try:
        for ann_file in os.listdir(annotation_dir):
            if ann_file.endswith(".txt"):
                video_name = os.path.splitext(ann_file)[0]
                annotation_path = os.path.join(annotation_dir, ann_file)
                
                with open(annotation_path, "r") as f:
                    lines = f.readlines()
                    if len(lines) >= 2:
                        start = int(lines[0].strip())
                        end = int(lines[1].strip())
                        annotations[video_name] = (start, end) if (start != 0 or end != 0) else None
        print(f"Successfully loaded {len(annotations)} annotations")
    except Exception as e:
        print(f"Error loading annotations: {str(e)}")
    
    return annotations

# ================== Fall Detection Logic ==================
def detect_fall(keypoints, threshold=0.5):
    NOSE = 0
    L_SHOULDER, R_SHOULDER = 5, 6
    L_HIP, R_HIP = 11, 12
    L_ANKLE, R_ANKLE = 15, 16
    
    try:
        kp = {
            'nose': keypoints[NOSE*3:(NOSE+1)*3],
            'l_shoulder': keypoints[L_SHOULDER*3:(L_SHOULDER+1)*3],
            'r_shoulder': keypoints[R_SHOULDER*3:(R_SHOULDER+1)*3],
            'l_hip': keypoints[L_HIP*3:(L_HIP+1)*3],
            'r_hip': keypoints[R_HIP*3:(R_HIP+1)*3],
            'l_ankle': keypoints[L_ANKLE*3:(L_ANKLE+1)*3],
            'r_ankle': keypoints[R_ANKLE*3:(R_ANKLE+1)*3]
        }
        
        for k, v in kp.items():
            if v[2] < threshold:
                return False, "low_confidence", ["Low confidence"]
        
        shoulder_y = (kp['l_shoulder'][1] + kp['r_shoulder'][1]) / 2
        feet_y = (kp['l_ankle'][1] + kp['r_ankle'][1]) / 2
        torso_length = np.sqrt((kp['l_shoulder'][0]-kp['l_hip'][0])**2 + (kp['l_shoulder'][1]-kp['l_hip'][1])**2)
        
        conditions = []
        if shoulder_y > feet_y - 0.8 * torso_length:
            conditions.append("shoulders_near_feet")
        
        body_width = abs(kp['l_shoulder'][0] - kp['r_shoulder'][0])
        body_height = abs(kp['nose'][1] - feet_y)
        if body_width / (body_height + 1e-5) > 1.2:
            conditions.append("horizontal_posture")
        
        if len(conditions) >= 2:
            return True, "fallen", conditions
        return False, "normal", conditions
    
    except Exception as e:
        return False, "error", [f"Error: {str(e)}"]

# ================== Video Processing ==================
def process_video(video_path, annotations, metrics):
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    fall_range = annotations.get(video_name)
    
    print(f"\nProcessing video: {video_name}")
    print(f"Fall frames in ground truth: {fall_range if fall_range else 'No fall'}")
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error opening {video_path}")
        return
    
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_count += 1
        
        # Process frame
        img = letterbox(frame, 640, stride=64, auto=True)[0]
        img = transforms.ToTensor()(img)
        img = img.unsqueeze(0).to(device)
        if torch.cuda.is_available():
            img = img.half()
        
        with torch.no_grad():
            output, _ = model(img)
            output = non_max_suppression_kpt(output, 0.25, 0.65, nc=model.yaml['nc'], nkpt=model.yaml['nkpt'], kpt_label=True)
            output = output_to_keypoint(output)
        
        # Visualize
        nimg = img[0].permute(1, 2, 0).cpu().numpy() * 255
        nimg = nimg.astype(np.uint8)
        nimg = cv2.cvtColor(nimg, cv2.COLOR_RGB2BGR)
        
        for idx in range(output.shape[0]):
            kpts = output[idx, 7:].T
            plot_skeleton_kpts(nimg, kpts, 3)
            
            # Detect fall
            fall_detected, state, conditions = detect_fall(kpts)
            actual_fall = fall_range is not None and fall_range[0] <= frame_count <= fall_range[1]
            
            # Update metrics
            metrics.update(
                fall_detected, 
                actual_fall,
                {
                    'video': video_name,
                    'frame_num': frame_count,
                    'predicted': fall_detected,
                    'actual': actual_fall,
                    'state': state,
                    'conditions': conditions
                }
            )
            
            # Display info
            color = (0, 255, 0) if fall_detected == actual_fall else (0, 0, 255)
            status = "CORRECT" if fall_detected == actual_fall else "WRONG"
            cv2.putText(nimg, f"Frame: {frame_count} | State: {state} | {status}", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            
            if fall_range is None:
                gt_text = "GT: No fall"
            else:
                gt_text = f"GT Fall: {fall_range[0]}-{fall_range[1]}"
            cv2.putText(nimg, gt_text, (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        
        cv2.imshow("Fall Detection", nimg)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

# ================== Main Execution ==================
if __name__ == "__main__":
    # Load annotations
    annotations = load_annotations(LABELS_PATH)
    
    # Initialize metrics
    metrics = FallDetectionMetrics()
    
    # Process all videos in the test set
    test_video_dir = os.path.join(VIDEOS_PATH, "test")
    if not os.path.exists(test_video_dir):
        test_video_dir = VIDEOS_PATH  # Fallback if no test subfolder exists
    
    for video_file in os.listdir(test_video_dir):
        if video_file.endswith(".avi"):
            video_path = os.path.join(test_video_dir, video_file)
            process_video(video_path, annotations, metrics)
    
    # Save results
    metrics.save_results()
    print("\nEvaluation Complete!")
    print("Metrics saved to 'results/' directory")
    
    # Print summary
    results = metrics.calculate_metrics()
    print("\n=== Final Metrics ===")
    print(f"Precision: {results['precision']:.4f}")
    print(f"Recall: {results['recall']:.4f}")
    print(f"F1 Score: {results['f1_score']:.4f}")
    print(f"Accuracy: {results['accuracy']:.4f}")

Loading annotations from: C:\Users\LENOVO\Documents\A Skripsi\datasets\FallDataset\Datasets sudah Grouping\A Gelap\labels
Successfully loaded 0 annotations

Processing video: video (43)
Fall frames in ground truth: No fall

Processing video: video (44)
Fall frames in ground truth: No fall

Processing video: video (45)
Fall frames in ground truth: No fall

Processing video: video (46)
Fall frames in ground truth: No fall

Processing video: video (47)
Fall frames in ground truth: No fall

Processing video: video (48)
Fall frames in ground truth: No fall

Processing video: video (49)
Fall frames in ground truth: No fall

Processing video: video (50)
Fall frames in ground truth: No fall

Processing video: video (51)
Fall frames in ground truth: No fall

Evaluation Complete!
Metrics saved to 'results/' directory

=== Final Metrics ===
Precision: 0.0000
Recall: 0.0000
F1 Score: 0.0000
Accuracy: 1.0000


In [8]:
import os
import numpy as np
import cv2
import torch
from torchvision import transforms
from utils.datasets import letterbox
from utils.general import non_max_suppression_kpt
from utils.plots import output_to_keypoint, plot_skeleton_kpts
from models.yolo import Model
import math
import time
from collections import defaultdict

# Initialize device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load YOLOv7-pose model
weights = torch.load('yolov7-w6-pose.pt', map_location=device, weights_only=False)
model = weights['model'].float().eval().to(device)
if torch.cuda.is_available():
    model = model.half()

# Constants
SPEED_THRESHOLD = 0.5  # Vertical speed threshold (pixels/frame)
ANGLE_THRESHOLD = 45    # Degrees threshold between torso and legs
TARGET_FPS = 25         # Target processing FPS

# Dataset Paths (Update these!)
LABELS_PATH = r"C:\Users\LENOVO\Documents\A Skripsi\datasets\FallDataset\Datasets sudah Grouping\A Gelap\labels\test"
VIDEOS_PATH = r"C:\Users\LENOVO\Documents\A Skripsi\datasets\FallDataset\Datasets sudah Grouping\A Gelap\video\test"

# ================== Evaluation Metrics Class ==================
class FallDetectionMetrics:
    def __init__(self):
        self.reset()
    
    def reset(self):
        self.true_positives = 0
        self.false_positives = 0
        self.true_negatives = 0
        self.false_negatives = 0
        self.frame_results = []
    
    def update(self, predicted, actual, frame_info=None):
        if predicted and actual:
            self.true_positives += 1
        elif predicted and not actual:
            self.false_positives += 1
        elif not predicted and not actual:
            self.true_negatives += 1
        elif not predicted and actual:
            self.false_negatives += 1
        
        if frame_info:
            self.frame_results.append(frame_info)
    
    def calculate_metrics(self):
        precision = self.true_positives / (self.true_positives + self.false_positives + 1e-9)
        recall = self.true_positives / (self.true_positives + self.false_negatives + 1e-9)
        f1 = 2 * (precision * recall) / (precision + recall + 1e-9)
        accuracy = (self.true_positives + self.true_negatives) / (self.true_positives + self.false_positives + self.true_negatives + self.false_negatives + 1e-9)
        
        return {
            'precision': precision,
            'recall': recall,
            'f1_score': f1,
            'accuracy': accuracy,
            'true_positives': self.true_positives,
            'false_positives': self.false_positives,
            'true_negatives': self.true_negatives,
            'false_negatives': self.false_negatives
        }
    
    def save_results(self, output_dir="results"):
        os.makedirs(output_dir, exist_ok=True)
        
        # Save metrics
        with open(os.path.join(output_dir, "metrics.txt"), "w") as f:
            metrics = self.calculate_metrics()
            for k, v in metrics.items():
                f.write(f"{k}: {v:.4f}\n")
        
        # Save frame-level results
        with open(os.path.join(output_dir, "frame_results.csv"), "w") as f:
            f.write("video,frame,predicted,actual,state,conditions\n")
            for res in self.frame_results:
                f.write(f"{res['video']},{res['frame_num']},{res['predicted']},{res['actual']},{res['state']},\"{'|'.join(res['conditions'])}\"\n")

# ================== Annotation Loader ==================
def load_annotations(annotation_dir):
    annotations = {}
    for ann_file in os.listdir(annotation_dir):
        if ann_file.endswith(".txt"):
            video_name = os.path.splitext(ann_file)[0]
            with open(os.path.join(annotation_dir, ann_file), "r") as f:
                lines = f.readlines()
                if len(lines) >= 2:
                    start = int(lines[0].strip())
                    end = int(lines[1].strip())
                    annotations[video_name] = (start, end) if (start != 0 or end != 0) else None
    return annotations

# ================== Fall Detection Logic ==================
def detect_fall(keypoints, threshold=0.5):
    # Keypoint indices
    NOSE = 0
    L_SHOULDER, R_SHOULDER = 5, 6
    L_HIP, R_HIP = 11, 12
    L_ANKLE, R_ANKLE = 15, 16
    
    try:
        # Extract keypoints
        kp = {
            'nose': keypoints[NOSE*3:(NOSE+1)*3],
            'l_shoulder': keypoints[L_SHOULDER*3:(L_SHOULDER+1)*3],
            'r_shoulder': keypoints[R_SHOULDER*3:(R_SHOULDER+1)*3],
            'l_hip': keypoints[L_HIP*3:(L_HIP+1)*3],
            'r_hip': keypoints[R_HIP*3:(R_HIP+1)*3],
            'l_ankle': keypoints[L_ANKLE*3:(L_ANKLE+1)*3],
            'r_ankle': keypoints[R_ANKLE*3:(R_ANKLE+1)*3]
        }
        
        # Check confidence
        for k, v in kp.items():
            if v[2] < threshold:
                return False, "low_confidence", ["Low confidence"]
        
        # Calculate metrics
        shoulder_y = (kp['l_shoulder'][1] + kp['r_shoulder'][1]) / 2
        feet_y = (kp['l_ankle'][1] + kp['r_ankle'][1]) / 2
        torso_length = np.sqrt((kp['l_shoulder'][0]-kp['l_hip'][0])**2 + (kp['l_shoulder'][1]-kp['l_hip'][1])**2)
        
        # Conditions
        conditions = []
        if shoulder_y > feet_y - 0.8 * torso_length:
            conditions.append("shoulders_near_feet")
        
        body_width = abs(kp['l_shoulder'][0] - kp['r_shoulder'][0])
        body_height = abs(kp['nose'][1] - feet_y)
        if body_width / (body_height + 1e-5) > 1.2:
            conditions.append("horizontal_posture")
        
        # Determine fall state
        if len(conditions) >= 2:
            return True, "fallen", conditions
        return False, "normal", conditions
    
    except Exception as e:
        return False, "error", [f"Error: {str(e)}"]

# ================== Video Processing ==================
def process_video(video_path, annotations, metrics):
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    fall_range = annotations.get(video_name)
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error opening {video_path}")
        return
    
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_count += 1
        
        # Process frame
        img = letterbox(frame, 640, stride=64, auto=True)[0]
        img = transforms.ToTensor()(img)
        img = img.unsqueeze(0).to(device)
        if torch.cuda.is_available():
            img = img.half()
        
        with torch.no_grad():
            output, _ = model(img)
            output = non_max_suppression_kpt(output, 0.25, 0.65, nc=model.yaml['nc'], nkpt=model.yaml['nkpt'], kpt_label=True)
            output = output_to_keypoint(output)
        
        # Visualize
        nimg = img[0].permute(1, 2, 0).cpu().numpy() * 255
        nimg = nimg.astype(np.uint8)
        nimg = cv2.cvtColor(nimg, cv2.COLOR_RGB2BGR)
        
        for idx in range(output.shape[0]):
            kpts = output[idx, 7:].T
            plot_skeleton_kpts(nimg, kpts, 3)
            
            # Detect fall
            fall_detected, state, conditions = detect_fall(kpts)
            
            # Compare with ground truth
            actual_fall = fall_range is not None and fall_range[0] <= frame_count <= fall_range[1]
            
            # Update metrics
            metrics.update(
                fall_detected, 
                actual_fall,
                {
                    'video': video_name,
                    'frame_num': frame_count,
                    'predicted': fall_detected,
                    'actual': actual_fall,
                    'state': state,
                    'conditions': conditions
                }
            )
            
            # Display info
            color = (0, 255, 0) if fall_detected == actual_fall else (0, 0, 255)
            status = "CORRECT" if fall_detected == actual_fall else "WRONG"
            cv2.putText(nimg, f"Frame: {frame_count} | State: {state} | {status}", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            
            if fall_range is None:
                gt_text = "GT: No fall"
            else:
                gt_text = f"GT Fall: {fall_range[0]}-{fall_range[1]}"
            cv2.putText(nimg, gt_text, (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        
        cv2.imshow("Fall Detection", nimg)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

# ================== Main Execution ==================
if __name__ == "__main__":
    # Load annotations
    annotations = load_annotations(LABELS_PATH)
    
    # Initialize metrics
    metrics = FallDetectionMetrics()
    
    # Process all test videos
    for video_file in os.listdir(VIDEOS_PATH):
        if video_file.endswith(".avi"):
            video_path = os.path.join(VIDEOS_PATH, video_file)
            process_video(video_path, annotations, metrics)
    
    # Save results
    metrics.save_results()
    print("\nEvaluation Complete!")
    print("Metrics saved to 'results/' directory")


Evaluation Complete!
Metrics saved to 'results/' directory


In [10]:
import os
import numpy as np
import cv2
import torch
from torchvision import transforms
from utils.datasets import letterbox
from utils.general import non_max_suppression_kpt
from utils.plots import output_to_keypoint, plot_skeleton_kpts
from models.yolo import Model
import math
import time
from collections import defaultdict

# Initialize device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load YOLOv7-pose model
weights = torch.load('yolov7-w6-pose.pt', map_location=device, weights_only=False)
model = weights['model'].float().eval().to(device)
if torch.cuda.is_available():
    model = model.half()

# Dataset paths (sesuaikan dengan struktur folder Anda)
BASE_PATH = r"C:\Users\LENOVO\Documents\A Skripsi\datasets\FallDataset\Datasets sudah Grouping\A Gelap"
PATHS = {
    'train': {'video': os.path.join(BASE_PATH, 'video', 'train'), 
              'label': os.path.join(BASE_PATH, 'labels', 'train')},
    'valid': {'video': os.path.join(BASE_PATH, 'video', 'valid'),
              'label': os.path.join(BASE_PATH, 'labels', 'valid')},
    'test': {'video': os.path.join(BASE_PATH, 'video', 'test'),
             'label': os.path.join(BASE_PATH, 'labels', 'test')}
}

# ================== Enhanced Fall Detection ==================
def detect_fall(keypoints, threshold=0.3):
    # Keypoint indices
    NOSE = 0
    L_SHOULDER, R_SHOULDER = 5, 6
    L_HIP, R_HIP = 11, 12
    L_ANKLE, R_ANKLE = 15, 16
    
    try:
        # Extract keypoints with lower confidence threshold
        kp = {
            'nose': keypoints[NOSE*3:(NOSE+1)*3],
            'l_shoulder': keypoints[L_SHOULDER*3:(L_SHOULDER+1)*3],
            'r_shoulder': keypoints[R_SHOULDER*3:(R_SHOULDER+1)*3],
            'l_hip': keypoints[L_HIP*3:(L_HIP+1)*3],
            'r_hip': keypoints[R_HIP*3:(R_HIP+1)*3],
            'l_ankle': keypoints[L_ANKLE*3:(L_ANKLE+1)*3],
            'r_ankle': keypoints[R_ANKLE*3:(R_ANKLE+1)*3]
        }
        
        # Check if keypoints are detected
        for k, v in kp.items():
            if v[2] < threshold:
                return False, "low_confidence", ["Keypoint low confidence"]
        
        # Calculate body parameters
        shoulder_y = (kp['l_shoulder'][1] + kp['r_shoulder'][1]) / 2
        feet_y = (kp['l_ankle'][1] + kp['r_ankle'][1]) / 2
        torso_length = np.sqrt((kp['l_shoulder'][0]-kp['l_hip'][0])**2 + 
                         (kp['l_shoulder'][1]-kp['l_hip'][1])**2)
        
        # Enhanced conditions for dark environments
        conditions = []
        
        # 1. Shoulder position relative to feet (adjusted for dark)
        if shoulder_y > feet_y - 0.5 * torso_length:  # More tolerant threshold
            conditions.append("shoulder_near_feet")
        
        # 2. Body orientation
        body_width = abs(kp['l_shoulder'][0] - kp['r_shoulder'][0])
        body_height = abs(kp['nose'][1] - feet_y)
        if body_width / (body_height + 1e-5) > 0.8:  # Reduced threshold
            conditions.append("horizontal_pose")
        
        # 3. Motion detection (fall speed)
        global prev_shoulder_y, prev_frame_time
        if prev_shoulder_y is not None and prev_frame_time is not None:
            time_elapsed = time.time() - prev_frame_time
            if time_elapsed > 0:
                speed = (shoulder_y - prev_shoulder_y) / time_elapsed
                if speed > 0.6:  # More sensitive to fast movements
                    conditions.append(f"high_speed_{speed:.1f}px")
        
        # Update tracking variables
        prev_shoulder_y = shoulder_y
        prev_frame_time = time.time()
        
        # Decision making (at least 2 conditions)
        if len(conditions) >= 2:
            return True, "fallen", conditions
        return False, "normal", conditions
    
    except Exception as e:
        return False, "error", [f"Error: {str(e)}"]

# ================== Main Execution ==================
if __name__ == "__main__":
    # Initialize metrics for each phase
    metrics = {
        'train': FallDetectionMetrics(),
        'valid': FallDetectionMetrics(),
        'test': FallDetectionMetrics()
    }
    
    # Process all phases
    for phase in ['train', 'valid', 'test']:
        print(f"\n{'='*40}")
        print(f"Processing {phase} set (Video: {len(os.listdir(PATHS[phase]['video']))} videos)")
        
        # Load annotations
        annotations = load_annotations(PATHS[phase]['label'])
        
        # Process videos
        for video_file in os.listdir(PATHS[phase]['video']):
            if video_file.endswith(".avi"):
                video_path = os.path.join(PATHS[phase]['video'], video_file)
                process_video(video_path, annotations, metrics[phase])
        
        # Save results
        metrics[phase].save_results(f"results_{phase}")
        print(f"{phase} metrics saved to results_{phase}/")
    
    # Print final summary
    print("\n=== FINAL RESULTS ===")
    for phase in ['train', 'valid', 'test']:
        res = metrics[phase].calculate_metrics()
        print(f"\n{phase.upper():<6} Precision: {res['precision']:.3f} | Recall: {res['recall']:.3f} | F1: {res['f1_score']:.3f}")


Processing train set (Video: 42 videos)
Loading annotations from: C:\Users\LENOVO\Documents\A Skripsi\datasets\FallDataset\Datasets sudah Grouping\A Gelap\labels\train
Successfully loaded 42 annotations

Processing video: video (1)
Fall frames in ground truth: (144, 164)

Processing video: video (10)
Fall frames in ground truth: (135, 150)

Processing video: video (11)
Fall frames in ground truth: (137, 170)

Processing video: video (12)
Fall frames in ground truth: (161, 173)

Processing video: video (13)
Fall frames in ground truth: (156, 167)

Processing video: video (14)
Fall frames in ground truth: (186, 199)

Processing video: video (15)
Fall frames in ground truth: (170, 184)

Processing video: video (16)
Fall frames in ground truth: (186, 199)

Processing video: video (17)
Fall frames in ground truth: (154, 169)

Processing video: video (18)
Fall frames in ground truth: (135, 149)

Processing video: video (19)
Fall frames in ground truth: (129, 139)

Processing video: video (2

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'C:\\Users\\LENOVO\\Documents\\A Skripsi\\datasets\\FallDataset\\Datasets sudah Grouping\\A Gelap\\video\\valid'