In [1]:
import cv2
import mediapipe as mp

In [2]:
import os
import math

In [3]:
# Source directory for the batch run; the processing loop only picks up
# entries whose name ends in ".avi" (case-insensitive).
input_folder = "./input/test/nofall"  # folder containing .avi files
# Destination directory: one "<video name>.txt" annotation file per input video.
output_folder = "./input/test/nofall_annotations"
# Create the destination up front; exist_ok=True keeps this cell safe to re-run.
os.makedirs(output_folder, exist_ok=True)

In [5]:
# Module-level pose estimator shared by the whole notebook: process_video()
# calls pose.process() on it for every frame, and the batch cell closes it
# once all videos are done.
mp_pose = mp.solutions.pose
# NOTE(review): static_image_mode=False is MediaPipe's video/tracking mode, so
# presumably landmark state carries over between consecutive process() calls —
# including across video boundaries, because this instance is shared. Confirm
# that is intended.
# TODO: min_detection_confidence=0.24 is an unexplained threshold; document how
# it was chosen.
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.24)

In [6]:
def get_coords(landmarks, idx, min_visibility=0.5):
    """Return the (x, y) of one landmark, or (None, None) if it is not visible enough.

    Args:
        landmarks: Indexable collection whose items expose ``x``, ``y`` and
            ``visibility`` attributes.
        idx: Index of the landmark to read.
        min_visibility: The landmark is accepted only when its visibility is
            strictly greater than this value. Defaults to 0.5, the threshold
            that used to be hard-coded.

    Returns:
        ``(x, y)`` exactly as stored on the landmark, or ``(None, None)`` when
        the visibility test fails.
    """
    point = landmarks[idx]
    if point.visibility > min_visibility:
        return (point.x, point.y)
    return (None, None)

In [7]:
def is_valid(*coords):
    """Tell whether every given coordinate pair is a usable numeric (x, y).

    A pair is rejected when it is ``None`` itself, when either of its first
    two elements is ``None``, or when either of them is not a ``float`` or
    ``int``. With no arguments the result is ``True``.

    Args:
        *coords: Coordinate pairs, each indexable at positions 0 and 1.

    Returns:
        ``True`` if all pairs pass, ``False`` as soon as one fails.
    """
    numeric = (float, int)
    # The checks run left to right and stop at the first failure, both within
    # a pair and across pairs.
    return all(
        pair is not None
        and pair[0] is not None
        and pair[1] is not None
        and isinstance(pair[0], numeric)
        and isinstance(pair[1], numeric)
        for pair in coords
    )

In [8]:
def extract_pose_line(lm):
    """Flatten the joints of one detected pose into a 20-value list.

    Reads the shoulders (11, 12), hips (23, 24), knees (25, 26) and ankles
    (27, 28) through ``get_coords`` and returns ``None`` unless ``is_valid``
    accepts all eight of them.

    Args:
        lm: Landmark collection passed straight to ``get_coords``.

    Returns:
        ``None`` or a flat list of x, y values in this order: shoulder
        midpoint, hip midpoint, left hip, right hip, left hip, left knee,
        left ankle, right hip, right knee, right ankle. Both hips appear
        twice: once as the pelvis segment, once as the start of each leg.
    """
    joint_indices = (11, 12, 23, 24, 25, 26, 27, 28)
    (l_sh, r_sh,
     l_hip, r_hip,
     l_knee, r_knee,
     l_ankle, r_ankle) = [get_coords(lm, index) for index in joint_indices]

    joints_ok = is_valid(l_sh, r_sh, l_hip, r_hip, l_knee, r_knee, l_ankle, r_ankle)
    if not joints_ok:
        return None

    def midpoint(a, b):
        return [(a[0] + b[0]) / 2, (a[1] + b[1]) / 2]

    segments = (
        midpoint(l_sh, r_sh),     # torso start
        midpoint(l_hip, r_hip),   # torso end
        l_hip, r_hip,             # pelvis
        l_hip, l_knee, l_ankle,   # left leg
        r_hip, r_knee, r_ankle,   # right leg
    )
    flat = []
    for point in segments:
        flat.extend(point)
    return flat

In [9]:
def process_video(video_path, output_txt_path):
    """Run pose estimation on every frame of a video and write one text line per frame.

    The output file starts with two header lines that each contain ``0``.
    Every following line is ``<frame_id> v1 ... v20``: the values returned by
    ``extract_pose_line`` formatted with six decimals, or twenty ``-1``
    placeholders when no pose was detected or ``extract_pose_line`` returned
    nothing for the frame.

    Uses the module-level ``pose`` estimator.

    Args:
        video_path: Path of the video to read.
        output_txt_path: Path of the annotation file to write; it is opened in
            ``"w"`` mode, so an existing file is overwritten.
    """
    missing_row = " ".join(["-1"] * 20)

    cap = cv2.VideoCapture(video_path)
    try:
        with open(output_txt_path, "w") as out_file:
            out_file.write("0\n0\n")  # First two lines

            frame_id = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Frames are read as BGR and converted to RGB before being
                # handed to the pose estimator.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                result = pose.process(frame_rgb)

                pose_line = None
                if result.pose_landmarks:
                    pose_line = extract_pose_line(result.pose_landmarks.landmark)

                if pose_line:
                    row = " ".join(f"{v:.6f}" for v in pose_line)
                else:
                    row = missing_row
                out_file.write(f"{frame_id} {row}\n")

                frame_id += 1
    finally:
        # Release the capture even when decoding, inference or writing fails.
        cap.release()

In [10]:
# Annotate every .avi file in input_folder, writing "<name>.txt" into output_folder.
try:
    # os.listdir() returns names in arbitrary order; sorting keeps the
    # processing order (and the log below) reproducible between runs.
    for file in sorted(os.listdir(input_folder)):
        if not file.lower().endswith(".avi"):
            continue
        input_path = os.path.join(input_folder, file)
        output_path = os.path.join(output_folder, os.path.splitext(file)[0] + ".txt")
        print(f"🔍 Processing {file} → {os.path.basename(output_path)}")
        process_video(input_path, output_path)
finally:
    # Release the shared estimator even if a video fails. After this, the cell
    # that creates `pose` must be re-run before processing again.
    pose.close()
print("✅ All videos processed.")

🔍 Processing video (1).avi → video (1).txt
🔍 Processing video (10).avi → video (10).txt
🔍 Processing video (11).avi → video (11).txt
🔍 Processing video (12).avi → video (12).txt
🔍 Processing video (13).avi → video (13).txt
🔍 Processing video (14).avi → video (14).txt
🔍 Processing video (15).avi → video (15).txt
🔍 Processing video (16).avi → video (16).txt
🔍 Processing video (17).avi → video (17).txt
🔍 Processing video (18).avi → video (18).txt
🔍 Processing video (19).avi → video (19).txt
🔍 Processing video (2).avi → video (2).txt
🔍 Processing video (20).avi → video (20).txt
🔍 Processing video (21).avi → video (21).txt
🔍 Processing video (22).avi → video (22).txt
🔍 Processing video (23).avi → video (23).txt
🔍 Processing video (24).avi → video (24).txt
🔍 Processing video (25).avi → video (25).txt
🔍 Processing video (26).avi → video (26).txt
🔍 Processing video (27).avi → video (27).txt
🔍 Processing video (28).avi → video (28).txt
🔍 Processing video (29).avi → video (29).txt
🔍 Processing v

In [28]:
def draw_lines_on_video(video_path, txt_path, output_path):
    """Render the annotated body segments of a video into a new XVID-encoded file.

    The annotation file is expected to have two header lines followed by one
    line per frame: a frame id and 20 normalized coordinates (torso start/end,
    pelvis left/right, then hip/knee/ankle for the left and the right leg).
    Frames whose line has fewer than 21 values, or whose first coordinate is
    ``-1``, are copied to the output without drawing. Processing stops when
    either the annotation lines or the video frames run out.

    Args:
        video_path: Path of the source video.
        txt_path: Path of the annotation file for that video.
        output_path: Path of the video file to write.
    """
    fallback_fps = 30.0  # used only when the container does not report a frame rate

    # Read the annotations first so a missing file fails before any output
    # video is created.
    with open(txt_path, "r") as f:
        lines = f.readlines()[2:]  # skip first two '0' lines

    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes
        # the playback speed of the output. `not fps > 0` also catches NaN.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = fallback_fps

        out = cv2.VideoWriter(output_path,
                              cv2.VideoWriter_fourcc(*'XVID'),
                              fps, (frame_width, frame_height))

        def norm_to_px(x, y):
            # Scale normalized coordinates to pixel positions in this video.
            return int(x * frame_width), int(y * frame_height)

        for line in lines:
            ret, frame = cap.read()
            if not ret:
                break

            parts = [float(token) for token in line.split()]
            if len(parts) < 21 or parts[1] == -1:
                out.write(frame)
                continue

            # Unpack values
            (_, bx1, by1, bx2, by2,
             lx1, ly1, rx1, ry1,
             llh_x, llh_y, llk_x, llk_y, lla_x, lla_y,
             rlh_x, rlh_y, rlk_x, rlk_y, rla_x, rla_y) = parts

            segments = (
                # torso
                ((bx1, by1), (bx2, by2), (255, 0, 0)),
                # pelvis
                ((lx1, ly1), (rx1, ry1), (0, 255, 0)),
                # left leg: hip → knee → ankle
                ((llh_x, llh_y), (llk_x, llk_y), (0, 255, 255)),
                ((llk_x, llk_y), (lla_x, lla_y), (0, 255, 255)),
                # right leg: hip → knee → ankle
                ((rlh_x, rlh_y), (rlk_x, rlk_y), (255, 255, 0)),
                ((rlk_x, rlk_y), (rla_x, rla_y), (255, 255, 0)),
            )
            for start, end, color in segments:
                cv2.line(frame, norm_to_px(*start), norm_to_px(*end), color, 2)

            out.write(frame)
    finally:
        # Release both handles even if parsing or drawing raises.
        cap.release()
        if out is not None:
            out.release()

    print(f"✅ Saved: {output_path}")