In [1]:
 import numpy as np
import time
import gym
import queue

import ray
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy
from ray.rllib.evaluation.worker_set import WorkerSet
from ray.rllib.evaluation.rollout_worker import RolloutWorker
from ray.rllib.execution.concurrency_ops import Concurrently, Enqueue, Dequeue
from ray.rllib.execution.metric_ops import StandardMetricsReporting
from ray.rllib.execution.replay_ops import StoreToReplayBuffer, Replay
from ray.rllib.execution.rollout_ops import ParallelRollouts, AsyncGradients, \
    ConcatBatches
from ray.rllib.execution.train_ops import TrainOneStep, ComputeGradients, \
    AverageGradients
from ray.rllib.execution.replay_buffer import LocalReplayBuffer, \
    ReplayActor
from ray.rllib.policy.sample_batch import SampleBatch, DEFAULT_POLICY_ID, \
    MultiAgentBatch
from ray.util.iter import LocalIterator, from_range
from ray.util.iter_metrics import SharedMetrics 

In [2]:
from palice_replay_buffer import PADICELocalReplayBuffer

In [3]:
def make_workers(n, env_id="CartPole-v1", rollout_fragment_length=100):
    """Build a WorkerSet with one local and ``n`` remote PPO rollout workers.

    Every worker is constructed from the same keyword arguments, so the
    local and remote workers always use the same environment, policy class
    and fragment length.

    Args:
        n: Number of remote workers (created via ``RolloutWorker.as_remote()``).
            ``0`` yields a set containing only the local worker.
        env_id: Gym environment id passed to ``gym.make`` by every worker.
        rollout_fragment_length: ``rollout_fragment_length`` given to every
            worker.

    Returns:
        The ``WorkerSet`` wrapping the local worker and the remote workers.
    """
    worker_kwargs = dict(
        # The env-creator argument (an env config) is not used here.
        env_creator=lambda _: gym.make(env_id),
        policy=PPOTFPolicy,
        rollout_fragment_length=rollout_fragment_length,
    )
    local = RolloutWorker(**worker_kwargs)
    remotes = [
        RolloutWorker.as_remote().remote(**worker_kwargs) for _ in range(n)
    ]
    return WorkerSet._from_existing(local, remotes)

In [4]:
# Start Ray. ``ignore_reinit_error`` keeps this cell re-runnable in the same
# kernel; per the Ray docs, a second bare ``ray.init()`` raises an error.
ray.init(ignore_reinit_error=True)

# Batch size requested from the buffer; the asserts below use the same
# constant so they stay consistent with the buffer configuration.
REPLAY_BATCH_SIZE = 100

buf = PADICELocalReplayBuffer(
    num_shards=1,
    learning_starts=200,
    buffer_size=1000,
    replay_batch_size=REPLAY_BATCH_SIZE,
    prioritized_replay_alpha=0.6,
    prioritized_replay_beta=0.4,
    prioritized_replay_eps=0.0001)
# Nothing has been stored yet, so replay must return None.
assert buf.replay() is None

# Local worker only (0 remotes). Each ``next(b)`` pulls one bulk-sync rollout
# and stores it in ``buf`` (presumably one 100-step fragment per call, per
# make_workers' default — TODO confirm against the buffer's add count).
workers = make_workers(0)
a = ParallelRollouts(workers, mode="bulk_sync")
b = a.for_each(StoreToReplayBuffer(local_buffer=buf))

next(b)
assert buf.replay() is None  # learning hasn't started yet
next(b)
assert buf.replay().count == REPLAY_BATCH_SIZE

# The Replay op should yield batches of the same configured size.
replay_op = Replay(local_buffer=buf)
assert next(replay_op).count == REPLAY_BATCH_SIZE


2020-07-12 11:59:25,846	INFO resource_spec.py:212 -- Starting Ray with 5.27 GiB memory available for workers and up to 2.66 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-07-12 11:59:26,335	INFO services.py:1165 -- View the Ray dashboard at localhost:8265
2020-07-12 11:59:28,256	INFO rollout_worker.py:941 -- Built policy map: {'default_policy': <ray.rllib.policy.tf_policy_template.PPOTFPolicy object at 0x7fec3845f810>}
2020-07-12 11:59:28,257	INFO rollout_worker.py:942 -- Built preprocessor map: {'default_policy': <ray.rllib.models.preprocessors.NoPreprocessor object at 0x7fec3845f510>}
2020-07-12 11:59:28,258	INFO rollout_worker.py:413 -- Built filter map: {'default_policy': <ray.rllib.utils.filter.NoFilter object at 0x7fec38457c90>}
2020-07-12 11:59:28,260	INFO rollout_worker.py:526 -- Generating sample batch of size 100
2020-07-12 11:59:28,261	INFO sampler.py:466 -- Raw obs from env: { 0: { 'agent0':

In [5]:
# Show the stats of the default policy's init buffer, then its main buffer.
for label, buffers_by_policy in (
        ("replay init buffer: ", buf.replay_init_buffers),
        ("replay buffer: ", buf.replay_buffers)):
    print(label, buffers_by_policy[DEFAULT_POLICY_ID].stats())

replay init buffer:  {'added_count': 13, 'sampled_count': 200, 'est_size_bytes': 2717, 'num_entries': 13}
replay buffer:  {'added_count': 200, 'sampled_count': 200, 'est_size_bytes': 55400, 'num_entries': 200}
