In [1]:
import heapq
from enum import Enum
import time
from types import SimpleNamespace

class Action(Enum):
    WAIT = '0'
    PICKUP = '1'
    DROP = '2'

# Testing scripts

In [2]:
map_base_path = "/kaggle/input/multiagents-map/"

all_args = [
    SimpleNamespace(seed=10, max_time_steps=1000, map=map_base_path + "map1.txt", num_agents=5, n_packages=100),
    SimpleNamespace(seed=10, max_time_steps=1000, map=map_base_path + "map2.txt", num_agents=5, n_packages=100),
    SimpleNamespace(seed=10, max_time_steps=1000, map=map_base_path + "map3.txt", num_agents=5, n_packages=500),
    SimpleNamespace(seed=10, max_time_steps=1000, map=map_base_path + "map4.txt", num_agents=10, n_packages=500),
    SimpleNamespace(seed=10, max_time_steps=1000, map=map_base_path + "map5.txt", num_agents=10, n_packages=1000),
]

# Environment

In [3]:
import numpy as np
from colorama import Fore, Style, init

class Robot: 
    def __init__(self, position): 
        self.position = position
        self.carrying = 0

class Package: 
    def __init__(self, start, start_time, target, deadline, package_id): 
        self.start = start
        self.start_time = start_time
        self.target = target
        self.deadline = deadline
        self.package_id = package_id
        self.status = 'None' # Possible statuses: 'waiting', 'in_transit', 'delivered'

class Environment: 

    def __init__(self, map_file, max_time_steps = 100, n_robots = 5, n_packages=20,
             move_cost=-0.01, delivery_reward=10., delay_reward=1., 
             seed=2025): 
        """ Initializes the simulation environment. :param map_file: Path to the map text file. :param move_cost: Cost incurred when a robot moves (LRUD). :param delivery_reward: Reward for delivering a package on time. """ 
        self.map_file = map_file
        self.grid = self.load_map()
        self.n_rows = len(self.grid)
        self.n_cols = len(self.grid[0]) if self.grid else 0 
        self.move_cost = move_cost 
        self.delivery_reward = delivery_reward 
        self.delay_reward = delay_reward
        self.t = 0 
        self.robots = [] # List of Robot objects.
        self.packages = [] # List of Package objects.
        self.total_reward = 0

        self.n_robots = n_robots
        self.max_time_steps = max_time_steps
        self.n_packages = n_packages

        self.rng = np.random.RandomState(seed)
        self.reset()
        self.done = False
        self.state = None

    def get_color(self, index):
        """
        Returns a random color from colorama.Fore.
        """
        colors = [
            Fore.BLACK, Fore.RED, Fore.GREEN, Fore.YELLOW, Fore.MAGENTA, 
            Fore.CYAN, Fore.LIGHTBLACK_EX, Fore.LIGHTRED_EX, 
            Fore.LIGHTGREEN_EX, Fore.LIGHTYELLOW_EX, Fore.LIGHTBLUE_EX, 
            Fore.LIGHTMAGENTA_EX, Fore.LIGHTCYAN_EX
        ]
        return colors[index % len(colors)]
          
    def render_nice(self):
        """
        Render the map with colorama for better visual appeal.
        Obstacles are represented by 1 (red), free cells by 0 (white),
        robots by 'R' (green), and packages by 'P' (blue).
        """
        # Make a deep copy of the grid
        grid_copy = [row[:] for row in self.grid]
        
        for i, robot in enumerate(self.robots):
            r, c = robot.position
            if robot.carrying != 0:
                color = self.get_color(robot.carrying)
                grid_copy[r][c] = f'{Fore.BLUE}R{i}{color}P{robot.carrying}{Style.RESET_ALL}'
            else:
                grid_copy[r][c] = f'{Fore.BLUE}R{i}{Style.RESET_ALL}'
        
        for i, package in enumerate(self.packages):
            start_r, start_c = package.start
            target_r, target_c = package.target
            id = package.package_id
            color = self.get_color(id)
            if package.status == 'waiting':
                grid_copy[start_r][start_c] = f'{color}P{id}{Style.RESET_ALL}'
                grid_copy[target_r][target_c] = f'{color}T{id}{Style.RESET_ALL}'
            elif package.status == 'delivered':
                grid_copy[target_r][target_c] = f'{color}T{id}P{id}{Style.RESET_ALL}'
            else:
                grid_copy[target_r][target_c] = f'{color}T{id}{Style.RESET_ALL}'
        
        
        # Print column indices
        col_indices = '\t'.join([f'{Fore.YELLOW}{i}{Style.RESET_ALL}' for i in range(len(grid_copy[0]))])
        print(f'\t{col_indices}')
        
        for idx, row in enumerate(grid_copy):
            row_str = []
            for cell in row:
                if isinstance(cell, str):
                    row_str.append(f'{cell}')
                elif cell == 1:
                    row_str.append(f'{Fore.WHITE}{cell}{Style.RESET_ALL}')
                else:
                    row_str.append(f'{Fore.LIGHTBLACK_EX}{cell}{Style.RESET_ALL}')
            # Print row index followed by the row content
            print(f'{Fore.YELLOW}{idx}{Style.RESET_ALL}\t' + '\t'.join(row_str))
    def load_map(self):
        """
        Reads the map file and returns a 2D grid.
        Assumes that each line in the file contains numbers separated by space.
        0 indicates free cell and 1 indicates an obstacle.
        """
        grid = []
        with open(self.map_file, 'r') as f:
            for line in f:
                # Strip line breaks and split into numbers
                row = [int(x) for x in line.strip().split(' ')]
                grid.append(row)
        return grid
    
    def is_free_cell(self, position):
        """
        Checks if the cell at the given position is free (0) or occupied (1).
        :param position: Tuple (row, column) to check.
        :return: True if the cell is free, False otherwise.
        """
        r, c = position
        if r < 0 or r >= self.n_rows or c < 0 or c >= self.n_cols:
            return False
        return self.grid[r][c] == 0

    def add_robot(self, position):
        """
        Adds a robot at the given position if the cell is free.
        :param position: Tuple (row, column) for the robot's starting location.
        """
        if self.is_free_cell(position):
            robot = Robot(position)
            self.robots.append(robot)
        else:
            raise ValueError("Invalid robot position: must be on a free cell not occupied by an obstacle or another robot.")

    def reset(self):
        """
        Resets the environment to its initial state.
        Clears all robots and packages, and reinitializes the grid.
        """
        self.t = 0
        self.robots = []
        self.packages = []
        self.total_reward = 0
        self.done = False
        self.state = None

        # Reinitialize the grid
        #self.grid = self.load_map(sel)
        # Add robots and packages
        tmp_grid = np.array(self.grid)
        for i in range(self.n_robots):
            # Randomly select a free cell for the robot
            position, tmp_grid = self.get_random_free_cell(tmp_grid)
            self.add_robot(position)
        
        N = self.n_rows
        list_packages = []
        for i in range(self.n_packages):
            # Randomly select a free cell for the package
            start = self.get_random_free_cell_p()
            while True:
                target = self.get_random_free_cell_p()
                if start != target:
                    break
            
            to_deadline = 10 + self.rng.randint(N/2, 3*N)
            if i <= min(self.n_robots, 20):
                start_time = 0
            else:
                start_time = self.rng.randint(1, self.max_time_steps)
            list_packages.append((start_time, start, target, start_time + to_deadline ))

        list_packages.sort(key=lambda x: x[0])
        for i in range(self.n_packages):
            start_time, start, target, deadline = list_packages[i]
            package_id = i+1
            self.packages.append(Package(start, start_time, target, deadline, package_id))

        return self.get_state()
    
    def get_state(self):
        """
        Returns the current state of the environment.
        The state includes the positions of robots and packages.
        :return: State representation.
        """
        selected_packages = []
        for i in range(len(self.packages)):
            if self.packages[i].start_time == self.t:
                selected_packages.append(self.packages[i])
                self.packages[i].status = 'waiting'

        state = {
            'time_step': self.t,
            'map': self.grid,
            'robots': [(robot.position[0] + 1, robot.position[1] + 1,
                        robot.carrying) for robot in self.robots],
            'packages': [(package.package_id, package.start[0] + 1, package.start[1] + 1, 
                          package.target[0] + 1, package.target[1] + 1, package.start_time, package.deadline) for package in selected_packages]
        }
        return state
        

    def get_random_free_cell_p(self):
        """
        Returns a random free cell in the grid.
        :return: Tuple (row, col) of a free cell.
        """
        free_cells = [(i, j) for i in range(self.n_rows) for j in range(self.n_cols) \
                      if self.grid[i][j] == 0]
        i = self.rng.randint(0, len(free_cells))
        return free_cells[i]


    def get_random_free_cell(self, new_grid):
        """
        Returns a random free cell in the grid.
        :return: Tuple (row, col) of a free cell.
        """
        free_cells = [(i, j) for i in range(self.n_rows) for j in range(self.n_cols) \
                      if new_grid[i][j] == 0]
        i = self.rng.randint(0, len(free_cells))
        new_grid[free_cells[i][0]][free_cells[i][1]] = 1
        return free_cells[i], new_grid

    
    def step(self, actions):
        """
        Advances the simulation by one timestep.
        :param actions: A list where each element is a tuple (move_action, package_action) for a robot.
            move_action: one of 'S', 'L', 'R', 'U', 'D'.
            package_action: '1' (pickup), '2' (drop), or '0' (do nothing).
        :return: The updated state and total accumulated reward.
        """
        r = 0
        if len(actions) != len(self.robots):
            raise ValueError("The number of actions must match the number of robots.")

        #print("Package env: ")
        #print([p.status for p in self.packages])

        # -------- Process Movement --------
        proposed_positions = []
        # For each robot, compute the new position based on the movement action.
        old_pos = {}
        next_pos = {}
        for i, robot in enumerate(self.robots):
            move, pkg_act = actions[i]
            new_pos = self.compute_new_position(robot.position, move)
            # Check if the new position is valid (inside bounds and not an obstacle).
            if not self.valid_position(new_pos):
                new_pos = robot.position  # Invalid moves result in no change.
            proposed_positions.append(new_pos)
            old_pos[robot.position] = i
            next_pos[new_pos] = i

        moved_robots = [0 for _ in range(len(self.robots))]
        computed_moved = [0 for _ in range(len(self.robots))]
        final_positions = [None] * len(self.robots)
        occupied = {}  # Dictionary to record occupied cells.
        while True:
            updated = False
            for i in range(len(self.robots)):
            
                if computed_moved[i] != 0: 
                    continue

                pos = self.robots[i].position
                new_pos = proposed_positions[i]
                can_move = False
                if new_pos not in old_pos:
                    can_move = True
                else:
                    j = old_pos[new_pos]
                    if (j != i) and (computed_moved[j] == 0): # We must wait for the conflict resolve
                        continue
                    # We can decide where the robot can go now
                    can_move = True

                if can_move:
                    # print("Updated: ", i, new_pos)
                    if new_pos not in occupied:
                        occupied[new_pos] = i
                        final_positions[i] = new_pos
                        computed_moved[i] = 1
                        moved_robots[i] = 1
                        updated = True
                    else:
                        new_pos = pos
                        occupied[new_pos] = i
                        final_positions[i] = pos
                        computed_moved[i] = 1
                        moved_robots[i] = 0
                        updated = True

                if updated:
                    break

            if not updated:
                break
        #print("Computed postions: ", final_positions)
        for i in range(len(self.robots)):
            if computed_moved[i] == 0:
                final_positions[i] = self.robots[i].position 
        
        # Update robot positions and apply movement cost when applicable.
        for i, robot in enumerate(self.robots):
            move, pkg_act = actions[i]
            if move in ['L', 'R', 'U', 'D'] and final_positions[i] != robot.position:
                r += self.move_cost
            robot.position = final_positions[i]

        # -------- Process Package Actions --------
        for i, robot in enumerate(self.robots):
            move, pkg_act = actions[i]
            #print(i, move, pkg_act)
            # Pick up action.
            if pkg_act == '1':
                if robot.carrying == 0:
                    # Check for available packages at the current cell.
                    for j in range(len(self.packages)):
                        if self.packages[j].status == 'waiting' and self.packages[j].start == robot.position and self.packages[j].start_time <= self.t:
                            # Pick the package with the smallest package_id.
                            package_id = self.packages[j].package_id
                            robot.carrying = package_id
                            self.packages[j].status = 'in_transit'
                            # print(package_id, 'in transit')
                            break

            # Drop action.
            elif pkg_act == '2':
                if robot.carrying != 0:
                    package_id = robot.carrying
                    target = self.packages[package_id - 1].target
                    # Check if the robot is at the target position.
                    if robot.position == target:
                        # Update package status to delivered.
                        pkg = self.packages[package_id - 1]
                        pkg.status = 'delivered'
                        # Apply reward based on whether the delivery is on time.
                        if self.t <= pkg.deadline:
                            r += self.delivery_reward
                        else:
                            # Example: a reduced reward for late delivery.
                            r += self.delay_reward
                        robot.carrying = 0  
        
        # Increment the simulation timestep.
        self.t += 1

        self.total_reward += r

        done = False
        infos = {}
        if self.check_terminate():
            done = True
            infos['total_reward'] = self.total_reward
            infos['total_time_steps'] = self.t

        return self.get_state(), r, done, infos
    
    def check_terminate(self):
        if self.t == self.max_time_steps:
            return True
        
        for p in self.packages:
            if p.status != 'delivered':
                return False
            
        return True

    def compute_new_position(self, position, move):
        """
        Computes the intended new position for a robot given its current position and move command.
        """
        r, c = position
        if move == 'S':
            return (r, c)
        elif move == 'L':
            return (r, c - 1)
        elif move == 'R':
            return (r, c + 1)
        elif move == 'U':
            return (r - 1, c)
        elif move == 'D':
            return (r + 1, c)
        else:
            return (r, c)

    def valid_position(self, pos):
        """
        Checks if the new position is within the grid and not an obstacle.
        """
        r, c = pos
        if r < 0 or r >= self.n_rows or c < 0 or c >= self.n_cols:
            return False
        if self.grid[r][c] == 1:
            return False
        return True

    def render(self):
        """
        A simple text-based rendering of the map showing obstacles and robot positions.
        Obstacles are represented by 1, free cells by 0, and robots by 'R'.
        """
        # Make a deep copy of the grid
        grid_copy = [row[:] for row in self.grid]
        for i, robot in enumerate(self.robots):
            r, c = robot.position
            grid_copy[r][c] = 'R%i'%i
        for row in grid_copy:
            print('\t'.join(str(cell) for cell in row))

# Agents
## CBSAgents for map 1, 2, 3

In [4]:
class CBSAgents:
    def __init__(self, window_size=15):
        self.map = None
        self.n_rows = 0
        self.n_cols = 0
        self.window_size = window_size
        self.robots = []
        self.robots_target = []
        self.packages = []
        self.packages_free = []
        self.n_robots = 0
        self.is_init = False
        self.paths = {}  # agent_id -> list of positions by time step

    def init_agents(self, state):
        self.map = state['map']
        self.n_rows = len(self.map)
        self.n_cols = len(self.map[0])

        self.robots = [(robot[0] - 1, robot[1] - 1, 0) for robot in state['robots']]
        self.n_robots = len(self.robots)
        self.robots_target = ['free'] * self.n_robots

        self.packages = [(p[0], p[1] - 1, p[2] - 1, p[3] - 1, p[4] - 1, p[5]) for p in state['packages']]
        self.packages_free = [True] * len(self.packages)

        self.paths = {}

    def update_inner_state(self, state):
        for i in range(len(state['robots'])):
            prev = self.robots[i]
            robot = state['robots'][i]
            self.robots[i] = (robot[0] - 1, robot[1] - 1, robot[2])
            if prev[2] != 0 and self.robots[i][2] == 0:
                delivered_id = prev[2] - 1
                if 0 <= delivered_id < len(self.packages):
                    pkg = self.packages[delivered_id]
                    if pkg is not None and (self.robots[i][0], self.robots[i][1]) == (pkg[3], pkg[4]):
                        self.packages[delivered_id] = None
                        self.packages_free[delivered_id] = False
                self.robots_target[i] = 'free'
            elif self.robots[i][2] != 0:
                self.robots_target[i] = self.robots[i][2]
                
        for pkg_data in state['packages']:
            pkg_id, sr, sc, tr, tc, start_time, deadline = pkg_data
            current_pkgs = [p[0] for p in self.packages if p is not None]
            if pkg_id not in current_pkgs:
                self.packages.append((pkg_id, sr - 1, sc - 1, tr - 1, tc - 1, start_time))
                self.packages_free.append(True)

    def in_bounds(self, x, y):
        return 0 <= x < self.n_rows and 0 <= y < self.n_cols

    def passable(self, x, y):
        return self.map[x][y] == 0

    def neighbors(self, x, y):
        for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
            nx, ny = x + dx, y + dy
            if self.in_bounds(nx, ny) and self.passable(nx, ny):
                yield (nx, ny)

    def a_star(self, start, goal, constraints):
        frontier = [(0 + self.heuristic(start, goal), 0, start, [])]
        visited = set()

        while frontier:
            f, g, node, path = heapq.heappop(frontier)
            if (node, g) in visited:
                continue
            visited.add((node, g))

            if node == goal:
                return path + [node]

            for neighbor in self.neighbors(*node):
                if neighbor in constraints.get(g + 1, set()):
                    continue
                heapq.heappush(frontier, (g + 1 + self.heuristic(neighbor, goal), g + 1, neighbor, path + [node]))

        return []

    def detect_conflict(self, paths):
        max_len = max(len(p) for p in paths.values())
        for t in range(max_len):
            # Check for vertex conflicts
            positions = {}
            for agent, path in paths.items():
                if t < len(path):
                    pos = path[t]
                else:
                    continue
                if pos in positions:
                    other = positions[pos]
                    return ("vertex", agent, other, t, pos)
                positions[pos] = agent

            # Check for edge conflicts
            for a1 in paths:
                for a2 in paths:
                    if a1 >= a2:
                        continue
                    p1 = paths[a1]
                    p2 = paths[a2]
                    if t + 1 >= min(len(p1), len(p2)):
                        continue
                    if p1[t] == p2[t + 1] and p1[t + 1] == p2[t]:
                        return ("edge", a1, a2, t + 1, p1[t + 1])
            
            # Check for move into static agent        
            for a1 in paths:
                for a2 in paths:
                    if a1 == a2:
                        continue
                    p1 = paths[a1]
                    p2 = paths[a2]
                    # a2 is idle
                    if len(p2) == 1:
                        if self.robots_target[a2] == 'free':
                            # Check if the moving agent is going to the idle agent's position
                            if t < len(p1) and p1[t] == p2[0]:
                                return ("idle_conflict", a1, a2, t, p2[0])
        return None

    def cbs(self, starts, goals):
        root = {'paths': {}, 'constraints': []}
        for i, (start, goal) in enumerate(zip(starts, goals)):
            path = self.a_star(start, goal, self.build_constraints([], i))
            if not path:
                return {}
            root['paths'][i] = path
        open_list = [root]

        while open_list:
            node = open_list.pop(0)
            conflict = self.detect_conflict(node['paths'])
            if not conflict:
                return node['paths']

            conflict_type, a1, a2, t, pos = conflict
            if conflict_type == "idle_conflict":
                # Handle idle conflict by forcing the idle agent to move
                idle_agent = a2
                moving_agent = a1
                idle_pos = node['paths'][idle_agent][0]

                new_constraints = node['constraints'] + [{
                    'agent': idle_agent,
                    'pos': idle_pos,
                    'time': t
                }]
                new_paths = dict(node['paths'])
                # Find a new path for the idle agent to move away
                neighbors = list(self.neighbors(*idle_pos))
                for neighbor in neighbors:
                    if neighbor != pos:  # Avoid moving to the conflicting position
                        new_path = self.a_star(idle_pos, neighbor, self.build_constraints(new_constraints, idle_agent))
                        if new_path:
                            new_paths[idle_agent] = new_path
                            open_list.append({'paths': new_paths, 'constraints': new_constraints})
                            break
            else:
                for agent in [a1, a2]:
                    new_constraints = node['constraints'] + [{
                        'agent': agent,
                        'pos': pos,
                        'time': t
                    }]
                    new_paths = dict(node['paths'])
                    new_path = self.a_star(starts[agent], goals[agent], self.build_constraints(new_constraints, agent))
                    if not new_path:
                        continue
                    new_paths[agent] = new_path
                    open_list.append({'paths': new_paths, 'constraints': new_constraints})
        return {}
    
    def build_constraints(self, constraints, agent_id):
        result = {}
        for c in constraints:
            if c['agent'] != agent_id:
                continue
            t = c['time']
            if t not in result:
                result[t] = set()
            result[t].add(c['pos'])
        return result


    def heuristic(self, a, b):
        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    def get_actions(self, state):
        if not self.is_init:
            self.is_init = True
            self.update_inner_state(state)
        else:
            self.update_inner_state(state)

        starts = [(r[0], r[1]) for r in self.robots]
        goals = []

        for i in range(self.n_robots):
            # Check if the robot is already carrying a package
            if self.robots_target[i] != 'free':
                pkg_id = self.robots_target[i] - 1
                # Check if the package is still valid (not delivered)
                if 0 <= pkg_id < len(self.packages) and self.packages[pkg_id] is not None:
                    pkg = self.packages[pkg_id]
                # If the package is not valid, set the target to 'free'
                else:
                    self.robots_target[i] = 'free'
                    # Set the goal to the robot's current position
                    goals.append((self.robots[i][0], self.robots[i][1]))
                    continue
                # If the package is valid, set the goal to the drop-off location
                if self.robots[i][2] == 0:
                    goals.append((pkg[1], pkg[2]))
                else:
                    goals.append((pkg[3], pkg[4]))
            # If the robot is not carrying a package, find the closest package
            else:
                closest_package_id = None
                min_dist = 1e9
                for j, pkg in enumerate(self.packages):
                    # Nếu gói hàng đã được giao hoặc không còn hợp lệ thì bỏ qua
                    if pkg is None or not self.packages_free[j]:
                        continue
                    d = self.heuristic((self.robots[i][0], self.robots[i][1]), (pkg[1], pkg[2]))
                    if d < min_dist:
                        min_dist = d
                        closest_package_id = j
                # If a package is found, set the target to the pickup location
                if closest_package_id is not None:
                    # Đánh dấu gói hàng đã được assigned
                    self.packages_free[closest_package_id] = False
                    # Đặt target = id gói hàng
                    self.robots_target[i] = self.packages[closest_package_id][0]
                    # Set the goal to the pickup location
                    pkg = self.packages[closest_package_id]
                    goals.append((pkg[1], pkg[2]))
                # If no package is found, set the goal to the robot's current position
                else:
                    goals.append((self.robots[i][0], self.robots[i][1]))

        # Find paths for all robots
        paths = self.cbs(starts, goals)
        actions = []
        for i in range(self.n_robots):
            path = paths.get(i, [starts[i]])
            current = starts[i]
            next_pos = path[1] if len(path) > 1 else current
            move = self.get_direction(current, next_pos)

            action = Action.WAIT.value
            if self.robots_target[i] != 'free' and current == goals[i]:
                # Tại điểm target (có thể là điểm giao hàng hoặc điểm nhặt hàng)
                if self.robots[i][2] == 0:
                    # Kiểm tra gói hàng chuẩn bị nhặt lên có phải là gói hàng đã được giao không
                    pkg_id = self.robots_target[i] - 1
                    
                    # Đảm bảo robot chỉ nhặt đúng gói hàng được gán
                    if 0 <= pkg_id < len(self.packages) and self.packages[pkg_id] is not None:
                        pkg = self.packages[pkg_id]
                        if (self.robots[i][0], self.robots[i][1]) == (pkg[1], pkg[2]) and self.robots_target[i] == pkg[0]:
                            # Chưa mang hàng → nhặt hàng
                            action = Action.PICKUP.value
                        else:
                            # Không đúng vị trí hoặc không đúng gói hàng → chờ
                            action = Action.WAIT.value 
                    else:
                        # Gói hàng không hợp lệ hoặc đã được nhặt
                        self.robots_target[i] = 'free'
                        action = Action.WAIT.value
                else:
                    # Đang mang hàng → giao hàng
                    action = Action.DROP.value
                    
            # Reset target if the action is invalid
            if action == Action.PICKUP.value and (self.robots_target[i] == 'free' or pkg is None):
                self.robots_target[i] = 'free'
                action = Action.WAIT.value
            elif action == Action.DROP.value and self.robots[i][2] == 0:
                self.robots_target[i] = 'free'
                action = Action.WAIT.value

            actions.append((move, action))

        return actions

    def get_direction(self, from_pos, to_pos):
        dx, dy = to_pos[0] - from_pos[0], to_pos[1] - from_pos[1]
        return {(0, 1): 'R', (0, -1): 'L', (1, 0): 'D', (-1, 0): 'U'}.get((dx, dy), 'S')

## AStarAgents for map 4, 5

In [5]:
# Hàm A* để tìm đường đi ngắn nhất
def run_astar(map_grid, start, goal):
    n_rows = len(map_grid)
    n_cols = len(map_grid[0])
    
    # Các hướng di chuyển có thể (lên, xuống, trái, phải) và hành động tương ứng
    # Sẽ trả về hành động đầu tiên cần thực hiện để đi theo đường ngắn nhất
    # movements = {'U': (-1, 0), 'D': (1, 0), 'L': (0, -1), 'R': (0, 1)}
    # action_keys = ['U', 'D', 'L', 'R']
    
    # Định nghĩa các hướng di chuyển (dx, dy) và hành động tương ứng
    # Ưu tiên các hướng nhất định có thể ảnh hưởng đến kết quả nếu có nhiều đường đi ngắn nhất bằng nhau
    # Thứ tự này ưu tiên U, D, L, R. Quan trọng là nó phải nhất quán.
    possible_moves = [
        ('U', (-1, 0)),  # Lên
        ('D', (1, 0)),   # Xuống
        ('L', (0, -1)),  # Trái
        ('R', (0, 1))    # Phải
    ]

    # Hàm heuristic (Manhattan distance)
    def heuristic(a, b):
        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    # Hàng đợi ưu tiên cho A*
    # (f_score, g_score, (row, col), path_taken_actions)
    # g_score là chi phí từ điểm bắt đầu đến nút hiện tại
    # f_score là g_score + heuristic(current_node, goal)
    # path_taken_actions là danh sách các hành động đã thực hiện
    pq = [(heuristic(start, goal), 0, start, [])]  # f_score, g_score, position, path_actions
    
    # Lưu trữ chi phí g_score tốt nhất cho mỗi nút đã khám phá
    g_scores = {start: 0}
    
    # Theo dõi các nút đã được xử lý hoàn toàn (đã lấy ra khỏi hàng đợi ưu tiên và mở rộng)
    # hoặc các nút đang ở trong hàng đợi ưu tiên nhưng có thể tìm thấy đường đi tốt hơn sau này
    came_from = {start: None} # Để truy vết đường đi (không cần thiết nếu chỉ trả về hành động đầu tiên)

    while pq:
        f_score, current_g_score, current_pos, current_path_actions = heapq.heappop(pq)

        # Nếu chi phí hiện tại để đến current_pos lớn hơn một chi phí đã biết trước đó, bỏ qua
        if current_g_score > g_scores.get(current_pos, float('inf')):
            continue

        # Nếu đã đến đích
        if current_pos == goal:
            if not current_path_actions: # Nếu đích là điểm bắt đầu
                return 'S', 0 # Đứng yên, chi phí 0
            return current_path_actions[0], len(current_path_actions) # Trả về hành động đầu tiên và độ dài đường đi

        # Khám phá các ô lân cận
        for action_char, (dr, dc) in possible_moves:
            next_pos = (current_pos[0] + dr, current_pos[1] + dc)

            # Kiểm tra tính hợp lệ của ô tiếp theo
            if 0 <= next_pos[0] < n_rows and 0 <= next_pos[1] < n_cols and map_grid[next_pos[0]][next_pos[1]] == 0:
                new_g_score = current_g_score + 1 # Chi phí mỗi bước là 1
                
                # Nếu tìm thấy đường đi tốt hơn đến next_pos
                if new_g_score < g_scores.get(next_pos, float('inf')):
                    g_scores[next_pos] = new_g_score
                    new_f_score = new_g_score + heuristic(next_pos, goal)
                    new_path_actions = current_path_actions + [action_char]
                    heapq.heappush(pq, (new_f_score, new_g_score, next_pos, new_path_actions))
                    came_from[next_pos] = current_pos


    return 'S', float('inf') # Nếu không tìm thấy đường đi, đứng yên

class AStarAgents:
    def __init__(self):
        # Hàm khởi tạo, thiết lập các thuộc tính ban đầu cho đối tượng Agents
        self.n_robots = 0  # Số lượng robot
        self.map_grid = []  # Bản đồ môi trường (dạng lưới)
        self.robots_info = []  # Danh sách thông tin của các robot
        # Mỗi robot có thông tin: {'pos': (r, c), 'carrying_package_id': None hoặc id gói hàng, 
        # 'target_package_id': None hoặc id gói hàng, 'phase': 'idle'/'to_pickup'/'to_dropoff'}
        self.packages_info = {}  # Thông tin các gói hàng
        # Dạng: {package_id: {'start_pos': (r, c), 'target_pos': (r, c), 'deadline': t, 
        # 'status': 'waiting'/'assigned'/'in_transit'/'delivered', 'assigned_robot': None hoặc id robot, 'start_time': t}}
        self.current_time = 0  # Thời gian hiện tại
        self.is_initialized = False  # Trạng thái khởi tạo

    def init_agents(self, state):
        # Khởi tạo trạng thái ban đầu của các robot và gói hàng từ dữ liệu đầu vào
        self.current_time = state['time_step']  # Cập nhật thời gian hiện tại
        self.map_grid = state['map']  # Cập nhật bản đồ
        self.n_robots = len(state['robots'])  # Số lượng robot

        if not self.is_initialized:
            # Khởi tạo thông tin robot
            self.robots_info = [{'pos': (r - 1, c - 1),  # Chuyển đổi sang chỉ số 0-indexed
                                 'carrying_package_id': None,
                                 'target_package_id': None,
                                 'phase': 'idle'} for r, c, carrying in state['robots']]
            self.is_initialized = True

        # Cập nhật thông tin gói hàng mới xuất hiện tại thời điểm này
        for pkg_data in state['packages']:
            pkg_id, sr, sc, tr, tc, start_time, deadline = pkg_data
            if pkg_id not in self.packages_info:  # Chỉ thêm nếu chưa có
                self.packages_info[pkg_id] = {
                    'id': pkg_id,
                    'start_pos': (sr - 1, sc - 1),
                    'target_pos': (tr - 1, tc - 1),
                    'deadline': deadline,
                    'status': 'waiting',  # Trạng thái ban đầu
                    'assigned_robot': None,
                    'start_time': start_time
                }

    def _update_internal_state(self, state):
        # Cập nhật trạng thái nội bộ của các robot và gói hàng từ trạng thái môi trường
        self.current_time = state['time_step']  # Cập nhật thời gian hiện tại

        # Cập nhật vị trí và trạng thái của robot
        for i in range(self.n_robots):
            # Lấy thông tin robot từ trạng thái môi trường
            r, c, carrying_id = state['robots'][i]
            current_robot_pos = (r - 1, c - 1)

            # Nếu robot vừa nhặt hàng
            if self.robots_info[i]['carrying_package_id'] is None and carrying_id != 0:
                self.robots_info[i]['carrying_package_id'] = carrying_id
                # Chuyển trạng thái robot sang giai đoạn giao hàng
                self.robots_info[i]['phase'] = 'to_dropoff'
                # Chuyển trạng thái gói hàng sang đang vận chuyển
                if carrying_id in self.packages_info:
                    self.packages_info[carrying_id]['status'] = 'in_transit'

            # Nếu robot vừa thả hàng
            elif self.robots_info[i]['carrying_package_id'] is not None and carrying_id == 0:
                dropped_pkg_id = self.robots_info[i]['carrying_package_id']
                # Chuyển trạng thái gói hàng sang đã giao
                # Và bỏ gán robot cho gói hàng
                if dropped_pkg_id in self.packages_info:
                    self.packages_info[dropped_pkg_id]['status'] = 'delivered'
                    self.packages_info[dropped_pkg_id]['assigned_robot'] = None
                # Đặt lại trạng thái robot
                self.robots_info[i]['carrying_package_id'] = None
                self.robots_info[i]['target_package_id'] = None
                self.robots_info[i]['phase'] = 'idle'

            self.robots_info[i]['pos'] = current_robot_pos

        # Cập nhật thông tin gói hàng mới (nếu có)
        for pkg_data in state['packages']:
            pkg_id, sr, sc, tr, tc, start_time, deadline = pkg_data
            # Nếu gói hàng có trong môi trường nhưng chưa có trong danh sách gói hàng
            # Thêm gói hàng mới vào danh sách
            if pkg_id not in self.packages_info or self.packages_info[pkg_id]['status'] == 'None':
                self.packages_info[pkg_id] = {
                    'id': pkg_id,
                    'start_pos': (sr - 1, sc - 1),
                    'target_pos': (tr - 1, tc - 1),
                    'deadline': deadline,
                    'status': 'waiting',  # Sẵn sàng để được gán
                    'assigned_robot': None,
                    'start_time': start_time
                }
            # Đảm bảo các gói hàng đã xuất hiện (start_time <= current_time) và đang chờ thì có status 'waiting'
            elif self.packages_info[pkg_id]['start_time'] <= self.current_time and self.packages_info[pkg_id]['status'] not in ['assigned', 'in_transit', 'delivered']:
                self.packages_info[pkg_id]['status'] = 'waiting'

    def _calculate_score(self, robot_idx, package_id):
        '''
        Tính toán điểm số cho một gói hàng dựa trên khoảng cách, thời gian và phần thưởng
        Điểm số càng cao thì gói hàng càng được ưu tiên hơn
        Điểm số = phần thưởng - (hệ số * thời gian) + chi phí di chuyển
        Điểm số sẽ giảm mạnh nếu không thể giao kịp hoặc rất sát hạn
        '''
        robot = self.robots_info[robot_idx]
        package = self.packages_info[package_id]

        # Thời gian ước tính để robot đến điểm lấy hàng
        _, time_to_pickup = run_astar(self.map_grid, robot['pos'], package['start_pos'])
        if time_to_pickup == float('inf'):  # Không thể đến được
            return -float('inf')

        # Thời gian ước tính từ điểm lấy hàng đến điểm giao hàng
        _, time_to_deliver_from_pickup = run_astar(self.map_grid, package['start_pos'], package['target_pos'])
        if time_to_deliver_from_pickup == float('inf'):  # Không thể giao được
            return -float('inf')

        total_estimated_time = time_to_pickup + time_to_deliver_from_pickup
        estimated_delivery_time = self.current_time + total_estimated_time

        # Phần thưởng cơ bản
        reward = 10.0  # Phần thưởng giao hàng đúng hạn
        if estimated_delivery_time > package['deadline']:
            reward = 1.0  # Phần thưởng giao hàng trễ

        # Chi phí di chuyển
        move_cost_pickup = time_to_pickup * -0.01 if time_to_pickup > 0 else 0
        move_cost_deliver = time_to_deliver_from_pickup * -0.01 if time_to_deliver_from_pickup > 0 else 0
        total_move_cost = move_cost_pickup + move_cost_deliver

        # Điểm số: phần thưởng - (hệ số * thời gian) + chi phí di chuyển
        time_penalty_factor = 0.001
        score = reward - (total_estimated_time * time_penalty_factor) + total_move_cost

        # Giảm mạnh điểm nếu không thể giao kịp hoặc rất sát hạn
        if estimated_delivery_time > package['deadline'] + (len(self.map_grid) * 2):  # Thêm một chút buffer
            return -float('inf')

        # Ưu tiên các gói hàng có deadline sớm hơn nếu điểm số tương đương
        score -= package['deadline'] * 0.0001

        return score

    def get_actions(self, state):
        '''
        Hàm chính để lấy hành động cho các robot
        '''
        if not self.is_initialized:
            self.init_agents(state)  # Khởi tạo lần đầu nếu chưa

        self._update_internal_state(state)  # Cập nhật trạng thái nội bộ từ state của môi trường

        actions = []
        assigned_packages_this_step = set()  # Để tránh nhiều robot cùng nhắm 1 gói

        # Gán nhiệm vụ cho các robot rảnh rỗi
        for i in range(self.n_robots):
            robot = self.robots_info[i]
            
            if robot['phase'] == 'idle':
                best_package_id = None
                best_score = -float('inf')

                # Lọc các gói hàng có thể giao
                available_packages = sorted(
                    [pkg_id for pkg_id, pkg_info in self.packages_info.items()
                     if pkg_info['status'] == 'waiting' and pkg_info['start_time'] <= self.current_time],
                    key=lambda pkg_id: pkg_id
                )

                for pkg_id in available_packages:
                    if pkg_id in assigned_packages_this_step:  # Gói này đã được robot khác chọn
                        continue

                    score = self._calculate_score(i, pkg_id)
                    if score > best_score:
                        best_score = score
                        best_package_id = pkg_id

                # Nếu tìm thấy gói hàng tốt nhất cho robot này
                if best_package_id is not None:
                    # Assign gói hàng cho robot
                    self.robots_info[i]['target_package_id'] = best_package_id
                    # Đặt trạng thái robot là đi đến gói hàng
                    self.robots_info[i]['phase'] = 'to_pickup'
                    # Cập nhật trạng thái gói hàng là đã được gán
                    self.packages_info[best_package_id]['status'] = 'assigned'
                    # Đánh dấu robot đã được gán gói hàng
                    self.packages_info[best_package_id]['assigned_robot'] = i
                    # Đánh dấu gói hàng này đã được gán trong bước này
                    assigned_packages_this_step.add(best_package_id)

        # Xác định hành động cho từng robot
        for i in range(self.n_robots):
            robot = self.robots_info[i]
            move_action = 'S'
            package_action = '0'  # 0: Không có hành động liên quan đến gói hàng

            if robot['phase'] == 'to_pickup':
                target_pkg_id = robot['target_package_id']
                if target_pkg_id is None or target_pkg_id not in self.packages_info or self.packages_info[target_pkg_id]['status'] == 'delivered':
                    # Gói hàng mục tiêu không còn hợp lệ
                    robot['phase'] = 'idle'
                    robot['target_package_id'] = None
                else:
                    package_to_pickup = self.packages_info[target_pkg_id]
                    if robot['pos'] == package_to_pickup['start_pos']:
                        move_action = 'S'  # Đứng yên để nhặt
                        package_action = '1'  # Nhặt hàng
                    else:
                        move_action, _ = run_astar(self.map_grid, robot['pos'], package_to_pickup['start_pos'])

            elif robot['phase'] == 'to_dropoff':
                carrying_pkg_id = robot['carrying_package_id']
                if carrying_pkg_id is None or carrying_pkg_id not in self.packages_info:
                    # Lỗi logic: robot đang ở phase to_dropoff nhưng không mang hàng
                    robot['phase'] = 'idle'
                else:
                    package_to_dropoff = self.packages_info[carrying_pkg_id]
                    if robot['pos'] == package_to_dropoff['target_pos']:
                        move_action = 'S'  # Đứng yên để thả
                        package_action = '2'  # Thả hàng
                    else:
                        move_action, _ = run_astar(self.map_grid, robot['pos'], package_to_dropoff['target_pos'])

            actions.append((move_action, package_action))

        return actions  # Trả về danh sách hành động

# Main

In [6]:
def run_simulation(args):
    np.random.seed(args.seed)

    map_name = args.map.split("/")[-1]
    print(f"--- Running {map_name} ---")

    env = Environment(map_file=args.map, max_time_steps=args.max_time_steps,
                      n_robots=args.num_agents, n_packages=args.n_packages,
                      seed=args.seed)

    state = env.reset()
    # Chọn agent class theo map
    if map_name in ["map1.txt", "map2.txt", "map3.txt"]:
        agents = CBSAgents()
    else:  # map4.txt, map5.txt
        agents = AStarAgents()
    agents.init_agents(state)
    done = False
    t = 0
    while not done:
        actions = agents.get_actions(state)
        next_state, reward, done, infos = env.step(actions)
        state = next_state
        # env.render_nice()
        t += 1

    print("Episode finished")
    print("Total reward:", infos['total_reward'])
    print("Total time steps:", infos['total_time_steps'])
    print()

# Run test scripts

In [7]:
for args in all_args:
    run_simulation(args)

--- Running map1.txt ---
Episode finished
Total reward: 921.8900000000068
Total time steps: 994

--- Running map2.txt ---
Episode finished
Total reward: 742.1400000000099
Total time steps: 1000

--- Running map3.txt ---
Episode finished
Total reward: 409.1099999999928
Total time steps: 1000

--- Running map4.txt ---
Episode finished
Total reward: 266.2500000000011
Total time steps: 1000

--- Running map5.txt ---
Episode finished
Total reward: 67.58999999999995
Total time steps: 1000

