In [185]:
import matplotlib.pyplot as plt
import gym
from gym import spaces
from IPython import display
import random
from collections import OrderedDict
%matplotlib inline

UP = 0; DOWN = 1; LEFT = 2; RIGHT = 3; WAIT = 4; NOT_FOUND = 5;
DIRECTIONS = [(0, -1), (0, 1), (-1, 0), (1, 0), (0, 0)]

class Space:
    def __init__(self, space_map):
        self.space_map = OrderedDict(space_map)
        self.space = spaces.MultiDiscrete([j for i in self.space_map.values() for j in i])
        self.space_discrete = spaces.Discrete([j for i in self.space_map.values() for j in i][0])
    def get_space(self):
        return self.space
    def get_space_discrete(self):
        return self.space_discrete
    def make_array(self, obs):
        return [j for i in OrderedDict(obs).values() for j in i]
    def make_dict(self, lst):
        d = OrderedDict()
        index = 0
        for k,v in self.space_map.items():
            d[k] = lst[index:index+len(v)]
            index += len(v)
        return d
    
class MemorySpace:
    def __init__(self, space_map):
        for k,v in list(space_map.items()):
            space_map[k + '~PREV'] = v
        self.space_map = OrderedDict(space_map)
        self.space = spaces.MultiDiscrete([j for i in self.space_map.values() for j in i])
        self.space_discrete = spaces.Discrete([j for i in self.space_map.values() for j in i][0])
    def get_space(self):
        return self.space
    def get_space_discrete(self):
        return self.space_discrete
    def make_array(self, obs, prev_obs):
        return [j for i in obs.values() for j in i] + [j for i in prev_obs.values() for j in i]
    def make_dict(self, lst):
        d = OrderedDict()
        index = 0
        for k,v in self.space_map.items():
            d[k] = lst[index:index+len(v)]
            index += len(v)
        return d

In [186]:
NUM_PLAYERS = 5
NUM_IMPOSTOR = 1
NUM_TASKS = 4
KILL_COOLDOWN = 3

class Player:
    def __init__(self, world, start_pos=(0,0), impostor=False):
        self.world = world
        self.map = world.map
        self.x, self.y = start_pos
        self.world[self.x][self.y].append(self)
        
        self.last_dir = 0
        self.alive = True
        self.impostor = impostor
        self.kill_cooldown = KILL_COOLDOWN // 2
        
        self.tasks = world.spawn_tasks(4)
        
    def repr(self):
        return 'P'
        
    def move(self, direction):
        if not self.can_move(direction):
            return
        self.map[self.x][self.y].remove(self)
        change = DIRECTIONS[direction]
        self.x += change[0]
        self.y += change[1]
        self.map[self.x][self.y].append(self)
        self.last_dir = direction
        if self.world.meetings and not self.impostor and self.alive and \
            any(len(self.world.body_map[pos[0]][pos[1]]) > 0 for pos in self.get_all_adjacent_pos()):
            self.world.meeting = True
        
    def can_move(self, direction):
        if direction == UP and self.y > 0:
            return 1
        if direction == DOWN and self.y < self.world.dims[1] - 1:
            return 1
        if direction == LEFT and self.x > 0:
            return 1
        if direction == RIGHT and self.x < self.world.dims[0] - 1:
            return 1
        return 0
    
    def move_random(self):
        self.move(random.randrange(4))
        
    def get_adjacent_pos(self, direction):
        return self.x + DIRECTIONS[direction][0], self.y + DIRECTIONS[direction][1]
    
    def get_all_adjacent_pos(self):
        pos = [(self.x + DIRECTIONS[d][0], self.y + DIRECTIONS[d][1]) for d in range(5) if self.can_move(d)]
        return pos
    
    def is_task_in_dir(self, direction):
        pos = self.get_adjacent_pos(direction)
        if not self.can_move(direction):
            return 0
        return self.tasks[pos[0]][pos[1]] > 0
    
    def is_player_in_dir(self, direction):
        pos = self.get_adjacent_pos(direction)
        if not self.can_move(direction):
            return 0
        if len(self.world[pos[0]][pos[1]]) == 1 and self.world[pos[0]][pos[1]][0] == self:
            return 0
        return len(self.world[pos[0]][pos[1]]) > 0

    def is_body_in_dir(self, direction):
        pos = self.get_adjacent_pos(direction)
        if not self.can_move(direction):
            return 0
        return len(self.world.body_map[pos[0]][pos[1]]) > 0
    
    def loc_player(self, player_num):
        player = self.world.players[player_num]
        for d in range(5):
            if not self.can_move(d):
                continue
            pos = self.get_adjacent_pos(d)
            if player in self.world[pos[0]][pos[1]]:
                return d
        return NOT_FOUND
        
    def do_task(self):
        if self.impostor or self.tasks[self.x][self.y] == 0:
            return 0
        self.tasks[self.x][self.y] -= 1
        self.world.required_tasks -= 1
        return 1
    
    def kill(self):
        if self.kill_cooldown > 0 or len(self.world[self.x][self.y]) <= 1:
            return 0
        if self.world[self.x][self.y][0].impostor:
            kill_player = self.world[self.x][self.y][1]
        else:
            kill_player = self.world[self.x][self.y][0]
        kill_player.alive = False
        kill_player.map[kill_player.x][kill_player.y].remove(kill_player)
        kill_player.map = self.world.dead_map
        kill_player.map[kill_player.x][kill_player.y].append(kill_player)
        self.world.body_map[kill_player.x][kill_player.y].append(kill_player)
        self.world.alive_crew -= 1
        self.kill_cooldown = KILL_COOLDOWN
        for d in range(5):
            if self.is_player_in_dir(d):
                self.world.caught = self
        return 1
    
    def take_random_action(self):
        if self.impostor:
            if len(self.world[self.x][self.y]) == 2 and all([self.is_player_in_dir(d) == 0 for d in range(4)]) \
            and self.world.players[0] not in self.world[self.x][self.y]:
                self.kill()
            self.move(random.randrange(4))
        else:
            if self.is_task_in_dir(WAIT):
                self.do_task()
            else:
                self.move(random.randrange(4))
    
    def get_visible_players(self):
        return self.world.map[self.x][self.y][:]

class World:
    def __init__(self, dims, meetings=True):
        self.map = [[[] for i in range(dims[1])] for i in range(dims[0])]
        self.dead_map = [[[] for i in range(dims[1])] for i in range(dims[0])]
        self.body_map = [[[] for i in range(dims[1])] for i in range(dims[0])]
        self.dims = dims
        self.required_tasks = 0
        self.meeting = False
        self.meetings = meetings
        self.voted = None
        self.caught = None
        self.alive_crew = NUM_PLAYERS - NUM_IMPOSTOR
        self.alive_impostor = NUM_IMPOSTOR
        self.required_tasks = NUM_TASKS * self.alive_crew
        self.players = [Player(self) for i in range(NUM_PLAYERS)]
        if NUM_IMPOSTOR > 0:
            self.players[random.randrange(1, NUM_PLAYERS)].impostor = True
    def spawn_tasks(self, x):
        task_map = [[0 for i in range(self.dims[1])] for i in range(self.dims[0])]
        for i in range(x):
            task_map[random.randrange(self.dims[0])][random.randrange(self.dims[1])] += 1
        return task_map
    def __getitem__(self, key):
        return self.map[key]
    def perform_meeting(self, votes):
        self.votes = [v if i >= len(self.players) or self.players[i].alive else 0 for i, v in enumerate(votes)]
        highest = max(self.votes)
        voted = [i for i, v in enumerate(self.votes) if v == highest]
        if len(voted) > 1:
            self.voted = -1
        else:
            self.voted = voted[0]
            if voted[0] < NUM_PLAYERS:
                self.players[self.voted].alive = False
                if self.players[self.voted].impostor:
                    self.alive_impostor -= 1
                else:
                    self.alive_crew -= 1
        for x in range(self.dims[0]):
            for y in range(self.dims[1]):
                self.map[x][y].clear()
                self.dead_map[x][y].clear()
                self.body_map[x][y].clear()
        for p in self.players:
            p.map[0][0].append(p)
            p.x = p.y = 0
            p.kill_cooldown = KILL_COOLDOWN
        self.meeting = False
    def check_win(self):
        if self.alive_crew <= self.alive_impostor:
            return -1
        if self.required_tasks <= 0 or self.caught or (self.alive_impostor == 0 and NUM_IMPOSTOR > 0):
            return 1
        return 0
    def render(self):
        if self.voted != None:
            print(self.votes)
            if self.voted == -1:
                print("No one was ejected. (Tie)")
            elif self.voted >= NUM_PLAYERS:
                print("No one was ejected. (Skipped)")
            elif self.players[self.voted].impostor:
                print("Player " + str(self.voted) + " was the Impostor.")
            else:
                print("Player " + str(self.voted) + " was not the Impostor.")
            self.voted = None
        else:
            #for x in range(self.dims[0]):
            #    for y in range(self.dims[1]):
            #        if len(self.map[x][y]) == 0 and len(self.body_map[x][y]) > 0:
            #            print('X', end='')
            #        else:
            #            print(len(self.map[x][y]), end='')
            #    print()
            if self.meeting and self.check_win() == 0:
                print("Body Reported!")
        if self.alive_impostor == 0 and NUM_IMPOSTOR > 0:
            print("Crewmates Win!")
        if self.alive_crew <= NUM_IMPOSTOR:
            print("Impostors Win!")
        elif self.caught:
            print("Caught! Crewmates Win!")
        elif self.required_tasks <= 0:
            print("All tasks complete! Crewmates Win!")
            

In [196]:
class TestEnvironmentC2(gym.Env): #Finding killer
    metadata = {'render.modes': ['human']}
    def __init__(self):
        super(TestEnvironmentC2, self).__init__()
        self.obs_space = MemorySpace({'dir_came_from':[4],
                            'can_move':[2 for i in range(4)],
                            'player_loc':[5 for i in range(NUM_PLAYERS)],
                            'is_alive':[2 for i in range(NUM_PLAYERS)],
                            #'is_body_in_dir':[2 for i in range(5)],
                               })
        self.observation_space = self.obs_space.get_space()
        self.act_space = Space({'move':[5],
                                'vote':[NUM_PLAYERS + 1]
                               })
        self.action_space = self.act_space.get_space()
        self.last_obs = None
    def reset(self):
        self.world = World((4, 3))
        self.players = self.world.players
        self.total_steps = 0
        return self._next_observation()
    def _next_observation(self, dup=True):
        obs = {'dir_came_from':[self.players[0].last_dir],
              'can_move':[self.players[0].can_move(d) for d in range(4)],
               'player_loc':[self.players[0].loc_player(i) for i in range(NUM_PLAYERS)],
               'is_alive':[int(self.players[i].alive) for i in range(NUM_PLAYERS)],
               #'is_body_in_dir':[self.players[0].is_body_in_dir(d) for d in range(5)],
              }
        last = self.last_obs if self.last_obs != None else obs
        self.last_obs = obs
        return self.obs_space.make_array(obs, last)
    def step(self, action):
        self.total_steps += 1
        obs = self._next_observation()
        reward = self._take_action(action)
        game_result = 10 * self.world.check_win()
        if game_result == 0 and self.total_steps >= 100:
            game_result = 0.1
        reward += game_result
        done = (game_result != 0)
        for p in self.players:
            p.kill_cooldown -= 1
        return obs, reward, done, {}
    def _take_action(self, action):
        action = self.act_space.make_dict(action)
        reward = 0
        if self.world.meeting:
            votes = [0 for p in range(len(self.players)+1)]
            #action['vote'][0] = random.randrange(len(votes))
            votes[action['vote'][0]] = 1
            self.world.perform_meeting(votes)
            if action['vote'][0] < NUM_PLAYERS and not self.players[action['vote'][0]].impostor:
                return -1
            return 0
        else:
            #action['move'][0] = random.randrange(5)
            self.players[0].move(action['move'][0])
            for p in self.players[1:]:
                p.take_random_action()
            return 0
    def render(self, mode='human', close=False):
        self.world.render()
        #print()

In [203]:
from stable_baselines.common.policies import MlpPolicy, MlpLstmPolicy
from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2, DQN, deepq
import time

env = TestEnvironmentC2()
#env = DummyVecEnv([lambda: TestEnvironmentC()])
model = PPO2(MlpPolicy, env, policy_kwargs=dict(net_arch=[64, 64]))
#model = PPO2(MlpLstmPolicy, env, nminibatches=1)
#model = DQN(deepq.LnMlpPolicy, env)
start = time.time()
model.learn(total_timesteps=500000)
end = time.time()
print(end - start)

353.8353621959686


In [204]:
total = 0
NUM_TRIALS = 3000
for i in range(NUM_TRIALS):
    obs = env.reset()
    for i in range(1000):
        action, _states = model.predict(obs)
        obs, rewards, dones, info = env.step(action)
        #env.render()
        total += rewards
        if dones:
            break
    #print("Total steps: ", env.total_steps)
    #total += env.total_steps
env.close()
print("Avg: ", total / NUM_TRIALS)

Avg:  -4.180499999999985


In [None]:
Crewmate Do Tasks

Random agent: -7.68, -8.71, -8.87
PPO (64, 64) Learner: 9.87, 10.1, 7.93
PPO (32, 64, 32) Learner: 7.21, 7.12, -1.56
PPO (32, 32, 32, 32, 32) Learner: 5.18, 5.52, 4.243
PPO (64, 64, 32) Learner: 4.45, 5.29, 6.61
PPO (64, 64, 64) Learner: 10.33, 10.06, 10.37
PPO (128, 128) Learner: 10.73, 11.00, 10.57
DQN (64, 64) Learner: -17.64, -17.81
DQN With Normalization: -18.87, -17.47

PPO (64, 64) Learner 5x: 11.33, 11.31
PPO (64, 64, 64) Learner 5x: 11.43, 11.36
PPO (128, 128) Learner 5x: 11.24, 11.24

Impostor Kill Players

Random agent: -5.82, -5.60, -5.06
PPO (64, 64) Learner: 3.21, 4.66, 4.15
PPO (64, 64) Learner 5x: 4.12, 4.28
PPO (64, 64, 64) Learner: 4.48, 4.25, 4.04
PPO (64, 64, 64) Learner 5x: 3.78, 4.43
PPO (128, 128) Learner: 3.98, 4.07, 3.41

Crewmate Vote Out Impostor

Random agent: -6.21, -6.13, -6.19
PPO (64, 64) Learner: -6.26, -6.56, -6.35
PPO (64, 64) Learner 5x: -4.89, -4.29
PPO (64, 64) Learner 5x with memory: -4.32, -4.62, -4.18
PPO (64, 64, 64) Learner: -5.03, -4.59, -5.79
PPO (64, 64, 64) Learner 5x: -4.68, -4.89
PPO (128, 128) Learner: -5.75, -6.59, -4.47


In [145]:
NUM_PLAYERS = 1
NUM_IMPOSTOR = 0

class TestEnvironmentC(gym.Env):
    metadata = {'render.modes': ['human']}
    def __init__(self):
        super(TestEnvironmentC, self).__init__()
        self.obs_space = Space({'dir_came_from':[4],
                            'can_move':[2 for i in range(4)],
                            'is_task':[2 for i in range(5)],
                               })
        self.observation_space = self.obs_space.get_space()
        self.act_space = Space({'move':[5]
                               })
        self.action_space = self.act_space.get_space_discrete()
    def reset(self):
        self.world = World((3, 3))
        self.players = self.world.players
        self.total_steps = 0
        return self.next_observation()
    def next_observation(self):
        obs = {'dir_came_from':[self.players[0].last_dir],
              'can_move':[self.players[0].can_move(d) for d in range(4)],
               'is_task':[self.players[0].is_task_in_dir(d) for d in range(5)]
              }
        return self.obs_space.make_array(obs)
    def step(self, action):
        self.total_steps += 1
        obs = self.next_observation()
        reward = self.take_action_discrete(action)
        game_result = 10 * self.world.check_win()
        if game_result == 0 and self.total_steps >= 100:
            game_result = -10
        reward += game_result
        done = (game_result != 0)
        return obs, reward - 0.1, done, {}
    def take_action(self, action):
        action = self.act_space.make_dict(action)
        reward = 0
        #action['move'][0] = random.randrange(5)
        if action['move'][0] == WAIT:
            reward += self.players[0].do_task()
        else:
            self.players[0].move(action['move'][0])
        return reward
    def take_action_discrete(self, action):
        reward = 0
        #action = random.randrange(5)
        if action == WAIT:
            reward += self.players[0].do_task()
        else:
            self.players[0].move(action)
        return reward
    def render(self, mode='human', close=False):
        self.world.render()
        print('Tasks left: ' + str(self.world.required_tasks))
        print()

In [96]:
class TestEnvironmentI(gym.Env):
    metadata = {'render.modes': ['human']}
    def __init__(self):
        super(TestEnvironmentI, self).__init__()
        self.obs_space = Space({'dir_came_from':[4],
                            'can_move':[2 for i in range(4)],
                            'is_player':[2 for i in range(5)],
                               })
        self.observation_space = self.obs_space.get_space()
        self.act_space = Space({'move':[4],
                                'kill':[2]
                               })
        self.action_space = self.act_space.get_space()
    def reset(self):
        self.world = World((4, 3), meetings=False)
        self.players = self.world.players
        self.total_steps = 0
        return self._next_observation()
    def _next_observation(self):
        obs = {'dir_came_from':[self.players[0].last_dir],
              'can_move':[self.players[0].can_move(d) for d in range(4)],
               'is_player':[self.players[0].is_player_in_dir(d) for d in range(5)]
              } #dead?
        return self.obs_space.make_array(obs)
    def step(self, action):
        self.total_steps += 1
        obs = self._next_observation()
        reward = self._take_action(action)
        game_result = -10 * self.world.check_win()
        if game_result == 0 and self.total_steps >= 100:
            game_result = -10
        reward += game_result
        done = (game_result != 0)
        for p in self.players:
            p.kill_cooldown -= 1
        return obs, reward - 0.1, done, {}
    def _take_action(self, action):
        action = self.act_space.make_dict(action)
        reward = 0
        #if random.randrange(2):
        if action['kill'][0]:
            reward += self.players[0].kill()
        #self.players[0].move(random.randrange(4))
        self.players[0].move(action['move'][0])
        for p in self.players[1:]:
            p.take_random_action()
        return reward
    def render(self, mode='human', close=False):
        self.world.render()
        #print()