[core][enable_task_events] Options to disable task tracing on task/actor #42431

Merged Feb 29, 2024 (20 commits). Showing changes from 2 commits.
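For context, the user-visible API this PR adds (mirroring the test in python/ray/tests/test_task_events.py below): passing `_report_task_events=False` through `.options()` keeps a call out of the task-events stream that powers `ray.util.state.list_tasks`. A minimal sketch — the actor and method names are illustrative, not from the PR:

```python
import ray
from ray.util.state import list_tasks

ray.init()

@ray.remote
class Counter:
    def ping(self):
        return "pong"

c = Counter.remote()

# Not traced: this call emits no task events, so it never appears in list_tasks().
ray.get(c.ping.options(name="silent", _report_task_events=False).remote())

# Traced as usual.
ray.get(c.ping.options(name="traced").remote())

# Expect "traced" plus the implicit "Counter.__init__" creation task only.
print(sorted(t["name"] for t in list_tasks()))
```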
1 change: 1 addition & 0 deletions python/ray/_private/ray_option_utils.py
@@ -146,6 +146,7 @@ def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]:
)
),
"_metadata": Option((dict, type(None))),
"_report_task_events": Option(bool, default_value=True),
}


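The single line above registers the new flag in the shared options table that `.options()` arguments are validated against. A simplified re-implementation of that pattern — not Ray's actual code, just the shape of it — shows why `Option(bool, default_value=True)` means reporting stays on unless explicitly disabled:

```python
from typing import Any

class Option:
    """Simplified stand-in for ray._private.ray_option_utils.Option."""

    def __init__(self, accepted_type, default_value=None):
        self.accepted_type = accepted_type
        self.default_value = default_value

    def validate(self, name: str, value: Any) -> None:
        if not isinstance(value, self.accepted_type):
            raise TypeError(f"{name} must be {self.accepted_type}, got {type(value)}")

options_table = {"_report_task_events": Option(bool, default_value=True)}

def apply_defaults(user_options: dict) -> dict:
    # Start from defaults, then overlay validated user-supplied values.
    resolved = {k: v.default_value for k, v in options_table.items()}
    for name, value in user_options.items():
        options_table[name].validate(name, value)
        resolved[name] = value
    return resolved

print(apply_defaults({}))                              # {'_report_task_events': True}
print(apply_defaults({"_report_task_events": False}))  # {'_report_task_events': False}
```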
9 changes: 7 additions & 2 deletions python/ray/_raylet.pyx
@@ -3890,6 +3890,7 @@ cdef class CoreWorker:
concurrency_groups_dict,
int32_t max_pending_calls,
scheduling_strategy,
c_bool report_task_events,
):
cdef:
CRayFunction ray_function
@@ -4027,7 +4028,8 @@
retry_exception_allowlist,
double num_method_cpus,
c_string concurrency_group_name,
int64_t generator_backpressure_num_objects):
int64_t generator_backpressure_num_objects,
c_bool report_task_events):

cdef:
CActorID c_actor_id = actor_id.native()
@@ -4039,6 +4041,7 @@
CTaskID current_c_task_id = CTaskID.Nil()
TaskID current_task = self.get_current_task_id()
c_string serialized_retry_exception_allowlist
c_string serialized_runtime_env = b"{}"

serialized_retry_exception_allowlist = serialize_retry_exception_allowlist(
retry_exception_allowlist,
@@ -4065,7 +4068,9 @@
num_returns,
c_resources,
concurrency_group_name,
generator_backpressure_num_objects),
generator_backpressure_num_objects,
serialized_runtime_env,
report_task_events),
max_retries,
retry_exceptions,
serialized_retry_exception_allowlist,
10 changes: 10 additions & 0 deletions python/ray/actor.py
@@ -275,6 +275,7 @@ def _remote(
retry_exceptions=None,
concurrency_group=None,
_generator_backpressure_num_objects=None,
_report_task_events=True,
):
if num_returns is None:
num_returns = self._num_returns
@@ -308,6 +309,7 @@ def invocation(args, kwargs):
generator_backpressure_num_objects=(
_generator_backpressure_num_objects
),
report_task_events=_report_task_events,
)

# Apply the decorator if there is one.
@@ -771,6 +773,8 @@ def options(self, **actor_options):
_metadata: Extended options for Ray libraries. For example,
_metadata={"workflows.io/options": <workflow options>} for
Ray workflows.
_report_task_events: True if task events from the actor should be
reported. Defaults to True.

Examples:

@@ -882,6 +886,8 @@ def _remote(self, args=None, kwargs=None, **actor_options):
Note that this limit is counted per handle. -1 means that the
number of pending calls is unlimited.
scheduling_strategy: Strategy about how to schedule this actor.
_report_task_events: True if task events from the actor should be
reported. Defaults to True.

Returns:
A handle to the newly created actor.
@@ -951,6 +957,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
max_restarts = actor_options["max_restarts"]
max_task_retries = actor_options["max_task_retries"]
max_pending_calls = actor_options["max_pending_calls"]
report_task_events = actor_options["_report_task_events"]

if scheduling_strategy is None or not isinstance(
scheduling_strategy, PlacementGroupSchedulingStrategy
@@ -1149,6 +1156,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
concurrency_groups_dict=concurrency_groups_dict or dict(),
max_pending_calls=max_pending_calls,
scheduling_strategy=scheduling_strategy,
report_task_events=report_task_events,
)

if _actor_launch_hook:
@@ -1319,6 +1327,7 @@ def _actor_method_call(
retry_exceptions: Union[bool, list, tuple] = None,
concurrency_group_name: Optional[str] = None,
generator_backpressure_num_objects: Optional[int] = None,
report_task_events: bool = True,
):
"""Method execution stub for an actor handle.

@@ -1403,6 +1412,7 @@ def _actor_method_call(
self._ray_actor_method_cpus,
concurrency_group_name if concurrency_group_name is not None else b"",
generator_backpressure_num_objects,
report_task_events,
)

if num_returns == STREAMING_GENERATOR_RETURN:
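Besides the per-call path, `_remote` above also reads `_report_task_events` out of `actor_options`, so the flag can be set when the actor itself is created. A sketch of that entry point (class name illustrative):

```python
import ray

@ray.remote
class Worker:
    def step(self):
        return 1

# _remote() reads _report_task_events from actor_options and forwards it
# (as report_task_events) into the actor-creation task options.
w = Worker.options(_report_task_events=False).remote()
ray.get(w.step.remote())
```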
5 changes: 5 additions & 0 deletions python/ray/includes/common.pxd
@@ -328,6 +328,11 @@ cdef extern from "ray/core_worker/common.h" nogil:
c_string concurrency_group_name,
int64_t generator_backpressure_num_objects,
c_string serialized_runtime_env)
CTaskOptions(c_string name, int num_returns,
unordered_map[c_string, double] &resources,
c_string concurrency_group_name,
int64_t generator_backpressure_num_objects,
c_string serialized_runtime_env, c_bool report_task_events)

cdef cppclass CActorCreationOptions "ray::core::ActorCreationOptions":
CActorCreationOptions()
27 changes: 27 additions & 0 deletions python/ray/tests/test_task_events.py
@@ -623,6 +623,33 @@ def verify(expected_task_name):
wait_for_condition(verify, expected_task_name="AsyncActor.main_task")


def test_disable_report_task_events(shutdown_only):
ray.init(num_cpus=1, _system_config=_SYSTEM_CONFIG)

@ray.remote
class Actor:
def f(self):
pass

a = Actor.remote()

ray.get(a.f.options(name="no-report", _report_task_events=False).remote())
ray.get(a.f.options(name="report").remote())

def verify():
tasks = list_tasks()
print(tasks)
assert len(tasks) == 2
assert sorted({t["name"] for t in tasks}) == sorted(
{"report", "Actor.__init__"}
)
return True

wait_for_condition(verify)

# TODO: more testing on actor/task, per-actor-task configuration, etc.


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
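Per the TODO above, one obvious extension is a plain-task variant. The sketch below assumes `_report_task_events` is honored by normal-task `.options()` as well — the flag is registered in the shared option table, but the normal-task Python plumbing is not visible in these two commits, so treat this as hypothetical:

```python
def test_disable_report_task_events_normal_task(shutdown_only):
    ray.init(num_cpus=1, _system_config=_SYSTEM_CONFIG)

    @ray.remote
    def f():
        pass

    # Hypothetical: relies on plain tasks honoring _report_task_events.
    ray.get(f.options(name="no-report", _report_task_events=False).remote())
    ray.get(f.options(name="report").remote())

    def verify():
        tasks = list_tasks()
        assert {t["name"] for t in tasks} == {"report"}
        return True

    wait_for_condition(verify)
```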
4 changes: 4 additions & 0 deletions src/ray/common/task/task_spec.cc
@@ -382,6 +382,10 @@ const std::string TaskSpecification::GetSerializedRetryExceptionAllowlist() const
return message_->serialized_retry_exception_allowlist();
}

bool TaskSpecification::ShouldReportTaskEvents() const {
return message_->report_task_events();
}

// === Below are getter methods specific to actor creation tasks.

ActorID TaskSpecification::ActorCreationId() const {
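The getter above is what turns the proto field into behavior. Its consumer is not shown in this diff, but the `RecordTaskStatusEventIfNeeded` call sites in core_worker.cc below (all wrapped in `RAY_UNUSED`, implying an ignorable boolean result) suggest the task manager checks the flag before buffering an event. A Python model of that presumed control flow — every name here except `ShouldReportTaskEvents` is hypothetical:

```python
class TaskSpecModel:
    """Toy stand-in for TaskSpecification; only the new getter matters here."""

    def __init__(self, task_id: str, report_task_events: bool = True):
        self.task_id = task_id
        self._report_task_events = report_task_events

    def should_report_task_events(self) -> bool:  # mirrors ShouldReportTaskEvents()
        return self._report_task_events


class TaskManagerModel:
    """Toy model of RecordTaskStatusEventIfNeeded's gating; not Ray's code."""

    def __init__(self):
        self.buffered_events = []

    def record_task_status_event_if_needed(self, spec, status) -> bool:
        # Tasks submitted with report_task_events=False are skipped entirely,
        # which is what keeps them out of list_tasks().
        if not spec.should_report_task_events():
            return False
        self.buffered_events.append((spec.task_id, status))
        return True


tm = TaskManagerModel()
tm.record_task_status_event_if_needed(TaskSpecModel("t1"), "RUNNING")         # recorded
tm.record_task_status_event_if_needed(TaskSpecModel("t2", False), "RUNNING")  # skipped
assert [tid for tid, _ in tm.buffered_events] == ["t1"]
```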
3 changes: 3 additions & 0 deletions src/ray/common/task/task_spec.h
@@ -489,6 +489,9 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

void EmitTaskMetrics() const;

/// \return true if task events from this task should be reported.
bool ShouldReportTaskEvents() const;

private:
void ComputeResources();

4 changes: 3 additions & 1 deletion src/ray/common/task/task_util.h
@@ -134,7 +134,8 @@ class TaskSpecBuilder {
int64_t depth,
const TaskID &submitter_task_id,
const std::shared_ptr<rpc::RuntimeEnvInfo> runtime_env_info = nullptr,
const std::string &concurrency_group_name = "") {
const std::string &concurrency_group_name = "",
bool report_task_events = true) {
message_->set_type(TaskType::NORMAL_TASK);
message_->set_name(name);
message_->set_language(language);
@@ -163,6 +164,7 @@
message_->mutable_runtime_env_info()->CopyFrom(*runtime_env_info);
}
message_->set_concurrency_group_name(concurrency_group_name);
message_->set_report_task_events(report_task_events);
return *this;
}

18 changes: 14 additions & 4 deletions src/ray/core_worker/common.h
@@ -66,13 +66,15 @@ struct TaskOptions {
std::unordered_map<std::string, double> &resources,
const std::string &concurrency_group_name = "",
int64_t generator_backpressure_num_objects = -1,
const std::string &serialized_runtime_env_info = "{}")
const std::string &serialized_runtime_env_info = "{}",
bool report_task_events = true)
: name(name),
num_returns(num_returns),
resources(resources),
concurrency_group_name(concurrency_group_name),
serialized_runtime_env_info(serialized_runtime_env_info),
generator_backpressure_num_objects(generator_backpressure_num_objects) {}
generator_backpressure_num_objects(generator_backpressure_num_objects),
report_task_events(report_task_events) {}

/// The name of this task.
std::string name;
@@ -90,6 +92,9 @@
/// -1 means either streaming generator is not used or
/// it is used but the feature is disabled.
int64_t generator_backpressure_num_objects;
/// True if task events (worker::TaskEvent) from this task should be
/// reported. Defaults to true.
bool report_task_events = true;
};

/// Options for actor creation tasks.
Expand All @@ -109,7 +114,8 @@ struct ActorCreationOptions {
const std::string &serialized_runtime_env_info = "{}",
const std::vector<ConcurrencyGroup> &concurrency_groups = {},
bool execute_out_of_order = false,
int32_t max_pending_calls = -1)
int32_t max_pending_calls = -1,
bool report_task_events = true)
: max_restarts(max_restarts),
max_task_retries(max_task_retries),
max_concurrency(max_concurrency),
@@ -125,7 +131,8 @@
concurrency_groups(concurrency_groups.begin(), concurrency_groups.end()),
execute_out_of_order(execute_out_of_order),
max_pending_calls(max_pending_calls),
scheduling_strategy(scheduling_strategy) {
scheduling_strategy(scheduling_strategy),
report_task_events(report_task_events) {
// Check that resources is a subset of placement resources.
for (auto &resource : resources) {
auto it = this->placement_resources.find(resource.first);
@@ -177,6 +184,9 @@
const int max_pending_calls = -1;
// The strategy about how to schedule this actor.
rpc::SchedulingStrategy scheduling_strategy;
/// True if task events (worker::TaskEvent) from this creation task should be
/// reported. Defaults to true.
const bool report_task_events = true;
};

using PlacementStrategy = rpc::PlacementStrategy;
60 changes: 34 additions & 26 deletions src/ray/core_worker/core_worker.cc
@@ -1880,7 +1880,8 @@ void CoreWorker::BuildCommonTaskSpec(
const TaskID &main_thread_current_task_id,
const std::string &concurrency_group_name,
bool include_job_config,
int64_t generator_backpressure_num_objects) {
int64_t generator_backpressure_num_objects,
bool report_task_events) {
// Build common task spec.
auto override_runtime_env_info =
OverrideTaskOrActorRuntimeEnvInfo(serialized_runtime_env_info);
@@ -1925,7 +1926,8 @@
depth,
main_thread_current_task_id,
override_runtime_env_info,
concurrency_group_name);
concurrency_group_name,
report_task_events);
// Set task arguments.
for (const auto &arg : args) {
builder.AddArg(*arg);
@@ -1980,7 +1982,8 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
/*concurrency_group_name*/ "",
/*include_job_config*/ true,
/*generator_backpressure_num_objects*/
task_options.generator_backpressure_num_objects);
task_options.generator_backpressure_num_objects,
/*report_task_events*/ task_options.report_task_events);
builder.SetNormalTaskSpec(max_retries,
retry_exceptions,
serialized_retry_exception_allowlist,
@@ -2065,7 +2068,8 @@ Status CoreWorker::CreateActor(const RayFunction &function,
worker_context_.GetMainThreadOrActorCreationTaskID(),
/*concurrency_group_name*/ "",
/*include_job_config*/ true,
/*generator_backpressure_num_objects*/ -1);
/*generator_backpressure_num_objects*/ -1,
/*report_task_events*/ actor_creation_options.report_task_events);

// If the namespace is not specified, get it from the job.
const auto ray_namespace = (actor_creation_options.ray_namespace.empty()
@@ -2309,7 +2313,8 @@ Status CoreWorker::SubmitActorTask(
task_options.concurrency_group_name,
/*include_job_config*/ false,
/*generator_backpressure_num_objects*/
task_options.generator_backpressure_num_objects);
task_options.generator_backpressure_num_objects,
/*report_task_events*/ task_options.report_task_events);
// NOTE: placement_group_capture_child_tasks and runtime_env will
// be ignored in the actor because we should always follow the actor's option.

@@ -2683,21 +2688,18 @@ Status CoreWorker::ExecuteTask(
if (!options_.is_local_mode) {
task_counter_.MovePendingToRunning(func_name, task_spec.IsRetry());

if (task_spec.IsActorTask() && !actor_repr_name.empty()) {
task_manager_->RecordTaskStatusEvent(
task_spec.AttemptNumber(),
task_spec,
rpc::TaskStatus::RUNNING,
/* include_task_info */ false,
worker::TaskStatusEvent::TaskStateUpdate(actor_repr_name, pid_));
} else {
task_manager_->RecordTaskStatusEvent(
task_spec.AttemptNumber(),
task_spec,
rpc::TaskStatus::RUNNING,
/* include_task_info */ false,
worker::TaskStatusEvent::TaskStateUpdate(pid_));
}
const auto update =
(task_spec.IsActorTask() && !actor_repr_name.empty())
? worker::TaskStatusEvent::TaskStateUpdate(actor_repr_name, pid_)
: worker::TaskStatusEvent::TaskStateUpdate(pid_);
RAY_UNUSED(
task_manager_->RecordTaskStatusEventIfNeeded(task_spec.TaskId(),
worker_context_.GetCurrentJobID(),
task_spec.AttemptNumber(),
task_spec,
rpc::TaskStatus::RUNNING,
/* include_task_info */ false,
update));

worker_context_.SetCurrentTask(task_spec);
SetCurrentTaskId(task_spec.TaskId(), task_spec.AttemptNumber(), task_spec.GetName());
@@ -4379,12 +4381,14 @@ void CoreWorker::RecordTaskLogStart(const TaskID &task_id,
auto current_task = worker_context_.GetCurrentTask();
RAY_CHECK(current_task)
<< "We should have set the current task spec while executing the task.";
task_manager_->RecordTaskStatusEvent(
RAY_UNUSED(task_manager_->RecordTaskStatusEventIfNeeded(
task_id,
worker_context_.GetCurrentJobID(),
attempt_number,
*current_task,
rpc::TaskStatus::NIL,
worker::TaskStatusEvent::TaskStateUpdate(task_log_info));
/* include_task_info */ false,
worker::TaskStatusEvent::TaskStateUpdate(task_log_info)));
}

void CoreWorker::RecordTaskLogEnd(const TaskID &task_id,
@@ -4401,12 +4405,14 @@
auto current_task = worker_context_.GetCurrentTask();
RAY_CHECK(current_task)
<< "We should have set the current task spec before executing the task.";
task_manager_->RecordTaskStatusEvent(
RAY_UNUSED(task_manager_->RecordTaskStatusEventIfNeeded(
task_id,
worker_context_.GetCurrentJobID(),
attempt_number,
*current_task,
rpc::TaskStatus::NIL,
worker::TaskStatusEvent::TaskStateUpdate(task_log_info));
/* include_task_info */ false,
worker::TaskStatusEvent::TaskStateUpdate(task_log_info)));
}

void CoreWorker::UpdateTaskIsDebuggerPaused(const TaskID &task_id,
@@ -4417,12 +4423,14 @@
<< "We should have set the current task spec before executing the task.";
RAY_LOG(DEBUG) << "Task " << current_task_it->second.TaskId()
<< " is paused by debugger set to" << is_debugger_paused;
task_manager_->RecordTaskStatusEvent(
RAY_UNUSED(task_manager_->RecordTaskStatusEventIfNeeded(
task_id,
worker_context_.GetCurrentJobID(),
current_task_it->second.AttemptNumber(),
current_task_it->second,
rpc::TaskStatus::NIL,
/* include_task_info */ false,
worker::TaskStatusEvent::TaskStateUpdate(is_debugger_paused));
worker::TaskStatusEvent::TaskStateUpdate(is_debugger_paused)));
}

ClusterSizeBasedLeaseRequestRateLimiter::ClusterSizeBasedLeaseRequestRateLimiter(
3 changes: 2 additions & 1 deletion src/ray/core_worker/core_worker.h
@@ -1411,7 +1411,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
const TaskID &main_thread_current_task_id,
const std::string &concurrency_group_name = "",
bool include_job_config = false,
int64_t generator_backpressure_num_objects = -1);
int64_t generator_backpressure_num_objects = -1,
bool report_task_events = true);
void SetCurrentTaskId(const TaskID &task_id,
uint64_t attempt_number,
const std::string &task_name);