Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][enable_task_events] Options to disable task tracing on task/actor #42431

Merged
merged 20 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions doc/source/ray-core/actors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,17 @@ define can run. This also implies that tasks are scheduled more flexibly,
and that if you don't need the stateful part of an actor, you're mostly
better off using tasks.

Task Tracing
------------

By default, Ray will trace the execution of actor tasks, and reporting task status events and profiling events
to the Ray Dashboard. See more from :ref:`State API <state-api-overview-ref>` for more details.
rickyyx marked this conversation as resolved.
Show resolved Hide resolved

You could disable task tracing for the actor by setting the `task_tracing` option to `False` in :func:`ray.remote() <ray.remote>` and :meth:`.options() <ray.actor.ActorClass.options>`.
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
You could also disable task tracing for some actor methods by setting the `task_tracing` option to `False` in :func:`ray.remote() <ray.remote>` and :meth:`.options() <ray.remote_function.RemoteFunction.options>` on the actor method.
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
The per method setting will override the actor setting.


More about Ray Actors
---------------------

Expand Down
9 changes: 9 additions & 0 deletions doc/source/ray-core/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,15 @@ You can change this behavior by setting
in :func:`ray.remote() <ray.remote>` and :meth:`.options() <ray.remote_function.RemoteFunction.options>`.
See :ref:`Ray fault tolerance <fault-tolerance>` for more details.

Task Tracing
------------

By default, Ray will trace the execution of tasks, and reporting task status events and profiling events
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
to the Ray Dashboard. See more from :ref:`State API <state-api-overview-ref>` for more details.
You could change this behavior by setting ``task_tracing`` options in :func:`ray.remote() <ray.remote>` and :meth:`.options() <ray.remote_function.RemoteFunction.options>`
to disable task tracing, which would not only reduce the overhead of task execution, but also reduce the amount of data sent to the Ray Dashboard.
Nested tasks don't inherit the task tracing settings from the parent task, you need to set the task tracing settings for each task separately.


More about Ray Tasks
--------------------
Expand Down
1 change: 1 addition & 0 deletions python/ray/_private/ray_option_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]:
)
),
"_metadata": Option((dict, type(None))),
"task_tracing": Option(bool, default_value=True),
}


Expand Down
18 changes: 13 additions & 5 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3812,7 +3812,8 @@ cdef class CoreWorker:
scheduling_strategy,
c_string debugger_breakpoint,
c_string serialized_runtime_env_info,
int64_t generator_backpressure_num_objects
int64_t generator_backpressure_num_objects,
c_bool task_tracing
):
cdef:
unordered_map[c_string, double] c_resources
Expand Down Expand Up @@ -3845,7 +3846,8 @@ cdef class CoreWorker:
name, num_returns, c_resources,
b"",
generator_backpressure_num_objects,
serialized_runtime_env_info)
serialized_runtime_env_info,
task_tracing)

current_c_task_id = current_task.native()

Expand Down Expand Up @@ -3890,6 +3892,7 @@ cdef class CoreWorker:
concurrency_groups_dict,
int32_t max_pending_calls,
scheduling_strategy,
c_bool task_tracing,
):
cdef:
CRayFunction ray_function
Expand Down Expand Up @@ -3936,7 +3939,8 @@ cdef class CoreWorker:
# execute out of order for
# async or threaded actors.
is_asyncio or max_concurrency > 1,
max_pending_calls),
max_pending_calls,
task_tracing),
extension_data,
&c_actor_id)

Expand Down Expand Up @@ -4027,7 +4031,8 @@ cdef class CoreWorker:
retry_exception_allowlist,
double num_method_cpus,
c_string concurrency_group_name,
int64_t generator_backpressure_num_objects):
int64_t generator_backpressure_num_objects,
c_bool task_tracing):

cdef:
CActorID c_actor_id = actor_id.native()
Expand All @@ -4039,6 +4044,7 @@ cdef class CoreWorker:
CTaskID current_c_task_id = CTaskID.Nil()
TaskID current_task = self.get_current_task_id()
c_string serialized_retry_exception_allowlist
c_string serialized_runtime_env = b"{}"

serialized_retry_exception_allowlist = serialize_retry_exception_allowlist(
retry_exception_allowlist,
Expand All @@ -4065,7 +4071,9 @@ cdef class CoreWorker:
num_returns,
c_resources,
concurrency_group_name,
generator_backpressure_num_objects),
generator_backpressure_num_objects,
serialized_runtime_env,
task_tracing),
max_retries,
retry_exceptions,
serialized_retry_exception_allowlist,
Expand Down
49 changes: 49 additions & 0 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def bar(self):
"_max_retries",
"retry_exceptions",
"_generator_backpressure_num_objects",
"task_tracing",
]
error_string = (
"The @ray.method decorator must be applied using at least one of "
Expand Down Expand Up @@ -102,6 +103,8 @@ def annotate_method(method):
method.__ray_generator_backpressure_num_objects__ = kwargs[
"_generator_backpressure_num_objects"
]
if "task_tracing" in kwargs and kwargs["task_tracing"] is not None:
method.__ray_task_tracing__ = kwargs["task_tracing"]
return method

return annotate_method
Expand Down Expand Up @@ -130,6 +133,8 @@ class ActorMethod:
_generator_backpressure_num_objects: Generator-only config.
If a number of unconsumed objects reach this threshold,
a actor task stop pausing.
task_tracing: True if task tracing is enabled, i.e. task events from
the actor should be reported. Defaults to True.
_decorator: An optional decorator that should be applied to the actor
method invocation (as opposed to the actor method execution) before
invoking the method. The decorator must return a function that
Expand All @@ -148,6 +153,7 @@ def __init__(
retry_exceptions: Union[bool, list, tuple],
is_generator: bool,
generator_backpressure_num_objects: int,
task_tracing: bool,
decorator=None,
hardref=False,
):
Expand All @@ -166,6 +172,7 @@ def __init__(
self._retry_exceptions = retry_exceptions
self._is_generator = is_generator
self._generator_backpressure_num_objects = generator_backpressure_num_objects
self._task_tracing = task_tracing
# This is a decorator that is used to wrap the function invocation (as
# opposed to the function execution). The decorator must return a
# function that takes in two arguments ("args" and "kwargs"). In most
Expand Down Expand Up @@ -275,6 +282,7 @@ def _remote(
retry_exceptions=None,
concurrency_group=None,
_generator_backpressure_num_objects=None,
task_tracing=None,
):
if num_returns is None:
num_returns = self._num_returns
Expand All @@ -285,6 +293,8 @@ def _remote(
max_retries = 0
if retry_exceptions is None:
retry_exceptions = self._retry_exceptions
if task_tracing is None:
task_tracing = self._task_tracing
if _generator_backpressure_num_objects is None:
_generator_backpressure_num_objects = (
self._generator_backpressure_num_objects
Expand All @@ -308,6 +318,7 @@ def invocation(args, kwargs):
generator_backpressure_num_objects=(
_generator_backpressure_num_objects
),
task_tracing=task_tracing,
)

# Apply the decorator if there is one.
Expand All @@ -326,6 +337,7 @@ def __getstate__(self):
"decorator": self._decorator,
"is_generator": self._is_generator,
"generator_backpressure_num_objects": self._generator_backpressure_num_objects, # noqa
"task_tracing": self._task_tracing,
}

def __setstate__(self, state):
Expand All @@ -337,6 +349,7 @@ def __setstate__(self, state):
state["retry_exceptions"],
state["is_generator"],
state["generator_backpressure_num_objects"],
state["task_tracing"],
state["decorator"],
hardref=True,
)
Expand All @@ -357,6 +370,8 @@ class _ActorClassMethodMetadata(object):
max_retries: Number of retries on method failure.
retry_exceptions: Boolean of whether you want to retry all user-raised
exceptions, or a list of allowlist exceptions to retry, for each method.
task_tracing: True if tracing is enabled, i.e. task events from
the actor should be reported. Defaults to True.
"""

_cache = {} # This cache will be cleared in ray._private.worker.disconnect()
Expand Down Expand Up @@ -395,6 +410,7 @@ def create(cls, modified_class, actor_creation_function_descriptor):
self.max_retries = {}
self.retry_exceptions = {}
self.method_is_generator = {}
self.task_tracing = {}
self.generator_backpressure_num_objects = {}
self.concurrency_group_for_methods = {}

Expand Down Expand Up @@ -438,6 +454,9 @@ def create(cls, modified_class, actor_creation_function_descriptor):
method_name
] = method.__ray_concurrency_group__

if hasattr(method, "__ray_task_tracing__"):
self.task_tracing[method_name] = method.__ray_task_tracing__

is_generator = inspect.isgeneratorfunction(
method
) or inspect.isasyncgenfunction(method)
Expand Down Expand Up @@ -771,6 +790,8 @@ def options(self, **actor_options):
_metadata: Extended options for Ray libraries. For example,
_metadata={"workflows.io/options": <workflow options>} for
Ray workflows.
task_tracing: True if tracing is enabled, i.e. task events from
the actor should be reported. Defaults to True.

Examples:

Expand Down Expand Up @@ -882,6 +903,8 @@ def _remote(self, args=None, kwargs=None, **actor_options):
Note that this limit is counted per handle. -1 means that the
number of pending calls is unlimited.
scheduling_strategy: Strategy about how to schedule this actor.
task_tracing: True if tracing is enabled, i.e. task events from
the actor should be reported. Defaults to True.

Returns:
A handle to the newly created actor.
Expand Down Expand Up @@ -951,6 +974,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
max_restarts = actor_options["max_restarts"]
max_task_retries = actor_options["max_task_retries"]
max_pending_calls = actor_options["max_pending_calls"]
task_tracing = actor_options["task_tracing"]

if scheduling_strategy is None or not isinstance(
scheduling_strategy, PlacementGroupSchedulingStrategy
Expand Down Expand Up @@ -1149,6 +1173,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
concurrency_groups_dict=concurrency_groups_dict or dict(),
max_pending_calls=max_pending_calls,
scheduling_strategy=scheduling_strategy,
task_tracing=task_tracing,
)

if _actor_launch_hook:
Expand All @@ -1160,13 +1185,15 @@ def _remote(self, args=None, kwargs=None, **actor_options):
meta.language,
actor_id,
max_task_retries,
task_tracing,
meta.method_meta.method_is_generator,
meta.method_meta.decorators,
meta.method_meta.signatures,
meta.method_meta.num_returns,
meta.method_meta.max_retries,
meta.method_meta.retry_exceptions,
meta.method_meta.generator_backpressure_num_objects,
meta.method_meta.task_tracing,
actor_method_cpu,
meta.actor_creation_function_descriptor,
worker.current_session_and_job,
Expand Down Expand Up @@ -1203,6 +1230,8 @@ class ActorHandle:
Attributes:
_ray_actor_language: The actor language.
_ray_actor_id: Actor ID.
_ray_task_tracing: The default value of whether task tracing is
enabled, i.e. task events from the actor should be reported.
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
_ray_method_is_generator: Map of method name -> if it is a generator
method.
_ray_method_decorators: Optional decorators for the function
Expand All @@ -1219,6 +1248,9 @@ class ActorHandle:
_ray_method_generator_backpressure_num_objects: Generator-only
config. The max number of objects to generate before it
starts pausing a generator.
_ray_method_task_tracing: The value of whether task
tracing is enabled for the actor methods. This overrides the
actor's default value (`_ray_task_tracing`)
_ray_actor_method_cpus: The number of CPUs required by actor methods.
_ray_original_handle: True if this is the original actor handle for a
given actor. If this is true, then the actor will be destroyed when
Expand All @@ -1233,13 +1265,15 @@ def __init__(
language,
actor_id,
max_task_retries: Optional[int],
task_tracing: bool,
method_is_generator: Dict[str, bool],
method_decorators,
method_signatures,
method_num_returns: Dict[str, int],
method_max_retries: Dict[str, int],
method_retry_exceptions: Dict[str, Union[bool, list, tuple]],
method_generator_backpressure_num_objects: Dict[str, int],
method_task_tracing: Dict[str, bool],
actor_method_cpus: int,
actor_creation_function_descriptor,
session_and_job,
Expand All @@ -1249,6 +1283,8 @@ def __init__(
self._ray_actor_id = actor_id
self._ray_max_task_retries = max_task_retries
self._ray_original_handle = original_handle
self._ray_task_tracing = task_tracing

self._ray_method_is_generator = method_is_generator
self._ray_method_decorators = method_decorators
self._ray_method_signatures = method_signatures
Expand All @@ -1258,6 +1294,7 @@ def __init__(
self._ray_method_generator_backpressure_num_objects = (
method_generator_backpressure_num_objects
)
self._ray_method_task_tracing = method_task_tracing
self._ray_actor_method_cpus = actor_method_cpus
self._ray_session_and_job = session_and_job
self._ray_is_cross_language = language != Language.PYTHON
Expand Down Expand Up @@ -1290,6 +1327,10 @@ def __init__(
self._ray_method_generator_backpressure_num_objects.get(
method_name
), # noqa
self._ray_method_task_tracing.get(
method_name,
self._ray_task_tracing, # Use actor's default value
),
decorator=self._ray_method_decorators.get(method_name),
)
setattr(self, method_name, method)
Expand Down Expand Up @@ -1319,6 +1360,7 @@ def _actor_method_call(
retry_exceptions: Union[bool, list, tuple] = None,
concurrency_group_name: Optional[str] = None,
generator_backpressure_num_objects: Optional[int] = None,
task_tracing: bool = True,
):
"""Method execution stub for an actor handle.

Expand All @@ -1336,6 +1378,8 @@ def _actor_method_call(
max_retries: Number of retries when method fails.
retry_exceptions: Boolean of whether you want to retry all user-raised
exceptions, or a list of allowlist exceptions to retry.
task_tracing: True if tracing is enabled, i.e. task events from
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
the actor should be reported.

Returns:
object_refs: A list of object refs returned by the remote actor
Expand Down Expand Up @@ -1403,6 +1447,7 @@ def _actor_method_call(
self._ray_actor_method_cpus,
concurrency_group_name if concurrency_group_name is not None else b"",
generator_backpressure_num_objects,
task_tracing,
)

if num_returns == STREAMING_GENERATOR_RETURN:
Expand Down Expand Up @@ -1449,6 +1494,8 @@ def remote(self, *args, **kwargs):
False, # retry_exceptions
False, # is_generator
self._ray_method_generator_backpressure_num_objects.get(item, -1),
# TODO(rickyx): how to determinte the defautls here?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not too sure about the default here.

False, # task_tracing
# Currently, cross-lang actor method not support decorator
decorator=None,
)
Expand Down Expand Up @@ -1502,6 +1549,7 @@ def _serialization_helper(self):
"method_generator_backpressure_num_objects": (
self._ray_method_generator_backpressure_num_objects
),
"method_task_tracing": self._ray_method_task_tracing,
"actor_method_cpus": self._ray_actor_method_cpus,
"actor_creation_function_descriptor": self._ray_actor_creation_function_descriptor, # noqa: E501
},
Expand Down Expand Up @@ -1544,6 +1592,7 @@ def _deserialization_helper(cls, state, outer_object_ref=None):
state["method_max_retries"],
state["method_retry_exceptions"],
state["method_generator_backpressure_num_objects"],
state["method_task_tracing"],
state["actor_method_cpus"],
state["actor_creation_function_descriptor"],
worker.current_session_and_job,
Expand Down