In [7]:
from mesa import Model
from mesa import Agent
from mesa.space import MultiGrid
from mesa.time import RandomActivation
from mesa.datacollection import DataCollector
import random
import numpy as np
import pickle
from IPython.display import clear_output

from a_star_pathfinding import a_star
from vision import lines
from vision import transform

In [8]:
# make seperate function for impostor
def imposter_strategy(pos, strategy, tasks, kill):
    
    if strategy == 'aggressive':
        goal = [random.choice(tasks).pos]
    
    if strategy == 'passive':
        if kill:
            goal = [random.choice(tasks).pos]
        else:
            goal = []
            
    return goal

In [9]:
def euclidean(pos_a, pos_b):
    return ((pos_a[0] - pos_b[0])**2 + (pos_a[1] - pos_b[1])**2)**.5

In [10]:
class AmongUs(Model):
    def __init__(self, map_name, n_crew, n_impo):
        super().__init__()

        # generate grid
        self.height = 138
        self.width = 242
        self.grid = MultiGrid(self.width, self.height, torus=True)

        # generate map
        self.generate_map(map_name)
        
        # generate tasks
        self.generate_tasks(map_name)
        
        # load vision dict for imposters
        self.vision_dict = np.load('vision_dict.pkl', allow_pickle=True)
        
        # create schedulers for agents
        self.schedule_Crewmate = RandomActivation(self)
        self.schedule_Imposter = RandomActivation(self)
        
        # initialize agents
        self.n_crew = n_crew
        self.n_impo = n_impo
        self.n_run = 0
        
    def generate_map(self, map_name):
        hard_walls = np.load(f'{map_name}/hardwalls.npy')
        vents = np.load(f'{map_name}/vents.npy', allow_pickle=True)
        obstructions = np.load(f'{map_name}/obstructions.npy', allow_pickle=True)
        
        for coord in hard_walls:
            self.new_agent(Wall, tuple(coord))

        self.vents_dict = {}
        self.vents = []

        for connection in vents:
            connection = connection.tolist()
            self.vents_dict[tuple(connection[0])] = tuple(connection[1])
            self.vents_dict[tuple(connection[1])] = tuple(connection[0])
            
            if len(connection) == 3:
                self.vents_dict[tuple(connection[0])] = tuple(connection[2])
                self.vents_dict[tuple(connection[1])] = tuple(connection[2])
                self.vents_dict[tuple(connection[2])] = tuple(connection[0])
                self.vents_dict[tuple(connection[2])] = tuple(connection[1])
                
        for coord in self.vents_dict:
            self.new_agent(Vent, tuple(coord))
        
        for coord in obstructions:
            self.new_agent(Obstruction, tuple(coord))
    
    def generate_tasks(self, map_name):
        # load the image + coordinates of the short tasks and common tasks
        short_tasks = np.load(f'{map_name}/shorttasks.npy', allow_pickle=True)
        common_tasks = np.load(f'{map_name}/commontasks.npy', allow_pickle=True)
        
        self.short_tasks = short_tasks
        self.tasks = []
        
        self.common_tasks = common_tasks
        self.c_tasks = []

        # load all the short_tasks into the map (all possible task locations), first create tuples
        for i in range(len(short_tasks)):
            if i == 3:
                self.new_agent(ShortTask, tuple(short_tasks[i][0][0]))
                for parttwo in range(8):
                    self.new_agent(ShortTask, tuple(short_tasks[i][1][parttwo]))
            elif i == 6:
                for partone in range(5):
                    self.new_agent(ShortTask, tuple(short_tasks[i][0][partone]))
                self.new_agent(ShortTask, tuple(short_tasks[i][1][0]))
        
            else:
                self.new_agent(ShortTask, tuple(short_tasks[i][0]))
        
        # Do the same for common tasks
        self.new_agent(CommonTask, tuple(common_tasks[0][0]))
        for i in range(len(common_tasks[1])):
            self.new_agent(CommonTask, tuple(common_tasks[1][i][0]))

    def respawn_players(self):
        for crewmate in self.schedule_Crewmate.agents:
            self.remove_agent(crewmate)
        for imposter in self.schedule_Imposter.agents:
            self.remove_agent(imposter)

        # generate crewmates
        starting_positions = [(125, 100), (135, 100), (110, 115), (135, 110)]
        
        for _ in range(self.n_crew):
            self.new_agent(Crewmate, (125, 100))

        # generate imposters
        for _ in range(self.n_impo):
            self.new_agent(Imposter, (125, 100))

    def new_agent(self, agent_type, pos):
        '''
        Method that creates a new agent, and adds it to the correct scheduler.
        '''
        agent = agent_type(self.next_id(), self, pos)

        self.grid.place_agent(agent, pos)

        if agent_type == Crewmate or agent_type == Imposter:
            getattr(self, f'schedule_{agent_type.__name__}').add(agent)

        if agent_type == Vent:
            self.vents.append(agent)
        
        if agent_type == ShortTask:
            self.tasks.append(agent)
        
        #NIEUW KAN NOG ERROR GEVEN!
        if agent_type == CommonTask:
            self.c_tasks.append(agent)
            
    def remove_agent(self, agent):
        '''
        Method that removes an agent from the grid and the correct scheduler.
        '''
        self.grid.remove_agent(agent)
        getattr(self, f'schedule_{type(agent).__name__}').remove(agent)

    def step(self):
        '''
        Method that steps every agent. 
        
        Prevents applying step on new agents by creating a local list.
        '''

        self.schedule_Crewmate.step()
        self.schedule_Imposter.step()

        # self.datacollector.collect(self)
        # update sus matrix every step
        # self.sus_matrix += 0.005
        
    def play_match(self, iterations=0, match_num=0):
        '''
        Method that runs a single match.
        '''
        # self.trust_matrix = np.load(f'social_matrixes/trust_{match_num}.npy', allow_pickle=True)
        self.sus_matrix = np.full((self.n_crew + self.n_impo), .5)
        self.respawn_players()
        i = 1
        while len(self.schedule_Crewmate.agents) > self.n_impo and len(self.schedule_Imposter.agents) > 0:
            print(f'Match: {match_num}/{iterations}')
            print(f' Step: {i}')
            clear_output(wait=True)
            i += 1
            self.step()
        print(f'Ended at iteration {i}')

    def run(self, iterations):
        '''
        Method that runs multiple matches.
        '''
        for match_num in range(iterations):
            self.play_match(iterations, match_num+1)
            clear_output(wait=True)

In [11]:
class Crewmate(Agent):
    def __init__(self, unique_id, model, init_pos):
        super().__init__(unique_id, model)
        self.pos = init_pos
        self.path = []
        self.goallist = []
        self.injob = 0

        # self.create_dict()
        
    def make_tasks(self):
        short_tasks = self.model.short_tasks
        common_tasks = self.model.common_tasks
        
        # AANPASSING
        # set 4 random tasks for Crewmate to execute (can still be same task multiple times)
        self.goallist = random.sample(list(short_tasks), 4)
        
        # Import the common tasks NOG DOEN VOOR FIX WIRES
        self.goallist.append(common_tasks[0])
        
        # if it is a task with multiple locations/second part, replace by one of the coordinates from the first list 
        self.goallist = [[random.choice(self.goallist[index][0])] if len(x)>1 else x for index,x in enumerate(self.goallist)]
        
        # Turn list into list of tuples ### GEEFT SOMS EEN ERROR 
        self.goallist = [tuple(self.goallist[index][0]) for index,y in enumerate(self.goallist)]
        
    def find_path(self):
        if len(self.goallist) == 0:
            self.make_tasks()
        
        possiblepaths = []
        for t in self.goallist:
            possiblepaths.append(a_star(self.pos, t, self.model.grid)[0])
            
        self.path = self.path + min(possiblepaths)

    def detect_agents(self):
        agents_detected = []
        visible_area = self.model.vision_dict[self.pos]
        for pos in visible_area:
            agents_on_tile = self.model.grid.get_neighbors(pos=pos, moore=True, radius=0, include_center=True)
            for agent in agents_on_tile:
                if type(agent) == Crewmate or type(agent) == Imposter:
                    agents_detected.append(agent)
        return agents_detected   
    
    def die(self):
        self.model.new_agent(Dead_crewmate, self.pos)
        self.model.remove_agent(self)  

    def step(self):
        if self.path == []:
            if self.pos in self.goallist:
                self.goallist.remove(self.pos)
                self.injob = random.randint(5, 15)
                
            # if Task was task 3, add second part of task 3 to the task list
            if self.pos == tuple(self.model.short_tasks[3][0][0]):
                self.goallist.append(tuple(random.choice(self.model.short_tasks[3][1])))
            
            # if Task was task 6, add second part of task 6 to the task list
            elif self.pos in self.model.short_tasks[6][0]:
                self.goallist.append(tuple(self.model.short_tasks[6][1][0]))
            
            self.find_path()

        elif self.path != []:
            if self.injob > 0:
                self.injob -= 1
            else:
                self.model.grid.move_agent(self, self.path.pop()) 

        agents_detected = self.detect_agents()
        
        for agent in agents_detected:
            break

   
            
class Imposter(Agent):
    def __init__(self, unique_id, model, init_pos):
        super().__init__(unique_id, model)
        self.pos = init_pos
        self.path = []
        self.goals = [(85, 35), (140, 110), (125, 90)]
        self.vents_enabled = True
        self.tactic = 'passive'
        self.cooldown = 30

    def make_task(self, tactic, tasks):
        self.goals = imposter_strategy(self.pos, tactic, tasks, False)
    
    def find_path(self):
        if len(self.goals) == 0:
            self.make_task(self.tactic, self.model.tasks)
        
        else:
            goal = self.goals.pop()
            best_path, best_score = a_star(self.pos, goal, self.model.grid)

            if self.vents_enabled:
                for vent in self.model.vents:
                    vent_in = vent.pos_a
                    vent_out = vent.pos_b
                    guess_vent_path = euclidean(self.pos, vent_in) + euclidean(vent_out, goal)

                    if guess_vent_path < best_score:
                        vent_path_in, vent_score_in = a_star(self.pos, vent_in, self.model.grid)
                        vent_path_out, vent_score_out = a_star(vent_out, goal, self.model.grid)
                        vent_path = vent_path_out + vent_path_in
                        vent_score = vent_score_in + vent_score_out
    
                        if vent_score < best_score:
                            best_score = vent_score
                            best_path = vent_path

            self.path = best_path

    def detect_crewmates(self):
        crewmates_detected = []
        visible_area = self.model.vision_dict[self.pos]
        for pos in visible_area:
            agents_on_tile = self.model.grid.get_neighbors(pos=pos, moore=True, radius=0, include_center=True)
            for agent in agents_on_tile:
                if type(agent) == Crewmate:
                    crewmates_detected.append(agent)
        return crewmates_detected

    def step(self):
        if self.cooldown > 0:
            self.cooldown -= 1

        if self.path == []:
            self.find_path()

        crewmates_detected = self.detect_crewmates()

        for crewmate in crewmates_detected:
            if self.cooldown == 0 and euclidean(crewmate.pos, self.pos) <= 6:
                # kill a crewmate
                crewmate.die()
                self.cooldown = 30

        if self.path != []:
            self.model.grid.move_agent(self, self.path.pop())  
            
class Dead_crewmate(Agent):
    def __init__(self, unique_id, model, pos):
        super().__init__(unique_id, model)
        self.pos = pos    
    
class Wall(Agent):
    def __init__(self, unique_id, model, pos):
        super().__init__(unique_id, model)
        self.pos = pos

class Obstruction(Agent):
    def __init__(self, unique_id, model, pos):
        super().__init__(unique_id, model)
        self.pos = pos

class Vent(Agent):
    def __init__(self, unique_id, model, pos):
        super().__init__(unique_id, model)
        self.pos_a = pos
        self.pos_b = self.model.vents_dict[pos]

class ShortTask(Agent):
    def __init__(self, unique_id, model, pos):
        super().__init__(unique_id, model)
        self.pos = pos
        
#NIEUW KAN NOG ERROR GEVEN!
class CommonTask(Agent):
    def __init__(self, unique_id, model, pos):
        super().__init__(unique_id, model)
        self.pos = pos

In [12]:
amongUs = AmongUs('the_skeld', 3, 1)

amongUs.run(2)

Ended at iteration 986
