In [None]:
!pip install mesa

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mesa
  Downloading Mesa-0.9.0-py3-none-any.whl (691 kB)
[K     |████████████████████████████████| 691 kB 5.2 MB/s 
[?25hCollecting cookiecutter
  Downloading cookiecutter-2.1.1-py2.py3-none-any.whl (36 kB)
Collecting pyyaml>=5.3.1
  Downloading PyYAML-6.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (596 kB)
[K     |████████████████████████████████| 596 kB 53.5 MB/s 
[?25hCollecting binaryornot>=0.4.4
  Downloading binaryornot-0.4.4-py2.py3-none-any.whl (9.0 kB)
Collecting jinja2-time>=0.2.0
  Downloading jinja2_time-0.2.0-py2.py3-none-any.whl (6.4 kB)
Collecting arrow
  Downloading arrow-1.2.2-py3-none-any.whl (64 kB)
[K     |████████████████████████████████| 64 kB 1.8 MB/s 
Installing collected packages: arrow, pyyaml, jinja2-time, binaryornot, cookiecutter, mesa
  Attempting uninstall: pyyaml
    Found existing install

In [None]:
# 'Model' sirve para definir los atributos a nivel del modelo, maneja los agentes
# 'Agent' es la unidad atómica y puede ser contenido en múltiples instancias en los modelos
from mesa import Agent, Model 

# 'SingleGrid' sirve para forzar a un solo objeto por celda (nuestro objetivo en este "juego")
from mesa.space import SingleGrid

# 'SimultaneousActivation' habilita la opción de activar todos los agentes de manera simultanea.
from mesa.time import SimultaneousActivation

# 'DataCollector' permite obtener el grid completo a cada paso (o generación), útil para visualizar
from mesa.datacollection import DataCollector

# 'Random' permite generar números aleatorios
import random

# 'Math' permite realizar operaciones matemáticas
import math

# 'json' permite exportar los datos a un archivo JSON
import json

# 'matplotlib' lo usamos para graficar/visualizar como evoluciona el autómata celular.
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
plt.rcParams["animation.html"] = "jshtml"
matplotlib.rcParams['animation.embed_limit'] = 2 ** 128

# Definimos los siguientes paquetes para manejar valores númericos: 'numpy' & 'pandas'
import numpy as np
import pandas as pd

# Definimos otros paquetes que vamos a usar para medir el tiempo de ejecución de nuestro algoritmo.
import time
import datetime

In [None]:
cmap = matplotlib.cm.get_cmap('viridis', 8)
cmap = cmap(np.linspace(0, 1, 8))
cmap[0] = np.array([50/256, 50/256, 50/256, 1])   		# Piso del almacen
# cmap[0] = np.array([256/256, 256/256, 256/256, 1]) 
cmap[1] = np.array([255/256, 255/256, 153/256, 1])  	# Repisa con una caja
cmap[2] = np.array([255/256, 204/256, 102/256, 1])    	# Repisa con dos cajas
cmap[3] = np.array([255/256, 153/256, 51/256, 1])    	# Repisa con tres cajas
cmap[4] = np.array([255/256, 51/256, 0/256, 1]) 		# Repisa con cuatro cajas
cmap[5] = np.array([153/256, 0/256, 0/256, 1])  		# Repisa con cinco cajas
cmap[6] = np.array([0/256, 204/256, 0/256, 1])  		# Caja en el almacén
cmap[7] = np.array([102/256, 102/256, 255/256, 1])   	# Robot en el almacén

new_cmap = matplotlib.colors.ListedColormap(cmap)

In [None]:
def dist (a, b):
	return math.sqrt((a[0] - b[0])**2 + (a[1] - b[1])**2)

In [None]:
class RobotAgent(Agent):
	def __init__(self, unique_id, model):
		super().__init__(unique_id, model)
		q = list(range(8))
		q.remove(4)
		self.next_pos = None
		self.direction = random.choices(q)[0]
		self.box = None
		self.model.shelf = (0, 0)
	
	def step(self):
		self.model.appendAgent(self, self.model.step_result.robots)

		self.next_pos = self.pos

		# Si el robot esta cargando una caja
		if self.box:
			# Si el robot esta en la repisa, intentamos guardar la caja
			if self.nextToShelf():					
				self.dropBox()
			# O acercanos a la repisa
			else:
				self.moveToShelf()
		# Si el robot no esta cargando una caja
		else:
			# Recogemos las cajas si estan cerca
			if self.nextToBox():
				return
			
			# Elegimos una dirección aleatoria que no salga del grid ni choque
			if random.random() < 0.1:
				self.chooseRandomDirection()
			elif self.model.grid.out_of_bounds(self.getNextPos()):
				self.direction = 4
				self.chooseRandomDirection()
			if not self.model.grid.is_cell_empty(self.getNextPos()):
				self.direction = 4
				self.chooseRandomDirection()
			self.continueDirection()

	def advance(self):
		# Confirmamos que la siguiente posicion siga siendo valida
		if self.model.grid.is_cell_empty(self.next_pos) and not self.model.grid.out_of_bounds(self.next_pos):
			self.model.grid.move_agent(self, self.next_pos)

	'''
	Si el robot esta al lado de una caja, la recoge 
	'''
	def nextToBox(self):
		# Recorremos todas los vecinos del robot
		neighbors = self.model.grid.get_neighbors(
			self.pos, 
			moore=True, 
			include_center=False)

		for neighbor in neighbors:
			# Si alguno de los vecinos es una caja lo recogemos
			if isinstance(neighbor, BoxAgent):
				self.box = neighbor
				self.model.grid.remove_agent(neighbor)
				# self.model.schedule.remove(neighbor)
				neighbor.remove = True
				return True
		return False

	'''
	Comprueba si el robot esta al lado de una repisa
	'''
	def nextToShelf(self):
		if dist(self.pos, self.model.shelf) == 1:
			return True

	'''
	Comprueba que haya una repisa con espacio libre para almacenar una caja
	Guarda la caja o crea una nueva repisa
	'''
	def dropBox(self):
		cell = self.model.grid.get_cell_list_contents(self.model.shelf)
		# Si hay un agente en el espacio de la repisa
		if cell:
			# Comprobamos que existe una repisa con espacio libre para guardarla
			if isinstance(cell[0], ShelfAgent):
				if len(cell[0].boxes) < 5:
					cell[0].boxes.append(self.box)
					self.model.stored_boxes += 1
					self.box = None
					self.direction = 4
					self.chooseRandomDirection()
				else:
					self.model.new_shelf = True
					
		# # Si no hay un agente en el espacio de la repisa
		else:
			# Creamos una nueva repisa
			self.model.new_shelf = True
				

	'''
	Mueve el robot a la siguinte posición mas cercana a la repisa que este desocupada
	'''
	def moveToShelf(self):
		# Recorremos todas las posiciones adyacentes a la posición del robot
		neighbors = self.model.grid.get_neighborhood(
			self.pos,
			moore=True,
			include_center=False)
		
		# Checamos que no estemos sobre una repisa
		if self.pos != self.model.shelf:
			closestDistance = dist(self.pos, self.model.shelf)
			closestCell = self.pos
		else:
			closestDistance = self.model.grid.width + self.model.grid.height
			closestCell = None

		for neighbor in neighbors:
			# Si la posición esta vacia y no es un espacio para la repisa
			if self.model.grid.is_cell_empty(neighbor) and neighbor != self.model.shelf:
				distance = dist(neighbor, self.model.shelf)
				# Guardamos la posición mas cercana
				if distance < closestDistance:
					closestDistance = distance
					closestCell = neighbor

		# Si la posición mas cercana es distinta a la actual la guardamos
		if closestCell:
			self.next_pos = closestCell
		# O se queda en su posición actual
		else:
			self.next_pos = self.pos

	'''
	Selecciona una dirección aleatoria que tenga una celda vacía y este dentro del grid
	'''
	def chooseRandomDirection(self):
		# Creamos una lista con las direcciones posibles
		q = list(range(9))
		if self.direction != 4:
			q.remove(self.direction)
		q.remove(4)

		random.shuffle(q)
		# Recorremos todas las direcciones posibles validas
		for direction in q: 
			self.direction = 8 - direction
			if self.model.grid.out_of_bounds(self.getNextPos()):
				direction = 4
			elif self.model.grid.is_cell_empty(self.getNextPos()):
				return
	
	'''
	Comprueba que la celda siguiente a la actual sea valida y no este ocupada para continuar la dirección
	'''
	def continueDirection(self):
		# Si la siguiente posicion esta afuerda del grid o esta ocupada elije una nueva dirección
		next_pos = self.getNextPos()
		if self.model.grid.out_of_bounds(next_pos):
			self.chooseRandomDirection()
		elif not self.model.grid.is_cell_empty(next_pos):
			self.chooseRandomDirection()
		# Regresamos la siguiente posición
		self.next_pos = self.getNextPos()

	'''
	Devuelve la siguiente posición del robot en función de su dirección
	'''
	def getNextPos(self):
		# Sumamos la dirección al punto actual
		directions = [[-1, 1],  [0, 1],  [1, 1], 
					  [-1, 0],  [0, 0],  [1, 0], 
					  [-1, -1], [0, -1], [1, -1]]
		next_pos = [self.pos[0] + directions[self.direction][0], self.pos[1] + directions[self.direction][1]]
		return tuple(next_pos)

'''
Clase que representa una caja
'''
class BoxAgent(Agent):
	def __init__(self, unique_id, model):
		super().__init__(unique_id, model)
		self.remove = False

	def step(self):
		if not self.remove:
			self.model.appendAgent(self, self.model.step_result.boxes)
	
	def advance(self):
		if self.remove:
			self.model.schedule.remove(self)

'''
Clase que representa una repisa
'''
class ShelfAgent(Agent):
	def __init__(self, unique_id, model):
		super().__init__(unique_id, model)
		self.boxes = []
	def step(self):
		self.model.appendAgentShelf(self, self.model.step_result.shelves)

'''
Modelo para simular el comportamiento de agentes que recogen y organizan cajas 
en repisas dentro de un almacén
'''
class StorageModel(Model):
	def __init__(self, width, height, k):
		self.shelf = (0, 0) 	# Posición de la repisa objetivo
		self.last_shelf = None	# Posición de la repisa objetivo
		self.new_shelf = False 	# Indica si hay que crear una nueva repisa
		self.stored_boxes = 0 	# Cantidad de cajas almacenadas
		self.simulation_results = SimulationResult()
		self.step_result = StepResult(0)
		self.grid = SingleGrid(width, height, False)
		self.schedule = SimultaneousActivation(self)

		# Inicializamos la grid en el resultado
		self.simulation_results.grid = Grid(width, height)

		a = ShelfAgent((0, "Shelf"), self)
		self.grid.place_agent(a, (0, 0))
		self.schedule.add(a)
		self.appendAgentShelf(a, self.step_result.shelves)

		self.last_shelf = (0, 0)

		# Creamos los 5 robots en posiciones aleatorias
		for i in range(5):
			cell = self.grid.find_empty()
			a = RobotAgent((i, "Robot"), self)
			self.grid.place_agent(a, cell)
			self.schedule.add(a)
			self.appendAgent(a, self.step_result.robots)

		# Validamos la cantidad de cajas posibles
		k = min(k, height * 5)
		# Creamos k cajas en posiciones aleatorias
		for i in range(k):
			cell = self.grid.find_empty()
			b = BoxAgent((i, "Box"), self)
			self.grid.place_agent(b, cell)
			self.schedule.add(b)
			self.appendAgent(b, self.step_result.boxes)
   
		self.simulation_results.steps.append(self.step_result)
  
		self.datacollector = DataCollector(
            model_reporters={"Grid": self.get_grid}
        )
	
	def step(self):
		# Aumentamos el contador de pasos
		self.simulation_results.stepCount += 1
		# Reiniciamos la variable de resultados de paso
		self.step_result = StepResult(self.simulation_results.stepCount)

		self.datacollector.collect(self)
		self.schedule.step()
		if self.new_shelf:
			self.createShelf()
		self.simulation_results.steps.append(self.step_result)

	def createShelf(self):
		# Creamos una nueva repisa
		self.shelf = (self.last_shelf[0], self.last_shelf[1] + 1)
		if not self.grid.out_of_bounds(self.shelf):
			if self.grid.is_cell_empty(self.shelf):
				a = ShelfAgent((self.shelf[1], "Shelf"), self)
				self.grid.place_agent(a, self.shelf)
				self.schedule.add(a)
				self.last_shelf = self.shelf
				self.new_shelf = False

	def appendAgent(self, agent, array):
		pos = Position(agent.pos)
		agent = Agent(agent.unique_id[0], pos)
		array.append(agent)
  
	def appendAgentShelf(self, agent, array):
		pos = Position(agent.pos)
		agent = AgentShelf(agent.unique_id[0], pos, len(agent.boxes))
		array.append(agent)

	def get_grid(self):
		grid = np.zeros((self.grid.height, self.grid.width))

		# Por cada celda asignamos un valor dependiendo de su contenido
		for cell in self.grid.coord_iter():
			cell_content, x, y = cell
			if isinstance(cell_content, RobotAgent):
				grid[y][x] = 7
			elif isinstance(cell_content, BoxAgent):
				grid[y][x] = 6
			elif isinstance(cell_content, ShelfAgent):
				grid[y][x] = len(cell_content.boxes)
		return grid

class SimulationResult(object):
	def __init__(self):
		self.stepCount = 0
		self.steps = []
		self.grid = None
	
	def toJSON(self):
		return json.dumps(self, default=lambda o: o.__dict__, 
						  sort_keys=True, indent=4)

class StepResult (object):
	def __init__(self, index):
		self.index = index
		self.robots = []
		self.boxes = []
		self.shelves = []

class Grid (object):
	def __init__(self, width, height):
		self.width = width
		self.height = height

class Agent (object):
	def __init__(self, id, pos):
		self.id = id
		self.pos = pos

class AgentShelf (object):
	def __init__(self, id, pos, stored):
		self.id = id
		self.pos = pos
		self.stored = stored
	
class Position (object):
	def __init__(self, pos):
		self.x = pos[0]
		self.y = 0
		self.z = pos[1]

In [None]:
# Definimos el tamaño del Grid
GRID_SIZE = 5

# Definimos el número de generaciones a correr
NUM_GENERATIONS = 0

NUM_BOXES = 7

# Registramos el tiempo de inicio y corremos el modelo
start_time = time.time()
model = StorageModel(GRID_SIZE, GRID_SIZE, NUM_BOXES)

# Contador de 1 segundo para que el modelo corra
EXECUTE_TIME = 2
start_time = time.time()


while (model.stored_boxes < NUM_BOXES) and ((time.time() - start_time) < EXECUTE_TIME):
    NUM_GENERATIONS += 1
    model.step()
model.step()
NUM_GENERATIONS += 1

simulationResult = model.simulation_results

# Imprimimos el tiempo que le tomó correr al modelo.
print('Tiempo de ejecución:', str(datetime.timedelta(seconds=(time.time() - start_time))))

Tiempo de ejecución: 0:00:00.006438


In [None]:
all_grid = model.datacollector.get_model_vars_dataframe()

In [None]:
%%capture

fig, axs = plt.subplots(figsize=(7,7))
axs.set_xticks([])
axs.set_yticks([])
patch = plt.imshow(all_grid.iloc[0][0], cmap=new_cmap)

def animate(i):
    patch.set_data(all_grid.iloc[i][0])
    
anim = animation.FuncAnimation(fig, animate, frames=NUM_GENERATIONS)

In [None]:
simulationResultJson = simulationResult.toJSON()
with open('simulation.json', 'w') as outfile:
	outfile.write(simulationResultJson)

In [None]:
anim