[RLlib + Tune] Add placement group support to RLlib. #14289

Merged: 19 commits, Feb 25, 2021
Changes from 16 commits
26 changes: 16 additions & 10 deletions python/ray/tune/trainable.py
@@ -1,28 +1,29 @@
import sys
from contextlib import redirect_stdout, redirect_stderr
from datetime import datetime

import copy
from datetime import datetime
import logging
import os
import ray.cloudpickle as pickle
import platform

from ray.tune.utils.trainable import TrainableUtil
from ray.tune.utils.util import Tee
import shutil
import sys
import tempfile
import time
from typing import Any, Dict, Union
import uuid

import ray
from ray.util.debug import log_once
import ray.cloudpickle as pickle
from ray.tune.resources import Resources
from ray.tune.result import (
DEFAULT_RESULTS_DIR, SHOULD_CHECKPOINT, TIME_THIS_ITER_S,
TIMESTEPS_THIS_ITER, DONE, TIMESTEPS_TOTAL, EPISODES_THIS_ITER,
EPISODES_TOTAL, TRAINING_ITERATION, RESULT_DUPLICATE, TRIAL_INFO,
STDOUT_FILE, STDERR_FILE)
from ray.tune.utils import UtilMonitor
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.utils.trainable import TrainableUtil
from ray.tune.utils.util import Tee
from ray.util.debug import log_once

logger = logging.getLogger(__name__)

@@ -106,7 +107,8 @@ def __init__(self, config=None, logger_creator=None):
self._monitor = UtilMonitor(start=log_sys_usage)

@classmethod
def default_resource_request(cls, config):
def default_resource_request(cls, config: Dict[str, Any]) -> \
Union[Resources, PlacementGroupFactory]:
"""Provides a static resource requirement for the given configuration.

This can be overridden by sub-classes to set the correct trial resource
Expand All @@ -122,8 +124,12 @@ def default_resource_request(cls, config):
extra_cpu=config["workers"],
extra_gpu=int(config["use_gpu"]) * config["workers"])

Args:
config[Dict[str, Any]]: The Trainable's config dict.

Returns:
Resources: A Resources object consumed by Tune for queueing.
Union[Resources, PlacementGroupFactory]: A Resources object or
PlacementGroupFactory consumed by Tune for queueing.
"""
return None

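With the widened return type above, a user-defined Trainable can hand Tune a bundle-based resource request instead of a flat Resources object. A minimal sketch of such an override (the class name and the "workers"/"use_gpu" keys mirror the docstring example above and are illustrative, not part of this diff):

from typing import Any, Dict, Union

from ray.tune.resources import Resources
from ray.tune.trainable import Trainable
from ray.tune.utils.placement_groups import PlacementGroupFactory


class MyTrainable(Trainable):
    @classmethod
    def default_resource_request(cls, config: Dict[str, Any]) -> \
            Union[Resources, PlacementGroupFactory]:
        # One bundle for the trial driver, plus one 1-CPU bundle per worker
        # the Trainable will create itself.
        num_workers = config.get("workers", 0)
        return PlacementGroupFactory(
            bundles=[{"CPU": 1, "GPU": int(config.get("use_gpu", False))}] +
            [{"CPU": 1} for _ in range(num_workers)],
            strategy="PACK")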
31 changes: 23 additions & 8 deletions python/ray/tune/trial.py
@@ -224,18 +224,33 @@ def __init__(self,
if trainable_cls:
default_resources = trainable_cls.default_resource_request(
self.config)
if default_resources:
if resources:
raise ValueError(
"Resources for {} have been automatically set to {} "
"by its `default_resource_request()` method. Please "
"clear the `resources_per_trial` option.".format(
trainable_cls, default_resources))

# If Trainable returns resources, do not allow manual override via
# `resources_per_trial` by the user.
if default_resources and (resources or placement_group_factory):
raise ValueError(
Contributor: there's a lot of special-casing here in this file diff -- can we please avoid this?

"Resources for {} have been automatically set to {} "
"by its `default_resource_request()` method. Please "
"clear the `resources_per_trial` option.".format(
trainable_cls, default_resources))

# New way: Trainable returns a PlacementGroupFactory object.
if isinstance(default_resources, PlacementGroupFactory):
placement_group_factory = default_resources
resources = None
# Set placement group factory to None for backwards compatibility.
else:
placement_group_factory = None
Contributor's suggested change:
# If Trainable returns resources, do not allow manual override via
# `resources_per_trial` by the user.
if default_resources and (resources or placement_group_factory):
raise ValueError(
"Resources for {} have been automatically set to {} "
"by its `default_resource_request()` method. Please "
"clear the `resources_per_trial` option.".format(
trainable_cls, default_resources))
# New way: Trainable returns a PlacementGroupFactory object.
if isinstance(default_resources, PlacementGroupFactory):
placement_group_factory = default_resources
resources = None
# Set placement group factory to None for backwards compatibility.
else:
placement_group_factory = None
# If Trainable returns resources, do not allow manual override via
# `resources_per_trial` by the user.
if default_resources:
if resources or placement_group_factory:
raise ValueError(
"Resources for {} have been automatically set to {} "
"by its `default_resource_request()` method. Please "
"clear the `resources_per_trial` option.".format(
trainable_cls, default_resources))
# New way: Trainable returns a PlacementGroupFactory object.
if isinstance(default_resources, PlacementGroupFactory):
placement_group_factory = default_resources
resources = None
# Set placement group factory to None for backwards compatibility.
else:
placement_group_factory = None
resources = default_resources

(and remove line under this).

We need to keep the indent here, and with the suggested change we keep the same logic as before.

Contributor (author): done

resources = default_resources

self.location = Location()

self.resources = resources or Resources(cpu=1, gpu=0)
self.placement_group_factory = placement_group_factory
if isinstance(resources, PlacementGroupFactory):
self.placement_group_factory = resources
else:
self.placement_group_factory = placement_group_factory

Contributor's suggested change:
if isinstance(resources, PlacementGroupFactory):
self.placement_group_factory = resources
else:
self.placement_group_factory = placement_group_factory
self.placement_group_factory = placement_group_factory

Contributor: We don't need this block anymore (placement_group_factory is an argument of the constructor). With the changes above, resources will never hold anything other than None or a Resources object.

Contributor (author): removed.

self._setup_resources()

self.stopping_criterion = stopping_criterion or {}
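On the Tune side nothing extra is needed: the Trial picks up the PlacementGroupFactory returned by default_resource_request() automatically, and resources_per_trial must simply stay unset, which the ValueError above enforces. A usage sketch (the PPO trainer and CartPole env are illustrative choices, not part of this diff):

from ray import tune

# PPO's default_resource_request() (see rllib/agents/trainer.py below) now
# returns a PlacementGroupFactory, so the trial's bundles are derived from
# the config. Passing `resources_per_trial` as well would raise the
# ValueError added above.
tune.run(
    "PPO",
    config={
        "env": "CartPole-v0",
        "num_workers": 2,
        "num_gpus": 0,
        "placement_strategy": "PACK",
    },
    stop={"training_iteration": 1},
)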
7 changes: 7 additions & 0 deletions rllib/BUILD
@@ -1427,6 +1427,13 @@ py_test(
srcs = ["tests/test_pettingzoo_env.py"]
)

py_test(
name = "tests/test_placement_groups",
tags = ["tests_dir", "tests_dir_P"],
size = "medium",
srcs = ["tests/test_placement_groups.py"]
)

py_test(
name = "tests/test_reproducibility",
tags = ["tests_dir", "tests_dir_R"],
65 changes: 51 additions & 14 deletions rllib/agents/impala/impala.py
@@ -15,7 +15,7 @@
from ray.rllib.execution.metric_ops import StandardMetricsReporting
from ray.rllib.utils.annotations import override
from ray.tune.trainable import Trainable
from ray.tune.resources import Resources
from ray.tune.utils.placement_groups import PlacementGroupFactory

logger = logging.getLogger(__name__)

@@ -68,8 +68,11 @@
"max_sample_requests_in_flight_per_worker": 2,
# max number of workers to broadcast one set of weights to
"broadcast_interval": 1,
# use intermediate actors for multi-level aggregation. This can make sense
# if ingesting >2GB/s of samples, or if the data requires decompression.
# Use n (`num_aggregation_workers`) extra Actors for multi-level
# aggregation of the data produced by the m RolloutWorkers
# (`num_workers`). Note that n should be much smaller than m.
# This can make sense if ingesting >2GB/s of samples, or if
# the data requires decompression.
"num_aggregation_workers": 0,

# Learning params.
@@ -101,17 +104,40 @@ class OverrideDefaultResourceRequest:
def default_resource_request(cls, config):
cf = dict(cls._default_config, **config)
Trainer._validate_config(cf)
return Resources(
cpu=cf["num_cpus_for_driver"],
gpu=cf["num_gpus"],
memory=cf["memory"],
object_store_memory=cf["object_store_memory"],
extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"] +
cf["num_aggregation_workers"],
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"],
extra_memory=cf["memory_per_worker"] * cf["num_workers"],
extra_object_store_memory=cf["object_store_memory_per_worker"] *
cf["num_workers"])

eval_config = cf["evaluation_config"]

# Return PlacementGroupFactory containing all needed resources
# (already properly defined as device bundles).
return PlacementGroupFactory(
bundles=[{
# Driver + Aggregation Workers:
# Force to be on same node to maximize data bandwidth
# between aggregation workers and the learner (driver).
# Aggregation workers tree-aggregate experiences collected
# from RolloutWorkers (n rollout workers map to m
# aggregation workers, where m < n) and always use 1 CPU
# each.
"CPU": cf["num_cpus_for_driver"] +
cf["num_aggregation_workers"],
"GPU": cf["num_gpus"]
}] + [
{
# RolloutWorkers.
"CPU": cf["num_cpus_per_worker"],
"GPU": cf["num_gpus_per_worker"],
} for _ in range(cf["num_workers"])
] + ([
{
# Evaluation workers (+1 b/c of the additional local
# worker)
"CPU": eval_config.get("num_cpus_per_worker",
cf["num_cpus_per_worker"]),
"GPU": eval_config.get("num_gpus_per_worker",
cf["num_gpus_per_worker"]),
} for _ in range(cf["evaluation_num_workers"] + 1)
] if cf["evaluation_interval"] else []),
strategy=config.get("placement_strategy", "PACK"))


def make_learner_thread(local_worker, config):
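To make the bundle layout above concrete, here is what the factory would produce for a small, purely illustrative IMPALA config (the numbers are assumptions, not taken from this PR):

# Assumed example: 4 rollout workers, 2 aggregation workers, 1 driver GPU,
# no evaluation workers.
cf = {
    "num_cpus_for_driver": 1,
    "num_aggregation_workers": 2,
    "num_gpus": 1,
    "num_cpus_per_worker": 1,
    "num_gpus_per_worker": 0,
    "num_workers": 4,
    "evaluation_interval": None,
}

# default_resource_request() above would then request (strategy="PACK"):
bundles = [
    {"CPU": 3, "GPU": 1},  # driver (1 CPU) + 2 aggregation workers (1 CPU each)
    {"CPU": 1, "GPU": 0},  # rollout worker 1
    {"CPU": 1, "GPU": 0},  # rollout worker 2
    {"CPU": 1, "GPU": 0},  # rollout worker 3
    {"CPU": 1, "GPU": 0},  # rollout worker 4
]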
@@ -172,6 +198,17 @@ def validate_config(config):
raise ValueError(
"Must use `batch_mode`=truncate_episodes if `vtrace` is True.")

# Check whether worker to aggregation-worker ratio makes sense.
if config["num_aggregation_workers"] > config["num_workers"]:
raise ValueError(
"`num_aggregation_workers` must be smaller than or equal "
"`num_workers`! Aggregation makes no sense otherwise.")
elif config["num_aggregation_workers"] > \
config["num_workers"] / 2:
logger.warning(
"`num_aggregation_workers` should be significantly smaller than"
"`num_workers`! Try setting it to 0.5*`num_workers` or less.")


# Update worker weights as they finish generating experiences.
class BroadcastUpdateLearnerWeights:
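The worker/aggregation-worker ratio check added to validate_config() above, pulled out as a standalone sketch (this helper is illustrative and not part of RLlib):

def check_aggregation_ratio(num_workers: int, num_aggregation_workers: int) -> None:
    # Mirrors the new validate_config() check: hard error when there are
    # more aggregation workers than rollout workers, soft warning above a
    # 1:2 ratio.
    if num_aggregation_workers > num_workers:
        raise ValueError("`num_aggregation_workers` must be <= `num_workers`.")
    elif num_aggregation_workers > num_workers / 2:
        print("WARNING: `num_aggregation_workers` should be well below "
              "`num_workers`; try 0.5 * `num_workers` or less.")


check_aggregation_ratio(num_workers=8, num_aggregation_workers=2)  # ok
check_aggregation_ratio(num_workers=8, num_aggregation_workers=5)  # warns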
2 changes: 1 addition & 1 deletion rllib/agents/impala/tests/test_impala.py
@@ -43,7 +43,7 @@ def test_impala_compilation(self):
local_cfg["model"]["use_lstm"] = True
local_cfg["model"]["lstm_use_prev_action"] = True
local_cfg["model"]["lstm_use_prev_reward"] = True
local_cfg["num_aggregation_workers"] = 2
local_cfg["num_aggregation_workers"] = 1
trainer = impala.ImpalaTrainer(config=local_cfg, env=env)
for i in range(num_iterations):
print(trainer.train())
2 changes: 1 addition & 1 deletion rllib/agents/sac/tests/test_sac.py
@@ -56,7 +56,7 @@ def step(self, action):
class TestSAC(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
ray.init(local_mode=True)
ray.init()

@classmethod
def tearDownClass(cls) -> None:
94 changes: 68 additions & 26 deletions rllib/agents/trainer.py
@@ -30,12 +30,13 @@
from ray.rllib.utils.from_config import from_config
from ray.rllib.utils.typing import TrainerConfigDict, \
PartialTrainerConfigDict, EnvInfoDict, ResultDict, EnvType, PolicyID
from ray.tune.logger import Logger, UnifiedLogger
from ray.tune.registry import ENV_CREATOR, register_env, _global_registry
from ray.tune.trainable import Trainable
from ray.tune.trial import ExportFormat
from ray.tune.resources import Resources
from ray.tune.logger import Logger, UnifiedLogger
from ray.tune.result import DEFAULT_RESULTS_DIR
from ray.tune.trainable import Trainable
from ray.tune.trial import ExportFormat
from ray.tune.utils.placement_groups import PlacementGroupFactory

tf1, tf, tfv = try_import_tf()

@@ -92,10 +93,6 @@
"batch_mode": "truncate_episodes",

# === Settings for the Trainer process ===
# Number of GPUs to allocate to the trainer process. Note that not all
# algorithms can take advantage of trainer GPUs. This can be fractional
# (e.g., 0.3 GPUs).
"num_gpus": 0,
# Training batch size, if applicable. Should be >= rollout_fragment_length.
# Samples batches will be concatenated together to a batch of this size,
# which is then passed to SGD.
@@ -304,7 +301,11 @@
# The extra python environments need to set for worker processes.
"extra_python_environs_for_worker": {},

# === Advanced Resource Settings ===
# === Resource Settings ===
# Number of GPUs to allocate to the trainer process. Note that not all
# algorithms can take advantage of trainer GPUs. This can be fractional
# (e.g., 0.3 GPUs).
"num_gpus": 0,
# Number of CPUs to allocate per worker.
"num_cpus_per_worker": 1,
# Number of GPUs to allocate per worker. This can be fractional. This is
@@ -316,11 +317,21 @@
# Number of CPUs to allocate for the trainer. Note: this only takes effect
# when running in Tune. Otherwise, the trainer runs in the main program.
"num_cpus_for_driver": 1,
# Deprecated.
"memory": 0,
"object_store_memory": 0,
"memory_per_worker": 0,
"object_store_memory_per_worker": 0,
# The strategy for the placement group factory returned by
# `Trainer.default_resource_request()`. A PlacementGroup defines which
# devices (resources) should always be co-located on the same node.
# For example, a Trainer with 2 rollout workers, running with
# num_gpus=1 will request a placement group with the bundles:
# [{"gpu": 1, "cpu": 1}, {"cpu": 1}, {"cpu": 1}], where the first bundle is
# for the driver and the other 2 bundles are for the two workers.
# These bundles can now be "placed" on the same or different
# nodes depending on the value of `placement_strategy`:
# "PACK": Packs bundles into as few nodes as possible.
# "SPREAD": Places bundles across distinct nodes as even as possible.
# "STRICT_PACK": Packs bundles into one node. The group is not allowed
# to span multiple nodes.
# "STRICT_SPREAD": Packs bundles across distinct nodes.
"placement_strategy": "PACK",

# === Offline Datasets ===
# Specify how to generate experiences:
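The two-worker example in the `placement_strategy` comment above, written out as the factory Tune would receive (a sketch; the bundle counts follow directly from that comment):

from ray.tune.utils.placement_groups import PlacementGroupFactory

# A 1-GPU/1-CPU driver bundle plus one 1-CPU bundle per rollout worker.
# With "PACK" Tune tries to co-locate all three bundles on as few nodes as
# possible; "SPREAD" would distribute them across nodes where it can.
pgf = PlacementGroupFactory(
    bundles=[{"CPU": 1, "GPU": 1}, {"CPU": 1}, {"CPU": 1}],
    strategy="PACK")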
@@ -398,6 +409,12 @@
# The number of contiguous environment steps to replay at once. This may
# be set to greater than 1 to support recurrent models.
"replay_sequence_length": 1,

# Deprecated keys.
"memory": 0,
"object_store_memory": 0,
"memory_per_worker": 0,
"object_store_memory_per_worker": 0,
}
# __sphinx_doc_end__
# yapf: enable
@@ -497,21 +514,37 @@ def default_logger_creator(config):
@classmethod
@override(Trainable)
def default_resource_request(
cls, config: PartialTrainerConfigDict) -> Resources:
cls, config: PartialTrainerConfigDict) -> \
Union[Resources, PlacementGroupFactory]:
cf = dict(cls._default_config, **config)
Trainer._validate_config(cf)
num_workers = cf["num_workers"] + cf["evaluation_num_workers"]
# TODO(ekl): add custom resources here once tune supports them
return Resources(
cpu=cf["num_cpus_for_driver"],
gpu=cf["num_gpus"],
memory=cf["memory"],
object_store_memory=cf["object_store_memory"],
extra_cpu=cf["num_cpus_per_worker"] * num_workers,
extra_gpu=cf["num_gpus_per_worker"] * num_workers,
extra_memory=cf["memory_per_worker"] * num_workers,
extra_object_store_memory=cf["object_store_memory_per_worker"] *
num_workers)

eval_config = cf["evaluation_config"]

# Return PlacementGroupFactory containing all needed resources
# (already properly defined as device bundles).
return PlacementGroupFactory(
bundles=[{
# Driver.
"CPU": cf["num_cpus_for_driver"],
"GPU": cf["num_gpus"]
}] + [
{
# RolloutWorkers.
"CPU": cf["num_cpus_per_worker"],
"GPU": cf["num_gpus_per_worker"]
} for _ in range(cf["num_workers"])
] + ([
{
# Evaluation workers (+1 b/c of the additional local
# worker)
"CPU": eval_config.get("num_cpus_per_worker",
cf["num_cpus_per_worker"]),
"GPU": eval_config.get("num_gpus_per_worker",
cf["num_gpus_per_worker"]),
} for _ in range(cf["evaluation_num_workers"] + 1)
] if cf["evaluation_interval"] else []),
strategy=config.get("placement_strategy", "PACK"))

@override(Trainable)
@PublicAPI
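As with the IMPALA example earlier, the effect of Trainer.default_resource_request() is easiest to see on a concrete, made-up config, this time with evaluation workers enabled:

# Assumed example: 2 rollout workers, 1 evaluation worker, evaluation on.
cf = {
    "num_cpus_for_driver": 1,
    "num_gpus": 0,
    "num_cpus_per_worker": 1,
    "num_gpus_per_worker": 0,
    "num_workers": 2,
    "evaluation_interval": 10,
    "evaluation_num_workers": 1,
    "evaluation_config": {},
}

# default_resource_request() above would then request:
bundles = [
    {"CPU": 1, "GPU": 0},  # driver
    {"CPU": 1, "GPU": 0},  # rollout worker 1
    {"CPU": 1, "GPU": 0},  # rollout worker 2
    {"CPU": 1, "GPU": 0},  # evaluation worker
    {"CPU": 1, "GPU": 0},  # evaluation local worker (the "+1")
]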
@@ -1097,6 +1130,15 @@ def _validate_config(config: PartialTrainerConfigDict):
if model_config is None:
config["model"] = model_config = {}

if config.get("memory", 0) != 0:
deprecation_warning(old="memory")
if config.get("object_store_memory", 0) != 0:
deprecation_warning(old="object_store_memory")
if config.get("memory_per_worker", 0) != 0:
deprecation_warning(old="memory_per_worker")
if config.get("object_store_memory_per_worker", 0) != 0:
deprecation_warning(old="object_store_memory_per_worker")

if not config.get("_use_trajectory_view_api"):
traj_view_framestacks = model_config.get("num_framestacks", "auto")
if model_config.get("_time_major"):