Skip to content

Commit

Permalink
Fix resource bookkeeping bug with acquiring unknown resource. (#4945)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertnishihara authored and pcmoritz committed Jun 8, 2019
1 parent 77689d1 commit a82e811
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 9 deletions.
14 changes: 12 additions & 2 deletions python/ray/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1754,7 +1754,7 @@ def f(n):
def g(n):
time.sleep(n)

time_buffer = 0.5
time_buffer = 2

start_time = time.time()
ray.get([f.remote(0.5), g.remote(0.5)])
Expand Down Expand Up @@ -1878,13 +1878,23 @@ def test(self):
def test_zero_cpus(shutdown_only):
ray.init(num_cpus=0)

# We should be able to execute a task that requires 0 CPU resources.
@ray.remote(num_cpus=0)
def f():
return 1

# The task should be able to execute.
ray.get(f.remote())

# We should be able to create an actor that requires 0 CPU resources.
@ray.remote(num_cpus=0)
class Actor(object):
def method(self):
pass

a = Actor.remote()
x = a.method.remote()
ray.get(x)


def test_zero_cpus_actor(ray_start_cluster):
cluster = ray_start_cluster
Expand Down
6 changes: 3 additions & 3 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1809,9 +1809,9 @@ bool NodeManager::AssignTask(const Task &task) {
cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources());

if (spec.IsActorCreationTask()) {
// Check that we are not placing an actor creation task on a node with 0 CPUs.
RAY_CHECK(cluster_resource_map_[my_client_id].GetTotalResources().GetResourceMap().at(
kCPU_ResourceLabel) != 0);
// Check that the actor's placement resource requirements are satisfied.
RAY_CHECK(spec.GetRequiredPlacementResources().IsSubset(
cluster_resource_map_[my_client_id].GetTotalResources()));
worker->SetLifetimeResourceIds(acquired_resources);
} else {
worker->SetTaskResourceIds(acquired_resources);
Expand Down
15 changes: 11 additions & 4 deletions src/ray/raylet/scheduling_resources.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ ResourceSet::ResourceSet() {}

ResourceSet::ResourceSet(
const std::unordered_map<std::string, FractionalResourceQuantity> &resource_map)
: resource_capacity_(resource_map) {}
: resource_capacity_(resource_map) {
for (auto const &resource_pair : resource_map) {
RAY_CHECK(resource_pair.second > 0);
}
}

ResourceSet::ResourceSet(const std::unordered_map<std::string, double> &resource_map) {
for (auto const &resource_pair : resource_map) {
Expand Down Expand Up @@ -169,7 +173,8 @@ void ResourceSet::SubtractResourcesStrict(const ResourceSet &other) {
const std::string &resource_label = resource_pair.first;
const FractionalResourceQuantity &resource_capacity = resource_pair.second;
RAY_CHECK(resource_capacity_.count(resource_label) == 1)
<< "Attempt to acquire unknown resource: " << resource_label;
<< "Attempt to acquire unknown resource: " << resource_label << " capacity "
<< resource_capacity.ToDouble();
resource_capacity_[resource_label] -= resource_capacity;

// Ensure that quantity is positive. Note, we have to have the check before
Expand Down Expand Up @@ -233,8 +238,10 @@ FractionalResourceQuantity ResourceSet::GetResource(

const ResourceSet ResourceSet::GetNumCpus() const {
ResourceSet cpu_resource_set;
cpu_resource_set.resource_capacity_[kCPU_ResourceLabel] =
GetResource(kCPU_ResourceLabel);
const FractionalResourceQuantity cpu_quantity = GetResource(kCPU_ResourceLabel);
if (cpu_quantity > 0) {
cpu_resource_set.resource_capacity_[kCPU_ResourceLabel] = cpu_quantity;
}
return cpu_resource_set;
}

Expand Down

0 comments on commit a82e811

Please sign in to comment.