In [None]:
import itertools
import os
import shutil
from os.path import exists, isfile, join

import cv2
import numpy as np
import pandas as pd
import PIL
from IPython import get_ipython
from IPython.display import display


In [None]:
if "google.colab" in str(get_ipython()):
    # Colab provides its own notebook-friendly substitute for cv2.imshow.
    from google.colab.patches import cv2_imshow

    imshow = cv2_imshow
else:

    def imshow(a):
        """Display an OpenCV-style image array inline in the notebook.

        The array is clipped to [0, 255] and cast to ``uint8``. Arrays with
        three dimensions are treated as OpenCV channel order and converted
        for PIL: four channels as BGRA -> RGBA, anything else as BGR -> RGB.
        Two-dimensional arrays are shown as-is.

        Args:
            a: NumPy image array (2-D, or 3-D with the channel axis last).
        """
        # ``import PIL`` by itself does not load the ``Image`` submodule, so
        # ``PIL.Image`` only resolves if some other library imported it first.
        # Import it explicitly to make the function work on a fresh kernel.
        from PIL import Image

        a = a.clip(0, 255).astype("uint8")
        if a.ndim == 3:
            if a.shape[2] == 4:
                a = cv2.cvtColor(a, cv2.COLOR_BGRA2RGBA)
            else:
                a = cv2.cvtColor(a, cv2.COLOR_BGR2RGB)
        display(Image.fromarray(a))


In [None]:
def augment_img(img: np.ndarray) -> list[np.ndarray]:
    """Create brightness- and blur-perturbed variants of a BGR image.

    Every combination of a brightness factor (about 0.4, 0.7, 1.0 and 1.3,
    applied to the HSV value channel) and a Gaussian blur sigma (0, 0.5, 1)
    is produced, except the identity combination (factor 1.0 with no blur),
    which would only duplicate the input image.

    Args:
        img: Image in OpenCV BGR channel order, in a dtype accepted by
            ``cv2.cvtColor`` (presumably ``uint8`` as returned by
            ``cv2.imread`` -- confirm against callers).

    Returns:
        A list of 11 ``uint8`` BGR images, ordered by brightness factor and
        then by blur sigma.
    """
    # int16 gives headroom so a brightened value channel can exceed 255
    # before it is clipped back into range.
    img_hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV).astype(np.int16)
    light_steps = range(-10, 6, 5)
    blur_sigma = [0.0, 0.5, 1]

    new_images = []
    for step, sigma in itertools.product(light_steps, blur_sigma):
        # Compare the per-iteration values (not the whole lists) and use the
        # integer step so the identity check is exact.
        if step == 0 and sigma == 0:
            continue
        light = 1 + (step * 0.06)

        new_img = img_hsv.copy()
        new_img[..., 2] = new_img[..., 2] * light
        new_img = np.clip(new_img, 0, 255).astype(np.uint8)
        new_img = cv2.cvtColor(new_img, cv2.COLOR_HSV2BGR)
        if sigma > 0.0:
            new_img = cv2.GaussianBlur(new_img, (9, 9), sigma)
        new_images.append(new_img)

    return new_images


### Data augmentation


In [None]:
# Input: one sub-folder of frames per run plus a sibling "<folder>.csv" file.
# Output: a freshly rebuilt folder with numbered runs and a combined control.csv.
original_path = "dataset/original"
augmented_path = "dataset/augmented"
column_names = [
    "run_no",
    "step_no",
    "augment_idx",
    "forward",
    "forward_next",
    "left",
    "left_next",
]
new_csv = []
catalog_counter = 1

# Rebuild the output directory from scratch so stale files never survive.
if exists(augmented_path):
    shutil.rmtree(augmented_path)
os.mkdir(augmented_path)

# os.listdir() returns entries in arbitrary order; sorting keeps the run
# numbering (and the seeded train/test split built on it) reproducible.
for loc_name in sorted(os.listdir(original_path)):
    folder_path = join(original_path, loc_name)
    if isfile(folder_path):
        # Plain files (e.g. the per-run CSVs) are not run folders.
        continue
    csv_path = folder_path + ".csv"

    augmented_folder_path = join(augmented_path, "{:03d}".format(catalog_counter))
    os.mkdir(augmented_folder_path)

    # Headerless CSV: column 0 is the step number used in the image file name,
    # column 1 the "forward" value and column 2 the "left" value.
    control_df = pd.read_csv(csv_path, header=None)
    step_count = control_df[0].count()
    for step_idx, step_no in enumerate(control_df[0]):
        img_path = join(folder_path, "{:04d}.jpg".format(step_no))
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError("Could not read image: {}".format(img_path))

        # The last step has no successor, so it reuses its own control values.
        has_next = step_idx < step_count - 1
        forward = control_df[1][step_idx]
        forward_next = control_df[1][step_idx + 1] if has_next else forward
        left = control_df[2][step_idx]
        left_next = control_df[2][step_idx + 1] if has_next else left

        # Index 0 is the untouched original; augmented variants follow from 1.
        variants = [img, *augment_img(img)]
        for aug_idx, variant in enumerate(variants):
            variant_name = "{:04d}_{:03d}.jpg".format(step_idx + 1, aug_idx)
            cv2.imwrite(join(augmented_folder_path, variant_name), variant)
            new_csv.append(
                [
                    catalog_counter,
                    step_idx + 1,
                    aug_idx,
                    forward,
                    forward_next,
                    left,
                    left_next,
                ]
            )

    catalog_counter += 1

new_control_df = pd.DataFrame(new_csv, columns=column_names)
new_control_df.to_csv(join(augmented_path, "control.csv"))


### Train/test split


In [None]:
# Fixed seed so the same run is held out on every execution.
rng = np.random.default_rng(42)
runs = new_control_df["run_no"].unique()
test_run = rng.choice(runs)

# Exactly one run is labelled "test"; every other run is labelled "train".
train_test = []
for run_no in runs:
    split_label = "test" if run_no == test_run else "train"
    train_test.append([run_no, split_label])

train_test_df = pd.DataFrame(data=train_test, columns=["run_no", "split"])
split_csv_path = join(augmented_path, "train_test.csv")
train_test_df.to_csv(split_csv_path)
