In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import random

# Configuration
originals_dir = "originals"
output_dir = "MMACTION"
frame_length = 50  # Number of frames in each clip
sliding_window = 10  # Sliding window step in frames

# Ensure the output directory exists
output_subdir = os.path.join(output_dir, str(frame_length))
os.makedirs(output_subdir, exist_ok=True)

# Load the labels from labels.txt
labels_df = pd.read_csv('labels.txt', sep="\t")

# Open the annotation file for writing
annotation_file_path = os.path.join(output_dir, f"{frame_length}.txt")
with open(annotation_file_path, 'w') as annotation_file:

    # Process each video file in the originals directory
    for video_file in os.listdir(originals_dir):
        if video_file.startswith("Fall") and video_file.endswith(".mp4"):
            print(f"Processing {video_file}")
            parts = video_file.replace(".mp4", "").split("_")
            clip_no = int(parts[0].replace("Fall", ""))
            cam_no = parts[1]

            # Get the fall start and end times for this clip
            fall_info = labels_df[labels_df['Clip'] == clip_no]
            fall_start = fall_info['Start'].values[0]  # In seconds
            fall_end = fall_info['End'].values[0]  # In seconds

            # Open the video file
            video_path = os.path.join(originals_dir, video_file)
            cap = cv2.VideoCapture(video_path)
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Convert fall start and end times to frames
            fall_start_frame = int(fall_start * fps)
            fall_end_frame = int(fall_end * fps)

            fall_clips = []
            non_fall_clips = []

            # --- Generate fall clips ---
            # Generate clips strictly within the fall window
            for start_frame in range(fall_start_frame-frame_length+sliding_window, fall_end_frame, sliding_window):
                end_frame = start_frame + frame_length

                # Ensure the clip contains part of the fall
                is_fall = not (end_frame < fall_start_frame or start_frame > fall_end_frame)
                fall_label = 1 if is_fall else 0

                # Output filename
                output_name = f"{clip_no}_{fall_label}_{cam_no}_fall_{start_frame}_{end_frame}.mp4"
                output_path = os.path.join(output_subdir, output_name)

                # Extract frames
                cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
                frames = []
                for _ in range(frame_length):
                    ret, frame = cap.read()
                    if ret:
                        frames.append(frame)
                    else:
                        # Append black frames if not enough frames left
                        black_frame = np.zeros((height, width, 3), dtype=np.uint8)
                        frames.append(black_frame)

                # Write the frames to a video file
                out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
                for frame in frames:
                    out.write(frame)
                out.release()

                # Write the annotation for this clip to the annotation file
                annotation_file.write(f"{output_name} {fall_label}\n")

                # Append fall clip to list
                fall_clips.append(output_name)

            # --- Generate non-fall clips ---
            # We need to sample non-fall clips from outside the fall range
            non_fall_start = 0
            non_fall_end = total_frames
            valid_non_fall_ranges = []

            # Add valid non-fall ranges outside the fall window
            if non_fall_start < fall_start_frame - frame_length:
                valid_non_fall_ranges.append((non_fall_start, fall_start_frame - frame_length))
            if fall_end_frame + frame_length < non_fall_end:
                valid_non_fall_ranges.append((fall_end_frame + frame_length, non_fall_end))

            # Sample non-fall clips until the number of fall and non-fall clips are equal
            while len(non_fall_clips) < len(fall_clips):
                # Randomly choose a range to sample from
                chosen_range = random.choice(valid_non_fall_ranges)
                range_start, range_end = chosen_range

                # Randomly sample a starting frame within this valid range
                start_frame = random.randint(range_start, range_end - frame_length)
                end_frame = start_frame + frame_length

                non_fall_label = 0

                # Output filename for non-fall clip
                output_name = f"{clip_no}_{non_fall_label}_{cam_no}_nonfall_{start_frame}_{end_frame}.mp4"
                output_path = os.path.join(output_subdir, output_name)

                # Extract frames
                cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
                frames = []
                for _ in range(frame_length):
                    ret, frame = cap.read()
                    if ret:
                        frames.append(frame)
                    else:
                        # Append black frames if not enough frames left
                        black_frame = np.zeros((height, width, 3), dtype=np.uint8)
                        frames.append(black_frame)

                # Write the frames to a video file
                out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
                for frame in frames:
                    out.write(frame)
                out.release()

                # Write the annotation for this clip to the annotation file
                annotation_file.write(f"{output_name} {non_fall_label}\n")

                # Append non-fall clip to list
                non_fall_clips.append(output_name)

            # Release the video capture
            cap.release()


Processing Fall2_Cam4.mp4
2870 2920 2910 3000
2880 2930 2910 3000
2890 2940 2910 3000
2900 2950 2910 3000
2910 2960 2910 3000
2920 2970 2910 3000
2930 2980 2910 3000
2940 2990 2910 3000
2950 3000 2910 3000
2960 3010 2910 3000
2970 3020 2910 3000
2980 3030 2910 3000
2990 3040 2910 3000


In [45]:
import os
import shutil
import random

split_ratio = 0.8  # 80/20 split for training and validation
output_dir = "MMACTION"
balanced_annotation_file_path = os.path.join(output_dir, f"{frame_length}.txt")

# Paths to the directories
train_dir = os.path.join(output_dir, f"{frame_length}_train")
val_dir = os.path.join(output_dir, f"{frame_length}_validation")

# Create train and validation directories
os.makedirs(train_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)

# Lists to hold the filenames for train and validation splits
train_clips = []
val_clips = []

# Read the balanced annotation file
with open(balanced_annotation_file_path, 'r') as f:
    clips_with_labels = [line.strip() for line in f.readlines()]

# Shuffle the clips for randomness
random.shuffle(clips_with_labels)

# Calculate the split index
split_index = int(len(clips_with_labels) * split_ratio)

# Split the clips into train and validation sets
train_clips = clips_with_labels[:split_index]
val_clips = clips_with_labels[split_index:]

# Function to move files and write annotations
def move_files_and_create_annotation(clips, folder, annotation_filename):
    annotation_file_path = os.path.join(output_dir, annotation_filename)
    with open(annotation_file_path, 'w') as annotation_file:
        for clip in clips:
            filename, label = clip.split()
            label = int(label)
            
            # Move the video file
            source_video_path = os.path.join(output_dir, str(frame_length))
            source_video_path = os.path.join(source_video_path, filename)
            dest_video_path = os.path.join(folder, filename)
            shutil.move(source_video_path, dest_video_path)
            
            # Write the annotation to the corresponding .txt file
            annotation_file.write(f"{filename} {label}\n")

# Move the files and create the annotation files for train and validation
move_files_and_create_annotation(train_clips, train_dir, f"{frame_length}_train.txt")
move_files_and_create_annotation(val_clips, val_dir, f"{frame_length}_validation.txt")

# Output the number of clips in each split
print(f"Number of train clips: {len(train_clips)}")
print(f"Number of validation clips: {len(val_clips)}")


Number of train clips: 20
Number of validation clips: 6


In [46]:
import shutil
import os

# Path to the frame_length folder and original annotation file
frame_length_folder = os.path.join(output_dir, str(frame_length))
original_annotation_file = os.path.join(output_dir, f"{frame_length}.txt")

# Remove the frame_length folder if it exists
if os.path.exists(frame_length_folder):
    shutil.rmtree(frame_length_folder)
    print(f"Removed folder: {frame_length_folder}")
else:
    print(f"Folder {frame_length_folder} does not exist.")

# Remove the original annotation file if it exists
if os.path.exists(original_annotation_file):
    os.remove(original_annotation_file)
    print(f"Removed annotation file: {original_annotation_file}")
else:
    print(f"Annotation file {original_annotation_file} does not exist.")


Removed folder: MMACTION/50
Removed annotation file: MMACTION/50.txt
