[Data] Fix unit tests in test_streaming_integration.py (#41623)
This PR fixes the unit test failures from #41461 (the tests were marked as flaky, so the failures did not show up in that PR).
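
For context on the change itself: each test UDF moves from a plain function to a callable class, the class-based UDF pattern that `map_batches` uses with `ActorPoolStrategy`, where per-actor constructor arguments are passed through `fn_constructor_args`. A minimal sketch of that pattern, outside this diff (the `AddConstant` class and its `delta` argument are hypothetical examples, not code from this PR):

    import ray

    # Hypothetical callable-class UDF: Ray Data constructs one instance
    # per actor in the pool, passing fn_constructor_args to __init__.
    class AddConstant:
        def __init__(self, delta):
            self._delta = delta

        def __call__(self, batch):
            # Batches from ray.data.range() carry an "id" column.
            batch["id"] = batch["id"] + self._delta
            return batch

    ds = ray.data.range(8, parallelism=4).map_batches(
        AddConstant,
        fn_constructor_args=(10,),
        compute=ray.data.ActorPoolStrategy(min_size=1, max_size=2),
    )
    print(ds.take_all())  # rows with ids 10..17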

Signed-off-by: Cheng Su <scnju13@gmail.com>
c21 committed Dec 5, 2023
1 parent 94bd29e commit cde9bf7
Showing 1 changed file with 28 additions and 26 deletions.
python/ray/data/tests/test_streaming_integration.py: 28 additions & 26 deletions
@@ -441,14 +441,19 @@ def get_max_waiters(self):
 
     b1 = Barrier.remote(6)
 
-    def barrier1(x):
-        ray.get(b1.wait.remote(), timeout=10)
-        return x
+    class BarrierWaiter:
+        def __init__(self, barrier):
+            self._barrier = barrier
+
+        def __call__(self, x):
+            ray.get(self._barrier.wait.remote(), timeout=10)
+            return x
 
     # Tests that we autoscale up to necessary size.
     # 6 tasks + 1 tasks in flight per actor => need at least 6 actors to run.
     ray.data.range(6, parallelism=6).map_batches(
-        barrier1,
+        BarrierWaiter,
+        fn_constructor_args=(b1,),
         compute=ray.data.ActorPoolStrategy(
             min_size=1, max_size=6, max_tasks_in_flight_per_actor=1
         ),
@@ -458,14 +463,11 @@ def barrier1(x):
 
     b2 = Barrier.remote(3, delay=2)
 
-    def barrier2(x):
-        ray.get(b2.wait.remote(), timeout=10)
-        return x
-
     # Tests that we don't over-scale up.
     # 6 tasks + 2 tasks in flight per actor => only scale up to 3 actors
     ray.data.range(6, parallelism=6).map_batches(
-        barrier2,
+        BarrierWaiter,
+        fn_constructor_args=(b2,),
         compute=ray.data.ActorPoolStrategy(
             min_size=1, max_size=3, max_tasks_in_flight_per_actor=2
         ),
@@ -476,30 +478,29 @@ def barrier2(x):
     # Tests that the max pool size is respected.
     b3 = Barrier.remote(6)
 
-    def barrier3(x):
-        ray.get(b3.wait.remote(), timeout=2)
-        return x
-
     # This will hang, since the actor pool is too small.
     with pytest.raises(ray.exceptions.RayTaskError):
         ray.data.range(6, parallelism=6).map(
-            barrier3, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=2)
+            BarrierWaiter,
+            fn_constructor_args=(b3,),
+            compute=ray.data.ActorPoolStrategy(min_size=1, max_size=2),
         ).take_all()
 
 
 def test_e2e_autoscaling_down(ray_start_10_cpus_shared, restore_data_context):
     DataContext.get_current().new_execution_backend = True
     DataContext.get_current().use_streaming_executor = True
 
-    def f(x):
-        time.sleep(1)
-        return x
+    class UDFClass:
+        def __call__(self, x):
+            time.sleep(1)
+            return x
 
     # Tests that autoscaling works even when resource constrained via actor killing.
     # To pass this, we need to autoscale down to free up slots for task execution.
     DataContext.get_current().execution_options.resource_limits.cpu = 2
     ray.data.range(5, parallelism=5).map_batches(
-        f,
+        UDFClass,
         compute=ray.data.ActorPoolStrategy(min_size=1, max_size=2),
         batch_size=None,
     ).map_batches(lambda x: x, batch_size=None, num_cpus=2).take_all()
@@ -522,24 +523,25 @@ def test_streaming_fault_tolerance(ray_start_10_cpus_shared, restore_data_context):
     DataContext.get_current().new_execution_backend = True
     DataContext.get_current().use_streaming_executor = True
 
-    def f(x):
-        import os
+    class RandomExit:
+        def __call__(self, x):
+            import os
 
-        if random.random() > 0.9:
-            print("force exit")
-            os._exit(1)
-        return x
+            if random.random() > 0.9:
+                print("force exit")
+                os._exit(1)
+            return x
 
     # Test recover.
     base = ray.data.range(1000, parallelism=100)
     ds1 = base.map_batches(
-        f, compute=ray.data.ActorPoolStrategy(size=4), max_task_retries=999
+        RandomExit, compute=ray.data.ActorPoolStrategy(size=4), max_task_retries=999
     )
     ds1.take_all()
 
     # Test disabling fault tolerance.
     ds2 = base.map_batches(
-        f, compute=ray.data.ActorPoolStrategy(size=4), max_restarts=0
+        RandomExit, compute=ray.data.ActorPoolStrategy(size=4), max_restarts=0
     )
     with pytest.raises(ray.exceptions.RayActorError):
         ds2.take_all()