In [1]:
import shutil
from pathlib import Path

import cv2
import numpy as np
import onnxruntime as rt
import pandas as pd
from PIL import Image
from torchvision import transforms

In [31]:
original_path = Path("original")
modified_path = Path("modified")
session = rt.InferenceSession("models/minimal.onnx")

if not original_path.exists():
    raise LookupError()

if not modified_path.exists():
    shutil.copytree(original_path, modified_path)

recordings: list[tuple[str, pd.DataFrame]] = []

for original_csv_path in sorted(original_path.glob("*.csv")):
    name = Path(original_csv_path).name[:-4]
    names = ("image_id", "forward", "steering")

    original_df = pd.read_csv(original_csv_path, header=None, names=names)
    modified_df = pd.read_csv(modified_path / f"{name}.csv", names=names)

    original_df["modified_steering"] = modified_df["steering"]

    original_df["image"] = original_df["image_id"].apply(
        lambda image_id: cv2.imread(str(original_path / name / f"{image_id:04}.jpg"))
    )

    def preprocess(img):
        def region_of_interest(image):
            height, width = image.shape[:2]
            # Define the trapezoid vertices
            vertices = np.array(
                [
                    [
                        (0, height),  # Bottom-left
                        (width, height),  # Bottom-right
                        (width * 0.6, height * 0.4),  # Top-right
                        (width * 0.4, height * 0.4),
                    ]  # Top-left
                ],
                dtype=np.int32,
            )
            mask = np.zeros_like(image)
            cv2.fillPoly(mask, vertices, 255)
            masked_image = cv2.bitwise_and(image, mask)
            return masked_image

        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        blur = cv2.GaussianBlur(gray, (5, 5), 0)
        canny = cv2.Canny(blur, 60, 100)
        dilated = cv2.dilate(canny, kernel=(5, 5))

        masked = region_of_interest(dilated)
        normalized = masked / 255
        return np.expand_dims(normalized.astype(np.float32), axis=-1)[np.newaxis]

    def postprocess(detections: np.ndarray) -> np.ndarray:
        return np.array([0, detections[0, 0]], np.float32)

    def predict(img: np.ndarray) -> np.ndarray:
        input_name = session.get_inputs()[0].name
        inputs = preprocess(img)

        assert inputs.dtype == np.float32
        assert inputs.shape == (1, 224, 224, 1)
        detections = session.run(None, {input_name: inputs})[0]
        outputs = postprocess(detections)

        assert outputs.dtype == np.float32
        assert outputs.shape == (2,)
        assert outputs.max() < 1.0
        assert outputs.min() > -1.0

        return outputs

    original_df["prediction"] = original_df["image"].apply(predict)

    recordings.append((name, original_df))

In [43]:
# Controls
# Playback: SPACE, J, K
# Modify label: H, L
# Close and save: Q


recording_index = 0
frame_index = 0
is_pause = True

while True:
    name, recording = recordings[recording_index % len(recordings)]
    frame = recording.iloc[frame_index]

    img = frame["image"]
    image_id = frame["image_id"]
    original_steering = frame["steering"]
    modified_steering = frame["modified_steering"]
    predicted_steering = frame["prediction"][1] if "prediction" in frame else 0

    img = cv2.resize(img, np.multiply(img.shape[:-1][::-1], 4))

    def format_steering(value):
        return f"{'0' if value == 0 else 'L' if value > 0 else 'R'}{abs(value):.3f}"

    img = cv2.putText(
        img,
        f"{name}/{image_id:04}, O: {format_steering(original_steering)}, M: {format_steering(modified_steering)}, P: {format_steering(predicted_steering)}",
        (16, 48),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (0, 255, 0),
        2,
    )

    center = np.divide(img.shape[:-1], 2).astype(int)
    cv2.line(
        img,
        center - (0, 8),
        center - (int(original_steering * img.shape[1] / 2), 8),
        (255, 0, 0),
        2,
    )
    cv2.line(
        img,
        center,
        center - (int(modified_steering * img.shape[1] / 2), 0),
        (0, 255, 0),
        2,
    )
    cv2.line(
        img,
        center - (0, -8),
        center - (int(predicted_steering * img.shape[1] / 2), -8),
        (0, 0, 255),
        2,
    )

    cv2.imshow("Dataset", img)

    if not is_pause:
        if frame_index + 1 >= len(recording):
            recording_index += 1
            frame_index = 0
        else:
            frame_index += 1

    key = cv2.waitKey(30) & 0xFF
    if key == ord("q"):
        break
    elif key == ord(" "):
        is_pause = not is_pause
    elif key == ord("k"):
        if frame_index + 1 >= len(recording):
            recording_index += 1
            frame_index = 0
        else:
            frame_index += 1
    elif key == ord("j"):
        if frame_index - 1 < 0:
            recording_index -= 1
            frame_index = len(recordings[recording_index][1]) - 1
        else:
            frame_index -= 1
    elif key == ord("l"):
        recording.at[frame_index, "modified_steering"] = max(
            np.round(modified_steering - 0.1, 1), -1
        )
    elif key == ord("h"):
        recording.at[frame_index, "modified_steering"] = min(
            np.round(modified_steering + 0.1, 1), 1
        )

cv2.destroyAllWindows()

for name, recording in recordings:
    recording[["image_id", "forward", "modified_steering"]].to_csv(
        modified_path / f"{name}.csv", index=False, header=False
    )

IndexError: single positional indexer is out-of-bounds

In [None]:
import keras
import tensorflow as tf

In [None]:
flipped_X = np.array([tf.image.flip_left_right(x) for x in X])
all_X = np.concatenate([flipped_X, X])[:, 56:]

flipped_y = y * (1, -1)
all_y = np.concatenate([flipped_y, y])

augmentation = keras.Sequential(
    [keras.layers.RandomBrightness(0.2), keras.layers.RandomContrast(0.2)]
)