In [1]:
# Pinned to the release this notebook was run against: the code below uses the
# Mesa 2.x API (Agent(unique_id, model) and the mesa.time schedulers).
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install mesa==2.1.1

Collecting mesa
  Downloading Mesa-2.1.1-py3-none-any.whl (1.8 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.1/1.8 MB[0m [31m3.1 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.8/1.8 MB[0m [31m27.9 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m22.3 MB/s[0m eta [36m0:00:00[0m
Collecting cookiecutter (from mesa)
  Downloading cookiecutter-2.3.0-py3-none-any.whl (39 kB)
Collecting solara (from mesa)
  Downloading solara-1.19.0-py2.py3-none-any.whl (13.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.1/13.1 MB[0m [31m77.5 MB/s[0m eta [36m0:00:00[0m
Collecting binaryornot>=0.4.4 (from cookiecutter->mesa)
  Downloading binaryornot-0.4.4-py2.py3-none-any.whl (9.0 kB)
Collecting arro

In [2]:
import heapq
import os
import random

import matplotlib.colors as colors
import matplotlib.pyplot as plt
import numpy as np
from mesa import Agent, Model
from mesa.space import MultiGrid
from mesa.time import RandomActivation

class MachineAsset:
    """Plain record describing one machine: an identifier and a broken flag."""

    def __init__(self, unique_id, is_broken=False):
        # Both values are stored as given; nothing is validated or copied.
        self.unique_id, self.is_broken = unique_id, is_broken

class RepairAgent(Agent):
    """Mobile agent that walks to broken machines and repairs them.

    Attributes:
        assets_to_repair: work queue of broken ``MachineAsset`` objects; the
            last entry is the current target.
        movement_history: positions recorded at the end of every step in
            which the agent did not perform a repair.
        prm: path planner exposing ``find_path(start, end)``.
    """

    COLORS = ['yellow', 'blue', 'orange']

    def __init__(self, unique_id, model, prm):
        super().__init__(unique_id, model)
        self.assets_to_repair = []
        self.movement_history = []
        self.prm = prm

    def step(self):
        """Repair a broken machine within reach, otherwise advance on the target."""
        # include_center=True so a machine sharing this agent's cell (the grid
        # allows several agents per cell) is also within reach.
        nearby = self.model.grid.get_neighbors(
            self.pos, moore=True, include_center=True, radius=1
        )
        for neighbor in nearby:
            if isinstance(neighbor, MachineAgent) and neighbor.asset.is_broken:
                self.fix_asset(neighbor.asset)
                if neighbor.asset in self.assets_to_repair:
                    self.assets_to_repair.remove(neighbor.asset)
                return

        # Forget assets that were repaired in the meantime (by this agent or
        # another one) so the agent never walks towards a working machine.
        self.assets_to_repair = [a for a in self.assets_to_repair if a.is_broken]
        if not self.assets_to_repair:
            self.find_assets_to_repair()

        if self.assets_to_repair:
            # Peek instead of pop: the target stays queued until it is repaired,
            # so the agent keeps heading for the same machine on every tick.
            target_asset = self.assets_to_repair[-1]
            target_machine = next(
                (
                    agent
                    for agent in self.model.schedule.agents
                    if isinstance(agent, MachineAgent) and agent.asset is target_asset
                ),
                None,
            )
            if target_machine is None:
                # No scheduled machine owns this asset any more; drop it.
                self.assets_to_repair.pop()
            else:
                self.move_towards(target_machine.pos)

        self.movement_history.append(self.pos)

    def find_assets_to_repair(self):
        """Append the asset of every currently broken scheduled machine to the queue."""
        for agent in self.model.schedule.agents:
            if isinstance(agent, MachineAgent) and agent.asset.is_broken:
                self.assets_to_repair.append(agent.asset)

    def fix_asset(self, asset):
        """Mark ``asset`` as working again."""
        asset.is_broken = False

    def move_towards(self, target):
        """Move one cell towards the next waypoint on the planned path to ``target``.

        Does nothing when the planner returns no path or a path without a
        waypoint after the current position.
        """
        path = self.prm.find_path(self.pos, target)
        if not path or len(path) < 2:
            return

        # path[1] may be many cells away; advance a single Moore step towards
        # it instead of jumping there in one tick.
        x, y = self.pos
        waypoint_x, waypoint_y = path[1]
        step_x = (waypoint_x > x) - (waypoint_x < x)
        step_y = (waypoint_y > y) - (waypoint_y < y)
        self.model.grid.move_agent(self, (x + step_x, y + step_y))

class MachineAgent(Agent):
    """Stationary agent wrapping a ``MachineAsset`` that can break at random."""

    # Chance that a working machine breaks during one step.
    BREAKDOWN_PROB = 0.01

    def __init__(self, unique_id, model, asset):
        super().__init__(unique_id, model)
        # Scheduler step count at this machine's first random breakdown;
        # stays None until one happens and is never overwritten afterwards.
        self.broken_order = None
        self.asset = asset

    def step(self):
        """Possibly break a working machine; broken machines are left alone."""
        if self.asset.is_broken:
            return
        if random.random() >= self.BREAKDOWN_PROB:
            return

        self.asset.is_broken = True
        if self.broken_order is None:
            self.broken_order = self.model.schedule.steps

class GridPRM:
    """Probabilistic roadmap over the cells of ``model.grid``.

    The constructor samples random cells as roadmap nodes and links every
    pair whose connecting line is free at construction time. ``find_path``
    then attaches the query endpoints to that roadmap for the duration of a
    single query and returns the cheapest waypoint sequence.

    Attributes:
        model: object exposing ``grid`` with ``width``, ``height`` and
            ``is_cell_empty(pos)``.
        nodes: list of distinct sampled ``(x, y)`` cells.
        connections: dict mapping each node to the list of nodes linked to it.
    """

    def __init__(self, model, num_samples):
        """Sample up to ``num_samples`` distinct cells and link the visible pairs."""
        self.model = model
        self.nodes = []
        self.connections = {}

        width, height = model.grid.width, model.grid.height
        for _ in range(num_samples):
            node = (random.randrange(width), random.randrange(height))
            # A cell drawn twice must not become a second node: duplicates
            # would add self-loops and be visited repeatedly during searches.
            if node in self.connections:
                continue
            self.nodes.append(node)
            self.connections[node] = []

        for i, node1 in enumerate(self.nodes):
            for node2 in self.nodes[i + 1:]:
                if self.is_path_free(node1, node2):
                    self.connections[node1].append(node2)
                    self.connections[node2].append(node1)

    @staticmethod
    def _line_cells(start, end):
        """Yield the cells visited by the Bresenham-style walk from ``start`` to ``end``."""
        x0, y0 = start
        x1, y1 = end
        dx = x1 - x0
        dy = y1 - y0

        x_sign = 1 if dx > 0 else -1
        y_sign = 1 if dy > 0 else -1

        dx = abs(dx)
        dy = abs(dy)

        # Always iterate along the longer axis; the coefficients map the
        # (major, minor) offsets back onto grid x/y.
        if dx > dy:
            xx, xy, yx, yy = x_sign, 0, 0, y_sign
        else:
            dx, dy = dy, dx
            xx, xy, yx, yy = 0, y_sign, x_sign, 0

        D = 2 * dy - dx
        y = 0
        for x in range(dx + 1):
            yield (x0 + x * xx + y * yx, y0 + x * xy + y * yy)
            if D >= 0:
                y += 1
                D -= 2 * dx
            D += 2 * dy

    def _is_segment_free(self, start, end, ignore=()):
        """Return True if every line cell not listed in ``ignore`` is empty."""
        grid = self.model.grid
        return all(
            cell in ignore or grid.is_cell_empty(cell)
            for cell in self._line_cells(start, end)
        )

    def is_path_free(self, start, end):
        """Return True if every cell on the line from ``start`` to ``end``,
        both endpoints included, is empty."""
        return self._is_segment_free(start, end)

    def find_path(self, start, end):
        """Return waypoints from ``start`` to ``end``, or None if unreachable.

        The endpoints are linked to each roadmap node (and to each other)
        whose connecting line is currently free. The endpoint cells themselves
        are exempt from the emptiness check, since a caller asking for a route
        is typically standing on ``start`` and heading for an occupied ``end``.
        These links exist only for this query; ``nodes`` and ``connections``
        are not modified. Roadmap edges are used as built by the constructor
        and are not re-checked against the grid's current contents.

        Edges cost their Chebyshev length, i.e. the number of single-cell
        moves when diagonal moves are allowed.

        Args:
            start: ``(x, y)`` cell the route begins at.
            end: ``(x, y)`` cell the route leads to.

        Returns:
            ``[start, ..., end]`` as a list of cells (``[start]`` when both
            are equal), or None when no route exists.
        """
        if start == end:
            return [start]

        endpoints = (start, end)
        if self._is_segment_free(start, end, endpoints):
            # A straight line is never longer than a detour via waypoints.
            return [start, end]

        # Query-local links between the endpoints and the roadmap.
        extra_links = {}
        for node in self.nodes:
            if node == start or node == end:
                continue
            if self._is_segment_free(start, node, endpoints):
                extra_links.setdefault(start, []).append(node)
                extra_links.setdefault(node, []).append(start)
            if self._is_segment_free(node, end, endpoints):
                extra_links.setdefault(node, []).append(end)
                extra_links.setdefault(end, []).append(node)

        # Dijkstra over roadmap edges plus the query-local links.
        best_cost = {start: 0}
        previous = {}
        frontier = [(0, start)]
        while frontier:
            cost, current = heapq.heappop(frontier)
            if current == end:
                break
            if cost > best_cost[current]:
                continue  # stale heap entry superseded by a cheaper one
            neighbours = self.connections.get(current, []) + extra_links.get(current, [])
            for neighbour in neighbours:
                step_cost = max(
                    abs(neighbour[0] - current[0]), abs(neighbour[1] - current[1])
                )
                candidate = cost + step_cost
                if candidate < best_cost.get(neighbour, float('inf')):
                    best_cost[neighbour] = candidate
                    previous[neighbour] = current
                    heapq.heappush(frontier, (candidate, neighbour))

        if end not in previous:
            return None

        path = [end]
        while path[-1] != start:
            path.append(previous[path[-1]])
        path.reverse()
        return path

class MachineModel(Model):
    """Simulation of repair agents and breakable machines on an ``n`` x ``n`` torus grid.

    Args:
        num_agents: number of ``RepairAgent`` instances to create.
        num_machines: number of ``MachineAgent`` instances to create; every
            third machine (indices 0, 3, 6, ...) starts out broken.
        n: side length of the square grid.

    Raises:
        ValueError: if the grid has no empty cell left for an agent or machine.
    """

    def __init__(self, num_agents, num_machines, n):
        super().__init__()
        self.num_agents = num_agents
        self.grid = MultiGrid(n, n, torus=True)
        self.schedule = RandomActivation(self)
        # The roadmap is built before anything is placed on the grid.
        self.prm = GridPRM(self, n*2)

        for i in range(self.num_agents):
            repair_agent = RepairAgent(i, self, self.prm)
            self.schedule.add(repair_agent)
            x, y = self.find_empty_location()
            self.grid.place_agent(repair_agent, (x, y))

        for i in range(num_machines):
            # Machine ids continue after the repair-agent ids so they stay unique.
            machine_id = i + self.num_agents
            asset = MachineAsset(machine_id, is_broken=(i % 3 == 0))
            machine_agent = MachineAgent(machine_id, self, asset)
            self.schedule.add(machine_agent)
            x, y = self.find_empty_location()
            self.grid.place_agent(machine_agent, (x, y))

    def find_empty_location(self):
        """Return a uniformly chosen empty ``(x, y)`` cell.

        Raises:
            ValueError: if every cell is occupied (retrying random cells in
                that situation would never terminate).
        """
        empty_cells = [
            (x, y)
            for x in range(self.grid.width)
            for y in range(self.grid.height)
            if self.grid.is_cell_empty((x, y))
        ]
        if not empty_cells:
            raise ValueError("no empty cell left on the grid")
        return random.choice(empty_cells)

    def step(self):
        """Advance the simulation by one scheduler step."""
        self.schedule.step()

def visualize(model, step_number, output_dir="."):
    """Render the grid and save it as ``step_<step_number>.png`` in ``output_dir``.

    Cell colours: magenta where several repair agents share a cell, the
    agent's own colour for a single repair agent, red for a broken machine
    and green for a working one. Repair-agent trails are drawn as blue lines,
    and broken machines with a recorded ``broken_order`` are labelled with it.

    Args:
        model: model exposing ``grid`` and ``schedule.agents``.
        step_number: value used in the output file name.
        output_dir: directory the image is written to; created if missing.
            Defaults to the current directory.
    """
    fig, ax = plt.subplots()
    try:
        # imshow treats the first axis as rows (y) and the second as columns
        # (x), so the buffer is (height, width) and indexed [y, x].
        grid = np.zeros((model.grid.height, model.grid.width, 3))
        for x in range(model.grid.width):
            for y in range(model.grid.height):
                cell_content = model.grid.get_cell_list_contents([(x, y)])
                agents_in_cell = [agent for agent in cell_content if isinstance(agent, RepairAgent)]
                if len(agents_in_cell) > 1:  # More than one agent in a cell
                    grid[y, x] = [1, 0, 1]  # Set to magenta to indicate collision
                else:
                    for agent in cell_content:
                        if isinstance(agent, RepairAgent):
                            grid[y, x] = colors.to_rgb(RepairAgent.COLORS[agent.unique_id % len(RepairAgent.COLORS)])
                        elif isinstance(agent, MachineAgent) and agent.asset.is_broken:
                            grid[y, x] = [1, 0, 0]
                        elif isinstance(agent, MachineAgent):
                            grid[y, x] = [0, 1, 0]

        ax.imshow(grid)

        for agent in model.schedule.agents:
            if isinstance(agent, RepairAgent) and len(agent.movement_history) > 1:
                # History entries are (x, y) positions; keep that order so the
                # trail lines up with the image and the text labels below.
                x_vals, y_vals = zip(*agent.movement_history)
                ax.plot(x_vals, y_vals, '-b', linewidth=2.5)

        for agent in model.schedule.agents:
            if isinstance(agent, MachineAgent) and agent.asset.is_broken and agent.broken_order is not None:
                ax.text(agent.pos[0], agent.pos[1], str(agent.broken_order), color='black', fontsize=8, ha='center', va='center')

        os.makedirs(output_dir, exist_ok=True)
        fig.savefig(os.path.join(output_dir, f"step_{step_number}.png"), dpi=300)
    finally:
        # Close even when rendering or saving fails; this runs once per step.
        plt.close(fig)

def run_and_visualize(model, steps, output_dir='visualizations'):
    """Step ``model`` ``steps`` times, saving one image per step into ``output_dir``.

    Args:
        model: model exposing ``step()`` plus whatever ``visualize`` reads.
        steps: number of simulation steps to run.
        output_dir: directory receiving ``step_<i>.png``; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    for i in range(steps):
        model.step()
        visualize(model, i, output_dir)

# Run configuration, kept in one place so it is easy to tune.
SEED = 42
NUM_REPAIR_AGENTS = 3
NUM_MACHINES = 6
GRID_SIZE = 10
NUM_STEPS = 100

# The simulation code draws from the stdlib `random` module; seed it before
# the model is built so that placement, sampling and breakdowns repeat.
random.seed(SEED)
model = MachineModel(NUM_REPAIR_AGENTS, NUM_MACHINES, GRID_SIZE)
# NOTE(review): the scheduler presumably shuffles with the model's own RNG;
# reseed that too so activation order repeats — confirm against the Mesa docs.
model.reset_randomizer(SEED)
run_and_visualize(model, NUM_STEPS)
