# Warehouse Management with a Multi-Agent AI System

**Copyright (c) 2026 Shrikara Kaudambady. All rights reserved.**

This notebook simulates a modern warehouse managed by a team of AI agents. A `ManagerAgent` oversees the process of storing new inventory. It uses a `StoragePlannerAgent` to decide *where* to put items and a `RobotCommanderAgent` to instruct a robot on *how* to get there using A* pathfinding.

### 1. Setup and Environment Classes

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import heapq
from time import sleep
from IPython.display import display, clear_output

class Warehouse:
    """Represents the warehouse environment: its layout, inventory, and robots."""
    def __init__(self, layout, robots):
        self.layout = layout
        self.robots = {r.robot_id: r for r in robots}
        self.inventory = {}
    
    def visualize(self, ax):
        ax.imshow(self.layout, cmap='Greys', interpolation='nearest')
        for robot_id, robot in self.robots.items():
            ax.text(robot.pos[1], robot.pos[0], 'R', color='red', ha='center', va='center', fontsize=12)
        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_title("Warehouse State")

class Robot:
    """Represents a single warehouse robot."""
    def __init__(self, robot_id, start_pos):
        self.robot_id = robot_id
        self.pos = start_pos
        self.payload = None
    
    def move_to(self, new_pos):
        self.pos = new_pos

### 2. Agent Tools: A* Pathfinding Algorithm
This is a tool that the `RobotCommanderAgent` will use. It finds the shortest path between two points on a grid.

In [None]:
def a_star_pathfinding(grid, start, end):
    """A* algorithm to find the shortest path in a grid."""
    neighbors = [(0, 1), (0, -1), (1, 0), (-1, 0)]
    close_set = set()
    came_from = {}
    gscore = {start: 0}
    fscore = {start: np.linalg.norm(np.array(start) - np.array(end))}
    oheap = []
    heapq.heappush(oheap, (fscore[start], start))
    
    while oheap:
        current = heapq.heappop(oheap)[1]
        if current == end:
            data = []
            while current in came_from:
                data.append(current)
                current = came_from[current]
            return data[::-1]
        
        close_set.add(current)
        for i, j in neighbors:
            neighbor = current[0] + i, current[1] + j
            if 0 <= neighbor[0] < grid.shape[0] and 0 <= neighbor[1] < grid.shape[1]:
                if grid[neighbor[0]][neighbor[1]] == 1: # Wall/Obstacle
                    continue
            else:
                continue
            
            tentative_g_score = gscore[current] + 1
            if neighbor in close_set and tentative_g_score >= gscore.get(neighbor, 0):
                continue
            
            if tentative_g_score < gscore.get(neighbor, 0) or neighbor not in [i[1] for i in oheap]:
                came_from[neighbor] = current
                gscore[neighbor] = tentative_g_score
                fscore[neighbor] = tentative_g_score + np.linalg.norm(np.array(neighbor) - np.array(end))
                heapq.heappush(oheap, (fscore[neighbor], neighbor))
    return []

### 3. The AI Agents
We define our three specialized agents: `StoragePlannerAgent`, `RobotCommanderAgent`, and the `ManagerAgent` who orchestrates them.

In [None]:
class StoragePlannerAgent:
    def find_optimal_location(self, warehouse, product_category, start_pos):
        print(f"\nPLANNER: Finding best location for a category '{product_category}' item...")
        empty_shelves = np.argwhere(warehouse.layout == 2) # 2 represents a shelf
        
        # Filter out occupied shelves
        available_shelves = [tuple(loc) for loc in empty_shelves if tuple(loc) not in warehouse.inventory]
        
        if not available_shelves:
            return None
        
        # Prioritize based on category (closer for 'A', further for 'C')
        distances = [np.linalg.norm(np.array(loc) - np.array(start_pos)) for loc in available_shelves]
        if product_category == 'A':
            best_loc = available_shelves[np.argmin(distances)]
        else: # Simple logic for B/C: find the furthest spot
            best_loc = available_shelves[np.argmax(distances)]
        
        print(f"PLANNER: Recommending location {best_loc}")
        return best_loc

class RobotCommanderAgent:
    def generate_path(self, warehouse_layout, start_pos, end_pos):
        print(f"\nCOMMANDER: Planning path from {start_pos} to {end_pos}...")
        path = a_star_pathfinding(warehouse_layout, start_pos, end_pos)
        print(f"COMMANDER: Path found with {len(path)} steps.")
        return path
    
    def execute_path(self, warehouse, robot_id, path):
        robot = warehouse.robots[robot_id]
        fig, ax = plt.subplots(figsize=(8, 8))
        for pos in path:
            robot.move_to(pos)
            clear_output(wait=True)
            warehouse.visualize(ax)
            display(fig)
            sleep(0.1)
        plt.close(fig)
        print(f"COMMANDER: Robot {robot_id} has arrived at {robot.pos}.")

class ManagerAgent:
    def __init__(self, warehouse, planner, commander):
        self.warehouse = warehouse
        self.planner = planner
        self.commander = commander
    
    def process_new_shipment(self, shipment, start_pos):
        print("==============================================")
        print(f"MANAGER: New shipment received at {start_pos}: {shipment['quantity']}x {shipment['product_id']}")
        
        # Delegate to Planner
        target_loc = self.planner.find_optimal_location(self.warehouse, shipment['category'], start_pos)
        if not target_loc:
            print("MANAGER: Process failed. No storage location available.")
            return
        
        # Delegate to Commander
        robot_id = 'R1'
        path = self.commander.generate_path(self.warehouse.layout, start_pos, target_loc)
        if not path:
            print("MANAGER: Process failed. Could not find a path.")
            return
        
        # Execute the plan
        print("MANAGER: Instructing robot to move item...")
        self.commander.execute_path(self.warehouse, robot_id, path)
        
        # Update inventory
        self.warehouse.inventory[target_loc] = {'product_id': shipment['product_id'], 'quantity': shipment['quantity']}
        print(f"MANAGER: Process complete. Inventory updated at {target_loc}.")
        print("==============================================")

### 4. Run the Simulation
Let's create the warehouse layout, initialize the agents, and process a new shipment of a high-priority 'A' category item.

In [None]:
# 0=Floor, 1=Wall, 2=Shelf, 3=Docking Bay
layout_grid = np.array([
    [1, 1, 1, 1, 1, 3, 1, 1, 1, 1, 1],
    [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
    [1, 0, 2, 2, 1, 0, 1, 2, 2, 0, 1],
    [1, 0, 2, 2, 1, 0, 1, 2, 2, 0, 1],
    [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
    [1, 0, 2, 2, 1, 0, 1, 2, 2, 0, 1],
    [1, 0, 2, 2, 1, 0, 1, 2, 2, 0, 1],
    [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
    [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
])

docking_bay_pos = tuple(np.argwhere(layout_grid == 3)[0])
robot1 = Robot('R1', start_pos=docking_bay_pos)
warehouse_env = Warehouse(layout_grid, [robot1])

# Initialize Agents
planner = StoragePlannerAgent()
commander = RobotCommanderAgent()
manager = ManagerAgent(warehouse_env, planner, commander)

# Define a new shipment
new_shipment = {'product_id': 'P505', 'quantity': 100, 'category': 'A'}

# Run the simulation
manager.process_new_shipment(new_shipment, start_pos=docking_bay_pos)