In [1]:
import argparse
import datetime
import gym
import json
import mlflow
import os
import pickle
import torch
import tqdm
from algorithms.ddqn_pber import DDQNWithMPBER
from algorithms_with_statistics.ddqn_pber import DDQNWithMPBERAndERLogging
from algorithms_with_statistics.ddqn_per import DDQNWithMPERAndLogging
from dynaconf import Dynaconf
from func_timeout import FunctionTimedOut
from mlflow.exceptions import MlflowException
from os import path
from ray.rllib.algorithms.dqn import DQN
from ray.rllib.env.wrappers.atari_wrappers import wrap_deepmind
from ray.tune.logger import UnifiedLogger
from replay_buffer.mpber import MultiAgentPrioritizedBlockReplayBuffer
from utils import init_ray, check_path, logs_with_timeout, convert_np_arrays

In [2]:
# Seed torch's global RNG for this notebook process.
# NOTE(review): RolloutWorkers run as separate Ray processes; confirm whether the RLlib config seeds them.
SEED = 10
torch.manual_seed(SEED);  # trailing ';' hides the returned Generator repr in the notebook

<torch._C.Generator at 0x7f87df3ca090>

In [3]:
# Run configuration -- everything a user is expected to tune before "Run All".
run_name = "examples_pber"
setting = "./settings/ddqn_per/BeamRider.yml"
# Output roots. Override them with environment variables instead of editing user-specific absolute paths.
log_path = os.environ.get("PBER_LOG_PATH", "/home/seventheli/logging/")
checkpoint_path = os.environ.get("PBER_CHECKPOINT_PATH", "/home/seventheli/checkpoints/")
# 0 -> PER branch (replay buffer taken from the settings file);
# non-zero -> PBER with MultiAgentPrioritizedBlockReplayBuffer using this sub_buffer_size.
sub_buffer_size = 8
# 1 -> init_ray() with its defaults; anything else -> init_ray("./ray_config.yml").
single_ray = 0
# Truthy -> use the algorithm variants that additionally log experience-replay statistics.
with_er_logging = 1

### Initialize Ray

In [4]:
# Start Ray: with init_ray's defaults when single_ray == 1, otherwise from ./ray_config.yml.
ray_init_args = () if single_ray == 1 else ("./ray_config.yml",)
init_ray(*ray_init_args)

2023-08-24 17:59:50,698	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


### Settings

In [5]:
# Load the experiment settings file. Per Dynaconf's envvar_prefix, DYNACONF_-prefixed
# environment variables may override values from the file.
settings = Dynaconf(
    settings_files=setting,
    envvar_prefix="DYNACONF",
)

### Configure MLflow

In [6]:
# Point MLflow at the tracking server and select the experiment named in the settings
# (mlflow.set_experiment creates it if it does not exist yet).
tracking_uri = settings.mlflow.url
mlflow.set_tracking_uri(tracking_uri)
experiment_name = settings.mlflow.experiment
mlflow.set_experiment(experiment_name=experiment_name)
mlflow_client = mlflow.tracking.MlflowClient()

### Set hyper-parameters

In [7]:
# RLlib hyper-parameters from the settings file, plus a UnifiedLogger writing under the checkpoint root.
hyper_parameters = settings.dqn.hyper_parameters.to_dict()
hyper_parameters["logger_config"] = dict(type=UnifiedLogger, logdir=checkpoint_path)
print(f"log path: {log_path} \n check_path: {checkpoint_path}")

log path: /home/seventheli/logging/ 
 check_path: /home/seventheli/checkpoints/


In [9]:
# Build the run name, open the MLflow run, log its parameters and instantiate the algorithm.
#   sub_buffer_size == 0 -> PER: replay-buffer config is used as given in the settings file.
#   otherwise            -> PBER: MultiAgentPrioritizedBlockReplayBuffer with that sub_buffer_size.
use_pber = sub_buffer_size != 0
run_name = settings.dqn.env + ("_PBER_%s" if use_pber else "_PER_%s") % run_name
mlflow_run = mlflow.start_run(run_name=run_name,
                              tags={"mlflow.user": settings.mlflow.user})

# Log the replay-buffer parameters first, then every other hyper-parameter.
if use_pber:
    buffer_params = {
        **settings.dqn.hyper_parameters.replay_buffer_config.to_dict(),
        "type": "MultiAgentPrioritizedBlockReplayBuffer",
        "sub_buffer_size": sub_buffer_size,
    }
else:
    buffer_params = hyper_parameters["replay_buffer_config"]
mlflow.log_params(buffer_params)
mlflow.log_params(
    {key: value for key, value in hyper_parameters.items() if key != "replay_buffer_config"})

if use_pber:
    # A throw-away env instance, used only to read the wrapped observation/action spaces.
    # Close it right away so the emulator it opened is released.
    env_example = wrap_deepmind(gym.make(settings.dqn.env))
    obs_space, action_space = env_example.observation_space, env_example.action_space
    env_example.close()
    # Set BER
    replay_buffer_config = {
        **settings.dqn.hyper_parameters.replay_buffer_config.to_dict(),
        "type": MultiAgentPrioritizedBlockReplayBuffer,
        "obs_space": obs_space,
        "action_space": action_space,
        "sub_buffer_size": sub_buffer_size,
        "worker_side_prioritization": False,
        "replay_sequence_length": 1,
    }
    hyper_parameters["replay_buffer_config"] = replay_buffer_config
    algorithm_cls = DDQNWithMPBERAndERLogging if with_er_logging else DDQNWithMPBER
else:
    algorithm_cls = DDQNWithMPERAndLogging if with_er_logging else DQN
algorithm = algorithm_cls(config=hyper_parameters, env=settings.dqn.env)

  logger.warn(
A.L.E: Arcade Learning Environment (version 0.7.5+db37282)
[Powered by Stella]
2023-08-24 18:00:29,373	INFO multi_agent_prioritized_replay_buffer.py:115 -- PrioritizedMultiAgentReplayBuffer instantiated with underlying_buffer_config. This will overwrite the standard behaviour of the underlying PrioritizedReplayBuffer.
2023-08-24 18:00:29,390	INFO algorithm.py:501 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
[2m[36m(RolloutWorker pid=139638)[0m   logger.warn(
[2m[36m(RolloutWorker pid=139638)[0m A.L.E: Arcade Learning Environment (version 0.7.5+db37282)
[2m[36m(RolloutWorker pid=139638)[0m [Powered by Stella]


In [10]:
# Show the replay-buffer configuration the algorithm actually ended up with.
effective_buffer_config = algorithm.config.to_dict()["replay_buffer_config"]
print(effective_buffer_config)

{'type': <class 'replay_buffer.mpber.MultiAgentPrioritizedBlockReplayBuffer'>, 'capacity': 1000000, 'prioritized_replay_beta': 1, 'prioritized_replay_alpha': 0.5, 'obs_space': Box(0, 255, (84, 84, 4), uint8), 'action_space': Discrete(9), 'sub_buffer_size': 8, 'worker_side_prioritization': False, 'replay_sequence_length': 1}


### Make sure the output paths exist

In [11]:
def _ensure_run_dir(root_dir):
    """Run check_path on root_dir and on root_dir/run_name, and return the latter.

    check_path comes from the project's utils module; presumably it creates the
    directory when it is missing -- TODO confirm.
    """
    check_path(root_dir)
    run_dir = path.join(root_dir, run_name)
    check_path(run_dir)
    return run_dir


log_path = _ensure_run_dir(log_path)
checkpoint_path = _ensure_run_dir(checkpoint_path)

In [12]:
# Persist the effective algorithm config next to the checkpoints and attach it to the MLflow run.
config_file = os.path.join(checkpoint_path, "%s_config.pyl" % run_name)
algorithm_config = algorithm.config.to_dict()
# NOTE(review): "multiagent" is dropped before pickling, presumably because it holds
# objects that should not be pickled -- confirm. Tolerate its absence instead of raising KeyError.
algorithm_config.pop("multiagent", None)
with open(config_file, "wb") as f:
    pickle.dump(algorithm_config, f)
mlflow.log_artifact(config_file)

# Checkpoints go to a "results" sub-directory. It is empty at this point, so it is
# uploaded to MLflow only after training has written checkpoints into it.
checkpoint_path = path.join(checkpoint_path, "results")
check_path(checkpoint_path)

In [13]:
# Episode-reward statistics copied from the sampler results into MLflow.
keys_to_extract = {"episode_reward_%s" % stat for stat in ("max", "min", "mean")}

In [14]:
# Main training loop: at most 9999 iterations, stopped early once the reported total
# training time ("time_total_s") reaches settings.log.max_time.
# Every settings.log.log iterations: statistics go to MLflow (from iteration 10 on)
# and a checkpoint is written. Every iteration's result is dumped to <log_path>/<i>.json.
for i in tqdm.tqdm(range(1, 10000)):
    try:
        result = algorithm.train()
        time_used = result["time_total_s"]
        # statistics; fall back to an empty dict so the key extraction below never sees None
        sampler = result.get("sampler_results") or {}
    except Exception as exc:
        # Skip a failed iteration but report it. Catching Exception (not a bare except)
        # lets KeyboardInterrupt still stop training.
        tqdm.tqdm.write("training iteration %d failed: %r" % (i, exc))
        continue
    if i % settings.log.log == 0:
        if i >= 10:
            try:
                step = result["episodes_total"]
                learner_data = result["info"].copy()
                learner_stats = learner_data.pop("learner", {})
                time_usage = learner_stats.get("time_usage")
                if time_usage is not None:
                    logs_with_timeout(time_usage, step=step)
                logs_with_timeout(learner_data, step=step)
                logs_with_timeout({key: sampler[key] for key in keys_to_extract if key in sampler}, step=step)
            except (FunctionTimedOut, MlflowException):
                tqdm.tqdm.write("logging failed")
        # Checkpoint even when the MLflow logging above failed or timed out.
        algorithm.save_checkpoint(checkpoint_path)
    # The config contains class objects (e.g. the logger and replay-buffer types) that json cannot encode.
    result["config"] = None
    with open(path.join(log_path, "%d.json" % i), "w") as f:
        json.dump(convert_np_arrays(result), f)
    if time_used >= settings.log.max_time:
        break

100%|██████████| 9/9 [05:49<00:00, 38.83s/it]


In [15]:
# Upload the per-iteration JSON logs and the checkpoint directory. Then end the MLflow run so it is
# marked finished instead of staying active for the lifetime of the kernel (a re-run of
# mlflow.start_run would otherwise fail while it is still active).
mlflow.log_artifacts(log_path)
mlflow.log_artifacts(checkpoint_path)
mlflow.end_run()