[Core] Support publishing Submission job events using ray event recorder #61099
sampan-s-nayak wants to merge 36 commits
Signed-off-by: sampan <sampan@anyscale.com>
…nto job_events_missing_fields_3 Signed-off-by: sampan <sampan@anyscale.com>
Code Review
This pull request successfully integrates the Ray event recorder framework into the job submission system, enabling the emission of job definition and lifecycle events. The implementation follows the existing pattern for actor and node events, using a priority-based approach where the new framework takes precedence when enabled. The addition of an end-to-end test ensures the correctness of event emission and capture. I have provided a few suggestions to improve the efficiency of protobuf timestamp handling and to ensure robust initialization of the event recorder.
    now = time.time()
    transition.timestamp.CopyFrom(
        Timestamp(seconds=int(now), nanos=int((now % 1) * 1e9))
    )
Instead of creating a new Timestamp object and using CopyFrom, you can use the FromMilliseconds method directly on the existing transition.timestamp object. This is more efficient and avoids manual nanosecond calculations which can sometimes suffer from floating-point precision issues.
    - now = time.time()
    - transition.timestamp.CopyFrom(
    -     Timestamp(seconds=int(now), nanos=int((now % 1) * 1e9))
    - )
    + transition.timestamp.FromMilliseconds(int(time.time() * 1000))
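The precision trade-off behind this suggestion can be sketched with plain integer arithmetic. The snippet below does not call protobuf; the three helper names are hypothetical and only mirror the seconds/nanos split that each approach would produce, assuming the timestamp is stored as whole seconds plus a nanosecond remainder.

```python
NANOS_PER_SECOND = 1_000_000_000
NANOS_PER_MILLI = 1_000_000


def split_from_float(now: float) -> tuple[int, int]:
    """Split float seconds the way the original snippet does (hypothetical helper)."""
    return int(now), int((now % 1) * 1e9)


def split_from_millis(millis: int) -> tuple[int, int]:
    """Integer-only split at millisecond resolution (hypothetical helper)."""
    seconds, rem_ms = divmod(millis, 1000)
    return seconds, rem_ms * NANOS_PER_MILLI


def split_from_nanos(nanos: int) -> tuple[int, int]:
    """Integer-only split at nanosecond resolution (hypothetical helper)."""
    return divmod(nanos, NANOS_PER_SECOND)


if __name__ == "__main__":
    import time

    ns = time.time_ns()
    print("nanosecond split: ", split_from_nanos(ns))
    print("millisecond split:", split_from_millis(ns // NANOS_PER_MILLI))
    print("float split:      ", split_from_float(ns / NANOS_PER_SECOND))
```

The millisecond path is exact but drops everything below one millisecond, while the float path keeps sub-millisecond digits that are only approximate at current epoch values. If sub-millisecond precision matters, protobuf's `Timestamp.FromNanoseconds(time.time_ns())` may be worth considering as an alternative.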
    if ray_constants.RAY_ENABLE_RAY_EVENT:
        try:
            from ray._raylet import initialize_event_recorder

            initialize_event_recorder(
                aggregator_address=self._dashboard_agent.ip,
                aggregator_port=self._dashboard_agent.grpc_port,
                node_ip=self._dashboard_agent.ip,
                node_id_hex=self._dashboard_agent.node_id,
                max_buffer_size=10000,
            )
            logger.info("Initialized ray event recorder in JobAgent.")
        except Exception:
            logger.warning(
                "Failed to initialize ray event recorder in JobAgent.",
                exc_info=True,
            )
The initialize_event_recorder call is placed within the run method of JobAgent. While this works for initialization, consider if a corresponding shutdown_event_recorder call is needed when the agent or module stops, similar to the implementation in JobSupervisor. This ensures that any buffered events are flushed before the process exits.
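The pairing this comment asks about can be illustrated with a small stand-in. `FakeEventRecorder` and `run_agent` are hypothetical names invented for this sketch and are not Ray's API; the point is only that a `finally` block guarantees the flush runs whether the agent exits normally or with an error.

```python
class FakeEventRecorder:
    """Hypothetical stand-in for a buffering event recorder (not Ray's API)."""

    def __init__(self, max_buffer_size: int) -> None:
        self.max_buffer_size = max_buffer_size
        self.buffer: list[str] = []
        self.exported: list[str] = []

    def record(self, event: str) -> None:
        # Drop events once the buffer is full, as a bounded buffer would.
        if len(self.buffer) < self.max_buffer_size:
            self.buffer.append(event)

    def shutdown(self) -> None:
        # Flush whatever is still buffered before the process exits.
        self.exported.extend(self.buffer)
        self.buffer.clear()


def run_agent(recorder: FakeEventRecorder, events: list[str], fail: bool = False) -> None:
    """Record events, then always shut the recorder down, even on failure."""
    try:
        for event in events:
            recorder.record(event)
        if fail:
            raise RuntimeError("simulated agent crash")
    finally:
        recorder.shutdown()
```

Without the `finally` clause, the events recorded before the simulated crash would stay in the buffer and never reach the exporter.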
…elds_3 Signed-off-by: sampan <sampan@anyscale.com>
…ss attribute

Cython cdef class types are immutable: setting EventRecorder._instance at module scope raises TypeError at runtime. Use a module-level _event_recorder_instance variable with global declarations instead.

Signed-off-by: sampan <sampan@anyscale.com>
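A minimal pure-Python sketch of the pattern this commit describes follows. Only `_event_recorder_instance` comes from the commit message; the `EventRecorder` class here is a plain Python stand-in, and `get_or_create_recorder`, `reset_recorder`, and `can_set_class_attribute` are hypothetical helpers. The built-in `dict` type is used to show the immutable-type behavior, on the assumption that a Cython cdef class rejects attribute assignment in the same way.

```python
# Module-level holder for the singleton, instead of a class attribute.
_event_recorder_instance = None


class EventRecorder:
    """Plain-Python stand-in for the Cython cdef class (hypothetical)."""

    def __init__(self, max_buffer_size: int) -> None:
        self.max_buffer_size = max_buffer_size


def get_or_create_recorder(max_buffer_size: int = 10000) -> EventRecorder:
    """Return the shared recorder, creating it on first use (hypothetical helper)."""
    global _event_recorder_instance
    if _event_recorder_instance is None:
        _event_recorder_instance = EventRecorder(max_buffer_size)
    return _event_recorder_instance


def reset_recorder() -> None:
    """Drop the shared recorder so the next call creates a new one (hypothetical helper)."""
    global _event_recorder_instance
    _event_recorder_instance = None


def can_set_class_attribute(cls: type) -> bool:
    """Report whether a type accepts a new class attribute."""
    try:
        cls._instance = None
    except TypeError:
        # Built-in and extension types are immutable and reject the assignment.
        return False
    return True
```

Calling `can_set_class_attribute(dict)` returns `False` because the assignment raises `TypeError`, which is the failure mode the commit works around by keeping the instance in a module-level variable.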
Existing users who set RAY_enable_ray_event for C++ events should not automatically opt into the new Python event pipeline. Introduce a separate RAY_enable_python_ray_event flag for Python-side ONE-event publishing (job events via EventRecorder, autoscaler events via DashboardHeadRayEventPublisher).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: sampan <sampan@anyscale.com>
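The flag split described in this commit can be sketched as two independently read environment variables. The two variable names are taken from the commit message; `env_flag`, `event_flags`, and the accepted truthy values ("1" and "true") are assumptions made for illustration and may not match how Ray parses its flags.

```python
import os
from typing import Mapping, Optional


def env_flag(name: str, default: bool = False,
             environ: Optional[Mapping[str, str]] = None) -> bool:
    """Read a boolean flag from the environment (hypothetical parsing rules)."""
    env = os.environ if environ is None else environ
    raw = env.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in ("1", "true")


def event_flags(environ: Optional[Mapping[str, str]] = None) -> dict:
    """Resolve the C++ and Python event flags independently of each other."""
    return {
        "cpp_events": env_flag("RAY_enable_ray_event", environ=environ),
        "python_events": env_flag("RAY_enable_python_ray_event", environ=environ),
    }
```

With only `RAY_enable_ray_event=1` set, `python_events` stays `False`, so users of the existing flag are not opted into Python-side publishing.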
Signed-off-by: Sampan S Nayak <sampansnayak2@gmail.com>
## Description

Adds Cython bindings for the C++ rayEventRecorder and sets up the scaffolding required to emit events from Python using the new one-event framework. This will also be used to emit library events in the future. See #61099 for example usage of this abstraction.

Signed-off-by: sampan <sampan@anyscale.com>
Signed-off-by: Sampan S Nayak <sampansnayak2@gmail.com>
Co-authored-by: sampan <sampan@anyscale.com>
Co-authored-by: Mengjin Yan <mengjinyan3@gmail.com>
Description
Use the newly added Python Ray event exporter framework to emit submission job events, and verify the emitted events with an E2E test.