11 changes: 11 additions & 0 deletions src/mock/ray/raylet/local_lease_manager.h
@@ -79,6 +79,17 @@ class MockLocalLeaseManager : public LocalLeaseManagerInterface {
MOCK_METHOD(size_t, GetNumLeaseSpilled, (), (const, override));
MOCK_METHOD(size_t, GetNumWaitingLeaseSpilled, (), (const, override));
MOCK_METHOD(size_t, GetNumUnschedulableLeaseSpilled, (), (const, override));
MOCK_METHOD(bool,
IsLeaseQueued,
(const SchedulingClass &scheduling_class, const LeaseID &lease_id),
(const, override));
MOCK_METHOD(bool,
AddReplyCallback,
(const SchedulingClass &scheduling_class,
const LeaseID &lease_id,
rpc::SendReplyCallback send_reply_callback,
rpc::RequestWorkerLeaseReply *reply),
(override));
};

} // namespace ray::raylet
11 changes: 6 additions & 5 deletions src/ray/gcs/gcs_actor_scheduler.cc
@@ -101,11 +101,12 @@ void GcsActorScheduler::ScheduleByGcs(std::shared_ptr<GcsActor> actor) {
RayLease lease(
actor->GetLeaseSpecification(),
owner_node.has_value() ? actor->GetOwnerNodeID().Binary() : std::string());
cluster_lease_manager_.QueueAndScheduleLease(std::move(lease),
/*grant_or_reject=*/false,
/*is_selected_based_on_locality=*/false,
/*reply=*/reply.get(),
send_reply_callback);
cluster_lease_manager_.QueueAndScheduleLease(
std::move(lease),
/*grant_or_reject=*/false,
/*is_selected_based_on_locality=*/false,
{ray::raylet::internal::ReplyCallback(std::move(send_reply_callback),
reply.get())});
}

void GcsActorScheduler::ScheduleByRaylet(std::shared_ptr<GcsActor> actor) {
129 changes: 84 additions & 45 deletions src/ray/raylet/local_lease_manager.cc
@@ -36,11 +36,13 @@ namespace {
void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
auto reply = work->reply_;
reply->set_canceled(true);
reply->set_failure_type(failure_type);
reply->set_scheduling_failure_message(scheduling_failure_message);
work->send_reply_callback_(Status::OK(), nullptr, nullptr);
for (const auto &reply_callback : work->reply_callbacks_) {
auto reply = reply_callback.reply_;
reply->set_canceled(true);
reply->set_failure_type(failure_type);
reply->set_scheduling_failure_message(scheduling_failure_message);
reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr);
}
}
} // namespace

@@ -547,8 +549,7 @@ bool LocalLeaseManager::PoppedWorkerHandler(
bool is_detached_actor,
const rpc::Address &owner_address,
const std::string &runtime_env_setup_error_message) {
const auto &reply = work->reply_;
const auto &send_reply_callback = work->send_reply_callback_;
const auto &reply_callbacks = work->reply_callbacks_;
const bool canceled = work->GetState() == internal::WorkStatus::CANCELLED;
const auto &lease = work->lease_;
bool granted = false;
@@ -662,12 +663,7 @@ bool LocalLeaseManager::PoppedWorkerHandler(
RAY_LOG(DEBUG) << "Granting lease " << lease_id << " to worker "
<< worker->WorkerId();

Grant(worker,
leased_workers_,
work->allocated_instances_,
lease,
reply,
send_reply_callback);
Grant(worker, leased_workers_, work->allocated_instances_, lease, reply_callbacks);
erase_from_leases_to_grant_queue_fn(work, scheduling_class);
granted = true;
}
@@ -677,11 +673,11 @@

void LocalLeaseManager::Spillback(const NodeID &spillback_to,
const std::shared_ptr<internal::Work> &work) {
auto send_reply_callback = work->send_reply_callback_;

if (work->grant_or_reject_) {
work->reply_->set_rejected(true);
send_reply_callback(Status::OK(), nullptr, nullptr);
for (const auto &reply_callback : work->reply_callbacks_) {
reply_callback.reply_->set_rejected(true);
reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr);
}
return;
}

@@ -701,13 +697,15 @@ void LocalLeaseManager::Spillback(const NodeID &spillback_to,
RAY_CHECK(node_info_ptr)
<< "Spilling back to a node manager, but no GCS info found for node "
<< spillback_to;
auto reply = work->reply_;
reply->mutable_retry_at_raylet_address()->set_ip_address(
node_info_ptr->node_manager_address());
reply->mutable_retry_at_raylet_address()->set_port(node_info_ptr->node_manager_port());
reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary());

send_reply_callback(Status::OK(), nullptr, nullptr);
for (const auto &reply_callback : work->reply_callbacks_) {
auto reply = reply_callback.reply_;
reply->mutable_retry_at_raylet_address()->set_ip_address(
node_info_ptr->node_manager_address());
reply->mutable_retry_at_raylet_address()->set_port(
node_info_ptr->node_manager_port());
reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary());
reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr);
}
}

void LocalLeaseManager::LeasesUnblocked(const std::vector<LeaseID> &ready_ids) {
@@ -969,8 +967,7 @@ void LocalLeaseManager::Grant(
absl::flat_hash_map<LeaseID, std::shared_ptr<WorkerInterface>> &leased_workers,
const std::shared_ptr<TaskResourceInstances> &allocated_instances,
const RayLease &lease,
rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) {
const std::vector<internal::ReplyCallback> &reply_callbacks) {
const auto &lease_spec = lease.GetLeaseSpecification();

if (lease_spec.IsActorCreationTask()) {
@@ -982,11 +979,14 @@
worker->GrantLease(lease);

// Pass the contact info of the worker to use.
reply->set_worker_pid(worker->GetProcess().GetId());
reply->mutable_worker_address()->set_ip_address(worker->IpAddress());
reply->mutable_worker_address()->set_port(worker->Port());
reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary());
reply->mutable_worker_address()->set_node_id(self_node_id_.Binary());
for (const auto &reply_callback : reply_callbacks) {
reply_callback.reply_->set_worker_pid(worker->GetProcess().GetId());
reply_callback.reply_->mutable_worker_address()->set_ip_address(worker->IpAddress());
reply_callback.reply_->mutable_worker_address()->set_port(worker->Port());
reply_callback.reply_->mutable_worker_address()->set_worker_id(
worker->WorkerId().Binary());
reply_callback.reply_->mutable_worker_address()->set_node_id(self_node_id_.Binary());
}

RAY_CHECK(!leased_workers.contains(lease_spec.LeaseId()));
leased_workers[lease_spec.LeaseId()] = worker;
@@ -1000,26 +1000,29 @@
} else {
allocated_resources = worker->GetAllocatedInstances();
}
::ray::rpc::ResourceMapEntry *resource;
for (auto &resource_id : allocated_resources->ResourceIds()) {
bool first = true; // Set resource name only if at least one of its
// instances has available capacity.
auto instances = allocated_resources->Get(resource_id);
for (size_t inst_idx = 0; inst_idx < instances.size(); inst_idx++) {
if (instances[inst_idx] > 0.) {
if (first) {
resource = reply->add_resource_mapping();
resource->set_name(resource_id.Binary());
first = false;
for (const auto &reply_callback : reply_callbacks) {
::ray::rpc::ResourceMapEntry *resource = nullptr;
for (size_t inst_idx = 0; inst_idx < instances.size(); inst_idx++) {
if (instances[inst_idx] > 0.) {
// Set resource name only if at least one of its instances has available
// capacity.
if (resource == nullptr) {
resource = reply_callback.reply_->add_resource_mapping();
resource->set_name(resource_id.Binary());
}
auto rid = resource->add_resource_ids();
rid->set_index(inst_idx);
rid->set_quantity(instances[inst_idx].Double());
}
auto rid = resource->add_resource_ids();
rid->set_index(inst_idx);
rid->set_quantity(instances[inst_idx].Double());
}
}
}
// Send the result back.
send_reply_callback(Status::OK(), nullptr, nullptr);
// Send the result back to the clients.
for (const auto &reply_callback : reply_callbacks) {
reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr);
}
}

void LocalLeaseManager::ClearWorkerBacklog(const WorkerID &worker_id) {
@@ -1258,5 +1261,41 @@ void LocalLeaseManager::DebugStr(std::stringstream &buffer) const {
}
}

bool LocalLeaseManager::IsLeaseQueued(const SchedulingClass &scheduling_class,
const LeaseID &lease_id) const {
if (waiting_leases_index_.contains(lease_id)) {
return true;
}
auto leases_to_grant_it = leases_to_grant_.find(scheduling_class);
if (leases_to_grant_it != leases_to_grant_.end()) {
for (const auto &work : leases_to_grant_it->second) {
if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) {
return true;
}
}
}
return false;
}

bool LocalLeaseManager::AddReplyCallback(const SchedulingClass &scheduling_class,
const LeaseID &lease_id,
rpc::SendReplyCallback send_reply_callback,
rpc::RequestWorkerLeaseReply *reply) {
if (leases_to_grant_.contains(scheduling_class)) {
for (const auto &work : leases_to_grant_[scheduling_class]) {
if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) {
work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply);
return true;
}
}
}
auto it = waiting_leases_index_.find(lease_id);
if (it != waiting_leases_index_.end()) {
(*it->second)->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply);
return true;
}
return false;
}

Review comment:

Bug: Reply Callbacks Dropped on Missing Leases

The StoreReplyCallback function uses the bracket operator [] to access the leases_to_grant_ map without first checking whether the scheduling_class key exists. This creates empty entries in the map if the scheduling_class doesn't exist. If a lease is found neither in leases_to_grant_ nor in waiting_leases_index_, the function silently returns without storing the reply callback or indicating failure. This can result in reply callbacks being dropped, causing lease requests to never receive responses. The function should use find() instead of operator[] to avoid creating spurious map entries and should handle the error case when a lease is not found.
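
The spurious-entry half of this finding is a general property of maps with an inserting operator[]; absl::flat_hash_map behaves like std::unordered_map in this respect. The snippet below is a standalone illustration of that behavior, not code from this PR:

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
  std::unordered_map<int, std::vector<std::string>> leases_to_grant;

  // operator[] default-constructs a value for a missing key, so even a pure
  // lookup leaves an empty entry behind.
  if (leases_to_grant[42].empty()) {
    // Key 42 now exists in the map even though nothing was stored.
  }
  std::cout << "size after operator[]: " << leases_to_grant.size() << "\n";  // prints 1

  // find() never inserts, so a failed lookup leaves the map untouched.
  leases_to_grant.clear();
  if (leases_to_grant.find(42) == leases_to_grant.end()) {
    std::cout << "size after find(): " << leases_to_grant.size() << "\n";  // prints 0
  }
  return 0;
}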


Collaborator reply: real? ^

Review comment:

Bug: Lease Queue Race Condition

A race condition exists between IsLeaseQueued and AddReplyCallback due to their inconsistent search orders for leases. IsLeaseQueued checks waiting_leases_index_ then leases_to_grant_, while AddReplyCallback checks the reverse. This allows a lease to move between queues after IsLeaseQueued returns true, causing AddReplyCallback to fail and trigger a RAY_CHECK in HandleRequestWorkerLease, crashing the Raylet.
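
Whether or not this interleaving can actually occur on the raylet's event loop, the check-then-act pattern can be sidestepped by folding the check and the attach into a single call and letting the caller fall back to normal scheduling when it fails. A minimal sketch with illustrative stand-in types; LeaseQueues and TryAddReplyCallback are assumed names, not the actual Ray API:

#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Simplified stand-ins for the Ray types involved.
using SchedulingClass = int;
using LeaseID = std::string;
struct Reply {};
using SendReplyCallback = std::function<void()>;

struct ReplyCallback {
  SendReplyCallback send_reply_callback;
  Reply *reply;
};

struct Work {
  LeaseID lease_id;
  std::vector<ReplyCallback> reply_callbacks;
};

class LeaseQueues {
 public:
  // Searches both the waiting index and the to-grant queue and attaches the
  // callback if the lease is queued anywhere. Because the check and the attach
  // happen inside one call, no queue transition can slip in between them.
  bool TryAddReplyCallback(const SchedulingClass &scheduling_class,
                           const LeaseID &lease_id,
                           SendReplyCallback send_reply_callback,
                           Reply *reply) {
    auto wit = waiting_index_.find(lease_id);
    if (wit != waiting_index_.end()) {
      wit->second->reply_callbacks.push_back({std::move(send_reply_callback), reply});
      return true;
    }
    auto git = to_grant_.find(scheduling_class);
    if (git != to_grant_.end()) {
      for (auto &work : git->second) {
        if (work->lease_id == lease_id) {
          work->reply_callbacks.push_back({std::move(send_reply_callback), reply});
          return true;
        }
      }
    }
    return false;  // Not queued; caller schedules it as a fresh lease.
  }

 private:
  std::unordered_map<SchedulingClass, std::deque<std::shared_ptr<Work>>> to_grant_;
  std::unordered_map<LeaseID, std::shared_ptr<Work>> waiting_index_;
};

A caller would then write if (!queues.TryAddReplyCallback(...)) { /* queue and schedule as a new lease */ }, which also removes the need for a RAY_CHECK on the second lookup.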



} // namespace raylet
} // namespace ray
11 changes: 9 additions & 2 deletions src/ray/raylet/local_lease_manager.h
@@ -189,6 +189,14 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
return num_unschedulable_lease_spilled_;
}

bool IsLeaseQueued(const SchedulingClass &scheduling_class,
const LeaseID &lease_id) const override;

bool AddReplyCallback(const SchedulingClass &scheduling_class,
const LeaseID &lease_id,
rpc::SendReplyCallback send_reply_callback,
rpc::RequestWorkerLeaseReply *reply) override;

private:
struct SchedulingClassInfo;

@@ -248,8 +256,7 @@ class LocalLeaseManager : public LocalLeaseManagerInterface {
absl::flat_hash_map<LeaseID, std::shared_ptr<WorkerInterface>> &leased_workers_,
const std::shared_ptr<TaskResourceInstances> &allocated_instances,
const RayLease &lease,
rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback);
const std::vector<internal::ReplyCallback> &reply_callbacks);

void Spillback(const NodeID &spillback_to, const std::shared_ptr<internal::Work> &work);

41 changes: 30 additions & 11 deletions src/ray/raylet/node_manager.cc
@@ -1809,7 +1809,7 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques
rpc::SendReplyCallback send_reply_callback) {
auto lease_id = LeaseID::FromBinary(request.lease_spec().lease_id());
// If the lease is already granted, this is a retry and forward the address of the
// already leased worker to use.
// already leased worker to use
if (leased_workers_.contains(lease_id)) {
const auto &worker = leased_workers_[lease_id];
RAY_LOG(DEBUG) << "Lease " << lease_id
@@ -1847,9 +1847,6 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques
actor_id = lease.GetLeaseSpecification().ActorId();
}

const auto &lease_spec = lease.GetLeaseSpecification();
worker_pool_.PrestartWorkers(lease_spec, request.backlog_size());

auto send_reply_callback_wrapper =
[this, is_actor_creation_task, actor_id, reply, send_reply_callback](
Status status, std::function<void()> success, std::function<void()> failure) {
@@ -1880,11 +1877,34 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques
send_reply_callback(status, std::move(success), std::move(failure));
};

cluster_lease_manager_.QueueAndScheduleLease(std::move(lease),
request.grant_or_reject(),
request.is_selected_based_on_locality(),
reply,
std::move(send_reply_callback_wrapper));
if (cluster_lease_manager_.IsLeaseQueued(
lease.GetLeaseSpecification().GetSchedulingClass(), lease_id)) {
RAY_CHECK(cluster_lease_manager_.AddReplyCallback(
lease.GetLeaseSpecification().GetSchedulingClass(),
lease_id,
std::move(send_reply_callback_wrapper),
reply));
return;
}

if (local_lease_manager_.IsLeaseQueued(
lease.GetLeaseSpecification().GetSchedulingClass(), lease_id)) {
RAY_CHECK(local_lease_manager_.AddReplyCallback(
lease.GetLeaseSpecification().GetSchedulingClass(),
lease_id,
std::move(send_reply_callback_wrapper),
reply));
return;
}

const auto &lease_spec = lease.GetLeaseSpecification();
worker_pool_.PrestartWorkers(lease_spec, request.backlog_size());

cluster_lease_manager_.QueueAndScheduleLease(
std::move(lease),
request.grant_or_reject(),
request.is_selected_based_on_locality(),
{internal::ReplyCallback(std::move(send_reply_callback_wrapper), reply)});
}

void NodeManager::HandlePrestartWorkers(rpc::PrestartWorkersRequest request,
@@ -2200,8 +2220,7 @@ void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request,
cluster_lease_manager_.QueueAndScheduleLease(work->lease_,
work->grant_or_reject_,
work->is_selected_based_on_locality_,
work->reply_,
work->send_reply_callback_);
work->reply_callbacks_);
}
}
}