Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Cheng Su <scnju13@gmail.com>
  • Loading branch information
c21 committed Jan 29, 2024
1 parent 6e70338 commit 06f5d4a
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):
"""A backpressure policy that caps the concurrency of each operator.
The concurrency cap limits the number of concurrently running tasks.
The concrete stategy is as follows:
- Each PhysicalOperator is assigned a concurrency cap.
- An PhysicalOperator can run new tasks if the number of running tasks is less
than the cap.
The policy will limit the number of concurrently running tasks based on its
concurrency cap parameter.
NOTE: Only support setting concurrency cap for `TaskPoolMapOperator` for now.
TODO(chengsu): Consolidate with actor scaling logic of `ActorPoolMapOperator`.
"""

def __init__(self, topology: "Topology"):
Expand Down
4 changes: 2 additions & 2 deletions python/ray/data/tests/test_backpressure_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ def test_e2e_normal(self):
N = self.__class__._cluster_cpus
ds = ray.data.range(N, parallelism=N)
# Use different `num_cpus` to make sure they don't fuse.
ds = ds.map_batches(map_func1, batch_size=None, num_cpus=1)
ds = ds.map_batches(map_func2, batch_size=None, num_cpus=1.1)
ds = ds.map_batches(map_func1, batch_size=None, num_cpus=1, concurrency=1)
ds = ds.map_batches(map_func2, batch_size=None, num_cpus=1.1, concurrency=1)
res = ds.take_all()
self.assertEqual(len(res), N)

Expand Down

0 comments on commit 06f5d4a

Please sign in to comment.