[Data] Streamlining DefaultActorPoolAutoscaler #61385

Merged: alexeykudinkin merged 5 commits into master from ak/act-ascl-clup on Mar 13, 2026

@alexeykudinkin (Contributor)

Description

  1. Inline ActorPoolResizingPolicy
  2. Rebase _ActorPool to compute utilization based on all actors, not just running ones
  3. Allow the autoscaler to scale up while pending actors are still starting up
  4. Update tests
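
To illustrate item 2, here is a minimal sketch of how counting pending actors changes the utilization figure. ToyActorPool, its fields, and the definition of utilization as tasks in flight divided by total task slots are assumptions made for illustration; they are not taken from Ray's _ActorPool.

```python
from dataclasses import dataclass


@dataclass
class ToyActorPool:
    """Hypothetical stand-in for an actor pool; not Ray's _ActorPool."""

    num_running: int
    num_pending: int
    tasks_in_flight: int
    max_concurrency_per_actor: int = 1

    def util_running_only(self) -> float:
        # Capacity counts only actors that are already running.
        capacity = self.num_running * self.max_concurrency_per_actor
        return self.tasks_in_flight / capacity if capacity else 0.0

    def util_all_actors(self) -> float:
        # Capacity also counts actors that are still starting up.
        total = self.num_running + self.num_pending
        capacity = total * self.max_concurrency_per_actor
        return self.tasks_in_flight / capacity if capacity else 0.0


pool = ToyActorPool(num_running=2, num_pending=2, tasks_in_flight=3)
running_only = pool.util_running_only()  # 3 / 2
all_actors = pool.util_all_actors()      # 3 / 4
```

Under these assumptions, a pool that looks overloaded when only running actors are counted can fall below a scale-up threshold once pending actors are included, which avoids requesting more actors than needed.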

@alexeykudinkin requested a review from a team as a code owner on February 27, 2026 18:10
@alexeykudinkin added the "go" label (add ONLY when ready to merge, run all tests) on Feb 27, 2026
@gemini-code-assist (bot, Contributor) left a comment

Code Review

This pull request refactors the DefaultActorPoolAutoscaler by inlining the ActorPoolResizingPolicy, which simplifies the structure. It also improves the autoscaling logic by calculating utilization based on all actors (including pending ones) and allowing scale-up operations even when there are pending actors. The changes are well-reasoned and the accompanying test updates are thorough. I have a few suggestions to improve code clarity and fix a potential issue in one of the tests.

  assert actor_pool.get_pool_util() == 1.5
  assert_autoscaling_action(
-     delta=1,
+     delta=5,

Priority: high

The expected delta of 5 seems incorrect. Given the test setup, actor_pool_max_upscaling_delta defaults to 1. The calculated scale-up delta is ceil(10 * (1.5 - 1)) = 5, but this should be capped by max_scale_up, which evaluates to min(inf, 1, 5) = 1. Therefore, the final delta should be 1. Please verify the logic or update the test expectation.

Suggested change
-     delta=5,
+     delta=1,
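
The arithmetic in the comment above can be reproduced with a short sketch. The numbers (pool size 10, utilization 1.5, cap of 1) are the ones quoted by the reviewer; the variable names are illustrative, and whether the cap of 1 actually applies in this test is the reviewer's claim, not something verified here.

```python
import math

# Uncapped delta, as quoted in the review comment: ceil(10 * (1.5 - 1)).
raw_delta = math.ceil(10 * (1.5 - 1))

# Cap quoted by the reviewer: min(inf, 1, 5).
max_scale_up = min(math.inf, 1, 5)

# Apply the cap, then enforce a minimum scale-up of 1.
capped_delta = max(1, min(raw_delta, max_scale_up))
```

If the cap is in effect, the test would expect a delta of 1; if it is not, the uncapped value of 5 stands.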

Contributor reply: +1

delta = min(delta, max_scale_up)
delta = max(1, delta) # At least scale up by 1

delta = self._compute_upscale_delta(actor_pool, op_state)

Priority: medium

The op_state parameter is unused in the _compute_upscale_delta method (defined at line 230). Consider removing it from the method signature and this call site for cleaner code.

Suggested change
- delta = self._compute_upscale_delta(actor_pool, op_state)
+ delta = self._compute_upscale_delta(actor_pool)

Comment on lines +255 to +264
def _estimate_expected_tasks(
    op_state: OpState,
) -> float:
    # Each task consumes `average_num_inputs_per_task` input blocks on average,
    # so the total expected number of tasks:
    #
    #   ceil(num enqueued blocks / avg_inputs_per_task)
    #
    avg_input_blocks_per_task = op_state.op.metrics.average_num_inputs_per_task or 1
    return math.ceil(op_state.total_enqueued_input_blocks() / avg_input_blocks_per_task)

Priority: medium

The function _estimate_expected_tasks estimates a number of tasks but returns a float. Since tasks are discrete units, it would be more idiomatic and type-safe to return an int.

Suggested change
  def _estimate_expected_tasks(
      op_state: OpState,
- ) -> float:
+ ) -> int:
      # Each task consumes `average_num_inputs_per_task` input blocks on average,
      # so the total expected number of tasks:
      #
      #   ceil(num enqueued blocks / avg_inputs_per_task)
      #
      avg_input_blocks_per_task = op_state.op.metrics.average_num_inputs_per_task or 1
-     return math.ceil(op_state.total_enqueued_input_blocks() / avg_input_blocks_per_task)
+     return int(math.ceil(op_state.total_enqueued_input_blocks() / avg_input_blocks_per_task))
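
A side note on the suggestion above: in Python 3, math.ceil already returns an int for float inputs, so the return annotation is the substantive change and the int(...) wrapper is harmless but redundant. The sketch below uses a hypothetical standalone function with plain arguments in place of op_state.

```python
import math
from typing import Optional


def estimate_expected_tasks(
    total_enqueued_blocks: int, avg_inputs_per_task: Optional[float]
) -> int:
    # Fall back to 1 input block per task when no average is available yet.
    avg = avg_inputs_per_task or 1
    # math.ceil returns an int in Python 3, so no extra cast is needed.
    return math.ceil(total_enqueued_blocks / avg)


with_average = estimate_expected_tasks(10, 3.0)     # ceil(10 / 3.0)
without_average = estimate_expected_tasks(10, None)  # ceil(10 / 1)
```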

@ray-gardener (bot) added the "data" label (Ray Data-related issues) on Feb 27, 2026
    )

    def _compute_upscale_delta(
        self, actor_pool: AutoscalingActorPool, op_state: OpState

Contributor: op_state is not being used

Contributor Author: Yes

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
… running;

Removed dead methods;

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin enabled auto-merge (squash) March 13, 2026 21:28

@cursor (bot) left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

return math.ceil(
    actor_pool.current_size()
    * (actor_pool.get_pool_util() / self._actor_pool_scaling_up_threshold - 1)
)

Unused op_state parameter in _compute_upscale_delta

Low Severity

The op_state parameter in _compute_upscale_delta is accepted but never used in the function body. The function only uses actor_pool and self._actor_pool_scaling_up_threshold. This was confirmed in the PR discussion, where the author acknowledged the issue and said they would fix it.
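
As a sketch of why the quoted expression needs only the pool size, the utilization, and the threshold, the standalone function below mirrors it without op_state. The function names and the assumption that total load stays constant while the pool grows are illustrative, not taken from the PR.

```python
import math


def compute_upscale_delta(current_size: int, pool_util: float, threshold: float) -> int:
    """Hypothetical standalone version of the quoted expression."""
    return math.ceil(current_size * (pool_util / threshold - 1))


def projected_util(current_size: int, pool_util: float, delta: int) -> float:
    # Assumes the total load stays constant while the pool grows by `delta`.
    return pool_util * current_size / (current_size + delta)


delta = compute_upscale_delta(current_size=4, pool_util=1.5, threshold=0.75)
after = projected_util(current_size=4, pool_util=1.5, delta=delta)
```

With these inputs the pool doubles, and the projected utilization lands exactly on the threshold, which is the intent the formula appears to encode.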

@alexeykudinkin alexeykudinkin merged commit f1821c1 into master Mar 13, 2026
6 of 7 checks passed
@alexeykudinkin alexeykudinkin deleted the ak/act-ascl-clup branch March 13, 2026 22:09
@ayushk7102 ayushk7102 mentioned this pull request Mar 17, 2026

Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

2 participants