In [1]:
import os
import shutil
import numpy as np

from pathlib import Path
from datetime import datetime
from bisect import bisect_left, bisect_right

# Intended share of the data held out for validation. Despite the name this is
# a fraction (0.1 = 10%), not a percent.
validation_percentage = 0.1

In [2]:
def delete_all_contents(directory):
    """Remove everything inside ``directory`` while keeping the directory itself.

    Regular files and symbolic links are unlinked, sub-directories are removed
    recursively. Entries that are neither (e.g. sockets, FIFOs) are left
    untouched. Does nothing when ``directory`` does not exist.
    """
    if not os.path.exists(directory):
        return  # nothing to clean up

    for name in os.listdir(directory):
        target = os.path.join(directory, name)
        # Links are tested together with files so that a symlink pointing at a
        # directory is unlinked instead of having its target tree removed.
        is_plain_entry = os.path.isfile(target) or os.path.islink(target)
        if is_plain_entry:
            os.remove(target)
            continue
        if os.path.isdir(target):
            shutil.rmtree(target)

# Destructive: wipes any previous split under ./data_training every time this
# cell runs, so the copy steps further down start from an empty folder.
delete_all_contents('./data_training')

In [3]:
# Group consecutive empty / occupied images into periods; each period represents either one parking event or one stretch during which the spot was empty.
def list_files(directory):
    """Return all ``*.jpg`` files below ``directory`` (recursively), oldest first.

    The capture time is parsed from the file stem, which must follow the
    ``YYYY-MM-DD-HHMMSS`` pattern; a stem that does not match raises
    ``ValueError``. Directories whose name ends in ``.jpg`` are ignored.
    """
    def capture_time(image_path):
        return datetime.strptime(image_path.stem, '%Y-%m-%d-%H%M%S')

    jpg_files = (entry for entry in Path(directory).rglob('*.jpg') if entry.is_file())
    return sorted(jpg_files, key=capture_time)

def extract_timestamps(files):
    """Parse the capture time of every file and return the times in ascending order.

    Each file stem must follow the ``YYYY-MM-DD-HHMMSS`` pattern. The sorted
    list is what the bisect-based lookup in ``within_period`` expects.
    """
    parsed = [datetime.strptime(image.stem, '%Y-%m-%d-%H%M%S') for image in files]
    parsed.sort()
    return parsed

def within_period(last_datetime, current_datetime, opposite_sorted_timestamps):
    """Tell whether two consecutive same-class timestamps belong to one period.

    Returns True when no entry of ``opposite_sorted_timestamps`` lies strictly
    between ``last_datetime`` and ``current_datetime``. The list must be sorted
    ascending because the lookup uses binary search.
    """
    # Number of opposite timestamps <= last_datetime.
    not_after_last = bisect_right(opposite_sorted_timestamps, last_datetime)
    # Number of opposite timestamps < current_datetime.
    before_current = bisect_left(opposite_sorted_timestamps, current_datetime)
    # Equal counts mean nothing of the opposite class falls in the open interval.
    return before_current == not_after_last

def group_into_periods(files, opposite_sorted_timestamps):
    """Split a time-ordered list of same-class files into consecutive periods.

    ``files`` must already be ordered by capture time, with stems following
    the ``YYYY-MM-DD-HHMMSS`` pattern. A new period starts whenever
    ``within_period`` reports that an image of the opposite class was taken
    between two neighbouring files.

    Returns a list of periods, each a non-empty list of files; an empty input
    yields an empty list.
    """
    if not files:
        return []

    stamp_format = '%Y-%m-%d-%H%M%S'
    grouped = [[files[0]]]
    previous_stamp = datetime.strptime(files[0].stem, stamp_format)

    for candidate in files[1:]:
        candidate_stamp = datetime.strptime(candidate.stem, stamp_format)
        if not within_period(previous_stamp, candidate_stamp, opposite_sorted_timestamps):
            # The opposite class was seen in between: open a new period.
            grouped.append([])
        grouped[-1].append(candidate)
        previous_stamp = candidate_stamp

    return grouped

# Images of the empty spot, oldest first, plus their sorted capture times.
empty_files = list_files('./data_labeled/empty')
empty_timestamps = extract_timestamps(empty_files)

# Images of the occupied spot, oldest first, plus their sorted capture times.
occupied_files = list_files('./data_labeled/occupied')
occupied_timestamps = extract_timestamps(occupied_files)

# Each class is cut into periods wherever an image of the other class was
# taken in between, hence the crossed-over timestamp arguments.
occupied_periods = group_into_periods(occupied_files, empty_timestamps)
empty_periods = group_into_periods(empty_files, occupied_timestamps)

In [4]:
def copy_and_preprocess(file, dest_path):
    """Copy ``file`` to ``dest_path``.

    ``dest_path`` may be a target file path or an existing directory; in the
    latter case the source file name is kept. Despite the name, no
    preprocessing is applied at the moment - this is a plain copy and the
    single place where a preprocessing step could be hooked in.
    """
    shutil.copy(file, dest_path)

def copy_files_to_training_folder(files, training_folder, x):
    """Copy every file of one period into ``training_folder``.

    The folder is created (including parents) when missing. Files keep their
    original name, so a file with the same name already in the folder is
    overwritten.

    NOTE(review): ``x`` is accepted but never used - all files of the period
    are copied, not just ``x`` of them. Confirm whether a per-period cap was
    intended before relying on it.
    """
    target_dir = Path(training_folder)
    target_dir.mkdir(parents=True, exist_ok=True)

    for source in files:
        copy_and_preprocess(source, target_dir / source.name)

def copy_files_to_validation_folder(files, validation_folder):
    """Copy every file of one period into ``validation_folder``.

    The folder is created (including parents) when missing.
    """
    Path(validation_folder).mkdir(parents=True, exist_ok=True)

    for source in files:
        # The folder itself is handed over as the destination; the copy helper
        # receives no explicit target file name.
        copy_and_preprocess(source, validation_folder)

def _split_periods(periods, validation_folder, training_folder):
    """Distribute whole periods between the validation and the training folder.

    Every ``step``-th period, starting with the first one, is copied to
    ``validation_folder``; all remaining periods are copied to
    ``training_folder``. ``step`` is ``round(1 / validation_percentage)``
    (at least 1), so roughly ``validation_percentage`` of the periods are held
    out regardless of how many periods there are. A non-positive
    ``validation_percentage`` sends everything to training.

    The stride deliberately does not depend on ``len(periods)``: using
    ``int(len(periods) * validation_percentage)`` as the modulus is zero (and
    raises ZeroDivisionError) for small inputs, and otherwise holds out a
    roughly constant number of periods instead of a constant share.
    """
    if validation_percentage > 0:
        step = max(1, round(1 / validation_percentage))
    else:
        step = 0  # 0 disables the validation split entirely

    for idx, period in enumerate(periods):
        if step and idx % step == 0:
            copy_files_to_validation_folder(period, validation_folder)
        else:
            copy_files_to_training_folder(period, training_folder, x=5)


# Split by period, not by image, so that near-identical consecutive frames of
# one period never end up on both sides of the split.
_split_periods(
    occupied_periods,
    "./data_training/validation/occupied",
    "./data_training/training/occupied",
)
_split_periods(
    empty_periods,
    "./data_training/validation/empty",
    "./data_training/training/empty",
)

# Disabled: copy a validation_percentage share of the ./data_labeled/undefined files into ./data_training/validation/undefined and the rest into ./data_training/training/undefined
# Path('./data_training/validation/undefined').mkdir(parents=True, exist_ok=True)
# Path('./data_training/training/undefined').mkdir(parents=True, exist_ok=True)
# undefined_files = [f for f in Path('./data_labeled/undefined').rglob('*.jpg') if f.is_file()]
# for idx, file in enumerate(undefined_files):
#     if idx % int((len(undefined_files) * validation_percentage)) == 0:
#         copy_and_preprocess(file, Path("./data_training/validation/undefined") / file.name)
#     else:
#         copy_and_preprocess(file, Path("./data_training/training/undefined") / file.name)
