# Dublin AFL Preprocessing

This notebook contains the code to prepare the AFL data for training. It processes full videos hosted on S3 in Sydney into frames, hosted in Dublin, keeping only the frames that contain the ball.

We assume that an `.xml` file with the annotations has already been uploaded to S3.

In [None]:
# Install libraries
!pip install opencv-python


In [None]:
# Import libraries
import boto3
import cv2
import os
import subprocess

from typing import List, Dict

In [None]:
# Define constants

# S3 bucket identifiers: the Australian bucket is where the source videos are read from;
# the Dublin bucket is presumably the destination for the preprocessed frames.
# NOTE(review): both values end with "/", which is presumably not part of the real bucket
# name -- confirm how consumers are expected to handle it before passing them to boto3.
AUS_AFL_BUCKET: str = "australia-fov/"
DUB_AFL_BUCKET: str = "dublin-afl-preprocessed/"

# Local working directory under which clips and extracted frames are written
TMP_DIR: str = "tmp"

# Videos to process. Each entry is used as the S3 object key and, with backslashes replaced
# by forward slashes, as the local download path.
# NOTE(review): the active entry ends in ".avi.avi" -- confirm this matches the real object key.
VIDEOS_TO_PROCESS: List[str] = [
    "marvel/marvel-fov-3/20_08_2023/time_04_09_06_date_20_08_2023_.avi.avi",
    # "marvel-fov-1\\20_08_2023\\marvel_1_time_04_09_04_date_20_08_2023_.avi",
    # "marvel-fov-8\\26_08_2023\\marvel_8_time_09_09_04_date_27_08_2023_.avi"
]

First, let's check that every video we want to process exists in the source bucket.

In [None]:
# Ensure that each video path exists on s3
s3 = boto3.client('s3')

# The bucket constant carries a trailing "/", which is not valid in a bucket name
source_bucket_name = AUS_AFL_BUCKET.rstrip('/')

print("Checking video paths exist on S3...")
for video_path in VIDEOS_TO_PROCESS:
    # S3 keys use forward slashes; some configured paths may use Windows separators
    object_key = video_path.replace("\\", "/")
    try:
        s3.head_object(Bucket=source_bucket_name, Key=object_key)
    except s3.exceptions.ClientError as error:
        # Only report "does not exist" for a missing key; surface anything else
        # (permissions, throttling, ...) with its error code instead of hiding it.
        error_code = error.response.get("Error", {}).get("Code")
        if error_code in ("404", "NoSuchKey", "NotFound"):
            print(f"Video {video_path} does not exist on S3!")
        else:
            print(f"Could not check video {video_path} on S3 (error code: {error_code})")
    else:
        print(f"Video {video_path} exists on S3")

## Utility functions

Next, we define a bunch of helper functions.

In [None]:
import xml.etree.ElementTree as ET


def process_cvat_annotations(annotations_file: str) -> Dict[int, List[int]]:
    """
    Parse an XML annotations file into per-frame point coordinates.

    Every grandchild element of the XML root that carries both a `frame` and a
    `points` attribute contributes one entry. Only the first `x,y` pair of the
    `points` attribute is used, with each coordinate truncated to an integer.
    If a frame number occurs more than once, the last occurrence wins.

    Args:
    annotations_file (str): Path to a local `.xml` annotations file.

    Returns:
    Dict[int, List[int]]: Mapping of frame number to `[x, y]`, ordered by frame number.

    Raises:
    FileNotFoundError: If the annotations file does not exist.
    ValueError: If the annotations file name does not end with `.xml`.
    """

    # Check if annotations file exists and is a .xml file
    if not os.path.exists(annotations_file):
        raise FileNotFoundError("Annotations file does not exist.")
    if not annotations_file.endswith(".xml"):
        raise ValueError("Annotations file must be a .xml file.")

    annotations: Dict[int, List[int]] = {}

    root = ET.parse(annotations_file).getroot()
    for child in root:
        for subchild in child:
            frame_attr = subchild.attrib.get('frame')
            points_attr = subchild.attrib.get('points')

            # Skip elements that are not point annotations (no frame or no points)
            if frame_attr is None or not points_attr:
                continue

            # `points` may hold several ';'-separated pairs; only the first is used
            x_text, y_text = points_attr.split(';')[0].split(',')[:2]
            annotations[int(frame_attr)] = [int(float(x_text)), int(float(y_text))]

    # NOTE(review): CVAT track exports may flag elements with outside="1" when the object
    # is not visible in that frame; they are not filtered here -- confirm whether they should be.

    # Sort the dictionary by key and return
    return dict(sorted(annotations.items()))


def ensure_directory_exists(path: str) -> None:
    """
    Create the directory at `path` (including missing parents) unless something
    already exists at that path, and print which of the two cases applied.

    Args:
    path (str): Path to the directory.

    Returns:
    None
    """
    # Guard clause: nothing to create when the path is already present
    if os.path.exists(path):
        print(f'Directory already exists at {path}')
        return

    print(f'Creating directory at {path}')
    os.makedirs(path)


def create_directory(base_path: str, directory_name: str) -> str:
    """
    Make sure a directory called `directory_name` exists inside `base_path`.

    Args:
    base_path (str): Directory under which the new directory is placed.
    directory_name (str): Name of the directory to create.

    Returns:
    str: The joined path `base_path/directory_name`.
    """
    target_path = os.path.join(base_path, directory_name)
    # Creation (and the "already exists" case) is delegated to the shared helper
    ensure_directory_exists(target_path)
    return target_path

def find_files_with_ending(directory: str, file_ending: str = '.avi') -> List[str]:
    """
    Recursively collect the paths of all files under `directory` whose name
    ends with `file_ending`, and print how many were found.

    Args:
    directory (str): Root directory of the search.
    file_ending (str): Suffix the file name must end with, e.g., '.avi', '.mp4', etc.

    Returns:
    List[str]: Paths (joined onto the walked directory) of the matching files.
    """
    matched_files = [
        os.path.join(current_dir, file_name)
        for current_dir, _sub_dirs, file_names in os.walk(directory)
        for file_name in file_names
        if file_name.endswith(file_ending)
    ]

    print(f"Found {len(matched_files)} files with ending {file_ending} in {directory}")

    return matched_files

def files_exist_with_extension(directory: str, extension: str) -> bool:
    """
    Tell whether `directory` directly contains an entry whose name ends with `extension`.

    Only the directory's own listing is inspected; subdirectories are not searched.

    Args:
    directory (str): Path to the directory to inspect.
    extension (str): Suffix to look for, e.g. '.mp4'.

    Returns:
    bool: True as soon as one matching entry is found, otherwise False.
    """
    for entry_name in os.listdir(directory):
        if entry_name.endswith(extension):
            return True
    return False

def mp4_files_exist(directory: str) -> bool:
    """Return True when the given directory directly contains an entry ending in .mp4."""
    return files_exist_with_extension(directory, extension='.mp4')


def png_files_exist(directory: str) -> bool:
    """Return True when the given directory directly contains an entry ending in .png."""
    return files_exist_with_extension(directory, extension='.png')


Next, we'll define functions that we'll use to process the videos. We will only operate on one video at a time so we don't need too much storage space locally.

In [None]:

def _clip_start_times(frame_count: int, fps: float, clip_length: int) -> List[int]:
    """Return the start offsets (in seconds) of consecutive clips covering the whole video."""
    duration_seconds = frame_count / fps

    start_times: List[int] = []
    start = 0
    # A final, shorter clip is included whenever the duration is not a multiple of clip_length
    while start < duration_seconds:
        start_times.append(start)
        start += clip_length
    return start_times


def clip_video(video: str, output_dir: str, clip_length: int = 60) -> None:
    """
    Split a video into consecutive clips of `clip_length` seconds each.

    The clips are written to `output_dir` as `0.mp4`, `1.mp4`, ... FFmpeg is
    invoked directly with `-c:v copy`, so the video stream is copied rather than
    re-encoded; audio is dropped. A clip whose output file already exists is skipped.

    Args:
    video (str): Path to the local video file to clip.
    output_dir (str): Directory the clips are written to (created if missing).
    clip_length (int): Length of each clip in seconds.

    Returns:
    None

    Raises:
    ValueError: If `clip_length` is not positive, or no valid FPS could be read from the video.
    """
    if clip_length <= 0:
        raise ValueError("clip_length must be a positive number of seconds.")

    # Check if the output_directory exists, and if not, make it
    ensure_directory_exists(output_dir)

    # Read the frame count and FPS, then release the capture handle straight away
    cv2_video = cv2.VideoCapture(video)
    try:
        frame_count: int = int(cv2_video.get(cv2.CAP_PROP_FRAME_COUNT))
        # Keep FPS as a float: truncating e.g. 29.97 to 29 would overestimate the duration
        fps: float = float(cv2_video.get(cv2.CAP_PROP_FPS))
    finally:
        cv2_video.release()

    if fps <= 0:
        raise ValueError(f"Could not read a valid FPS from {video}")

    start_times = _clip_start_times(frame_count, fps, clip_length)

    # Clip the video
    for index, start_time in enumerate(start_times):
        end_time = start_time + clip_length
        output_name = os.path.join(output_dir, f"{index}.mp4")

        # Skipping here avoids FFmpeg's interactive "overwrite?" prompt
        if os.path.exists(output_name):
            print(f"Clip {output_name} already exists, skipping")
            continue

        print(f"Clipping video {video} from {start_time} to {end_time}")

        # Craft the FFmpeg command for lossless clipping
        # NOTE(review): with stream copy, cut points may not be frame-exact -- verify if exact boundaries matter
        cmd = [
            'ffmpeg',
            '-i', video,
            '-ss', str(start_time),
            '-t', str(clip_length),
            '-c:v', 'copy',
            '-an',  # This excludes the audio. If you want to include audio, you can remove this.
            output_name
        ]

        result = subprocess.run(cmd)
        if result.returncode != 0:
            # Report the failure rather than claiming success; keep going with the remaining clips
            print(f"FFmpeg failed with exit code {result.returncode} while clipping video {video} from {start_time} to {end_time}")
            continue

        print(f"Finished clipping video {video} from {start_time} to {end_time}")

In [None]:
def extract_frames_from_video(file_path: str, fps: int = 30) -> None:
    """
    Extract the frames of a video file as PNG images using ffmpeg.

    The frames are written next to the video, in a directory named after the
    video's parent directory followed by the video's file name without extension.
    Frames are named `frame_0000000.png`, `frame_0000001.png`, ...
    If that directory already contains `.png` files, nothing is extracted.

    Args:
    file_path (str): Path to the local video file.
    fps (int): Frame rate passed to ffmpeg's `fps` filter.

    Returns:
    None
    """

    # Create the directory that will hold the frames
    file_name_without_extension = os.path.splitext(os.path.basename(file_path))[0]
    file_dir = os.path.dirname(file_path)

    # Name of the directory containing the video, independent of the path separator in use
    parent_video_name = os.path.basename(file_dir)
    frames_dir = create_directory(file_dir, directory_name=f'{parent_video_name}{file_name_without_extension}')

    # NOTE(review): frames left behind by an interrupted run also count as "already exist"
    if png_files_exist(frames_dir):
        print(f'Frames already exist in {frames_dir}')
        return

    # Construct the ffmpeg command
    cmd = [
        'ffmpeg',
        '-i', file_path,
        '-vf', f'fps={fps}/1',
        '-start_number', '0',
        os.path.join(frames_dir, 'frame_%07d.png')
    ]

    # Execute the command via subprocess
    print(cmd)
    result = subprocess.run(cmd)
    if result.returncode != 0:
        # Make a failed extraction visible instead of returning silently
        print(f'FFmpeg failed with exit code {result.returncode} while extracting frames from {file_path}')

In [None]:
def download_video_from_s3(video: str, source_bucket: str) -> None:
    """
    Download a video from an S3 bucket to local storage if it isn't there already.

    The video path, with backslashes replaced by forward slashes, is used both as
    the S3 object key and as the local destination path (relative to the current
    working directory). Missing local parent directories are created.

    Parameters:
    - video: Path of the video to download.
    - source_bucket: Name of the S3 bucket where the video is stored; a trailing "/" is ignored.
    """
    # Create a S3 client
    s3 = boto3.client('s3')

    object_key = video.replace("\\", "/")
    local_path = object_key
    # Bucket names cannot contain "/", so drop any trailing slash from the configured value
    bucket_name = source_bucket.rstrip('/')

    # Check if the file already exists in local storage
    if os.path.exists(local_path):
        print(f"{video} already exists in local storage!")
        return

    # The key contains directories that must exist locally before the file can be opened
    local_dir = os.path.dirname(local_path)
    if local_dir:
        os.makedirs(local_dir, exist_ok=True)

    print(f"Downloading {video}...")

    # Download to a temporary name first so that an interrupted transfer is never
    # mistaken for a complete file by the existence check on the next run
    partial_path = local_path + ".part"
    try:
        with open(partial_path, 'wb') as f:
            s3.download_fileobj(bucket_name, object_key, f)
    except Exception:
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise
    os.replace(partial_path, local_path)

    print(f"{video} downloaded successfully!")

## Processing time

In [None]:
for video_path in VIDEOS_TO_PROCESS:
    # Download the videos to local storage, but check if they exist already first
    download_video_from_s3(video_path, source_bucket=AUS_AFL_BUCKET)

    # The download stores the file under the forward-slash form of the path
    local_video_path = video_path.replace("\\", "/")

    # Give every video its own clip directory: clips are named 0.mp4, 1.mp4, ...,
    # so a shared directory would make different videos collide
    video_name = os.path.splitext(os.path.basename(local_video_path))[0]
    clips_dir = os.path.join(TMP_DIR, video_name)

    # Clip video into 1 minute clips
    clip_video(local_video_path, output_dir=clips_dir, clip_length=60)

    # Extract frames from clipped videos
    for video in find_files_with_ending(clips_dir, '.mp4'):
        extract_frames_from_video(video)

    # Remove images that don't contain the ball
    # TODO: to be implemented
