Merged
52 changes: 52 additions & 0 deletions python/ray/tests/test_placement_group_3.py
@@ -646,5 +646,57 @@ def check_bundle_leaks():
wait_for_condition(check_bundle_leaks)


def test_placement_group_local_resource_view(monkeypatch, ray_start_cluster):
"""Please refer to https://github.com/ray-project/ray/pull/19911
for more details.
"""
with monkeypatch.context() as m:
        # Increase the broadcasting interval so that the node resource update
        # arrives at the raylet only after all local resources have been allocated.
m.setenv("RAY_raylet_report_resources_period_milliseconds", "2000")
m.setenv("RAY_grpc_based_resource_broadcast", "true")
cluster = ray_start_cluster

cluster.add_node(num_cpus=16, object_store_memory=1e9)
cluster.wait_for_nodes()
cluster.add_node(num_cpus=16, num_gpus=1)
cluster.wait_for_nodes()
NUM_CPU_BUNDLES = 30

@ray.remote(num_cpus=1)
class Worker(object):
def __init__(self, i):
self.i = i

def work(self):
time.sleep(0.1)
print("work ", self.i)

@ray.remote(num_cpus=1, num_gpus=1)
class Trainer(object):
def __init__(self, i):
self.i = i

def train(self):
time.sleep(0.2)
print("train ", self.i)

ray.init(address="auto")
bundles = [{"CPU": 1, "GPU": 1}]
bundles += [{"CPU": 1} for _ in range(NUM_CPU_BUNDLES)]
pg = placement_group(bundles, strategy="PACK")
ray.get(pg.ready())

        # Local resources are allocated here; ensure that the local view stays
        # consistent and that stale node resource updates are discarded.
workers = [
Worker.options(placement_group=pg).remote(i)
for i in range(NUM_CPU_BUNDLES)
]
trainer = Trainer.options(placement_group=pg).remote(0)
ray.get([workers[i].work.remote() for i in range(NUM_CPU_BUNDLES)])
ray.get(trainer.train.remote())


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_resource_manager.cc
@@ -58,7 +58,7 @@ void GcsResourceManager::HandleUpdateResources(
const rpc::UpdateResourcesRequest &request, rpc::UpdateResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.node_id());
-  RAY_LOG(INFO) << "Updating resources, node id = " << node_id;
+  RAY_LOG(DEBUG) << "Updating resources, node id = " << node_id;
auto changed_resources = std::make_shared<std::unordered_map<std::string, double>>();
for (const auto &entry : request.resources()) {
changed_resources->emplace(entry.first, entry.second.resource_capacity());
56 changes: 36 additions & 20 deletions src/ray/raylet/node_manager.cc
@@ -890,7 +890,15 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
const ResourceSet &createUpdatedResources) {
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] received callback from node id " << node_id
<< " with created or updated resources: "
-                 << createUpdatedResources.ToString() << ". Updating resource map.";
+                 << createUpdatedResources.ToString() << ". Updating resource map."
+                 << " skip=" << (node_id == self_node_id_);

+  // Skip updating the local node since the local node always has the latest
+  // information. Updating the local node could result in an inconsistent view
+  // in the cluster resource scheduler, which could make tasks hang.
+  if (node_id == self_node_id_) {
+    return;
Review comment (Contributor): Consider adding cluster_task_manager_->ScheduleAndDispatchTasks(); here?

Review comment (Contributor): It seems like some tests are still failing after this PR, and this is the only behavior change I can imagine (since the changes below seem to apply only to the gRPC resource broadcast). I think calling the function one more time here won't hurt performance.
+  }
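To make the reviewers' suggestion above concrete, here is a minimal sketch of the variant being discussed (illustrative only; it is not part of this diff, and whether it was adopted in a follow-up is not shown here):

  // Hypothetical variant per the review suggestion: trigger scheduling once more
  // before skipping the resource update for the local node.
  if (node_id == self_node_id_) {
    cluster_task_manager_->ScheduleAndDispatchTasks();
    return;
  }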

// Update local_available_resources_ and SchedulingResources
for (const auto &resource_pair : createUpdatedResources.GetResourceMap()) {
@@ -900,11 +908,7 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
new_resource_capacity);
}
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map.";

-  if (node_id == self_node_id_) {
-    // The resource update is on the local node, check if we can reschedule tasks.
-    cluster_task_manager_->ScheduleAndDispatchTasks();
-  }
+  cluster_task_manager_->ScheduleAndDispatchTasks();
}

void NodeManager::ResourceDeleted(const NodeID &node_id,
@@ -916,7 +920,14 @@
}
RAY_LOG(DEBUG) << "[ResourceDeleted] received callback from node id " << node_id
<< " with deleted resources: " << oss.str()
<< ". Updating resource map.";
<< ". Updating resource map. skip=" << (node_id == self_node_id_);
}

+  // Skip updating the local node since the local node always has the latest
+  // information. Updating the local node could result in an inconsistent view
+  // in the cluster resource scheduler, which could make tasks hang.
+  if (node_id == self_node_id_) {
Review comment (Contributor, author): only this line

Review comment (Member): Just curious, what is the mechanism by which processing the resource deletion for this node slows down the test and makes it flaky?

Review comment (Contributor, author, @fishbone, Nov 3, 2021): Oh, it's actually not slowing down the test; it hangs and eventually times out. Basically, the local resource was deleted before scheduling, so the raylet found no resources to run the job. Luckily, I can always reproduce it locally.
+    return;
+  }

// Update local_available_resources_ and SchedulingResources
@@ -1474,39 +1485,44 @@ void NodeManager::HandleUpdateResourceUsage(
rpc::SendReplyCallback send_reply_callback) {
rpc::ResourceUsageBroadcastData resource_usage_batch;
resource_usage_batch.ParseFromString(request.serialized_resource_usage_batch());

-  if (resource_usage_batch.seq_no() != next_resource_seq_no_) {
+  // When next_resource_seq_no_ == 0 it means the raylet has just started.
+  // TODO: Fetch a snapshot from GCS for lightweight resource broadcasting.
+  if (next_resource_seq_no_ != 0 &&
+      resource_usage_batch.seq_no() != next_resource_seq_no_) {
+    // TODO (Alex): Ideally we would be really robust, and potentially eagerly
+    // pull a full resource "snapshot" from gcs to make sure our state doesn't
+    // diverge from GCS.
RAY_LOG(WARNING)
<< "Raylet may have missed a resource broadcast. This either means that GCS has "
"restarted, the network is heavily congested and is dropping, reordering, or "
"duplicating packets. Expected seq#: "
<< next_resource_seq_no_ << ", but got: " << resource_usage_batch.seq_no() << ".";
-    // TODO (Alex): Ideally we would be really robust, and potentially eagerly
-    // pull a full resource "snapshot" from gcs to make sure our state doesn't
-    // diverge from GCS.
+    if (resource_usage_batch.seq_no() < next_resource_seq_no_) {
+      RAY_LOG(WARNING) << "Discard the resource update since the local version is newer";
+      return;
+    }
}
next_resource_seq_no_ = resource_usage_batch.seq_no() + 1;

for (const auto &resource_change_or_data : resource_usage_batch.batch()) {
if (resource_change_or_data.has_data()) {
const auto &resource_usage = resource_change_or_data.data();
-      const NodeID &node_id = NodeID::FromBinary(resource_usage.node_id());
-      if (node_id == self_node_id_) {
-        // Skip messages from self.
-        continue;
+      auto node_id = NodeID::FromBinary(resource_usage.node_id());
+      // Skip messages from self.
+      if (node_id != self_node_id_) {
+        UpdateResourceUsage(node_id, resource_usage);
      }
-      UpdateResourceUsage(node_id, resource_usage);
} else if (resource_change_or_data.has_change()) {
const auto &resource_notification = resource_change_or_data.change();
-      auto id = NodeID::FromBinary(resource_notification.node_id());
+      auto node_id = NodeID::FromBinary(resource_notification.node_id());
if (resource_notification.updated_resources_size() != 0) {
ResourceSet resource_set(
MapFromProtobuf(resource_notification.updated_resources()));
-        ResourceCreateUpdated(id, resource_set);
+        ResourceCreateUpdated(node_id, resource_set);
}

if (resource_notification.deleted_resources_size() != 0) {
-        ResourceDeleted(id,
+        ResourceDeleted(node_id,
VectorFromProtobuf(resource_notification.deleted_resources()));
}
}
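For readers following the sequence-number bookkeeping in this hunk, here is a minimal standalone sketch of the policy it implements (a hypothetical ShouldApplyBatch helper, not Ray code): accept the first batch after startup without warning, warn on any mismatch, drop batches older than the local view, and otherwise resync the expected counter.

#include <cstdint>
#include <iostream>

// Returns true if an incoming broadcast batch should be applied, false if it
// must be discarded because the local view is already newer.
bool ShouldApplyBatch(uint64_t incoming_seq_no, uint64_t &next_expected_seq_no) {
  // next_expected_seq_no == 0 means we just started; accept whatever arrives
  // first instead of warning about a "missed" broadcast.
  if (next_expected_seq_no != 0 && incoming_seq_no != next_expected_seq_no) {
    std::cerr << "Possible missed broadcast. Expected seq#: " << next_expected_seq_no
              << ", got: " << incoming_seq_no << "\n";
    if (incoming_seq_no < next_expected_seq_no) {
      return false;  // Stale update; keep the newer local view.
    }
  }
  // Apply the batch (including after a gap) and resync the counter.
  next_expected_seq_no = incoming_seq_no + 1;
  return true;
}

int main() {
  uint64_t next = 0;
  std::cout << ShouldApplyBatch(5, next) << " next=" << next << "\n";  // 1 next=6 (startup)
  std::cout << ShouldApplyBatch(6, next) << " next=" << next << "\n";  // 1 next=7 (in order)
  std::cout << ShouldApplyBatch(4, next) << " next=" << next << "\n";  // 0 next=7 (stale, dropped)
  std::cout << ShouldApplyBatch(9, next) << " next=" << next << "\n";  // 1 next=10 (gap, applied)
  return 0;
}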