Skip to content

Commit

Permalink
[rllib] Configure learner queue timeout (#5270)
Browse files Browse the repository at this point in the history
* configure learner queue timeout

* lint

* use config

* fix method args order, add unit test

* fix wrong param name
  • Loading branch information
antoine-galataud authored and ericl committed Jul 26, 2019
1 parent 6f682db commit 8276182
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 19 deletions.
6 changes: 6 additions & 0 deletions python/ray/rllib/agents/impala/impala.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
"replay_buffer_num_slots": 0,
# max queue size for train batches feeding into the learner
"learner_queue_size": 16,
# wait for train batches to be available in minibatch buffer queue
# this many seconds. This may need to be increased e.g. when training
# with a slow environment
"learner_queue_timeout": 300,
# level of queuing for sampling.
"max_sample_requests_in_flight_per_worker": 2,
# max number of workers to broadcast one set of weights to
Expand Down Expand Up @@ -126,6 +130,8 @@ def make_aggregators_and_optimizer(workers, config):
num_sgd_iter=config["num_sgd_iter"],
minibatch_buffer_size=config["minibatch_buffer_size"],
num_aggregation_workers=config["num_aggregation_workers"],
learner_queue_size=config["learner_queue_size"],
learner_queue_timeout=config["learner_queue_timeout"],
**config["optimizer"])

if aggregators:
Expand Down
1 change: 1 addition & 0 deletions python/ray/rllib/agents/ppo/appo.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"replay_proportion": 0.0,
"replay_buffer_num_slots": 100,
"learner_queue_size": 16,
"learner_queue_timeout": 300,
"max_sample_requests_in_flight_per_worker": 2,
"broadcast_interval": 1,
"grad_clip": 40.0,
Expand Down
7 changes: 5 additions & 2 deletions python/ray/rllib/optimizers/aso_learner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@ class LearnerThread(threading.Thread):
"""

def __init__(self, local_worker, minibatch_buffer_size, num_sgd_iter,
learner_queue_size):
learner_queue_size, learner_queue_timeout):
threading.Thread.__init__(self)
self.learner_queue_size = WindowStat("size", 50)
self.local_worker = local_worker
self.inqueue = queue.Queue(maxsize=learner_queue_size)
self.outqueue = queue.Queue()
self.minibatch_buffer = MinibatchBuffer(
self.inqueue, minibatch_buffer_size, num_sgd_iter)
inqueue=self.inqueue,
size=minibatch_buffer_size,
timeout=learner_queue_timeout,
num_passes=num_sgd_iter)
self.queue_timer = TimerStat()
self.grad_timer = TimerStat()
self.load_timer = TimerStat()
Expand Down
6 changes: 4 additions & 2 deletions python/ray/rllib/optimizers/aso_minibatch_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ class MinibatchBuffer(object):
This is for use with AsyncSamplesOptimizer.
"""

def __init__(self, inqueue, size, num_passes):
def __init__(self, inqueue, size, timeout, num_passes):
"""Initialize a minibatch buffer.
Arguments:
inqueue: Queue to populate the internal ring buffer from.
size: Max number of data items to buffer.
timeout: Queue timeout
num_passes: Max num times each data item should be emitted.
"""
self.inqueue = inqueue
self.size = size
self.timeout = timeout
self.max_ttl = num_passes
self.cur_max_ttl = 1 # ramp up slowly to better mix the input data
self.buffers = [None] * size
Expand All @@ -35,7 +37,7 @@ def get(self):
released: True if the item is now removed from the ring buffer.
"""
if self.ttl[self.idx] <= 0:
self.buffers[self.idx] = self.inqueue.get(timeout=300.0)
self.buffers[self.idx] = self.inqueue.get(timeout=self.timeout)
self.ttl[self.idx] = self.cur_max_ttl
if self.cur_max_ttl < self.max_ttl:
self.cur_max_ttl += 1
Expand Down
7 changes: 5 additions & 2 deletions python/ray/rllib/optimizers/aso_multi_gpu_learner.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ def __init__(self,
minibatch_buffer_size=1,
num_sgd_iter=1,
learner_queue_size=16,
learner_queue_timeout=300,
num_data_load_threads=16,
_fake_gpus=False):
LearnerThread.__init__(self, local_worker, minibatch_buffer_size,
num_sgd_iter, learner_queue_size)
num_sgd_iter, learner_queue_size,
learner_queue_timeout)
self.lr = lr
self.train_batch_size = train_batch_size
if not num_gpus:
Expand Down Expand Up @@ -99,7 +101,8 @@ def __init__(self,
self.loader_thread.start()

self.minibatch_buffer = MinibatchBuffer(
self.ready_optimizers, minibatch_buffer_size, num_sgd_iter)
self.ready_optimizers, minibatch_buffer_size,
learner_queue_timeout, num_sgd_iter)

@override(LearnerThread)
def step(self):
Expand Down
11 changes: 8 additions & 3 deletions python/ray/rllib/optimizers/async_samples_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self,
num_sgd_iter=1,
minibatch_buffer_size=1,
learner_queue_size=16,
learner_queue_timeout=300,
num_aggregation_workers=0,
_fake_gpus=False):
PolicyOptimizer.__init__(self, workers)
Expand Down Expand Up @@ -69,11 +70,15 @@ def __init__(self,
minibatch_buffer_size=minibatch_buffer_size,
num_sgd_iter=num_sgd_iter,
learner_queue_size=learner_queue_size,
learner_queue_timeout=learner_queue_timeout,
_fake_gpus=_fake_gpus)
else:
self.learner = LearnerThread(self.workers.local_worker(),
minibatch_buffer_size, num_sgd_iter,
learner_queue_size)
self.learner = LearnerThread(
self.workers.local_worker(),
minibatch_buffer_size=minibatch_buffer_size,
num_sgd_iter=num_sgd_iter,
learner_queue_size=learner_queue_size,
learner_queue_timeout=learner_queue_timeout)
self.learner.start()

# Stats
Expand Down
31 changes: 21 additions & 10 deletions python/ray/rllib/tests/test_optimizers.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,26 +117,26 @@ def setUpClass(cls):
ray.init(num_cpus=8)

def testSimple(self):
    """Smoke test: AsyncSamplesOptimizer with default settings completes a
    short training run without error."""
    # Fix: dropped the stale pre-rename call to self._make_evs() (a diff
    # artifact) — the helper is named _make_envs and _make_evs would raise
    # AttributeError.
    local, remotes = self._make_envs()
    workers = WorkerSet._from_existing(local, remotes)
    optimizer = AsyncSamplesOptimizer(workers)
    self._wait_for(optimizer, 1000, 1000)

def testMultiGPU(self):
    """Smoke test: the multi-GPU learner path runs with one fake GPU."""
    # Fix: dropped the stale pre-rename call to self._make_evs() (a diff
    # artifact) — only _make_envs exists.
    local, remotes = self._make_envs()
    workers = WorkerSet._from_existing(local, remotes)
    # _fake_gpus=True lets this exercise the GPU code path on CPU-only hosts.
    optimizer = AsyncSamplesOptimizer(workers, num_gpus=1, _fake_gpus=True)
    self._wait_for(optimizer, 1000, 1000)

def testMultiGPUParallelLoad(self):
    """Smoke test: multi-GPU learner with a parallel data-loader buffer."""
    # Fix: dropped the stale pre-rename call to self._make_evs() (a diff
    # artifact) — only _make_envs exists.
    local, remotes = self._make_envs()
    workers = WorkerSet._from_existing(local, remotes)
    optimizer = AsyncSamplesOptimizer(
        workers, num_gpus=1, num_data_loader_buffers=1, _fake_gpus=True)
    self._wait_for(optimizer, 1000, 1000)

def testMultiplePasses(self):
local, remotes = self._make_evs()
local, remotes = self._make_envs()
workers = WorkerSet._from_existing(local, remotes)
optimizer = AsyncSamplesOptimizer(
workers,
Expand All @@ -149,7 +149,7 @@ def testMultiplePasses(self):
self.assertGreater(optimizer.stats()["num_steps_trained"], 8000)

def testReplay(self):
local, remotes = self._make_evs()
local, remotes = self._make_envs()
workers = WorkerSet._from_existing(local, remotes)
optimizer = AsyncSamplesOptimizer(
workers,
Expand All @@ -166,7 +166,7 @@ def testReplay(self):
self.assertLess(stats["num_steps_trained"], stats["num_steps_sampled"])

def testReplayAndMultiplePasses(self):
local, remotes = self._make_evs()
local, remotes = self._make_envs()
workers = WorkerSet._from_existing(local, remotes)
optimizer = AsyncSamplesOptimizer(
workers,
Expand All @@ -187,23 +187,23 @@ def testReplayAndMultiplePasses(self):
self.assertLess(train_ratio, 0.4)

def testMultiTierAggregationBadConf(self):
    """Initializing the tree aggregator with a mismatched number of
    pre-created aggregators must raise ValueError."""
    # Fix: dropped the stale pre-rename call to self._make_evs() (a diff
    # artifact) — only _make_envs exists.
    local, remotes = self._make_envs()
    workers = WorkerSet._from_existing(local, remotes)
    # 4 aggregation workers are requested but only 4 precreated actors are
    # handed to an optimizer whose worker set cannot satisfy them.
    aggregators = TreeAggregator.precreate_aggregators(4)
    optimizer = AsyncSamplesOptimizer(workers, num_aggregation_workers=4)
    self.assertRaises(ValueError,
                      lambda: optimizer.aggregator.init(aggregators))

def testMultiTierAggregation(self):
    """Smoke test: tree aggregation with a single aggregation worker."""
    # Fix: dropped the stale pre-rename call to self._make_evs() (a diff
    # artifact) — only _make_envs exists.
    local, remotes = self._make_envs()
    workers = WorkerSet._from_existing(local, remotes)
    aggregators = TreeAggregator.precreate_aggregators(1)
    optimizer = AsyncSamplesOptimizer(workers, num_aggregation_workers=1)
    optimizer.aggregator.init(aggregators)
    self._wait_for(optimizer, 1000, 1000)

def testRejectBadConfigs(self):
local, remotes = self._make_evs()
local, remotes = self._make_envs()
workers = WorkerSet._from_existing(local, remotes)
self.assertRaises(
ValueError, lambda: AsyncSamplesOptimizer(
Expand Down Expand Up @@ -231,7 +231,18 @@ def testRejectBadConfigs(self):
_fake_gpus=True)
self._wait_for(optimizer, 1000, 1000)

def _make_evs(self):
def testLearnerQueueTimeout(self):
    """A 1-second learner_queue_timeout should cause the learner thread's
    queue fetch to time out before training makes progress, so the wait
    helper's progress assertion fires."""
    local, remotes = self._make_envs()
    workers = WorkerSet._from_existing(local, remotes)
    optimizer = AsyncSamplesOptimizer(
        workers,
        sample_batch_size=1000,
        train_batch_size=1000,
        learner_queue_timeout=1)
    # Context-manager form of assertRaises; equivalent to passing a lambda.
    with self.assertRaises(AssertionError):
        self._wait_for(optimizer, 1000, 1000)

def _make_envs(self):
def make_sess():
return tf.Session(config=tf.ConfigProto(device_count={"CPU": 2}))

Expand Down

0 comments on commit 8276182

Please sign in to comment.