In [1]:
import pathlib
import sys
# Parent directory of the current working directory; presumably the repository
# root that contains the `src` package (verify against the notebook's location).
root_path = pathlib.Path().absolute().parent
# Must run before the `src.*` imports below: it puts `root_path` on the module search path.
sys.path.append(str(root_path))

from src.train import run_experiment, OptimizerType
from src.models.dqn import SpatialDQN, ModelType
from src.replay_memory import FastReplayBuffer
from src.env import FourRoomEnvWithTagging
from src.featurizers import FeaturizerType
from src.visualize import StateSequenceVisualizer

import torch

# Tensor printing: 3 digits of precision, no scientific notation, up to 200 characters per line.
torch.set_printoptions(precision=3, sci_mode=False, linewidth=200)



In [2]:
# --- Environment population -------------------------------------------------
N_IMPOSTERS = 1
N_CREW = 4
N_JOBS = 5

# --- Other constants --------------------------------------------------------
# NOTE(review): BUF_SIZE and SEQUENCE_SIZE are not referenced by the cells visible
# in this notebook; confirm whether they are still needed.
BUF_SIZE = 3000
SEQUENCE_SIZE = 2

# Shared environment instance used by every cell below.
env = FourRoomEnvWithTagging(
    n_imposters=N_IMPOSTERS,
    n_crew=N_CREW,
    n_jobs=N_JOBS,
    debug=False,
)

# --- Output locations -------------------------------------------------------
# Both directories live under `root_path`; the parent is created before the child.
model_registry_path = root_path / 'model_registry'
tests_path = model_registry_path / 'test'

model_registry_path.mkdir(exist_ok=True)
tests_path.mkdir(exist_ok=True)

In [3]:
def _spatial_dqn_config(n_actions):
    """Build the model-argument dict shared by both agent roles.

    Every call returns a brand-new dict with brand-new list values, so the
    imposter and crew configurations never share mutable state.

    Args:
        n_actions: value stored under the ``"n_actions"`` key; the only entry
            that differs between the two roles.

    Returns:
        dict: the full set of model arguments, in a fixed key order.
    """
    return {
        "input_image_size": env.n_cols,
        "non_spatial_input_size": 20,
        "n_channels": [7, 5, 3],
        "strides": [1, 1],
        "paddings": [1, 1],
        "kernel_sizes": [3, 3],
        "rnn_layers": 3,
        "rnn_hidden_dim": 64,
        "rnn_dropout": 0.2,
        "mlp_hidden_layer_dims": [16, 16],
        "n_actions": n_actions,
        "pretrained_model_path": None
    }


# The two roles use identical settings apart from the number of actions.
imposter_model_config = _spatial_dqn_config(env.n_imposter_actions)

crew_model_config = _spatial_dqn_config(env.n_crew_actions)

In [4]:
# Commented-out example: train a SpatialDQN imposter against a random crew policy.
# run_experiment(
#     env=env, 
#     num_steps=1000,
#     imposter_model_args=imposter_model_config,
#     crew_model_args={'n_actions': env.n_crew_actions},
#     imposter_model_type=ModelType.SPATIAL_DQN,
#     crew_model_type=ModelType.RANDOM,
#     featurizer_type=FeaturizerType.GLOBAL,
#     sequence_length = 10,
#     replay_buffer_size=1_000,
#     replay_prepopulate_steps=1_000,
#     batch_size=4,
#     gamma=0.99,
#     scheduler_start_eps=1,
#     scheduler_end_eps=0.05,
#     scheduler_time_steps=100_000,
#     train_imposter=True,
#     train_crew=False,
#     experiment_save_path=tests_path,
#     optimizer_type = OptimizerType.ADAM,
#     learning_rate=0.001,
#     train_step_interval=5,
#     num_checkpoint_saves=2,
# )

In [5]:
# Commented-out example: train a SpatialDQN crew against a random imposter policy.
# run_experiment(
#     env=env, 
#     num_steps=1000,
#     imposter_model_args={'n_actions': env.n_imposter_actions},
#     crew_model_args=crew_model_config,
#     crew_model_type=ModelType.SPATIAL_DQN,
#     imposter_model_type=ModelType.RANDOM,
#     featurizer_type=FeaturizerType.GLOBAL,
#     sequence_length = 3,
#     replay_buffer_size=3000,
#     replay_prepopulate_steps=1_000,
#     batch_size=4,
#     gamma=0.99,
#     scheduler_start_eps=1,
#     scheduler_end_eps=0.05,
#     scheduler_time_steps=100_000,
#     train_imposter=False,
#     train_crew=True,
#     experiment_save_path=tests_path,
#     optimizer_type = OptimizerType.ADAM,
#     learning_rate=0.001,
#     train_step_interval=5,
#     num_checkpoint_saves=2,
# )

In [11]:
from src.featurizers import GlobalFeaturizer
import pygame
from src.visualize import AmongUsVisualizer
from src.models.dqn import RandomEquiprobable

# NOTE(review): SEQ_LEN is not referenced anywhere in this cell; confirm whether it
# was meant to be passed to the replay buffer below.
SEQ_LEN = 10

# Pause between loop iterations, in milliseconds (passed to pygame.time.wait).
STEP_DELAY_MS = 1000

# Both roles act through RandomEquiprobable models sized to their action counts.
imposter_model = RandomEquiprobable(env.n_imposter_actions)
crew_model = RandomEquiprobable(env.n_crew_actions)


# Interactive rollout: step the environment through the visualizer until the
# window is closed or ESC is pressed. SPACE toggles pause. Once the environment
# reports `done`, stepping stops but the window stays open until quit.
with AmongUsVisualizer(env) as visualizer:
    state, _ = visualizer.reset()
    featurizer = GlobalFeaturizer(env)
    # NOTE(review): the meaning of the positional 1_000 and 4 is defined by
    # FastReplayBuffer's signature, which is not visible here.
    replay_memory = FastReplayBuffer(1_000, 4, env.flattened_state_size, env.n_agents, env.n_imposters)
    replay_memory.add_start(env.flatten_state(state), env.imposter_idxs)

    stop_game = False
    done = False
    paused = False
    while not stop_game:
        for event in pygame.event.get():
            if event.type == pygame.KEYDOWN and event.key == pygame.K_SPACE:
                paused = not paused
            if event.type == pygame.QUIT or (event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE):
                stop_game = True
                break

        # The `break` above only leaves the event loop. Exit the game loop right
        # away so a quit request does not trigger one more environment step, one
        # more replay-buffer write and another full delay.
        if stop_game:
            break

        if not done and not paused:
            # Featurize the most recent trajectory stored in the replay buffer.
            state = replay_memory.get_last_trajectory().states
            featurizer.fit(state)

            actions = []

            # The featurizer yields one (spatial, non_spatial) pair per agent index.
            for agent_idx, (agent_spatial, agent_non_spatial) in enumerate(featurizer.generator()):
                if agent_idx in env.imposter_idxs:
                    action = imposter_model(agent_spatial, agent_non_spatial).argmax().item()
                else:
                    action = crew_model(agent_spatial, agent_non_spatial).argmax().item()

                actions.append(action)

            # NOTE(review): `truncated` is ignored; if the environment can end an
            # episode through truncation without setting `done`, this loop keeps
            # stepping. Confirm that this is intended.
            next_state, reward, done, truncated, _ = visualizer.step(actions)

            replay_memory.add(
                env.flatten_state(next_state),
                actions,
                reward,
                done,
                env.imposter_idxs)

        # Also throttles event polling: key presses are only seen once per delay.
        pygame.time.wait(STEP_DELAY_MS)
    # NOTE(review): explicit close inside the `with` block; redundant if
    # AmongUsVisualizer.__exit__ already closes the window. Confirm.
    visualizer.close()
