In [1]:
import os
import cv2 as cv
import numpy as np

In [4]:
# Folder that is scanned (non-recursively) for the input .avi / .mp4 videos.
video_folder = "video_test/fall"
# Root folder for the extracted frames; one sub-folder per video is created inside it.
output_base_folder = "frames_test_fall/"
# Size every saved frame is resized to; passed as `dsize` to cv.resize,
# which takes it as (width, height).
target_size = (320, 240)

def extract_rgb_frames(video_path, output_folder, frame_step=1):
    """Save every ``frame_step``-th frame of a video as a resized JPEG.

    Frames are decoded sequentially from the start of the file. A frame that
    is at least twice as wide as it is tall is cropped to its right half
    before resizing (presumably the recording is a side-by-side pair with the
    RGB image on the right -- confirm against the dataset). Each kept frame is
    resized to the module-level ``target_size`` and written to
    ``output_folder`` as ``frame_<index>.jpg``, where ``<index>`` is the
    zero-based position of the frame in the video.

    Parameters
    ----------
    video_path : str
        Path of the video file to read.
    output_folder : str
        Directory that receives the JPEG files; created if it does not exist.
    frame_step : int, optional
        Keep one frame out of every ``frame_step`` (default 1 = every frame).

    Raises
    ------
    ValueError
        If ``frame_step`` is smaller than 1.
    """
    if frame_step < 1:
        raise ValueError("frame_step must be a positive integer")

    os.makedirs(output_folder, exist_ok=True)

    cap = cv.VideoCapture(video_path)
    try:
        # A missing / unreadable file does not raise in OpenCV; it just yields
        # a capture that is not opened, so check explicitly.
        if not cap.isOpened():
            print(f"⚠️ Could not open {video_path} - skipping.")
            return

        # Informational only: the count reported by the container can be
        # inaccurate, so the loop below runs until decoding actually fails.
        frame_count = int(cap.get(cv.CAP_PROP_FRAME_COUNT))
        print(f"Processing {video_path} - Total Frames: {frame_count}")

        frame_index = 0
        while True:
            if frame_index % frame_step:
                # Skipped frame: advance the decoder without retrieving pixels.
                if not cap.grab():
                    break
            else:
                ret, frame = cap.read()
                if not ret:
                    break

                h, w = frame.shape[:2]

                # Very wide frame -> keep only the right half.
                rgb_frame = frame[:, w // 2:] if w >= 2 * h else frame

                frame_resized = cv.resize(rgb_frame, target_size, interpolation=cv.INTER_AREA)

                frame_path = os.path.join(output_folder, f"frame_{frame_index:04d}.jpg")
                if not cv.imwrite(frame_path, frame_resized):
                    print(f"⚠️ Could not write {frame_path}")

            frame_index += 1
    finally:
        # Release the decoder even if resizing or writing raised.
        cap.release()

    print(f"✅ Finished {video_path} - Extracted frames.")


# Keep only the .avi / .mp4 entries found directly inside the input folder.
video_files = list(
    filter(lambda name: name.endswith((".avi", ".mp4")), os.listdir(video_folder))
)

# Every video gets its own output sub-folder named after the file stem.
for video_file in video_files:
    video_stem = os.path.splitext(video_file)[0]
    video_path = os.path.join(video_folder, video_file)
    output_folder = os.path.join(output_base_folder, video_stem)
    extract_rgb_frames(video_path, output_folder, frame_step=1)

print("✅ Done! RGB frames are extracted successfully.")


Processing video_test/fall\video_12.avi - Total Frames: 46
✅ Finished video_test/fall\video_12.avi - Extracted frames.
Processing video_test/fall\video_18.avi - Total Frames: 40
✅ Finished video_test/fall\video_18.avi - Extracted frames.
Processing video_test/fall\video_283_flip.avi - Total Frames: 32
✅ Finished video_test/fall\video_283_flip.avi - Extracted frames.
Processing video_test/fall\video_284_flip.avi - Total Frames: 31
✅ Finished video_test/fall\video_284_flip.avi - Extracted frames.
Processing video_test/fall\video_287.avi - Total Frames: 31
✅ Finished video_test/fall\video_287.avi - Extracted frames.
Processing video_test/fall\video_290_flip.avi - Total Frames: 43
✅ Finished video_test/fall\video_290_flip.avi - Extracted frames.
Processing video_test/fall\video_291_flip.avi - Total Frames: 46
✅ Finished video_test/fall\video_291_flip.avi - Extracted frames.
Processing video_test/fall\video_42.avi - Total Frames: 41
✅ Finished video_test/fall\video_42.avi - Extracted frames

In [None]:
# Folder that is scanned (non-recursively) for the input .avi / .mp4 videos.
video_folder = "video_test/adl"
# Root folder for the extracted frames; one sub-folder per video is created inside it.
output_base_folder = "frames_test_adl/"
# Size every saved frame is resized to; passed as `dsize` to cv.resize,
# which takes it as (width, height).
target_size = (320, 240)

# NOTE(review): this function is also defined, with the same signature, in the
# earlier "fall" extraction cell. Defining it once in a shared cell would keep
# the two copies from drifting apart.
def extract_rgb_frames(video_path, output_folder, frame_step=1):
    """Save every ``frame_step``-th frame of a video as a resized JPEG.

    Frames are decoded sequentially from the start of the file. A frame that
    is at least twice as wide as it is tall is cropped to its right half
    before resizing (presumably the recording is a side-by-side pair with the
    RGB image on the right -- confirm against the dataset). Each kept frame is
    resized to the module-level ``target_size`` and written to
    ``output_folder`` as ``frame_<index>.jpg``, where ``<index>`` is the
    zero-based position of the frame in the video.

    Parameters
    ----------
    video_path : str
        Path of the video file to read.
    output_folder : str
        Directory that receives the JPEG files; created if it does not exist.
    frame_step : int, optional
        Keep one frame out of every ``frame_step`` (default 1 = every frame).

    Raises
    ------
    ValueError
        If ``frame_step`` is smaller than 1.
    """
    if frame_step < 1:
        raise ValueError("frame_step must be a positive integer")

    os.makedirs(output_folder, exist_ok=True)

    cap = cv.VideoCapture(video_path)
    try:
        # A missing / unreadable file does not raise in OpenCV; it just yields
        # a capture that is not opened, so check explicitly.
        if not cap.isOpened():
            print(f"⚠️ Could not open {video_path} - skipping.")
            return

        # Informational only: the count reported by the container can be
        # inaccurate, so the loop below runs until decoding actually fails.
        frame_count = int(cap.get(cv.CAP_PROP_FRAME_COUNT))
        print(f"Processing {video_path} - Total Frames: {frame_count}")

        frame_index = 0
        while True:
            if frame_index % frame_step:
                # Skipped frame: advance the decoder without retrieving pixels.
                if not cap.grab():
                    break
            else:
                ret, frame = cap.read()
                if not ret:
                    break

                h, w = frame.shape[:2]

                # Very wide frame -> keep only the right half.
                rgb_frame = frame[:, w // 2:] if w >= 2 * h else frame

                frame_resized = cv.resize(rgb_frame, target_size, interpolation=cv.INTER_AREA)

                frame_path = os.path.join(output_folder, f"frame_{frame_index:04d}.jpg")
                if not cv.imwrite(frame_path, frame_resized):
                    print(f"⚠️ Could not write {frame_path}")

            frame_index += 1
    finally:
        # Release the decoder even if resizing or writing raised.
        cap.release()

    print(f"✅ Finished {video_path} - Extracted frames.")


# Keep only the .avi / .mp4 entries found directly inside the input folder.
video_files = list(
    filter(lambda name: name.endswith((".avi", ".mp4")), os.listdir(video_folder))
)

# Every video gets its own output sub-folder named after the file stem.
for video_file in video_files:
    video_stem = os.path.splitext(video_file)[0]
    video_path = os.path.join(video_folder, video_file)
    output_folder = os.path.join(output_base_folder, video_stem)
    extract_rgb_frames(video_path, output_folder, frame_step=1)

print("✅ Done! RGB frames are extracted successfully.")


In [None]:
import os
import cv2 as cv
import pandas as pd
import mediapipe as mp
import numpy as np
from collections import deque


# Indices of the pose landmarks kept as features (13 points -> 26 coordinates).
# 11 and 12 are used below as the left / right shoulder; per the MediaPipe Pose
# landmark map the rest are presumably 0 nose, 13/14 elbows, 15/16 wrists,
# 23/24 hips, 25/26 knees, 27/28 ankles -- confirm against the installed version.
SELECTED_LANDMARKS = [0, 11, 12, 13, 14, 15, 16, 23, 24, 25, 26, 27, 28]

# Shorthand for the MediaPipe pose solution module and the single Pose
# instance that is handed to detectPose for every frame.
mpPose = mp.solutions.pose
pose = mpPose.Pose(min_detection_confidence=0.8, min_tracking_confidence=0.8)


# Rolling window (at most 5 entries) of mean absolute accelerations:
# detectPose appends to it and processFrames averages it for classification.
acceleration_queue = deque(maxlen=5)

def calculate_acceleration(prev_velocity, current_velocity, time_interval):
    """Return the finite-difference acceleration between two velocity samples.

    Parameters
    ----------
    prev_velocity : array-like or None
        Velocity at the previous step; ``None`` when there is no history yet.
    current_velocity : array-like or None
        Velocity at the current step.
    time_interval : float
        Time elapsed between the two samples.

    Returns
    -------
    numpy.ndarray
        ``(current_velocity - prev_velocity) / time_interval``. When the
        previous velocity is missing or ``time_interval`` is 0, an all-zero
        array shaped like ``current_velocity`` is returned instead. When
        ``current_velocity`` itself is ``None`` a zero-dimensional float
        zero is returned.
    """
    if current_velocity is None:
        # np.zeros_like(None) would produce an object-dtype array; return a
        # proper numeric zero so downstream np.abs / np.mean stay numeric.
        return np.zeros(())

    current = np.asarray(current_velocity, dtype=float)
    if prev_velocity is None or time_interval == 0:
        return np.zeros_like(current)

    return (current - np.asarray(prev_velocity, dtype=float)) / time_interval

def detectPose(image, pose, prev_landmarks, prev_velocity, time_interval):
    """Run the pose estimator on one BGR frame and derive motion quantities.

    Parameters
    ----------
    image : numpy.ndarray
        Three-dimensional BGR image (height, width, channels).
    pose : object
        Pose estimator exposing ``process(rgb_image)``.
    prev_landmarks : list or None
        Landmark list returned for the previous frame (empty / None if none).
    prev_velocity : numpy.ndarray or None
        Velocity returned for the previous frame.
    time_interval : float
        Time between consecutive frames.

    Returns
    -------
    tuple
        ``(results, landmarks, neck_y, current_velocity, acceleration)``.
        ``landmarks`` holds ``(x, y)`` pairs scaled by the image width and
        height for the indices in ``SELECTED_LANDMARKS`` (empty when nothing
        was detected). ``neck_y`` is the mean shoulder ``y`` scaled by the
        image height, or ``None``. Velocity and acceleration are ``None``
        unless both the previous and the current frame have landmarks.

    Side effect: whenever an acceleration is computed, its mean absolute value
    is appended to the module-level ``acceleration_queue``.
    """
    imgHeight, imgWidth, _channels = image.shape
    results = pose.process(cv.cvtColor(image, cv.COLOR_BGR2RGB))
    detected = results.pose_landmarks

    landmarks = []
    neck_y = None
    if detected:
        landmarks = [
            (point.x * imgWidth, point.y * imgHeight)
            for idx, point in enumerate(detected.landmark)
            if idx in SELECTED_LANDMARKS
        ]
        # Neck height is approximated by the midpoint of the two shoulders.
        shoulder_y_sum = detected.landmark[11].y + detected.landmark[12].y
        neck_y = shoulder_y_sum / 2 * imgHeight

    # Motion needs landmarks from both this frame and the previous one.
    if not (prev_landmarks and landmarks):
        return results, landmarks, neck_y, None, None

    current_velocity = (np.array(landmarks) - np.array(prev_landmarks)) / time_interval
    acceleration = calculate_acceleration(prev_velocity, current_velocity, time_interval)
    acceleration_queue.append(np.mean(np.abs(acceleration)))

    return results, landmarks, neck_y, current_velocity, acceleration

def classify_pose(neck_y, mean_acceleration, image_height,
                  fall_ratio=0.6, acceleration_threshold=5):
    """Label a frame as "FALL", "ADL" or "UNKNOWN" from neck height and motion.

    Parameters
    ----------
    neck_y : float or None
        Vertical neck position in the same units as ``image_height``
        (larger values are lower in the image); ``None`` if not available.
    mean_acceleration : float
        Recent mean absolute acceleration.
    image_height : int or float
        Height of the frame.
    fall_ratio : float, optional
        Fraction of ``image_height`` the neck must be below (i.e. ``neck_y``
        greater than) for a fall. Default 0.6.
    acceleration_threshold : float, optional
        Value ``mean_acceleration`` must exceed for a fall. Default 5.

    Returns
    -------
    str
        "UNKNOWN" when ``neck_y`` is ``None``; "FALL" when both conditions
        hold; "ADL" otherwise.
    """
    if neck_y is None:
        return "UNKNOWN"

    neck_is_low = neck_y > fall_ratio * image_height
    moving_fast = mean_acceleration > acceleration_threshold

    return "FALL" if neck_is_low and moving_fast else "ADL"

def processFrames(input_folders, output_csv, output_image_folder, fps=30):
    """Run pose detection over extracted frame folders and write a landmark CSV.

    Every sub-directory of each folder in ``input_folders`` is treated as one
    frame sequence. Its ``.jpg`` / ``.png`` files are processed in sorted
    filename order. Frames with a detected pose are saved, with the skeleton
    drawn on them, under ``output_image_folder/<sub-directory name>/`` and
    contribute one CSV row: the flattened selected landmark coordinates
    followed by the label returned by ``classify_pose``.

    Parameters
    ----------
    input_folders : dict
        Maps a name to a directory of per-video frame folders. The key is not
        written to the CSV; the ``Label`` column holds the ``classify_pose``
        result.
    output_csv : str
        Path of the CSV file to write.
    output_image_folder : str
        Root directory for the annotated frames; created if missing.
    fps : float, optional
        Frame rate used to turn frame-to-frame differences into velocity and
        acceleration (default 30, i.e. a time step of 1/30).

    Raises
    ------
    ValueError
        If ``fps`` is not positive.
    """
    if fps <= 0:
        raise ValueError("fps must be positive")

    rows = []
    os.makedirs(output_image_folder, exist_ok=True)
    time_interval = 1 / fps

    for folder_path in input_folders.values():
        # os.listdir returns entries in arbitrary order; sort so the CSV row
        # order is reproducible between runs and machines.
        for subfolder in sorted(os.listdir(folder_path)):
            subfolder_path = os.path.join(folder_path, subfolder)
            if not os.path.isdir(subfolder_path):
                continue

            # NOTE(review): the output path uses only the sub-directory name,
            # so two input folders containing the same sub-directory name
            # write their annotated frames to the same place.
            output_subfolder = os.path.join(output_image_folder, subfolder)
            os.makedirs(output_subfolder, exist_ok=True)

            # Start every sequence with a clean motion history, including the
            # shared rolling acceleration window, so the previous video's last
            # frames cannot influence this video's first labels.
            prev_landmarks = None
            prev_velocity = None
            acceleration_queue.clear()

            for filename in sorted(os.listdir(subfolder_path)):
                if not filename.endswith(('.jpg', '.png')):
                    continue

                file_path = os.path.join(subfolder_path, filename)
                print(f"Processing: {file_path}")

                image = cv.imread(file_path)
                if image is None:
                    print(f"Error reading image: {file_path}")
                    continue

                results, landmarks, neck_y, current_velocity, _acceleration = detectPose(
                    image, pose, prev_landmarks, prev_velocity, time_interval
                )

                if results.pose_landmarks:
                    mp.solutions.drawing_utils.draw_landmarks(
                        image, results.pose_landmarks, mpPose.POSE_CONNECTIONS
                    )
                    cv.imwrite(os.path.join(output_subfolder, filename), image)

                if landmarks:
                    mean_acceleration = np.mean(acceleration_queue) if acceleration_queue else 0
                    # Distinct name: this is the classifier's output, not the
                    # key of input_folders.
                    predicted_label = classify_pose(neck_y, mean_acceleration, image.shape[0])

                    row = [coord for point in landmarks for coord in point]
                    row.append(predicted_label)
                    rows.append(row)

                prev_landmarks = landmarks
                prev_velocity = current_velocity

    # x1, y1, x2, y2, ... one pair per selected landmark, then the label.
    columns = [f'{axis}{i+1}' for i in range(len(SELECTED_LANDMARKS)) for axis in ['x', 'y']] + ['Label']

    df = pd.DataFrame(rows, columns=columns)
    df.to_csv(output_csv, index=False)



# Frame folders to scan (each holds one sub-folder per video), in processing order.
input_folders = dict(
    fall='frames_test_fall',
    adl='frames_test_adl',
)
output_csv = 'pose_data_2d_test.csv'
output_image_folder = 'processed_images_test'

# Detect poses in every frame, save annotated images and write the CSV.
processFrames(
    input_folders=input_folders,
    output_csv=output_csv,
    output_image_folder=output_image_folder,
)

Processing: frames_test_fall\video_12\frame_0000.jpg
Processing: frames_test_fall\video_12\frame_0001.jpg
Processing: frames_test_fall\video_12\frame_0002.jpg
Processing: frames_test_fall\video_12\frame_0003.jpg
Processing: frames_test_fall\video_12\frame_0004.jpg
Processing: frames_test_fall\video_12\frame_0005.jpg
Processing: frames_test_fall\video_12\frame_0006.jpg
Processing: frames_test_fall\video_12\frame_0007.jpg
Processing: frames_test_fall\video_12\frame_0008.jpg
Processing: frames_test_fall\video_12\frame_0009.jpg
Processing: frames_test_fall\video_12\frame_0010.jpg
Processing: frames_test_fall\video_12\frame_0011.jpg
Processing: frames_test_fall\video_12\frame_0012.jpg
Processing: frames_test_fall\video_12\frame_0013.jpg
Processing: frames_test_fall\video_12\frame_0014.jpg
Processing: frames_test_fall\video_12\frame_0015.jpg
Processing: frames_test_fall\video_12\frame_0016.jpg
Processing: frames_test_fall\video_12\frame_0017.jpg
Processing: frames_test_fall\video_12\frame_00