# Evaluation Planar Robot

In this notebook we evaluate the extended A* using a planar robot with 3, 6 and 9 DoF. First, let us take a look at the environments used for evaluation:

In [None]:
from core.IPBenchmark import Benchmark 
import evaluation.robotic_arm.IPTestSuite_robotic_arm_3_DoF as ts
import matplotlib.pyplot as plt

from evaluation.robotic_arm.PlotEnvironments import visualizeBenchmark

# Show every benchmark scene of the 3-DoF test suite, one figure per scene.
for benchmark in ts.benchList:
    benchmark: Benchmark  # annotation only, documents the element type
    print(f"----- benchmark: {benchmark.name} -----")
    fig, ax = visualizeBenchmark(benchmark)
    plt.show()

## Evaluation Code

In this section you can find some code for evaluating A*, loading existing results and plotting the results.

In [None]:
import os
import json
import networkx as nx
from typing import List, Dict

import sys
import os

sys.path.append(os.path.join(os.getcwd(), 'core'))
sys.path.append(os.path.join(os.getcwd(), 'evaluation/robotic_arm'))

from core.IPAStarExtended import AStar
from evaluation.robotic_arm.Benchmarking import get_config_dir_name, evaluate
import evaluation.robotic_arm.IPTestSuite_robotic_arm_3_DoF as ts3
import evaluation.robotic_arm.IPTestSuite_robotic_arm_6_DoF as ts6
import evaluation.robotic_arm.IPTestSuite_robotic_arm_9_DoF as ts9
from core.IPLazyPRM import LazyPRM

testSuits = {3: ts3, 6: ts6, 9: ts9}

def get_evaluation_results(configs: List[Dict], algorithm: str = "astar", dump: bool = True):
    results = []

    for i, config in enumerate(configs):
        print(f"----- config {i + 1} of {len(configs)} -----")

        config_results = {}

        ts = testSuits[config["dof"]]
        benchmarks = [ts.benchList[i] for i in config["benchmarks"]]
        for benchmark in benchmarks:
            print(f"----- benchmark: {benchmark.name} -----")
            benchmark_results = {}

            dir_name = get_config_dir_name(config=config, benchmark_name=benchmark.name, algorithm=algorithm)

            if os.path.exists(dir_name):
                # load evaluation results if they exist.

                match algorithm:
                    case "astar":
                        solver = AStar(benchmark.collisionChecker)

                    case "prm":
                        solver = LazyPRM(benchmark.collisionChecker)
                
                solver.start = benchmark.startList[0]
                solver.goal = benchmark.goalList[0]
                with open(f'{dir_name}/graph.json') as f:
                    graph = nx.node_link_graph(json.load(f), edges="links")
                    solver.graph = graph

                with open(f'{dir_name}/stats.json') as f:
                    stats = json.load(f)

                with open(f'{dir_name}/solution.json') as f:
                    solution = json.load(f)

            else:
                # do evaluation

                stats, solution, solver = evaluate(config=config, benchmark=benchmark, algorithm=algorithm, dump=dump)

            
            benchmark_results["stats"] = stats
            benchmark_results["solution"] = solution
            benchmark_results["solver"] = solver

            config_results[benchmark.name] = benchmark_results
        results.append((config, config_results))
    return results

In [None]:
def _plot_metric_lines(results, x_axis_values, x_axis_title, stat_key, title, y_label, mask_negative=False):
    """Plot one line per benchmark for the statistic ``stat_key`` and show it."""
    bench_names = list(results[0][1].keys())
    plot_lines = {bench_name: [] for bench_name in bench_names}

    for _, config_results in results:
        for bench_name, benchmark_results in config_results.items():
            value = benchmark_results["stats"][stat_key]
            if mask_negative and not value >= 0:
                # NaN leaves a gap in the line instead of plotting a
                # negative value (presumably the "no solution" marker).
                value = np.nan
            plot_lines[bench_name].append(value)

    for bench_name in bench_names:
        plt.plot(x_axis_values, plot_lines[bench_name], "o-", label=bench_name)
    plt.legend()
    plt.title(title)
    plt.xlabel(x_axis_title)
    plt.ylabel(y_label)
    plt.show()


def plot_results_line(results: List, x_axis_values: List, x_axis_title: str):
    """Show four line plots comparing the configurations in ``results``.

    One figure each is drawn for execution time, roadmap size, number of
    nodes in the solution path and solution path length, with one line per
    benchmark. Negative values of the two solution-path statistics are
    plotted as gaps (NaN).

    Args:
        results: List of ``(config, config_results)`` tuples as returned by
            ``get_evaluation_results``. The benchmark names are taken from
            the first entry.
        x_axis_values: One x value per entry of ``results``.
        x_axis_title: Label of the x axis.

    Returns:
        None. Nothing is plotted when ``results`` is empty.
    """
    if not results:
        return

    _plot_metric_lines(results, x_axis_values, x_axis_title,
                       stat_key="execution_time",
                       title="Execution Time",
                       y_label="Execution Time [s]")
    _plot_metric_lines(results, x_axis_values, x_axis_title,
                       stat_key="road_map_size",
                       title="Roadmap Size",
                       y_label="Roadmap Size")
    _plot_metric_lines(results, x_axis_values, x_axis_title,
                       stat_key="num_nodes_solution_path",
                       title="Number of Nodes in Solution Path",
                       y_label="Number of Nodes in Solution Path",
                       mask_negative=True)
    _plot_metric_lines(results, x_axis_values, x_axis_title,
                       stat_key="solution_path_length",
                       title="Solution Path Length",
                       y_label="Solution Path Length",
                       mask_negative=True)

In [None]:
def _plot_metric_bars(results, x_axis_values, x_axis_title, bench_names, stat_key, title, y_label, mask_negative=False):
    """Draw one grouped bar chart (one group per benchmark) for ``stat_key``."""
    n_configs = len(x_axis_values)
    # Each benchmark group is 0.8 wide in total and shared by all configs.
    bar_width = 0.8 / n_configs
    x_positions = np.arange(len(bench_names))

    plt.figure(figsize=(10, 6))
    for i, x_val in enumerate(x_axis_values):
        _, config_results = results[i]
        values = []
        for bench_name in bench_names:
            value = config_results[bench_name]["stats"][stat_key]
            if mask_negative and not value >= 0:
                # NaN suppresses the bar instead of drawing a negative value
                # (presumably the "no solution" marker).
                value = np.nan
            values.append(value)

        # Centre the bars of all configs around the benchmark's tick.
        offset = (i - (n_configs - 1) / 2) * bar_width
        # Avoid legend entries such as ": A*" when no axis title is given.
        label = f'{x_axis_title}: {x_val}' if x_axis_title else f'{x_val}'
        plt.bar(x_positions + offset, values, bar_width, label=label, alpha=0.8)

    plt.xlabel('Benchmarks')
    plt.ylabel(y_label)
    plt.title(title)
    plt.xticks(x_positions, bench_names, rotation=45, ha='right')
    plt.legend()
    plt.tight_layout()
    plt.show()


def plot_results_bar(results: List, x_axis_values: List, x_axis_title: str):
    """Show four grouped bar charts comparing the configurations in ``results``.

    One figure each is drawn for execution time, roadmap size, number of
    nodes in the solution path and solution path length. Benchmarks are on
    the x axis; every entry of ``x_axis_values`` contributes one bar per
    benchmark. Negative values of the two solution-path statistics are not
    drawn (NaN).

    Args:
        results: List of ``(config, config_results)`` tuples as returned by
            ``get_evaluation_results``; ``results[i]`` belongs to
            ``x_axis_values[i]``. Benchmark names come from the first entry.
        x_axis_values: Legend value for each entry of ``results``.
        x_axis_title: Legend prefix; when empty only the value is shown.

    Returns:
        None. Nothing is plotted when ``results`` or ``x_axis_values`` is
        empty.
    """
    if not results or not x_axis_values:
        return

    bench_names = list(results[0][1].keys())

    _plot_metric_bars(results, x_axis_values, x_axis_title, bench_names,
                      stat_key="execution_time",
                      title='Execution Time by Benchmark',
                      y_label='Execution Time [s]')
    _plot_metric_bars(results, x_axis_values, x_axis_title, bench_names,
                      stat_key="road_map_size",
                      title='Roadmap Size by Benchmark',
                      y_label='Roadmap Size')
    _plot_metric_bars(results, x_axis_values, x_axis_title, bench_names,
                      stat_key="num_nodes_solution_path",
                      title='Number of Nodes in Solution Path by Benchmark',
                      y_label='Number of Nodes in Solution Path',
                      mask_negative=True)
    _plot_metric_bars(results, x_axis_values, x_axis_title, bench_names,
                      stat_key="solution_path_length",
                      title='Solution Path Length by Benchmark',
                      y_label='Solution Path Length',
                      mask_negative=True)

In [None]:
import matplotlib.animation
from IPython.display import HTML

from IPPlanarManipulator import PlanarRobot

#%matplotlib widget

def interpolate_line(startPos, endPos, step_l):
    """Sample the straight line from ``startPos`` to ``endPos``.

    The returned points start at ``startPos``, are spaced ``step_l`` apart
    and always finish exactly at ``endPos`` (the last gap may be shorter
    than ``step_l``).

    Args:
        startPos: Start point (sequence of numbers; ints are accepted).
        endPos: End point with the same length as ``startPos``.
        step_l: Distance between consecutive samples; must be positive.

    Returns:
        A list of float ``numpy`` arrays. The first element is always the
        start point, so callers can drop it with ``[1:]`` when chaining
        segments. For identical start and end a single point is returned.

    Raises:
        ValueError: If ``step_l`` is not positive.
    """
    if step_l <= 0:
        raise ValueError("step_l must be positive")

    # Work on float copies: integer inputs must not break the arithmetic and
    # the caller's data must never be modified.
    start = np.array(startPos, dtype=float)
    end = np.array(endPos, dtype=float)

    line = end - start
    line_l = np.linalg.norm(line)
    if line_l == 0:
        # Degenerate segment: avoid dividing by zero.
        return [start]

    direction = line / line_l
    n_steps = int(np.floor(line_l / step_l))
    # Multiply instead of accumulating to avoid summing up rounding errors.
    steps = [start + k * step_l * direction for k in range(n_steps + 1)]

    if np.allclose(steps[-1], end):
        # Last sample already is the end point; make it exact.
        steps[-1] = end
    else:
        steps.append(end)
    return steps

matplotlib.rcParams['animation.embed_limit'] = 64
def animateSolution(planner, environment, solution, dof):
    """Display an animation of the robot moving along a solution path.

    Args:
        planner: Solver whose ``graph`` stores a ``'pos'`` attribute for
            every node listed in ``solution``.
        environment: Object providing ``drawObstacles(ax, inWorkspace=True)``
            and a ``kin_chain`` with a ``move(pos)`` method.
        solution: Sequence of graph node identifiers in path order.
        dof: Not used by this function; kept so existing calls keep working.

    Returns:
        None. When ``solution`` is empty a short message is printed and
        nothing is animated.
    """
    if solution is None or len(solution) == 0:
        # A planner may fail to find a path; there is nothing to show then.
        print("No solution to animate.")
        return

    # Imported explicitly so the function does not rely on `display` being
    # provided by the interactive environment.
    from IPython.display import display

    fig_local = plt.figure(figsize=(7, 7))
    ax = fig_local.add_subplot(1, 1, 1)

    # Configurations of the nodes on the solution path.
    solution_pos = [planner.graph.nodes[node]['pos'] for node in solution]

    # Interpolate between consecutive configurations for a smoother motion.
    # The first sample of every segment is dropped via [1:].
    i_solution_pos = [solution_pos[0]]
    for segment_s, segment_e in zip(solution_pos, solution_pos[1:]):
        i_solution_pos.extend(interpolate_line(segment_s, segment_e, 0.1)[1:])

    def animate(t):
        # Clear the task space figure and restore the fixed view.
        ax.cla()
        ax.set_xlim([-3, 3])
        ax.set_ylim([-3, 3])
        # Redraw the obstacles, then the robot at frame t.
        environment.drawObstacles(ax, inWorkspace=True)
        environment.kin_chain.move(i_solution_pos[t])
        planarRobotVisualize(environment.kin_chain, ax)

    ani = matplotlib.animation.FuncAnimation(fig_local, animate, frames=len(i_solution_pos))
    display(HTML(ani.to_jshtml()))
    # Close exactly this figure so no static copy is shown and no other
    # figure is affected.
    plt.close(fig_local)

def planarRobotVisualize(kin_chain, ax):
    """Draw the kinematic chain on ``ax`` as green line segments.

    ``kin_chain.get_transforms()`` is expected to return an indexable
    sequence of points; every pair of consecutive points is connected using
    their first two components as x and y.
    """
    joints = kin_chain.get_transforms()
    for idx in range(len(joints) - 1):
        link_start = joints[idx]
        link_end = joints[idx + 1]
        ax.plot(
            [link_start[0], link_end[0]],
            [link_start[1], link_end[1]],
            color='g',
        )

## Evaluation: Discretization

In this section we will evaluate the influence of the discretization on A*. 
For computational cost reasons, we evaluate the higher DoF only with lower numbers of discretization steps.

### 3DoF

In [None]:
import copy
import matplotlib.pyplot as plt
import numpy as np

# Discretization steps per joint evaluated for the 3-DoF arm.
disc_values = [4, 6, 10, 20, 50]

# One config per discretization value; all other settings stay fixed.
configs = []
for disc in disc_values:
    disc_config = {"dof": 3}
    disc_config.update(
        lowLimits=[-2 * np.pi] * disc_config["dof"],
        highLimits=[2 * np.pi] * disc_config["dof"],
        discretization=[disc] * disc_config["dof"],
        w=.5,
        heuristic="euclidean",
        reopen=True,
        check_connection=True,
        lazy_check_connection=True,
        benchmarks=[0, 1, 2],
    )
    configs.append(disc_config)

results = get_evaluation_results(configs=configs)

plot_results_line(results=results, x_axis_values=disc_values, x_axis_title="Discretization Steps")

### 6DoF

In [None]:
import copy
import matplotlib.pyplot as plt
import numpy as np

# Discretization steps per joint evaluated for the 6-DoF arm.
disc_values = [4, 6, 10, 20]

# One config per discretization value; all other settings stay fixed.
configs = []
for disc in disc_values:
    disc_config = {"dof": 6}
    disc_config.update(
        lowLimits=[-2 * np.pi] * disc_config["dof"],
        highLimits=[2 * np.pi] * disc_config["dof"],
        discretization=[disc] * disc_config["dof"],
        w=.5,
        heuristic="euclidean",
        reopen=True,
        check_connection=True,
        lazy_check_connection=True,
        benchmarks=[0, 1, 2],
    )
    configs.append(disc_config)

results = get_evaluation_results(configs=configs)

plot_results_line(results=results, x_axis_values=disc_values, x_axis_title="Discretization Steps")

### 9DoF

In [None]:
import copy
import matplotlib.pyplot as plt
import numpy as np

# Discretization steps per joint evaluated for the 9-DoF arm.
disc_values = [4, 6]

# One config per discretization value; all other settings stay fixed.
configs = []
for disc in disc_values:
    disc_config = {"dof": 9}
    disc_config.update(
        lowLimits=[-2 * np.pi] * disc_config["dof"],
        highLimits=[2 * np.pi] * disc_config["dof"],
        discretization=[disc] * disc_config["dof"],
        w=.5,
        heuristic="euclidean",
        reopen=True,
        check_connection=True,
        lazy_check_connection=True,
        benchmarks=[0, 1, 2],
    )
    configs.append(disc_config)

results = get_evaluation_results(configs=configs)

plot_results_line(results=results, x_axis_values=disc_values, x_axis_title="Discretization Steps")

The execution time generally increases with the number of discretization steps. For higher numbers of DoF, this increase is so rapid that we had to choose a different maximum number of discretization steps for each robot arm. This was expected, because the upper bound for the roadmap size grows exponentially with the DoF.

There is no clear trend we can see in the path length. But we would expect that for higher numbers of discretization, the path length would converge to one minimal path length for each combination of DoF and benchmark environment. However, our computational resources do not allow us to show this experimentally.

## Evaluation: Weight w

In this section we will evaluate the influence of the weight w. For this experiment we use 3 DoF and 50 discretization steps.

In [None]:

# Weights w evaluated for the 3-DoF arm with 50 discretization steps.
w_values = [0.5, 0.75, 1]

# One config per weight; all other settings stay fixed.
configs = []
for w in w_values:
    w_config = {"dof": 3}
    w_config.update(
        lowLimits=[-2 * np.pi] * w_config["dof"],
        highLimits=[2 * np.pi] * w_config["dof"],
        discretization=[50] * w_config["dof"],
        w=w,
        heuristic="euclidean",
        reopen=True,
        check_connection=True,
        lazy_check_connection=True,
        benchmarks=[0, 1, 2],
    )
    configs.append(w_config)

results = get_evaluation_results(configs=configs)

plot_results_line(results=results, x_axis_values=w_values, x_axis_title="Weight w")

Increasing the w value leads to significant improvements in the execution time in this experiment. Of course, a w value that is higher than 0.5 also means that the solution is not optimal anymore, which manifests in an increase of the path length. How much the path length changes seems to be highly dependent on the environment. As a general trend, the path length of more complex scenes seems to suffer more severely when w is increased.

## Evaluation: Comparison to Lazy PRM

In this section we compare A* to Lazy PRM.

For Lazy PRM we use the parameter values:     
initialRoadmapSize = 500     
updateRoadmapSize  = 50    
kNearest = 20       
maxIterations = 100     

### 3DoF

In this experiment we use 50 discretization steps for A*.

In [None]:
# Legend entries, in the same order as the entries of results_3DoF.
algorithms = ["A*", "Lazy PRM"]

# Single configuration used for both algorithms (3 DoF, 50 steps per joint).
astar_config = {"dof": 3}
astar_config.update(
    lowLimits=[-2 * np.pi] * astar_config["dof"],
    highLimits=[2 * np.pi] * astar_config["dof"],
    discretization=[50] * astar_config["dof"],
    w=0.5,
    heuristic="euclidean",
    reopen=True,
    check_connection=True,
    lazy_check_connection=True,
    benchmarks=[0, 1, 2],
)
configs = [astar_config]

# Index 0 holds the A* results, index 1 the Lazy PRM results.
results_3DoF = get_evaluation_results(configs=configs, algorithm="astar")
results_3DoF += get_evaluation_results(configs=configs, algorithm="prm")

plot_results_bar(results=results_3DoF, x_axis_values=algorithms, x_axis_title="")

### 6DoF

In this experiment we use 20 discretization steps for A*.

In [None]:
# Legend entries, in the same order as the entries of results_6DoF.
algorithms = ["A*", "Lazy PRM"]

# Single configuration used for both algorithms (6 DoF, 20 steps per joint).
astar_config = {"dof": 6}
astar_config.update(
    lowLimits=[-2 * np.pi] * astar_config["dof"],
    highLimits=[2 * np.pi] * astar_config["dof"],
    discretization=[20] * astar_config["dof"],
    w=0.5,
    heuristic="euclidean",
    reopen=True,
    check_connection=True,
    lazy_check_connection=True,
    benchmarks=[0, 1, 2],
)
configs = [astar_config]

# Index 0 holds the A* results, index 1 the Lazy PRM results.
results_6DoF = get_evaluation_results(configs=configs, algorithm="astar")
results_6DoF += get_evaluation_results(configs=configs, algorithm="prm")

plot_results_bar(results=results_6DoF, x_axis_values=algorithms, x_axis_title="")

### 9DoF

In this experiment we use 6 discretization steps for A*.

In [None]:
# Legend entries, in the same order as the entries of results_9DoF.
algorithms = ["A*", "Lazy PRM"]

# Single configuration used for both algorithms (9 DoF, 6 steps per joint).
astar_config = {"dof": 9}
astar_config.update(
    lowLimits=[-2 * np.pi] * astar_config["dof"],
    highLimits=[2 * np.pi] * astar_config["dof"],
    discretization=[6] * astar_config["dof"],
    w=0.5,
    heuristic="euclidean",
    reopen=True,
    check_connection=True,
    lazy_check_connection=True,
    benchmarks=[0, 1, 2],
)
configs = [astar_config]

# Index 0 holds the A* results, index 1 the Lazy PRM results.
results_9DoF = get_evaluation_results(configs=configs, algorithm="astar")
results_9DoF += get_evaluation_results(configs=configs, algorithm="prm")

plot_results_bar(results=results_9DoF, x_axis_values=algorithms, x_axis_title="")

Lazy PRM is almost always significantly faster than A*. But while A* will always find the optimal solution for the given discretization, Lazy PRM often does not find a solution for high DoF and complex, narrow environments. The paths found by Lazy PRM are almost always significantly longer than the paths found by A*. This is to be expected, because A* finds optimal paths, while Lazy PRM just searches for any path. The few cases where the path found by Lazy PRM is shorter than the path found by A* can be attributed to the fact that A* only changes one DoF per step in this implementation, whereas Lazy PRM can change multiple DoF simultaneously.

#### Example Result A*:

In [None]:
# Entry 0 of results_3DoF was produced with algorithm="astar".
result = results_3DoF[0][1]["complex"]
solver, solution = result["solver"], result["solution"]
# The solver's collision checker is passed on as the environment to draw.
environment = solver._collisionChecker
animateSolution(
    planner=solver,
    environment=environment,
    solution=solution,
    dof=3,
)

####  Example Result PRM:

In [None]:
# Entry 1 of results_3DoF was produced with algorithm="prm".
result = results_3DoF[1][1]["complex"]
solver, solution = result["solver"], result["solution"]
# The solver's collision checker is passed on as the environment to draw.
environment = solver._collisionChecker
print(solution)
animateSolution(
    planner=solver,
    environment=environment,
    solution=solution,
    dof=3,
)

Lazy PRM results in smoother motions than A*, because more than one joint can be moved simultaneously. However, the solution path found by A* is shorter in most cases. You can see that the arm in the A* solution only narrowly avoids the obstacles. This is a symptom of A* only changing the DoF as much as necessary.