In [1]:
# cSpell:ignore codemirror ipython nbconvert pygments nbformat kernelspec #

In [None]:
from density_estimation import load_kinematic_states, KinematicStates
from assets import ControlType, ChangeParam
import json

def _read_total_abs_max_lateral_deviation(result_dir: str) -> float:
    """Read ``total_abs_max_lateral_deviation`` from a result directory.

    Args:
        result_dir: Directory containing ``auto_test_performance_result.json``.

    Returns:
        The value stored under ``"total_abs_max_lateral_deviation"``.

    Raises:
        KeyError: If the entry is missing from the file or is ``null``.
    """
    result_path = f"{result_dir}/auto_test_performance_result.json"
    with open(result_path, "r") as f:
        performance = json.load(f)

    deviation = performance.get("total_abs_max_lateral_deviation")
    if deviation is None:
        # Fail here with the file name; otherwise the caller's min() would
        # raise an opaque TypeError about comparing None with a float.
        raise KeyError(
            f"'total_abs_max_lateral_deviation' is missing or null in {result_path}"
        )
    return deviation


def load_kinematic_states_and_result(
    load_dir: str, mem_diff=False, max_lateral_deviation: float = 5.0
) -> tuple[KinematicStates, float, float]:
    """Return the KinematicStates object and the lateral errors of the simulation result.

    The kinematic states are read from ``load_dir``. The lateral deviations are
    read from ``auto_test_performance_result.json`` in two directories whose
    paths are derived from ``load_dir``:

    * the result directory: ``load_dir`` with ``ControlType.pp_eight.value``
      replaced by ``"pp_aided"``, plus a ``"_mem_diff"`` suffix when
      ``mem_diff`` is true;
    * the trained-result directory: the result directory with
      ``"_aided_sim_"`` replaced by ``"_aided_sim_trained_"``.

    Args:
        load_dir: Directory where the kinematic states are stored.
        mem_diff: If true, read the results from the ``"_mem_diff"`` variant
            of the result directories.
        max_lateral_deviation: Upper bound applied to both returned deviations.

    Returns:
    * KinematicStates: KinematicStates object
    * float: the lateral error of the simulation result, capped at
      ``max_lateral_deviation``. This is not for trained data.
    * float: the lateral error of the simulation result, capped at
      ``max_lateral_deviation``. This is for trained data.

    Raises:
        KeyError: If a result file has no usable
            ``"total_abs_max_lateral_deviation"`` entry.
    """
    kinematic_states = load_kinematic_states(
        control_type=None,
        change_param=None,
        index=None,
        load_dir=load_dir,
    )

    load_dir_for_result = load_dir.replace(ControlType.pp_eight.value, "pp_aided")
    if mem_diff:
        load_dir_for_result += "_mem_diff"
    load_dir_for_trained_result = load_dir_for_result.replace("_aided_sim_", "_aided_sim_trained_")

    deviation = _read_total_abs_max_lateral_deviation(load_dir_for_result)
    trained_deviation = _read_total_abs_max_lateral_deviation(load_dir_for_trained_result)

    # Cap both values so a single diverged run cannot dominate later statistics.
    total_abs_max_lateral_deviation = min(deviation, max_lateral_deviation)
    trained_total_abs_max_lateral_deviation = min(trained_deviation, max_lateral_deviation)
    return kinematic_states, total_abs_max_lateral_deviation, trained_total_abs_max_lateral_deviation


# Try the loader on one stored run. As the last expression of the cell, the
# returned (kinematic_states, lateral error, trained lateral error) tuple is
# what the notebook displays.
load_kinematic_states_and_result(
    "test_run_sim_20240607_163543/test_pure_pursuit_figure_eight_sim_steer_scaling_0th"
)

In [None]:
import numpy as np
from typing import Callable
from density_estimation import (
    Shape,
    calc_minimum_density_point,
    calc_percentage_of_density_below_threshold,
    ScottCoef,
    default_range,
)
from typing import Literal

class CalcMethodOfScalarIndex:
    """Selects which scalar index of the kernel density estimate is computed."""

    name : Literal["minimum", "percentage"]
    """Which scalar index is to be calculated."""

    threshold : float
    """This is used only when the name is `percentage`."""

    def __init__(self, name : Literal["minimum", "percentage"], threshold : float = 0.0):
        """Store the method name and its threshold.

        Args:
            name: ``"minimum"`` or ``"percentage"``.
            threshold: Density threshold; only used when ``name`` is
                ``"percentage"``.

        Raises:
            ValueError: If ``name`` is not one of the supported method names.
        """
        # Reject typos here; otherwise the consumers that branch on the name
        # would silently skip every branch.
        if name not in ("minimum", "percentage"):
            raise ValueError(
                f"unknown scalar index method {name!r}; expected 'minimum' or 'percentage'"
            )
        self.name = name
        self.threshold = threshold

    def description(self) -> str:
        """Return a human-readable label for the selected scalar index.

        Raises:
            ValueError: If ``name`` was changed to an unsupported value after
                construction.
        """
        if self.name == "minimum":
            return "minimum value of KDE"
        if self.name == "percentage":
            return "percentage of KDE below threshold"
        # Previously this fell through and returned None despite `-> str`.
        raise ValueError(
            f"unknown scalar index method {self.name!r}; expected 'minimum' or 'percentage'"
        )


def calc_scalar_indexes(
    kinematic_states: KinematicStates,
    method: CalcMethodOfScalarIndex,
    fst: Literal["speed", "acc", "steer"],
    snd: Literal["speed", "acc", "steer"],
    fst_range: list[float],
    snd_range: list[float],
    bandwidths: list[float] | list[ScottCoef],
    shape: Callable[..., bool] = Shape.trivial,
) -> np.ndarray:
    """Return an ndarray of scalar indexes calculated by the kernel density estimation.

    One scalar index is computed per entry of ``bandwidths``, in order.

    Args:
        kinematic_states: States the kernel density is estimated from.
        method: Which scalar index to compute. ``"minimum"`` uses the value
            returned by ``calc_minimum_density_point``; ``"percentage"`` uses
            ``calc_percentage_of_density_below_threshold`` with
            ``method.threshold``.
        fst: Name of the first quantity.
        snd: Name of the second quantity.
        fst_range: Range passed through for the first quantity.
        snd_range: Range passed through for the second quantity.
        bandwidths: Bandwidths to evaluate; any iterable works.
        shape: Predicate passed through to the density helpers.

    Returns:
        A 1-D float array with one scalar index per bandwidth.

    Raises:
        ValueError: If ``method.name`` is not ``"minimum"`` or ``"percentage"``.
    """
    if method.name not in ("minimum", "percentage"):
        # Previously an unknown method silently produced an empty array.
        raise ValueError(
            f"unknown scalar index method {method.name!r}; expected 'minimum' or 'percentage'"
        )

    scalar_indexes = []
    for bandwidth in bandwidths:
        if method.name == "minimum":
            # Only the minimum value is needed; the point itself is discarded.
            _, scalar_index = calc_minimum_density_point(
                kinematic_states=kinematic_states,
                fst=fst,
                snd=snd,
                fst_range=fst_range,
                snd_range=snd_range,
                shape=shape,
                bandwidth=bandwidth,
            )
        else:
            scalar_index = calc_percentage_of_density_below_threshold(
                kinematic_states=kinematic_states,
                threshold=method.threshold,
                fst=fst,
                snd=snd,
                fst_range=fst_range,
                snd_range=snd_range,
                shape=shape,
                bandwidth=bandwidth,
            )
        scalar_indexes.append(scalar_index)

    # Build the array once instead of re-allocating it with np.append on every
    # iteration; reshape(-1) keeps the result 1-D even when it is empty.
    return np.array(scalar_indexes, dtype=float).reshape(-1)


# Load the run selected by ControlType.pp_eight / ChangeParam.steer_scaling,
# index 0. Later cells reuse this variable, so this cell must run first.
kinematic_states_steer_scaling_0th = load_kinematic_states(
    control_type=ControlType.pp_eight,
    change_param=ChangeParam.steer_scaling,
    index=0
)

# "minimum" scalar index over (speed, acc) for ScottCoef bandwidths built from
# np.arange(0.5, 5.5, 0.5), i.e. 0.5, 1.0, ..., 5.0. Shape.ellipse is passed as
# the shape predicate. The resulting array is the cell's displayed output.
calc_scalar_indexes(
    kinematic_states=kinematic_states_steer_scaling_0th,
    method=CalcMethodOfScalarIndex(name="minimum"),
    fst="speed",
    snd="acc",
    fst_range=default_range("speed"),
    snd_range=default_range("acc"),
    bandwidths=[ScottCoef(i) for i in np.arange(0.5, 5.5, 0.5)],
    shape=lambda x, y: Shape.ellipse(x, y),
)

In [3]:
import matplotlib.pyplot as plt
from math import sqrt
import numpy as np
from typing import Callable
from density_estimation import (
    ScottCoef,
    default_range,
)
from typing import Literal

def plot_histogram(data : np.ndarray, xlabel : str, title="histogram"):
    """Show a histogram of ``data`` with a square-root-rule bin count.

    Args:
        data: Values to be binned.
        xlabel: Label of the x axis.
        title: Title of the figure.
    """
    # int(sqrt(0)) is 0 and a zero bin count is rejected by the histogram
    # routine, so keep at least one bin to make empty input plot an empty
    # histogram instead of raising.
    bins = max(1, int(sqrt(data.size)))
    plt.hist(data, bins=bins, edgecolor='black')
    plt.xlabel(xlabel)
    plt.ylabel('count')
    plt.title(title)
    plt.grid(True)
    plt.show()


In [None]:
from density_estimation import get_kde_scores

# KDE scores of the steer-scaling run over (speed, acc) with a fixed bandwidth
# of 0.5; the second return value of get_kde_scores is discarded.
# NOTE(review): `Shape` and `kinematic_states_steer_scaling_0th` are not defined
# in this cell; they come from the earlier cell that imports `Shape` and loads
# the run, so that cell must have been executed first.
data_for_histogram, _ = get_kde_scores(
    kinematic_states=kinematic_states_steer_scaling_0th,
    fst="speed",
    snd="acc",
    fst_range=default_range("speed"),
    snd_range=default_range("acc"),
    shape=lambda x, y: Shape.ellipse(x, y),
    point_number=30,
    bandwidth=0.5,
)

# Histogram of those scores with the default title.
plot_histogram(data=data_for_histogram, xlabel="KDE score")

In [9]:
def plot_histogram_of_kde_scores(
    load_dir_list: list[str],
    fst: Literal["speed", "acc", "steer"],
    snd: Literal["speed", "acc", "steer"],
    shape : Callable[..., bool],
    bandwidth : float | ScottCoef,
):
    """Plot one KDE-score histogram per run directory.

    For every directory in ``load_dir_list`` the kinematic states are loaded,
    the KDE scores over the ``fst``/``snd`` default ranges are computed with
    30 points, and a histogram titled with the directory name is shown.

    Args:
        load_dir_list: Directories holding the kinematic states.
        fst: Name of the first quantity.
        snd: Name of the second quantity.
        shape: Predicate passed through to ``get_kde_scores``.
        bandwidth: Bandwidth passed through to ``get_kde_scores``.
    """
    # Arguments that do not change from one run to the next.
    shared_kwargs = dict(
        shape=shape,
        point_number=30,
        bandwidth=bandwidth,
    )

    for run_dir in load_dir_list:
        states = load_kinematic_states(load_dir=run_dir)
        # Only the scores are plotted; the second return value is ignored.
        kde_scores, _ = get_kde_scores(
            kinematic_states=states,
            fst=fst,
            snd=snd,
            fst_range=default_range(fst),
            snd_range=default_range(snd),
            **shared_kwargs,
        )
        plot_histogram(data=kde_scores, xlabel="KDE score", title=run_dir)

In [None]:
# Histogram the (speed, acc) KDE scores for the runs test_param_search_0 through
# test_param_search_64 (range(0, 65)); one figure is shown per run directory.
plot_histogram_of_kde_scores(
    load_dir_list=[
        f"test_param_search_{i}_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_3th"
        for i in range(0, 65)
    ],
    fst="speed",
    snd="acc",
    shape=lambda x, y: Shape.ellipse(x, y),
    bandwidth=0.5
)

In [None]:
# "percentage" scalar index (threshold 0.04) over (speed, acc) for the same
# ScottCoef bandwidths 0.5, 1.0, ..., 5.0 used for the "minimum" index above.
calc_scalar_indexes(
    kinematic_states=kinematic_states_steer_scaling_0th,
    method=CalcMethodOfScalarIndex(name="percentage", threshold=0.04),
    fst="speed",
    snd="acc",
    fst_range=default_range("speed"),
    snd_range=default_range("acc"),
    bandwidths=[ScottCoef(i) for i in np.arange(0.5, 5.5, 0.5)],
    shape=lambda x, y: Shape.ellipse(x, y),
)

In [12]:
def correlation_between_scalar_indexes_and_lateral_errors(
    load_dir_list: list[str],
    method: CalcMethodOfScalarIndex,
    fst: Literal["speed", "acc", "steer"],
    snd: Literal["speed", "acc", "steer"],
    fst_range: list[float],
    snd_range: list[float],
    bandwidths: list[float] | list[ScottCoef],
    shape: Callable[..., bool] = Shape.trivial,
) -> np.ndarray:
    """Correlate each bandwidth's scalar index with the trained lateral error.

    For every run directory the scalar indexes (one per bandwidth) are computed
    and the two lateral errors are appended to that vector. The vectors of all
    runs form the columns of a matrix whose rows are the variables handed to
    ``np.corrcoef``.

    Args:
        load_dir_list: Run directories, one observation each.
        method: Which scalar index to compute.
        fst: Name of the first quantity.
        snd: Name of the second quantity.
        fst_range: Range passed through for the first quantity.
        snd_range: Range passed through for the second quantity.
        bandwidths: Bandwidths to evaluate.
        shape: Predicate passed through to the density helpers.

    Returns:
        A 1-D array with, for each bandwidth, the correlation coefficient
        between its scalar index and the trained lateral error. The untrained
        lateral error is part of the matrix but is not returned.

    Raises:
        ValueError: If ``load_dir_list`` is empty.
    """
    columns = []
    for load_dir in load_dir_list:
        # The one that was additionally learned is named "trained"
        kinematic_states, result, trained_result = load_kinematic_states_and_result(load_dir)
        scalar_indexes = calc_scalar_indexes(
            kinematic_states=kinematic_states,
            method=method,
            fst=fst,
            snd=snd,
            fst_range=fst_range,
            snd_range=snd_range,
            bandwidths=bandwidths,
            shape=shape,
        )

        # cSpell:ignore corrcoef #
        # Add lateral errors to the end of the vector of scalar indexes for
        # calculating the correlation coefficient with numpy.corrcoef
        observation = np.append(arr=scalar_indexes, values=[result, trained_result])
        columns.append(observation.reshape(-1, 1))

    if not columns:
        # Previously an empty list ended in an opaque IndexError when the
        # scalar returned by np.corrcoef was indexed.
        raise ValueError("load_dir_list must contain at least one directory")

    # Stack once instead of growing the matrix with hstack on every iteration.
    accumulation = np.hstack(columns)

    # Last row = trained lateral error; drop the two lateral-error columns so
    # only the correlations with the scalar indexes remain.
    return np.corrcoef(accumulation)[-1,:-2]

In [28]:
import matplotlib.pyplot as plt

def list_of_scalar_index_and_error(
    load_dir_list: list[str],
    method: CalcMethodOfScalarIndex,
    fst: Literal["speed", "acc", "steer"],
    snd: Literal["speed", "acc", "steer"],
    fst_range: list[float],
    snd_range: list[float],
    bandwidth: float | ScottCoef,
    shape: Callable[..., bool] = Shape.trivial,
) -> np.ndarray:
    """Produce a data of scatter plot of the lateral deviation of the run results and the computed scalar indexes.

    Args:
        load_dir_list: Run directories, one data point each.
        method: Which scalar index to compute. ``"minimum"`` uses the value
            returned by ``calc_minimum_density_point``; ``"percentage"`` uses
            ``calc_percentage_of_density_below_threshold`` with
            ``method.threshold``.
        fst: Name of the first quantity.
        snd: Name of the second quantity.
        fst_range: Range passed through for the first quantity.
        snd_range: Range passed through for the second quantity.
        bandwidth: Bandwidth passed through to the density helper.
        shape: Predicate passed through to the density helper.

    Returns:
        An array of shape ``(len(load_dir_list), 2)``; column 0 is the trained
        lateral error and column 1 is the scalar index.

    Raises:
        ValueError: If ``method.name`` is not ``"minimum"`` or ``"percentage"``.
    """
    if method.name not in ("minimum", "percentage"):
        # Previously an unknown method silently produced an empty array.
        raise ValueError(
            f"unknown scalar index method {method.name!r}; expected 'minimum' or 'percentage'"
        )

    rows = []
    for load_dir in load_dir_list:
        # The one that was additionally learned is named "trained"; the
        # untrained result is not used for the scatter data.
        kinematic_states, _, trained_result = load_kinematic_states_and_result(
            load_dir
        )
        if method.name == "minimum":
            # Only the minimum value is needed; the point itself is discarded.
            _, scalar_index = calc_minimum_density_point(
                kinematic_states=kinematic_states,
                fst=fst,
                snd=snd,
                fst_range=fst_range,
                snd_range=snd_range,
                shape=shape,
                bandwidth=bandwidth,
            )
        else:
            scalar_index = calc_percentage_of_density_below_threshold(
                kinematic_states=kinematic_states,
                threshold=method.threshold,
                fst=fst,
                snd=snd,
                fst_range=fst_range,
                snd_range=snd_range,
                shape=shape,
                bandwidth=bandwidth,
            )
        rows.append([trained_result, scalar_index])

    # Build the array once; reshape keeps the (n, 2) shape for an empty list.
    return np.array(rows, dtype=float).reshape(-1, 2)


def plot_scatter_acc(bandwidth : float | ScottCoef, method : CalcMethodOfScalarIndex) -> None:
    """Scatter the trained lateral error against the (speed, acc) scalar index.

    The data comes from the ``..._test_vehicle_3th`` directories of
    ``test_param_search_0`` through ``test_param_search_64``, with
    ``Shape.ellipse`` passed as the shape predicate.

    Args:
        bandwidth: Bandwidth passed through to the scalar index computation.
        method: Which scalar index to compute; also supplies the y-axis label.
    """
    run_dirs = [
        f"test_param_search_{i}_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_3th"
        for i in range(65)
    ]
    error_and_index = list_of_scalar_index_and_error(
        load_dir_list=run_dirs,
        method=method,
        fst="speed",
        snd="acc",
        fst_range=default_range("speed"),
        snd_range=default_range("acc"),
        bandwidth=bandwidth,
        shape=lambda x, y: Shape.ellipse(x, y),
    )

    # Column 0 holds the trained lateral error, column 1 the scalar index.
    trained_errors = error_and_index[:, 0]
    scalar_indexes = error_and_index[:, 1]
    plt.scatter(trained_errors, scalar_indexes)

    plt.xlabel('Trained lateral error')
    plt.ylabel(method.description())
    plt.title('Kernel density and lateral error')
    plt.show()

In [None]:
# Scatter plot for the "minimum" index over (speed, acc) with bandwidth 0.2.
plot_scatter_acc(bandwidth=0.2, method=CalcMethodOfScalarIndex(name="minimum"))

In [None]:
# Scatter plot for the "percentage" index (threshold 0.1) over (speed, acc)
# with bandwidth 0.5.
plot_scatter_acc(bandwidth=0.5, method=CalcMethodOfScalarIndex(name="percentage", threshold=0.1))

In [None]:
def plot_scatter_steer(bandwidth : float | ScottCoef, method: CalcMethodOfScalarIndex) -> None:
    """Scatter the trained lateral error against the (speed, steer) scalar index.

    The data comes from the ``..._test_vehicle_2th`` directories of
    ``test_param_search_0`` through ``test_param_search_64``. The shape
    predicate is ``Shape.convex_hull`` over five fixed (speed, steer) points.

    Args:
        bandwidth: Bandwidth passed through to the scalar index computation.
        method: Which scalar index to compute; also supplies the y-axis label.
    """
    # NOTE(review): the meaning of 4.76/2.79 is not documented in this file.
    steer_scale = 4.76/2.79
    steer_limit = steer_scale * (0.15)
    hull_points = np.array(
        [
            [1.0+0.3*5, steer_limit],
            [4.0, steer_limit],
            [11.0, 0.0],
            [1.0+0.3*5, -steer_limit],
            [4, -steer_limit],
        ]
    )

    run_dirs = [
        f"test_param_search_{i}_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_2th"
        for i in range(65)
    ]
    error_and_index = list_of_scalar_index_and_error(
        load_dir_list=run_dirs,
        method=method,
        fst="speed",
        snd="steer",
        fst_range=default_range("speed"),
        snd_range=default_range("steer"),
        bandwidth=bandwidth,
        shape=lambda x, y: Shape.convex_hull(hull_points, x, y),
    )

    # Column 0 holds the trained lateral error, column 1 the scalar index.
    trained_errors = error_and_index[:, 0]
    scalar_indexes = error_and_index[:, 1]
    plt.scatter(trained_errors, scalar_indexes)

    plt.xlabel('Trained lateral error')
    plt.ylabel(method.description())
    plt.title('Kernel density and lateral error')
    plt.show()

In [None]:
# Scatter plot for the "minimum" index over (speed, steer) with bandwidth 0.3.
plot_scatter_steer(bandwidth=0.3, method=CalcMethodOfScalarIndex(name="minimum"))

In [None]:
# Scatter plot for the "percentage" index (threshold 0.1) over (speed, steer)
# with bandwidth 0.3.
plot_scatter_steer(bandwidth=0.3, method=CalcMethodOfScalarIndex(name="percentage", threshold=0.1))

In [None]:
# Correlation between the "minimum" index over (speed, acc) and the trained
# lateral error, for the _3th directories of test_param_search_0 through _64.
# The bandwidths are plain floats from np.arange(0.5, 5.5, 0.5); an ndarray is
# passed where the signature says list, which works because the bandwidths are
# only iterated over.
correlation_between_scalar_indexes_and_lateral_errors(
    load_dir_list=[
        f"test_param_search_{i}_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_3th" for i in range(0, 65)
    ],
    method=CalcMethodOfScalarIndex(name="minimum"),
    fst="speed",
    snd="acc",
    fst_range=default_range("speed"),
    snd_range=default_range("acc"),
    bandwidths=np.arange(0.5, 5.5, 0.5),
    shape=lambda x, y: Shape.ellipse(x, y),
)

In [None]:
# Same runs as the previous correlation cell, but with the "percentage" index
# (threshold 0.04) and smaller float bandwidths from np.arange(0.05, 0.55, 0.05).
correlation_between_scalar_indexes_and_lateral_errors(
    load_dir_list=[
        f"test_param_search_{i}_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_3th" for i in range(0, 65)
    ],
    method=CalcMethodOfScalarIndex(name="percentage", threshold=0.04),
    fst="speed",
    snd="acc",
    fst_range=default_range("speed"),
    snd_range=default_range("acc"),
    bandwidths=np.arange(0.05, 0.55, 0.05),
    shape=lambda x, y: Shape.ellipse(x, y),
)

In [None]:
# Prints a hard-coded array of ten values.
# NOTE(review): these look like correlation coefficients pasted from an earlier
# run; their origin is not recorded in this notebook — TODO confirm.
print(np.array([0.13464718, 0.12864314, 0.12072862, 0.10069864, 0.1145275 ,
       0.1129953 , 0.1039199 , 0.13580686, 0.1347425 , 0.12432162]))

In [None]:
# "minimum" index over (speed, acc) vs. trained lateral error for the _3th
# directories of test_param_search_0 through _64, with float bandwidths from
# np.arange(0.1, 1.1, 0.1).
print(correlation_between_scalar_indexes_and_lateral_errors(
    load_dir_list=[
        f"test_param_search_{i}_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_3th" for i in range(0, 65)
    ],
    method=CalcMethodOfScalarIndex(name="minimum"),
    fst="speed",
    snd="acc",
    fst_range=default_range("speed"),
    snd_range=default_range("acc"),
    bandwidths=np.arange(0.1, 1.1, 0.1),
    shape=lambda x, y: Shape.ellipse(x, y),
))

In [None]:
# "minimum" index over (speed, acc) vs. trained lateral error, restricted to
# the _2th directories of test_param_search_5 through _34 (range(5, 35)), with
# float bandwidths from np.arange(0.1, 1.1, 0.1).
print(correlation_between_scalar_indexes_and_lateral_errors(
    load_dir_list=[
        f"test_param_search_{i}_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_2th" for i in range(5, 35)
    ],
    method=CalcMethodOfScalarIndex(name="minimum"),
    fst="speed",
    snd="acc",
    fst_range=default_range("speed"),
    snd_range=default_range("acc"),
    bandwidths=np.arange(0.1, 1.1, 0.1),
    shape=lambda x, y: Shape.ellipse(x, y),
))

In [None]:
# "minimum" index over (speed, acc) vs. trained lateral error for the _3th
# directories of test_param_search_5 through _34 (range(5, 35)), this time with
# ScottCoef bandwidths built from np.arange(0.5, 5.5, 0.5).
print(correlation_between_scalar_indexes_and_lateral_errors(
    load_dir_list=[
        f"test_param_search_{i}_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_3th" for i in range(5, 35)
    ],
    method=CalcMethodOfScalarIndex(name="minimum"),
    fst="speed",
    snd="acc",
    fst_range=default_range("speed"),
    snd_range=default_range("acc"),
    bandwidths=[ScottCoef(i) for i in np.arange(0.5, 5.5, 0.5)],
    shape=lambda x, y: Shape.ellipse(x, y),
))

In [None]:
# Correlation of the "minimum" index over (speed, steer) with the trained
# lateral error for the _3th directories of test_param_search_5 through _34.
# NOTE(review): the coefficient is hard-coded; the trailing comment presumably
# names the helper call it stands in for — TODO confirm.
coef = 4.76/2.79 #get_estimated_wheel_base_coef("test_param_search_0_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_2th")
# range(1) runs the body once with i == 0, so the `0.01*i` terms are zero here.
for i in range(1):
    # Five (speed, steer) points handed to Shape.convex_hull below.
    POINTS = np.array(
        [
            [1.0+0.3*5, coef * (0.15-0.01*i)],
            [4.0, coef * (0.15-0.01*i)],
            [11.0, 0.0],
            [1.0+0.3*5, coef * -(0.15-0.01*i)],
            [4, coef * -(0.15-0.01*i)],
        ]
    )

    # The comprehension's `i` lives in the comprehension's own scope and does
    # not overwrite the loop variable `i` used above and printed below.
    corrcoef_steer = correlation_between_scalar_indexes_and_lateral_errors(
        load_dir_list=[
            f"test_param_search_{i}_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_3th" for i in range(5, 35)
        ],
        method=CalcMethodOfScalarIndex(name="minimum"),
        fst="speed",
        snd="steer",
        fst_range=default_range("speed"),
        snd_range=default_range("steer"),
        bandwidths=np.arange(0.1, 1.1, 0.1),
        shape=lambda x, y: Shape.convex_hull(POINTS, x, y),
    )
    print(i,corrcoef_steer)

In [None]:
# Correlation of the "minimum" index over (speed, steer) with the trained
# lateral error for the _2th directories of test_param_search_5 through _34.
# NOTE(review): the coefficient is hard-coded; the trailing comment presumably
# names the helper call it stands in for — TODO confirm.
coef = 4.76/2.79 #get_estimated_wheel_base_coef("test_param_search_0_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_2th")
# range(1) runs the body once with i == 0, so the `0.01*i` terms are zero here.
for i in range(1):
    # Five (speed, steer) points handed to Shape.convex_hull below.
    POINTS = np.array(
        [
            [1.0+0.3*5, coef * (0.15-0.01*i)],
            [4.0, coef * (0.15-0.01*i)],
            [11.0, 0.0],
            [1.0+0.3*5, coef * -(0.15-0.01*i)],
            [4, coef * -(0.15-0.01*i)],
        ]
    )

    # The comprehension's `i` lives in the comprehension's own scope and does
    # not overwrite the loop variable `i` used above and printed below.
    corrcoef_steer = correlation_between_scalar_indexes_and_lateral_errors(
        load_dir_list=[
            f"test_param_search_{i}_test_vehicle/test_pure_pursuit_figure_eight_sim_test_vehicle_2th" for i in range(5, 35)
        ],
        method=CalcMethodOfScalarIndex(name="minimum"),
        fst="speed",
        snd="steer",
        fst_range=default_range("speed"),
        snd_range=default_range("steer"),
        bandwidths=np.arange(0.1, 1.1, 0.1),
        shape=lambda x, y: Shape.convex_hull(POINTS, x, y),
    )
    print(i,corrcoef_steer)

In [7]:
import matplotlib.pyplot as plt

In [None]:
from density_estimation import visualize_speed_acc, visualize_speed_steer, ScottCoef

# cSpell:ignore nrows ncols #
# One figure with two axes: axes[0] for visualize_speed_acc and axes[1] for
# visualize_speed_steer, both on the steer-scaling run.
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(18, 12), tight_layout=True)

ax1 : plt.Axes = axes[0]
ax2 : plt.Axes = axes[1]

# Each helper receives the figure and one axes and returns a (fig, ax) pair.
fig, ax1 = visualize_speed_acc(fig=fig, ax=ax1, kinematic_states=kinematic_states_steer_scaling_0th, bandwidth=ScottCoef(0.6))
# NOTE(review): plot() with no arguments appears to draw nothing — confirm and
# consider removing.
ax1.plot()

fig, ax2 = visualize_speed_steer(fig=fig, ax=ax2, kinematic_states=kinematic_states_steer_scaling_0th, bandwidth=ScottCoef(1.3))
# NOTE(review): same no-argument plot() call as above.
ax2.plot()

fig.show()

In [None]:
from density_estimation import KinematicStates

# Plot the steer-scaling run over the three quantities speed, acc and steer.
kinematic_states_steer_scaling_0th.plot("speed", "acc", "steer")

In [None]:
# Plot the steer-scaling run over acc and steer only.
kinematic_states_steer_scaling_0th.plot("acc", "steer")

In [None]:
from density_estimation import plot_kernel_density

# Show the (speed, acc) plot of the steer-scaling run, then its kernel density.
kinematic_states_steer_scaling_0th.plot("speed", "acc").show()
# NOTE(review): the positional arguments presumably map to (kinematic_states,
# fst, snd, fst_range, snd_range), as in the keyword calls in later cells —
# TODO confirm against plot_kernel_density's signature.
plot_kernel_density(kinematic_states_steer_scaling_0th, "speed", "acc", [2, 10], [-2, 2])

In [None]:
# Minimum-density point over (speed, steer) for the steer-scaling run with a
# bandwidth of 0.01; Shape.triangle with the corners (4, -0.2), (4, 0.2) and
# (10, 0) is passed as the shape predicate. The result is the cell's output.
calc_minimum_density_point(
    kinematic_states=kinematic_states_steer_scaling_0th,
    bandwidth=0.01,
    fst="speed",
    snd="steer",
    fst_range=[2, 10],
    snd_range=[-1.0, 1.0],
    shape=lambda x, y : Shape.triangle([4, -0.2], [4, 0.2], [10, 0], x, y),
)

In [None]:
# Show the (speed, steer) plot of the steer-scaling run, then its kernel
# density with bandwidth 0.15 over speed [2, 10] and steer [-0.4, 0.2].
kinematic_states_steer_scaling_0th.plot("speed", "steer").show()
plot_kernel_density(
    kinematic_states=kinematic_states_steer_scaling_0th,
    bandwidth=0.15,
    fst="speed",
    snd="steer",
    fst_range=[2, 10],
    snd_range=[-0.4, 0.2],
)

In [15]:
# Load the run selected by ControlType.pp_eight / ChangeParam.vehicle_type,
# index 0. The remaining cells plot this variable.
kinematic_states_vehicle = load_kinematic_states(
    control_type=ControlType.pp_eight,
    change_param=ChangeParam.vehicle_type,
    index=0,
)

In [None]:
# Show the vehicle-type run over the three quantities speed, acc and steer.
kinematic_states_vehicle.plot("speed", "acc", "steer").show()

In [None]:
# Show the (speed, steer) plot of the vehicle-type run, then its kernel density
# over speed [2, 10] and steer [-0.3, 0.4]. No bandwidth is passed here, so
# plot_kernel_density uses whatever default it defines.
kinematic_states_vehicle.plot("speed", "steer").show()
plot_kernel_density(
    kinematic_states=kinematic_states_vehicle,
    fst="speed",
    snd="steer",
    fst_range=[2, 10],
    snd_range=[-0.3, 0.4],
)