[core] Tasks are not spilled back when waiting for dependencies, even when there are remote resources available #14094

@rkooo567

Description

What is the problem?

I just found this while looking at the code. It's probably not a high priority to fix; I'm creating this issue for tracking purposes.

It looks like when tasks are queued because of admission control, they are not spilled back even though there are available resources on other nodes. I think this is currently expected behavior, because we don't take object store memory into account when we spill back tasks.

This could be solved if we filled in the object store memory usage of each task automatically (I think this is possible because, when we pull, we already know the sizes of all of a task's dependencies).
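To illustrate the idea, here is a minimal Python sketch of such a spillback check. The actual decision lives in the raylet's C++ scheduler, so the names below (NodeState, can_spill_back) are hypothetical and not Ray APIs; the point is only that a node qualifies for spillback when it has both the CPUs and enough free object store memory for the task's dependencies.

from dataclasses import dataclass
from typing import List

@dataclass
class NodeState:
    free_cpus: float
    free_object_store_bytes: int

def can_spill_back(task_cpus: float, dep_sizes_bytes: List[int], node: NodeState) -> bool:
    # A node is a valid spillback target only if it has the CPUs *and* enough
    # free object store memory to pull all of the task's dependencies.
    return (
        node.free_cpus >= task_cpus
        and node.free_object_store_bytes >= sum(dep_sizes_bytes)
    )

# In the scenario below, node B (1000 MB free) passes this check for a task
# that depends on an 80 MB object, while a full node A would not.
node_b_state = NodeState(free_cpus=8, free_object_store_bytes=1000 * 1024 * 1024)
print(can_spill_back(1, [80 * 1024 * 1024], node_b_state))  # True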

Imagine this scenario.

  1. There are 3 nodes: A, B, and C.
  2. A task at node C creates a big object (an 80 MB dependency).
  3. Node A has 4 CPUs and 100 MB of object store memory, and node B has 8 CPUs with 1000 MB of object store memory.
nodeA: {CPU: 0/4, object_store_memory: 0/100MB}
nodeB: {CPU: 0/8, object_store_memory: 0/1000MB}
  4. Now, create two large dependencies at node C, 80 MB each. Let's call them dep1 and dep2.
  5. Schedule the first long-running task, which depends on dep1, to node A. Note that node A's object store memory is now filled, so admission control prevents it from pulling any more objects.
nodeA: {CPU: 3/4, object_store_memory: 80MB/100MB}
nodeB: {CPU: 0/8, object_store_memory: 0/1000MB}
  6. When we schedule long-running tasks that depend on dep2, they are supposed to be scheduled on node B, because node B has enough resources (plenty of CPUs and object store memory for tasks with the 80 MB dep2).
  7. With the reproduction script below, you can verify that they are not: the tasks are not spilled back from the head node.
  8. You can verify that tasks are correctly spilled back when admission control is not involved (for example, by increasing the head node's object store memory to 1000 MB).

Reproduction (REQUIRED)

Please provide a short code snippet (less than 50 lines if possible) that can be copy-pasted to reproduce the issue. The snippet should have no external library dependencies (i.e., use fake or mock data / environments):

  • Run the script with the head node's object store memory set to 100 MB. You can see that only 1-5 CPUs are utilized (it's unclear when 5 CPUs get utilized; most of the time, only 1 CPU is utilized).
  • If you increase the head node's object store memory to 1000 MB, 9 CPUs are utilized (the expected behavior).
from ray.cluster_utils import Cluster
import ray
import numpy as np
import time

cluster = Cluster()

# Add a head node. The repro behaves as expected if the object store memory is 1000MB, but not if it is 100MB.
node_a = cluster.add_node(num_cpus=4, object_store_memory=100 * 1024 * 1024, resources={"head": 1.0})
cluster.wait_for_nodes()
ray.init(address=cluster.address)

# This worker node should have enough memory to run many tasks
node_b = cluster.add_node(num_cpus=8, object_store_memory=1000*1024*1024)
# This worker node will create a large dependency that is pulled to the head node. It won't do anything other than that.
node_c = cluster.add_node(num_cpus=0, object_store_memory=1000*1024*1024, resources={"worker_for_object":1.0})
cluster.wait_for_nodes()

@ray.remote(resources={"head":0.1})
def task_with_big_object_dep(dep):
    import time
    print(f"dep1 task is scheduled on {ray.get_runtime_context().node_id}")
    time.sleep(30)

@ray.remote
def task_with_big_object_dep2(dep):
    import time
    print(f"dep2 task is scheduled on {ray.get_runtime_context().node_id}")
    time.sleep(30)

@ray.remote(num_cpus=0, resources={"worker_for_object":1.0})
def create_dep_in_worker():
    arr = np.zeros(80 * 1024 * 1024, dtype=np.uint8)  # 80 MB
    return arr

dep = create_dep_in_worker.remote()
dep2 = create_dep_in_worker.remote()

# After this task is scheduled and dep (80 MB) is pulled to the head node:
# Head node {CPU: 1/4, object_store_memory: 80 MB/100 MB}
# Worker node {CPU: 0/8, object_store_memory: 0/1000 MB}
head_node_task = task_with_big_object_dep.remote(dep)
time.sleep(3)

# Now schedule tasks with the second dependency.
# These tasks should be scheduled on node B because it has plenty of memory & CPUs.
# Ideal state:
# Head node {CPU: 1/4, object_store_memory: 80 MB/100 MB} -> cannot schedule more tasks because its object store is full (dep is pinned in memory).
# Worker node {CPU: 8/8, object_store_memory: 640 MB/1000 MB}
# This means the total CPU allocation should be 9/12.
worker_node_tasks = [task_with_big_object_dep2.remote(dep2) for _ in range(8)]

# See the resource usages.
print(f"Head node (node A) raylet socket: {node_a.raylet_socket_name}")
print(f"Worker node (node B) raylet socket: {node_b.raylet_socket_name}")
while True:
    time.sleep(1)
    print(f"CPU usages: {ray.cluster_resources()['CPU'] - ray.available_resources()['CPU']}/{ray.cluster_resources()['CPU']}")
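
The tasks above print ray.get_runtime_context().node_id (a hex node ID), while the script prints raylet socket names, so correlating the two takes a bit of manual matching. A small helper like the following, added before the monitoring loop, lists each node's ID together with its role; it assumes the "NodeID" and "Resources" keys returned by ray.nodes(), which are present in recent Ray versions.

# Map hex node IDs to roles by looking at each node's custom resources.
for node in ray.nodes():
    resources = node.get("Resources", {})
    if "head" in resources:
        role = "head (node A)"
    elif "worker_for_object" in resources:
        role = "object-creating node (node C)"
    else:
        role = "worker (node B)"
    print(node["NodeID"], role)
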
  • I have verified my script runs in a clean environment and reproduces the issue.
  • I have verified the issue also occurs with the latest wheels.
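
Note that the script builds its own multi-node cluster locally via ray.cluster_utils.Cluster, so it can be run as a single Python file on one machine; no separate cluster setup is required.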

Labels

P1 (issue that should be fixed within a few weeks), bug (something that is supposed to be working, but isn't)
