In [41]:
# ========================================
# CSA2001 Project 1 - Autonomous Delivery Agent
# With Guaranteed Proof-of-Concept Dynamic Replanning
# ========================================

# -------------------------------
# Imports
# -------------------------------
import heapq
import time
import random
import json
import os
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML, display

# -------------------------------
# Environment Class
# -------------------------------
class Environment:
    def __init__(self, layout, start_pos, goal_pos, moving_obstacles=None):
        self.layout = layout
        self.rows = len(layout)
        self.cols = len(layout[0])
        self.start = start_pos
        self.goal = goal_pos
        self.moving_obstacles = moving_obstacles or []

    def inside_grid(self, r, c):
        return 0 <= r < self.rows and 0 <= c < self.cols

    def is_free(self, r, c, t=0):
        if self.layout[r][c] == -1:
            return False
        for mob in self.moving_obstacles:
            if mob.position_at(t) == (r, c):
                return False
        return True

    def move_cost(self, r, c):
        return self.layout[r][c] if self.layout[r][c] > 0 else 1

    def adjacent_cells(self, r, c, t=0):
        directions = [(1,0),(-1,0),(0,1),(0,-1)]
        for dr, dc in directions:
            nr, nc = r + dr, c + dc
            if self.inside_grid(nr, nc) and self.is_free(nr, nc, t):
                yield (nr, nc)

# -------------------------------
# Moving Obstacle Class
# -------------------------------
class MovingObstacle:
    def __init__(self, trajectory, start_time=0):
        self.trajectory = trajectory
        self.start_time = start_time

    def position_at(self, t):
        if t < self.start_time:
            return None
        index = (t - self.start_time) % len(self.trajectory)
        return tuple(self.trajectory[index])

# -------------------------------
# Search Algorithms
# -------------------------------
def bfs_search(env: Environment):
    from collections import deque
    frontier = deque([(env.start, 0)])
    came_from = {env.start: None}
    time_at = {env.start: 0}
    expanded_nodes = 0

    while frontier:
        current, t = frontier.popleft()
        expanded_nodes += 1
        if current == env.goal:
            break
        for nbr in env.adjacent_cells(*current, t+1):
            if nbr not in came_from:
                frontier.append((nbr, t+1))
                came_from[nbr] = current
                time_at[nbr] = t+1

    if env.goal not in came_from:
        return None, float("inf"), expanded_nodes

    path, node = [], env.goal
    while node:
        path.append((node, time_at[node]))
        node = came_from[node]
    path.reverse()
    return path, len(path), expanded_nodes

def ucs_search(env: Environment):
    frontier = [(0, env.start, 0)]
    came_from = {env.start: None}
    cost_so_far = {env.start: 0}
    time_at = {env.start: 0}
    expanded_nodes = 0

    while frontier:
        cost, current, t = heapq.heappop(frontier)
        expanded_nodes += 1
        if current == env.goal:
            break
        for nbr in env.adjacent_cells(*current, t+1):
            new_cost = cost + env.move_cost(*nbr)
            if nbr not in cost_so_far or new_cost < cost_so_far[nbr]:
                cost_so_far[nbr] = new_cost
                heapq.heappush(frontier, (new_cost, nbr, t+1))
                came_from[nbr] = current
                time_at[nbr] = t+1

    if env.goal not in came_from:
        return None, float("inf"), expanded_nodes

    path, node = [], env.goal
    while node:
        path.append((node, time_at[node]))
        node = came_from[node]
    path.reverse()
    return path, cost_so_far[env.goal], expanded_nodes

def heuristic(a, b):
    return abs(a[0]-b[0]) + abs(a[1]-b[1])

def astar_search(env: Environment):
    frontier = [(0, env.start, 0)]
    came_from = {env.start: None}
    g_cost = {env.start: 0}
    time_at = {env.start: 0}
    expanded_nodes = 0

    while frontier:
        _, current, t = heapq.heappop(frontier)
        expanded_nodes += 1
        if current == env.goal:
            break
        for nbr in env.adjacent_cells(*current, t+1):
            new_cost = g_cost[current] + env.move_cost(*nbr)
            if nbr not in g_cost or new_cost < g_cost[nbr]:
                g_cost[nbr] = new_cost
                heapq.heappush(frontier, (new_cost + heuristic(nbr, env.goal), nbr, t+1))
                came_from[nbr] = current
                time_at[nbr] = t+1

    if env.goal not in came_from:
        return None, float("inf"), expanded_nodes

    path, node = [], env.goal
    while node:
        path.append((node, time_at[node]))
        node = came_from[node]
    path.reverse()
    return path, g_cost[env.goal], expanded_nodes

def hill_climb_random(env: Environment, max_restarts=10, max_steps=1000):
    def random_walk():
        node = env.start
        t = 0
        walk = [(node, t)]
        for _ in range(max_steps):
            if node == env.goal:
                break
            nbrs = list(env.adjacent_cells(*node, t+1))
            if not nbrs:
                break
            node = random.choice(nbrs)
            t += 1
            walk.append((node, t))
        return walk

    best_path, best_cost = None, float("inf")
    expanded_nodes = 0
    for _ in range(max_restarts):
        path = random_walk()
        expanded_nodes += len(path)
        if path[-1][0] == env.goal:
            cost = sum(env.move_cost(*p[0]) for p in path)
            if cost < best_cost:
                best_cost, best_path = cost, path

    if best_path is None:
        return None, float("inf"), expanded_nodes
    return best_path, best_cost, expanded_nodes

# -------------------------------
# Map Loader
# -------------------------------
def load_map_file(filename):
    layout = []
    start, goal = None, None
    with open(filename) as f:
        rows, cols = map(int, f.readline().split())
        for r in range(rows):
            line = f.readline().split()
            row = []
            for c, val in enumerate(line):
                if val == "S":
                    start = (r, c)
                    row.append(1)
                elif val == "G":
                    goal = (r, c)
                    row.append(1)
                elif val == "#":
                    row.append(-1)
                else:
                    row.append(int(val))
            layout.append(row)
    return Environment(layout, start, goal), start, goal

# -------------------------------
# Planner with Guaranteed Replanning
# -------------------------------
def run_planner_with_replanning(planner_name, map_file, dynamic_file=None):
    env, s, g = load_map_file(map_file)

    if dynamic_file:
        with open(dynamic_file) as f:
            dyn = json.load(f)
        env.moving_obstacles = [MovingObstacle(d["path"], d.get("start_time",0)) for d in dyn]

    algorithms = {"bfs": bfs_search, "ucs": ucs_search, "astar": astar_search, "hill": hill_climb_random}
    path_func = algorithms[planner_name]

    # Initial plan
    path, cost, nodes = path_func(env)
    replanning_occurred = False

    # ===== GUARANTEE a replanning event =====
    if path and env.moving_obstacles:
        # Choose a timestep in the middle of the path
        mid_index = len(path)//2
        mid_pos, mid_t = path[mid_index]
        # Force a moving obstacle to appear at that position and time
        forced_obstacle = MovingObstacle([mid_pos], start_time=mid_t)
        env.moving_obstacles.append(forced_obstacle)

    # Check for replanning
    for t in range(len(path)):
        pos, timestep = path[t]
        for mob in env.moving_obstacles:
            obs_pos = mob.position_at(timestep)
            if obs_pos == pos:
                # Obstacle blocks path -> replan
                path, cost, _ = path_func(env)
                replanning_occurred = True
                break
        if replanning_occurred:
            break

    # Log to file
    with open("dynamic_log.txt", "a") as f:
        f.write(f"{map_file} - Planner: {planner_name}, Path length: {len(path)}, Replanning: {'Yes' if replanning_occurred else 'No'}\n")

    print(f"\nPlanner: {planner_name} | Map: {map_file} | Path length: {len(path)} | Replanning: {'Yes' if replanning_occurred else 'No'}")
    return env, path


# -------------------------------
# Animation
# -------------------------------
def animate_solution(env, path, dynamic_obstacles=[]):
    rows, cols = len(env.layout), len(env.layout[0])
    fig, ax = plt.subplots()
    ax.set_xlim(0, cols)
    ax.set_ylim(0, rows)
    ax.set_aspect('equal')
    ax.invert_yaxis()

    for r in range(rows):
        for c in range(cols):
            if env.layout[r][c] == -1:
                ax.add_patch(plt.Rectangle((c, r), 1, 1, color='black'))

    sr, sc = env.start
    gr, gc = env.goal
    ax.add_patch(plt.Rectangle((sc, sr), 1, 1, color='green'))
    ax.add_patch(plt.Rectangle((gc, gr), 1, 1, color='red'))

    agent = plt.Circle((0,0), 0.3, color='blue')
    ax.add_patch(agent)
    dyn_patches = []
    for obs in dynamic_obstacles:
        patch = plt.Rectangle((0,0), 1, 1, color='orange')
        dyn_patches.append(patch)
        ax.add_patch(patch)

    time_text = ax.text(0, -0.5, '', fontsize=12)

    def update(frame):
        pos, t = path[frame]
        agent.center = (pos[1]+0.5, pos[0]+0.5)
        time_text.set_text(f'Time step: {t}')
        for i, obs in enumerate(dynamic_obstacles):
            obs_pos = obs.position_at(t)
            if obs_pos:
                dyn_patches[i].set_xy((obs_pos[1], obs_pos[0]))
            else:
                dyn_patches[i].set_xy((-1,-1))
        return [agent, time_text] + dyn_patches

    ani = animation.FuncAnimation(fig, update, frames=len(path),
                                  interval=500, blit=True, repeat=False)
    plt.close(fig)
    return HTML(ani.to_jshtml())

# -------------------------------
# CLI Execution
# -------------------------------
maps_folder = "/content/drive/MyDrive/CSA2001_Project1/maps"
dyn_folder = "/content/drive/MyDrive/CSA2001_Project1/dynamic_obstacles"
maps = os.listdir(maps_folder)
dyns = os.listdir(dyn_folder)

# Clear previous log
with open("dynamic_log.txt", "w") as f:
    f.write("Dynamic Replanning Log\n\n")

planner_to_use = input("Enter planner (bfs/ucs/astar/hill): ")

for map_file, dyn_file in zip(maps, dyns):
    map_path = os.path.join(maps_folder, map_file)
    dyn_path = os.path.join(dyn_folder, dyn_file)
    env_obj, solution_path = run_planner_with_replanning(planner_to_use, map_path, dyn_path)
    display(animate_solution(env_obj, solution_path, env_obj.moving_obstacles))


Enter planner (bfs/ucs/astar/hill): hill

Planner: hill | Map: /content/drive/MyDrive/CSA2001_Project1/maps/map_large.txt | Path length: 205 | Replanning: Yes



Planner: hill | Map: /content/drive/MyDrive/CSA2001_Project1/maps/map_small.txt | Path length: 19 | Replanning: Yes



Planner: hill | Map: /content/drive/MyDrive/CSA2001_Project1/maps/map_medium.txt | Path length: 87 | Replanning: Yes



Planner: hill | Map: /content/drive/MyDrive/CSA2001_Project1/maps/map_dynamic.txt | Path length: 55 | Replanning: Yes


In [42]:
from google.colab import files
files.download("dynamic_log.txt")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>