Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions python/ray/autoscaler/v2/tests/test_node_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def _get_provider(name, **kwargs):
def test_node_providers_basic(get_provider, provider_name):
# Test launching.
provider = get_provider(name=provider_name)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

increasing timeout as the test was flaky

timeout_s = 30 if provider_name == "fake_multi" else 10
provider.launch(
shape={"worker_nodes": 2},
request_id="1",
Expand All @@ -212,7 +213,7 @@ def verify():
assert nodes_by_type == {"worker_nodes": 4, "worker_nodes1": 1}
return True

wait_for_condition(verify)
wait_for_condition(verify, timeout=timeout_s)

nodes = provider.get_non_terminated().keys()

Expand All @@ -238,7 +239,7 @@ def verify():
assert node.request_id == "4"
return True

wait_for_condition(verify)
wait_for_condition(verify, timeout=timeout_s)


@pytest.mark.parametrize(
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,6 @@ py_test_module_list(
"test_state_api_summary.py",
"test_streaming_generator_regression.py",
"test_system_metrics.py",
"test_task_events_3.py",
"test_task_metrics_reconstruction.py",
"test_top_level_api.py",
"test_tpu.py",
Expand Down Expand Up @@ -876,6 +875,7 @@ py_test_module_list(
"test_ray_get.py",
"test_state_api_2.py",
"test_task_events.py",
"test_task_events_3.py",
"test_unavailable_actors.py",
],
tags = [
Expand Down
28 changes: 28 additions & 0 deletions python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,34 @@ def make_httpserver(httpserver_listen_address, httpserver_ssl_context):
server.stop()


@pytest.fixture(scope="function")
def event_routing_config(request, monkeypatch):
"""
fixture to toggle event routing modes.
Modes:
- "default": Uses the existing core_worker to gcs code path.
- "aggregator": Enable publishing events to GCS through the Aggregator agent.
"""
mode = getattr(request, "param", "default")
# clear envs to ensure default behavior
monkeypatch.delenv(
"RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISH_EVENTS_TO_GCS", raising=False
)
monkeypatch.delenv("RAY_enable_core_worker_ray_event_to_aggregator", raising=False)

if mode == "aggregator":
print("using aggregator mode")
# Enable aggregator path in core worker
monkeypatch.setenv("RAY_enable_core_worker_ray_event_to_aggregator", "1")
# Explicitly disable core worker to GCS so that all events are only sent to GCS once (through the aggregator pathway)
monkeypatch.setenv("RAY_enable_core_worker_task_event_to_gcs", "0")
# Ensure aggregator agent publishes to GCS
monkeypatch.setenv(
"RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISH_EVENTS_TO_GCS", "True"
)
yield


@pytest.fixture
def cleanup_auth_token_env():
"""Reset authentication environment variables, files, and caches."""
Expand Down
Loading