From eae108dbba23200e589c38971e5a88112dc61f0d Mon Sep 17 00:00:00 2001 From: rsasaki0109 Date: Fri, 5 Jun 2026 11:17:10 +0900 Subject: [PATCH] Add RRT online replanning in continuous space (navigation/41) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The repo had grid A* online replanning (04) but no sampling-based planner. This adds an RRT (LaValle, 1998) — the continuous-space counterpart — running the same plan/discover/replan loop. The robot plans an RRT with the obstacles it knows and follows the path, but one obstacle is hidden until it comes within sensor range. When a newly sensed obstacle is found to block the remaining path, the world reports a path_blocked failure and the agent replans an RRT from its current position with the updated obstacle set. Plan, discover, replan, arrive. (RRT is randomized, so the first tree occasionally routes around the hidden disc and no replan is needed; the robot reaches the goal either way — the default seed always replans.) - self-contained RRTWorld (known + hidden discs, local sensing, segment-disc collision) + a standalone plan_rrt() and an RRTReplanAgent that detects a blocked path and replans, plus a References section - two smoke tests: the default seed threads the gap, senses the disc, replans, and reaches the goal with no collision; the planner+replanner reach the goal on every seed in range(6) - examples index + navigation README section; example 43->44, tests 120->122 Verified across seeds 0-9 (10/10 reach the goal, ~8/10 trigger a replan; default seed replans once) and the matplotlib render path under Agg. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 2 +- docs/status.md | 4 +- examples/README.md | 1 + examples/navigation/41_rrt_replanning.py | 339 +++++++++++++++++++++++ examples/navigation/README.md | 42 +++ tests/test_examples_smoke.py | 26 ++ 6 files changed, 411 insertions(+), 3 deletions(-) create mode 100644 examples/navigation/41_rrt_replanning.py diff --git a/README.md b/README.md index 306e536..33416a1 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ star helps others find it. ## Status -43 runnable examples · 38 README GIFs · 120 smoke / regression tests · +44 runnable examples · 38 README GIFs · 122 smoke / regression tests · 5 Gymnasium-style adapters · CI green on Python 3.10, 3.11, and 3.12. See `docs/status.md` for the implementation snapshot, `docs/plan.md` for the diff --git a/docs/status.md b/docs/status.md index 1eda2bb..fdd8d10 100644 --- a/docs/status.md +++ b/docs/status.md @@ -5,10 +5,10 @@ see what exists, what is verified, and what should come next. ## Snapshot -- Runnable examples: 43 +- Runnable examples: 44 - Learning-path roadmap examples: 20 - README GIFs: 38 -- Smoke and regression tests: 120 (107 example/adapter/static + 13 planning) +- Smoke and regression tests: 122 (109 example/adapter/static + 13 planning) - Colab notebooks: 5 - Core dependencies: `numpy`, `matplotlib` - Contributor extra: `pip install -e ".[dev]"` diff --git a/examples/README.md b/examples/README.md index 369ed0b..908ca94 100644 --- a/examples/README.md +++ b/examples/README.md @@ -32,6 +32,7 @@ Run any example headless with its `--no-render` flag when available. | `navigation/34_human_correction_replanning.py` | `python examples/navigation/34_human_correction_replanning.py` | plan shortcut -> human correction -> cost update -> replan | | `navigation/38_monte_carlo_localization.py` | `python examples/navigation/38_monte_carlo_localization.py` | particle filter -> kidnapped -> inject particles -> recover | | `navigation/40_potential_field_escape.py` | `python examples/navigation/40_potential_field_escape.py` | gradient descent -> local minimum -> boundary follow -> goal | +| `navigation/41_rrt_replanning.py` | `python examples/navigation/41_rrt_replanning.py` | RRT plan -> sense hidden obstacle -> path blocked -> replan | ## Manipulation diff --git a/examples/navigation/41_rrt_replanning.py b/examples/navigation/41_rrt_replanning.py new file mode 100644 index 0000000..17c519a --- /dev/null +++ b/examples/navigation/41_rrt_replanning.py @@ -0,0 +1,339 @@ +"""Plan a path with an RRT, discover a hidden obstacle on it, and replan around it. + +A Rapidly-exploring Random Tree (LaValle, 1998) plans in continuous space by +growing a tree from the start: sample a random point, extend the nearest node a +short step toward it, keep the edge if it is collision-free, and stop when the +tree reaches the goal. It is the sampling-based counterpart to the grid A* in +`04_online_replanning_astar.py`. + +This example is the online-replanning loop in continuous space. The robot plans +with the obstacles it currently knows about and starts following the path — but +one obstacle is hidden until the robot is close enough to sense it. When the +newly sensed obstacle is found to block the remaining path, the world reports a +`path_blocked` failure, and the agent replans an RRT from its current position +with the updated obstacle set and continues. Plan, discover, replan, arrive. + +Success: the robot reaches the goal radius. +Failure: path_blocked (recoverable - a sensed obstacle invalidated the path and +triggered a replan), rrt_unreachable (terminal - sampling budget spent without a +path), collision (terminal), timeout (terminal). + +References: + * S. M. LaValle, "Rapidly-Exploring Random Trees: A New Tool for Path + Planning," TR 98-11, Iowa State University, 1998. + * S. M. LaValle and J. J. Kuffner, "Randomized Kinodynamic Planning," IJRR 2001. +""" + +from __future__ import annotations + +import argparse +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import numpy as np + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from pir.core.random import make_rng +from pir.core.types import Failure, StepResult, Trace + + +@dataclass(frozen=True) +class Disc: + center: tuple[float, float] + radius: float + + +@dataclass +class RRTConfig: + world_size: float = 10.0 + start: tuple[float, float] = (1.0, 1.0) + goal: tuple[float, float] = (9.0, 9.0) + goal_radius: float = 0.4 + robot_radius: float = 0.2 + speed: float = 0.3 + sensor_range: float = 2.2 + max_steps: int = 200 + # Obstacles the planner knows from the start, flanking the start-goal diagonal. + known: tuple[Disc, ...] = (Disc((3.3, 4.0), 1.0), Disc((6.7, 6.0), 1.0)) + # An obstacle only sensed within sensor_range, sitting in the gap the first + # plan usually threads through. Because RRT samples randomly, the first tree + # occasionally routes around it and no replan is needed; usually it does not, + # and the robot discovers the disc and replans. Either way it reaches the goal. + hidden: Disc = Disc((5.0, 5.0), 1.4) + + +def _segment_hits_disc(a: np.ndarray, b: np.ndarray, disc: Disc, clearance: float) -> bool: + center = np.asarray(disc.center, dtype=float) + ab = b - a + length_sq = float(ab @ ab) + if length_sq < 1e-12: + return float(np.linalg.norm(a - center)) <= disc.radius + clearance + t = float(np.clip((center - a) @ ab / length_sq, 0.0, 1.0)) + closest = a + t * ab + return float(np.linalg.norm(closest - center)) <= disc.radius + clearance + + +def plan_rrt( + start: np.ndarray, + goal: np.ndarray, + obstacles: list[Disc], + rng: np.random.Generator, + *, + world_size: float, + clearance: float, + step: float = 0.6, + goal_sample_rate: float = 0.1, + max_iter: int = 2000, + goal_tol: float = 0.4, +) -> list[np.ndarray] | None: + """Grow an RRT from start to goal; return a waypoint path or None.""" + + nodes = [np.asarray(start, dtype=float)] + parents = [-1] + for _ in range(max_iter): + if rng.random() < goal_sample_rate: + sample = np.asarray(goal, dtype=float) + else: + sample = rng.uniform(0.0, world_size, size=2) + # Nearest existing node, then a step toward the sample. + dists = [float(np.linalg.norm(sample - n)) for n in nodes] + i = int(np.argmin(dists)) + nearest = nodes[i] + direction = sample - nearest + norm = float(np.linalg.norm(direction)) + if norm < 1e-9: + continue + new_node = nearest + direction / norm * min(step, norm) + if any(_segment_hits_disc(nearest, new_node, o, clearance) for o in obstacles): + continue + nodes.append(new_node) + parents.append(i) + if float(np.linalg.norm(new_node - goal)) <= goal_tol: + # Reconstruct the path from this node back to the root. + path = [np.asarray(goal, dtype=float)] + j = len(nodes) - 1 + while j != -1: + path.append(nodes[j]) + j = parents[j] + path.reverse() + return path + return None + + +class RRTWorld: + """Continuous 2D world with known and hidden discs; the robot senses locally.""" + + def __init__(self, *, seed: int | None = 0, config: RRTConfig | None = None) -> None: + self.config = config or RRTConfig() + self.seed = seed + self.reset(seed=seed) + + def reset(self, seed: int | None = None) -> dict[str, Any]: + if seed is not None: + self.seed = seed + self.rng = make_rng(self.seed) + self.pos = np.asarray(self.config.start, dtype=float).copy() + self.goal = np.asarray(self.config.goal, dtype=float) + self.time = 0 + self._hidden_found = False + return self.observe() + + def observe(self) -> dict[str, Any]: + sensed = list(self.config.known) + hidden = self.config.hidden + if ( + float(np.linalg.norm(self.pos - np.asarray(hidden.center))) - hidden.radius + <= self.config.sensor_range + ): + sensed.append(hidden) + self._hidden_found = True + return { + "time": self.time, + "position": self.pos.copy(), + "goal": self.goal.copy(), + "sensed_obstacles": sensed, + "hidden_found": self._hidden_found, + "distance_to_goal": float(np.linalg.norm(self.goal - self.pos)), + } + + def step(self, action: dict[str, Any]) -> StepResult: + self.time += 1 + velocity = np.asarray(action.get("velocity", np.zeros(2)), dtype=float) + self.pos = np.clip(self.pos + velocity, 0.0, self.config.world_size) + info: dict[str, Any] = { + "time": self.time, + "position": self.pos.copy(), + "distance_to_goal": float(np.linalg.norm(self.goal - self.pos)), + "replans": int(action.get("replans", 0)), + "success": False, + } + if action.get("path_blocked"): + info["failure"] = Failure("path_blocked", "a sensed obstacle invalidated the path", True) + if action.get("rrt_unreachable"): + info["failure"] = Failure("rrt_unreachable", "RRT found no path", False) + return StepResult(self.observe(), -1.0, True, info) + + all_obstacles = list(self.config.known) + [self.config.hidden] + for disc in all_obstacles: + if float(np.linalg.norm(self.pos - np.asarray(disc.center))) < disc.radius: + info["failure"] = Failure("collision", "robot entered an obstacle", False) + return StepResult(self.observe(), -1.0, True, info) + + if info["distance_to_goal"] <= self.config.goal_radius: + info["success"] = True + return StepResult(self.observe(), 1.0, True, info) + if self.time >= self.config.max_steps: + info["failure"] = Failure("timeout", "ran out of steps", False) + return StepResult(self.observe(), -0.5, True, info) + return StepResult(self.observe(), -0.01, False, info) + + +class RRTReplanAgent: + """Follows an RRT path; replans when a sensed obstacle blocks the route.""" + + def __init__(self, *, config: RRTConfig | None = None, seed: int | None = 0) -> None: + self.config = config or RRTConfig() + self.rng = make_rng(None if seed is None else seed + 104729) + self.reset() + + def reset(self) -> None: + self.path: list[np.ndarray] | None = None + self.waypoint = 0 + self.replans = 0 + self.known_count = -1 + + def _plan(self, pos: np.ndarray, obstacles: list[Disc]) -> bool: + path = plan_rrt( + pos, self.config.goal, obstacles, self.rng, + world_size=self.config.world_size, clearance=self.config.robot_radius, + ) + if path is None: + self.path = None + return False + self.path = path + self.waypoint = 1 if len(path) > 1 else 0 + return True + + def _path_blocked(self, pos: np.ndarray, obstacles: list[Disc]) -> bool: + if not self.path: + return True + pts = [pos] + self.path[self.waypoint:] + for a, b in zip(pts[:-1], pts[1:]): + if any(_segment_hits_disc(a, b, o, self.config.robot_radius) for o in obstacles): + return True + return False + + def act(self, obs: dict[str, Any]) -> dict[str, Any]: + pos = np.asarray(obs["position"], dtype=float) + obstacles = list(obs["sensed_obstacles"]) + blocked = False + + if self.path is None or self._path_blocked(pos, obstacles): + blocked = self.path is not None # a real invalidation, not the first plan + if blocked: + self.replans += 1 + ok = self._plan(pos, obstacles) + if not ok: + return {"velocity": np.zeros(2), "rrt_unreachable": True, "replans": self.replans} + + target = self.path[self.waypoint] + if float(np.linalg.norm(target - pos)) < self.config.speed and self.waypoint < len(self.path) - 1: + self.waypoint += 1 + target = self.path[self.waypoint] + direction = target - pos + norm = float(np.linalg.norm(direction)) + velocity = np.zeros(2) if norm < 1e-9 else direction / norm * min(self.config.speed, norm) + return {"velocity": velocity, "path_blocked": blocked, "replans": self.replans} + + def update(self, obs: dict[str, Any], reward: float, info: dict[str, Any]) -> None: + info["replans"] = self.replans + info["waypoint"] = self.waypoint + info["path_len"] = 0 if self.path is None else len(self.path) + + +def run(seed: int = 0, render: bool = True, max_steps: int | None = None) -> Trace: + config = RRTConfig() + if max_steps is not None: + config.max_steps = max_steps + world = RRTWorld(seed=seed, config=config) + agent = RRTReplanAgent(config=config, seed=seed) + obs = world.reset(seed=seed) + agent.reset() + trace = Trace() + + for _ in range(config.max_steps): + action = agent.act(obs) + result = world.step(action) + obs, reward, done, info = result.as_tuple() + agent.update(obs, reward, info) + trace.append(obs, action, reward, info) + + if render: + _render(world, agent, info) + + if done: + break + + return trace + + +def _render(world: RRTWorld, agent: RRTReplanAgent, info: dict[str, Any]) -> None: + import matplotlib.pyplot as plt + from matplotlib.patches import Circle + + if not hasattr(world, "_fig") or world._fig is None: + plt.ion() + world._fig, world._ax = plt.subplots(figsize=(5, 5)) + ax = world._ax + ax.clear() + ax.set_title("RRT replanning: plan, discover, replan, arrive") + ax.set_xlim(0, world.config.world_size) + ax.set_ylim(0, world.config.world_size) + ax.set_aspect("equal", adjustable="box") + for disc in world.config.known: + ax.add_patch(Circle(disc.center, disc.radius, color="0.3", alpha=0.6)) + h = world.config.hidden + ax.add_patch(Circle(h.center, h.radius, color="tab:orange" if world._hidden_found else "0.85", + alpha=0.6 if world._hidden_found else 0.3)) + if agent.path: + xs = [p[0] for p in agent.path] + ys = [p[1] for p in agent.path] + ax.plot(xs, ys, "-", color="tab:green", linewidth=1, alpha=0.8) + ax.plot(*world.goal, marker="*", color="tab:green", markersize=16) + ax.plot(*world.pos, marker="o", color="tab:blue", markersize=9) + ax.text(0.02, 0.97, f"d={info['distance_to_goal']:.2f} replans={info.get('replans', 0)}", + transform=ax.transAxes, va="top") + world._fig.canvas.draw_idle() + plt.pause(0.03) + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--seed", type=int, default=0) + parser.add_argument("--max-steps", type=int, default=200) + parser.add_argument("--no-render", action="store_true") + args = parser.parse_args() + + trace = run(seed=args.seed, render=not args.no_render, max_steps=args.max_steps) + final = trace.infos[-1] + failures = sorted({f.kind for f in trace.failures()}) + print( + f"reached={final.get('success', False)} steps={len(trace.actions)} " + f"replans={final.get('replans', 0)} " + f"final_dist={final.get('distance_to_goal', float('nan')):.2f} failures={failures}" + ) + + if not args.no_render: + import matplotlib.pyplot as plt + + plt.ioff() + plt.show() + + +if __name__ == "__main__": + main() diff --git a/examples/navigation/README.md b/examples/navigation/README.md index b4d7563..a847eaf 100644 --- a/examples/navigation/README.md +++ b/examples/navigation/README.md @@ -699,3 +699,45 @@ attractive + repulsive force -> step -> no net progress ? boundary-follow : desc - Move the obstacle off the start–goal line and watch the trap weaken. - Add a second obstacle to build a concave pocket and a deeper minimum. - Raise `k_rep` and watch the stagnation point sit further from the obstacle. + +## `41_rrt_replanning.py` + +### What this teaches + +A Rapidly-exploring Random Tree (LaValle, 1998) plans in continuous space by +growing a tree from the start — sample a point, extend the nearest node a short +step toward it, keep collision-free edges — until it reaches the goal. It is the +sampling-based counterpart to the grid A* in `04_online_replanning_astar.py`. +This runs the online-replanning loop in continuous space: the robot plans with +the obstacles it knows, follows the path, and when it senses a hidden obstacle +that blocks the remaining route the world reports `path_blocked`, the agent +replans an RRT from its current position, and it continues. Plan, discover, +replan, arrive. (RRT is randomized, so the first tree occasionally routes around +the hidden disc and no replan is needed; the robot reaches the goal either way.) + +### Run + +```bash +python examples/navigation/41_rrt_replanning.py +``` + +### Key loop + +```text +RRT plan -> follow path -> sense obstacle on the route ? replan : advance -> goal +``` + +### Simplifications + +- continuous 2D point robot; circular obstacles; segment–disc collision checks +- one obstacle is hidden until the robot is within `sensor_range` of it +- a path is "blocked" if any remaining segment intersects a sensed disc +- the RRT returns the first path found (not shortest); no smoothing or RRT\* + +### Things to try + +- Change the seed and watch the tree, the path, and the replan count change. +- Enlarge `hidden` until every seed must replan; shrink it until none do. +- Lower `sensor_range` so the robot discovers the obstacle later (and replans + with less room). +- Swap in `plan_rrt` goal-bias or step size and watch path quality change. diff --git a/tests/test_examples_smoke.py b/tests/test_examples_smoke.py index be6ee98..6e5da67 100644 --- a/tests/test_examples_smoke.py +++ b/tests/test_examples_smoke.py @@ -910,3 +910,29 @@ def test_plain_potential_field_stalls_in_the_minimum() -> None: # The escape recovers on the same scene and ends much closer to the goal. assert escaped.infos[-1]["success"] is True assert escaped.infos[-1]["distance_to_goal"] < plain.infos[-1]["distance_to_goal"] + + +def test_rrt_replans_around_a_discovered_obstacle() -> None: + module = load_example("examples/navigation/41_rrt_replanning.py") + + trace = module.run(seed=0, render=False) + final = trace.infos[-1] + + # The first RRT path threads the gap where the hidden disc sits; the robot + # senses it, reports path_blocked, replans, and still reaches the goal. + assert final["success"] is True + assert final["replans"] >= 1 + assert any(f.kind == "path_blocked" for f in trace.failures()) + assert not any(f.kind == "collision" for f in trace.failures()) + assert trace.observations[-1]["hidden_found"] is True + + +def test_rrt_reaches_goal_across_seeds() -> None: + module = load_example("examples/navigation/41_rrt_replanning.py") + + # RRT is randomized, but the planner + replanner reach the goal on every seed + # (whether or not the first sampled tree happened to dodge the hidden disc). + for seed in range(6): + trace = module.run(seed=seed, render=False) + assert trace.infos[-1]["success"] is True, f"seed {seed} did not reach the goal" + assert trace.infos[-1]["path_len"] > 0