In [1]:
# ! python -m venv .venv
# ! python.exe -m pip install -U --upgrade-strategy eager --no-cache-dir pip
# ! pip install albumentations opencv-python numpy

In [None]:
import os
import cv2
import albumentations as A
import numpy as np


def augment_dataset(input_dir, output_dir):
    print(f"1. Input directory: {input_dir}")
    print(f"2. Output directory: {output_dir}")

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"3. Created output directory: {output_dir}")
    else:
        print(f"3. Output directory already exists: {output_dir}")

    # Define individual transformations
    transformations = [
        (
            "brightness_contrast",
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2),
        ),
        (
            "hue_saturation",
            A.HueSaturationValue(
                hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20
            ),
        ),
        ("gaussian_noise", A.GaussNoise(var_limit=(10.0, 50.0))),
        ("gamma", A.RandomGamma(gamma_limit=(80, 120))),
        ("clahe", A.CLAHE(clip_limit=4.0, tile_grid_size=(8, 8))),
        ("blur", A.Blur(blur_limit=(3, 7))),
        ("gaussian_blur", A.GaussianBlur(blur_limit=(3, 7))),
        ("motion_blur", A.MotionBlur(blur_limit=(3, 7))),
        ("median_blur", A.MedianBlur(blur_limit=3)),
        ("sharpen", A.Sharpen(alpha=(0.2, 0.5), lightness=(0.5, 1.0))),
        ("emboss", A.Emboss(alpha=(0.2, 0.5), strength=(0.2, 0.7))),
        (
            "shadow",
            A.RandomShadow(
                num_shadows_lower=1,
                num_shadows_upper=2,
                shadow_dimension=5,
                shadow_roi=(0, 0.5, 1, 1),
            ),
        ),
        ("fog", A.RandomFog(fog_coef_lower=0.1, fog_coef_upper=0.3, alpha_coef=0.08)),
        (
            "sun_flare",
            A.RandomSunFlare(
                flare_roi=(0, 0, 1, 0.5),
                angle_lower=0,
                angle_upper=1,
                num_flare_circles_lower=6,
                num_flare_circles_upper=10,
                src_radius=400,
                src_color=(255, 255, 255),
            ),
        ),
        ("tone_curve", A.RandomToneCurve(scale=0.1)),
        (
            "color_jitter",
            A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
        ),
        ("sepia", A.ToSepia()),
        ("channel_shuffle", A.ChannelShuffle()),
        ("rgb_shift", A.RGBShift(r_shift_limit=20, g_shift_limit=20, b_shift_limit=20)),
        ("posterize", A.Posterize(num_bits=4)),
        ("equalize", A.Equalize()),
        (
            "multiplicative_noise",
            A.MultiplicativeNoise(multiplier=(0.9, 1.1), per_channel=True),
        ),
        ("iso_noise", A.ISONoise(color_shift=(0.01, 0.05), intensity=(0.1, 0.5))),
    ]

    files = [
        f
        for f in os.listdir(input_dir)
        if f.lower().endswith((".jpg", ".jpeg", ".png"))
    ]
    print(f"4. Total number of image files in input directory: {len(files)}")

    images_processed = 0
    for filename in files:
        image_path = os.path.join(input_dir, filename)
        print(f"5. Processing image: {filename}")

        image = cv2.imread(image_path)
        if image is None:
            print(f"Error: Failed to read image: {image_path}")
            continue

        # Read corresponding annotation file
        base_name = os.path.splitext(filename)[0]
        ann_path = os.path.join(input_dir, f"{base_name}.txt")
        if os.path.exists(ann_path):
            with open(ann_path, "r") as f:
                annotations = f.read()
            print(f"   Found annotation file: {ann_path}")
        else:
            print(f"   Warning: No annotation file found for {filename}")
            annotations = ""

        # Apply each transformation
        for transform_name, transform in transformations:
            augmented = transform(image=image)
            aug_image = augmented["image"]

            # Save augmented image
            aug_filename = f"{base_name}_{transform_name}.jpg"
            aug_path = os.path.join(output_dir, aug_filename)
            cv2.imwrite(aug_path, aug_image)

            # Copy original annotation file for augmented image
            if annotations:
                aug_ann_path = os.path.join(
                    output_dir, f"{base_name}_{transform_name}.txt"
                )
                with open(aug_ann_path, "w") as f:
                    f.write(annotations)

        # Also save the original image and annotation to the output directory
        cv2.imwrite(os.path.join(output_dir, filename), image)
        if annotations:
            with open(os.path.join(output_dir, f"{base_name}.txt"), "w") as f:
                f.write(annotations)

        images_processed += 1
        print(f"   Created {len(transformations)} augmented versions of {filename}")

    print(f"\nAugmentation completed!")
    print(f"Total images processed: {images_processed}")
    print(f"Total augmented images created: {images_processed * len(transformations)}")
    print(
        f"Total images in output (including originals): {images_processed * (len(transformations) + 1)}"
    )

    # List files in output directory
    output_files = os.listdir(output_dir)
    print(f"Total files in output directory: {len(output_files)}")
    print(f"First 5 files in output directory: {output_files[:5]}")


# Run the augmentation function
input_dir = "data/Original_Data"
output_dir = "data/Augmented_Data"
augment_dataset(input_dir, output_dir)

In [2]:
import os
import random
import shutil
import re


def split_dataset(source_dir, modelling_dir, train_ratio=0.75):
    # Create train and test directories if they don't exist
    for split in ["train", "test"]:
        os.makedirs(os.path.join(modelling_dir, "images", split), exist_ok=True)
        os.makedirs(os.path.join(modelling_dir, "labels", split), exist_ok=True)

    # Get all files in the source directory
    all_files = os.listdir(source_dir)

    # Group files by their base name (without augmentation suffix and extension)
    file_groups = {}
    for file in all_files:
        # Use regex to extract the base name
        match = re.match(r"(DJI_\d+_\d+_D).*", file)
        if match:
            base_name = match.group(1)
            if base_name not in file_groups:
                file_groups[base_name] = []
            file_groups[base_name].append(file)

    # Randomly split the groups
    all_groups = list(file_groups.keys())
    random.shuffle(all_groups)
    split_index = int(len(all_groups) * train_ratio)
    train_groups = all_groups[:split_index]
    test_groups = all_groups[split_index:]

    # Move files to their respective directories
    for groups, split in [(train_groups, "train"), (test_groups, "test")]:
        for group in groups:
            for file in file_groups[group]:
                source_path = os.path.join(source_dir, file)
                if file.lower().endswith((".jpg", ".jpeg", ".png", ".tif", ".tiff")):
                    dest_path = os.path.join(modelling_dir, "images", split, file)
                elif file.lower().endswith(".txt"):
                    dest_path = os.path.join(modelling_dir, "labels", split, file)
                else:
                    continue  # Skip files that are neither images nor labels
                shutil.move(source_path, dest_path)

    print(
        f"Dataset split complete. {len(train_groups)} groups in training, {len(test_groups)} groups in testing."
    )


# Usage
source_directory = "data/Augmented Data"
modelling_directory = "data/Modelling"

split_dataset(source_directory, modelling_directory)

Dataset split complete. 30 groups in training, 10 groups in testing.
