Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries in dynamic resource test cases #5550

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
128 changes: 90 additions & 38 deletions python/ray/tests/test_dynres.py
Expand Up @@ -11,6 +11,48 @@

logger = logging.getLogger(__name__)

TIMEOUT_RESOURCE_UPDATE = 1 # seconds


def retry_until(condition, timeout):
# Helper method to retry a condition till it is true or a timeout elapses.
# Returns true if the condition was true before the timeout,
# False if the timeout elapses.
SLEEP_DURATION = 0.1 # seconds
time_elapsed = 0
while time_elapsed <= timeout:
if condition():
return True
time.sleep(SLEEP_DURATION)
time_elapsed += SLEEP_DURATION
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I extracted this method from test_release_cpus_when_actor_creation_task_blocking, but I'm not sure if this is the best way to check for a timeout. Should we be using time_elapsed = time.time() - start_time? Otherwise this is equivalent to having a fixed number of retries.

return False


def resource_condition(resource_name,
expected_capacity,
resource_getter=ray.available_resources):
# This method generates a lambda which can be used with the retry_until
# method for checking a resource's availability and capacity. The
# resource_getter argument is a callable which returns a dictionary,
# usually ray.available_resources or ray.cluster_resources.
def condition(res_name, exp_capacity, resource_getter):
resources = resource_getter()
if resource_name not in resources:
return False
return resources[res_name] == exp_capacity

return lambda: condition(resource_name, expected_capacity, resource_getter)


def node_resource_getter_generator(target_node_id):
def node_resource_getter(target_node_id):
target_node = next(
node for node in ray.nodes() if node["NodeID"] == target_node_id)
resources = target_node["Resources"]
return resources

return lambda: node_resource_getter(target_node_id)


def test_dynamic_res_creation(ray_start_regular):
# This test creates a resource locally (without specifying the client_id)
Expand All @@ -23,11 +65,12 @@ def set_res(resource_name, resource_capacity):

ray.get(set_res.remote(res_name, res_capacity))

available_res = ray.available_resources()
cluster_res = ray.cluster_resources()

assert available_res[res_name] == res_capacity
assert cluster_res[res_name] == res_capacity
available_res_condition = resource_condition(res_name, res_capacity,
ray.available_resources)
cluster_res_condition = resource_condition(res_name, res_capacity,
ray.cluster_resources)
assert retry_until(available_res_condition, TIMEOUT_RESOURCE_UPDATE)
assert retry_until(cluster_res_condition, TIMEOUT_RESOURCE_UPDATE)


def test_dynamic_res_deletion(shutdown_only):
Expand Down Expand Up @@ -69,8 +112,9 @@ def f():
oid = remote_task.remote() # This is infeasible
ray.get(set_res.remote(res_name, res_capacity)) # Now should be feasible

available_res = ray.available_resources()
assert available_res[res_name] == res_capacity
available_res_condition = resource_condition(res_name, res_capacity,
ray.available_resources)
assert retry_until(available_res_condition, TIMEOUT_RESOURCE_UPDATE)

successful, unsuccessful = ray.wait([oid], timeout=1)
assert successful # The task completed
Expand Down Expand Up @@ -102,12 +146,11 @@ def set_res(resource_name, resource_capacity, client_id):
new_capacity = res_capacity + 1
ray.get(set_res.remote(res_name, new_capacity, target_node_id))

target_node = next(
node for node in ray.nodes() if node["NodeID"] == target_node_id)
resources = target_node["Resources"]
node_resource_getter = node_resource_getter_generator(target_node_id)

assert res_name in resources
assert resources[res_name] == new_capacity
node_res_condition = resource_condition(res_name, new_capacity,
node_resource_getter)
assert retry_until(node_res_condition, TIMEOUT_RESOURCE_UPDATE)


def test_dynamic_res_creation_clientid(ray_start_cluster):
Expand All @@ -130,12 +173,11 @@ def set_res(resource_name, resource_capacity, res_client_id):
resource_name, resource_capacity, client_id=res_client_id)

ray.get(set_res.remote(res_name, res_capacity, target_node_id))
target_node = next(
node for node in ray.nodes() if node["NodeID"] == target_node_id)
resources = target_node["Resources"]

assert res_name in resources
assert resources[res_name] == res_capacity
node_resource_getter = node_resource_getter_generator(target_node_id)
node_res_condition = resource_condition(res_name, res_capacity,
node_resource_getter)
assert retry_until(node_res_condition, TIMEOUT_RESOURCE_UPDATE)


def test_dynamic_res_creation_clientid_multiple(ray_start_cluster):
Expand Down Expand Up @@ -170,10 +212,12 @@ def set_res(resource_name, resource_capacity, res_client_id):
while time.time() - start_time < TIMEOUT and not success:
resources_created = []
for nid in target_node_ids:
target_node = next(
node for node in ray.nodes() if node["NodeID"] == nid)
resources = target_node["Resources"]
resources_created.append(resources[res_name] == res_capacity)
node_resource_getter = node_resource_getter_generator(nid)
node_res_condition = resource_condition(res_name, res_capacity,
node_resource_getter)
was_successful = retry_until(node_res_condition,
TIMEOUT_RESOURCE_UPDATE)
resources_created.append(was_successful)
success = all(resources_created)
assert success

Expand Down Expand Up @@ -277,7 +321,10 @@ def set_res(resource_name, resource_capacity, res_client_id):
# Create the resource on node1
target_node_id = node_ids[1]
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity

cluster_res_condition = resource_condition(res_name, res_capacity,
ray.cluster_resources)
assert retry_until(cluster_res_condition, TIMEOUT_RESOURCE_UPDATE)

# Delete the resource
ray.get(delete_res.remote(res_name, target_node_id))
Expand Down Expand Up @@ -325,7 +372,10 @@ def set_res(resource_name, resource_capacity, res_client_id):

# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity

cluster_res_condition = resource_condition(res_name, res_capacity,
ray.cluster_resources)
assert retry_until(cluster_res_condition, TIMEOUT_RESOURCE_UPDATE)

# Task to hold the resource till the driver signals to finish
@ray.remote
Expand Down Expand Up @@ -367,7 +417,10 @@ def test_func():
}) # This should be infeasible
successful, unsuccessful = ray.wait([task_3], timeout=TIMEOUT_DURATION)
assert unsuccessful # The task did not complete because it's infeasible
assert ray.available_resources()[res_name] == updated_capacity

available_res_condition = resource_condition(res_name, updated_capacity,
ray.available_resources)
assert retry_until(available_res_condition, TIMEOUT_RESOURCE_UPDATE)


def test_dynamic_res_concurrent_res_decrement(ray_start_cluster):
Expand Down Expand Up @@ -404,7 +457,10 @@ def set_res(resource_name, resource_capacity, res_client_id):

# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity

cluster_res_condition = resource_condition(res_name, res_capacity,
ray.cluster_resources)
assert retry_until(cluster_res_condition, TIMEOUT_RESOURCE_UPDATE)

# Task to hold the resource till the driver signals to finish
@ray.remote
Expand Down Expand Up @@ -446,7 +502,10 @@ def test_func():
}) # This should be infeasible
successful, unsuccessful = ray.wait([task_3], timeout=TIMEOUT_DURATION)
assert unsuccessful # The task did not complete because it's infeasible
assert ray.available_resources()[res_name] == updated_capacity

available_res_condition = resource_condition(res_name, updated_capacity,
ray.available_resources)
assert retry_until(available_res_condition, TIMEOUT_RESOURCE_UPDATE)


def test_dynamic_res_concurrent_res_delete(ray_start_cluster):
Expand Down Expand Up @@ -486,7 +545,10 @@ def delete_res(resource_name, res_client_id):

# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity

cluster_res_condition = resource_condition(res_name, res_capacity,
ray.cluster_resources)
assert retry_until(cluster_res_condition, TIMEOUT_RESOURCE_UPDATE)

# Task to hold the resource till the driver signals to finish
@ray.remote
Expand Down Expand Up @@ -590,18 +652,8 @@ def get_num(self):
a = A.remote()
assert 100 == ray.get(a.get_num.remote())

def wait_until(condition, timeout_ms):
SLEEP_DURATION_MS = 100
time_elapsed = 0
while time_elapsed <= timeout_ms:
if condition():
return True
time.sleep(SLEEP_DURATION_MS)
time_elapsed += SLEEP_DURATION_MS
return False

def assert_available_resources():
return 1 == ray.available_resources()["CPU"]

result = wait_until(assert_available_resources, 1000)
result = retry_until(assert_available_resources, TIMEOUT_RESOURCE_UPDATE)
assert result is True