[Core] Publish platform events via Ray Event Recorder#63329
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces the PlatformEventBuilder class to support infrastructure platform events, such as those from Kubernetes, and integrates it into the Ray dashboard's observability module. The changes include initializing the EventRecorder in the dashboard head and emitting events during processing callbacks. Feedback suggests allowing the EventRecorder to generate unique IDs for event updates to avoid deduplication issues, refactoring environment variable checks for efficiency, and moving imports out of the event processing hot path.
| if os.environ.get("RAY_ENABLE_PYTHON_RAY_EVENT", "False").lower() in ( | ||
| "true", | ||
| "1", | ||
| ): | ||
| try: | ||
| from ray._common.observability.platform_events import ( | ||
| PlatformEventBuilder, | ||
| ) | ||
| from ray._raylet import EventRecorder |
There was a problem hiding this comment.
| timestamp_ns=int(time.time() * 1e9), | ||
| ) | ||
| EventRecorder.emit(cython_event) | ||
| return True |
There was a problem hiding this comment.
E2E test never initializes EventRecorder on worker
High Severity
The remote task emit_test_platform_event calls EventRecorder.emit() on a worker process, but EventRecorder.initialize() is never called on that worker. EventRecorder is a per-process singleton, and the only initialize() call in the entire codebase is in platform_event_head.py's run() method, which runs in the dashboard head process — a completely different process. When the worker calls emit(), the singleton _event_recorder_instance is None, so emit_batch silently drops the event and returns False. The test will then timeout at the wait_for_condition check. The comment even says "explicitly initializes and emits" but the code only emits.
Reviewed by Cursor Bugbot for commit e7be190. Configure here.
8718785 to
b35d418
Compare
b35d418 to
bac7c35
Compare
bac7c35 to
271db30
Compare
a8a923c to
1e59f09
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
Reviewed by Cursor Bugbot for commit 1e59f09. Configure here.
| ) | ||
| head_node_id_hex = ( | ||
| head_node_id_bytes.decode() if head_node_id_bytes else None | ||
| ) |
There was a problem hiding this comment.
Duplicates existing get_head_node_id utility function
Low Severity
The logic to fetch the head node ID from GCS (lines 54–61) duplicates the existing get_head_node_id() utility in ray.dashboard.modules.job.utils, which performs the identical KV lookup with the same key, namespace, and timeout, and returns the decoded hex string or None.
Reviewed by Cursor Bugbot for commit 1e59f09. Configure here.
|
@sampan-s-nayak could you please help take a pass at this PR whenever possible for you? Thanks! |
|
|
||
| def _process_event_callback(self, ray_event: RayEvent): | ||
| """Callback running in the main asyncio loop to cache events.""" | ||
| """Thread-safe entry point that dispatches event caching to the main asyncio loop.""" |
|
@sampan-s-nayak PTAL |
| RAY_EXPORT_EVENT_MAX_BACKUP_COUNT = env_bool("RAY_EXPORT_EVENT_MAX_BACKUP_COUNT", 20) | ||
|
|
||
| # Enables emitting events through the Python EventRecorder (One-Event framework) | ||
| # to the AggregatorAgent. When enabled, dashboard modules that support it emit |
There was a problem hiding this comment.
is this specific to dashboard modules? should we have a specific flag just for platform events? (I think we already had some config to enable platform events, can we reuse that?)
There was a problem hiding this comment.
I actually tried to keep this from your PR here #61099 which I think added this env var to control whether the One-Event framework pipeline is used for Python event emission. The other platform specific env var is RAY_DASHBOARD_INGEST_PLATFORM_EVENTS but thats more for whether the k8s events should even be ingested/collected.
There was a problem hiding this comment.
if we have a single flag for all python events then in the future we wont have a way to disable a specific event type. can we instead make the config a list of supported event types and enable Platform_events by default?
| try: | ||
| from ray._raylet import EventRecorder | ||
|
|
||
| head_node_id_bytes = await self.gcs_client.async_internal_kv_get( |
There was a problem hiding this comment.
we can use get_head_node_id() here instead?
There was a problem hiding this comment.
You mean from https://github.com/ray-project/ray/blob/master/python/ray/dashboard/modules/job/utils.py#L37? I could.. but that adds a cross-module dependency from the platform_events module onto the job module which is unrelated...
I could however add it in ray/dashboard/utils.py and make both platform_events and job modules use it, lmk if you prefer that
1e59f09 to
aeeb1e1
Compare
Signed-off-by: Richa Banker <richabanker@google.com>
64e7752 to
07e69ed
Compare


Description
Add support for publishing Platform events via the python ray event exporter framework