In [None]:
import mesa
import numpy as np

In [None]:
from mesa.experimental.continuous_space import ContinuousSpaceAgent


class ActiveWalker(ContinuousSpaceAgent):
    def __init__(
        self,
        model,
        space,
        position,
        destination,
        speed,
    ):
        super().__init__(space, model)
        self.model = model

        self.G = model.G_layer

        self.position = np.array(position)
        self.destination = np.array(destination)
        self.speed = speed

        self.direction = np.zeros(2)
        self.reached = False
        self.step_time = 0

        self.update_grid_indices()

    def update_grid_indices(self):
        r = int(self.position[1] / self.G.cell_h)
        c = int(self.position[0] / self.G.cell_w)
        self.r = np.clip(r, 0, self.G.height - 1)
        self.c = np.clip(c, 0, self.G.width - 1)

    def calculate_deltaU(self):
        displacement = self.destination - self.position
        d_norm = np.linalg.norm(displacement)

        d_u = np.zeros(2) if d_norm < 0.05 else (displacement / d_norm)

        return d_u

    def calculate_deltaV(self):
        v = self.G.V

        i = self.r
        j = self.c
        i_d = min(i + 1, self.G.height - 1)
        i_u = max(i - 1, 0)
        j_r = min(j + 1, self.G.width - 1)
        j_l = max(j - 1, 0)

        gx = (v[i, j_r] - v[i, j_l]) / (2 * self.G.cell_w)
        gy = (v[i_d, j] - v[i_u, j]) / (2 * self.G.cell_h)

        d_v = np.array([gx, gy], dtype=float)

        return d_v

    def calculate_new_direction(self):
        grad_u = self.calculate_deltaU()
        grad_v = self.calculate_deltaV()

        v_norm = np.linalg.norm(grad_v)
        v_dir = grad_v / v_norm if v_norm > 0 else np.zeros(2)

        # Dot product: 1.0 = aligned, 0.0 = perpendicular, -1.0 = backward
        alignment = np.dot(grad_u, v_dir)

        base_kappa = 4.0
        gamma = 0.2

        kappa = base_kappa * (1 + alignment) if alignment > -0.5 else 0

        effective_v = v_dir * min(v_norm, 1.0)

        # The Result: Destination + Trail + Inertia
        new_dir = grad_u + (effective_v * kappa) + (self.direction * gamma)

        dir_norm = np.linalg.norm(new_dir)
        self.direction = np.zeros(2) if dir_norm < 0.05 else new_dir / dir_norm

    def position_update(self):
        noise = np.random.normal(loc=0, scale=0.1, size=2)  # noise=0.2
        new_pos = self.position + self.direction * self.speed + noise

        xmax = self.model.space_w
        ymax = self.model.space_h

        eps = 1e-3
        new_pos[0] = np.clip(new_pos[0], eps, xmax - eps)
        new_pos[1] = np.clip(new_pos[1], eps, ymax - eps)

        return new_pos

    def step(self):
        if np.linalg.norm(self.destination - self.position) < 1.0:
            self.reached = True
        else:
            self.position = self.position_update()

        self.step_time += 1

        self.update_grid_indices()

In [None]:
class StopAgent(ContinuousSpaceAgent):
    def __init__(self, model, space, position):
        super().__init__(space, model)
        self.position = np.array(position)
        self.is_stop = True

    def step(self):
        pass

In [None]:
from scipy.signal import convolve2d


class StepDeposit:
    def __init__(
        self,
        model,
        width=1000,
        height=1000,
        default_value=0,
        max_value=1,
        print_strength=0.9,
        trail_dies_in=200,
        vision=20,
    ):
        self.model = model
        self.data = np.full((height, width), default_value)

        (_, xmax), (_, ymax) = self.model.space.dimensions
        self.cell_w = (xmax) / width
        self.cell_h = (ymax) / height

        self.width = width
        self.height = height

        self.sigma = vision

        self.V = np.zeros_like(self.data)

        stop_cells = []
        for x, y in self.model.stops:
            c = x // self.cell_w
            c = int(np.clip(c, 0, self.width - 1))
            r = y // self.cell_h
            r = int(np.clip(r, 0, self.height - 1))

            stop_cells.append([r, c])

        self.stop_cells = np.array(stop_cells)

        def create_diffuse_matrix():
            k_radius = int(3 * self.sigma)
            size = 1 + 2 * k_radius
            d = np.zeros((size, size), dtype=float)
            center = np.array([k_radius, k_radius])

            for r in range(size):
                for c in range(size):
                    dist = np.linalg.norm(np.array([r, c]) - center)
                    d[r, c] = np.exp(-dist / self.sigma)

            return d / d.sum()

        self.d = create_diffuse_matrix()

        self.max_value = max_value
        self.min_value = default_value
        self.print_strength = print_strength
        self.T = trail_dies_in

    def __getitem__(self, index):
        r, c = index
        return self.data[r, c]

    def updateV(self):
        self.V = convolve2d(self.data, self.d, mode="same")

    def decay(self):
        self.data = self.data - (self.data - self.min_value) / self.T

    def clean_stops(self):
        r_clear = self.sigma
        for stop_r, stop_c in self.stop_cells:
            r_min = max(0, stop_r - r_clear)
            r_max = min(self.height, stop_r + r_clear + 1)
            c_min = max(0, stop_c - r_clear)
            c_max = min(self.width, stop_c + r_clear + 1)

            self.data[r_min:r_max, c_min:c_max] = self.min_value

    def despositG(self):
        agents_pos_ind = np.array([[ag.r, ag.c] for ag in self.model.active_agents])
        if len(agents_pos_ind) == 0:
            return
        q = np.array([self.data[i, j] for i, j in agents_pos_ind])
        q = q / self.max_value
        q = (1 - q) * self.print_strength

        for ind, [i, j] in enumerate(agents_pos_ind):
            self.data[i, j] += q[ind]

        # self.clean_stops()

    def update(self):
        self.despositG()
        self.decay()

        self.updateV()

In [None]:
from mesa.agent import AgentSet
from mesa.experimental.continuous_space import ContinuousSpace


class ActiveModel(mesa.Model):
    def __init__(
        self,
        stops=None,
        population_size=50,
        width=50,
        height=50,
        speed_mean=1,
        default_value=0,
        max_value=5,
        print_strength=1.0,
        trail_dies_in=500,
        vision=6,
        resolution=2,
        rng=None,
    ):
        super().__init__(rng=rng)

        self.space = ContinuousSpace(
            [[0, width], [0, height]],
            torus=False,
            random=self.random,
            n_agents=population_size,
        )

        self.space_w = width
        self.space_h = height
        if stops is None:
            self.stops = [
                (int(width * 0.1), int(height * 0.1)),
                (int(width * 0.9), int(height * 0.1)),
                (int(width * 0.5), int(height * 0.9)),
            ]
        else:
            self.stops = stops

        self.active_agents = AgentSet([])

        d, p = self.origin_destination(population_size)
        s = np.random.normal(loc=speed_mean, scale=0.1, size=population_size)

        self.G_layer = StepDeposit(
            self,
            width=width * resolution,
            height=height * resolution,
            default_value=default_value,
            max_value=max_value,
            print_strength=print_strength,
            trail_dies_in=trail_dies_in,
            vision=vision,
        )

        ActiveWalker.create_agents(
            model=self,
            space=self.space,
            n=population_size,
            destination=d,
            speed=s,
            position=p,
        )

        # In activeModel.__init__, after creating regular agents
        # Add stops as special non-moving agents
        StopAgent.create_agents(
            model=self, space=self.space, n=len(self.stops), position=self.stops
        )

    def origin_destination(self, k):
        s = list(self.stops)
        d = self.random.choices(s, k=k)
        p = []
        for i in range(k):
            s.remove(d[i])
            p.append(self.random.choice(s))
            s.append(d[i])
        p = np.array(p)
        if k == 1:
            return d[0], p[0]
        return d, p

    def step(self):
        # grouped_agents=self.agents.groupby("destination")
        zero_time = self.agents.select(
            lambda a: isinstance(a, ActiveWalker) and a.step_time == 0
        )
        if self.time % 45 == 1:
            k = min(len(self.stops) + 2, len(zero_time))
            add_ag = self.random.choices(zero_time.to_list(), k=k)
            for add_a in add_ag:
                self.active_agents.add(add_a)

        self.active_agents.do("calculate_new_direction")
        self.active_agents.shuffle_do("step")

        agents_reached = self.active_agents.select(lambda a: a.reached)

        agent_to_remove = []
        for ag in agents_reached:
            if ag.step_time > 60:
                ag.step_time = 0
                agent_to_remove.append(ag)

            ag.destination, ag.position = self.origin_destination(1)
            ag.reached = False

        for remove_ag in agent_to_remove:
            self.active_agents.remove(remove_ag)

        self.G_layer.update()

In [None]:
import solara
from matplotlib.figure import Figure
from mesa.visualization import Slider, SolaraViz, SpaceRenderer
from mesa.visualization.components import AgentPortrayalStyle, PropertyLayerStyle
from mesa.visualization.utils import update_counter

In [None]:
@solara.component
def TrailHeatmap(model):
    update_counter.get()
    fig = Figure()
    ax = fig.subplots()
    if hasattr(model, "G_layer"):
        data = model.G_layer.V.copy()

        im = ax.imshow(
            data, cmap="magma", interpolation="nearest", origin="lower", aspect="auto"
        )

        fig.colorbar(im, ax=ax, label="Pheromone Intensity")

        ax.set_title(f"Trail Intensity (Step {model.time})")
        ax.set_xlabel("X Coordinate")
        ax.set_ylabel("Y Coordinate")

    solara.FigureMatplotlib(fig)


@solara.component
def Trail(model):
    update_counter.get()
    fig = Figure()
    ax = fig.subplots()
    if hasattr(model, "G_layer"):
        data = model.G_layer.data.copy()

        im = ax.imshow(
            data, cmap="Greens", interpolation="nearest", origin="lower", aspect="auto"
        )

        fig.colorbar(im, ax=ax, label="Pheromone Intensity")

        ax.set_title(f"Trail (Step {model.time})")
        ax.set_xlabel("X Coordinate")
        ax.set_ylabel("Y Coordinate")

    solara.FigureMatplotlib(fig)

In [None]:
# def post_process(ax):
#     """Customize the matplotlib axes after rendering."""
#     ax.set_title("Active Walker Model")
#     ax.set_xlabel("x")
#     ax.set_ylabel("y")
#     ax.grid(True, which="both", linestyle="--", linewidth=0.5, alpha=0.5)
#     ax.set_aspect("equal", adjustable="box")

#     # Draw stops
#     if hasattr(model, 'stops') and model.stops:
#         xs, ys = zip(*model.stops)
#         ax.scatter(
#             xs, ys, marker="X", s=400, c="yellow",
#             edgecolors="darkblue", linewidths=2.5, zorder=1000, label="Stops"
#         )
#         ax.legend(loc="upper right", framealpha=0.9, fontsize=10)

In [None]:
def agent_portrayal(agent):
    if isinstance(agent, StopAgent):
        return AgentPortrayalStyle(size=200, marker="^", zorder=100)

    boid_style = AgentPortrayalStyle(size=20, marker="o", color="red")
    if agent.reached:
        boid_style.update(("color", "green"), ("size", 30))

    return boid_style


def propertylayer_portrayal(layer):
    if layer.name == "stepDeposit":
        return PropertyLayerStyle(color="blue", alpha=0.8, colorbar=True)


model_params = {
    "population_size": Slider(
        label="Number",
        value=120,
        min=10,
        max=200,
        step=10,
    ),
    "width": 100,
    "height": 100,
}

In [None]:
parameters = {
    "stops": [(5, 5), (35, 5), (20, 35)],
    "population_size": 80,
    "width": 40,
    "height": 40,
    "speed_mean": 1.0,
    "default_value": 0,
    "max_value": 10,
    "print_strength": 5.0,
    "trail_dies_in": 2000,
    "vision": 4,
    "resolution": 2,
    "rng": None,
}

model = ActiveModel(**parameters)

In [None]:
renderer = SpaceRenderer(
    model,
    backend="matplotlib",
)
renderer.draw_agents(agent_portrayal)
# renderer.post_process=post_process

page = SolaraViz(
    model,
    renderer,
    components=[TrailHeatmap, Trail],
    model_params=model_params,
    name="Active Walker Model",
)

In [None]:
# for i in range(100):
#     model.step()
#     print(i)

In [None]:
page