In [3]:
import pandas as pd
import numpy as np
import time
import heapq
import random

# Disaster zone environment

In [16]:
class DisasterZoneEnv:
    """
    A 2D grid environment for a drone exploring a disaster zone.

    Legend (internally stored as integers):
        0 -> Empty cell
        1 -> Obstacle
        2 -> Survivor
        3 -> Recharging Point
    """

    def __init__(self, 
                 width=8, 
                 height=8, 
                 num_obstacles=5, 
                 num_survivors=3, 
                 num_resources=2, 
                 initial_energy=20, 
                 dynamic=False, 
                 predefined_grid=None, 
                 initial_position=None):
        """
        Initialize the environment. If a predefined grid is provided, use it; otherwise, generate randomly.

        :param width: Width of the grid.
        :param height: Height of the grid.
        :param num_obstacles: Number of obstacles.
        :param num_survivors: Number of survivors.
        :param num_resources: Number of recharging points.
        :param initial_energy: Initial energy of the drone.
        :param dynamic: Whether the environment is dynamic.
        :param predefined_grid: A predefined grid layout (optional).
        :param initial_position: Initial position of the drone (required if using predefined grid).
        """
        self.width = width
        self.height = height
        self.num_obstacles = num_obstacles
        self.num_survivors = num_survivors
        self.num_resources = num_resources
        self.initial_energy = initial_energy
        self.dynamic = dynamic
        self.energy = initial_energy
        self.dynamic_changes = 0  # Counter for dynamic changes

        # Use predefined grid if provided
        if predefined_grid is not None:
            if initial_position is None:
                raise ValueError("Initial position must be specified when using a predefined grid.")
            self.reset_with_scenario(predefined_grid, initial_position)
        else:
            self.reset()

    def _get_random_empty_cell(self):
        """
        Finds a random empty cell in the grid (cell == 0).
        """
        while True:
            x = random.randint(0, self.height - 1)
            y = random.randint(0, self.width - 1)
            if self.grid[x, y] == 0:  # Ensure the cell is empty
                return x, y

    def reset(self):
        """
        Resets the environment to a new random configuration.
        """
        self.grid = np.zeros((self.height, self.width), dtype=int)

        # Place obstacles
        for _ in range(self.num_obstacles):
            x, y = self._get_random_empty_cell()
            self.grid[x, y] = 1

        # Place survivors
        for _ in range(self.num_survivors):
            x, y = self._get_random_empty_cell()
            self.grid[x, y] = 2

        # Place resources
        for _ in range(self.num_resources):
            x, y = self._get_random_empty_cell()
            self.grid[x, y] = 3

        # Random start for the drone
        self.drone_x, self.drone_y = self._get_random_empty_cell()
        self.energy = self.initial_energy

    def reset_with_scenario(self, scenario, initial_position):
        """
        Loads a predefined grid into the environment and sets the drone's initial position.
        """
        scenario = np.array(scenario, dtype=int)  # Ensure the grid is a NumPy array of integers
        self.height, self.width = scenario.shape
        self.grid = scenario.copy()  # Copy the predefined grid

        # Set the drone's position
        self.drone_x, self.drone_y = initial_position
        self.energy = self.initial_energy

    def apply_dynamic_changes(self, step_count):
        """
        Applies dynamic changes to the environment if self.dynamic is True.

        :param step_count: Current step count of the simulation.
        """
        if self.dynamic:
            # Example dynamic behavior:
            # 1) Add a new obstacle every 5 steps
            if step_count % 5 == 0:
                x, y = self._get_random_empty_cell()
                self.grid[x, y] = 1
                self.dynamic_changes += 1

            # 2) Move all survivors every 3 steps
            if step_count % 3 == 0:
                survivor_positions = [(x, y) for x in range(self.height) 
                                      for y in range(self.width) if self.grid[x, y] == 2]
                for (sx, sy) in survivor_positions:
                    self.grid[sx, sy] = 0  # Remove old survivor
                    new_x, new_y = self._get_random_empty_cell()
                    self.grid[new_x, new_y] = 2

    def render(self):
        """
        Renders the grid for visualization in a text-based manner.
        """
        grid_copy = self.grid.astype(str)
        grid_copy[grid_copy == '0'] = '.'
        grid_copy[grid_copy == '1'] = '#'
        grid_copy[grid_copy == '2'] = 'S'
        grid_copy[grid_copy == '3'] = 'R'

        # Mark the drone in the visualization
        grid_copy[self.drone_x, self.drone_y] = 'D'

        for row in grid_copy:
            print(" ".join(row))
        print(f"Energy: {self.energy}\n")

# A* Search

In [13]:
import heapq

class AStarDrone:
    def __init__(self, env, initial_position):
        self.env = env
        self.position = initial_position
        self.start_position = initial_position
        self.visited = set()
        self.path = []
        self.energy = env.initial_energy
        self.steps_taken = 0
        self.survivors_rescued = 0
        self.resources_collected = 0
        self.energy_used = 0

    def move(self):
        while True:
            # Stop if energy is insufficient for further action
            if self.energy <= 0:
                print("Out of energy! Ending mission.")
                return False

            # Find the nearest target (survivor or recharge point)
            target = self.find_nearest_target()
            if not target:
                print("No reachable targets left. Ending mission.")
                return False

            # Use A* to move towards the target
            next_move = self.a_star_search(self.position, target)
            if next_move:
                action = self.calculate_action(self.position, next_move)
                self.position = next_move
                self.visited.add(self.position)
                self.path.append(self.position)

                # Deduct energy for the move
                self.energy -= 1
                self.energy_used += 1
                self.steps_taken += 1

                # Apply dynamic changes to the environment
                self.env.apply_dynamic_changes(self.steps_taken)

                # Check and handle grid value
                current_cell = self.env.grid[self.position[0]][self.position[1]]
                if current_cell == 2:  # Rescue survivor
                    self.survivors_rescued += 1
                    self.env.grid[self.position[0]][self.position[1]] = 0
                    print(f"Survivor rescued at {self.position}")
                elif current_cell == 3:  # Collect resource (recharge)
                    self.resources_collected += 1
                    self.env.grid[self.position[0]][self.position[1]] = 0
                    self.energy = min(self.energy + 5, self.env.initial_energy)
                    print(f"Recharged at {self.position}. Current energy: {self.energy}")
            else:
                print("Unable to proceed further. Ending mission.")
                return False

    def find_nearest_target(self):
        """Find the nearest survivor (2) or recharge point (3)."""
        targets = []
        for x in range(self.env.height):
            for y in range(self.env.width):
                if self.env.grid[x][y] in [2, 3]:
                    targets.append((x, y))
        
        if not targets:
            return None  # No targets left

        # Find the closest target based on Manhattan distance
        return min(targets, key=lambda t: self.heuristic(self.position, t))

    def a_star_search(self, start, goal):
        """Perform A* search to find the best path from start to goal."""
        open_list = []
        heapq.heappush(open_list, (0, start))
        came_from = {}
        g_score = {start: 0}
        f_score = {start: self.heuristic(start, goal)}
        visited = set()

        while open_list:
            _, current = heapq.heappop(open_list)
            if current == goal:
                return self.reconstruct_path(came_from, current)
            
            visited.add(current)

            for neighbor in self.get_neighbors(current):
                if neighbor in visited or self.env.grid[neighbor[0]][neighbor[1]] == 1:
                    continue

                tentative_g_score = g_score[current] + 1
                if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
                    came_from[neighbor] = current
                    g_score[neighbor] = tentative_g_score
                    f_score[neighbor] = tentative_g_score + self.heuristic(neighbor, goal)
                    heapq.heappush(open_list, (f_score[neighbor], neighbor))
        
        return None

    def get_neighbors(self, position):
        """Get valid neighboring cells."""
        x, y = position
        neighbors = []
        directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        for dx, dy in directions:
            nx, ny = x + dx, y + dy
            if 0 <= nx < self.env.height and 0 <= ny < self.env.width:
                if self.env.grid[nx][ny] != 1:
                    neighbors.append((nx, ny))
        return neighbors

    def heuristic(self, position, goal):
        """Heuristic prioritizing Manhattan distance with survivor proximity."""
        dist = abs(position[0] - goal[0]) + abs(position[1] - goal[1])
        # Favor survivors slightly over recharge points
        grid_value = self.env.grid[goal[0]][goal[1]]
        return dist - (5 if grid_value == 2 else 0)

    def reconstruct_path(self, came_from, current):
        """Reconstruct the path to the goal."""
        path = []
        while current in came_from:
            path.append(current)
            current = came_from[current]
        path.reverse()
        return path[0] if path else None

    def calculate_action(self, current, next_position):
        """Determine the action direction."""
        dx, dy = next_position[0] - current[0], next_position[1] - current[1]
        if dx == -1: return 'UP'
        if dx == 1: return 'DOWN'
        if dy == -1: return 'LEFT'
        if dy == 1: return 'RIGHT'
        raise ValueError("Invalid move.")

# Testing code

In [23]:
import time

class ScenarioTester:
    def __init__(self, agent_class, env_params):
        """
        Initialize the tester with the agent class and environment parameters.

        :param agent_class: The agent class to be tested (e.g., AStarDrone).
        :param env_params: Dictionary of environment parameters.
        """
        self.agent_class = agent_class
        self.env_params = env_params
        self.results = []  # Store results for each scenario
        
    def get_initial_position(self, grid):
        """
        Extract the initial position (marked as the first empty cell, 0, in the grid).
        
        :param grid: The predefined grid scenario.
        :return: A tuple (x, y) of the initial position.
        """
        for x in range(len(grid)):
            for y in range(len(grid[0])):
                if grid[x][y] == 0:  # Assuming the drone starts at the first empty cell
                    return (x, y)
        raise ValueError("The grid must contain at least one empty cell (0) for the drone's starting position.")

    def run_scenario(self, scenario, scenario_id):
        """
        Run a predefined scenario with the given grid and parameters.

        :param scenario: The grid representing the scenario.
        :param scenario_id: A unique identifier for the scenario.
        """
        # Get the drone's initial position
        initial_position = self.get_initial_position(scenario)

        # Create environment
        env = DisasterZoneEnv(
            predefined_grid=scenario,
            initial_energy=self.env_params.get("initial_energy", 20),
            dynamic=self.env_params.get("dynamic", False),
            initial_position=initial_position  # Pass the initial position here
        )

        # Initialize the agent (no fixed goal as the drone dynamically adapts)
        agent = self.agent_class(env, initial_position)

        # Measure execution time
        start_time = time.time()

        while True:
            # Capture time at the start of each loop iteration to measure per-loop performance
            loop_start_time = time.time()

            # Perform operations (dynamic changes, movement, etc.)
            if self.env_params.get("dynamic", False):
                self.apply_dynamic_changes(env, agent)

            if agent.energy <= 0 or not agent.move():
                print("Terminating scenario: Out of energy or no valid moves remaining.")
                break

            # Optionally, render the environment for visualization
            env.render()

            # Log the agent's current state
            print(f"Agent at position {agent.position}, Energy: {agent.energy}")
            print(f"Survivors Rescued: {agent.survivors_rescued}, Resources Collected: {agent.resources_collected}")

            # Measure loop duration
            loop_end_time = time.time()
            loop_duration = loop_end_time - loop_start_time
            print(f"Loop time: {loop_duration:.6f} seconds")

        # Final computation time
        end_time = time.time()
        computation_time = end_time - start_time

        # Record scenario results
        self.results.append({
            "Agent Name": self.agent_class.__name__,
            "Scenario ID": scenario_id,
            "Steps Taken": agent.steps_taken,
            "Survivors Rescued": agent.survivors_rescued,
            "Resources Collected": agent.resources_collected,
            "Energy Used": self.env_params.get("initial_energy", 20) - agent.energy,
            "Computation Time (s)": computation_time
        })

        print(f"Scenario {scenario_id} complete. Results:")
        print(f"Survivors Rescued: {agent.survivors_rescued}, Resources Collected: {agent.resources_collected}")
        print(f"Steps Taken: {agent.steps_taken}, Energy Used: {self.env_params.get('initial_energy', 20) - agent.energy}")
        print(f"Computation Time: {computation_time:.6f} seconds")

    def apply_dynamic_changes(self, env, agent):
        """
        Apply dynamic changes to the environment. This can include changing obstacles,
        altering resources, or reducing energy levels during the scenario.

        :param env: The environment in which the agent is operating.
        :param agent: The agent whose state may be affected by dynamic changes.
        """
        print("Applying dynamic changes to the environment...")

        # Example: Randomly change an obstacle in the environment grid
        import random
        x, y = random.randint(0, len(env.grid)-1), random.randint(0, len(env.grid[0])-1)
        env.grid[x][y] = random.choice([1, 2, 3])  # 1, 2, or 3 represent different obstacles

        # Example: Random energy reduction
        energy_decrease = random.randint(1, 3)
        agent.energy -= energy_decrease
        print(f"Energy decreased by {energy_decrease}. Current energy: {agent.energy}")

    def display_results(self):
        """
        Display results for all tested scenarios in a formatted table.
        """
        if not self.results:
            print("No scenarios have been tested yet.")
            return

        # Print results table header
        print("=" * 100)
        print(f"{'Agent Name':<15} {'Scenario ID':<15} {'Steps Taken':<15} {'Survivors Rescued':<20} {'Resources Collected':<20} {'Energy Used':<15} {'Computation Time (s)':<20}")
        print("=" * 100)

        # Print results row by row
        for result in self.results:
            print(f"{result['Agent Name']:<15} {result['Scenario ID']:<15} {result['Steps Taken']:<15} {result['Survivors Rescued']:<20} {result['Resources Collected']:<20} {result['Energy Used']:<15} {result['Computation Time (s)']:<20.6f}")
        print("=" * 100)

In [24]:
if __name__ == "__main__":
    # Agent class (replace with the actual agent class)
    agent_class = AStarDrone  # The updated drone class with enhanced A* implementation

    # Environment parameters (adjust as needed)
    env_params = {
        "initial_energy": 20,  # Initial energy for the drone
        "dynamic": True        # Enable dynamic changes in the environment
    }

    # Define different scenarios (predefined grid configurations)
    scenario_1 = [
        [0, 0, 0, 0, 1],
        [0, 1, 0, 2, 0],
        [0, 0, 3, 0, 0],
        [1, 0, 0, 0, 0],
        [0, 0, 0, 1, 0]
    ]

    scenario_2 = [
        [0, 0, 1, 2],
        [0, 1, 0, 0],
        [3, 0, 1, 0],
        [0, 0, 0, 0]
    ]


    # Initialize the ScenarioTester with the agent class and environment parameters
    tester = ScenarioTester(agent_class, env_params)

    # Run each scenario with a unique identifier
    tester.run_scenario(scenario_1, "Scenario 1")
    tester.run_scenario(scenario_2, "Scenario 2")

    # Display the aggregated results for all scenarios
    tester.display_results()

Applying dynamic changes to the environment...
Energy decreased by 1. Current energy: 19
Survivor rescued at (0, 3)
Recharged at (2, 2). Current energy: 18
Recharged at (4, 2). Current energy: 20
No reachable targets left. Ending mission.
Terminating scenario: Out of energy or no valid moves remaining.
Scenario Scenario 1 complete. Results:
Survivors Rescued: 1, Resources Collected: 2
Steps Taken: 8, Energy Used: 0
Computation Time: 0.002693 seconds
Applying dynamic changes to the environment...
Energy decreased by 2. Current energy: 18
Recharged at (2, 0). Current energy: 20
Survivor rescued at (3, 0)
Unable to proceed further. Ending mission.
Terminating scenario: Out of energy or no valid moves remaining.
Scenario Scenario 2 complete. Results:
Survivors Rescued: 1, Resources Collected: 1
Steps Taken: 6, Energy Used: 4
Computation Time: 0.001502 seconds
Agent Name      Scenario ID     Steps Taken     Survivors Rescued    Resources Collected  Energy Used     Computation Time (s)
AStar