In [1]:
# Verify required packages are available
import sys
print(f"Python version: {sys.version}")

try:
    import numpy as np
    print(f"✓ numpy {np.__version__} is available")
except ImportError as e:
    print(f"✗ numpy not available: {e}")

try:
    import matplotlib.pyplot as plt
    print("✓ matplotlib is available")
except ImportError as e:
    print(f"✗ matplotlib not available: {e}")

print("All required packages are ready!")

Python version: 3.12.12 (main, Oct  9 2025, 11:07:00) [Clang 17.0.0 (clang-1700.6.3.2)]
✓ numpy 2.4.1 is available
✓ matplotlib is available
All required packages are ready!


In [3]:
"""Uniform Cost Search (UCS) pathfinding for the warehouse grid."""
from __future__ import annotations
from dataclasses import dataclass
from heapq import heappop, heappush
from typing import Dict, Iterable, List, Optional, Tuple
Position = Tuple[int, int]
@dataclass(frozen=True)
class SearchResult:
    path: List[Position]
    cost: int
    nodes_expanded: int
    frontier_max: int
    time_sec: float
    expanded_order: List[Position]
def _neighbors(grid: List[str], pos: Position) -> Iterable[Position]:
    r, c = pos
    for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
        nr, nc = r + dr, c + dc
        if nr < 0 or nc < 0 or nr >= len(grid) or nc >= len(grid[0]):
            continue
        if grid[nr][nc] != "#":
            yield (nr, nc)
def _reconstruct(came_from: Dict[Position, Optional[Position]], goal: Position) -> List[Position]:
    path = [goal]
    current = goal
    while came_from[current] is not None:
        current = came_from[current]
        path.append(current)
    path.reverse()
    return path
def ucs_path(
    grid: List[str],
    start: Position,
    goal: Position,
    record_expansions: bool = False,
) -> SearchResult:
    """Compute the lowest-cost path on a grid using UCS (Dijkstra).
    Cost per move is 1. Walls are '#'. Returns path + search statistics.
    """
    import time
    start_time = time.perf_counter()
    # frontier is a min-heap of (path_cost, tie_breaker, position)
    frontier: List[Tuple[int, int, Position]] = []
    heappush(frontier, (0, 0, start))
    # came_from lets us reconstruct the path at the end
    came_from: Dict[Position, Optional[Position]] = {start: None}
    # cost_so_far stores the best known cost to each position
    cost_so_far: Dict[Position, int] = {start: 0}
    nodes_expanded = 0
    frontier_max = 1
    tie = 0
    expanded_order: List[Position] = []
    while frontier:
        # Always expand the cheapest path so far
        cost, _, current = heappop(frontier)
        nodes_expanded += 1
        if record_expansions:
            expanded_order.append(current)
        # Goal test: stop once we pop the goal from the frontier
        if current == goal:
            path = _reconstruct(came_from, goal)
            return SearchResult(
                path=path,
                cost=cost,
                nodes_expanded=nodes_expanded,
                frontier_max=frontier_max,
                time_sec=time.perf_counter() - start_time,
                expanded_order=expanded_order,
            )
        # Explore neighbors (up, down, left, right) and relax edges
        for nxt in _neighbors(grid, current):
            new_cost = cost_so_far[current] + 1  # each move costs 1
            # If this is a better path to nxt, record it and push to frontier
            if nxt not in cost_so_far or new_cost < cost_so_far[nxt]:
                cost_so_far[nxt] = new_cost
                came_from[nxt] = current
                tie += 1  # ensures heap order is stable when costs tie
                heappush(frontier, (new_cost, tie, nxt))
                if len(frontier) > frontier_max:
                    frontier_max = len(frontier)
    return SearchResult(
        path=[],
        cost=float("inf"),
        nodes_expanded=nodes_expanded,
        frontier_max=frontier_max,
        time_sec=time.perf_counter() - start_time,
        expanded_order=expanded_order,
    )
if __name__ == "__main__":
    # Simple smoke test on the default warehouse grid.
    from warehouse_env import WarehouseEnv
    env = WarehouseEnv()
    obs = env.reset(randomize=False)
    start = obs["robot_pos"]
    pickup = obs["pickup_pos"]
    if pickup is None:
        raise RuntimeError("No pickup tile found in grid.")
    result = ucs_path(env.grid, start, pickup)
    print("UCS path length:", len(result.path) - 1)
    print("Nodes expanded:", result.nodes_expanded)

UCS path length: 2
Nodes expanded: 6


In [4]:
"""A* pathfinding for the warehouse grid."""
from __future__ import annotations
from dataclasses import dataclass
from heapq import heappop, heappush
from typing import Dict, Iterable, List, Optional, Tuple
Position = Tuple[int, int]
@dataclass(frozen=True)
class SearchResult:
    path: List[Position]
    cost: int
    nodes_expanded: int
    frontier_max: int
    time_sec: float
    expanded_order: List[Position]
def manhattan(a: Position, b: Position) -> int:
    return abs(a[0] - b[0]) + abs(a[1] - b[1])
def _neighbors(grid: List[str], pos: Position) -> Iterable[Position]:
    r, c = pos
    for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
        nr, nc = r + dr, c + dc
        if nr < 0 or nc < 0 or nr >= len(grid) or nc >= len(grid[0]):
            continue
        if grid[nr][nc] != "#":
            yield (nr, nc)
def _reconstruct(came_from: Dict[Position, Optional[Position]], goal: Position) -> List[Position]:
    path = [goal]
    current = goal
    while came_from[current] is not None:
        current = came_from[current]
        path.append(current)
    path.reverse()
    return path
def astar_path(
    grid: List[str],
    start: Position,
    goal: Position,
    record_expansions: bool = False,
) -> SearchResult:
    """Compute a path on a grid using A* with Manhattan heuristic."""
    import time
    start_time = time.perf_counter()
    # frontier is a min-heap of (priority=f, path_cost=g, tie_breaker, position)
    frontier: List[Tuple[int, int, int, Position]] = []
    heappush(frontier, (manhattan(start, goal), 0, 0, start))
    # came_from lets us reconstruct the path at the end
    came_from: Dict[Position, Optional[Position]] = {start: None}
    # cost_so_far stores the best known g(n) to each position
    cost_so_far: Dict[Position, int] = {start: 0}
    nodes_expanded = 0
    frontier_max = 1
    tie = 0
    expanded_order: List[Position] = []
    while frontier:
        # Always expand the lowest f(n) = g(n) + h(n)
        _, cost, _, current = heappop(frontier)
        nodes_expanded += 1
        if record_expansions:
            expanded_order.append(current)
        # Goal test: stop once we pop the goal from the frontier
        if current == goal:
            path = _reconstruct(came_from, goal)
            return SearchResult(
                path=path,
                cost=cost,
                nodes_expanded=nodes_expanded,
                frontier_max=frontier_max,
                time_sec=time.perf_counter() - start_time,
                expanded_order=expanded_order,
            )
        # Explore neighbors (up, down, left, right) and relax edges
        for nxt in _neighbors(grid, current):
            new_cost = cost_so_far[current] + 1  # g(n) + cost of one step
            # If this is a better path to nxt, record it and push to frontier
            if nxt not in cost_so_far or new_cost < cost_so_far[nxt]:
                cost_so_far[nxt] = new_cost
                came_from[nxt] = current
                tie += 1  # ensures heap order is stable when priorities tie
                priority = new_cost + manhattan(nxt, goal)  # f(n) = g(n) + h(n)
                heappush(frontier, (priority, new_cost, tie, nxt))
                if len(frontier) > frontier_max:
                    frontier_max = len(frontier)
    return SearchResult(
        path=[],
        cost=float("inf"),
        nodes_expanded=nodes_expanded,
        frontier_max=frontier_max,
        time_sec=time.perf_counter() - start_time,
        expanded_order=expanded_order,
    )
if __name__ == "__main__":
    from warehouse_env import WarehouseEnv
    env = WarehouseEnv()
    obs = env.reset(randomize=False)
    start = obs["robot_pos"]
    pickup = obs["pickup_pos"]
    if pickup is None:
        raise RuntimeError("No pickup tile found in grid.")
    result = astar_path(env.grid, start, pickup)
    print("A* path length:", len(result.path) - 1)
    print("Nodes expanded:", result.nodes_expanded)

A* path length: 2
Nodes expanded: 3


In [None]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, List, Tuple

import matplotlib.pyplot as plt
import numpy as np

# Import warehouse environment and visualization
from warehouse_env import WarehouseEnv
from warehouse_viz import replay_animation

@dataclass
class TrialStats:
    path_len: int
    nodes_expanded: int
    frontier_max: int
    time_sec: float

# Use the SearchResult classes from the previous cells
# Note: Both UCS and A* have the same SearchResult structure, so we can use either
SearchResult = type(ucs_path([], (0, 0), (0, 0)))

ModuleNotFoundError: No module named 'ucs_pathfinder'