[train] remove _max_cpu_fraction_per_node (#39412)
Signed-off-by: Matthew Deng <matt@anyscale.com>
matthewdeng committed Sep 8, 2023
1 parent 7da798f commit fddde50
Showing 8 changed files with 4 additions and 198 deletions.
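For callers, the practical effect is that ScalingConfig no longer accepts the experimental _max_cpu_fraction_per_node field, so configs that still pass it fail at construction time. A minimal before/after sketch, assuming a Ray version that includes this commit:

from ray.train import ScalingConfig

# Before this commit the experimental field was accepted:
#     ScalingConfig(num_workers=2, _max_cpu_fraction_per_node=0.8)
# After it, ScalingConfig is a plain dataclass without that field, so the call
# above raises TypeError (unexpected keyword argument). Drop the argument:
scaling_config = ScalingConfig(num_workers=2, resources_per_worker={"CPU": 1})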
16 changes: 1 addition & 15 deletions python/ray/air/config.py
@@ -121,7 +121,6 @@ class ScalingConfig:
use_gpu: Union[bool, SampleRange] = False
resources_per_worker: Optional[Union[Dict, SampleRange]] = None
placement_strategy: Union[str, SampleRange] = "PACK"
_max_cpu_fraction_per_node: Optional[Union[float, SampleRange]] = None

def __post_init__(self):
if self.resources_per_worker:
@@ -237,15 +236,7 @@ def as_placement_group_factory(self) -> "PlacementGroupFactory":
for _ in range(self.num_workers if self.num_workers else 0)
]
bundles = trainer_bundle + worker_bundles
if self._max_cpu_fraction_per_node is not None:
kwargs = {
"_max_cpu_fraction_per_node": self._max_cpu_fraction_per_node,
}
else:
kwargs = {}
return PlacementGroupFactory(
bundles, strategy=self.placement_strategy, **kwargs
)
return PlacementGroupFactory(bundles, strategy=self.placement_strategy)

@classmethod
def from_placement_group_factory(
@@ -263,7 +254,6 @@ def from_placement_group_factory(
placement_strategy = pgf.strategy
resources_per_worker = None
num_workers = None
max_cpu_fraction_per_node = None

if worker_bundles:
first_bundle = worker_bundles[0]
@@ -276,16 +266,12 @@
num_workers = len(worker_bundles)
resources_per_worker = first_bundle

if "_max_cpu_fraction_per_node" in pgf._kwargs:
max_cpu_fraction_per_node = pgf._kwargs["_max_cpu_fraction_per_node"]

return ScalingConfig(
trainer_resources=trainer_resources,
num_workers=num_workers,
use_gpu=use_gpu,
resources_per_worker=resources_per_worker,
placement_strategy=placement_strategy,
_max_cpu_fraction_per_node=max_cpu_fraction_per_node,
)


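With the conditional kwargs removed, as_placement_group_factory and from_placement_group_factory are symmetric over just the bundles and the placement strategy. A minimal round-trip sketch, assuming a Ray version that includes this commit:

from ray.train import ScalingConfig

config = ScalingConfig(
    num_workers=2,
    trainer_resources={"CPU": 1},
    resources_per_worker={"CPU": 1},
    placement_strategy="PACK",
)

# The factory now carries only the bundles and the strategy; no
# _max_cpu_fraction_per_node kwarg is attached to it.
pgf = config.as_placement_group_factory()
assert pgf.strategy == "PACK"

# The inverse constructor likewise no longer inspects pgf._kwargs.
restored = ScalingConfig.from_placement_group_factory(pgf)
assert restored.num_workers == 2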
5 changes: 2 additions & 3 deletions python/ray/air/tests/test_resource_changing.py
@@ -126,9 +126,7 @@ def test_gbdt_trainer(ray_start_8_cpus):
trainer = AssertingXGBoostTrainer(
datasets={TRAIN_DATASET_KEY: train_ds},
label_column="target",
scaling_config=ScalingConfig(
num_workers=2, placement_strategy="SPREAD", _max_cpu_fraction_per_node=0.9
),
scaling_config=ScalingConfig(num_workers=2, placement_strategy="SPREAD"),
params={
"objective": "binary:logistic",
"eval_metric": ["logloss"],
@@ -145,6 +143,7 @@ def test_gbdt_trainer(ray_start_8_cpus):
tune_config=TuneConfig(
mode="min",
metric="train-logloss",
max_concurrent_trials=3,
scheduler=ResourceChangingScheduler(
ASHAScheduler(),
resources_allocation_function=DistributeResources(
1 change: 0 additions & 1 deletion python/ray/train/base_trainer.py
@@ -189,7 +189,6 @@ def training_loop(self):

_scaling_config_allowed_keys: List[str] = [
"trainer_resources",
"_max_cpu_fraction_per_node",
]
_handles_checkpoint_freq: bool = False
_handles_checkpoint_at_end: bool = False
5 changes: 0 additions & 5 deletions python/ray/train/gbdt_trainer.py
@@ -90,11 +90,6 @@ def get_tune_resources(self) -> PlacementGroupFactory:
placement_options = {
"strategy": scaling_config.placement_strategy,
}
# Special case, same as in ScalingConfig.as_placement_group_factory
if scaling_config._max_cpu_fraction_per_node is not None:
placement_options[
"_max_cpu_fraction_per_node"
] = scaling_config._max_cpu_fraction_per_node
ray_params = ray_params_cls_extended(
placement_options=placement_options,
**ray_params_kwargs,
130 changes: 1 addition & 129 deletions python/ray/train/tests/test_base_trainer.py
@@ -1,46 +1,19 @@
import logging
import tempfile
from unittest.mock import patch

import pytest

import ray
from ray import train, tune
from ray.train import Checkpoint, ScalingConfig
from ray.air.constants import MAX_REPR_LENGTH
from ray.tune.impl import tuner_internal
from ray.train.gbdt_trainer import GBDTTrainer
from ray.train.trainer import BaseTrainer
from ray.util.placement_group import get_current_placement_group

logger = logging.getLogger(__name__)


@pytest.fixture
def mock_tuner_internal_logger():
class MockLogger:
def __init__(self):
self.warnings = []

def warning(self, msg):
self.warnings.append(msg)

def warn(self, msg, **kwargs):
self.warnings.append(msg)

def info(self, msg):
print(msg)

def clear(self):
self.warnings = []

old = tuner_internal.warnings
tuner_internal.warnings = MockLogger()
yield tuner_internal.warnings
# The code after the yield will run as teardown code.
tuner_internal.warnings = old


class DummyTrainer(BaseTrainer):
_scaling_config_allowed_keys = BaseTrainer._scaling_config_allowed_keys + [
"num_workers",
@@ -127,105 +100,7 @@ def check_override(self):
tune.run(trainer.as_trainable(), config=new_config)


def test_reserved_cpus(ray_start_4_cpus):
def train_loop(self):
ray.data.range(10).show()

# Will deadlock without reserved CPU fraction.
scale_config = ScalingConfig(num_workers=1, _max_cpu_fraction_per_node=0.9)
trainer = DummyTrainer(
train_loop,
scaling_config=scale_config,
)
tune.run(trainer.as_trainable(), num_samples=4)

# Needs to request 0 CPU for the trainer otherwise the pg
# will require {CPU: 1} * 2 resources, which means
# _max_cpu_fraction_per_node == 0.01 cannot schedule it
# (because this only allows to have 1 CPU for pg per node).
scale_config = ScalingConfig(
num_workers=1, _max_cpu_fraction_per_node=0.01, trainer_resources={"CPU": 0}
)
trainer = DummyTrainer(
train_loop,
scaling_config=scale_config,
)
tune.run(trainer.as_trainable(), num_samples=4)


@patch("ray.available_resources", ray.cluster_resources)
def test_reserved_cpu_warnings(ray_start_4_cpus, mock_tuner_internal_logger):
# ray.available_resources() is used in the warning logic.
# We mock it as it can be stochastic due to garbage collection etc.
# The aim of this test is not to check if ray.available_resources()
# works correctly, but to test the warning logic.

def train_loop(config):
pass

# Fraction correctly specified.
trainer = DummyTrainer(
train_loop,
scaling_config=ScalingConfig(num_workers=1, _max_cpu_fraction_per_node=0.9),
datasets={"train": ray.data.range(10)},
)
trainer.fit()
assert not mock_tuner_internal_logger.warnings

# No datasets, no fraction.
trainer = DummyTrainer(
train_loop,
scaling_config=ScalingConfig(num_workers=1),
)
trainer.fit()
assert not mock_tuner_internal_logger.warnings

# Should warn.
trainer = DummyTrainer(
train_loop,
scaling_config=ScalingConfig(num_workers=3),
datasets={"train": ray.data.range(10)},
)
trainer.fit()
assert (
len(mock_tuner_internal_logger.warnings) == 1
), mock_tuner_internal_logger.warnings
assert "_max_cpu_fraction_per_node" in mock_tuner_internal_logger.warnings[0]
mock_tuner_internal_logger.clear()

# Warn if num_samples is configured
trainer = DummyTrainer(
train_loop,
scaling_config=ScalingConfig(num_workers=1),
datasets={"train": ray.data.range(10)},
)
tuner = tune.Tuner(trainer, tune_config=tune.TuneConfig(num_samples=3))
tuner.fit()
assert (
len(mock_tuner_internal_logger.warnings) == 1
), mock_tuner_internal_logger.warnings
assert "_max_cpu_fraction_per_node" in mock_tuner_internal_logger.warnings[0]
mock_tuner_internal_logger.clear()

# Don't warn if resources * samples < 0.8
trainer = DummyTrainer(
train_loop,
scaling_config=ScalingConfig(num_workers=1, trainer_resources={"CPU": 0}),
datasets={"train": ray.data.range(10)},
)
tuner = tune.Tuner(trainer, tune_config=tune.TuneConfig(num_samples=3))
tuner.fit()
assert not mock_tuner_internal_logger.warnings

# Don't warn if Trainer is not used
tuner = tune.Tuner(train_loop, tune_config=tune.TuneConfig(num_samples=3))
tuner.fit()
assert not mock_tuner_internal_logger.warnings


def test_reserved_cpu_warnings_no_cpu_usage(
ray_start_1_cpu_1_gpu, mock_tuner_internal_logger
):
def test_reserved_cpu_warnings_no_cpu_usage(ray_start_1_cpu_1_gpu):
"""Ensure there is no divide by zero error if trial requires no CPUs."""

def train_loop(config):
@@ -239,7 +114,6 @@ def train_loop(config):
datasets={"train": ray.data.range(10)},
)
trainer.fit()
assert not mock_tuner_internal_logger.warnings


def test_setup(ray_start_4_cpus):
@@ -303,6 +177,4 @@ def training_loop(self):
if __name__ == "__main__":
import sys

import pytest

sys.exit(pytest.main(sys.argv[1:] + ["-v", "-x", __file__]))
2 changes: 0 additions & 2 deletions python/ray/train/tests/test_xgboost_trainer.py
@@ -66,7 +66,6 @@ class ScalingConfigAssertingXGBoostTrainer(XGBoostTrainer):
def training_loop(self) -> None:
pgf = train.get_context().get_trial_resources()
assert pgf.strategy == "SPREAD"
assert pgf._kwargs["_max_cpu_fraction_per_node"] == 0.9
return super().training_loop()


@@ -78,7 +77,6 @@ def test_fit_with_advanced_scaling_config(ray_start_4_cpus):
scaling_config=ScalingConfig(
num_workers=2,
placement_strategy="SPREAD",
_max_cpu_fraction_per_node=0.9,
),
label_column="target",
params=params,
42 changes: 0 additions & 42 deletions python/ray/tune/impl/tuner_internal.py
@@ -3,7 +3,6 @@
import os
import math
import logging
import warnings
import shutil
import tempfile
from pathlib import Path
@@ -21,7 +20,6 @@

import pyarrow.fs

import ray
import ray.cloudpickle as pickle
from ray.util import inspect_serializability
from ray.air._internal.remote_storage import download_from_uri, is_non_local_path_uri
@@ -158,8 +156,6 @@ def __init__(
with open(experiment_checkpoint_path / _TUNER_PKL, "wb") as fp:
pickle.dump(self.__getstate__(), fp)

self._maybe_warn_resource_contention()

def get_run_config(self) -> RunConfig:
return self._run_config

@@ -190,44 +186,6 @@ def _expected_utilization(self, cpus_per_trial, cpus_total):
)
return (actual_concurrency * cpus_per_trial) / (cpus_total + 0.001)

def _maybe_warn_resource_contention(self):
if not ray.is_initialized():
return

trainable = self.converted_trainable

# This may not be precise, but we don't have a great way of
# accessing the actual scaling config if it is being tuned.
scaling_config = None
get_scaling_config = getattr(trainable, "base_scaling_config", None)
if callable(get_scaling_config):
scaling_config = get_scaling_config()

if scaling_config is None or scaling_config._max_cpu_fraction_per_node:
return

has_base_dataset = getattr(trainable, "has_base_dataset", False)

cpus_per_trial = scaling_config.total_resources.get("CPU", 0)
cpus_left = ray.available_resources().get("CPU", 0) # avoid div by 0
# TODO(amogkam): Remove this warning after _max_cpu_fraction_per_node is no
# longer experimental.
if (
has_base_dataset
and self._expected_utilization(cpus_per_trial, cpus_left) > 0.8
):
warnings.warn(
"Executing `.fit()` may leave less than 20% of CPUs in "
"this cluster for Dataset execution, which can lead to "
"resource contention or hangs. To avoid this, "
"reserve at least 20% of node CPUs for Dataset execution by "
"setting `_max_cpu_fraction_per_node = 0.8` in the Trainer "
"scaling_config. See "
"https://docs.ray.io/en/master/data/dataset-internals.html"
"#datasets-and-tune for more info.",
stacklevel=4,
)

def _validate_trainable(
self, trainable: TrainableType, required_trainable_name: Optional[str] = None
):
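The deleted _maybe_warn_resource_contention check estimated whether concurrently running trials would leave enough CPU headroom for Dataset execution and warned above 80% expected utilization. Anyone who still wants a similar back-of-the-envelope check can approximate it outside Tune; the sketch below mirrors the deleted heuristic, but the concurrency estimate is a simplification and none of these names are a Ray API:

import ray

def expected_utilization(cpus_per_trial: float, cpus_total: float, num_samples: int) -> float:
    # Rough stand-in for TunerInternal._expected_utilization: the fraction of
    # the cluster's CPUs that concurrently running trials would consume. The
    # real helper also accounts for max_concurrent_trials.
    concurrency = min(num_samples, int(cpus_total // max(cpus_per_trial, 1)))
    return (concurrency * cpus_per_trial) / (cpus_total + 0.001)

if ray.is_initialized():
    cpus_per_trial = 2  # total CPUs one trial requests (trainer + workers)
    cpus_left = ray.available_resources().get("CPU", 0)
    if expected_utilization(cpus_per_trial, cpus_left, num_samples=4) > 0.8:
        print("Trials may leave less than 20% of CPUs for Dataset execution.")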
1 change: 0 additions & 1 deletion
@@ -29,7 +29,6 @@ def run_ingest_bulk(dataset, num_workers, num_cpus_per_worker):
num_workers=num_workers,
trainer_resources={"CPU": 0},
resources_per_worker={"CPU": num_cpus_per_worker},
_max_cpu_fraction_per_node=0.1,
),
datasets={"train": dataset},
num_epochs=1,
