Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add an internal system concurrency group for executing compiled dag tasks #41605

Merged
merged 4 commits into from
Dec 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions python/ray/tests/test_concurrency_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,21 @@ def f2(self):
assert "ok" == ray.get(async_actor.f2.remote())


def test_system_concurrency_group(ray_start_regular_shared):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you test if users provide the default concurrency group, it raises an excepetion?

# It should fail.
@ray.remote(concurrency_groups={"_ray_system": 2})
class Actor:
   ...

Also it seems like concurrency group has a value (to limit concurrency). What's the default for the system concurrency group? Just 1?

Copy link
Contributor Author

@ericl ericl Dec 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine--- it's an internal feature and we don't provide any guarantees on how it works.

@ray.remote
class NormalActor:
def block_forever(self):
time.sleep(9999)
return "never"

def ping(self):
return "pong"

n = NormalActor.remote()
n.block_forever.options(concurrency_group="_ray_system").remote()
print(ray.get(n.ping.remote()))


if __name__ == "__main__":
import os

Expand Down
5 changes: 5 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,11 @@ RAY_CONFIG(std::string, predefined_unit_instance_resources, "GPU")
/// When set it to "neuron_cores,TPU,FPGA", we will also treat FPGA as unit_instance.
RAY_CONFIG(std::string, custom_unit_instance_resources, "neuron_cores,TPU,NPU")

/// The name of the system-created concurrency group for actors. This group is
/// created with 1 thread, and is created lazily. The intended usage is for
/// Ray-internal auxiliary tasks (e.g., accelerated dag workers).
RAY_CONFIG(std::string, system_concurrency_group_name, "_ray_system")

// Maximum size of the batches when broadcasting resources to raylet.
RAY_CONFIG(uint64_t, resource_broadcast_batch_size, 512)

Expand Down
19 changes: 13 additions & 6 deletions src/ray/core_worker/transport/concurrency_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,21 @@ ConcurrencyGroupManager<ExecutorType>::ConcurrencyGroupManager(
// the thread pools instead of main thread.
if (ExecutorType::NeedDefaultExecutor(max_concurrency_for_default_concurrency_group) ||
!concurrency_groups.empty()) {
defatult_executor_ =
default_executor_ =
std::make_shared<ExecutorType>(max_concurrency_for_default_concurrency_group);
}
}

template <typename ExecutorType>
std::shared_ptr<ExecutorType> ConcurrencyGroupManager<ExecutorType>::GetExecutor(
const std::string &concurrency_group_name, const ray::FunctionDescriptor &fd) {
if (concurrency_group_name == RayConfig::instance().system_concurrency_group_name() &&
name_to_executor_index_.find(concurrency_group_name) ==
name_to_executor_index_.end()) {
auto executor = std::make_shared<ExecutorType>(1);
name_to_executor_index_[concurrency_group_name] = executor;
}

if (!concurrency_group_name.empty()) {
auto it = name_to_executor_index_.find(concurrency_group_name);
/// TODO(qwang): Fail the user task.
Expand All @@ -64,26 +71,26 @@ std::shared_ptr<ExecutorType> ConcurrencyGroupManager<ExecutorType>::GetExecutor
functions_to_executor_index_.end()) {
return functions_to_executor_index_[fd->ToString()];
}
return defatult_executor_;
return default_executor_;
}

/// Get the default executor.
template <typename ExecutorType>
std::shared_ptr<ExecutorType> ConcurrencyGroupManager<ExecutorType>::GetDefaultExecutor()
const {
return defatult_executor_;
return default_executor_;
}

/// Stop and join the executors that the this manager owns.
template <typename ExecutorType>
void ConcurrencyGroupManager<ExecutorType>::Stop() {
if (defatult_executor_) {
if (default_executor_) {
RAY_LOG(DEBUG) << "Default executor is stopping.";
defatult_executor_->Stop();
default_executor_->Stop();
RAY_LOG(INFO) << "Default executor is joining. If the 'Default executor is joined.' "
"message is not printed after this, the worker is probably "
"hanging because the actor task is running an infinite loop.";
defatult_executor_->Join();
default_executor_->Join();
RAY_LOG(INFO) << "Default executor is joined.";
}

Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/concurrency_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ConcurrencyGroupManager final {
functions_to_executor_index_;

// The default concurrency group executor. It's nullptr if its max concurrency is 1.
std::shared_ptr<ExecutorType> defatult_executor_ = nullptr;
std::shared_ptr<ExecutorType> default_executor_ = nullptr;

friend class ConcurrencyGroupManagerTest;
};
Expand Down
Loading