# In [None]:
import pygame
import sys
import numpy as np
from scipy.optimize import linear_sum_assignment
import networkx as nx
import time

# --- CONFIG ---
# Grid geometry: GRID_WIDTH x GRID_HEIGHT cells, each GRID_SIZE pixels square.
GRID_SIZE = 50
GRID_WIDTH = 10
GRID_HEIGHT = 10
SCREEN_WIDTH = GRID_SIZE * GRID_WIDTH
SCREEN_HEIGHT = GRID_SIZE * GRID_HEIGHT
FPS = 30

# Robot start cells and task cells; the draw helpers treat index 0 as the
# vertical (row) coordinate and index 1 as the horizontal (column) one.
robots = [(0, 0), (0, 9), (9, 0)]
tasks = [(7, 7), (2, 8), (6, 2)]
NUM_ROBOTS = len(robots)
NUM_TASKS = len(tasks)

# Number of obstacles
OBSTACLE_COUNT = 15

# Colors (RGB)
BG_COLOR = (30, 30, 30)
GRID_COLOR = (50, 50, 50)
ROBOT_COLORS = [
    (0, 0, 255),
    (0, 255, 0),
    (255, 0, 255),
]
TASK_COLOR = (255, 50, 50)
OBSTACLE_COLOR = (100, 100, 100)
PATH_COLOR = (200, 200, 200)
TEXT_COLOR = (255, 255, 255)

# Initialise pygame before creating any display or font objects.
pygame.init()
# Window sized to hold the whole grid; every draw helper renders onto `screen`.
screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
pygame.display.set_caption("Warehouse Robot Coordination Expanded")
# `clock` caps the frame rate through clock.tick(FPS) in the main loop.
clock = pygame.time.Clock()
# Font (name None, size 24) used for the robot labels and the on-screen instructions.
font = pygame.font.SysFont(None, 24)

# --- Generate Obstacles randomly ---
# Fixed seed so the obstacle layout is the same on every run.
np.random.seed(42)

# Robot start cells and task cells must stay obstacle-free. Cells are
# (row, col) tuples, matching how the draw helpers index them.
_reserved_cells = set(robots) | set(tasks)
_free_cell_count = sum(
    1
    for row in range(GRID_HEIGHT)
    for col in range(GRID_WIDTH)
    if (row, col) not in _reserved_cells
)
# Never ask for more obstacles than there are free cells; otherwise the
# rejection-sampling loop below could never terminate.
_obstacle_target = min(OBSTACLE_COUNT, _free_cell_count)

obstacles = set()
while len(obstacles) < _obstacle_target:
    cell = (np.random.randint(0, GRID_HEIGHT), np.random.randint(0, GRID_WIDTH))
    if cell not in _reserved_cells:
        obstacles.add(cell)

# --- TASK ASSIGNMENT (Hungarian Algorithm) ---
# cost_matrix[i][j] is the Manhattan distance from robot i to task j.
cost_matrix = np.array(
    [
        [abs(robot[0] - task[0]) + abs(robot[1] - task[1]) for task in tasks]
        for robot in robots
    ],
    dtype=float,
).reshape(NUM_ROBOTS, NUM_TASKS)

# Minimum-total-cost pairing; assignments holds (robot index, task index) pairs.
row_ind, col_ind = linear_sum_assignment(cost_matrix)
assignments = [(robot_idx, task_idx) for robot_idx, task_idx in zip(row_ind, col_ind)]

# --- PATH PLANNING (A*) avoiding obstacles ---
def get_graph_with_obstacles():
    """Build the 4-connected grid graph with every obstacle cell removed.

    Cells in this script are ``(row, col)`` tuples: obstacles are sampled with
    the first coordinate in ``range(GRID_HEIGHT)`` and the second in
    ``range(GRID_WIDTH)``, and the draw helpers use index 1 as the x axis. The
    graph is therefore built with ``GRID_HEIGHT`` rows and ``GRID_WIDTH``
    columns so that its nodes cover exactly the drawable cells, also when the
    grid is not square.

    Returns:
        A ``networkx.Graph`` whose nodes are the free ``(row, col)`` cells.
    """
    G = nx.grid_2d_graph(GRID_HEIGHT, GRID_WIDTH)
    # remove_nodes_from silently skips cells that are not in the graph.
    G.remove_nodes_from(obstacles)
    return G

def _plan_route(graph, origin, destination):
    """Return the planned route between two cells, or ``[origin]`` if none exists."""
    try:
        return nx.astar_path(graph, origin, destination)
    except nx.NetworkXNoPath:
        return [origin]  # No path found, stay put


G = get_graph_with_obstacles()
# One route per (robot, task) assignment, in assignment order.
paths = [_plan_route(G, robots[r_idx], tasks[t_idx]) for r_idx, t_idx in assignments]

# Pad every route with copies of its last cell so all routes have the same
# length and can be indexed with the single shared step counter.
max_len = max(map(len, paths))
for route in paths:
    route.extend([route[-1]] * (max_len - len(route)))

# --- Simulation State ---
step = 0
move_interval = 1.0  # seconds
last_move_time = time.time()

def draw_grid():
    """Draw the cell boundaries: all vertical lines first, then all horizontal ones."""
    vertical = [((x, 0), (x, SCREEN_HEIGHT)) for x in range(0, SCREEN_WIDTH, GRID_SIZE)]
    horizontal = [((0, y), (SCREEN_WIDTH, y)) for y in range(0, SCREEN_HEIGHT, GRID_SIZE)]
    for line_start, line_end in vertical + horizontal:
        pygame.draw.line(screen, GRID_COLOR, line_start, line_end)

def draw_tasks():
    """Fill each task cell with the task colour and mark it with a white cross."""
    cross_color = (255, 255, 255)
    for row, col in tasks:
        cell = pygame.Rect(col * GRID_SIZE, row * GRID_SIZE, GRID_SIZE, GRID_SIZE)
        pygame.draw.rect(screen, TASK_COLOR, cell)
        # The two diagonals of the cell form the cross.
        diagonals = (
            (cell.topleft, cell.bottomright),
            (cell.topright, cell.bottomleft),
        )
        for corner_a, corner_b in diagonals:
            pygame.draw.line(screen, cross_color, corner_a, corner_b, 2)

def draw_obstacles():
    """Fill every obstacle cell with the obstacle colour."""
    for row, col in obstacles:
        left, top = col * GRID_SIZE, row * GRID_SIZE
        pygame.draw.rect(
            screen, OBSTACLE_COLOR, pygame.Rect(left, top, GRID_SIZE, GRID_SIZE)
        )

def draw_robots():
    """Draw each robot's trail up to the current step, then the robot itself."""
    half_cell = GRID_SIZE // 2

    def pixel_center(cell):
        # Cells are (row, col); the column gives x and the row gives y.
        row, col = cell
        return (col * GRID_SIZE + half_cell, row * GRID_SIZE + half_cell)

    for idx, route in enumerate(paths):
        # Trail: one small dot per cell visited so far, current cell included.
        for visited in route[:step + 1]:
            pygame.draw.circle(screen, PATH_COLOR, pixel_center(visited), GRID_SIZE // 6)

        # Robot body with its "R<index>" label centred on it.
        cx, cy = pixel_center(route[step])
        pygame.draw.circle(screen, ROBOT_COLORS[idx], (cx, cy), GRID_SIZE // 3)
        tag = font.render(f"R{idx}", True, TEXT_COLOR)
        screen.blit(tag, (cx - tag.get_width() // 2, cy - tag.get_height() // 2))

def can_move_to(next_pos, current_positions):
    """Tell whether a robot may step onto ``next_pos``.

    Args:
        next_pos: Cell the robot wants to enter.
        current_positions: Cells currently held by the other robots.

    Returns:
        ``True`` when the cell is neither an obstacle nor occupied, else ``False``.
    """
    blocked_by_obstacle = next_pos in obstacles
    return not blocked_by_obstacle and next_pos not in current_positions

def main():
    """Run the interactive simulation loop until the user quits.

    Every frame redraws the scene and handles input: SPACE pauses/resumes,
    R resets, ESC/Q or closing the window quits. Once per ``move_interval``
    seconds (while not paused) the robots advance one cell along their paths.

    A robot whose next cell is rejected waits for that tick. The wait is
    recorded by inserting a repeat of its current cell into its path, so the
    cell that ``draw_robots`` shows (``paths[i][step]``) is always the cell
    the collision check was made against. If no robot can move at all, the
    step counter is held and the scene stays as it is.

    Shuts pygame down and exits the process through ``sys.exit()``.
    """
    global step, last_move_time, max_len

    running = True
    paused = False

    instructions = [
        "SPACE: Pause/Resume",
        "R: Reset",
        "ESC or Q: Quit"
    ]

    # Snapshot of the original plan so that a reset discards inserted waits.
    planned_paths = [list(path) for path in paths]
    planned_len = max_len

    while running:
        screen.fill(BG_COLOR)
        draw_grid()
        draw_tasks()
        draw_obstacles()
        draw_robots()

        for i, line in enumerate(instructions):
            txt = font.render(line, True, TEXT_COLOR)
            screen.blit(txt, (10, 10 + 20*i))

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False

            elif event.type == pygame.KEYDOWN:
                if event.key == pygame.K_SPACE:
                    paused = not paused
                elif event.key == pygame.K_r:
                    # Restore the planned routes in place (draw_robots reads
                    # the same list objects) and rewind to the first step.
                    step = 0
                    for path, plan in zip(paths, planned_paths):
                        path[:] = plan
                    max_len = planned_len
                elif event.key in (pygame.K_ESCAPE, pygame.K_q):
                    running = False

        # Move robots autonomously if not paused
        if not paused and time.time() - last_move_time > move_interval:
            last_move_time = time.time()

            # Where every robot is now and where its path wants it next.
            robot_positions = [path[step] for path in paths]
            next_positions = [
                path[step + 1] if step + 1 < len(path) else path[step]
                for path in paths
            ]

            # Decide, in robot order, who has to wait this tick.
            blocked = []
            claimed = set()  # cells a lower-indexed robot enters this tick
            for i, target in enumerate(next_positions):
                wants_to_move = target != robot_positions[i]
                others_positions = robot_positions[:i] + robot_positions[i+1:]
                must_wait = wants_to_move and (
                    target in claimed
                    or not can_move_to(target, others_positions)
                )
                blocked.append(must_wait)
                if wants_to_move and not must_wait:
                    claimed.add(target)

            if claimed:
                # At least one robot moves. Blocked robots repeat their
                # current cell at the next step, which delays the rest of
                # their route by one tick instead of skipping a cell.
                for path, current, must_wait in zip(paths, robot_positions, blocked):
                    if must_wait:
                        path.insert(step + 1, current)

                # Keep every path the same length for the shared step index.
                max_len = max(len(path) for path in paths)
                for path in paths:
                    path.extend([path[-1]] * (max_len - len(path)))

                step += 1
            # else: everyone has arrived or is blocked; hold the current step.

        pygame.display.flip()
        clock.tick(FPS)

    pygame.quit()
    sys.exit()

if __name__ == "__main__":
    main()


# Cell output: SystemExit (raised by sys.exit() when main() finishes)

# In [2]:
import pygame
import sys
import numpy as np
import networkx as nx
import time
from collections import deque

# --- Config ---
# Grid geometry: GRID_WIDTH x GRID_HEIGHT cells, each GRID_SIZE pixels square.
GRID_SIZE = 50
GRID_WIDTH = 12
GRID_HEIGHT = 12
SCREEN_WIDTH = GRID_SIZE * GRID_WIDTH
SCREEN_HEIGHT = GRID_SIZE * GRID_HEIGHT
FPS = 30

NUM_ROBOTS = 3
NUM_TASKS = 6
OBSTACLE_COUNT = 20
# Charging stations sit in two opposite corners of the grid, as (x, y) cells.
CHARGING_STATIONS = [
    (0, GRID_HEIGHT - 1),
    (GRID_WIDTH - 1, 0),
]

# Battery model, in abstract battery units.
BATTERY_CAPACITY = 100
BATTERY_CONSUMPTION_PER_MOVE = 1
BATTERY_THRESHOLD = 20
CHARGING_RATE = 5  # battery units per step charging

# Colors (RGB)
BG_COLOR = (30, 30, 30)
GRID_COLOR = (50, 50, 50)
ROBOT_COLORS = [
    (0, 0, 255),
    (0, 255, 0),
    (255, 0, 255),
]
TASK_COLOR = (255, 50, 50)
OBSTACLE_COLOR = (100, 100, 100)
CHARGING_COLOR = (255, 255, 0)
PATH_COLOR = (200, 200, 200)
TEXT_COLOR = (255, 255, 255)
BATTERY_LOW_COLOR = (255, 100, 0)

# Initialise pygame before creating any display or font objects.
pygame.init()
# Window sized to hold the whole grid; every draw helper renders onto `screen`.
screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
pygame.display.set_caption("Advanced Warehouse Robot Multi-Agent Simulation")
# `clock` caps the frame rate through clock.tick(FPS) in the main loop.
clock = pygame.time.Clock()
# Font (name None, size 24) used for all labels, stats and the pause banner.
font = pygame.font.SysFont(None, 24)

# --- Environment setup ---
# Fixed seed so obstacles and tasks are laid out identically on every run.
np.random.seed(42)

# Cells that must never hold an obstacle: the charging stations and the robot
# start cells (the same three corners used where the robots are created and
# reset). An obstacle on a start cell would remove that node from the
# navigation graph and make path planning from it fail.
_RESERVED_CELLS = set(CHARGING_STATIONS) | {
    (0, 0),
    (GRID_WIDTH - 1, 0),
    (0, GRID_HEIGHT - 1),
}

obstacles = set()
while len(obstacles) < OBSTACLE_COUNT:
    cell = (np.random.randint(0, GRID_WIDTH), np.random.randint(0, GRID_HEIGHT))
    if cell not in _RESERVED_CELLS:
        obstacles.add(cell)

# Each task is a dict; 'assigned_to' and 'completed' are updated as the
# simulation runs, the remaining fields are fixed at creation.
tasks = []
for i in range(NUM_TASKS):
    # Rejection-sample a free cell that is neither an obstacle nor a station.
    while True:
        pos = (np.random.randint(0, GRID_WIDTH), np.random.randint(0, GRID_HEIGHT))
        if pos not in obstacles and pos not in CHARGING_STATIONS:
            break
    tasks.append({
        'id': i,
        'pos': pos,
        'weight': np.random.randint(1, 5),
        'priority': np.random.choice(['high', 'medium', 'low']),
        'deadline': None,
        'assigned_to': None,
        'completed': False,
    })

# --- Robot Class ---
class Robot:
    """A warehouse robot with a task queue, a battery and a planned path.

    Attributes:
        id: Identifier written into a task's ``'assigned_to'`` by the auction.
        pos: Current ``(x, y)`` grid cell.
        capacity: Heaviest task weight the robot will bid on.
        task_queue: ``deque`` of task dicts; the head is the one being worked
            on. A dict with ``'charge': True`` is a trip to a charging station.
        battery: Remaining charge, kept within ``0..BATTERY_CAPACITY``.
        path: Cells still to step through (the current cell is excluded).
        state: One of ``'idle'``, ``'moving'`` or ``'charging'``.
        charge_ticks: Ticks spent in the current charging session.
    """

    def __init__(self, id, pos, capacity):
        self.id = id
        self.pos = pos
        self.capacity = capacity
        self.task_queue = deque()
        self.battery = BATTERY_CAPACITY
        self.path = []
        self.state = 'idle'  # 'idle', 'moving', 'charging'
        self.charge_ticks = 0

    def bid_for_task(self, task):
        """Return this robot's cost for ``task``; lower wins, ``inf`` means no bid.

        The cost is the Manhattan distance to the task, plus a penalty of 50
        when the battery is below ``BATTERY_THRESHOLD``. Tasks that are already
        assigned, completed, or heavier than ``capacity`` get ``inf``.
        """
        if task['assigned_to'] is not None or task['completed']:
            return float('inf')  # Already taken/completed
        if task['weight'] > self.capacity:
            return float('inf')  # Too heavy for this robot
        cost = abs(self.pos[0] - task['pos'][0]) + abs(self.pos[1] - task['pos'][1])
        if self.battery < BATTERY_THRESHOLD:
            cost += 50
        return cost

    def plan_path(self, goal, graph):
        """Plan a route to ``goal`` on ``graph`` and store it in ``self.path``.

        The stored path excludes the current cell. When no route exists, or
        when the current cell or the goal is not a node of ``graph``, the path
        is left empty so the caller simply retries on a later update.
        """
        try:
            route = nx.astar_path(graph, self.pos, goal)
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            self.path = []
            return
        # exclude current pos
        self.path = route[1:] if len(route) > 1 else route

    def move_step(self, occupied_positions):
        """Advance one tick: charge if charging, otherwise take one path step.

        Args:
            occupied_positions: Cells held by the other robots.
        """
        if self.state == 'charging':
            self.charge_ticks += 1
            self.battery = min(BATTERY_CAPACITY, self.battery + CHARGING_RATE)
            if self.battery >= BATTERY_CAPACITY:
                # Charging is finished: drop the charge task, otherwise it
                # stays at the head of the queue and the robot would re-enter
                # charging on every later update and never resume work.
                if self.task_queue and self.task_queue[0].get('charge', False):
                    self.task_queue.popleft()
                self.state = 'idle'
                self.charge_ticks = 0
            return

        if not self.path:
            self.state = 'idle'
            return

        next_pos = self.path[0]
        if next_pos in obstacles or next_pos in occupied_positions:
            # Can't move now, wait and replan next update
            self.state = 'idle'
            self.path = []  # Clear path to trigger replanning next update
            return

        self.pos = next_pos
        self.path.pop(0)
        self.battery = max(0, self.battery - BATTERY_CONSUMPTION_PER_MOVE)
        self.state = 'moving'

    def find_nearest_charging_station(self):
        """Return the charging station with the smallest Manhattan distance.

        Ties go to the station listed first in ``CHARGING_STATIONS``.
        """
        min_dist = float('inf')
        nearest_station = CHARGING_STATIONS[0]
        for station in CHARGING_STATIONS:
            dist = abs(self.pos[0] - station[0]) + abs(self.pos[1] - station[1])
            if dist < min_dist:
                min_dist = dist
                nearest_station = station
        return nearest_station

    def _release_tasks(self):
        """Empty the queue, handing unfinished tasks back for re-assignment."""
        for queued in self.task_queue:
            if not queued.get('charge', False) and not queued.get('completed', False):
                # Without this the task would stay marked as taken forever.
                queued['assigned_to'] = None
        self.task_queue.clear()

    def update(self, graph, occupied_positions):
        """Run one simulation tick for this robot.

        Args:
            graph: Navigation graph passed on to ``plan_path``.
            occupied_positions: Cells held by the other robots.
        """
        heading_to_charge = bool(self.task_queue) and self.task_queue[0].get('charge', False)
        if (self.battery < BATTERY_THRESHOLD
                and self.state != 'charging'
                and not heading_to_charge):
            # Need charging: give up current work and go to the nearest station.
            self._release_tasks()
            self.task_queue.appendleft({
                'pos': self.find_nearest_charging_station(),
                'charge': True,
                'completed': False,
            })
            self.path = []  # Clear current path
            self.state = 'idle'

        if self.state == 'idle' and self.task_queue:
            current_task = self.task_queue[0]
            if current_task.get('charge', False):
                if self.pos == current_task['pos']:
                    self.state = 'charging'
                    self.charge_ticks = 0
                else:
                    self.plan_path(current_task['pos'], graph)
                    if self.path:
                        self.state = 'moving'
            elif current_task.get('completed', False):
                # Stale entry: discard it and pick up the next one next tick.
                self.task_queue.popleft()
                return
            elif self.pos == current_task['pos']:
                current_task['completed'] = True
                self.task_queue.popleft()
            else:
                self.plan_path(current_task['pos'], graph)
                if self.path:
                    self.state = 'moving'

        # Execute movement (move_step also handles the charging logic)
        if self.state in ('moving', 'charging'):
            self.move_step(occupied_positions)

# --- Initialize Robots ---
# One (start cell, capacity) pair per robot; the list index is the robot id.
robots = [
    Robot(robot_id, start_cell, capacity=max_load)
    for robot_id, (start_cell, max_load) in enumerate([
        ((0, 0), 10),
        ((GRID_WIDTH-1, 0), 6),
        ((0, GRID_HEIGHT-1), 8),
    ])
]

# --- Graph with obstacles ---
def build_graph():
    """Return the GRID_WIDTH x GRID_HEIGHT grid graph without the obstacle cells."""
    grid = nx.grid_2d_graph(GRID_WIDTH, GRID_HEIGHT)
    # Cells that are not nodes of the grid are silently ignored.
    grid.remove_nodes_from(obstacles)
    return grid

# --- Decentralized Auction for Tasks ---
def decentralized_auction(tasks, robots):
    """Hand every open task to the eligible robot with the lowest bid.

    A task is open when it is neither assigned nor completed. A robot is
    eligible only when its battery is at or above ``BATTERY_THRESHOLD`` and
    its queue holds no charge task, i.e. it is neither charging nor on its
    way to charge. A task stays open when nobody is eligible or when every
    bid is ``inf``. Ties go to the robot listed first.

    Args:
        tasks: Task dicts with ``'assigned_to'`` and ``'completed'`` keys.
        robots: Objects exposing ``id``, ``battery``, ``task_queue`` and
            ``bid_for_task(task)``.
    """
    for task in tasks:
        if task['assigned_to'] is not None or task['completed']:
            continue

        # Both conditions must hold: a robot that is charging or heading to a
        # station must not win work, and neither should a drained robot that
        # is about to drop its queue in order to recharge.
        bidders = [
            r for r in robots
            if r.battery >= BATTERY_THRESHOLD
            and not any(t.get('charge', False) for t in r.task_queue)
        ]
        if not bidders:
            continue

        # min() keeps the first of several equal bids, as a stable sort would.
        best_bid, winner = min(
            ((r.bid_for_task(task), r) for r in bidders),
            key=lambda bid: bid[0],
        )
        if best_bid != float('inf'):
            task['assigned_to'] = winner.id
            winner.task_queue.append(task)

# --- Draw Functions ---
def draw_grid():
    """Draw the grid lines: verticals left to right, then horizontals top to bottom."""
    column_edges = range(0, SCREEN_WIDTH, GRID_SIZE)
    row_edges = range(0, SCREEN_HEIGHT, GRID_SIZE)
    segments = [((x, 0), (x, SCREEN_HEIGHT)) for x in column_edges]
    segments += [((0, y), (SCREEN_WIDTH, y)) for y in row_edges]
    for begin, end in segments:
        pygame.draw.line(screen, GRID_COLOR, begin, end)

def draw_obstacles():
    """Fill every obstacle cell with the obstacle colour."""
    for cell_x, cell_y in obstacles:
        left, top = cell_x * GRID_SIZE, cell_y * GRID_SIZE
        pygame.draw.rect(
            screen, OBSTACLE_COLOR, pygame.Rect(left, top, GRID_SIZE, GRID_SIZE)
        )

def draw_charging_stations():
    """Fill each charging-station cell and label it "CS" near its top-left corner."""
    for station_x, station_y in CHARGING_STATIONS:
        cell = pygame.Rect(
            station_x * GRID_SIZE, station_y * GRID_SIZE, GRID_SIZE, GRID_SIZE
        )
        pygame.draw.rect(screen, CHARGING_COLOR, cell)
        # Label is inset 5 px from the cell's top-left corner.
        caption = font.render("CS", True, TEXT_COLOR)
        screen.blit(caption, (cell.x + 5, cell.y + 5))

def draw_tasks():
    """Draw every task cell (grey once completed) with its "T<id>" label."""
    done_color = (100, 100, 100)
    for job in tasks:
        cell_x, cell_y = job['pos'][0], job['pos'][1]
        cell = pygame.Rect(cell_x * GRID_SIZE, cell_y * GRID_SIZE, GRID_SIZE, GRID_SIZE)
        if job['completed']:
            fill = done_color
        else:
            fill = TASK_COLOR
        pygame.draw.rect(screen, fill, cell)
        caption = font.render(f"T{job['id']}", True, TEXT_COLOR)
        screen.blit(caption, (cell.x + 5, cell.y + 5))

def draw_robots():
    """Draw every robot with its label, battery bar and queue/state summary.

    A robot that is charging gets an outline in the charging colour around its
    body. Body colours cycle through ``ROBOT_COLORS`` by robot id.
    """
    radius = GRID_SIZE // 3
    for r in robots:
        x, y = r.pos
        center = (x*GRID_SIZE + GRID_SIZE//2, y*GRID_SIZE + GRID_SIZE//2)
        # Wrap around so ids beyond the palette size still get a colour.
        color = ROBOT_COLORS[r.id % len(ROBOT_COLORS)]

        # Body first, then the charging ring on top: the ring has the same
        # radius as the body, so drawing it first would leave it fully hidden.
        pygame.draw.circle(screen, color, center, radius)
        if r.state == 'charging':
            pygame.draw.circle(screen, CHARGING_COLOR, center, radius, 3)

        label = font.render(f"R{r.id}", True, TEXT_COLOR)
        screen.blit(label, (center[0]-10, center[1]-10))

        # Battery bar: grey background with a fill proportional to the charge.
        bar_x = center[0] - GRID_SIZE//3
        bar_y = center[1] + GRID_SIZE//3
        bar_width = GRID_SIZE * 2//3
        bar_height = 5
        pygame.draw.rect(screen, (80, 80, 80), (bar_x, bar_y, bar_width, bar_height))
        fill_width = int(bar_width * (r.battery / BATTERY_CAPACITY))
        fill_color = BATTERY_LOW_COLOR if r.battery < BATTERY_THRESHOLD else (0, 255, 0)
        pygame.draw.rect(screen, fill_color, (bar_x, bar_y, fill_width, bar_height))

        # Queue length and the first letter of the state (I, M or C).
        queue_text = font.render(f"Q:{len(r.task_queue)} {r.state[:1].upper()}", True, TEXT_COLOR)
        screen.blit(queue_text, (center[0]-15, center[1] + 15))

def draw_stats():
    """Show the completed-task counter and the key bindings in the top-left corner."""
    finished = sum(bool(job['completed']) for job in tasks)
    lines = (
        f"Tasks Completed: {finished}/{NUM_TASKS}",
        "Controls: SPACE=Pause, R=Reset, ESC=Exit",
    )
    # One line every 25 px, starting 10 px from the top-left corner.
    for row, message in enumerate(lines):
        screen.blit(font.render(message, True, TEXT_COLOR), (10, 10 + row * 25))

# --- Main ---
def main():
    """Run the simulation loop: draw, handle input, and tick the robots.

    SPACE toggles pause and R resets robots and tasks. ESC or closing the
    window shuts pygame down and exits the process through ``sys.exit()``.
    While not paused, the robots are updated and the task auction is re-run
    once per ``MOVE_INTERVAL`` seconds.
    """
    paused = False
    last_update_time = time.time()
    MOVE_INTERVAL = 0.5  # seconds

    # Remember where each robot starts so that a reset restores exactly those
    # cells instead of relying on a second hard-coded copy of them.
    start_positions = {r.id: r.pos for r in robots}

    # The obstacle set is only filled during setup and never changes while
    # the loop runs, so the navigation graph is built once, not every tick.
    graph = build_graph()

    # Initial task assignment
    decentralized_auction(tasks, robots)

    while True:
        screen.fill(BG_COLOR)
        draw_grid()
        draw_obstacles()
        draw_charging_stations()
        draw_tasks()
        draw_robots()
        draw_stats()

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                sys.exit()
            elif event.type == pygame.KEYDOWN:
                if event.key == pygame.K_SPACE:
                    paused = not paused
                elif event.key == pygame.K_r:
                    # Reset simulation
                    for r in robots:
                        r.pos = start_positions[r.id]
                        r.task_queue.clear()
                        r.battery = BATTERY_CAPACITY
                        r.state = 'idle'
                        r.path = []
                        r.charge_ticks = 0
                    for task in tasks:
                        task['assigned_to'] = None
                        task['completed'] = False
                    decentralized_auction(tasks, robots)
                elif event.key == pygame.K_ESCAPE:
                    pygame.quit()
                    sys.exit()

        if not paused and time.time() - last_update_time > MOVE_INTERVAL:
            last_update_time = time.time()

            # Collect occupied positions
            occupied_positions = {r.pos for r in robots}

            for r in robots:
                # Exclude current robot position so it can move
                temp_occupied = occupied_positions - {r.pos}
                r.update(graph, temp_occupied)
                # Refresh after each robot so later robots see its new cell
                occupied_positions = {rob.pos for rob in robots}

            # Offer any task that is still (or again) unassigned
            decentralized_auction(tasks, robots)

        if paused:
            pause_text = font.render("PAUSED - Press SPACE to Resume", True, (255, 255, 0))
            screen.blit(pause_text, (10, SCREEN_HEIGHT - 30))

        pygame.display.flip()
        clock.tick(FPS)

if __name__ == "__main__":
    main()

# Cell output: SystemExit (raised by sys.exit() when the user quits)