# Visualize Field Test Data
### This notebook serves as a helper to load and visualize using the global coordinates the field test experiments

### Imports

In [None]:
import os
import pandas as pd
from glob import glob
import numpy as np
import seaborn as sns
# pandas show all columns
pd.set_option('display.max_columns', None)


### Helper functions

In [None]:
def load_all_field_tests(base_dir, robot, task):
    folder_path = os.path.join(base_dir, robot, task)
    csv_files = glob(os.path.join(folder_path, "*.csv"))

    runs = []
    for file in csv_files:
        df = pd.read_csv(file)
        df["run_id"] = os.path.basename(file)  # Store filename for tracking

        runs.append(df)  # ✅ Ensures tuple format

    print(f"✅ Loaded {len(runs)} runs for {robot} - {task}.")
    return runs  # Ensure the return is a list of (df, filename)


In [None]:
def compute_cumulative_goals_field(run, threshold=0.2, robot="Kingfisher"):
    """
    Compute cumulative goals reached for a single field test run, handling column swaps for specific robots.

    Args:
        run (pd.DataFrame): Field test run dataframe.
        robot (str): Name of the robot (used for column correction).
        threshold (float): Distance threshold to consider a goal as reached.

    Returns:
        np.ndarray: Cumulative goals reached over time.
    """
    # Detect the correct column based on the robot type
    if robot == "Kingfisher":
        distance_column = "linear_velocities_bodyx.m/s"  # Fix the swap
    else:
        distance_column = "task_data.dist.m"

    if "num_goals_reached.u" not in run.columns:
        print(f"⚠️ No goals reached data found for {robot}.")
        goals_reached_csv = None
    else:
        print(f"✅ Found goals reached data for {robot}.")    
        goals_reached_csv = run["num_goals_reached.u"].values[-1]
    # Ensure the column exists before proceeding
    if distance_column not in run.columns:
        raise ValueError(f"Column '{distance_column}' not found in the field test data for {robot}")

    distances = run[distance_column].values
    cumulative_goals = np.zeros_like(distances, dtype=int)
    goal_reached_flag = False
    total_goals = 0

    for t in range(len(distances)):
        if distances[t] < threshold and not goal_reached_flag:
            total_goals += 1
            goal_reached_flag = True
        elif distances[t] > threshold:
            goal_reached_flag = False
        cumulative_goals[t] = total_goals

    return cumulative_goals, goals_reached_csv

In [None]:
import matplotlib.pyplot as plt

def plot_all_field_runs(runs, metric="distance_error.m"):
    """
    Plot all runs for a given metric.

    Args:
        runs (list): List of DataFrames.
        metric (str): Metric to plot (e.g., "distance_error.m").
    """
    plt.figure(figsize=(12, 6))

    for df in runs:
        if metric in df.columns:
            plt.plot(df[metric], label=df["run_id"].iloc[0])  # Use filename as legend

    plt.xlabel("Timesteps")
    plt.ylabel(metric.replace("_", " ").title())
    plt.title(f"Field Test Runs - {metric.replace('_', ' ').title()}")
    plt.legend(loc="upper right", fontsize=8, frameon=True)
    plt.grid(True, linestyle="--", linewidth=0.5, alpha=0.4)
    plt.show()


## Plot robot-task pair, showing all runs available in the folder

In [None]:
BASE_FIELD_DIR = "../../../../../field_tests_data"

# Select robot and task
robot = "Turtlebot2"
task = "GoThroughPositions"

# Load all runs
all_runs = load_all_field_tests(BASE_FIELD_DIR, robot, task)
print(f'total runs: {len(all_runs)}')
# print the name of the run at its length
for run in all_runs:
    print(run["run_id"].iloc[0], len(run)) # Print run name and length
    goals_num, goals_reached = compute_cumulative_goals_field(run, threshold=0.2, robot=robot)
    print(goals_num[-1], goals_reached) # Print the number of goals reached and the number of goals reached in the run
    
# Plot all runs for distance error
if all_runs:
    plot_all_field_runs(all_runs, metric="distance_error.m")


# Helper functions for plots and runs filtering

### Manually assign labels to different run types based on their trajectory characteristics.
Interactive labelling through command line

In [None]:
def manual_classify_runs(runs):
    """
    Manually assign labels to different run types based on their trajectory characteristics.
    
    Args:
        runs (list): List of DataFrames representing different runs.

    Returns:
        dict: Dictionary with cluster labels (e.g., "Straight", "Zigzag").
    """
    cluster_labels = {}

    print("\n🛠️ Manually classify each run:")
    for run in runs:
        plt.figure(figsize=(6, 4))
        plt.plot(run["elapsed_time.s"], run["num_goals_reached.u"], label=run["run_id"].iloc[0])
        plt.xlabel("Time (s)")
        plt.ylabel("Cumulative Goals Reached")
        plt.title(f"Run: {run['run_id'].iloc[0]}")
        plt.grid(True, linestyle="--", linewidth=0.5, alpha=0.4)
        plt.legend(fontsize=8, loc="upper left")
        plt.show()

        label = input("Enter label for this run (e.g., 'Straight', 'Zigzag', 'Square', 'Diagonal'): ")
        cluster_labels[run["run_id"].iloc[0]] = label

    return cluster_labels

def plot_runs_by_category(runs, cluster_labels):
    """
    Plot runs grouped by manually assigned categories.

    Args:
        runs (list): List of DataFrames representing different runs.
        cluster_labels (dict): Dictionary of manually assigned cluster labels.
    """
    unique_labels = set(cluster_labels.values())
    
    for label in unique_labels:
        plt.figure(figsize=(10, 6))
        
        for run in runs:
            run_id = run["run_id"].iloc[0]
            if cluster_labels[run_id] == label:
                plt.plot(run["elapsed_time.s"], run["num_goals_reached.u"], label=run_id, alpha=0.7)

        plt.xlabel("Time (s)")
        plt.ylabel("Cumulative Goals Reached")
        plt.title(f"Cluster: {label}")
        plt.legend(fontsize=6, loc="upper right", frameon=True)
        plt.grid(True, linestyle="--", linewidth=0.5, alpha=0.4)
        plt.show()

### Different plot functions

In [None]:

def plot_all_runs(runs, title="All Runs - Cumulative Goals"):
    """
    Plot all runs to identify different trajectory types.

    Args:
        runs (list): List of DataFrames representing different runs.
        title (str): Title for the plot.
    """
    plt.figure(figsize=(10, 6))
    
    for run in runs:
        plt.plot(run["elapsed_time.s"], run["num_goals_reached.u"], label=run["run_id"].iloc[0], alpha=0.7)

    plt.xlabel("Time (s)")
    plt.ylabel("Cumulative Goals Reached")
    plt.title(title)
    plt.legend(fontsize=6, loc="upper right", frameon=True)
    plt.grid(True, linestyle="--", linewidth=0.5, alpha=0.4)
    plt.show()


In [None]:

def plot_multiple_trajectories(runs, task, show_velocities=True, show_goals=True, max_runs=10):
    """
    Plots multiple robot trajectories, goal locations, and optional velocity vectors.

    Args:
        runs (list of pd.DataFrame): List of run dataframes.
        task (str): Task name.
        show_velocities (bool): Whether to overlay velocity vectors.
        show_goals (bool): Whether to mark goal positions.
        max_runs (int): Limit the number of plotted runs to reduce clutter.
    """
    sns.set_theme(style="darkgrid")

    plt.figure(figsize=(8, 7))

    # Limit the number of runs plotted
    plotted_runs = runs[:max_runs]

    for run in plotted_runs:
        positions = run[["position_world.x.m", "position_world.y.m"]]
        run_id = run["run_id"].iloc[0]  # Identify run by filename

        # Plot trajectory
        plt.plot(positions["position_world.x.m"], positions["position_world.y.m"], label=run_id, alpha=0.7)

        # Plot goal positions
        if show_goals:
            goals = run[["target_position.x.m", "target_position.y.m"]].drop_duplicates()
            plt.scatter(goals["target_position.x.m"], goals["target_position.y.m"], color="red", marker="X", s=100)

        # Overlay velocity vectors
        if show_velocities:
            skip = max(1, len(run) // 30)  # Reduce clutter
            plt.quiver(
                run["position_world.x.m"][::skip], run["position_world.y.m"][::skip],
                run["linear_velocities_worldx.m/s"][::skip], run["linear_velocities_worldy.m/s"][::skip],
                color="orange", angles="xy", scale_units="xy", scale=5, width=0.02
            )

    # Labels & aesthetics
    plt.xlabel("X Position (m)")
    plt.ylabel("Y Position (m)")
    plt.title(f"{task}: Trajectory & Goal Visualization")
    plt.legend(fontsize=6, loc="upper right", frameon=True)
    plt.grid(True, linestyle="--", linewidth=0.5, alpha=0.4)
    plt.show()


In [None]:
def plot_single_trajectory(run_df, run_id, task, show_velocities=True, show_goals=True):
    """
    Plots a single robot trajectory with goal positions and velocity vectors.

    Args:
        run_df (pd.DataFrame): DataFrame of the run.
        run_id (str): Name of the run file.
        task (str): Task name.
        show_velocities (bool): Whether to overlay velocity vectors.
        show_goals (bool): Whether to mark goal positions.
    """
    sns.set_theme(style="darkgrid")
    plt.figure(figsize=(8, 7))

    # Extract positions
    positions = run_df[["position_world.x.m", "position_world.y.m"]]

    # Plot trajectory
    plt.plot(positions["position_world.x.m"], positions["position_world.y.m"], label="Trajectory", color="blue", alpha=0.7)

    # Plot goal positions
    if show_goals:
        goals = run_df[["target_position.x.m", "target_position.y.m"]].drop_duplicates()
        plt.scatter(goals["target_position.x.m"], goals["target_position.y.m"], color="red", marker="o", s=100, label="Goals")

    # Overlay velocity vectors
    if show_velocities:
        skip = max(1, len(run_df) // 30)  # Reduce clutter
        plt.quiver(
            run_df["position_world.x.m"][::skip], run_df["position_world.y.m"][::skip],
            run_df["linear_velocities_worldx.m/s"][::skip], run_df["linear_velocities_worldy.m/s"][::skip],
            color="orange", angles="xy", scale_units="xy", scale=5, width=0.02, label="Velocity Vectors"
        )

    # Labels & aesthetics
    plt.xlabel("X Position (m)")
    plt.ylabel("Y Position (m)")
    plt.title(f"{task}: Trajectory - {run_id}")
    plt.legend()
    plt.grid(True, linestyle="--", linewidth=0.5, alpha=0.4)
    plt.show()

### Load robot-task

In [None]:
# Example Usage
base_dir = "../../../../../field_tests_data"
robot = "Kingfisher"
task = "TrackVelocities"

### Select operation: plot single run, multiple runs, classify trajectories based on shape etc.

In [None]:

# Load all runs
all_runs_glob = load_all_field_tests(base_dir, robot, task)
plot_multiple_trajectories(all_runs_glob, task, show_velocities=True, show_goals=True, max_runs=1)

# # Manually classify runs
# run_labels = manual_classify_runs(all_runs)

# # Plot by category
# plot_runs_by_category(all_runs, run_labels)

# Save classification results
# pd.DataFrame.from_dict(run_labels, orient="index", columns=["Trajectory Type"]).to_csv("manual_trajectory_classification.csv")


In [None]:
for run_df in all_runs_glob:
    RUN_ID = run_df["run_id"].iloc[0]
    # If run is longer than 350 steps, then plot it to 350 steps
    # if len(run_df) > 400:
    #     run_df = run_df[:400]
    print(f"\n🔍 Plotting Run: {RUN_ID}, with length: {len(run_df)}")
    plot_single_trajectory(run_df, RUN_ID, task, show_velocities=True, show_goals=True)
    # save the df to a csv file
    # run_df.to_csv(f"{RUN_ID.split('.')[0]}_short.csv", index=False)
