Skip to content

Commit

Permalink
[Core] Allow to rate limit the max # of workers concurrently started (#…
Browse files Browse the repository at this point in the history
…39253)

Currently # of workers concurrently started is limited to number of CPUs. However, this seems to cause issues when we are running Ray in an environment where worker startup is slow (note: ray worker startup requires high disk read).

This PR allows to overwrite the worker_maximum_startup_concurrency  via RAY_ worker_maximum_startup_concurrency  instead of relying on number of cpus.
  • Loading branch information
rkooo567 committed Sep 7, 2023
1 parent 6523d94 commit 8b7fcd7
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ RAY_CONFIG(int64_t, kill_worker_timeout_milliseconds, 100)
/// starting_worker_timeout_callback() is called.
RAY_CONFIG(int64_t, worker_register_timeout_seconds, 60)

/// The maximum workers raylet can start at the same time.
/// 0 means it will use the default (number of CPUs).
RAY_CONFIG(int64_t, worker_maximum_startup_concurrency, 0)

/// The maximum number of workers to iterate whenever we analyze the resources usage.
RAY_CONFIG(uint32_t, worker_max_resource_analysis_iteration, 128)

Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ DEFINE_int32(num_prestart_python_workers,
0,
"Number of prestarted default Python workers on raylet startup.");
DEFINE_bool(head, false, "Whether this node is a head node.");
/// NOTE: This value is overwritten inside worker_pool.h by
/// worker_maximum_startup_concurrency.
DEFINE_int32(maximum_startup_concurrency, 1, "Maximum startup concurrency.");
DEFINE_string(static_resource_list, "", "The static resource list of this node.");
DEFINE_string(python_worker_command, "", "Python worker command.");
Expand Down
6 changes: 6 additions & 0 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service,
num_prestart_python_workers(num_prestarted_python_workers),
periodical_runner_(io_service),
get_time_(get_time) {
if (RayConfig::instance().worker_maximum_startup_concurrency() > 0) {
// Overwrite the maximum concurrency.
maximum_startup_concurrency_ =
RayConfig::instance().worker_maximum_startup_concurrency();
}

RAY_CHECK(maximum_startup_concurrency > 0);
// We need to record so that the metric exists. This way, we report that 0
// processes have started before a task runs on the node (as opposed to the
Expand Down

0 comments on commit 8b7fcd7

Please sign in to comment.