Running benchmark in notebook for debug.


In [1]:
import os
import matplotlib.pyplot as plt
BASE_PATH = os.path.abspath('')
print(BASE_PATH)

/home/turtlewizard/thesis-mppi-model-ident/workspace/src/controller_optimization


In [2]:
# create mockup logger
class Logger:
    def info(self, msg: str) -> None:
        print('Info: ' + msg)

    def error(self, msg: str) -> None:
        print('Error: ' + msg)

    def debug(self, msg: str) -> None:
        print('Debug: ' + msg)

    def isEnabledFor(self, *args, **kwargs) -> bool:
        return True


logger = Logger()
logger.info('Hello from mockup logger.')

Info: Hello from mockup logger.


In [3]:
from controller_benchmark import ControllerBenchmark

controller_benchmark = ControllerBenchmark(
    logger=logger,
    config_path=os.path.join(
        BASE_PATH, 'config/controller_benchmark_config.yaml'),
    base_path=BASE_PATH
)

controller_benchmark.launch_nodes()

Info: Loading config...
Debug: Map config file: /home/turtlewizard/thesis-mppi-model-ident/workspace/src/controller_optimization/config/controller_benchmark_config.yaml
Debug: Config: 
 {'cmd_vel_topic': '/cmd_vel',
 'controller': 'FollowPathMPPI',
 'corridor_empty': {'goal_pose': {'x': 2.0, 'y': 0.0, 'yaw': 0.0},
                    'path': 'maps/corridor_empty.yaml',
                    'start_pose': {'x': 0.0, 'y': 0.0, 'yaw': 0.0}},
 'corridor_mid_obstacles': {'goal_pose': {'x': 2.0, 'y': 0.0, 'yaw': 0.0},
                            'path': 'maps/corridor_mid_obstacles.yaml',
                            'start_pose': {'x': 0.0, 'y': 0.0, 'yaw': 0.0}},
 'corridor_side_obstacles': {'goal_pose': {'x': 2.0, 'y': 0.0, 'yaw': 0.0},
                             'path': 'maps/corridor_side_obstacles.yaml',
                             'start_pose': {'x': 0.0, 'y': 0.0, 'yaw': 0.0}},
 'costmap_topic': '/local_costmap/costmap',
 'default_controller_params': 'config/controller_server_params_

[INFO] [1730311892.402329352] [parameter_manager]: ParameterManager for controller_server initialized
[INFO] [1730311892.486887221] [gazebo_interface]: Gazebo interface initialized
[INFO] [1730311892.491706433] [odom_subscriber]: odom_subscriber initialized
[INFO] [1730311892.498108131] [cmd_vel_subscriber]: cmd_vel_subscriber initialized
[INFO] [1730311892.504366595] [mppi_critic_subscriber]: mppi_critic_subscriber initialized


In [4]:
# run benchmark with default parameters
# controller_benchmark.run_benchmark()

[INFO] [1730311892.574374601] [parameter_manager]: Setting parameters for controller_server.


Info: Running benchmark...
Info: Starting data collection.
Info: Changing map to: corridor_mid_obstacles


[INFO] [1730311893.376717361] [basic_navigator]: Change map request was successful!
[INFO] [1730311893.899678251] [basic_navigator]: Publishing Initial Pose


Info: Getting global plan for corridor_mid_obstacles with planner: GridBased.


[INFO] [1730311894.408176957] [basic_navigator]: Getting path...


Info: ___ Starting controller: FollowPathMPPI, on map: corridor_mid_obstacles. ___


[INFO] [1730311894.641318303] [basic_navigator]: Executing path...


Info: ___ Controller FollowPathMPPI on map corridor_mid_obstacles finished in 13.229366372000001 [s]. ___
Debug: Collecting results between 1730311894.640535 and 1730311907.8699017.
Info: Stopping data collection.
Info: Benchmark finished in 15.2989 seconds.


In [22]:
import numpy as np
from itertools import product
# Generate a list of weights from 1-100
n = 10
weights = np.linspace(1.0, 100.0, n)
critics = ['ConstraintCritic', 'GoalCritic', 'PreferForwardCritic',
           'CostCritic', 'PathAlignCritic', 'PathFollowCritic', 'PathAngleCritic']

# weights = [1.0, 10.0, 200.0]
critics = ['ConstraintCritic', 'GoalCritic', 'PathFollowCritic']

parameter_space = list(product(weights, repeat=len(critics)))
print(f'Possible parameter combinations: {len(parameter_space)}')

i = 0
results: list[dict[int, bool]] = []
critic_weights = []
for weights in parameter_space:
    params = {critic: weight for critic, weight in zip(critics, weights)}
    critic_weights.append(params)

    success, res = controller_benchmark.run_benchmark(parameters=params, store_results=False)
    msg = 'successful' if success else 'unsuccessful'
    print(f'Iteration {i} was {msg}. with weights {weights}')

    controller_benchmark.save_result(res)

    results.append({i: success})
    i += 1

Possible parameter combinations: 1000


In [12]:
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.model_selection import GridSearchCV
import copy

# Generate a list of weights from 1-100
n = 10
weights = np.linspace(1.0, 100.0, n)

critics = ['ConstraintCritic', 'GoalCritic', 'PreferForwardCritic',
           'CostCritic', 'PathAlignCritic', 'PathFollowCritic', 'PathAngleCritic']

critics = ['CostCritic', 'PathFollowCritic']

# Create the parameter grid
param_grid = {}
for critic in critics:
    param_grid[critic + '.cost_weight'] = copy.deepcopy(weights)


def evaluate_mppi_config(params):
    controller_benchmark.run_benchmark(params)

    # Run the simulation and get performance metrics
    metric = controller_benchmark.calculate_metric()

    # Calculate a performance score
    performance_score = metric.ms_linear_jerk * 5.0 + metric.ms_angular_jerk

    return performance_score  # Lower score indicates better performance


class MPPIControllerOptimizer(BaseEstimator):
    def __init__(self):
        self.params = controller_benchmark.default_controller_params

    def fit(self, X, y=None):
        # This method is required but does not need to do anything for grid search
        return self

    def set_params(self, **params):
        # Update parameters with the new values from grid search
        for critic in critics:
            self.params[critic + '.cost_weight'] = params[critic + '.cost_weight']
        return self

    def score(self, X, y=None):
        # Call the evaluation function with the parameters
        return -evaluate_mppi_config(self.params)  # Negate for minimization


# Instantiate the optimizer and perform the grid search
optimizer = MPPIControllerOptimizer()
grid_search = GridSearchCV(
    estimator=optimizer,
    param_grid=param_grid,
    scoring='neg_mean_squared_error',  # Use a scoring method suitable for your evaluation
    cv=2,  # Change this to at least 2
    verbose=1
)

# Fit the grid search (the input X can be any valid input; it's not used here)
X_dummy = np.random.rand(3, 1)
grid_search.fit(X=X_dummy)  # Dummy input, as we only need parameter search here

# Get the best parameters
best_params = grid_search.best_params_
print("Best parameters found:", best_params)

Fitting 2 folds for each of 100 candidates, totalling 200 fits
Best parameters found: {'CostCritic.cost_weight': 1.0, 'PathFollowCritic.cost_weight': 1.0}


Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/sklearn/model_selection/_validation.py", line 969, in _score
    scores = scorer(estimator, X_test, **score_params)
TypeError: _BaseScorer.__call__() missing 1 required positional argument: 'y_true'

 nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
 nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
 nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
 nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
 nan nan nan nan nan nan nan nan nan nan]
