In [131]:
import numpy as np
import cv2
import random
import time
import json
np.set_printoptions(precision=2)

begin_startup = time.time()

In [132]:
def distance(pos1,pos2,norm=2):
    return (abs(pos1[1] - pos2[1])**norm + abs(pos1[0] - pos2[0])**norm)**(1/norm)

def checkIfPosValid(pos):
    # this is where we can add walls and such later
    return (0<=pos[0]<grid_size[1]) and (0<=pos[1]<grid_size[0])

def rand_pos():
    while True:
        pos = (random.randint(0,grid_size[1]),random.randint(0,grid_size[0]))
        if checkIfPosValid(pos):
            return pos

def statistics():
    output_dict = {}
    output_dict["timestep"] = t

    output_dict["food_stats"] = {}
    output_dict["food_stats"]["nest_amount"] = nest.amount

    output_dict["walker_stats"] = {}
    output_dict["walker_stats"]["total_walkers"] = len(walker_list)
    for job in list(Walker.job_dict.keys()):
        output_dict["walker_stats"]["{}s".format(job)] = 0
    
    output_dict["food_stats"]["walker_total_amount"] = 0
    for walker in walker_list:
        job = walker.job
        output_dict["walker_stats"]["{}s".format(job)] += 1
        output_dict["food_stats"]["walker_total_amount"] += walker.food_count

    output_dict["food_stats"]["food_pile_total_amount"] = 0
    for food in food_tracker.tracker:
        output_dict["food_stats"]["food_pile_total_amount"] += food["amount"]

    output_dict["food_stats"]["grand_total"] = output_dict["food_stats"]["nest_amount"] + output_dict["food_stats"]["walker_total_amount"] + output_dict["food_stats"]["food_pile_total_amount"]

    output_dict["marker_stats"] = {}
    output_dict["marker_stats"]["total_count"] = 0

    for marker_label in list(marker_lookup.keys()):
        marker_tracker = marker_lookup[marker_label]
        output_dict["marker_stats"][marker_label] = {}
        output_dict["marker_stats"][marker_label]["number_of_markers"] = len(marker_tracker.tracker)
        output_dict["marker_stats"]["total_count"] += len(marker_tracker.tracker)

    return output_dict

    

In [133]:
class MarkerTracker:
	falloff = 4
	duration = 60
	min_strength = 0.001

	def __init__(self,colour,behaviour="unique"):
		self.behaviour=behaviour
		if behaviour == "stack":
			self.tracker = []
		elif behaviour == "unique":
			self.tracker ={}
		self.conc_map = np.full(grid_size,0,dtype=np.float64)
		self.colour = colour
		self.precalc = self.generatePrecalc()
	
	def addMarker(self,pos,strength=1,will_decay=True):
		marker = {"deposited":t,
				  "will_decay":will_decay,
				  "expiry":t+self.duration,
				  "position":list(pos),
				  "strength":strength}
		if self.behaviour == "stack":
			self.tracker.append(marker)
		elif self.behaviour == "unique":
			self.tracker[pos] = marker

	def cullMarkerList(self):
		new = []
		for marker in self.tracker:
			expiry = marker["expiry"]
			flag = marker["will_decay"]
			#print("marker with expiry at tick {}, current time is {}. my flag is {}".format(expiry,t,flag))
			if expiry > t or flag == False:
				new.append(marker)

		self.tracker = new

	def cullMarkerDict(self):
		for key in list(self.tracker.keys()):
			marker = self.tracker[key]
			expiry = marker["expiry"]
			flag = marker["will_decay"]
			if expiry <= t and flag == True:
				del self.tracker[key]
				

	def generateConcMapLegacy(self):
		if self.behaviour == "stack":
			self.cullMarkerList()
		elif self.behaviour == "unique":
			self.cullMarkerDict()
		
		conc_map = np.full(grid_size,0,dtype=np.float64)
		
		# this iterates over every single cell in the grid
		for i_y,i_x in np.ndindex(grid_size):	
			conc = 0
			for marker in self.tracker:
				if self.behaviour == "unique":
					marker = self.tracker[marker]

				pos = marker["position"]
				deposited = marker["deposited"]
				strength = marker["strength"] * (1-((t-deposited)/self.duration))
				# this iterates over the markers
				norm = 2 # 1 for taxicab distance, 2 for standard euclidean etc
				dist = distance(pos,(i_x,i_y),norm)
				
				dist += 1 # stops div zero errors

				conc += strength/(dist**(self.falloff))

			if conc <= self.min_strength:
				#cut strength exactly to zero past min threshold
				conc = 0
			conc_map[i_y,i_x] = conc
		self.conc_map=conc_map

	def generateConcMap(self):
		if self.behaviour == "stack":
			self.cullMarkerList()
		elif self.behaviour == "unique":
			self.cullMarkerDict()

		conc_map = np.full(grid_size,0,dtype=np.float64)
		for marker in self.tracker:
			if self.behaviour == "unique":
				marker = self.tracker[marker]
			
			marker_precalc = self.precalc[tuple(marker["position"])]
			deposited = marker["deposited"]
			strength = marker["strength"] * (1-((t-deposited)/self.duration))

			conc_map = conc_map + (strength * marker_precalc)

		conc_map = np.where(conc_map<self.min_strength,0,conc_map)
		self.conc_map = conc_map

	def generatePrecalc(self):
		"""
		the idea behind this is to speed up the calculation of the heaviest part of the system, generating the marker concentration maps.
		for a given marker and cell, 1/(dist**falloff) is a constant. so we can simply precalculate this before hand and refer back to it later.

		for an (n x m) grid, this will produce a dictionary with n*m entries, where each entry is a (n x m) grid. or n^2 * m^2 unique values.
		
		however, most of our speed advantage comes from the fact that we can then calculate future concentration maps as a whole using numpy's array arithmetic,
		possibly even GPU acceleration later down the line.
		"""
		print("Starting marker precalc...")
		start = time.time()
		precalc = {}
		# this iterates over every single cell in the grid, as a position that a marker could be in
		for i_y,i_x in np.ndindex(grid_size):
			key = (i_x,i_y) # this will be used as a lookup key to access it later
			ndarray = np.full(grid_size,0,dtype=np.float64)
			for j_y,j_x in np.ndindex(grid_size):
				ndarray[j_y,j_x] = (distance((j_x,j_y),(i_x,i_y),norm=2)+1) ** (-self.falloff)

			precalc[key] = ndarray

		end = time.time()
		print("Marker precalc took {:.3f} seconds".format(end-start))
		print("Precalc dict has {} entries".format(len(precalc)))
		return precalc



			
	

Regarding the decision making & movement process of the walker; we shall follow some simple rules:

1. The walker will consider the 3 cells infront of it; and weight them based on the concentration
    1. Invalid moves (such as leaving the grid area, or into a wall) are given a zero weight
    2. If all three moves are invalid, the walker will turn all the way around and wait until next turn to move.

2. For each weight, we apply an exponent (`wander_exponent`) which controls how random or deterministic the walker is when picking from valid moves
    1. This is shown in an interactible format here: https://www.desmos.com/calculator/dqls0azfzl
    2. An exponent value of 0 means that all options have equal chance, regardless of concentration
    3. Higher values increase the influence of concentration where an infinite value would mean fully deterministic decision making
    4. Negative values make it run away from the marker

3. Once a decision has been made, the walker will store its new position and orientation; and all walkers will commit to their new state at the same time.
    1. This isnt very useful now, but is useful once we start introducing the action of depositing markers here during this process.



In [134]:
class FoodTracker:
    """
    This object acts as a tracker and manages all of the food sources in the model.
    each food pile is stored as a dictionary with two entries, position and amount.

    This tracker object is responsible for distributing food to nearby walkers from the individual piles;
    as well as culling any food piles that are now empty, and reporting statistics.
    """
    pickup_range = 1.5
    def __init__(self):
        self.tracker = []

    def addFoodPile(self,pos,amount=random.randint(50,100)):
        food = {
            "pos":list(pos),
            "amount":amount
        }
        self.tracker.append(food)

    def distribute(self):
        # this function is responsible for distributing food at each tick, and culling zero-amount piles.
        new_tracker = []
        for food in self.tracker:
            food_pos = food["pos"]
            food_amount = food["amount"]
            for walker in walker_list:
                if food_amount <= 0:
                    # if theres no food left in this pile, then we can stop considering it.
                    break

                dist = distance(food_pos,walker.pos)
                if dist <= self.pickup_range:
                    # we want to reset the step count regardless of if the walker actually picks up any more food or not.
                    # also allow it to immediately drop a marker
                    walker.step_count = 0
                    walker.deposition_tick = 0

                    if walker.food_count < walker.max_food:
                        # either hand out the maximum amount of food the walker can carry, or give them all thats left of the food pile
                        transaction = min(walker.max_food - walker.food_count,food_amount)
                        food_amount -= transaction
                        walker.food_count += transaction
            if food_amount > 0:
                new_tracker.append({"pos":food_pos,"amount":food_amount})

        self.tracker = new_tracker

In [135]:
class Nest:
    pickup_range = 1.5
    def __init__(self,pos):
        self.pos = pos
        self.amount = 0

    def collect(self):
        # this follows near-identical logic to the one used in the food tracker.
        for walker in walker_list:
            dist = distance(walker.pos,self.pos)
            if dist <= self.pickup_range:
                walker.step_count = 0
                walker.deposition_tick = 0

                if walker.food_count > 0:
                    transaction = walker.food_count
                    self.amount += transaction
                    walker.food_count -= transaction

In [136]:
class Walker:
    job_dict = {"scout":{"sens":"B","dep":"A"},
                "carrier":{"sens":"A","dep":"B"}}
    deposition_rate = 1
    max_food = 1
    wander_exponent = 10 # how likely it is to choose randomly versus picking largest concentration.
    

    direction_lookup = [(-1,-1),
                        (0,-1),
                        (1,-1),
                        (1,0),
                        (1,1),
                        (0,1),
                        (-1,1),
                        (-1,0)]
    

    def __init__(self, pos, dir, job="scout"):
        self.pos = pos # tuple of 2 ints
        self.dir = dir # int between 0 and 7
        self.job = job # some key from job_dict

        self.deposition_tick = random.randint(0,self.deposition_rate-1) # start at a random point in the deposition cycle
        self.step_count = 0
        self.food_count = 0

    def calculateMove(self):
        self.evaluateRole()

        left_dir = (self.dir-1) % 8
        right_dir = (self.dir+1) % 8

        left_pos = (self.pos[0]+self.direction_lookup[left_dir][0],self.pos[1]+self.direction_lookup[left_dir][1])
        front_pos = (self.pos[0]+self.direction_lookup[self.dir][0],self.pos[1]+self.direction_lookup[self.dir][1])
        right_pos = (self.pos[0]+self.direction_lookup[right_dir][0],self.pos[1]+self.direction_lookup[right_dir][1])

        """
        print("I am in position:",self.pos)
        print("the left is",left_pos)
        print("the front is",front_pos)
        print("the right is",right_pos)
        """
        conc_map = marker_lookup[self.job_dict[self.job]["sens"]].conc_map

        # these are unnormalised probabilities, divide by total to get actual probabilities
        biases = []
        choices = []
        #print("biases of: \nL:{:.3f}\nF:{:.3f}\nR:{:.3f}".format(left_bias,front_bias,right_bias))
        if checkIfPosValid(left_pos):
            choices.append((left_pos,left_dir))
            left_bias = conc_map[left_pos[1],left_pos[0]]**self.wander_exponent
            biases.append(left_bias)

        if checkIfPosValid(front_pos):
            choices.append((front_pos,self.dir))
            front_bias = conc_map[front_pos[1],front_pos[0]]**self.wander_exponent
            biases.append(front_bias)

        if checkIfPosValid(right_pos):
            choices.append((right_pos,right_dir))
            right_bias = conc_map[right_pos[1],right_pos[0]]**self.wander_exponent
            biases.append(right_bias)
        
        if len(choices) == 0:
            # this occurs when it rams into a wall, and all 3 of its choices are invalid
            # we will handle this by doing a 180, but not moving.
            #print("no valid moves, turning around...")
            self.new_pos = self.pos
            self.new_dir = (self.dir + 4) % 8
        else:
            self.step_count += 1
            #print("choosing from following choices: {}".format(choices))
            if sum(biases) == 0:
                #only occurs when there are no markers to follow
                #print("all zero bias, updating to 1")
                for i in range(0,len(biases)):
                    biases[i]=1
                    
            choice = random.choices(choices,weights=biases)[0]
            self.new_pos = choice[0]
            #print("moving to position {}".format(self.new_pos))
            self.new_dir = choice[1]

            if self.deposition_tick == 0:
                strength = 2**(-self.step_count*0.1)
                marker_lookup[self.job_dict[self.job]["dep"]].addMarker(self.pos,strength)

            self.deposition_tick += 1
            self.deposition_tick %= self.deposition_rate


    def commitMove(self):
        self.pos = self.new_pos
        self.dir = self.new_dir

        del self.new_pos
        del self.new_dir

    def evaluateRole(self):
        # switch to carrier role if it has any amount of food on it
        if self.food_count > 0:
            self.job = "carrier"
        else:
            self.job = "scout"



In [137]:
grid_size = (20,20) # this is (y,x) format cos numpy
t=0
# (int(grid_size[1]/2),int(grid_size[0]/2))
nest = Nest(rand_pos())

marker_lookup = {"A":MarkerTracker((255,0,0),behaviour="stack"),
                 "B":MarkerTracker((0,255,0),behaviour="stack")}

walker_list = []
for i in range(0,50):
    walker_list.append(Walker(nest.pos,random.randint(0,7)))

food_tracker = FoodTracker()
#food_tracker.addFoodPile((18,18),25)
for i in range(0,5):
    food_tracker.addFoodPile(rand_pos(),500)



Starting marker precalc...
Marker precalc took 0.243 seconds
Precalc dict has 400 entries
Starting marker precalc...
Marker precalc took 0.246 seconds
Precalc dict has 400 entries


In [138]:
cell_scale = 16
bottom_padding_size = 0
canvas_x_size = grid_size[1] * cell_scale
canvas_y_size = grid_size[0] * cell_scale + bottom_padding_size

display_markers = ["A","B"]

def arrow_cache_builder():
    # this is used to precalculate all 8 of the directional arrows that would have to be drawn
    arrow_cache = []
    angles = [-45,0,45,90,135,180,225,270]
    arrow_scale = 0.6
    original = np.asarray([[0,0],[1,-1]]) * (cell_scale/2) * arrow_scale
    for i,deg in enumerate(angles):
        rad = deg * (np.pi/180)
        c = np.cos(rad)
        s = np.sin(rad)
        rot_mat = np.asarray([[c,-s],[s,c]])

        transformed = np.matmul(rot_mat,original) + np.full((2,2),cell_scale/2)
        end = (transformed[0,0],transformed[1,0])
        tip = (transformed[0,1],transformed[1,1])
        arrow_cache += [{"tip":tip,"end":end}]

    return arrow_cache
    

def interp(vec1, vec2,t,clamp=False):
    if clamp:
        t = max(0,min(1,t))
    return vec1 * (1-t) + vec2 * t

def col_interp(col1,col2,t,clamp=False):
    vec1 = np.asarray(col1)
    vec2 = np.asarray(col2)
    res = interp(vec1,vec2,t,clamp)

    res = np.clip(res,0,255)
    col = (int(res[0]),int(res[1]),int(res[2]))
    return col

def blend_colour(col_list):
    # this is used to blend the colours of the different markers when their fields intersect
    a = 2
    total = [0,0,0]
    for col in col_list:
        total[0] += col[0]**a
        total[1] += col[1]**a
        total[2] += col[2]**a

    total = (int((total[0]/len(col_list))**(1/a)),int((total[1]/len(col_list))**(1/a)),int((total[2]/len(col_list))**(1/a)))
    return total

def drawCells(canvas):
    # this part of the drawing procedure plots all of the cells to the canvas, as well as the concentration/colour of the markers
    base_cell_bg_colour = (128,128,128)

    for i_y,i_x in np.ndindex(grid_size):
        cell_tl = (int(i_x*cell_scale),int(i_y*cell_scale))
        cell_br = (cell_tl[0] + cell_scale-1,cell_tl[1] + cell_scale-1)

        col_list = []
        if(len(display_markers) != 0):
            for label in display_markers:
                #iterate through the labels of all the markers we're displaying
                marker_tracker = marker_lookup[label]

                # this is temporary; it needs to generate a colour based on the concentration and base colours of all of the markers we're trying to display
                conc=marker_tracker.conc_map[i_y,i_x]
                temp_col = col_interp(base_cell_bg_colour,marker_tracker.colour,conc,False) # a 'False' flag here means that it will 'overdrive' the colours for values greater than 1
                col_list += [temp_col]
            
            cell_bg_colour = blend_colour(col_list)
        else:
            cell_bg_colour = base_cell_bg_colour
        canvas = cv2.rectangle(canvas, cell_tl, cell_br, cell_bg_colour, -1)
        cell_edge_colour = col_interp((0,0,0),cell_bg_colour,0.6,True)
        canvas = cv2.rectangle(canvas, cell_tl, cell_br, cell_edge_colour, thickness=int(cell_scale * 0.05))

    return canvas

def drawWalkers(canvas):
    # pretty self explanatory, it draws all the walkers
    colour_dict = {"scout":(0,0,0),
                   "carrier":(0,0,200)}
    for walker in walker_list:
        pos = walker.pos
        dir = walker.dir
        colour = colour_dict[walker.job]

        temp = arrow_cache[dir]
        tip = temp["tip"]
        end = temp["end"]

        tip_x = int(pos[0]*cell_scale + tip[0])
        tip_y = int(pos[1]*cell_scale + tip[1])
        end_x = int(pos[0]*cell_scale + end[0])
        end_y = int(pos[1]*cell_scale + end[1])

        canvas = cv2.arrowedLine(canvas, (end_x,end_y), (tip_x,tip_y), colour, thickness=int(np.ceil(cell_scale * 0.05)), tipLength = 0.2) 
    return canvas

arrow_cache = arrow_cache_builder()

def drawFood(canvas):
    colour = (0,255,255)
    for food in food_tracker.tracker:
        pos = food["pos"]
        draw_pos = (int((pos[0]+0.5) * cell_scale),int((pos[1]+0.5) * cell_scale))
        canvas = cv2.circle(canvas,draw_pos,int(cell_scale*0.8/2),colour,-1)

    return canvas

def drawNest(canvas):
    colour = (19,69,139)
    pos = nest.pos

    draw_pos = (int((pos[0]+0.5) * cell_scale),int((pos[1]+0.5) * cell_scale))
    nest_tl = (int(draw_pos[0]-cell_scale*0.6/2),int(draw_pos[1]-cell_scale*0.6/2))
    nest_br = (int(draw_pos[0]+cell_scale*0.6/2),int(draw_pos[1]+cell_scale*0.6/2))
    canvas = cv2.rectangle(canvas, nest_tl, nest_br, colour, thickness=-1)
    return canvas
    



def draw():
    canvas = np.full((int(canvas_y_size),int(canvas_x_size),3),150)
    canvas = drawCells(canvas)
    canvas = drawFood(canvas)
    canvas = drawNest(canvas)
    canvas = drawWalkers(canvas)
    return canvas

canvas = draw()

In [139]:
import ipywidgets.widgets as widgets
from ipywidgets import HBox
from IPython.display import display

def bgr8_to_jpeg(value):
    return bytes(cv2.imencode('.jpg',value)[1])

def text_to_html(value):
    style = """
    <style>
    div {
        min-width: 320px;
    }
    p {
        white-space: pre;
        line-height : 14pt;
    }
    </style>
    """
    value = value.replace("\n","<br>")
    return style+"<div><p>{}</p></div>".format(value)

text_widget = widgets.HTML(value="stats readout here")
text_widget2 = widgets.HTML(value="timing readout here")
image_widget = widgets.Image(format='jpeg',width=640)
image_widget.value = bgr8_to_jpeg(canvas)
side_by_side = widgets.HBox([image_widget,text_widget,text_widget2])
display(side_by_side)

#widgets.HTML(value= '<style>p{word-wrap: break-word}</style> <p>'+ [variable containing long text goes here] +' </p>')

HBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C…

In [140]:
canvas= draw()
image_widget.value = bgr8_to_jpeg(canvas)


update_rate = 0
c = 0

#print(walker_list)
startup_time = round(time.time()-begin_startup,3)
time.sleep(2)
while True:
    timing_dict = {}
    start = time.time()
    t+=1
    
    for marker_tracker in marker_lookup.values():
        marker_tracker.generateConcMap()
    
    timing_dict["markers"] = round(time.time()-start,3)
    start = time.time()

    food_tracker.distribute()
    timing_dict["food"] = round(time.time()-start,3)
    start = time.time()

    nest.collect()

    timing_dict["nest"] = round(time.time()-start,3)
    start = time.time()
    
    for walker in walker_list:
        walker.calculateMove()

    timing_dict["walker_calculate"] = round(time.time()-start,3)
    start = time.time()

    for walker in walker_list:
        walker.commitMove()

    timing_dict["walker_commit"] = round(time.time()-start,3)
    start = time.time()

    canvas= draw()
    #cv2.imwrite("images/{}.jpg".format(t),canvas)
    image_widget.value = bgr8_to_jpeg(canvas)

    timing_dict["canvas_draw"] = round(time.time()-start,3)
    start = time.time()
    
    statistics_dict = statistics()
    timing_dict["statistics"] = round(time.time()-start,3)
    start = time.time()

    timing_dict["total"] = round(sum(list(timing_dict.values())),3)

    timing_dict = {"startup":startup_time,"tick":timing_dict}
    text_widget.value = text_to_html(json.dumps(statistics_dict,indent=8))
    text_widget2.value = text_to_html(json.dumps(timing_dict,indent=8))

    if(timing_dict["tick"]["total"]<update_rate):
        time.sleep(update_rate-timing_dict["tick"]["total"])

KeyboardInterrupt: 