In [2]:
import gymnasium as gym
import rware

try:
    # Adding the 'rware:' prefix is correct for Gymnasium 1.0.0
    env = gym.make("rware:rware-tiny-2ag-v2")
    obs, info = env.reset()
    
    # FIX: Use .unwrapped to access rware-specific attributes
    n_agents = env.unwrapped.n_agents 
    
    print(f"Success! Agents: {n_agents}")
    print(f"Observation Space: {env.observation_space}")
    print(f"Action Space: {env.action_space}")
except Exception as e:
    print(f"Error: {e}")

Success! Agents: 2
Observation Space: Tuple(Box(-inf, inf, (71,), float32), Box(-inf, inf, (71,), float32))
Action Space: Tuple(Discrete(5), Discrete(5))


In [3]:
import networkx as nx
import numpy as np

def create_warehouse_graph(env):
    """Converts the rware grid into a NetworkX graph for pathfinding."""
    grid_size = env.unwrapped.grid_size # (rows, cols)
    G = nx.grid_2d_graph(grid_size[0], grid_size[1])
    
    # In rware, shelves are obstacles unless the agent is picking one up.
    # For a simple baseline, we treat shelf locations as traversable 
    # but with a higher cost, or we remove nodes that are currently blocked.
    return G

def get_a_star_path(G, start, goal):
    """Calculates the shortest path between two points."""
    try:
        path = nx.astar_path(G, start, goal, heuristic=lambda u, v: abs(u[0]-v[0]) + abs(u[1]-v[1]))
        return path
    except nx.NetworkXNoPath:
        return None

# Test the baseline logic
grid_graph = create_warehouse_graph(env)
start_pos = (1, 1)
goal_pos = (5, 5)
path = get_a_star_path(grid_graph, start_pos, goal_pos)

print(f"Calculated A* Path: {path}")

Calculated A* Path: [(1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (5, 2), (5, 3), (5, 4), (5, 5)]


In [4]:
from scipy.optimize import linear_sum_assignment

def get_task_assignments(agents_pos, goals_pos):
    """
    Calculates the optimal assignment of robots to goals.
    agents_pos: List of (x, y) for each robot
    goals_pos: List of (x, y) for each shelf/goal
    """
    # Create Cost Matrix (Manhattan distance as a proxy for A* to save CPU)
    cost_matrix = np.zeros((len(agents_pos), len(goals_pos)))
    
    for i, a_pos in enumerate(agents_pos):
        for j, g_pos in enumerate(goals_pos):
            # Manhattan distance: |x1-x2| + |y1-y2|
            cost_matrix[i, j] = abs(a_pos[0] - g_pos[0]) + abs(a_pos[1] - g_pos[1])

    # Solve the linear sum assignment problem
    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    
    # Returns a list of (agent_index, goal_index)
    return list(zip(row_ind, col_ind))

# Example usage:
agents = [(1,1), (1,2)]
goals = [(5,5), (4,3)]
assignments = get_task_assignments(agents, goals)
print(f"Optimal Assignments: {assignments}")

Optimal Assignments: [(np.int64(0), np.int64(1)), (np.int64(1), np.int64(0))]


In [5]:
import time

def run_baseline_evaluation(env, episodes=5):
    total_throughput = 0
    total_collisions = 0
    
    for ep in range(episodes):
        obs, info = env.reset()
        done = False
        steps = 0
        
        while not done and steps < 500: # Limit steps to prevent infinite loops
            # 1. Get assignments (Hungarian)
            # 2. Get next action for each agent (A*)
            # 3. env.step(actions)
            
            # For now, we simulate a random step to verify the loop structure
            actions = env.action_space.sample() 
            obs, reward, terminated, truncated, info = env.step(actions)
            
            # RWARE usually provides a 'messages' or 'info' dict for collisions
            if info.get("collisions", 0) > 0:
                total_collisions += info["collisions"]
                
            steps += 1
            if all(terminated): break
            
        print(f"Episode {ep+1} Complete.")

    return total_collisions