In [None]:
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as plticker
import os
import seaborn as sns

from matplotlib.colors import ListedColormap
from pathlib import Path

In [None]:
# Folder
mission_path = os.path.join(
    Path().home(), "digiforest_mission_data/2023-09-06-21-16-47"
)
mission_path = "/media/matias/T7/research/2024-autonomous-legged-forester-autonomy/2024-02-20-09-40-24-mission-forest-of-dean"

## Matplotlib config

In [None]:
cm = 1 / 2.54
plot_width = 8.89 * cm
plot_height = 8 * cm

plt.rcParams["font.size"] = 8

n_colors = 10

### Color palettes

#### Color palette

In [None]:
color_palette = sns.color_palette("colorblind", n_colors=n_colors, as_cmap=False)
color_palette_str = [
    "blue",
    "orange",
    "green",
    "red",
    "pink",
    "brown",
    "light_pink",
    "gray",
    "yellow",
    "light_blue",
]
color_palette_str = {k: v for k, v in zip(color_palette_str, color_palette)}
mpl_colorblind_cmap = ListedColormap(color_palette)

# Okabe-Ito colormap: https://clauswilke.com/dataviz/color-pitfalls.html
okabeito_palette_str = {
    "orange": np.array([230, 159, 0]) / 255,
    "light_blue": np.array([86, 180, 233]) / 255,
    "green": np.array([0, 158, 115]) / 255,
    "yellow": np.array([240, 228, 66]) / 255,
    "blue": np.array([0, 114, 178]) / 255,
    "red": np.array([213, 94, 0]) / 255,
    "pink": np.array([204, 121, 167]) / 255,
    "black": np.array([0, 0, 0]) / 255,
}
okabeito_palette = okabeito_palette_str.values()
mpl_okabeito_cmap = ListedColormap(okabeito_palette)

#### Grayscale palette

In [None]:
gray_palette = [(c, c, c) for c in np.linspace(0, 1, n_colors + 1)]

gray_palette_str = {f"{n_colors - p}0": v for p, v in enumerate(gray_palette)}
gray_palette_str["black"] = gray_palette_str["100"]
gray_palette_str["white"] = gray_palette_str["00"]

mpl_gray_cmap = ListedColormap(gray_palette)

In [None]:
blue_palette = sns.light_palette(
    color_palette_str["blue"], reverse=True, n_colors=n_colors + 1
)
blue_palette_str = {f"{n_colors - p}0": v for p, v in enumerate(blue_palette)}
mpl_blue_cmap = ListedColormap(blue_palette)

#### Divergent palette

In [None]:
div_palette = sns.blend_palette(
    [color_palette[0], [1, 1, 1], color_palette[3]], n_colors=n_colors, as_cmap=False
)
mpl_div_cmap = ListedColormap(div_palette)

In [None]:
gradient = np.linspace(0, 1, n_colors)
gradient = np.vstack((gradient, gradient))

fig, ax = plt.subplots(
    5, 1, figsize=(2, 3), constrained_layout=False, dpi=200, sharex=True
)
ax[0].imshow(gradient, aspect="auto", cmap=mpl_colorblind_cmap)
ax[1].imshow(gradient, aspect="auto", cmap=mpl_okabeito_cmap)
ax[2].imshow(gradient, aspect="auto", cmap=mpl_gray_cmap)
ax[3].imshow(gradient, aspect="auto", cmap=mpl_blue_cmap)
ax[4].imshow(gradient, aspect="auto", cmap=mpl_div_cmap)

gradient = np.linspace(0, 1, n_colors)[None].repeat(n_colors, axis=0)
for i in range(n_colors):
    gradient[i] = np.roll(gradient[i], i)

## Main parameters

In [None]:
# Other params
BASE_INVERTED = True
SENSOR_RANGE_NOMINAL = 100  # radius, in meters, VLP-16
SENSOR_RANGE_EFFECTIVE = 15  # radius, in meters, VLP-16

FOOT_RADIUS = 0.03  # meters
FOOT_AREA = np.pi * FOOT_RADIUS**2  # square meters

REFERENCE_FRAMES = ["base_vilens", "map"]
QUERY_FRAMES = ["LF_FOOT", "LH_FOOT", "RF_FOOT", "RH_FOOT"]

FOOT_MAPPER = {}


if BASE_INVERTED:
    FOOT_MAPPER["LF_FOOT"] = "right hind"
    FOOT_MAPPER["LH_FOOT"] = "right front"
    FOOT_MAPPER["RF_FOOT"] = "left hind"
    FOOT_MAPPER["RH_FOOT"] = "left front"
else:
    FOOT_MAPPER["LF_FOOT"] = "left front"
    FOOT_MAPPER["LH_FOOT"] = "left hind"
    FOOT_MAPPER["RF_FOOT"] = "right front"
    FOOT_MAPPER["RH_FOOT"] = "right hind"

FOOT_SYMBOL = {}
FOOT_SYMBOL["LF_FOOT"] = "d"  # diamond
FOOT_SYMBOL["LH_FOOT"] = "o"  # sphere
FOOT_SYMBOL["RF_FOOT"] = "^"  # triangle
FOOT_SYMBOL["RH_FOOT"] = "P"  # star

FOOT_COLOR = {}
FOOT_COLOR["LF_FOOT"] = color_palette_str["blue"]
FOOT_COLOR["LH_FOOT"] = color_palette_str["orange"]
FOOT_COLOR["RF_FOOT"] = color_palette_str["green"]
FOOT_COLOR["RH_FOOT"] = color_palette_str["red"]

## Load data

### Load pose files

In [None]:
from scipy.spatial.transform import Rotation as R


def read_poses_file(filename, base_inverted=False):
    df = pd.read_csv(filename)
    df = df.drop_duplicates()

    # Generate timestamp from sec and nsec
    ts = 1e9 * df["sec"] + df["nsec"]  # In nanoseconds
    df.index = pd.to_datetime(ts)
    df = df[~df.index.duplicated(keep="first")]

    # Parse data
    poses = {}
    for ts, x, y, z, qx, qy, qz, qw in zip(
        df.index, df["x"], df["y"], df["z"], df["qx"], df["qy"], df["qz"], df["qw"]
    ):
        poses[f"{ts:.10f}"] = np.eye(4)
        poses[f"{ts:.10f}"][0:3, 3] = np.array([x, y, z])
        poses[f"{ts:.10f}"][0:3, 0:3] = R.from_quat([qx, qy, qz, qw]).as_matrix()

        # TODO: fix base inversion

    return df, poses


df_state_poses, poses_list = read_poses_file(
    os.path.join(mission_path, "states/state_pose_data.csv"),
    base_inverted=BASE_INVERTED,
)

### Load twist files

In [None]:
def read_twist_file(filename, base_inverted=False):
    df = pd.read_csv(filename)
    df = df.drop_duplicates()

    # Generate timestamp from sec and nsec
    ts = 1e9 * df["sec"] + df["nsec"]  # In nanoseconds
    df.index = pd.to_datetime(ts)
    df = df[~df.index.duplicated(keep="first")]

    # Correct twist due to base inversion
    if BASE_INVERTED:
        df["vx"] *= -1
        df["vy"] *= -1

    # Speeds
    df["lin_speed"] = (df["vx"] ** 2 + df["vy"] ** 2).pow(1.0 / 2)
    df["ang_speed"] = df["wz"].abs()

    return df


df_state_twist = read_twist_file(
    os.path.join(mission_path, "states/state_twist_data.csv"),
    base_inverted=BASE_INVERTED,
)
df_reference_twist = read_twist_file(
    os.path.join(mission_path, "states/reference_twist_data.csv"),
    base_inverted=BASE_INVERTED,
)
df_operator_twist = read_twist_file(
    os.path.join(mission_path, "states/operator_twist_data.csv"),
    base_inverted=BASE_INVERTED,
)

### Load other operator's signals

In [None]:
def read_param_change_file(filename, base_inverted=False):
    df = pd.read_csv(filename)
    df = df.drop_duplicates()

    # Generate timestamp from sec and nsec
    ts = 1e9 * df["sec"] + df["nsec"]  # In nanoseconds
    df.index = pd.to_datetime(ts)
    df = df[~df.index.duplicated(keep="first")]

    return df


# df_local_planner_param = read_param_change_file(
#     os.path.join(mission_path, "states/local_planner_param_data.csv"),
# )
# df_rmp_param = read_param_change_file(
#     os.path.join(mission_path, "states/rmp_param_data.csv"),
# )

### Load TFs

In [None]:
df_tf = {}
for parent in REFERENCE_FRAMES:
    for child in QUERY_FRAMES:
        prefix = f"{parent}_{child}"
        df, poses = read_poses_file(
            os.path.join(mission_path, f"states/{prefix}_data.csv"),
            base_inverted=BASE_INVERTED,
        )

        df_tf[prefix] = df.copy(deep=True).drop_duplicates()

In [None]:
# Join all indices
joined_indices = pd.Index(df_tf[prefix].index)
for k, v in df_tf.items():
    joined_indices = joined_indices.union(v.index)

joined_indices = joined_indices.drop_duplicates()

for k, v in df_tf.items():
    df_tf[k] = df_tf[k].reindex(
        index=joined_indices,
    )
    df_tf[k] = df_tf[k].interpolate(method="index")

In [None]:
import scipy.signal as signal

total_footsteps = 0
footsteps = {}
for foot in ["LF_FOOT", "RF_FOOT", "LH_FOOT", "RH_FOOT"]:
    prefix = f"base_vilens_{foot}"
    off = 0
    dt = -1
    y = df_tf[prefix]["z"][off : off + dt]
    sy = df_tf[prefix]["z"].rolling(5, center=True).mean()[off : off + dt]
    t = df_tf[prefix]["z"].index[off : off + dt]

    peaks, properties = signal.find_peaks(-sy, distance=10, prominence=0.01)
    footsteps[foot] = peaks

    print(
        "phase period: ",
        1
        / df_tf["base_vilens_LF_FOOT"]["z"][peaks]
        .index.to_series()
        .diff()
        .mean()
        .total_seconds(),
    )
    # ax.plot(t, y, linewidth=0.5)
    # ax.plot(t, sy, linewidth=0.5)
    # ax.plot(t[peaks], y[peaks], marker="x", linewidth=0)
    total_footsteps += peaks.size

    print(f"num_footsteps: {peaks.size}")


# nout = 100
# w = np.linspace(0.001, 10, nout)
# pgram = signal.lombscargle(df_tf["base_vilens_LF_FOOT"]["z"].index[0:300], df_tf["base_vilens_LF_FOOT"]["z"][0:300], w)
# plt.plot(pgram)
# plt.show()
# pgram

### Load SLAM graph
Required for distance computation and coverage

In [None]:
df_slam, slam_graph = read_poses_file(
    os.path.join(mission_path, "states/slam_graph_data.csv")
)

### Align time series

In [None]:
# Extend indices
joined_indices = df_state_twist.index.union(df_reference_twist.index).drop_duplicates()
joined_indices = joined_indices.union(df_operator_twist.index).drop_duplicates()

joined_indices = joined_indices.union(df_local_planner_param.index).drop_duplicates()
joined_indices = joined_indices.union(df_rmp_param.index).drop_duplicates()

df_state_twist = df_state_twist.reindex(index=joined_indices)
df_state_twist = df_state_twist.interpolate(method="index")

df_operator_twist = df_operator_twist.reindex(index=joined_indices)
df_operator_twist = df_operator_twist.interpolate(method="index")

df_reference_twist = df_reference_twist.reindex(index=joined_indices)
df_reference_twist = df_reference_twist.interpolate(method="index")

# df_local_planner_param = df_local_planner_param.reindex(index=joined_indices)
# df_rmp_param = df_rmp_param.reindex(index=joined_indices)

## Estimate metrics

### Operator interventions
This is required for autonomy metrics

In [None]:
# Interventions from safety officer
df_state_twist["safety_intervention"] = df_operator_twist["lin_speed"] > 0.0
df_state_twist["safety_intervention"] = df_state_twist[
    "safety_intervention"
].interpolate(method="pad")

# # Interventions from forestry operator
# df_state_twist["operator_intervention"] = (~df_local_planner_param["sec"].isna()) + (
#     ~df_rmp_param["sec"].isna()
# )

In [None]:
plt.plot(df_state_twist["safety_intervention"])
# plt.plot(df_operator_twist["lin_speed"] > 0.0)

#### Estimate covered area

In [None]:
odom_points = df_state_poses[["x", "y"]].to_numpy()
slam_points = df_slam[["x", "y"]].to_numpy()

import shapely
import shapely.plotting

sp_odom_points = shapely.MultiPoint(odom_points)
sp_slam_points = shapely.MultiPoint(slam_points)
# sp_slam_hull = sp_slam_points.convex_hull

sp_sensing_area_nominal = sp_slam_points.buffer(SENSOR_RANGE_NOMINAL)
sp_sensing_area_effective = sp_slam_points.buffer(SENSOR_RANGE_EFFECTIVE)

#### 

### Normalize time

In [None]:
ref_ts = df_state_twist.index[0]

df_state_poses.index = df_state_poses.index - ref_ts
df_state_twist.index = df_state_twist.index - ref_ts
df_reference_twist.index = df_reference_twist.index - ref_ts
df_operator_twist.index = df_operator_twist.index - ref_ts
df_slam.index = df_slam.index - ref_ts

### Mission statistics

In [None]:
import scipy.integrate as integrate

stats = {}

# Constants
stats["sqm_to_ha"] = 0.0001
stats["sec_to_min"] = 1 / 60
stats["sec_to_hour"] = 1 / 3600

# Speed
stats["max_lin_speed"] = df_state_twist["lin_speed"].max().item()
stats["min_lin_speed"] = df_state_twist["lin_speed"].min().item()
stats["mean_lin_speed"] = df_state_twist["lin_speed"].mean().item()
stats["std_lin_speed"] = df_state_twist["lin_speed"].std().item()

stats["mean_ang_speed"] = df_state_twist["ang_speed"].mean().item()
stats["std_ang_speed"] = df_state_twist["ang_speed"].std().item()

# Distance walked
stats["distance_m"] = (
    df_slam[["x", "y", "z"]]
    .diff()
    .apply(lambda values: sum([v**2 for v in values]), axis=1)
    .sum()
).item()

# Footsteps
stats["footsteps"] = total_footsteps
stats["footsteps_area_m2"] = total_footsteps * FOOT_AREA

# Mission time
stats["time_sec"] = (df_slam.index[-1] - df_slam.index[0]).total_seconds()

# Interventions
stats["safety_intervention_sec"] = integrate.simpson(
    df_state_twist["safety_intervention"], df_state_twist.index.total_seconds()
).item()

# stats["operator_intervention_sec"] = integrate.simpson(
#     df_state_twist["operator_intervention"], df_state_twist.index.total_seconds()
# ).item()

# Percentaje of interventions
stats["safety_intervention_perc"] = (
    stats["safety_intervention_sec"] / stats["time_sec"] * 100
)
# stats["operator_intervention_perc"] = (
#     stats["operator_intervention_sec"] / stats["time_sec"] * 100
# )

# Area covered
stats["sensor_range_nominal_m"] = SENSOR_RANGE_NOMINAL
stats["sensed_area_nominal_m2"] = sp_sensing_area_nominal.area
stats["sensed_area_nominal_ha"] = sp_sensing_area_nominal.area * stats["sqm_to_ha"]

stats["sensor_range_effective_m"] = SENSOR_RANGE_EFFECTIVE
stats["sensed_area_effective_m2"] = sp_sensing_area_effective.area
stats["sensed_area_effective_ha"] = sp_sensing_area_effective.area * stats["sqm_to_ha"]

# Hectares per second
stats["ha_per_sec"] = stats["sensed_area_effective_ha"] / stats["time_sec"]
stats["ha_per_min"] = stats["sensed_area_effective_ha"] / (
    stats["time_sec"] * stats["sec_to_min"]
)
stats["ha_per_hour"] = stats["sensed_area_effective_ha"] / (
    stats["time_sec"] * stats["sec_to_hour"]
)

# Print
for k, v in stats.items():
    print(f"{k:>8}: {v:<20.2f}")

# Save as YAML
import yaml

file = open(os.path.join(mission_path, "mission_report.yaml"), "w")
yaml.dump(stats, file)
file.close()

In [None]:
integrate.simpson(
    df_state_twist["safety_intervention"], df_state_twist.index.total_seconds()
)

## Plots

### Covered area

In [None]:
from digiforest_analysis.utils.plotting import lighten

# Plotting
plot_width = 15 * cm
plot_height = 15 * cm
alpha = 0.7

fig, ax = plt.subplots(
    1, 1, figsize=(plot_width, plot_height), constrained_layout=False, dpi=300
)
# Axes
ax.set_axisbelow(True)
ax.set_aspect("equal")
ax.grid(which="major", color=gray_palette_str["20"], linewidth=0.7)
ax.grid(which="minor", color=gray_palette_str["10"], linestyle=":", linewidth=0.5)
ax.minorticks_on()


# shapely.plotting.plot_points(
#     sp_odom_points,
#     markersize=1,
#     marker=".",
#     color=gray_palette[1],
#     alpha=0.5,
#     fillstyle="full",
#     linewidth=0,
#     label="Odometry",
# ax=ax,
# )

# Plot sensor polygon
shapely.plotting.plot_polygon(
    sp_sensing_area_nominal,
    add_points=False,
    linewidth=0,
    alpha=alpha,
    color=blue_palette_str["30"],
    label=f"Nominal sensing range ({SENSOR_RANGE_NOMINAL} m)",
    ax=ax,
)

shapely.plotting.plot_polygon(
    sp_sensing_area_effective,
    add_points=False,
    linewidth=0,
    alpha=alpha,
    color=blue_palette_str["50"],
    label=f"Effective sensing range ({SENSOR_RANGE_EFFECTIVE} m)",
)


# shapely.plotting.plot_polygon(hull, add_points=False)

# Add footsteps
# for i, (foot, v) in enumerate(footsteps.items()):
#     prefix = f"map_vilens_{foot}"
#     ax.scatter(
#         df_tf[prefix]["x"][v],
#         df_tf[prefix]["y"][v],
#         s=2,
#         marker=FOOT_SYMBOLS[foot],
#         edgecolor="none",
#         alpha=1,
#         label=FOOT_MAPPER[foot],
#         color=color_palette[i],
#     )

list_arrays = [
    np.array((geom.xy[0][0], geom.xy[1][0])) for geom in sp_slam_points.geoms
]
sp_slam_line = shapely.LineString(list_arrays)
shapely.plotting.plot_line(
    sp_slam_line,
    linewidth=1,
    color=gray_palette_str["80"],
    alpha=1.0,
    add_points=False,
    ax=ax,
)

# Plot points
shapely.plotting.plot_points(
    sp_slam_points,
    markersize=2,
    marker="o",
    color=gray_palette_str["80"],
    alpha=1.0,
    fillstyle="full",
    label="SLAM path",
)


lgnd = ax.legend(edgecolor=(1, 1, 1, 0), framealpha=0.9, loc=(0, 1.01), ncol=2)
for handle in lgnd.legend_handles:
    try:
        handle.set_sizes([40.0])
    except Exception as e:
        print(e)
for handle in lgnd.get_lines():
    handle.set_linewidth(1)


ax.set_xlabel("x [m]")
ax.set_ylabel("y [m]")
ax.margins(x=0.1, y=0.1)

fig.set_tight_layout(True)
fig.savefig(os.path.join(mission_path, "sensed_area.pdf"))

### Robot velocity

In [None]:
smoothing_window = "3000ms"
linewidth = 1

plot_width = 15 * cm
plot_height = 6 * cm

fig, ax = plt.subplots(
    1,
    1,
    figsize=(plot_width, plot_height),
    constrained_layout=False,
    dpi=300,
    sharex=True,
)
# Axes
ax.set_axisbelow(True)
# ax.set_aspect("equal")
ax.grid(which="major", color=gray_palette_str["20"], linewidth=0.7)
ax.grid(which="minor", color=gray_palette_str["10"], linestyle=":", linewidth=0.5)
ax.minorticks_on()

ax.plot(
    df_state_twist.index.total_seconds(),
    df_state_twist["lin_speed"].rolling(smoothing_window, center=True).mean(),
    label="Robot speed",
    linewidth=linewidth,
    color=color_palette_str["blue"],
)
ax.plot(
    df_reference_twist.index.total_seconds(),
    df_reference_twist["lin_speed"].rolling(smoothing_window, center=True).mean(),
    label="Local planner reference",
    linewidth=linewidth,
    color=color_palette_str["orange"],
)

# ax[1].plot(
#     df_operator_twist.index.total_seconds(),
#     df_operator_twist["lin_speed"].rolling(smoothing_window, center=True).mean(),
#     label="Operator command",
#     linewidth=linewidth,
#     color=color_palette[2],
# )

# if df_state_twist["operator_intervention"].any():
#     ax.fill_between(
#         df_state_twist["operator_intervention"].index.total_seconds(),
#         df_state_twist["operator_intervention"].index.total_seconds() * 0,
#         df_state_twist["operator_intervention"],
#         label="Operator action",
#         linewidth=1,
#         linestyle="-",
#         color=gray_palette_str["70"],
#         alpha=1.0,
#     )

if df_state_twist["safety_intervention"].any():
    ax.fill_between(
        df_state_twist["safety_intervention"].index.total_seconds(),
        df_state_twist["safety_intervention"].index.total_seconds() * 0,
        df_state_twist["safety_intervention"],
        label="Safety intervention",
        linewidth=0,
        color=gray_palette_str["40"],
        alpha=0.7,
    )


# ax[1].plot(
#     df_reference_twist.index.total_seconds(),
#     (df_reference_twist["lin_speed"].rolling(smoothing_window, center=True).mean() - df_state_twist["lin_speed"].rolling(smoothing_window, center=True).mean()),
#     label="Operator command",
#     linewidth=linewidth,
#     color=color_palette[3],
# )

# Legends
lgnd = ax.legend(edgecolor=(1, 1, 1, 0), framealpha=0.9, loc=(0, 1.03), ncol=2)
for handle in lgnd.legend_handles:
    try:
        handle.set_sizes([40.0])
    except Exception as e:
        print(e)

for handle in lgnd.get_lines():
    handle.set_linewidth(1)


# ax.set_title('Computation Time')

ax.margins(x=0, y=0)
ax.set_ylabel("Speed [m/s]")
ax.set_xlabel("Time [s]")

# ax[1].margins(x=0, y=0)
# ax[1].set_ylabel("Speed [m/s]")
# ax.set_xlabel("Time [s]")

# Export
fig.set_tight_layout(True)
fig.savefig(os.path.join(mission_path, "mission_velocity.pdf"))

### Autonomy plot

### Plot footsteps

In [None]:
# Plotting
plot_width = 20 * cm
plot_height = 15 * cm
alpha = 0.8

fig, ax = plt.subplots(
    1,
    1,
    figsize=(plot_width, plot_height),
    constrained_layout=False,
    dpi=300,
)

# Axes
# [ax.spines[side].set_visible(False) for side in ax.spines] # Remove borders of plot
ax.set_axisbelow(True)
ax.set_aspect("equal")
ax.grid(which="major", color=gray_palette_str["20"], linewidth=0.7)
ax.grid(which="minor", color=gray_palette_str["10"], linestyle=":", linewidth=0.5)
ax.minorticks_on()

# Add footsteps
for i, (foot, v) in enumerate(footsteps.items()):
    prefix = f"map_{foot}"
    ax.scatter(
        df_tf[prefix]["x"][v],
        df_tf[prefix]["y"][v],
        s=5,
        marker=FOOT_SYMBOL[foot],
        edgecolor="none",
        alpha=alpha,
        label=FOOT_MAPPER[foot],
        color=FOOT_COLOR[foot],
    )

lgnd = ax.legend(edgecolor=(1, 1, 1, 0), framealpha=0.9, loc=(0.01, 1.01), ncol=4)
for handle in lgnd.legend_handles:
    handle.set_sizes([40.0])

# ax.set_title("Footsteps")
ax.set_xlabel("x [m]")
ax.set_ylabel("y [m]")
ax.margins(x=0.15, y=0.1)

loc = plt.MultipleLocator(10.0)  # this locator puts ticks at regular intervals
# ax.xaxis.set_major_locator(loc)
ax.yaxis.set_major_locator(loc)

# loc = plt.MultipleLocator(1.0) # this locator puts ticks at regular intervals
# ax.xaxis.set_minor_locator(loc)
# ax.yaxis.set_minor_locator(loc)

fig.set_tight_layout(True)
fig.savefig(os.path.join(mission_path, "footsteps.pdf"))