In [1]:
import gym
from gym import spaces
import numpy as np

class GridCity(gym.Env):
    def __init__(self, width=10, height=10):
        self.width = width
        self.height = height
        self.bot_position = [0, 0]  # Initial position
        self.destination = [width-1, height-1]  # Target destination
        self.traffic_status = np.zeros((width, height))  # Traffic status grid
        self.other_agents = []  # List of other agents' positions
        
        # Define action and observation spaces
        self.action_space = spaces.Discrete(5)  # Up, Down, Left, Right, Stay
        self.observation_space = spaces.Box(low=0, high=max(width, height), shape=(6,), dtype=np.uint8)

    def reset(self, seed=None, options=None):
        self.bot_position = [0, 0]
        self.destination = [self.width-1, self.height-1]
        self.traffic_status = np.random.randint(0, 3, size=(self.width, self.height))  # Random traffic conditions (0=clear, 2=congested)
        self.other_agents = [[np.random.randint(self.width), np.random.randint(self.height)] for _ in range(5)]  # Random positions of other agents
        observation = self._get_observation()
        info = {}  # Additional information
        return observation, info

    def step(self, action):
        reward = 0
        done = False
        truncated = False

        # Update bot position based on action
        if action == 0:  # Move Up
            self.bot_position[1] = min(self.bot_position[1] + 1, self.height - 1)
        elif action == 1:  # Move Down
            self.bot_position[1] = max(self.bot_position[1] - 1, 0)
        elif action == 2:  # Move Left
            self.bot_position[0] = max(self.bot_position[0] - 1, 0)
        elif action == 3:  # Move Right
            self.bot_position[0] = min(self.bot_position[0] + 1, self.width - 1)

        # Check if destination reached
        if self.bot_position == self.destination:
            reward += 10
            done = True

        # Check for collisions with other agents
        if any(np.array_equal(self.bot_position, agent_pos) for agent_pos in self.other_agents):
            reward -= 5

        # Penalize entering congested areas
        if self.traffic_status[self.bot_position[0], self.bot_position[1]] == 2:
            reward -= 2

        # Reward congestion avoidance and route efficiency
        distance_to_destination = abs(self.bot_position[0] - self.destination[0]) + abs(self.bot_position[1] - self.destination[1])
        reward += -(distance_to_destination / (self.width + self.height))

        observation = self._get_observation()
        info = {}
        
        return observation, reward, done, truncated, info

    def _get_observation(self):
        observation = np.array([
            *self.bot_position,
            *self.destination,
            int(any(np.array_equal(self.bot_position, agent_pos) for agent_pos in self.other_agents)),
            int(self.traffic_status[self.bot_position[0], self.bot_position[1]])
        ])
        return observation

# Example usage
env = GridCity()
observation, info = env.reset()
done = False

while not done:
    action = env.action_space.sample()  # Random action for demonstration
    observation, reward, done, truncated, info = env.step(action)
    print(f"Action: {action}, Reward: {reward}, Done: {done}, Truncated: {truncated}")


Action: 4, Reward: -0.9, Done: False, Truncated: False
Action: 0, Reward: -2.85, Done: False, Truncated: False
Action: 1, Reward: -0.9, Done: False, Truncated: False
Action: 0, Reward: -2.85, Done: False, Truncated: False
Action: 2, Reward: -2.85, Done: False, Truncated: False
Action: 3, Reward: -0.8, Done: False, Truncated: False
Action: 4, Reward: -0.8, Done: False, Truncated: False
Action: 0, Reward: -0.75, Done: False, Truncated: False
Action: 0, Reward: -0.7, Done: False, Truncated: False
Action: 2, Reward: -0.75, Done: False, Truncated: False
Action: 1, Reward: -0.8, Done: False, Truncated: False
Action: 2, Reward: -0.8, Done: False, Truncated: False
Action: 3, Reward: -0.75, Done: False, Truncated: False
Action: 3, Reward: -0.7, Done: False, Truncated: False
Action: 2, Reward: -0.75, Done: False, Truncated: False
Action: 2, Reward: -0.8, Done: False, Truncated: False
Action: 2, Reward: -0.8, Done: False, Truncated: False
Action: 1, Reward: -2.85, Done: False, Truncated: False
Ac

In [2]:
import heapq

def dijkstra(grid, start, end):
    width, height = len(grid[0]), len(grid)
    distances = [[float('inf')] * width for _ in range(height)]
    distances[start[1]][start[0]] = 0
    priority_queue = [(0, start)]
    directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]

    while priority_queue:
        current_distance, current_position = heapq.heappop(priority_queue)
        
        if current_position == end:
            return current_distance

        for direction in directions:
            neighbor = [current_position[0] + direction[0], current_position[1] + direction[1]]
            if 0 <= neighbor[0] < width and 0 <= neighbor[1] < height:
                new_distance = current_distance + grid[neighbor[1]][neighbor[0]]
                if new_distance < distances[neighbor[1]][neighbor[0]]:
                    distances[neighbor[1]][neighbor[0]] = new_distance
                    heapq.heappush(priority_queue, (new_distance, neighbor))

    return float('inf')  # If no path is found

# Example usage
traffic_status = np.random.randint(0, 3, size=(10, 10))  # Example grid
start = [0, 0]
end = [9, 9]
shortest_path_cost = dijkstra(traffic_status, start, end)
print(f"Shortest path cost: {shortest_path_cost}")


Shortest path cost: 8


In [3]:
import heapq

def a_star(grid, start, end):
    width, height = len(grid[0]), len(grid)
    open_set = []
    heapq.heappush(open_set, (0, start))
    came_from = {}
    g_score = {tuple(start): 0}
    
    directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]

    def heuristic(a, b):
        return abs(a[0] - b[0]) + abs(a[1] - b[1])  # Manhattan distance heuristic
    
    f_score = {tuple(start): heuristic(start, end)}
    
    while open_set:
        _, current = heapq.heappop(open_set)

        if current == end:
            return reconstruct_path(came_from, current)

        for direction in directions:
            neighbor = [current[0] + direction[0], current[1] + direction[1]]
            if 0 <= neighbor[0] < width and 0 <= neighbor[1] < height:
                tentative_g_score = g_score[tuple(current)] + grid[neighbor[1]][neighbor[0]]

                if tuple(neighbor) not in g_score or tentative_g_score < g_score[tuple(neighbor)]:
                    came_from[tuple(neighbor)] = current
                    g_score[tuple(neighbor)] = tentative_g_score
                    f_score[tuple(neighbor)] = tentative_g_score + heuristic(neighbor, end)
                    heapq.heappush(open_set, (f_score[tuple(neighbor)], neighbor))

    return None  # If no path is found

def reconstruct_path(came_from, current):
    path = [current]
    while tuple(current) in came_from:
        current = came_from[tuple(current)]
        path.append(current)
    path.reverse()
    return path

# Example usage
traffic_status = np.random.randint(0, 3, size=(10, 10))  # Example grid
start = [0, 0]
end = [9, 9]
shortest_path = a_star(traffic_status, start, end)
print(f"Shortest path: {shortest_path}")


Shortest path: [[0, 0], [0, 1], [0, 2], [0, 3], [0, 4], [1, 4], [1, 5], [1, 6], [2, 6], [3, 6], [4, 6], [5, 6], [6, 6], [7, 6], [7, 7], [7, 8], [7, 9], [8, 9], [9, 9]]


In [4]:
import gym
from gym import spaces
import numpy as np
import heapq

def heuristic(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])  # Manhattan distance heuristic

def a_star(grid, start, end):
    width, height = len(grid[0]), len(grid)
    open_set = []
    heapq.heappush(open_set, (0, start))
    came_from = {}
    g_score = {tuple(start): 0}
    f_score = {tuple(start): heuristic(start, end)}

    directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]

    while open_set:
        _, current = heapq.heappop(open_set)

        if current == end:
            return reconstruct_path(came_from, current)

        for direction in directions:
            neighbor = [current[0] + direction[0], current[1] + direction[1]]
            if 0 <= neighbor[0] < width and 0 <= neighbor[1] < height:
                tentative_g_score = g_score[tuple(current)] + grid[neighbor[1]][neighbor[0]]

                if tuple(neighbor) not in g_score or tentative_g_score < g_score[tuple(neighbor)]:
                    came_from[tuple(neighbor)] = current
                    g_score[tuple(neighbor)] = tentative_g_score
                    f_score[tuple(neighbor)] = tentative_g_score + heuristic(neighbor, end)
                    heapq.heappush(open_set, (f_score[tuple(neighbor)], neighbor))

    return None  # If no path is found

def reconstruct_path(came_from, current):
    path = [current]
    while tuple(current) in came_from:
        current = came_from[tuple(current)]
        path.append(current)
    path.reverse()
    return path

class GridCity(gym.Env):
    def __init__(self, width=10, height=10):
        self.width = width
        self.height = height
        self.bot_position = [0, 0]  # Initial position
        self.destination = [width-1, height-1]  # Target destination
        self.traffic_status = np.zeros((width, height))  # Traffic status grid
        self.other_agents = []  # List of other agents' positions
        
        # Define action and observation spaces
        self.action_space = spaces.Discrete(5)  # Up, Down, Left, Right, Stay
        self.observation_space = spaces.Box(low=0, high=max(width, height), shape=(6,), dtype=np.uint8)
    
    def calculate_shortest_path(self):
        return a_star(self.traffic_status, self.bot_position, self.destination)


    def reset(self, seed=None, options=None):
        self.bot_position = [0, 0]
        self.destination = [self.width-1, self.height-1]
        self.traffic_status = np.random.randint(0, 3, size=(self.width, self.height))  # Random traffic conditions (0=clear, 2=congested)
        self.other_agents = [[np.random.randint(self.width), np.random.randint(self.height)] for _ in range(5)]  # Random positions of other agents
        observation = self._get_observation()
        self.shortest_path = self.calculate_shortest_path()
        info = {}  # Additional information
        return observation, info

    def step(self, action):
        reward = 0
        done = False
        truncated = False

        if self.shortest_path and len(self.shortest_path) > 1:
            next_position = self.shortest_path[1]
            # Update bot position based on next position in path
            if next_position[0] > self.bot_position[0]:
                action = 3  # Move Right
            elif next_position[0] < self.bot_position[0]:
                action = 2  # Move Left
            elif next_position[1] > self.bot_position[1]:
                action = 0  # Move Up
            elif next_position[1] < self.bot_position[1]:
                action = 1  # Move Down
            else:
                action = 4  # Stay

        # Update bot position based on action
        if action == 0:  # Move Up
            self.bot_position[1] = min(self.bot_position[1] + 1, self.height - 1)
        elif action == 1:  # Move Down
            self.bot_position[1] = max(self.bot_position[1] - 1, 0)
        elif action == 2:  # Move Left
            self.bot_position[0] = max(self.bot_position[0] - 1, 0)
        elif action == 3:  # Move Right
            self.bot_position[0] = min(self.bot_position[0] + 1, self.width - 1)

        # Update shortest path if necessary
        if self.shortest_path and self.bot_position == self.shortest_path[1]:
            self.shortest_path.pop(0)

        # Check if destination reached
        if self.bot_position == self.destination:
            reward += 10
            done = True

        # Check for collisions with other agents
        if any(np.array_equal(self.bot_position, agent_pos) for agent_pos in self.other_agents):
            reward -= 5

        # Penalize entering congested areas
        if self.traffic_status[self.bot_position[0], self.bot_position[1]] == 2:
            reward -= 2

        # Reward congestion avoidance and route efficiency
        distance_to_destination = abs(self.bot_position[0] - self.destination[0]) + abs(self.bot_position[1] - self.destination[1])
        reward += -(distance_to_destination / (self.width + self.height))

        observation = self._get_observation()
        info = {}
        
        return observation, reward, done, truncated, info

    def _get_observation(self):
        observation = np.array([
            *self.bot_position,
            *self.destination,
            int(any(np.array_equal(self.bot_position, agent_pos) for agent_pos in self.other_agents)),
            int(self.traffic_status[self.bot_position[0], self.bot_position[1]])
        ])
        return observation

# Example usage
env = GridCity()
observation, info = env.reset()
done = False

while not done:
    action = env.action_space.sample()  # Random action for demonstration
    observation, reward, done, truncated, info = env.step(action)
    print(f"Action: {action}, Reward: {reward}, Done: {done}, Truncated: {truncated}")


Action: 2, Reward: -0.85, Done: False, Truncated: False
Action: 2, Reward: -0.8, Done: False, Truncated: False
Action: 3, Reward: -0.75, Done: False, Truncated: False
Action: 4, Reward: -0.7, Done: False, Truncated: False
Action: 3, Reward: -0.65, Done: False, Truncated: False
Action: 2, Reward: -0.6, Done: False, Truncated: False
Action: 1, Reward: -2.55, Done: False, Truncated: False
Action: 2, Reward: -0.5, Done: False, Truncated: False
Action: 4, Reward: -2.45, Done: False, Truncated: False
Action: 2, Reward: -0.4, Done: False, Truncated: False
Action: 2, Reward: -2.35, Done: False, Truncated: False
Action: 2, Reward: -0.3, Done: False, Truncated: False
Action: 0, Reward: -0.25, Done: False, Truncated: False
Action: 0, Reward: -0.2, Done: False, Truncated: False
Action: 4, Reward: -0.15, Done: False, Truncated: False
Action: 2, Reward: -0.1, Done: False, Truncated: False
Action: 3, Reward: -0.05, Done: False, Truncated: False
Action: 3, Reward: 10.0, Done: True, Truncated: False


In [17]:
import numpy as np
from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector
from gym import spaces

class GridCityMultiAgent(AECEnv):
    def __init__(self, width=10, height=10, num_agents=2):
        self.width = width
        self.height = height
        #self.num_agents = num_agents  # Uncommented line
        self.agents = [f"agent_{i}" for i in range(num_agents)]
        self.possible_agents = self.agents[:]
        self.agent_name_mapping = {i: f"agent_{i}" for i in range(num_agents)}
        self.bot_positions = [[0, 0] for _ in range(num_agents)]  # Initial positions
        self.destination = [width-1, height-1]  # Target destination
        self.traffic_status = np.zeros((width, height))  # Traffic status grid
        self.other_agents = []  # List of other agents' positions
        
        # Define action and observation spaces for each agent
        self.action_spaces = {agent: spaces.Discrete(5) for agent in self.agents}  # Up, Down, Left, Right, Stay
        self.observation_spaces = {agent: spaces.Box(low=0, high=max(width, height), shape=(6,), dtype=np.uint8) for agent in self.agents}
        
        # Initialize agent selector
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        
        # Initialize terminations, truncations, rewards, and cumulative rewards
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.rewards = {agent: 0 for agent in self.agents}
        self._cumulative_rewards = {agent: 0 for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}

    def reset(self, seed=None, options=None):
        self.bot_positions = [[0, 0] for _ in range(self.num_agents)]
        self.destination = [self.width-1, self.height-1]
        self.traffic_status = np.random.randint(0, 3, size=(self.width, self.height))  # Random traffic conditions (0=clear, 2=congested)
        self.other_agents = [[np.random.randint(self.width), np.random.randint(self.height)] for _ in range(5)]  # Random positions of other agents
        self.agents = self.possible_agents[:]
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.rewards = {agent: 0 for agent in self.agents}
        self._cumulative_rewards = {agent: 0 for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
        self.observations = self._get_observations()
        return self.observations

    def _get_observations(self):
        observations = {}
        for agent in self.agents:
            agent_index = self.agents.index(agent)
            observation = np.array([
                *self.bot_positions[agent_index],
                *self.destination,
                int(any(np.array_equal(self.bot_positions[agent_index], pos) for pos in [p for i, p in enumerate(self.bot_positions) if i != agent_index] + self.other_agents)),
                int(self.traffic_status[self.bot_positions[agent_index][0], self.bot_positions[agent_index][1]])
            ])
            observations[agent] = observation
        return observations

    def observe(self, agent: str) -> np.ndarray:
        agent_index = self.agents.index(agent)
        return np.array([
            *self.bot_positions[agent_index],
            *self.destination,
            int(any(np.array_equal(self.bot_positions[agent_index], pos) for pos in [p for i, p in enumerate(self.bot_positions) if i != agent_index] + self.other_agents)),
            int(self.traffic_status[self.bot_positions[agent_index][0], self.bot_positions[agent_index][1]])
        ])

    def step(self, action):
        if self.terminations[self.agent_selection] or self.truncations[self.agent_selection]:
            self._was_done_step(action)
            return
        
        agent_index = self.agents.index(self.agent_selection)
        old_position = self.bot_positions[agent_index][:]
        new_position = old_position.copy()

        # Update position based on action
        if action == 0:  # Up
            new_position[1] = min(new_position[1] + 1, self.height - 1)
        elif action == 1:  # Down
            new_position[1] = max(new_position[1] - 1, 0)
        elif action == 2:  # Left
            new_position[0] = max(new_position[0] - 1, 0)
        elif action == 3:  # Right
            new_position[0] = min(new_position[0] + 1, self.width - 1)
        # Action 4 is Stay, no change

        # Check for collisions with other agents before moving
        other_main_positions = [pos for idx, pos in enumerate(self.bot_positions) if idx != agent_index]
        collision = any(np.array_equal(new_position, pos) for pos in other_main_positions + self.other_agents)
        if collision:
            new_position = old_position  # Revert to old position if collision

        self.bot_positions[agent_index] = new_position

        # Calculate rewards
        rewards = {agent: 0 for agent in self.agents}
        terminations = {agent: False for agent in self.agents}
        truncations = {agent: False for agent in self.agents}
        infos = {agent: {} for agent in self.agents}

        # Destination check
        if self.bot_positions[agent_index] == self.destination:
            rewards[self.agent_selection] += 10
            terminations[self.agent_selection] = True

        # Collision penalty
        if collision:
            rewards[self.agent_selection] -= 5

        # Traffic congestion penalty
        traffic_level = self.traffic_status[new_position[0], new_position[1]]
        if traffic_level == 2:
            rewards[self.agent_selection] -= 2

        # Distance-based reward
        distance = abs(new_position[0] - self.destination[0]) + abs(new_position[1] - self.destination[1])
        rewards[self.agent_selection] += -distance / (self.width + self.height)

        # Update environment state
        self.rewards = rewards
        self.terminations = terminations
        self.truncations = truncations
        self.infos = infos
        self.observations = self._get_observations()

        # Handle agent termination/truncation
        if terminations[self.agent_selection] or truncations[self.agent_selection]:
            self.agents.remove(self.agent_selection)
            del self.terminations[self.agent_selection]
            del self.truncations[self.agent_selection]
            del self.rewards[self.agent_selection]
            del self._cumulative_rewards[self.agent_selection]
            del self.infos[self.agent_selection]

        # Update cumulative rewards
        for agent in self.agents:
            self._cumulative_rewards[agent] += self.rewards.get(agent, 0)

        # Proceed to next agent
        if self.agents:
            self._agent_selector = agent_selector(self.agents)
            self.agent_selection = self._agent_selector.next()

    def _was_done_step(self, action):
        # Skip processing for done agents
        pass

    def render(self, mode="human"):
        pass

# Example usage remains the same
env = GridCityMultiAgent()
env.reset()
for agent in env.agent_iter():
    observation, reward, termination, truncation, info = env.last()
    if termination or truncation:
        action = None
    else:
        action = env.action_spaces[agent].sample()
    env.step(action)
env.close()

In [23]:
import numpy as np
from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector
from gym import spaces
import heapq

def heuristic(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])  # Manhattan distance heuristic

def a_star(grid, start, end):
    width, height = len(grid[0]), len(grid)
    open_set = []
    heapq.heappush(open_set, (0, start))
    came_from = {}
    g_score = {tuple(start): 0}
    f_score = {tuple(start): heuristic(start, end)}

    directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]

    while open_set:
        _, current = heapq.heappop(open_set)

        if current == end:
            return reconstruct_path(came_from, current)

        for direction in directions:
            neighbor = [current[0] + direction[0], current[1] + direction[1]]
            if 0 <= neighbor[0] < width and 0 <= neighbor[1] < height:
                tentative_g_score = g_score[tuple(current)] + grid[neighbor[1]][neighbor[0]]

                if tuple(neighbor) not in g_score or tentative_g_score < g_score[tuple(neighbor)]:
                    came_from[tuple(neighbor)] = current
                    g_score[tuple(neighbor)] = tentative_g_score
                    f_score[tuple(neighbor)] = tentative_g_score + heuristic(neighbor, end)
                    heapq.heappush(open_set, (f_score[tuple(neighbor)], neighbor))

    return None  # If no path is found

def reconstruct_path(came_from, current):
    path = [current]
    while tuple(current) in came_from:
        current = came_from[tuple(current)]
        path.append(current)
    path.reverse()
    return path

class GridCityMultiAgent(AECEnv):
    def __init__(self, width=10, height=10, num_agents=2):
        self.width = width
        self.height = height
        self.agents = [f"agent_{i}" for i in range(num_agents)]
        self.possible_agents = self.agents[:]
        self.agent_name_mapping = {i: f"agent_{i}" for i in range(num_agents)}
        self.bot_positions = [[0, 0] for _ in range(num_agents)]  # Initial positions
        self.destination = [width-1, height-1]  # Target destination
        self.traffic_status = np.zeros((width, height))  # Traffic status grid
        self.other_agents = []  # List of other agents' positions
        
        # Define action and observation spaces for each agent
        self.action_spaces = {agent: spaces.Discrete(5) for agent in self.agents}  # Up, Down, Left, Right, Stay
        self.observation_spaces = {agent: spaces.Box(low=0, high=max(width, height), shape=(6,), dtype=np.uint8) for agent in self.agents}
        
        # Initialize agent selector
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        
        # Initialize terminations, truncations, rewards, and cumulative rewards
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.rewards = {agent: 0 for agent in self.agents}
        self._cumulative_rewards = {agent: 0 for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
        
        # Initialize paths for each agent using A*
        self.paths = {agent: a_star(self.traffic_status, self.bot_positions[i], self.destination) for i, agent in enumerate(self.agents)}

    def reset(self, seed=None, options=None):
        self.bot_positions = [[0, 0] for _ in range(len(self.agents))]
        self.destination = [self.width-1, self.height-1]
        self.traffic_status = np.random.randint(0, 3, size=(self.width, self.height))  # Random traffic conditions (0=clear, 2=congested)
        self.other_agents = [[np.random.randint(self.width), np.random.randint(self.height)] for _ in range(5)]  # Random positions of other agents
        self.agents = self.possible_agents[:]
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.rewards = {agent: 0 for agent in self.agents}
        self._cumulative_rewards = {agent: 0 for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
        self.paths = {agent: a_star(self.traffic_status, self.bot_positions[i], self.destination) for i, agent in enumerate(self.agents)}
        self.observations = self._get_observations()
        return self.observations

    def _get_observations(self):
        observations = {}
        for agent in self.agents:
            agent_index = self.agents.index(agent)
            observation = np.array([
                *self.bot_positions[agent_index],
                *self.destination,
                int(any(np.array_equal(self.bot_positions[agent_index], agent_pos) for agent_pos in self.other_agents + [self.bot_positions[j] for j in range(len(self.agents)) if j != agent_index])),
                int(self.traffic_status[self.bot_positions[agent_index][0], self.bot_positions[agent_index][1]])
            ])
            observations[agent] = observation
        return observations

    def observe(self, agent: str) -> np.ndarray:
        """Returns the observation an agent currently can make."""
        agent_index = self.agents.index(agent)
        observation = np.array([
            *self.bot_positions[agent_index],
            *self.destination,
            int(any(np.array_equal(self.bot_positions[agent_index], agent_pos) for agent_pos in self.other_agents + [self.bot_positions[j] for j in range(len(self.agents)) if j != agent_index])),
            int(self.traffic_status[self.bot_positions[agent_index][0], self.bot_positions[agent_index][1]])
        ])
        return observation

    def step(self, action):
        if self.terminations[self.agent_selection]:
            return self._was_done_step(action)
        if not self.agents:  # Check if there are no agents left
            return self._get_observations(), self.rewards, self.terminations, self.truncations, self.infos
        
        rewards = {agent: 0 for agent in self.agents}
        terminations = {agent: False for agent in self.agents}
        truncations = {agent: False for agent in self.agents}
        infos = {agent: {} for agent in self.agents}

        agent_index = self.agents.index(self.agent_selection)
        
        # Update bot position based on action or A* path
        if self.paths[self.agent_selection] and len(self.paths[self.agent_selection]) > 1:
            next_position = self.paths[self.agent_selection][1]
            # Update bot position based on next position in path
            if next_position[0] > self.bot_positions[agent_index][0]:
                action = 3  # Move Right
            elif next_position[0] < self.bot_positions[agent_index][0]:
                action = 2  # Move Left
            elif next_position[1] > self.bot_positions[agent_index][1]:
                action = 0  # Move Up
            elif next_position[1] < self.bot_positions[agent_index][1]:
                action = 1  # Move Down
            else:
                action = 4  # Stay

        # Update bot position based on action
        if action == 0:  # Move Up
            self.bot_positions[agent_index][1] = min(self.bot_positions[agent_index][1] + 1, self.height - 1)
        elif action == 1:  # Move Down
            self.bot_positions[agent_index][1] = max(self.bot_positions[agent_index][1] - 1, 0)
        elif action == 2:  # Move Left
            self.bot_positions[agent_index][0] = max(self.bot_positions[agent_index][0] - 1, 0)
        elif action == 3:  # Move Right
            self.bot_positions[agent_index][0] = min(self.bot_positions[agent_index][0] + 1, self.width - 1)
        
        

        # Check for collisions with other agents
        if any(np.array_equal(self.bot_positions[agent_index], agent_pos) for agent_pos in self.other_agents + [self.bot_positions[j] for j in range(len(self.agents)) if j != agent_index]):
            rewards[self.agent_selection] -= 5

        # Check if destination reached
        if self.bot_positions[agent_index] == self.destination:
            rewards[self.agent_selection] += 10
            terminations[self.agent_selection] = True

        # Penalize entering congested areas
        if self.traffic_status[self.bot_positions[agent_index][0], self.bot_positions[agent_index][1]] == 2:
            rewards[self.agent_selection] -= 2

        # Reward congestion avoidance and route efficiency
        distance_to_destination = abs(self.bot_positions[agent_index][0] - self.destination[0]) + abs(self.bot_positions[agent_index][1] - self.destination[1])
        rewards[self.agent_selection] += -(distance_to_destination / (self.width + self.height))

        # Update paths if necessary
        if self.paths[self.agent_selection] and self.bot_positions[agent_index] == self.paths[self.agent_selection][1]:
            self.paths[self.agent_selection].pop(0)

        self.observations = self._get_observations()
        self.rewards = rewards
        self.terminations = terminations
        self.truncations = truncations
        self.infos = infos

        # Accumulate rewards
        for agent, reward in self.rewards.items():
            self._cumulative_rewards[agent] += reward

        # Remove terminated agents
        if self.terminations[self.agent_selection]:
            self.agents.remove(self.agent_selection)
            if not self.agents:  # Check if there are no agents left
                return self._get_observations(), self.rewards, self.terminations, self.truncations, self.infos
            self._agent_selector = agent_selector(self.agents)
            self.agent_selection = self._agent_selector.next()

        # Update agent selection
        if self.agent_selection not in self.agents:
            self.agent_selection = self._agent_selector.next()

        return self.observations, rewards, terminations, truncations, infos

    def _was_done_step(self, action):
        # If an agent was already done, mark all of its observations as `done` too
        if self.terminations[self.agent_selection]:
            self._dones_step_first()
            if not self.agents:  # Check if there are no agents left
                return self._get_observations(), self.rewards, self.terminations, self.truncations, self.infos
            return self._get_observations(), self.rewards, self.terminations, self.truncations, self.infos
        else:
            return self.step(action)

    def _dones_step_first(self):
        pass  # Only mark the current agent as done

    def render(self, mode="human"):
        """Renders the environment as specified by self.render_mode."""
        pass

# Example usage
env = GridCityMultiAgent()
env.reset()
for agent in env.agent_iter():
    observation, reward, termination, truncation, info = env.last()
    if termination or truncation:
        action = None
    else:
        action = env.action_spaces[agent].sample()
    if not env.agents:  # Check if there are no agents left
        break
    env.step(action)
env.close()


In [27]:
import numpy as np
from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector
from gymnasium import spaces
import heapq

def heuristic(a, b):
    """Calculate Manhattan distance heuristic between two points."""
    return abs(a[0] - b[0]) + abs(a[1] - b[1])

def a_star(grid, start, end):
    """
    A* pathfinding algorithm.
    
    Args:
        grid: 2D numpy array where values represent movement costs
        start: Starting position [x, y]
        end: Target position [x, y]
        
    Returns:
        List of positions forming the path from start to end
    """
    height, width = grid.shape
    start_tuple = tuple(start)
    end_tuple = tuple(end)
    
    # Priority queue for open set
    open_set = []
    heapq.heappush(open_set, (0, start_tuple))
    
    # Track where each node came from
    came_from = {}
    
    # Cost from start to current node
    g_score = {start_tuple: 0}
    
    # Estimated total cost from start to goal through current node
    f_score = {start_tuple: heuristic(start, end)}
    
    # Possible movement directions: right, down, left, up
    directions = [(1, 0), (0, -1), (-1, 0), (0, 1)]
    
    while open_set:
        _, current = heapq.heappop(open_set)
        
        if current == end_tuple:
            # Reconstruct path
            path = [current]
            while current in came_from:
                current = came_from[current]
                path.append(current)
            path.reverse()
            return [list(pos) for pos in path]  # Convert tuples to lists
        
        for dx, dy in directions:
            nx, ny = current[0] + dx, current[1] + dy
            
            # Check if neighbor is within grid bounds
            if 0 <= nx < width and 0 <= ny < height:
                neighbor = (nx, ny)
                
                # Calculate tentative g_score (include traffic cost)
                # Higher grid values = more traffic = higher cost
                tentative_g_score = g_score[current] + (1 + grid[ny][nx])
                
                if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
                    # This path to neighbor is better than any previous one
                    came_from[neighbor] = current
                    g_score[neighbor] = tentative_g_score
                    f_score[neighbor] = tentative_g_score + heuristic(neighbor, end)
                    heapq.heappush(open_set, (f_score[neighbor], neighbor))
    
    # No path found
    return None

class GridCityMultiAgent(AECEnv):
    metadata = {"render_modes": ["human"], "name": "grid_city_multi_agent_v0"}
    
    def __init__(self, width=10, height=10, num_agents=2, render_mode=None):
        """
        Initialize Grid City multi-agent environment.
        
        Args:
            width: Width of the grid
            height: Height of the grid
            num_agents: Number of agents in the environment
            render_mode: The render mode to use
        """
        self.width = width
        self.height = height
        self.render_mode = render_mode
        
        # Setup agents
        self.possible_agents = [f"agent_{i}" for i in range(num_agents)]
        self.agents = self.possible_agents.copy()
        
        # Initial positions and destinations
        self.bot_positions = []
        self.destinations = []
        
        # Traffic grid
        self.traffic_status = np.zeros((height, width))
        
        # Other non-controlled agents in the environment
        self.other_agents_positions = []
        
        # Agent paths from A*
        self.paths = {}
        
        # Define action and observation spaces
        self.action_spaces = {agent: spaces.Discrete(5) for agent in self.possible_agents}  # Up, Down, Left, Right, Stay
        
        # Observation: [x, y, dest_x, dest_y, collision, traffic]
        self.observation_spaces = {
            agent: spaces.Box(
                low=0, 
                high=max(width, height), 
                shape=(6,), 
                dtype=np.uint8
            ) for agent in self.possible_agents
        }
        
        # Initialize required variables
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = None
        
        # Initialize game state variables
        self.terminations = {}
        self.truncations = {}
        self.rewards = {}
        self._cumulative_rewards = {}
        self.infos = {}
        self.observations = {}
        
        # Step count for potential truncation
        self.step_count = 0
        self.max_steps = width * height * 4  # Reasonable upper bound on steps
    
    def reset(self, seed=None, options=None):
        """
        Reset the environment.
        
        Args:
            seed: Random seed
            options: Additional options
            
        Returns:
            Observations for each agent
        """
        if seed is not None:
            np.random.seed(seed)
        
        # Reset agents
        self.agents = self.possible_agents.copy()
        
        # Reset position of agents (all start at different random positions)
        positions = set()
        self.bot_positions = []
        
        for _ in range(len(self.agents)):
            while True:
                x, y = np.random.randint(0, self.width), np.random.randint(0, self.height)
                if (x, y) not in positions:
                    positions.add((x, y))
                    self.bot_positions.append([x, y])
                    break
        
        # Set destinations (different for each agent)
        self.destinations = []
        for _ in range(len(self.agents)):
            while True:
                x, y = np.random.randint(0, self.width), np.random.randint(0, self.height)
                if (x, y) not in positions and [x, y] not in self.destinations:
                    self.destinations.append([x, y])
                    break
        
        # Generate random traffic conditions (0=clear, 1=moderate, 2=congested)
        self.traffic_status = np.random.randint(0, 3, size=(self.height, self.width))
        
        # Place other agents randomly (obstacles)
        self.other_agents_positions = []
        for _ in range(min(5, self.width * self.height // 10)):  # Limit number of other agents
            while True:
                x, y = np.random.randint(0, self.width), np.random.randint(0, self.height)
                pos = [x, y]
                if pos not in self.bot_positions and pos not in self.destinations and pos not in self.other_agents_positions:
                    self.other_agents_positions.append(pos)
                    break
        
        # Reset agent selector
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        
        # Reset state variables
        self.terminations = {agent: False for agent in self.possible_agents}
        self.truncations = {agent: False for agent in self.possible_agents}
        self.rewards = {agent: 0 for agent in self.possible_agents}
        self._cumulative_rewards = {agent: 0 for agent in self.possible_agents}
        self.infos = {agent: {} for agent in self.possible_agents}
        
        # Calculate A* paths for each agent
        self.paths = {}
        for i, agent in enumerate(self.agents):
            path = a_star(self.traffic_status, self.bot_positions[i], self.destinations[i])
            if path:
                self.paths[agent] = path
            else:
                # If no path is found, create a simple direct path
                self.paths[agent] = [self.bot_positions[i], self.destinations[i]]
        
        # Get initial observations
        self.observations = self._get_observations()
        
        # Reset step counter
        self.step_count = 0
        
        # If using human rendering, render the initial state
        if self.render_mode == "human":
            self.render()
        
        return self.observations
    
    def _get_observations(self):
        """Get observations for all agents."""
        observations = {}
        
        for i, agent in enumerate(self.possible_agents):
            if i < len(self.bot_positions):  # Make sure index is valid
                agent_pos = self.bot_positions[i]
                
                # Check if position is valid
                if 0 <= agent_pos[0] < self.width and 0 <= agent_pos[1] < self.height:
                    # Check for collisions with other agents or obstacles
                    has_collision = False
                    
                    # Check collision with other controlled agents
                    for j, other_pos in enumerate(self.bot_positions):
                        if i != j and np.array_equal(agent_pos, other_pos):
                            has_collision = True
                            break
                    
                    # Check collision with non-controlled agents
                    if not has_collision:
                        for other_pos in self.other_agents_positions:
                            if np.array_equal(agent_pos, other_pos):
                                has_collision = True
                                break
                    
                    # Get traffic at current position
                    traffic = int(self.traffic_status[agent_pos[1]][agent_pos[0]])
                    
                    observation = np.array([
                        agent_pos[0],                   # x position
                        agent_pos[1],                   # y position
                        self.destinations[i][0],        # destination x
                        self.destinations[i][1],        # destination y
                        int(has_collision),             # collision status
                        traffic                         # traffic level
                    ], dtype=np.uint8)
                    
                    observations[agent] = observation
                else:
                    # Default observation if agent is out of bounds
                    observations[agent] = np.zeros(6, dtype=np.uint8)
        
        return observations
    
    def observe(self, agent):
        """Return observation for the specified agent."""
        if agent in self.observations:
            return self.observations[agent]
        else:
            # If agent not found, return zeros
            return np.zeros(6, dtype=np.uint8)
    
    def step(self, action):
        """
        Step the environment forward.
        
        Args:
            action: Action to take
            
        Returns:
            tuple of observations, rewards, terminations, truncations, infos
        """
        if self.agent_selection not in self.agents:
            return self.observations, self.rewards, self.terminations, self.truncations, self.infos
        
        if self.terminations[self.agent_selection] or self.truncations[self.agent_selection]:
            # Agent already done, select next agent
            self.agent_selection = self._agent_selector.next()
            return self.observations, self.rewards, self.terminations, self.truncations, self.infos
        
        # Increment step counter
        self.step_count += 1
        
        # Get agent index
        agent_idx = self.possible_agents.index(self.agent_selection)
        
        # Store original position for reward calculation
        original_position = self.bot_positions[agent_idx].copy()
        
        # Process the action
        # 0: Up, 1: Down, 2: Left, 3: Right, 4: Stay
        if action == 0:  # Up
            self.bot_positions[agent_idx][1] = max(0, self.bot_positions[agent_idx][1] - 1)
        elif action == 1:  # Down
            self.bot_positions[agent_idx][1] = min(self.height - 1, self.bot_positions[agent_idx][1] + 1)
        elif action == 2:  # Left
            self.bot_positions[agent_idx][0] = max(0, self.bot_positions[agent_idx][0] - 1)
        elif action == 3:  # Right
            self.bot_positions[agent_idx][0] = min(self.width - 1, self.bot_positions[agent_idx][0] + 1)
        # action 4 is Stay - no position change
        
        # Calculate reward
        reward = 0
        
        # Check if agent moved
        if not np.array_equal(original_position, self.bot_positions[agent_idx]):
            # Small penalty for movement to encourage efficiency
            reward -= 0.1
            
            # Additional penalty for moving into congested areas
            x, y = self.bot_positions[agent_idx]
            traffic_level = self.traffic_status[y][x]
            reward -= 0.2 * traffic_level  # Penalty increases with traffic level
        
        # Check for collisions with other agents (both controlled and non-controlled)
        has_collision = False
        
        # Check collision with other controlled agents
        for j, other_pos in enumerate(self.bot_positions):
            if agent_idx != j and np.array_equal(self.bot_positions[agent_idx], other_pos):
                has_collision = True
                break
        
        # Check collision with non-controlled agents
        if not has_collision:
            for other_pos in self.other_agents_positions:
                if np.array_equal(self.bot_positions[agent_idx], other_pos):
                    has_collision = True
                    break
        
        if has_collision:
            reward -= 5  # Large penalty for collision
            
            # Move back to original position on collision
            self.bot_positions[agent_idx] = original_position.copy()
        
        # Check if destination reached
        if np.array_equal(self.bot_positions[agent_idx], self.destinations[agent_idx]):
            reward += 20  # Large reward for reaching destination
            self.terminations[self.agent_selection] = True
        
        # Reward for getting closer to destination (or penalty for getting further)
        old_distance = heuristic(original_position, self.destinations[agent_idx])
        new_distance = heuristic(self.bot_positions[agent_idx], self.destinations[agent_idx])
        reward += (old_distance - new_distance) * 0.5
        
        # Check if max steps reached
        if self.step_count >= self.max_steps:
            self.truncations[self.agent_selection] = True
        
        # Update A* path if needed
        if self.agent_selection in self.paths and len(self.paths[self.agent_selection]) > 1:
            # If we're on the first position in our path, remove it
            if np.array_equal(self.bot_positions[agent_idx], self.paths[self.agent_selection][0]):
                self.paths[self.agent_selection].pop(0)
            # If we've deviated from the path, recalculate
            elif not np.array_equal(self.bot_positions[agent_idx], self.paths[self.agent_selection][0]):
                new_path = a_star(self.traffic_status, self.bot_positions[agent_idx], self.destinations[agent_idx])
                if new_path:
                    self.paths[self.agent_selection] = new_path
        
        # Update rewards
        self.rewards[self.agent_selection] = reward
        self._cumulative_rewards[self.agent_selection] += reward
        
        # Update observations
        self.observations = self._get_observations()
        
        # Update agent selection
        if self.terminations[self.agent_selection]:
            # Remove terminated agent from active agents
            self.agents.remove(self.agent_selection)
            if not self.agents:
                # All agents have terminated
                pass
            else:
                # Reset agent selector with remaining agents
                self._agent_selector = agent_selector(self.agents)
        
        # Move to next agent
        if self.agents:
            self.agent_selection = self._agent_selector.next()
        
        # If using human rendering, render the current state
        if self.render_mode == "human":
            self.render()
        
        return self.observations, self.rewards, self.terminations, self.truncations, self.infos
    
    def render(self):
        """Render the environment using ASCII art (for console output)."""
        if self.render_mode != "human":
            return
        
        # Create grid with agent positions and traffic
        grid = [['.' for _ in range(self.width)] for _ in range(self.height)]
        
        # Mark traffic
        for y in range(self.height):
            for x in range(self.width):
                if self.traffic_status[y][x] == 1:
                    grid[y][x] = ','  # Light traffic
                elif self.traffic_status[y][x] == 2:
                    grid[y][x] = ':'  # Heavy traffic
        
        # Mark non-controlled agents
        for pos in self.other_agents_positions:
            x, y = pos
            if 0 <= x < self.width and 0 <= y < self.height:
                grid[y][x] = 'X'
        
        # Mark destinations
        for i, dest in enumerate(self.destinations):
            x, y = dest
            if 0 <= x < self.width and 0 <= y < self.height:
                grid[y][x] = chr(48 + i)  # Use numbers 0, 1, 2, etc.
        
        # Mark agent positions
        for i, pos in enumerate(self.bot_positions):
            x, y = pos
            if 0 <= x < self.width and 0 <= y < self.height:
                grid[y][x] = chr(65 + i)  # Use letters A, B, C, etc.
        
        # Print the grid
        print("\n" + "-" * (self.width + 2))
        for row in grid:
            print("|" + "".join(row) + "|")
        print("-" * (self.width + 2))
        
        # Print agent info
        for i, agent in enumerate(self.possible_agents):
            if i < len(self.bot_positions):
                terminated = self.terminations.get(agent, False)
                truncated = self.truncations.get(agent, False)
                reward = self._cumulative_rewards.get(agent, 0)
                status = "terminated" if terminated else "truncated" if truncated else "active"
                print(f"Agent {agent}: pos={self.bot_positions[i]}, dest={self.destinations[i]}, reward={reward:.1f}, status={status}")
        
        print("\nLegend:")
        print("A,B,... = Agents")
        print("0,1,... = Destinations")
        print("X = Obstacles")
        print(". = Clear road")
        print(", = Light traffic")
        print(": = Heavy traffic")
        print("\n")
    
    def close(self):
        """Close the environment."""
        pass

# Example usage
if __name__ == "__main__":
    env = GridCityMultiAgent(width=8, height=6, num_agents=3, render_mode="human")
    observations = env.reset()
    
    done = False
    total_rewards = {agent: 0 for agent in env.possible_agents}
    
    while env.agents:
        agent = env.agent_selection
        observation = env.observe(agent)
        
        if env.terminations[agent] or env.truncations[agent]:
            action = None
        else:
            # You can use your own policy here
            # For now, use a simple heuristic:
            agent_idx = env.possible_agents.index(agent)
            agent_pos = env.bot_positions[agent_idx]
            dest = env.destinations[agent_idx]
            
            # Simple greedy movement toward destination
            dx, dy = dest[0] - agent_pos[0], dest[1] - agent_pos[1]
            
            if abs(dx) > abs(dy):
                # Move horizontally
                action = 3 if dx > 0 else 2  # Right or Left
            else:
                # Move vertically
                action = 1 if dy > 0 else 0  # Down or Up
            
            # Add randomness occasionally
            if np.random.random() < 0.1:
                action = np.random.randint(0, 5)
        
        # Take a step
        observations, rewards, terminations, truncations, infos = env.step(action)
        
        # Update total rewards
        for a, r in rewards.items():
            total_rewards[a] += r
    
    print("\nSimulation complete!")
    for agent, reward in total_rewards.items():
        print(f"{agent} total reward: {reward:.2f}")
    
    env.close()



----------
|,,.,.,XC|
|0A:,.:::|
|.,.,::,:|
|XX:...B:|
|,:1.,,.2|
|X:,,,.,.|
----------
Agent agent_0: pos=[1, 1], dest=[0, 1], reward=0.0, status=active
Agent agent_1: pos=[6, 3], dest=[2, 4], reward=0.0, status=active
Agent agent_2: pos=[7, 0], dest=[7, 4], reward=0.0, status=active

Legend:
A,B,... = Agents
0,1,... = Destinations
X = Obstacles
. = Clear road
, = Light traffic
: = Heavy traffic



----------
|,A.,.,XC|
|0,:,.:::|
|.,.,::,:|
|XX:...B:|
|,:1.,,.2|
|X:,,,.,.|
----------
Agent agent_0: pos=[1, 0], dest=[0, 1], reward=-0.8, status=active
Agent agent_1: pos=[6, 3], dest=[2, 4], reward=0.0, status=active
Agent agent_2: pos=[7, 0], dest=[7, 4], reward=0.0, status=active

Legend:
A,B,... = Agents
0,1,... = Destinations
X = Obstacles
. = Clear road
, = Light traffic
: = Heavy traffic



----------
|,A.,.,XC|
|0,:,.:::|
|.,.,::,:|
|XX:..B.:|
|,:1.,,.2|
|X:,,,.,.|
----------
Agent agent_0: pos=[1, 0], dest=[0, 1], reward=-0.8, status=active
Agent agent_1: pos=[5, 3], dest=[2, 4