In [None]:
# La clase `Model` se hace cargo de los atributos a nivel del modelo, maneja los agentes. 
# Cada modelo puede contener múltiples agentes y todos ellos son instancias de la clase `Agent`.
from mesa import Agent, Model

# Usamos `MultiGrid` porque una misma celda puede contener varios agentes a la vez (por ejemplo, un robot y una caja).
from mesa.space import MultiGrid

from mesa.batchrunner import BatchRunnerMP
# Con `SimultaneousActivation` hacemos que todos los agentes se activen de manera simultánea.
from mesa.time import SimultaneousActivation

# Vamos a hacer uso de `DataCollector` para obtener el grid completo cada paso (o generación) y lo usaremos para graficarlo.
from mesa.datacollection import DataCollector

# matplotlib lo usamos para graficar/visualizar cómo evoluciona la simulación.
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
plt.rcParams["animation.html"] = "jshtml"
matplotlib.rcParams['animation.embed_limit'] = 2**128

# Importamos los siguientes paquetes para manejar valores numéricos.
import numpy as np
import pandas as pd

import itertools

from typing import Union

from functools import reduce

import math

# Definimos otros paquetes que vamos a usar para medir el tiempo de ejecución de nuestro algoritmo.
import time
import datetime

## Creación de agentes, modelo y función para colorear el grid de salida

In [None]:
# Stack height at which a robot drops its current shelf (compared against a
# shelf's `stack_size` in CleaningAgent.advance).
MAX_PER_STACK=100

class CleaningAgent(Agent):
    """Robot that collects loose boxes and piles them onto a shelf box.

    The first loose box a robot finds next to it becomes its shelf
    (``cur_shelf``). Every loose box found afterwards is grabbed and carried
    to that shelf until the shelf holds ``MAX_PER_STACK`` boxes.

    The model schedules agents with ``SimultaneousActivation``: ``step``
    only decides where the robot wants to go (``next_pos``) and ``advance``
    applies the move.
    """

    def __init__(self,id, x, y, model):
        """Create a robot with identifier ``id`` at cell ``(x, y)`` of ``model``."""
        super().__init__(id,model)
        self.id: int = id
        # Current cell of the robot; kept in sync with the grid in `advance`.
        self.coords: tuple[int,int] = x,y
        self.model: CleaningModel = model
        # Cell the robot intends to occupy after the current tick.
        self.next_pos: tuple[int,int] = self.coords
        # Number of ticks in which the robot actually changed cell.
        self.total_steps: int = 0
        # Box currently used as the drop-off stack, if any.
        self.cur_shelf: Union[None,BoxAgent] = None
        self.carrying_box: bool = False
        # Last computed (x-move, y-move) unit vectors towards the shelf.
        self.directions: Union[None,tuple[tuple,tuple]] = None

    def _neighbor_cells(self):
        """Return the von Neumann neighbourhood (radius 1) of the current cell."""
        return list(self.model.grid.iter_neighborhood(self.coords, False, False, 1))

    def _random_neighbor(self):
        """Pick a uniformly random neighbouring cell (own cell if there is none)."""
        choices = self._neighbor_cells()
        if not choices:
            # Degenerate 1x1 grid: there is nowhere to go.
            return self.coords
        return choices[np.random.choice(len(choices))]

    def _step_towards(self, target):
        """Return the cell one step closer to ``target``, closing the x gap first."""
        x, y = self.coords
        xdir = (target[0] > x) - (target[0] < x)
        ydir = (target[1] > y) - (target[1] < y)
        self.directions = (xdir, 0), (0, ydir)
        if xdir != 0:
            return x + xdir, y
        # Already aligned on x: move along y (stays put when standing on the target).
        return x, y + ydir

    def step(self):
        """Decide the next cell and claim or grab a neighbouring loose box."""
        if self.carrying_box:
            if self.cur_shelf is not None:
                self.next_pos = self._step_towards(self.cur_shelf.coords)
            else:
                self.next_pos = self._random_neighbor()
            return

        # First neighbouring box that is neither being carried nor part of a stack.
        loose_box = next(
            (
                agent
                for agent in self.model.grid.iter_neighbors(self.coords, False, False, 1)
                if isinstance(agent, BoxAgent) and not agent.grabbed and not agent.stack_size
            ),
            None,
        )

        if loose_box is None:
            # Nothing to pick up nearby: keep wandering.
            self.next_pos = self._random_neighbor()
        elif self.cur_shelf is None:
            # No shelf yet: this box becomes the base of the robot's stack.
            loose_box.stack_size = 1
            self.model.dirt_amount -= 1
            self.cur_shelf = loose_box
            # Stay in place this tick instead of reusing a stale target.
            self.next_pos = self.coords
        else:
            # Pick the box up; BoxAgent.step accounts for it in `dirt_amount`.
            loose_box.grabbed = True
            self.carrying_box = True
            self.next_pos = self._random_neighbor()

    def advance(self):
        """Apply the move chosen in ``step``, or drop the carried box on the shelf."""
        reserved = self.model.next_occupied_spaces
        if self.next_pos in reserved:
            # Target already taken this tick: choose among the free neighbours.
            # If none is free, stay put rather than retrying forever.
            free_cells = [cell for cell in self._neighbor_cells() if cell not in reserved]
            if free_cells:
                self.next_pos = free_cells[np.random.choice(len(free_cells))]
            else:
                self.next_pos = self.coords

        if self.carrying_box and self.cur_shelf is not None and self.next_pos == self.cur_shelf.coords:
            # Reaching the shelf cell means dropping the box; the robot does not move.
            self.cur_shelf.stack_size += 1
            if self.cur_shelf.stack_size == MAX_PER_STACK:
                # Shelf is full: the next loose box found will start a new one.
                self.cur_shelf = None
            self.carrying_box = False
            return

        if self.next_pos != self.coords:
            self.total_steps = self.total_steps + 1
        self.coords = self.next_pos
        self.model.grid.move_agent(self,self.coords)
        # Reserve the cell so robots advancing later this tick avoid it.
        reserved.append(self.coords)

class BoxAgent(Agent):
    """Box lying on the grid that robots can stack on or carry away.

    A box is in one of three situations: loose (``grabbed`` is false and
    ``stack_size`` is unset), a shelf (``stack_size`` counts the boxes piled
    on it) or grabbed by a robot (``grabbed`` is true).
    """

    def __init__(self,id, x: int, y: int, model):
        """Create a loose box with identifier ``id`` at cell ``(x, y)``."""
        super().__init__(id, model)
        self.id = id
        self.coords = x, y
        self.model = model
        # Never assigned anywhere else in this class; only read in `advance`.
        self.next_state = None
        # None until a robot turns this box into a shelf.
        self.stack_size = None
        self.grabbed = False
        # True once this box has been subtracted from the model's counter.
        self.done = False

    def step(self):
        """Subtract this box from ``model.dirt_amount`` once after it is grabbed."""
        if self.grabbed and not self.done:
            self.model.dirt_amount = self.model.dirt_amount-1
            self.done = True

    def advance(self):
        """Merge ``next_state`` into ``grabbed``, keeping it a real boolean."""
        # `False or None` would turn the flag into None; coerce back to bool.
        self.grabbed = bool(self.grabbed or self.next_state)
        
def get_grid(model) -> np.ndarray:
    """Encode the current grid as a float matrix for plotting.

    Each cell gets a value based on what it contains:

    * 0    -- at least one loose box and no stack (with or without a robot)
    * 0.25 -- a stack and no robot
    * 0.5  -- neither loose box, stack nor robot
    * 0.75 -- a stack and a robot
    * 1    -- a robot and neither loose box nor stack

    Returns an array of shape ``(model.grid.width, model.grid.height)``.
    """
    width, height = model.grid.width, model.grid.height
    grid = np.zeros((width, height))
    for x in range(width):
        for y in range(height):
            # Radius 0 with the centre included yields the cell's own agents.
            occupants = list(model.grid.iter_neighbors((x, y), False, True, 0))
            loose = stacked = robots = 0
            for occupant in occupants:
                if isinstance(occupant, BoxAgent):
                    if occupant.stack_size:
                        stacked += 1
                    elif not occupant.grabbed:
                        loose += 1
                if isinstance(occupant, CleaningAgent):
                    robots += 1

            if stacked:
                value = 0.75 if robots else 0.25
            elif loose:
                value = 0
            else:
                value = 1 if robots else 0.5
            grid[x][y] = value

    return grid

def get_dirt_amount(model)-> int: 
    """Model reporter: return the model's current ``dirt_amount`` counter."""
    remaining = model.dirt_amount
    return remaining

        
class CleaningModel(Model):
    """Warehouse model: robots wander an M x N grid and stack the boxes on it.

    Parameters
    ----------
    M, N : int
        Grid width and height passed to ``MultiGrid``.
    robots : int
        Number of ``CleaningAgent`` robots, each spawned on a distinct cell.
    box_percentage : float
        Fraction (0-1) of the grid cells that should start with a box.

    ``dirt_amount`` counts the boxes not yet claimed as a shelf or grabbed;
    the model stops running when it reaches zero.
    """

    def __init__(self,M: int,N: int,robots: int,box_percentage: float):
        super().__init__()
        self.grid = MultiGrid(M,N,False)
        self.x = M
        self.y = N
        self.schedule = SimultaneousActivation(self)
        self.running = True
        self.start_time = time.time()
        self.cur_time = 0
        # Cells already taken by robots during the current advance phase.
        self.next_occupied_spaces = []

        requested_boxes = max(0, int(M*N*(box_percentage*100))//100)

        agent_id = 0
        shuffled_coords = list((x, y) for agent, x, y in self.grid.coord_iter())
        np.random.shuffle(shuffled_coords)

        self.spawn_coords = shuffled_coords[0:robots]
        for (x, y) in self.spawn_coords:
            robot = CleaningAgent(agent_id, x, y, self)
            self.grid.place_agent(robot, (x, y))
            self.schedule.add(robot)
            agent_id = agent_id + 1

        # Boxes take the cells right after the robots' cells. The counter is
        # the number actually placed, so it can reach zero even when the
        # request exceeds the free cells.
        box_coords = shuffled_coords[robots:robots + requested_boxes]
        self.dirt_amount = len(box_coords)
        for (x, y) in box_coords:
            box = BoxAgent(agent_id, x, y, self)
            self.grid.place_agent(box, (x, y))
            self.schedule.add(box)
            agent_id = agent_id + 1

        # Distance between the robots' mean spawn point and the grid centre.
        if self.spawn_coords:
            count = len(self.spawn_coords)
            avg_x = sum(x for x, _ in self.spawn_coords) / count
            avg_y = sum(y for _, y in self.spawn_coords) / count
            self.avg_distance_to_center = math.sqrt((avg_x - (M-1)/2)**2 + (avg_y-(N-1)/2)**2)
        else:
            # No robots were spawned, so the distance is undefined.
            self.avg_distance_to_center = float("nan")

        self.datacollector = DataCollector(
            model_reporters={"Grid": get_grid, "Dirt_amount":get_dirt_amount})

    def step(self):
        """Record the current state, run one scheduler tick and update run status."""
        self.datacollector.collect(self)
        self.schedule.step()
        # Cell reservations are only valid within a single tick.
        self.next_occupied_spaces = []
        self.cur_time = time.time() - self.start_time 
        if self.dirt_amount == 0:
            self.running = False

## Correr modelo con combinaciones de variables para ver correlaciones entre variables de entrada y salidas

In [None]:
# Values swept by the batch runner for each constructor argument of CleaningModel.
M = [10, 20, 40]
N = [10, 20, 40]
num_robots = [1, 5, 10]
porc_cajas = [0.1, 0.5, 0.9]
exec_settings = {
    "M": M,
    "N": N,
    "robots": num_robots,
    "box_percentage": porc_cajas,
}

# Values read from each model by the batch runner.
model_reporter = {
    "Time until done": lambda m: (m.cur_time),
    "Clean Percentage": lambda m: 1 - m.dirt_amount / (m.x * m.y),
    "Average distance to center": lambda m: m.avg_distance_to_center,
    # Mean of `total_steps` over the robots only (boxes are skipped).
    "Average_steps_per_agent": lambda m: (lambda lst: sum(lst) / len(lst))(
        [agent.total_steps for agent in m.schedule.agent_buffer() if isinstance(agent, CleaningAgent)]
    ),
}

param_run = BatchRunnerMP(
    CleaningModel,
    None,
    variable_parameters=exec_settings,
    model_reporters=model_reporter,
)

In [None]:
# Launch the batch runner configured in the previous cell.
# NOTE(review): presumably runs one simulation per parameter combination; confirm against the Mesa docs.
param_run.run_all()

## Convertir resultado de batch run, pasarlo a dataframe y quitarle una columna innecesaria

In [None]:
# Fetch the batch results as a dataframe, discard the "Run" column and
# summarise what is left.
df = param_run.get_model_vars_dataframe().drop(columns=["Run"])
df.describe()

## Calcular la matriz de correlación y graficarla como heatmap

In [None]:
import seaborn as sn
import matplotlib.pyplot as plt

# Pairwise correlations between every column of the batch results.
corrMatrix = df.corr()

# Drop the first row and the last column so the lower triangle, without the
# diagonal of self-correlations, fills the plot.
corr = corrMatrix.iloc[1:, :-1].copy()

# Hide everything strictly above the diagonal of the trimmed matrix.
mask = np.triu(np.ones(corr.shape, dtype=bool), k=1)

colormap = sn.color_palette("magma", as_cmap=True)

sn.heatmap(corr, annot=True, fmt=".2f", cmap=colormap, mask=mask)
plt.show()

## Correr el modelo una vez para ver cómo funciona, con una gráfica

In [None]:
# Timing placeholders; neither value is read again in this cell.
start = time.time()
exec_time = 1

# Demo configuration: a 10x10 grid, 3 robots and 80% of the cells holding a box.
X, Y = 10, 10
vacuums = 3
dirt_percentage = 0.8
model = CleaningModel(X, Y, vacuums, dirt_percentage)

# Run a fixed number of generations; `gens` ends up as the number of steps taken.
gens = 0
for gens in range(1, 101):
    model.step()

# One row per collected step: remaining-box counter and encoded grid.
history = model.datacollector.get_model_vars_dataframe()
dirt = history['Dirt_amount']
all_grid = history['Grid']
dirt.plot()

fig, axs = plt.subplots(figsize=(X, Y))
axs.set_xticks([])
axs.set_yticks([])
patch = plt.imshow(all_grid.iloc[0], cmap=plt.cm.coolwarm)


def animate(i):
    """Show the grid recorded at step ``i`` in the existing image."""
    frame = all_grid.iloc[i]
    patch.set_data(frame)


anim = animation.FuncAnimation(fig, animate, frames=gens)

In [None]:
# Bare expression so the notebook displays the animation built in the previous
# cell (the imports cell sets rcParams "animation.html" to "jshtml").
anim