In [1]:
import heapq
from collections import defaultdict
import time
import math
import random

class GridWorld:
    """
    2D grid world with varying terrain costs and static obstacles.
    """
    def __init__(self, grid, start, goal):
        self.grid = [row[:] for row in grid]  # Deep copy
        self.rows = len(grid)
        self.cols = len(grid[0])
        self.start = start  # Tuple (r, c)
        self.goal = goal
        # Min cost for heuristic
        self.min_cost = min((c for row in grid for c in row if c > 0))

    def is_valid(self, r, c):
        return 0 <= r < self.rows and 0 <= c < self.cols and self.grid[r][c] > 0

    def get_cost(self, r, c):
        return self.grid[r][c]

    def neighbors(self, r, c):
        dirs = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # 4-connected
        for dr, dc in dirs:
            nr, nc = r + dr, c + dc
            if self.is_valid(nr, nc):
                yield nr, nc, self.get_cost(nr, nc)  # Position and edge cost to enter

def manhattan_heuristic(r, c, goal, min_cost):
    gr, gc = goal
    return (abs(r - gr) + abs(c - gc)) * min_cost  # Admissible

def uniform_cost_search(gw):
    """
    Uninformed: UCS for optimal path under varying costs.
    """
    start = gw.start
    goal = gw.goal
    frontier = []
    heapq.heappush(frontier, (0, start[0], start[1]))  # g, r, c
    came_from = {}
    g_score = defaultdict(lambda: float('inf'))
    g_score[start] = 0
    came_from[start] = None
    nodes_expanded = 0
    start_time = time.time()
    visited = set()
    while frontier:
        current_cost, r, c = heapq.heappop(frontier)
        current = (r, c)
        if current in visited:
            continue
        visited.add(current)
        nodes_expanded += 1
        if current == goal:
            end_time = time.time()
            # Reconstruct path
            path = []
            while current is not None:
                path.append(current)
                current = came_from[current]
            path.reverse()
            total_cost = g_score[goal]
            exec_time = end_time - start_time
            return total_cost, len(path), nodes_expanded, exec_time, path
        for nr, nc, edge_cost in gw.neighbors(r, c):
            neighbor = (nr, nc)
            tent_g = g_score[current] + edge_cost
            if tent_g < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tent_g
                heapq.heappush(frontier, (tent_g, nr, nc))
    return None

def a_star_search(gw):
    """
    Informed: A* with admissible heuristic.
    """
    start = gw.start
    goal = gw.goal
    h_func = lambda r, c: manhattan_heuristic(r, c, goal, gw.min_cost)
    frontier = []
    heapq.heappush(frontier, (h_func(start[0], start[1]), 0, start[0], start[1]))  # f, g, r, c
    came_from = {}
    came_from[start] = None
    g_score = defaultdict(lambda: float('inf'))
    g_score[start] = 0
    f_score = defaultdict(lambda: float('inf'))
    f_score[start] = h_func(start[0], start[1])
    nodes_expanded = 0
    start_time = time.time()
    visited = set()
    while frontier:
        _, g, r, c = heapq.heappop(frontier)
        current = (r, c)
        if current in visited:
            continue
        visited.add(current)
        nodes_expanded += 1
        if current == goal:
            end_time = time.time()
            path = []
            while current is not None:
                path.append(current)
                current = came_from[current]
            path.reverse()
            total_cost = g_score[goal]
            exec_time = end_time - start_time
            return total_cost, len(path), nodes_expanded, exec_time, path
        for nr, nc, edge_cost in gw.neighbors(r, c):
            neighbor = (nr, nc)
            tent_g = g_score[current] + edge_cost
            if tent_g < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tent_g
                f = tent_g + h_func(nr, nc)
                f_score[neighbor] = f
                heapq.heappush(frontier, (f, tent_g, nr, nc))
    return None

def path_cost(gw, path):
    """
    Compute total cost of a path.
    """
    if len(path) < 2:
        return 0
    total = 0
    for i in range(len(path) - 1):
        r1, c1 = path[i]
        r2, c2 = path[i + 1]
        if abs(r1 - r2) + abs(c1 - c2) != 1:  # Adjacent?
            return float('inf')
        total += gw.get_cost(r2, c2)  # Cost to enter next cell
    return total

def generate_random_path(gw, start, goal, max_length=100):
    """
    Generate initial random path for local search.
    """
    path = [start]
    current = start
    while current != goal and len(path) < max_length:
        neighbors_list = list(gw.neighbors(current[0], current[1]))
        if not neighbors_list:
            break
        nr, nc, _ = random.choice(neighbors_list)
        next_pos = (nr, nc)
        path.append(next_pos)
        current = next_pos
    if current == goal:
        return path
    else:
        if abs(current[0] - goal[0]) + abs(current[1] - goal[1]) == 1 and gw.is_valid(goal[0], goal[1]):
            path.append(goal)
            return path
    return [start]  # Fallback

def perturb_path(path):
    """
    Perturb path by random adjacent swap (hill-climb like).
    """
    if len(path) < 3:
        return path
    i = random.randint(1, len(path) - 2)
    dirs = [(-1, 0), (1, 0), (0, -1), (0, 1)]
    dr, dc = random.choice(dirs)
    r, c = path[i]
    nr, nc = r + dr, c + dc
    if 0 <= nr < 20 and 0 <= nc < 20:  # Bound check (extend for larger grids)
        new_path = path[:i] + [(nr, nc)] + path[i + 1:]
        return new_path
    return path

def is_valid_path(gw, path):
    """
    Validate path: starts/ends correctly, adjacent, no obstacles.
    """
    if path[0] != gw.start or path[-1] != gw.goal:
        return False
    for i in range(len(path) - 1):
        r1, c1 = path[i]
        r2, c2 = path[i + 1]
        if not gw.is_valid(r1, c1) or not gw.is_valid(r2, c2):
            return False
        if abs(r1 - r2) + abs(c1 - c2) != 1:
            return False
    return True

def simulated_annealing(gw, start, goal, max_iter=500, initial_temp=50, cooling_rate=0.99):
    """
    Local search: SA for approximate optimization; replans on dynamic changes.
    """
    current_path = generate_random_path(gw, start, goal)
    current_cost = path_cost(gw, current_path)
    best_path = current_path[:]
    best_cost = current_cost
    temp = initial_temp
    nodes_expanded = 0
    start_time = time.time()
    for i in range(max_iter):
        new_path = perturb_path(current_path)
        if is_valid_path(gw, new_path):
            new_cost = path_cost(gw, new_path)
            delta = new_cost - current_cost
            if delta < 0 or random.random() < math.exp(-delta / temp):
                current_path = new_path
                current_cost = new_cost
                if new_cost < best_cost:
                    best_cost = new_cost
                    best_path = new_path[:]
        temp *= cooling_rate
        nodes_expanded += 1
    end_time = time.time()
    exec_time = end_time - start_time
    path_length = len(best_path)
    return best_cost, path_length, nodes_expanded, exec_time, best_path

def create_maps():
    """
    Generate 4 test maps (small, medium, large, dynamic=small copy).
    """
    maps = {}
    # Small (5x5 with obstacles)
    small_grid = [
        [1, 1, 1, 1, 1],
        [1, 0, 0, 0, 1],
        [1, 0, 1, 0, 1],
        [1, 0, 0, 2, 1],
        [1, 1, 1, 1, 1]
    ]
    small_start = (0, 0)
    small_goal = (4, 4)
    maps['small'] = GridWorld(small_grid, small_start, small_goal)
    # Medium (10x10)
    medium_grid = [[1 for _ in range(10)] for _ in range(10)]
    for i in range(2, 8):
        medium_grid[4][i] = 0  # Wall
    medium_grid[2][2] = 0
    medium_grid[7][5] = 0
    medium_start = (0, 0)
    medium_goal = (9, 9)
    maps['medium'] = GridWorld(medium_grid, medium_start, medium_goal)
    # Large (20x20, 100 random obstacles)
    large_grid = [[1 for _ in range(20)] for _ in range(20)]
    random.seed(42)  # Reproducible
    for _ in range(100):
        r = random.randint(1, 18)
        c = random.randint(1, 18)
        large_grid[r][c] = 0
    large_start = (0, 0)
    large_goal = (19, 19)
    maps['large'] = GridWorld(large_grid, large_start, large_goal)
    # Dynamic (copy of small; simulate moving vehicle via replan)
    dynamic_grid = [row[:] for row in small_grid]
    maps['dynamic'] = GridWorld(dynamic_grid, small_start, small_goal)
    return maps

if __name__ == "__main__":
    """
    CLI: Run all planners on all maps; outputs results.
    """
    maps = create_maps()
    for name, gw in maps.items():
        print(f"\n--- {name.upper()} Map ---")
        print(f"Size: {gw.rows}x{gw.cols}")
        # UCS
        result_ucs = uniform_cost_search(gw)
        print(f"UCS: cost={result_ucs[0] if result_ucs else 'None'}, path_len={result_ucs[1]}, nodes={result_ucs[2]}, time={result_ucs[3]:.4f}")
        # A*
        result_astar = a_star_search(gw)
        print(f"A*: cost={result_astar[0] if result_astar else 'None'}, path_len={result_astar[1]}, nodes={result_astar[2]}, time={result_astar[3]:.4f}")
        # SA (local)
        result_sa = simulated_annealing(gw, gw.start, gw.goal)
        print(f"SA: cost={result_sa[0]}, path_len={result_sa[1]}, nodes={result_sa[2]}, time={result_sa[3]:.4f}")
    # Dynamic replanning POC log
    print("\n--- Dynamic Replanning Demo ---")
    gw_dynamic = maps['dynamic']
    result_initial = a_star_search(gw_dynamic)
    print(f"Initial path using A*: {result_initial[4]}")
    path = result_initial[4]
    current_pos = path[3] if len(path) > 3 else path[-1]  # After 3 steps (time=3)
    print(f"After 3 steps, agent at: {current_pos}")
    print("Obstacle appears (e.g., moving vehicle blocks (0,4)); replan with SA:")
    new_gw = GridWorld(gw_dynamic.grid, current_pos, gw_dynamic.goal)
    # Simulate block: set grid[0][4] = 0 temporarily for replan (unpredictable case)
    new_gw.grid[0][4] = 0  # Block next cell
    result_replan = simulated_annealing(new_gw, current_pos, new_gw.goal)
    print(f"Replanned path (positions): {result_replan[4]}")
    print(f"Replan cost: {result_replan[0]}, nodes: {result_replan[2]}")


--- SMALL Map ---
Size: 5x5
UCS: cost=8, path_len=9, nodes=16, time=0.0000
A*: cost=8, path_len=9, nodes=16, time=0.0000
SA: cost=0, path_len=1, nodes=500, time=0.0000

--- MEDIUM Map ---
Size: 10x10
UCS: cost=18, path_len=19, nodes=92, time=0.0000
A*: cost=18, path_len=19, nodes=92, time=0.0010
SA: cost=0, path_len=1, nodes=500, time=0.0000

--- LARGE Map ---
Size: 20x20
UCS: cost=38, path_len=39, nodes=311, time=0.0010
A*: cost=38, path_len=39, nodes=279, time=0.0020
SA: cost=0, path_len=1, nodes=500, time=0.0000

--- DYNAMIC Map ---
Size: 5x5
UCS: cost=8, path_len=9, nodes=16, time=0.0000
A*: cost=8, path_len=9, nodes=16, time=0.0000
SA: cost=16, path_len=17, nodes=500, time=0.0071

--- Dynamic Replanning Demo ---
Initial path using A*: [(0, 0), (0, 1), (0, 2), (0, 3), (0, 4), (1, 4), (2, 4), (3, 4), (4, 4)]
After 3 steps, agent at: (0, 3)
Obstacle appears (e.g., moving vehicle blocks (0,4)); replan with SA:
Replanned path (positions): [(0, 3), (0, 2), (0, 3), (0, 2), (0, 3), (0, 2