In [None]:
%pip install ultralytics tqdm numpy opencv-python datasets

In [None]:
import os
import json
import cv2
from ultralytics import YOLO
import shutil
import numpy as np
import logging
from tqdm import tqdm
import re
import random
import string

# Configure the root logger: show INFO and above, each record prefixed with
# its timestamp and severity level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


In [None]:
# All paths below are resolved relative to the directory the notebook is run from.
current_working_directory = os.getcwd()

# Weights file that is handed to YOLO(...) when the model is loaded.
YOLO_path = os.path.join(current_working_directory, "models/yolov8n-pose.pt")
# Root folder of the source videos; point this at another folder to process a
# different dataset. It is read as <datasetDir>/<set>/<category>/<video files>.
datasetDir = os.path.join(current_working_directory, "archive", "RWF-2000")
# Output root: the generated keypoint JSON files (and optional videos) are saved here.
newKeypointsDir = os.path.join(current_working_directory, "archive", "Keypoints_Dataset")
# Create the output root up front; a no-op if it already exists.
os.makedirs(newKeypointsDir, exist_ok=True)

In [None]:
# Optional extra outputs written next to each video's keypoint JSON.
save_original = False  # True: also copy the source video into that video's output directory
save_processed = False  # True: also write "<name>_processed.avi" with the keypoints drawn on every frame

In [None]:

model = YOLO(YOLO_path)  # load the YOLO model from the weights file at YOLO_path

In [None]:
# Dataset sub-folders to walk: every <set>/<category> pair is processed in turn.
sets = ['train', 'val']
categories = ['Fight', 'NonFight']
# Intended cap on videos processed per category. NOTE: not enforced at the
# moment, because the check in the processing loop is commented out.
video_limit = 10

# Keypoint names (COCO format), in the order the YOLO pose model emits them:
# keypoint j of a detection is labelled labeledKeypoints[j] in the JSON output.
labeledKeypoints = [
    "nose", "left_eye", "right_eye", "left_ear", "right_ear", "left_shoulder", "right_shoulder", "left_elbow", "right_elbow", "left_wrist", "right_wrist", "left_hip", "right_hip", "left_knee", "right_knee", "left_ankle", "right_ankle"
]

In [None]:
def fixFileName(filename):
    """Return a filesystem-safe version of ``filename``.

    Every character other than an ASCII letter, digit or underscore is
    removed. If fewer than 5 characters survive, an 8-character alphanumeric
    fallback name is returned instead.

    The fallback is derived deterministically from the original ``filename``
    (not from the global random state), so the same input maps to the same
    output on every run. Re-running the notebook therefore reuses the same
    output directory instead of creating a new randomly named one each time.

    Args:
        filename: Name to sanitize, e.g. a video file name without extension.

    Returns:
        The sanitized name, or the 8-character fallback when the sanitized
        name would be shorter than 5 characters.
    """
    fixedName = re.sub(r'[^A-Za-z0-9_]', '', filename)

    # If the sanitized name is too short or empty, fall back to a generated name.
    if len(fixedName) < 5:
        # A private generator seeded with the original name keeps the result
        # reproducible and leaves the global `random` state untouched.
        seeded_rng = random.Random(filename)
        fixedName = ''.join(seeded_rng.choices(string.ascii_letters + string.digits, k=8))

    return fixedName

In [None]:
def _extract_detections(results, keypoint_labels):
    """Convert the model output for one frame into JSON-serializable dicts.

    Args:
        results: Iterable of result objects returned by calling the model on
            a frame; each exposes ``boxes`` and ``keypoints``.
        keypoint_labels: Names assigned to keypoints by position; keypoint
            ``j`` of a detection is labelled ``keypoint_labels[j]``.

    Returns:
        A list with one dict per detected box, holding ``person_id``
        (1-based index of the box within its result), ``confidence``,
        ``box`` (x1/y1/x2/y2) and ``keypoints``. Empty when nothing was
        detected.
    """
    detections = []
    for result in results:
        boxes = result.boxes
        keypoints = result.keypoints

        if boxes is None or len(boxes) == 0:
            continue

        for i in range(len(boxes)):
            box_data = boxes.xyxy[i].cpu().numpy()
            confidence = boxes.conf[i].cpu().item()

            keypoints_data = []
            if keypoints is not None:
                # Each row is unpacked as (x, y, confidence) for one keypoint.
                keypoints_array = keypoints.data[i].cpu().numpy()
                for j, (x, y, conf) in enumerate(keypoints_array):
                    keypoints_data.append({
                        "label": keypoint_labels[j],
                        "coordinates": {"x": float(x), "y": float(y)},
                        "confidence": float(conf)
                    })

            detections.append({
                "person_id": i + 1,
                "confidence": confidence,
                "box": {
                    "x1": float(box_data[0]),
                    "y1": float(box_data[1]),
                    "x2": float(box_data[2]),
                    "y2": float(box_data[3])
                },
                "keypoints": keypoints_data
            })
    return detections


# Process each set (train/val) and category (Fight/NonFight). For every video
# a directory <newKeypointsDir>/<set>/<category>/<name>/ is created and the
# per-frame detections are written there as <name>.json.
for set_name in tqdm(sets, desc="Processing Sets"):
    for category in tqdm(categories, desc=f"Processing Categories in {set_name}", leave=False):
        input_dir = os.path.join(datasetDir, set_name, category)
        video_count = 0

        # Sorted so the videos are visited in the same order on every run
        # (os.listdir returns entries in arbitrary order).
        for video_file in tqdm(sorted(os.listdir(input_dir)), desc=f"Processing Videos in {category}", leave=False):
            # Case-insensitive so files such as "clip.AVI" are not skipped.
            if not video_file.lower().endswith(('.avi', '.mp4')):
                continue

            ## uncomment the code below to apply the video limit
            # if video_count >= video_limit:
            #     break

            # Sanitize or generate a name for problematic video filenames
            source_stem, source_ext = os.path.splitext(video_file)
            videoName = fixFileName(source_stem)
            videoDirectory = os.path.join(newKeypointsDir, set_name, category, f"{videoName}")
            os.makedirs(videoDirectory, exist_ok=True)

            videoPath = os.path.join(input_dir, video_file)
            json_output_path = os.path.join(videoDirectory, f"{videoName}.json")

            # Copy original video if save_original is True
            if save_original:
                # Keep the source extension: the copy is byte-for-byte, so an
                # .mp4 must not be relabelled as .avi.
                original_save_path = os.path.join(videoDirectory, f"{videoName}{source_ext}")
                try:
                    shutil.copy2(videoPath, original_save_path)
                    logging.info(f"Original video saved: {original_save_path}")
                except (FileNotFoundError, PermissionError) as e:
                    logging.warning(f"File not found or accessible: {videoPath}. Skipping this file. ({e})")
                    continue

            # making the video capture ready for getting frames
            videoCap = cv2.VideoCapture(videoPath)
            processedVideoWriter = None
            frame_index = 0
            keyPointsData = []

            # try/finally guarantees the capture and the writer are released
            # even if inference or writing raises part-way through a video.
            try:
                if not videoCap.isOpened():
                    logging.warning(f"Could not open video: {videoPath}. No frames will be processed.")

                # Set up writer for processed video if save_processed is True
                if save_processed:
                    processedVideoP = os.path.join(videoDirectory, f"{videoName}_processed.avi")
                    fourcc = cv2.VideoWriter_fourcc(*'XVID')
                    fps = videoCap.get(cv2.CAP_PROP_FPS)  # frame rate of the source video
                    frameWidth = int(videoCap.get(cv2.CAP_PROP_FRAME_WIDTH))
                    frameHeight = int(videoCap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                    processedVideoWriter = cv2.VideoWriter(processedVideoP, fourcc, fps, (frameWidth, frameHeight))
                    logging.info(f"Writer initialized for: {processedVideoP}")

                # Process each frame
                while videoCap.isOpened():
                    ret, frame = videoCap.read()
                    if not ret:
                        break

                    # Detect keypoints using YOLO model
                    results = model(frame)

                    # A frame with no detections is still recorded (empty list),
                    # so frame indices in the JSON stay contiguous.
                    keyPointsData.append({
                        "frame": frame_index,
                        "detections": _extract_detections(results, labeledKeypoints)
                    })

                    # Use results[0].plot() for processed video frame
                    if processedVideoWriter is not None:
                        processedVideoWriter.write(results[0].plot())

                    frame_index += 1
            finally:
                # Release resources
                videoCap.release()
                if processedVideoWriter is not None:
                    processedVideoWriter.release()

            if save_processed:
                logging.info(f"Processed video saved to: {processedVideoP}")

            # Save JSON data for keypoints
            with open(json_output_path, 'w') as json_file:
                json.dump(keyPointsData, json_file, indent=4)
            logging.info(f"Keypoints data saved to: {json_output_path}")

            video_count += 1

print("Done")
