In [1]:
from ultralytics import YOLO
import cv2
import os
import torch
import numpy as np
from ultralytics import YOLO
from tensorflow.keras.models import load_model
import joblib

In [7]:
# Pick the torch device: CUDA when a GPU is visible to torch, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [9]:
# MODEL VIDEO TESTING
# Base parameters. They are baked into CONFIG_LABEL, which in turn selects the
# directories the trained artifacts are loaded from.
WINDOW = 30  # frames
STRIDE = 10   # next window step
K_CONSEC_FALL = 8  # consecutive frames to alert
K_CONSEC_ATT = 5
CONFIG = "lstm"
CONFIG_LABEL = f"{WINDOW}_s{STRIDE}_kf{K_CONSEC_FALL}_ka{K_CONSEC_ATT}"

# Paths and dataset parameters
NUM_CLASSES = 3
OUTPUT_BASE_PATH = "results/fall_detect_yolo11n_pose_balanced"
OUTPUT_DATASET_DIR = f"{OUTPUT_BASE_PATH}/windows_{CONFIG_LABEL}/"
OUTPUT_LSTM_MODEL_DIR = f"{OUTPUT_BASE_PATH}/lstm_model_w{CONFIG_LABEL}_{CONFIG}"
OUTPUT_LSTM_MODEL_FULL = "lstm_model_full.keras"
OUTPUT_LSTM_MODEL_BEST = "lstm_model_best.keras"
OUTPUT_LSTM_MODEL_HISTORY = "lstm_model_history.json"
VIDEOS_TEST_PATH = "videos_test"
SCALER_PATH = f"{OUTPUT_LSTM_MODEL_DIR}/train_test_split/scaler.joblib"
YOLO_MODEL_PATH = f"{OUTPUT_BASE_PATH}/yolo11n_pose_train/weights/best.pt"
LSTM_MODEL_PATH = f"{OUTPUT_LSTM_MODEL_DIR}/{OUTPUT_LSTM_MODEL_BEST}"
MODEL_TEST_PATH = f"{OUTPUT_BASE_PATH}/model_test_w{CONFIG_LABEL}_{CONFIG}"
# Annotated test videos are written here, so make sure the directory exists.
os.makedirs(MODEL_TEST_PATH, exist_ok=True)
# Maps the LSTM's predicted class index to a human-readable label.
LABELS = {0: "no_fall", 1: "fall", 2: "attention"}

# Fail early, with a clear message, if any required artifact is missing.
if not os.path.exists(YOLO_MODEL_PATH):
    raise FileNotFoundError(f"YOLO model not found in {YOLO_MODEL_PATH}")
if not os.path.exists(LSTM_MODEL_PATH):
    raise FileNotFoundError(f"LSTM model not found in {LSTM_MODEL_PATH}")
if not os.path.exists(SCALER_PATH):
    raise FileNotFoundError(f"Scaler not found in {SCALER_PATH}")

model_yolo = YOLO(YOLO_MODEL_PATH)
model_lstm = load_model(LSTM_MODEL_PATH)
# NOTE(security): joblib.load unpickles the file, which can execute arbitrary
# code. Only load scaler files produced by a trusted training run.
scaler = joblib.load(SCALER_PATH)
print("All loaded. Ready for prediction.")

All loaded. Ready for prediction.


In [11]:
### Script model testing
def extract_box_features(box):
    """Build a 10-value geometric descriptor from an ``(x1, y1, x2, y2)`` box.

    Returns a float32 array laid out as
    ``[x1, y1, x2, y2, x_center, y_center, width, height, area, aspect_ratio]``.
    The aspect ratio is width / height, or 0 when the height is exactly zero.
    """
    left, top, right, bottom = box
    box_w = right - left
    box_h = bottom - top

    # Guard against a degenerate (zero-height) box before dividing.
    if box_h != 0:
        ratio = box_w / box_h
    else:
        ratio = 0

    descriptor = [
        left, top, right, bottom,
        left + box_w / 2,   # x_center
        top + box_h / 2,    # y_center
        box_w,
        box_h,
        box_w * box_h,      # area
        ratio,
    ]
    return np.array(descriptor, dtype=np.float32)

In [13]:
def normalize_box_features(features, frame_shape):
    """Scale the 10 box features by the frame dimensions.

    ``frame_shape`` is read as ``(height, width, ...)``. Horizontal values are
    divided by the frame width, vertical values by the frame height, the area
    by ``width * height``; the aspect ratio (index 9) is passed through
    unchanged because it is already a ratio. Returns a float32 array of 10 values.
    """
    frame_height, frame_width = frame_shape[:2]

    # One divisor per feature, in order:
    # x1, y1, x2, y2, x_center, y_center, width, height, area.
    divisors = (
        frame_width, frame_height,
        frame_width, frame_height,
        frame_width, frame_height,
        frame_width, frame_height,
        frame_width * frame_height,
    )

    # zip() stops after the nine divisors, so index 9 is handled separately.
    scaled = [value / divisor for value, divisor in zip(features, divisors)]
    scaled.append(features[9])  # aspect_ratio (already a ratio)

    return np.array(scaled, dtype=np.float32)

In [15]:
def extract_pose_features(pose_data, frame_shape):
    """Flatten ``(x, y, confidence)`` keypoints into one float32 vector.

    x is divided by the frame width and y by the frame height
    (``frame_shape`` is read as ``(height, width, ...)``); the confidence is
    kept as-is. Every keypoint is included regardless of its confidence.

    When ``pose_data`` is None or empty, a 54-element zero vector is returned.
    NOTE(review): 54 presumably equals 3 values x the number of keypoints of
    the trained pose model -- confirm it matches the non-empty output length.
    """
    frame_height, frame_width = frame_shape[:2]

    no_pose = pose_data is None or len(pose_data) == 0
    if no_pose:
        return np.zeros(54, dtype=np.float32)

    flattened = [
        value
        for x, y, c in pose_data
        for value in (x / frame_width, y / frame_height, c)
    ]
    return np.array(flattened, dtype=np.float32)

In [17]:
def diff_features(curr: np.ndarray, prev: np.ndarray) -> np.ndarray:
    """Frame-to-frame delta (``curr - prev``) as float32.

    Falls back to an all-zero array shaped like ``curr`` when there is no
    previous frame (``prev`` is None) or its shape differs from ``curr``.
    """
    comparable = prev is not None and prev.shape == curr.shape
    if comparable:
        delta = curr - prev
        return delta.astype(np.float32)
    return np.zeros_like(curr, dtype=np.float32)

In [19]:
def _reset_yolo_trackers(model):
    """Best-effort reset of tracker state left on ``model`` by a previous video.

    NOTE(review): relies on the Ultralytics predictor exposing a ``trackers``
    list whose items have ``reset()``; it is a silent no-op when they do not.
    """
    predictor = getattr(model, "predictor", None)
    for tracker in getattr(predictor, "trackers", None) or ():
        if hasattr(tracker, "reset"):
            tracker.reset()


def _draw_track_predictions(frame, boxes, track_ids, predictions_by_id):
    """Draw, in place on ``frame``, one colored box + label per visible track."""
    palette = {
        'no_fall': (0, 255, 0),      # green (BGR)
        'attention': (0, 255, 255),  # yellow
        'fall': (0, 0, 255),         # red
    }
    for box, track_id in zip(boxes, track_ids):
        track_id = int(track_id)
        if track_id not in predictions_by_id:
            continue
        class_id, confidence = predictions_by_id[track_id]
        label_str = LABELS.get(class_id, "unknown")
        color = palette.get(label_str, (255, 255, 255))  # white fallback

        x1, y1, x2, y2 = box.astype(int)
        # Keep the caption inside the frame when the box touches the top edge.
        text_y = max(int(y1) - 10, 15)
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        cv2.putText(frame, f'ID {track_id}: {label_str} ({confidence:.2f})', (x1, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)


def predict_from_video(video_path, output_path=None):
    """Run pose tracking + LSTM classification on a video and save an annotated copy.

    For every tracked detection a per-track sliding window of the last
    ``WINDOW`` feature vectors is kept (normalized box + pose features, plus
    their frame-to-frame differences). Once a track's window is full it is
    scaled with ``scaler`` and classified by ``model_lstm`` on every frame.
    On frames where at least one track was classified, every visible track is
    drawn once with its most recent prediction (tracks still warming up show
    the initial ``(0, 0.0)`` placeholder).

    Args:
        video_path: Path of the input video.
        output_path: Destination of the annotated video. When None, the
            notebook-level global ``OUTPUT_PATH`` is used.

    Returns:
        None. Errors opening the input or output are printed, not raised.

    Relies on the notebook globals ``model_yolo``, ``model_lstm``, ``scaler``,
    ``WINDOW`` and ``LABELS``.
    """
    if not os.path.exists(video_path):
        print(f"Error: video '{video_path}' not found.")
        return

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error opening the video '{video_path}'")
        return

    output = None
    try:
        if output_path is None:
            output_path = OUTPUT_PATH  # notebook-level default

        # Parameters for the output video
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            fps = 30.0  # some containers report no FPS; the writer needs a positive rate
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # or 'XVID'
        output = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not output.isOpened():
            print(f"Error opening the output video '{output_path}'")
            return

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Total de frames: {total_frames}")

        # Frame indices at which 10%, 20%, ... progress is reported
        checkpoints = {int(total_frames * i / 10) for i in range(1, 11)}
        frame_count = 0
        sequences_by_id = {}      # track id -> last WINDOW feature vectors
        prev_features_by_id = {}  # track id -> previous frame's static features
        predictions_by_id = {}    # track id -> (class_id, confidence)

        # persist=True (below) keeps tracker state between calls, so clear
        # whatever a previously processed video may have left behind.
        _reset_yolo_trackers(model_yolo)

        print("Processing video...")

        while cap.isOpened():
            valid, frame = cap.read()

            if not valid:
                print("End of video or error reading frame")
                break

            # Check if frame is not empty
            if frame is None or frame.size == 0:
                print("Empty frame found")
                continue

            frame_count += 1
            annotated_frame = frame.copy()

            if frame_count in checkpoints:
                percent = (frame_count / total_frames) * 100
                print(f"Processed {percent:.0f}% ({frame_count}/{total_frames})")

            # YOLO: person detection + tracking. persist=True is required when
            # feeding frames one at a time so IDs carry over between frames.
            results = model_yolo.track(frame, persist=True, verbose=False)
            detections = results[0]

            if (detections.boxes is None or detections.boxes.id is None
                    or detections.keypoints is None):
                # Nothing tracked in this frame: write it through unannotated.
                output.write(annotated_frame)
                continue

            boxes = detections.boxes.xyxy.cpu().numpy()
            keypoints = detections.keypoints.data.cpu().numpy()
            track_ids = detections.boxes.id.cpu().numpy().astype(int)

            predicted_this_frame = False
            for box, kp, track_id in zip(boxes, keypoints, track_ids):
                track_id = int(track_id)

                if track_id not in sequences_by_id:
                    sequences_by_id[track_id] = []
                    prev_features_by_id[track_id] = None
                    predictions_by_id[track_id] = (0, 0.0)  # (class_id, confidence)

                box_feat = extract_box_features(box)
                norm_box = normalize_box_features(box_feat, frame.shape)
                pose_feat = extract_pose_features(kp, frame.shape)

                static_features = np.concatenate([norm_box, pose_feat])
                velocity_features = diff_features(static_features, prev_features_by_id[track_id])
                prev_features_by_id[track_id] = static_features.copy()

                window = sequences_by_id[track_id]
                window.append(np.concatenate([static_features, velocity_features]))
                if len(window) > WINDOW:
                    window.pop(0)  # keep only the most recent WINDOW frames

                if len(window) == WINDOW:
                    input_scaled = scaler.transform(np.array(window))
                    input_final = np.expand_dims(input_scaled, axis=0)  # add batch axis

                    pred = model_lstm.predict(input_final, verbose=0)
                    class_id = int(np.argmax(pred[0]))
                    # store last prediction
                    predictions_by_id[track_id] = (class_id, float(pred[0][class_id]))
                    predicted_this_frame = True

            # Draw once, after every track has been updated, so each box gets a
            # single label instead of stale and fresh captions overlaid.
            if predicted_this_frame:
                _draw_track_predictions(annotated_frame, boxes, track_ids, predictions_by_id)

            output.write(annotated_frame)
    finally:
        # Always release the capture and finalize the writer, even on errors.
        cap.release()
        if output is not None:
            output.release()

    print("Video processing completed.")

In [39]:
# Input clip to run through the detection pipeline.
INPUT_PATH = f"{VIDEOS_TEST_PATH}/070.mp4"
# NOTE: predict_from_video takes its output location from the module-level
# OUTPUT_PATH, so this must be assigned before the call below.
OUTPUT_PATH = f"{MODEL_TEST_PATH}/070A_w{CONFIG_LABEL}_{CONFIG}.mp4"

predict_from_video(INPUT_PATH)

Total de frames: 188
Processing video...
Processed 10% (18/188)
Processed 20% (37/188)
Processed 30% (56/188)
Processed 40% (75/188)
Processed 50% (94/188)
Processed 60% (112/188)
Processed 70% (131/188)
Processed 80% (150/188)
Processed 90% (169/188)
Processed 100% (188/188)
End of video or error reading frame
Video processing completed.
