[data] fix bugs introduced by autoscaler refactor (ray-project#45200)

## Why are these changes needed?

Fix the following bugs introduced by ray-project#45002:
* `autoscaler.try_trigger_scaling` was not called when `select_op_to_run`
returned `None`, because the early return skipped the call.
* The scale-up condition on `under_resource_limits` was inverted: scaling up
was blocked while the op was still under its resource limits.

`python/ray/data/tests/test_streaming_integration.py::test_e2e_autoscaling_up`
should pass after this fix.

## Related issue number

Closes ray-project#43481

## Checks

- [ ] I've signed off every commit (by using the `-s` flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a
method in Tune, I've added it in `doc/source/tune/api/` under the
corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Ryan O'Leary <ryanaoleary@google.com>
raulchen authored and ryanaoleary committed Jun 6, 2024
1 parent 0661682 commit 77f9f9c
Showing 3 changed files with 18 additions and 19 deletions.
```diff
@@ -65,8 +65,8 @@ def _actor_pool_should_scale_up(
     elif actor_pool.current_size() >= actor_pool.max_size():
         # Do not scale up, if the actor pool is already at max size.
         return False
-    # Do not scale up, if the op still has enough resources to run.
-    if op_state._scheduling_status.under_resource_limits:
+    # Do not scale up, if the op does not have more resources.
+    if not op_state._scheduling_status.under_resource_limits:
         return False
     # Do not scale up, if the op has enough free slots for the existing inputs.
     if op_state.num_queued() <= actor_pool.num_free_task_slots():
```
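A minimal, self-contained sketch of the corrected predicate. `FakeActorPool` and `FakeOpState` are hypothetical stand-ins modeling only the attributes the diff touches, not the real Ray classes:

```python
from dataclasses import dataclass


@dataclass
class FakeActorPool:
    # Stand-in for the real actor pool; only the queried fields are modeled.
    size: int
    limit: int
    free_slots: int

    def current_size(self):
        return self.size

    def max_size(self):
        return self.limit

    def num_free_task_slots(self):
        return self.free_slots


@dataclass
class FakeOpState:
    under_resource_limits: bool
    queued: int

    def num_queued(self):
        return self.queued


def should_scale_up(actor_pool, op_state):
    # Do not scale up if the pool is already at max size.
    if actor_pool.current_size() >= actor_pool.max_size():
        return False
    # Fixed condition: scale up only while the op is still *under* its
    # resource limits (the refactor had inverted this check).
    if not op_state.under_resource_limits:
        return False
    # Do not scale up if free slots already cover the queued inputs.
    if op_state.num_queued() <= actor_pool.num_free_task_slots():
        return False
    return True
```

With the inverted check, the middle branch returned `False` exactly when scaling up was safe, which is why `test_e2e_autoscaling_up` stalled.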
27 changes: 13 additions & 14 deletions python/ray/data/_internal/execution/streaming_executor_state.py
```diff
@@ -541,6 +541,7 @@ def select_operator_to_run(
         ):
             ops.append(op)
             op_runnable = True
+        # Update scheduling status
         state._scheduling_status = OpSchedulingStatus(
             selected=False,
             runnable=op_runnable,
@@ -564,20 +565,18 @@ def select_operator_to_run(
         if state.num_queued() > 0 and not op.completed()
     ]

-    # Nothing to run.
-    if not ops:
-        return None
-
-    # Run metadata-only operators first. After that, choose the operator with the least
-    # memory usage.
-    selected_op = min(
-        ops,
-        key=lambda op: (
-            not op.throttling_disabled(),
-            resource_manager.get_op_usage(op).object_store_memory,
-        ),
-    )
-    topology[selected_op]._scheduling_status.selected = True
+    selected_op = None
+    if ops:
+        # Run metadata-only operators first. After that, choose the operator with the
+        # least memory usage.
+        selected_op = min(
+            ops,
+            key=lambda op: (
+                not op.throttling_disabled(),
+                resource_manager.get_op_usage(op).object_store_memory,
+            ),
+        )
+        topology[selected_op]._scheduling_status.selected = True
     autoscaler.try_trigger_scaling()
     return selected_op
```
6 changes: 3 additions & 3 deletions python/ray/data/tests/test_autoscaler.py
```diff
@@ -34,7 +34,7 @@ def test_actor_pool_scaling():
         internal_queue_size=MagicMock(return_value=1),
     )
     op_state = MagicMock(num_queued=MagicMock(return_value=10))
-    op_scheduling_status = MagicMock(under_resource_limits=False)
+    op_scheduling_status = MagicMock(under_resource_limits=True)
     op_state._scheduling_status = op_scheduling_status

     @contextmanager
@@ -85,11 +85,11 @@ def assert_should_scale_up(expected):
     with patch(op, "internal_queue_size", 0):
         assert_should_scale_up(False)

-    # Shouldn't scale up since the op is under resource limits.
+    # Shouldn't scale up since the op doesn't have enough resources.
     with patch(
         op_scheduling_status,
         "under_resource_limits",
-        True,
+        False,
         is_method=False,
     ):
         assert_should_scale_up(False)
```
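The test relies on `MagicMock` keyword arguments to set plain attributes, then flips them per case. A small illustration of that pattern using stdlib `unittest.mock` directly (the repo's own `patch` helper with `is_method=False` is assumed, not reproduced here):

```python
from unittest.mock import MagicMock, patch

# Keyword args on MagicMock become plain attributes, so the scheduling
# status can be faked without constructing a real OpSchedulingStatus.
op_scheduling_status = MagicMock(under_resource_limits=True)
assert op_scheduling_status.under_resource_limits is True

# patch.object temporarily overrides the attribute for the negative case,
# mirroring the updated test, and restores it on exit.
with patch.object(op_scheduling_status, "under_resource_limits", False):
    assert op_scheduling_status.under_resource_limits is False
assert op_scheduling_status.under_resource_limits is True
```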
