From ce3f433c6b28bb3ff47ce3bd13e2a554792601dc Mon Sep 17 00:00:00 2001 From: joshlee Date: Wed, 29 Oct 2025 00:55:50 +0000 Subject: [PATCH 1/7] Fix idempotency issues in RequestWorkerLease for scheduled leases Signed-off-by: joshlee --- src/mock/ray/raylet/local_lease_manager.h | 11 ++ src/ray/gcs/gcs_actor_scheduler.cc | 11 +- src/ray/raylet/local_lease_manager.cc | 129 +++++++++++------- src/ray/raylet/local_lease_manager.h | 11 +- src/ray/raylet/node_manager.cc | 41 ++++-- .../scheduling/cluster_lease_manager.cc | 82 ++++++++--- .../raylet/scheduling/cluster_lease_manager.h | 27 +++- .../cluster_lease_manager_interface.h | 30 +++- src/ray/raylet/scheduling/internal.h | 19 ++- .../local_lease_manager_interface.h | 15 +- src/ray/raylet/tests/node_manager_test.cc | 78 ++++++++++- 11 files changed, 349 insertions(+), 105 deletions(-) diff --git a/src/mock/ray/raylet/local_lease_manager.h b/src/mock/ray/raylet/local_lease_manager.h index bb771dd5a6c8..69522f14cd7e 100644 --- a/src/mock/ray/raylet/local_lease_manager.h +++ b/src/mock/ray/raylet/local_lease_manager.h @@ -79,6 +79,17 @@ class MockLocalLeaseManager : public LocalLeaseManagerInterface { MOCK_METHOD(size_t, GetNumLeaseSpilled, (), (const, override)); MOCK_METHOD(size_t, GetNumWaitingLeaseSpilled, (), (const, override)); MOCK_METHOD(size_t, GetNumUnschedulableLeaseSpilled, (), (const, override)); + MOCK_METHOD(bool, + IsLeaseQueued, + (const SchedulingClass &scheduling_class, const LeaseID &lease_id), + (const, override)); + MOCK_METHOD(void, + StoreReplyCallback, + (const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply), + (override)); }; } // namespace ray::raylet diff --git a/src/ray/gcs/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_actor_scheduler.cc index ec9ca2a56a56..805350f76285 100644 --- a/src/ray/gcs/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_actor_scheduler.cc @@ -101,11 +101,12 @@ void GcsActorScheduler::ScheduleByGcs(std::shared_ptr actor) { RayLease lease( actor->GetLeaseSpecification(), owner_node.has_value() ? actor->GetOwnerNodeID().Binary() : std::string()); - cluster_lease_manager_.QueueAndScheduleLease(std::move(lease), - /*grant_or_reject=*/false, - /*is_selected_based_on_locality=*/false, - /*reply=*/reply.get(), - send_reply_callback); + cluster_lease_manager_.QueueAndScheduleLease( + std::move(lease), + /*grant_or_reject=*/false, + /*is_selected_based_on_locality=*/false, + {ray::raylet::internal::ReplyCallback(std::move(send_reply_callback), + reply.get())}); } void GcsActorScheduler::ScheduleByRaylet(std::shared_ptr actor) { diff --git a/src/ray/raylet/local_lease_manager.cc b/src/ray/raylet/local_lease_manager.cc index d2b586a71933..9dacc910f5fc 100644 --- a/src/ray/raylet/local_lease_manager.cc +++ b/src/ray/raylet/local_lease_manager.cc @@ -36,11 +36,13 @@ namespace { void ReplyCancelled(const std::shared_ptr &work, rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, const std::string &scheduling_failure_message) { - auto reply = work->reply_; - reply->set_canceled(true); - reply->set_failure_type(failure_type); - reply->set_scheduling_failure_message(scheduling_failure_message); - work->send_reply_callback_(Status::OK(), nullptr, nullptr); + for (const auto &reply_callback : work->reply_callbacks_) { + auto reply = reply_callback.reply_; + reply->set_canceled(true); + reply->set_failure_type(failure_type); + reply->set_scheduling_failure_message(scheduling_failure_message); + reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr); + } } } // namespace @@ -547,8 +549,7 @@ bool LocalLeaseManager::PoppedWorkerHandler( bool is_detached_actor, const rpc::Address &owner_address, const std::string &runtime_env_setup_error_message) { - const auto &reply = work->reply_; - const auto &send_reply_callback = work->send_reply_callback_; + const auto &reply_callbacks = work->reply_callbacks_; const bool canceled = work->GetState() == internal::WorkStatus::CANCELLED; const auto &lease = work->lease_; bool granted = false; @@ -662,12 +663,7 @@ bool LocalLeaseManager::PoppedWorkerHandler( RAY_LOG(DEBUG) << "Granting lease " << lease_id << " to worker " << worker->WorkerId(); - Grant(worker, - leased_workers_, - work->allocated_instances_, - lease, - reply, - send_reply_callback); + Grant(worker, leased_workers_, work->allocated_instances_, lease, reply_callbacks); erase_from_leases_to_grant_queue_fn(work, scheduling_class); granted = true; } @@ -677,11 +673,11 @@ bool LocalLeaseManager::PoppedWorkerHandler( void LocalLeaseManager::Spillback(const NodeID &spillback_to, const std::shared_ptr &work) { - auto send_reply_callback = work->send_reply_callback_; - if (work->grant_or_reject_) { - work->reply_->set_rejected(true); - send_reply_callback(Status::OK(), nullptr, nullptr); + for (const auto &reply_callback : work->reply_callbacks_) { + reply_callback.reply_->set_rejected(true); + reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr); + } return; } @@ -701,13 +697,15 @@ void LocalLeaseManager::Spillback(const NodeID &spillback_to, RAY_CHECK(node_info_ptr) << "Spilling back to a node manager, but no GCS info found for node " << spillback_to; - auto reply = work->reply_; - reply->mutable_retry_at_raylet_address()->set_ip_address( - node_info_ptr->node_manager_address()); - reply->mutable_retry_at_raylet_address()->set_port(node_info_ptr->node_manager_port()); - reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary()); - - send_reply_callback(Status::OK(), nullptr, nullptr); + for (const auto &reply_callback : work->reply_callbacks_) { + auto reply = reply_callback.reply_; + reply->mutable_retry_at_raylet_address()->set_ip_address( + node_info_ptr->node_manager_address()); + reply->mutable_retry_at_raylet_address()->set_port( + node_info_ptr->node_manager_port()); + reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary()); + reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr); + } } void LocalLeaseManager::LeasesUnblocked(const std::vector &ready_ids) { @@ -969,8 +967,7 @@ void LocalLeaseManager::Grant( absl::flat_hash_map> &leased_workers, const std::shared_ptr &allocated_instances, const RayLease &lease, - rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback send_reply_callback) { + const std::vector &reply_callbacks) { const auto &lease_spec = lease.GetLeaseSpecification(); if (lease_spec.IsActorCreationTask()) { @@ -982,11 +979,14 @@ void LocalLeaseManager::Grant( worker->GrantLease(lease); // Pass the contact info of the worker to use. - reply->set_worker_pid(worker->GetProcess().GetId()); - reply->mutable_worker_address()->set_ip_address(worker->IpAddress()); - reply->mutable_worker_address()->set_port(worker->Port()); - reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary()); - reply->mutable_worker_address()->set_node_id(self_node_id_.Binary()); + for (const auto &reply_callback : reply_callbacks) { + reply_callback.reply_->set_worker_pid(worker->GetProcess().GetId()); + reply_callback.reply_->mutable_worker_address()->set_ip_address(worker->IpAddress()); + reply_callback.reply_->mutable_worker_address()->set_port(worker->Port()); + reply_callback.reply_->mutable_worker_address()->set_worker_id( + worker->WorkerId().Binary()); + reply_callback.reply_->mutable_worker_address()->set_node_id(self_node_id_.Binary()); + } RAY_CHECK(!leased_workers.contains(lease_spec.LeaseId())); leased_workers[lease_spec.LeaseId()] = worker; @@ -1000,26 +1000,28 @@ void LocalLeaseManager::Grant( } else { allocated_resources = worker->GetAllocatedInstances(); } - ::ray::rpc::ResourceMapEntry *resource; - for (auto &resource_id : allocated_resources->ResourceIds()) { - bool first = true; // Set resource name only if at least one of its - // instances has available capacity. - auto instances = allocated_resources->Get(resource_id); - for (size_t inst_idx = 0; inst_idx < instances.size(); inst_idx++) { - if (instances[inst_idx] > 0.) { - if (first) { - resource = reply->add_resource_mapping(); - resource->set_name(resource_id.Binary()); - first = false; + for (const auto &reply_callback : reply_callbacks) { + ::ray::rpc::ResourceMapEntry *resource; + for (auto &resource_id : allocated_resources->ResourceIds()) { + bool first = true; // Set resource name only if at least one of its + // instances has available capacity. + auto instances = allocated_resources->Get(resource_id); + for (size_t inst_idx = 0; inst_idx < instances.size(); inst_idx++) { + if (instances[inst_idx] > 0.) { + if (first) { + resource = reply_callback.reply_->add_resource_mapping(); + resource->set_name(resource_id.Binary()); + first = false; + } + auto rid = resource->add_resource_ids(); + rid->set_index(inst_idx); + rid->set_quantity(instances[inst_idx].Double()); } - auto rid = resource->add_resource_ids(); - rid->set_index(inst_idx); - rid->set_quantity(instances[inst_idx].Double()); } } + // Send the result back to the client. + reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr); } - // Send the result back. - send_reply_callback(Status::OK(), nullptr, nullptr); } void LocalLeaseManager::ClearWorkerBacklog(const WorkerID &worker_id) { @@ -1258,5 +1260,38 @@ void LocalLeaseManager::DebugStr(std::stringstream &buffer) const { } } +bool LocalLeaseManager::IsLeaseQueued(const SchedulingClass &scheduling_class, + const LeaseID &lease_id) const { + if (waiting_leases_index_.contains(lease_id)) { + return true; + } + auto leases_to_grant_it = leases_to_grant_.find(scheduling_class); + if (leases_to_grant_it != leases_to_grant_.end()) { + for (const auto &work : leases_to_grant_it->second) { + if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { + return true; + } + } + } + return false; +} + +void LocalLeaseManager::StoreReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) { + for (const auto &work : leases_to_grant_[scheduling_class]) { + if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { + work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply); + return; + } + } + auto it = waiting_leases_index_.find(lease_id); + if (it != waiting_leases_index_.end()) { + (*it->second)->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply); + return; + } +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/local_lease_manager.h b/src/ray/raylet/local_lease_manager.h index c2fa2e8e1c52..c71446d6ed01 100644 --- a/src/ray/raylet/local_lease_manager.h +++ b/src/ray/raylet/local_lease_manager.h @@ -189,6 +189,14 @@ class LocalLeaseManager : public LocalLeaseManagerInterface { return num_unschedulable_lease_spilled_; } + bool IsLeaseQueued(const SchedulingClass &scheduling_class, + const LeaseID &lease_id) const override; + + void StoreReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) override; + private: struct SchedulingClassInfo; @@ -248,8 +256,7 @@ class LocalLeaseManager : public LocalLeaseManagerInterface { absl::flat_hash_map> &leased_workers_, const std::shared_ptr &allocated_instances, const RayLease &lease, - rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback send_reply_callback); + const std::vector &reply_callbacks); void Spillback(const NodeID &spillback_to, const std::shared_ptr &work); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index bc6bd476d799..4ef4cd258cee 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1813,7 +1813,7 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques rpc::SendReplyCallback send_reply_callback) { auto lease_id = LeaseID::FromBinary(request.lease_spec().lease_id()); // If the lease is already granted, this is a retry and forward the address of the - // already leased worker to use. + // already leased worker to use if (leased_workers_.contains(lease_id)) { const auto &worker = leased_workers_[lease_id]; RAY_LOG(DEBUG) << "Lease " << lease_id @@ -1851,9 +1851,6 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques actor_id = lease.GetLeaseSpecification().ActorId(); } - const auto &lease_spec = lease.GetLeaseSpecification(); - worker_pool_.PrestartWorkers(lease_spec, request.backlog_size()); - auto send_reply_callback_wrapper = [this, is_actor_creation_task, actor_id, reply, send_reply_callback]( Status status, std::function success, std::function failure) { @@ -1884,11 +1881,34 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques send_reply_callback(status, std::move(success), std::move(failure)); }; - cluster_lease_manager_.QueueAndScheduleLease(std::move(lease), - request.grant_or_reject(), - request.is_selected_based_on_locality(), - reply, - std::move(send_reply_callback_wrapper)); + if (cluster_lease_manager_.IsLeaseQueued( + lease.GetLeaseSpecification().GetSchedulingClass(), lease_id)) { + cluster_lease_manager_.StoreReplyCallback( + lease.GetLeaseSpecification().GetSchedulingClass(), + lease_id, + std::move(send_reply_callback_wrapper), + reply); + return; + } + + if (local_lease_manager_.IsLeaseQueued( + lease.GetLeaseSpecification().GetSchedulingClass(), lease_id)) { + local_lease_manager_.StoreReplyCallback( + lease.GetLeaseSpecification().GetSchedulingClass(), + lease_id, + std::move(send_reply_callback_wrapper), + reply); + return; + } + + const auto &lease_spec = lease.GetLeaseSpecification(); + worker_pool_.PrestartWorkers(lease_spec, request.backlog_size()); + + cluster_lease_manager_.QueueAndScheduleLease( + std::move(lease), + request.grant_or_reject(), + request.is_selected_based_on_locality(), + {internal::ReplyCallback(std::move(send_reply_callback_wrapper), reply)}); } void NodeManager::HandlePrestartWorkers(rpc::PrestartWorkersRequest request, @@ -2204,8 +2224,7 @@ void NodeManager::HandleDrainRaylet(rpc::DrainRayletRequest request, cluster_lease_manager_.QueueAndScheduleLease(work->lease_, work->grant_or_reject_, work->is_selected_based_on_locality_, - work->reply_, - work->send_reply_callback_); + work->reply_callbacks_); } } } diff --git a/src/ray/raylet/scheduling/cluster_lease_manager.cc b/src/ray/raylet/scheduling/cluster_lease_manager.cc index a1cff110ab6c..0987a8618751 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager.cc +++ b/src/ray/raylet/scheduling/cluster_lease_manager.cc @@ -48,16 +48,14 @@ void ClusterLeaseManager::QueueAndScheduleLease( RayLease lease, bool grant_or_reject, bool is_selected_based_on_locality, - rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback send_reply_callback) { + std::vector reply_callbacks) { RAY_LOG(DEBUG) << "Queuing and scheduling lease " << lease.GetLeaseSpecification().LeaseId(); const auto scheduling_class = lease.GetLeaseSpecification().GetSchedulingClass(); auto work = std::make_shared(std::move(lease), grant_or_reject, is_selected_based_on_locality, - reply, - std::move(send_reply_callback)); + std::move(reply_callbacks)); // If the scheduling class is infeasible, just add the work to the infeasible queue // directly. auto infeasible_leases_iter = infeasible_leases_.find(scheduling_class); @@ -73,11 +71,13 @@ namespace { void ReplyCancelled(const internal::Work &work, rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type, const std::string &scheduling_failure_message) { - auto reply = work.reply_; - reply->set_canceled(true); - reply->set_failure_type(failure_type); - reply->set_scheduling_failure_message(scheduling_failure_message); - work.send_reply_callback_(Status::OK(), nullptr, nullptr); + for (const auto &reply_callback : work.reply_callbacks_) { + auto reply = reply_callback.reply_; + reply->set_canceled(true); + reply->set_failure_type(failure_type); + reply->set_scheduling_failure_message(scheduling_failure_message); + reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr); + } } } // namespace @@ -425,11 +425,11 @@ void ClusterLeaseManager::ScheduleOnNode(const NodeID &spillback_to, return; } - auto send_reply_callback = work->send_reply_callback_; - if (work->grant_or_reject_) { - work->reply_->set_rejected(true); - send_reply_callback(Status::OK(), nullptr, nullptr); + for (const auto &reply_callback : work->reply_callbacks_) { + reply_callback.reply_->set_rejected(true); + reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr); + } return; } @@ -449,13 +449,14 @@ void ClusterLeaseManager::ScheduleOnNode(const NodeID &spillback_to, auto node_info = get_node_info_(spillback_to); RAY_CHECK(node_info.has_value()); - auto reply = work->reply_; - reply->mutable_retry_at_raylet_address()->set_ip_address( - (*node_info).node_manager_address()); - reply->mutable_retry_at_raylet_address()->set_port((*node_info).node_manager_port()); - reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary()); - - send_reply_callback(Status::OK(), nullptr, nullptr); + for (const auto &reply_callback : work->reply_callbacks_) { + auto reply = reply_callback.reply_; + reply->mutable_retry_at_raylet_address()->set_ip_address( + (*node_info).node_manager_address()); + reply->mutable_retry_at_raylet_address()->set_port((*node_info).node_manager_port()); + reply->mutable_retry_at_raylet_address()->set_node_id(spillback_to.Binary()); + reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr); + } } ClusterResourceScheduler &ClusterLeaseManager::GetClusterResourceScheduler() const { @@ -482,5 +483,46 @@ void ClusterLeaseManager::FillPendingActorInfo(rpc::ResourcesData &data) const { scheduler_resource_reporter_.FillPendingActorCountByShape(data); } +bool ClusterLeaseManager::IsLeaseQueued(const SchedulingClass &scheduling_class, + const LeaseID &lease_id) const { + auto it = leases_to_schedule_.find(scheduling_class); + if (it != leases_to_schedule_.end()) { + for (const auto &work : it->second) { + if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { + return true; + } + } + } + + auto infeasible_it = infeasible_leases_.find(scheduling_class); + if (infeasible_it != infeasible_leases_.end()) { + for (const auto &work : infeasible_it->second) { + if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { + return true; + } + } + } + + return false; +} + +void ClusterLeaseManager::StoreReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) { + for (const auto &work : leases_to_schedule_[scheduling_class]) { + if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { + work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply); + return; + } + } + for (const auto &work : infeasible_leases_[scheduling_class]) { + if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { + work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply); + return; + } + } +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_lease_manager.h b/src/ray/raylet/scheduling/cluster_lease_manager.h index 2e0efaa58a0e..8ede87931de2 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager.h +++ b/src/ray/raylet/scheduling/cluster_lease_manager.h @@ -64,13 +64,12 @@ class ClusterLeaseManager : public ClusterLeaseManagerInterface { /// \param grant_or_reject: True if we we should either grant or reject the request /// but no spillback. /// \param is_selected_based_on_locality : should schedule on local node if possible. - /// \param reply: The reply of the lease request. - /// \param send_reply_callback: The function used during dispatching. - void QueueAndScheduleLease(RayLease lease, - bool grant_or_reject, - bool is_selected_based_on_locality, - rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback send_reply_callback) override; + /// \param reply_callbacks: The reply callbacks of the lease request. + void QueueAndScheduleLease( + RayLease lease, + bool grant_or_reject, + bool is_selected_based_on_locality, + std::vector reply_callbacks) override; /// Attempt to cancel an already queued lease. /// @@ -161,6 +160,20 @@ class ClusterLeaseManager : public ClusterLeaseManagerInterface { /// filled. void FillPendingActorInfo(rpc::ResourcesData &data) const; + /// Check if a lease is queued. + /// + /// \param scheduling_class: The scheduling class of the lease. + /// \param lease_id: The lease id of the lease. + /// + /// \return True if the lease is queued in leases_to_schedule_ or infeasible_leases_. + bool IsLeaseQueued(const SchedulingClass &scheduling_class, + const LeaseID &lease_id) const override; + + void StoreReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) override; + private: void TryScheduleInfeasibleLease(); diff --git a/src/ray/raylet/scheduling/cluster_lease_manager_interface.h b/src/ray/raylet/scheduling/cluster_lease_manager_interface.h index 46c57a4bdcb8..1b0b4afaeb4e 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager_interface.h +++ b/src/ray/raylet/scheduling/cluster_lease_manager_interface.h @@ -97,11 +97,11 @@ class ClusterLeaseManagerInterface { /// but no spillback. /// \param reply: The reply of the lease request. /// \param send_reply_callback: The function used during dispatching. - virtual void QueueAndScheduleLease(RayLease lease, - bool grant_or_reject, - bool is_selected_based_on_locality, - rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback send_reply_callback) = 0; + virtual void QueueAndScheduleLease( + RayLease lease, + bool grant_or_reject, + bool is_selected_based_on_locality, + std::vector reply_callbacks) = 0; /// Return with an exemplar if any leases are pending resource acquisition. /// @@ -117,6 +117,26 @@ class ClusterLeaseManagerInterface { /// Record the internal metrics. virtual void RecordMetrics() const = 0; + + /// Check if a lease is queued. + /// + /// \param scheduling_class: The scheduling class of the lease. + /// \param lease_id: The lease id of the lease. + /// + /// \return True if the lease is queued in leases_to_schedule_ or infeasible_leases_. + virtual bool IsLeaseQueued(const SchedulingClass &scheduling_class, + const LeaseID &lease_id) const = 0; + + /// Store the reply callback for a lease. + /// + /// \param scheduling_class: The scheduling class of the lease. + /// \param lease_id: The lease id of the lease. + /// \param send_reply_callback: The callback used for the reply. + /// \param reply: The reply of the lease request. + virtual void StoreReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) = 0; }; } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/scheduling/internal.h b/src/ray/raylet/scheduling/internal.h index 99103a34a729..9e01884daf36 100644 --- a/src/ray/raylet/scheduling/internal.h +++ b/src/ray/raylet/scheduling/internal.h @@ -52,27 +52,34 @@ enum class UnscheduledWorkCause { /// Work represents all the information needed to make a scheduling decision. /// This includes the lease, the information we need to communicate to /// dispatch/spillback and the callback to trigger it. +struct ReplyCallback { + ReplyCallback(rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) + : send_reply_callback_(std::move(send_reply_callback)), reply_(reply) {} + rpc::SendReplyCallback send_reply_callback_; + rpc::RequestWorkerLeaseReply *reply_; +}; + class Work { public: RayLease lease_; bool grant_or_reject_; bool is_selected_based_on_locality_; - rpc::RequestWorkerLeaseReply *reply_; - rpc::SendReplyCallback send_reply_callback_; + std::vector reply_callbacks_; std::shared_ptr allocated_instances_; + Work(RayLease lease, bool grant_or_reject, bool is_selected_based_on_locality, - rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback send_reply_callback, + std::vector reply_callbacks, WorkStatus status = WorkStatus::WAITING) : lease_(std::move(lease)), grant_or_reject_(grant_or_reject), is_selected_based_on_locality_(is_selected_based_on_locality), - reply_(reply), - send_reply_callback_(std::move(send_reply_callback)), + reply_callbacks_(std::move(reply_callbacks)), allocated_instances_(nullptr), status_(status){}; + Work(const Work &Work) = delete; Work &operator=(const Work &work) = delete; ~Work() = default; diff --git a/src/ray/raylet/scheduling/local_lease_manager_interface.h b/src/ray/raylet/scheduling/local_lease_manager_interface.h index 19ff311cb1cc..733eb08fbe4b 100644 --- a/src/ray/raylet/scheduling/local_lease_manager_interface.h +++ b/src/ray/raylet/scheduling/local_lease_manager_interface.h @@ -97,6 +97,12 @@ class LocalLeaseManagerInterface { virtual size_t GetNumLeaseSpilled() const = 0; virtual size_t GetNumWaitingLeaseSpilled() const = 0; virtual size_t GetNumUnschedulableLeaseSpilled() const = 0; + virtual bool IsLeaseQueued(const SchedulingClass &scheduling_class, + const LeaseID &lease_id) const = 0; + virtual void StoreReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) = 0; }; /// A noop local lease manager. It is a no-op class. We need this because there's no @@ -177,7 +183,14 @@ class NoopLocalLeaseManager : public LocalLeaseManagerInterface { size_t GetNumLeaseSpilled() const override { return 0; } size_t GetNumWaitingLeaseSpilled() const override { return 0; } size_t GetNumUnschedulableLeaseSpilled() const override { return 0; } + bool IsLeaseQueued(const SchedulingClass &scheduling_class, + const LeaseID &lease_id) const override { + return false; + } + void StoreReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) override {} }; - } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index 2e5a737ff6be..e98c5876b1f7 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -942,7 +942,7 @@ INSTANTIATE_TEST_SUITE_P(NodeManagerReturnWorkerLeaseIdempotentVariations, NodeManagerReturnWorkerLeaseIdempotentTest, testing::Combine(testing::Bool(), testing::Bool())); -TEST_F(NodeManagerTest, TestHandleRequestWorkerLeaseIdempotent) { +TEST_F(NodeManagerTest, TestHandleRequestWorkerLeaseGrantedLeaseIdempotent) { auto lease_spec = BuildLeaseSpec({}); rpc::RequestWorkerLeaseRequest request; rpc::RequestWorkerLeaseReply reply1; @@ -983,6 +983,82 @@ TEST_F(NodeManagerTest, TestHandleRequestWorkerLeaseIdempotent) { ASSERT_EQ(reply1.worker_address(), reply2.worker_address()); } +TEST_F(NodeManagerTest, TestHandleRequestWorkerLeaseScheduledLeaseIdempotent) { + auto lease_spec = BuildLeaseSpec({}); + + // Create a task dependency to test that lease dependencies are requested/pulled only + // once for a lease even if HandleRequestWorkerLease is called multiple times. + ObjectID object_dep = ObjectID::FromRandom(); + auto *object_ref_dep = lease_spec.GetMutableMessage().add_dependencies(); + object_ref_dep->set_object_id(object_dep.Binary()); + + rpc::Address owner_addr; + plasma::flatbuf::ObjectSource source = plasma::flatbuf::ObjectSource::CreatedByWorker; + RAY_UNUSED(mock_store_client_->TryCreateImmediately( + object_dep, owner_addr, 1024, nullptr, 1024, nullptr, source, 0)); + + rpc::RequestWorkerLeaseRequest request; + rpc::RequestWorkerLeaseReply reply1; + rpc::RequestWorkerLeaseReply reply2; + LeaseID lease_id = LeaseID::FromRandom(); + lease_spec.GetMutableMessage().set_lease_id(lease_id.Binary()); + request.mutable_lease_spec()->CopyFrom(lease_spec.GetMessage()); + request.set_backlog_size(1); + request.set_grant_or_reject(true); + request.set_is_selected_based_on_locality(true); + + EXPECT_CALL(*mock_object_manager_, Pull(_, _, _)).Times(1).WillOnce(Return(1)); + + auto worker = std::make_shared(WorkerID::FromRandom(), 10); + PopWorkerCallback pop_worker_callback; + EXPECT_CALL(mock_worker_pool_, PopWorker(_, _)) + .Times(1) + .WillOnce([&](const LeaseSpecification &ls, const PopWorkerCallback &callback) { + pop_worker_callback = callback; + }); + uint32_t callback_count = 0; + node_manager_->HandleRequestWorkerLease( + request, + &reply1, + [&callback_count]( + Status s, std::function success, std::function failure) { + callback_count++; + ASSERT_TRUE(s.ok()); + }); + ASSERT_EQ(leased_workers_.size(), 0); + auto scheduling_class = lease_spec.GetSchedulingClass(); + ASSERT_TRUE(local_lease_manager_->IsLeaseQueued(scheduling_class, lease_id)); + + // Test HandleRequestWorkerLease idempotency for leases that aren't yet granted + node_manager_->HandleRequestWorkerLease( + request, + &reply2, + [&callback_count]( + Status s, std::function success, std::function failure) { + callback_count++; + ASSERT_TRUE(s.ok()); + }); + ASSERT_EQ(leased_workers_.size(), 0); + ASSERT_TRUE(local_lease_manager_->IsLeaseQueued(scheduling_class, lease_id)); + + // Make the dependency available and notify the local lease manager that leases are + // unblocked so the lease can be granted + auto ready_lease_ids = lease_dependency_manager_->HandleObjectLocal(object_dep); + ASSERT_EQ(ready_lease_ids.size(), 1); + ASSERT_EQ(ready_lease_ids[0], lease_id); + local_lease_manager_->LeasesUnblocked(ready_lease_ids); + + // Grant the lease, both callbacks should be triggered + ASSERT_TRUE(pop_worker_callback); + pop_worker_callback(worker, PopWorkerStatus::OK, ""); + ASSERT_EQ(leased_workers_.size(), 1); + ASSERT_EQ(leased_workers_[lease_id]->GetGrantedLeaseId(), lease_id); + ASSERT_EQ(leased_workers_[lease_id]->WorkerId(), + WorkerID::FromBinary(reply1.worker_address().worker_id())); + ASSERT_EQ(reply1.worker_address(), reply2.worker_address()); + ASSERT_EQ(callback_count, 2); +} + TEST_F(NodeManagerTest, TestHandleRequestWorkerLeaseInfeasibleIdempotent) { auto lease_spec = BuildLeaseSpec({{"CPU", kTestTotalCpuResource + 1}}); lease_spec.GetMutableMessage() From 106cd11fb233ac0f9f6f357777a71b747a1ed7fe Mon Sep 17 00:00:00 2001 From: joshlee Date: Wed, 29 Oct 2025 20:04:34 +0000 Subject: [PATCH 2/7] Fix failing cpp tests Signed-off-by: joshlee --- .../tests/cluster_lease_manager_test.cc | 513 +++++++++++++++--- .../raylet/tests/local_lease_manager_test.cc | 50 +- 2 files changed, 457 insertions(+), 106 deletions(-) diff --git a/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc b/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc index 50a0bd23e649..f61d78d43a88 100644 --- a/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc +++ b/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc @@ -553,7 +553,11 @@ TEST_F(ClusterLeaseManagerTest, BasicTest) { *callback_occurred_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); @@ -592,7 +596,11 @@ TEST_F(ClusterLeaseManagerTest, IdempotencyTest) { *callback_occurred_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); @@ -671,10 +679,22 @@ TEST_F(ClusterLeaseManagerTest, GrantQueueNonBlockingTest) { // Ensure task_A is not at the front of the queue. lease_manager_.QueueAndScheduleLease( - lease_B_1, false, false, &reply_B_1, empty_callback); - lease_manager_.QueueAndScheduleLease(lease_A, false, false, &reply_A, callback); + lease_B_1, + false, + false, + std::vector{ + internal::ReplyCallback(empty_callback, &reply_B_1)}); + lease_manager_.QueueAndScheduleLease( + lease_A, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply_A)}); lease_manager_.QueueAndScheduleLease( - lease_B_2, false, false, &reply_B_2, empty_callback); + lease_B_2, + false, + false, + std::vector{ + internal::ReplyCallback(empty_callback, &reply_B_2)}); pool_.TriggerCallbacks(); // Push a worker that can only run task A. @@ -735,7 +755,11 @@ TEST_F(ClusterLeaseManagerTest, BlockedWorkerDiesTest) { *callback_occurred_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease1, false, false, &reply1, callback); + lease_manager_.QueueAndScheduleLease( + lease1, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply1)}); pool_.TriggerCallbacks(); ASSERT_FALSE(callback_occurred); @@ -749,7 +773,11 @@ TEST_F(ClusterLeaseManagerTest, BlockedWorkerDiesTest) { lease_manager_.ScheduleAndGrantLeases(); pool_.TriggerCallbacks(); - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply2, callback); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply2)}); pool_.PushWorker(std::static_pointer_cast(worker2)); lease_manager_.ScheduleAndGrantLeases(); pool_.TriggerCallbacks(); @@ -790,7 +818,11 @@ TEST_F(ClusterLeaseManagerTest, BlockedWorkerDies2Test) { *callback_occurred_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_FALSE(callback_occurred); @@ -835,7 +867,11 @@ TEST_F(ClusterLeaseManagerTest, NoFeasibleNodeTest) { *callback_called_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_FALSE(callback_called); @@ -859,7 +895,11 @@ TEST_F(ClusterLeaseManagerTest, DrainingWhileResolving) { Status, std::function, std::function) { *callback_occurred_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); std::shared_ptr worker = std::make_shared(WorkerID::FromRandom(), 1234); std::shared_ptr worker2 = @@ -879,7 +919,11 @@ TEST_F(ClusterLeaseManagerTest, DrainingWhileResolving) { missing_objects_.insert(missing_arg); rpc::RequestWorkerLeaseReply spillback_reply; lease_manager_.QueueAndScheduleLease( - resolving_args_lease, false, false, &spillback_reply, callback); + resolving_args_lease, + false, + false, + std::vector{ + internal::ReplyCallback(callback, &spillback_reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(pool_.workers.size(), 1); @@ -924,7 +968,11 @@ TEST_F(ClusterLeaseManagerTest, ResourceTakenWhileResolving) { missing_objects_.insert(missing_arg); std::unordered_set expected_subscribed_leases = { lease.GetLeaseSpecification().LeaseId()}; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(lease_dependency_manager_.subscribed_leases, expected_subscribed_leases); @@ -937,7 +985,11 @@ TEST_F(ClusterLeaseManagerTest, ResourceTakenWhileResolving) { /* This lease can run */ auto lease2 = CreateLease({{ray::kCPU_ResourceLabel, 5}}, 1); - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(lease_dependency_manager_.subscribed_leases, expected_subscribed_leases); @@ -999,7 +1051,11 @@ TEST_F(ClusterLeaseManagerTest, TestIsSelectedBasedOnLocality) { auto lease1 = CreateLease({{ray::kCPU_ResourceLabel, 5}}); rpc::RequestWorkerLeaseReply local_reply; lease_manager_.QueueAndScheduleLease( - lease1, false, /*is_selected_based_on_locality=*/false, &local_reply, callback); + lease1, + false, + /*is_selected_based_on_locality=*/false, + std::vector{ + internal::ReplyCallback(callback, &local_reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 1); // The first lease was dispatched. @@ -1009,7 +1065,11 @@ TEST_F(ClusterLeaseManagerTest, TestIsSelectedBasedOnLocality) { auto lease2 = CreateLease({{ray::kCPU_ResourceLabel, 1}}); rpc::RequestWorkerLeaseReply spillback_reply; lease_manager_.QueueAndScheduleLease( - lease2, false, /*is_selected_based_on_locality=*/false, &spillback_reply, callback); + lease2, + false, + /*is_selected_based_on_locality=*/false, + std::vector{ + internal::ReplyCallback(callback, &spillback_reply)}); pool_.TriggerCallbacks(); // The second lease was spilled. ASSERT_EQ(num_callbacks, 2); @@ -1019,7 +1079,11 @@ TEST_F(ClusterLeaseManagerTest, TestIsSelectedBasedOnLocality) { auto lease3 = CreateLease({{ray::kCPU_ResourceLabel, 1}}); lease_manager_.QueueAndScheduleLease( - lease3, false, /*is_selected_based_on_locality=*/true, &local_reply, callback); + lease3, + false, + /*is_selected_based_on_locality=*/true, + std::vector{ + internal::ReplyCallback(callback, &local_reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 3); // The third lease was dispatched. @@ -1053,7 +1117,11 @@ TEST_F(ClusterLeaseManagerTest, TestGrantOrReject) { auto lease1 = CreateLease({{ray::kCPU_ResourceLabel, 5}}); rpc::RequestWorkerLeaseReply local_reply; lease_manager_.QueueAndScheduleLease( - lease1, /*grant_or_reject=*/false, false, &local_reply, callback); + lease1, + /*grant_or_reject=*/false, + false, + std::vector{ + internal::ReplyCallback(callback, &local_reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 1); // The first lease was dispatched. @@ -1063,7 +1131,11 @@ TEST_F(ClusterLeaseManagerTest, TestGrantOrReject) { auto lease2 = CreateLease({{ray::kCPU_ResourceLabel, 1}}); rpc::RequestWorkerLeaseReply spillback_reply; lease_manager_.QueueAndScheduleLease( - lease2, /*grant_or_reject=*/false, false, &spillback_reply, callback); + lease2, + /*grant_or_reject=*/false, + false, + std::vector{ + internal::ReplyCallback(callback, &spillback_reply)}); pool_.TriggerCallbacks(); // The second lease was spilled. ASSERT_EQ(num_callbacks, 2); @@ -1073,7 +1145,11 @@ TEST_F(ClusterLeaseManagerTest, TestGrantOrReject) { auto lease3 = CreateLease({{ray::kCPU_ResourceLabel, 1}}); lease_manager_.QueueAndScheduleLease( - lease3, /*grant_or_reject=*/true, false, &local_reply, callback); + lease3, + /*grant_or_reject=*/true, + false, + std::vector{ + internal::ReplyCallback(callback, &local_reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 3); // The third lease was dispatched. @@ -1108,7 +1184,12 @@ TEST_F(ClusterLeaseManagerTest, TestSpillAfterAssigned) { /* Blocked on starting a worker. */ auto lease = CreateLease({{ray::kCPU_ResourceLabel, 5}}); rpc::RequestWorkerLeaseReply local_reply; - lease_manager_.QueueAndScheduleLease(lease, false, false, &local_reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{ + internal::ReplyCallback(callback, &local_reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 0); @@ -1118,7 +1199,11 @@ TEST_F(ClusterLeaseManagerTest, TestSpillAfterAssigned) { auto lease2 = CreateLease({{ray::kCPU_ResourceLabel, 5}}); rpc::RequestWorkerLeaseReply reject_reply; lease_manager_.QueueAndScheduleLease( - lease2, /*grant_or_reject=*/true, false, &reject_reply, callback); + lease2, + /*grant_or_reject=*/true, + false, + std::vector{ + internal::ReplyCallback(callback, &reject_reply)}); pool_.TriggerCallbacks(); // The second lease was rejected. @@ -1129,7 +1214,12 @@ TEST_F(ClusterLeaseManagerTest, TestSpillAfterAssigned) { // Resources are no longer available for the third. auto lease3 = CreateLease({{ray::kCPU_ResourceLabel, 5}}); rpc::RequestWorkerLeaseReply spillback_reply; - lease_manager_.QueueAndScheduleLease(lease3, false, false, &spillback_reply, callback); + lease_manager_.QueueAndScheduleLease( + lease3, + false, + false, + std::vector{ + internal::ReplyCallback(callback, &spillback_reply)}); pool_.TriggerCallbacks(); // The third lease was spilled. @@ -1167,7 +1257,11 @@ TEST_F(ClusterLeaseManagerTest, TestIdleNode) { *callback_occurred_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_TRUE(scheduler_->GetLocalResourceManager().IsLocalNodeIdle()); ASSERT_FALSE(callback_occurred); @@ -1210,8 +1304,16 @@ TEST_F(ClusterLeaseManagerTest, NotOKPopWorkerAfterDrainingTest) { Status, std::function, std::function) { *callback_called_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease1, false, false, &reply1, callback); - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply2, callback); + lease_manager_.QueueAndScheduleLease( + lease1, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply1)}); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply2)}); auto remote_node_id = NodeID::FromRandom(); AddNode(remote_node_id, 5); @@ -1242,7 +1344,11 @@ TEST_F(ClusterLeaseManagerTest, NotOKPopWorkerTest) { Status, std::function, std::function) { *callback_called_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease1, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease1, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); ASSERT_EQ(NumLeasesToDispatchWithStatus(internal::WorkStatus::WAITING_FOR_WORKER), 1); ASSERT_EQ(NumLeasesToDispatchWithStatus(internal::WorkStatus::WAITING), 0); ASSERT_EQ(NumRunningLeases(), 1); @@ -1256,7 +1362,11 @@ TEST_F(ClusterLeaseManagerTest, NotOKPopWorkerTest) { callback_called = false; reply.Clear(); RayLease lease2 = CreateLease({{ray::kCPU_ResourceLabel, 1}}); - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); ASSERT_EQ(NumLeasesToDispatchWithStatus(internal::WorkStatus::WAITING_FOR_WORKER), 1); ASSERT_EQ(NumLeasesToDispatchWithStatus(internal::WorkStatus::WAITING), 0); ASSERT_EQ(NumRunningLeases(), 1); @@ -1275,7 +1385,11 @@ TEST_F(ClusterLeaseManagerTest, NotOKPopWorkerTest) { callback_called = false; reply.Clear(); RayLease lease3 = CreateLease({{ray::kCPU_ResourceLabel, 1}}); - lease_manager_.QueueAndScheduleLease(lease3, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease3, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); ASSERT_EQ(NumLeasesToDispatchWithStatus(internal::WorkStatus::WAITING_FOR_WORKER), 1); ASSERT_EQ(NumLeasesToDispatchWithStatus(internal::WorkStatus::WAITING), 0); ASSERT_EQ(NumRunningLeases(), 1); @@ -1310,7 +1424,10 @@ TEST_F(ClusterLeaseManagerTest, TaskUnschedulableTest) { }; lease_manager_.QueueAndScheduleLease( - RayLease(lease_spec), false, false, &reply, callback); + RayLease(lease_spec), + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); ASSERT_TRUE(callback_called); ASSERT_TRUE(reply.canceled()); ASSERT_EQ(reply.failure_type(), @@ -1335,7 +1452,11 @@ TEST_F(ClusterLeaseManagerTest, TaskCancellationTest) { // Lease1 not queued so we can't cancel it. ASSERT_FALSE(lease_manager_.CancelLease(lease1.GetLeaseSpecification().LeaseId())); - lease_manager_.QueueAndScheduleLease(lease1, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease1, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); // Lease1 is now in dispatch queue. @@ -1351,7 +1472,11 @@ TEST_F(ClusterLeaseManagerTest, TaskCancellationTest) { ASSERT_EQ(leased_workers_.size(), 0); RayLease lease2 = CreateLease({{ray::kCPU_ResourceLabel, 1}}); - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); // Lease2 is now granted so we can't cancel it. @@ -1373,9 +1498,17 @@ TEST_F(ClusterLeaseManagerTest, TaskCancellationTest) { RayLease lease4 = CreateLease({{ray::kCPU_ResourceLabel, 200}}); rpc::RequestWorkerLeaseReply reply4; // Lease 3 should be popping worker - lease_manager_.QueueAndScheduleLease(lease3, false, false, &reply3, callback); + lease_manager_.QueueAndScheduleLease( + lease3, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply3)}); // Lease 4 is infeasible - lease_manager_.QueueAndScheduleLease(lease4, false, false, &reply4, callback); + lease_manager_.QueueAndScheduleLease( + lease4, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply4)}); pool_.TriggerCallbacks(); ASSERT_TRUE(lease_manager_.CancelLeases( [](const std::shared_ptr &work) { return true; }, @@ -1403,7 +1536,11 @@ TEST_F(ClusterLeaseManagerTest, TaskCancelInfeasibleTask) { *callback_called_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); // RayLease is now queued so cancellation works. @@ -1458,9 +1595,17 @@ TEST_F(ClusterLeaseManagerTest, TaskCancelWithResourceShape) { *callback_called_ptr_2 = true; }; - lease_manager_.QueueAndScheduleLease(lease1, false, false, &reply1, callback1); + lease_manager_.QueueAndScheduleLease( + lease1, + false, + false, + std::vector{internal::ReplyCallback(callback1, &reply1)}); pool_.TriggerCallbacks(); - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply2, callback2); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback2, &reply2)}); pool_.TriggerCallbacks(); callback_called_1 = false; @@ -1503,7 +1648,11 @@ TEST_F(ClusterLeaseManagerTest, HeartbeatTest) { *callback_called_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_TRUE(callback_called); // Now {CPU: 7, GPU: 4, MEM:128} @@ -1520,7 +1669,11 @@ TEST_F(ClusterLeaseManagerTest, HeartbeatTest) { *callback_called_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_FALSE(callback_called); // No worker available. // Now {CPU: 7, GPU: 4, MEM:128} with 1 queued lease. @@ -1538,7 +1691,11 @@ TEST_F(ClusterLeaseManagerTest, HeartbeatTest) { *callback_called_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_FALSE(callback_called); // Infeasible. // Now there is also an infeasible lease {CPU: 9}. @@ -1556,7 +1713,11 @@ TEST_F(ClusterLeaseManagerTest, HeartbeatTest) { *callback_called_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_FALSE(callback_called); // Infeasible. // Now there is also an infeasible lease {CPU: 10}. @@ -1622,7 +1783,11 @@ TEST_F(ClusterLeaseManagerTest, ResourceReportForNodeAffinitySchedulingStrategyT scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_soft(false); RayLease lease1 = CreateLease({{ray::kCPU_ResourceLabel, 1}}, 0, {}, nullptr, scheduling_strategy); - lease_manager_.QueueAndScheduleLease(lease1, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease1, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); // Feasible soft lease won't be reported. scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_node_id( @@ -1630,7 +1795,11 @@ TEST_F(ClusterLeaseManagerTest, ResourceReportForNodeAffinitySchedulingStrategyT scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_soft(true); RayLease task2 = CreateLease({{ray::kCPU_ResourceLabel, 2}}, 0, {}, nullptr, scheduling_strategy); - lease_manager_.QueueAndScheduleLease(task2, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + task2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); // Infeasible soft lease will be reported. scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_node_id( @@ -1638,7 +1807,11 @@ TEST_F(ClusterLeaseManagerTest, ResourceReportForNodeAffinitySchedulingStrategyT scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_soft(true); RayLease task3 = CreateLease({{ray::kGPU_ResourceLabel, 1}}, 0, {}, nullptr, scheduling_strategy); - lease_manager_.QueueAndScheduleLease(task3, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + task3, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); ASSERT_FALSE(callback_occurred); // Infeasible strict lease won't be reported (will fail immediately). @@ -1647,7 +1820,11 @@ TEST_F(ClusterLeaseManagerTest, ResourceReportForNodeAffinitySchedulingStrategyT scheduling_strategy.mutable_node_affinity_scheduling_strategy()->set_soft(false); RayLease task4 = CreateLease({{ray::kGPU_ResourceLabel, 2}}, 0, {}, nullptr, scheduling_strategy); - lease_manager_.QueueAndScheduleLease(task4, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + task4, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); ASSERT_TRUE(callback_occurred); ASSERT_TRUE(reply.canceled()); ASSERT_EQ(reply.failure_type(), @@ -1684,7 +1861,11 @@ TEST_F(ClusterLeaseManagerTest, BacklogReportTest) { std::vector worker_ids; for (int i = 0; i < 10; i++) { RayLease lease = CreateLease({{ray::kCPU_ResourceLabel, 8}}); - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); worker_ids.push_back(WorkerID::FromRandom()); local_lease_manager_->SetWorkerBacklog( lease.GetLeaseSpecification().GetSchedulingClass(), worker_ids.back(), 10 - i); @@ -1767,7 +1948,11 @@ TEST_F(ClusterLeaseManagerTest, OwnerDeadTest) { *callback_occurred_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_FALSE(callback_occurred); @@ -1777,7 +1962,11 @@ TEST_F(ClusterLeaseManagerTest, OwnerDeadTest) { AssertNoLeaks(); callback_occurred = false; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_FALSE(callback_occurred); @@ -1799,7 +1988,11 @@ TEST_F(ClusterLeaseManagerTest, TestInfeasibleLeaseWarning) { Status, std::function, std::function) { *callback_occurred = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(announce_infeasible_lease_calls_, 1); @@ -1846,7 +2039,11 @@ TEST_F(ClusterLeaseManagerTest, TestMultipleInfeasibleLeasesWarnOnce) { Status, std::function, std::function) { *callback_occurred = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(announce_infeasible_lease_calls_, 1); @@ -1858,7 +2055,11 @@ TEST_F(ClusterLeaseManagerTest, TestMultipleInfeasibleLeasesWarnOnce) { Status, std::function, std::function) { *callback_occurred2 = true; }; - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply2, callback2); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback2, &reply2)}); pool_.TriggerCallbacks(); ASSERT_EQ(announce_infeasible_lease_calls_, 1); } @@ -1879,7 +2080,11 @@ TEST_F(ClusterLeaseManagerTest, TestAnyPendingLeasesForResourceAcquisition) { Status, std::function, std::function) { *callback_occurred = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_TRUE(*callback_occurred); ASSERT_EQ(leased_workers_.size(), 1); @@ -1902,7 +2107,11 @@ TEST_F(ClusterLeaseManagerTest, TestAnyPendingLeasesForResourceAcquisition) { Status, std::function, std::function) { *callback_occurred2 = true; }; - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply2, callback2); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback2, &reply2)}); pool_.TriggerCallbacks(); ASSERT_FALSE(*callback_occurred2); auto pending_lease = lease_manager_.AnyPendingLeasesForResourceAcquisition( @@ -1936,7 +2145,11 @@ TEST_F(ClusterLeaseManagerTest, ArgumentEvicted) { missing_objects_.insert(missing_arg); std::unordered_set expected_subscribed_leases = { lease.GetLeaseSpecification().LeaseId()}; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(lease_dependency_manager_.subscribed_leases, expected_subscribed_leases); ASSERT_EQ(num_callbacks, 0); @@ -1975,14 +2188,15 @@ TEST_F(ClusterLeaseManagerTest, FeasibleToNonFeasible) { RayLease lease1 = CreateLease({{ray::kCPU_ResourceLabel, 4}}); rpc::RequestWorkerLeaseReply reply1; bool callback_occurred1 = false; + auto callback1 = [&callback_occurred1]( + Status, std::function, std::function) { + callback_occurred1 = true; + }; lease_manager_.QueueAndScheduleLease( lease1, false, false, - &reply1, - [&callback_occurred1](Status, std::function, std::function) { - callback_occurred1 = true; - }); + std::vector{internal::ReplyCallback(callback1, &reply1)}); pool_.TriggerCallbacks(); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_TRUE(callback_occurred1); @@ -1999,14 +2213,15 @@ TEST_F(ClusterLeaseManagerTest, FeasibleToNonFeasible) { RayLease lease2 = CreateLease({{ray::kCPU_ResourceLabel, 4}}); rpc::RequestWorkerLeaseReply reply2; bool callback_occurred2 = false; + auto callback2 = [&callback_occurred2]( + Status, std::function, std::function) { + callback_occurred2 = true; + }; lease_manager_.QueueAndScheduleLease( lease2, false, false, - &reply2, - [&callback_occurred2](Status, std::function, std::function) { - callback_occurred2 = true; - }); + std::vector{internal::ReplyCallback(callback2, &reply2)}); pool_.TriggerCallbacks(); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_FALSE(callback_occurred2); @@ -2196,7 +2411,12 @@ TEST_F(ClusterLeaseManagerTest, TestSpillWaitingLeases) { .mutable_scheduling_strategy() ->mutable_spread_scheduling_strategy(); } - lease_manager_.QueueAndScheduleLease(lease, false, false, replies[i].get(), callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{ + internal::ReplyCallback(callback, replies[i].get())}); pool_.TriggerCallbacks(); } ASSERT_EQ(num_callbacks, 0); @@ -2288,7 +2508,11 @@ TEST_F(ClusterLeaseManagerTest, PinnedArgsMemoryTest) { nullptr, rpc::SchedulingStrategy(), lease_id1); - lease_manager_.QueueAndScheduleLease(lease1, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease1, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); @@ -2303,7 +2527,11 @@ TEST_F(ClusterLeaseManagerTest, PinnedArgsMemoryTest) { nullptr, rpc::SchedulingStrategy(), lease_id2); - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); @@ -2348,7 +2576,11 @@ TEST_F(ClusterLeaseManagerTest, PinnedArgsSameMemoryTest) { // This lease can run. default_arg_size_ = 600; auto lease = CreateLease({{ray::kCPU_ResourceLabel, 1}}, 1); - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); @@ -2359,7 +2591,11 @@ TEST_F(ClusterLeaseManagerTest, PinnedArgsSameMemoryTest) { auto lease2 = CreateLease({{ray::kCPU_ResourceLabel, 1}}, 1, lease.GetLeaseSpecification().GetDependencyIds()); - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 2); ASSERT_EQ(leased_workers_.size(), 2); @@ -2388,7 +2624,11 @@ TEST_F(ClusterLeaseManagerTest, LargeArgsNoStarvationTest) { default_arg_size_ = 2000; auto lease = CreateLease({{ray::kCPU_ResourceLabel, 1}}, 1); pool_.PushWorker(std::static_pointer_cast(worker)); - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); @@ -2417,7 +2657,11 @@ TEST_F(ClusterLeaseManagerTest, PopWorkerExactlyOnce) { *callback_occurred_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); // Make sure callback doesn't occurred. ASSERT_FALSE(callback_occurred); @@ -2479,9 +2723,21 @@ TEST_F(ClusterLeaseManagerTest, CapRunningOnDispatchQueue) { auto callback = [&num_callbacks](Status, std::function, std::function) { num_callbacks++; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply, callback); - lease_manager_.QueueAndScheduleLease(lease3, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); + lease_manager_.QueueAndScheduleLease( + lease3, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 2); @@ -2528,9 +2784,21 @@ TEST_F(ClusterLeaseManagerTest, ZeroCPULeases) { auto callback = [&num_callbacks](Status, std::function, std::function) { num_callbacks++; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply, callback); - lease_manager_.QueueAndScheduleLease(lease3, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); + lease_manager_.QueueAndScheduleLease( + lease3, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); // We shouldn't cap anything for zero cpu leases (and shouldn't crash before @@ -2563,9 +2831,21 @@ TEST_F(ClusterLeaseManagerTestWithoutCPUsAtHead, ZeroCPUNode) { auto callback = [&num_callbacks](Status, std::function, std::function) { num_callbacks++; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); - lease_manager_.QueueAndScheduleLease(lease2, false, false, &reply, callback); - lease_manager_.QueueAndScheduleLease(lease3, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); + lease_manager_.QueueAndScheduleLease( + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); + lease_manager_.QueueAndScheduleLease( + lease3, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); // We shouldn't cap anything for zero cpu leases (and shouldn't crash before @@ -2598,7 +2878,12 @@ TEST_F(ClusterLeaseManagerTest, SchedulingClassCapSpillback) { RayLease lease = CreateLease({{ray::kCPU_ResourceLabel, 8}}); leases.push_back(lease); replies.push_back(std::make_unique()); - lease_manager_.QueueAndScheduleLease(lease, false, false, replies[i].get(), callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{ + internal::ReplyCallback(callback, replies[i].get())}); pool_.TriggerCallbacks(); } @@ -2642,7 +2927,11 @@ TEST_F(ClusterLeaseManagerTest, SchedulingClassCapIncrease) { num_callbacks++; }; for (const auto &lease : leases) { - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); } auto runtime_env_hash = leases[0].GetLeaseSpecification().GetRuntimeEnvHash(); @@ -2699,7 +2988,11 @@ TEST_F(ClusterLeaseManagerTest, SchedulingClassCapIncrease) { RayLease lease = CreateLease({{ray::kCPU_ResourceLabel, 8}}, /*num_args=*/0, /*args=*/{}); - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); std::shared_ptr new_worker = std::make_shared(WorkerID::FromRandom(), 1234, runtime_env_hash); @@ -2741,7 +3034,11 @@ TEST_F(ClusterLeaseManagerTest, SchedulingClassCapResetTest) { num_callbacks++; }; for (const auto &lease : leases) { - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); } auto runtime_env_hash = leases[0].GetLeaseSpecification().GetRuntimeEnvHash(); @@ -2773,7 +3070,11 @@ TEST_F(ClusterLeaseManagerTest, SchedulingClassCapResetTest) { RayLease lease = CreateLease({{ray::kCPU_ResourceLabel, 8}}, /*num_args=*/0, /*args=*/{}); - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); } std::shared_ptr worker3 = @@ -2799,7 +3100,11 @@ TEST_F(ClusterLeaseManagerTest, SchedulingClassCapResetTest) { RayLease lease5 = CreateLease({}, /*num_args=*/0, /*args=*/{}); - lease_manager_.QueueAndScheduleLease(lease5, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease5, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); std::shared_ptr worker5 = std::make_shared(WorkerID::FromRandom(), 1234, runtime_env_hash); pool_.PushWorker(std::static_pointer_cast(worker5)); @@ -2828,7 +3133,11 @@ TEST_F(ClusterLeaseManagerTest, DispatchTimerAfterRequestTest) { auto callback = [&num_callbacks](Status, std::function, std::function) { num_callbacks++; }; - lease_manager_.QueueAndScheduleLease(first_lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + first_lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); auto runtime_env_hash = first_lease.GetLeaseSpecification().GetRuntimeEnvHash(); std::vector> workers; @@ -2846,7 +3155,11 @@ TEST_F(ClusterLeaseManagerTest, DispatchTimerAfterRequestTest) { RayLease second_lease = CreateLease({{ray::kCPU_ResourceLabel, 8}}, /*num_args=*/0, /*args=*/{}); - lease_manager_.QueueAndScheduleLease(second_lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + second_lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); /// Can't schedule yet due to the cap. @@ -2874,7 +3187,11 @@ TEST_F(ClusterLeaseManagerTest, DispatchTimerAfterRequestTest) { RayLease third_lease = CreateLease({{ray::kCPU_ResourceLabel, 8}}, /*num_args=*/0, /*args=*/{}); - lease_manager_.QueueAndScheduleLease(third_lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + third_lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); /// We still can't schedule the third lease since the timer doesn't start @@ -2908,7 +3225,11 @@ TEST_F(ClusterLeaseManagerTest, PopWorkerBeforeDraining) { Status, std::function, std::function) { *callback_occurred_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); // Drain the local node. rpc::DrainRayletRequest drain_request; @@ -2935,7 +3256,11 @@ TEST_F(ClusterLeaseManagerTest, UnscheduleableWhileDraining) { Status, std::function, std::function) { *callback_occurred_ptr = true; }; - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); std::shared_ptr worker = std::make_shared(WorkerID::FromRandom(), 1234); std::shared_ptr worker2 = @@ -2958,7 +3283,11 @@ TEST_F(ClusterLeaseManagerTest, UnscheduleableWhileDraining) { RayLease spillback_lease = CreateLease({{ray::kCPU_ResourceLabel, 1}}); rpc::RequestWorkerLeaseReply spillback_reply; lease_manager_.QueueAndScheduleLease( - spillback_lease, false, false, &spillback_reply, callback); + spillback_lease, + false, + false, + std::vector{ + internal::ReplyCallback(callback, &spillback_reply)}); pool_.TriggerCallbacks(); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(pool_.workers.size(), 1); @@ -2988,7 +3317,11 @@ TEST_F(ClusterLeaseManagerTestWithoutCPUsAtHead, OneCpuInfeasibleLease) { for (int i = 0; i < num_cases; ++i) { RayLease lease = CreateLease({{ray::kCPU_ResourceLabel, cpu_request[i]}}); - lease_manager_.QueueAndScheduleLease(lease, false, false, &reply, callback); + lease_manager_.QueueAndScheduleLease( + lease, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply)}); pool_.TriggerCallbacks(); // The lease cannot run because there is only 1 node (head) with 0 CPU. diff --git a/src/ray/raylet/tests/local_lease_manager_test.cc b/src/ray/raylet/tests/local_lease_manager_test.cc index 94047e18ff6c..2b8593e5b6ee 100644 --- a/src/ray/raylet/tests/local_lease_manager_test.cc +++ b/src/ray/raylet/tests/local_lease_manager_test.cc @@ -393,7 +393,11 @@ TEST_F(LocalLeaseManagerTest, TestCancelLeasesWithoutReply) { rpc::RequestWorkerLeaseReply reply1; // lease1 is waiting for a worker local_lease_manager_->QueueAndScheduleLease(std::make_shared( - lease1, false, false, &reply1, callback, internal::WorkStatus::WAITING)); + lease1, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply1)}, + internal::WorkStatus::WAITING)); auto arg_id = ObjectID::FromRandom(); std::vector> args; @@ -404,7 +408,11 @@ TEST_F(LocalLeaseManagerTest, TestCancelLeasesWithoutReply) { rpc::RequestWorkerLeaseReply reply2; // lease2 is waiting for args local_lease_manager_->QueueAndScheduleLease(std::make_shared( - lease2, false, false, &reply2, callback, internal::WorkStatus::WAITING)); + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply2)}, + internal::WorkStatus::WAITING)); auto cancelled_works = local_lease_manager_->CancelLeasesWithoutReply( [](const std::shared_ptr &work) { return true; }); @@ -429,12 +437,14 @@ TEST_F(LocalLeaseManagerTest, TestLeaseGrantingOrder) { auto lease_f1 = CreateLease({{ray::kCPU_ResourceLabel, 1}}, "f"); auto lease_f2 = CreateLease({{ray::kCPU_ResourceLabel, 1}}, "f"); rpc::RequestWorkerLeaseReply reply; + auto empty_callback = + [](Status status, std::function success, std::function failure) {}; local_lease_manager_->WaitForLeaseArgsRequests(std::make_shared( lease_f1, false, false, - &reply, - [](Status status, std::function success, std::function failure) {}, + std::vector{ + internal::ReplyCallback(empty_callback, &reply)}, internal::WorkStatus::WAITING)); local_lease_manager_->ScheduleAndGrantLeases(); pool_.TriggerCallbacks(); @@ -442,8 +452,8 @@ TEST_F(LocalLeaseManagerTest, TestLeaseGrantingOrder) { lease_f2, false, false, - &reply, - [](Status status, std::function success, std::function failure) {}, + std::vector{ + internal::ReplyCallback(empty_callback, &reply)}, internal::WorkStatus::WAITING)); local_lease_manager_->ScheduleAndGrantLeases(); pool_.TriggerCallbacks(); @@ -457,29 +467,29 @@ TEST_F(LocalLeaseManagerTest, TestLeaseGrantingOrder) { lease_f3, false, false, - &reply, - [](Status status, std::function success, std::function failure) {}, + std::vector{ + internal::ReplyCallback(empty_callback, &reply)}, internal::WorkStatus::WAITING)); local_lease_manager_->WaitForLeaseArgsRequests(std::make_shared( lease_f4, false, false, - &reply, - [](Status status, std::function success, std::function failure) {}, + std::vector{ + internal::ReplyCallback(empty_callback, &reply)}, internal::WorkStatus::WAITING)); local_lease_manager_->WaitForLeaseArgsRequests(std::make_shared( lease_f5, false, false, - &reply, - [](Status status, std::function success, std::function failure) {}, + std::vector{ + internal::ReplyCallback(empty_callback, &reply)}, internal::WorkStatus::WAITING)); local_lease_manager_->WaitForLeaseArgsRequests(std::make_shared( lease_g1, false, false, - &reply, - [](Status status, std::function success, std::function failure) {}, + std::vector{ + internal::ReplyCallback(empty_callback, &reply)}, internal::WorkStatus::WAITING)); local_lease_manager_->ScheduleAndGrantLeases(); pool_.TriggerCallbacks(); @@ -520,10 +530,18 @@ TEST_F(LocalLeaseManagerTest, TestNoLeakOnImpossibleInfeasibleLease) { }; rpc::RequestWorkerLeaseReply reply1; local_lease_manager_->QueueAndScheduleLease(std::make_shared( - lease1, false, false, &reply1, callback, internal::WorkStatus::WAITING)); + lease1, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply1)}, + internal::WorkStatus::WAITING)); rpc::RequestWorkerLeaseReply reply2; local_lease_manager_->QueueAndScheduleLease(std::make_shared( - lease2, false, false, &reply2, callback, internal::WorkStatus::WAITING)); + lease2, + false, + false, + std::vector{internal::ReplyCallback(callback, &reply2)}, + internal::WorkStatus::WAITING)); // Node no longer has cpu. scheduler_->GetLocalResourceManager().DeleteLocalResource( From 8ee05d48dd61fddb527364d70147586069032781 Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 30 Oct 2025 17:51:13 +0000 Subject: [PATCH 3/7] Addressing comments Signed-off-by: joshlee --- src/ray/raylet/local_lease_manager.cc | 34 +++++++++++-------- src/ray/raylet/local_lease_manager.h | 2 +- src/ray/raylet/node_manager.cc | 8 ++--- .../scheduling/cluster_lease_manager.cc | 23 ++++++++----- .../raylet/scheduling/cluster_lease_manager.h | 2 +- .../cluster_lease_manager_interface.h | 4 ++- .../local_lease_manager_interface.h | 8 +++-- 7 files changed, 47 insertions(+), 34 deletions(-) diff --git a/src/ray/raylet/local_lease_manager.cc b/src/ray/raylet/local_lease_manager.cc index 9dacc910f5fc..209bb8b70462 100644 --- a/src/ray/raylet/local_lease_manager.cc +++ b/src/ray/raylet/local_lease_manager.cc @@ -1000,18 +1000,17 @@ void LocalLeaseManager::Grant( } else { allocated_resources = worker->GetAllocatedInstances(); } - for (const auto &reply_callback : reply_callbacks) { - ::ray::rpc::ResourceMapEntry *resource; - for (auto &resource_id : allocated_resources->ResourceIds()) { - bool first = true; // Set resource name only if at least one of its - // instances has available capacity. - auto instances = allocated_resources->Get(resource_id); + for (auto &resource_id : allocated_resources->ResourceIds()) { + auto instances = allocated_resources->Get(resource_id); + for (const auto &reply_callback : reply_callbacks) { + ::ray::rpc::ResourceMapEntry *resource = nullptr; for (size_t inst_idx = 0; inst_idx < instances.size(); inst_idx++) { if (instances[inst_idx] > 0.) { - if (first) { + // Set resource name only if at least one of its instances has available + // capacity. + if (resource == nullptr) { resource = reply_callback.reply_->add_resource_mapping(); resource->set_name(resource_id.Binary()); - first = false; } auto rid = resource->add_resource_ids(); rid->set_index(inst_idx); @@ -1019,7 +1018,9 @@ void LocalLeaseManager::Grant( } } } - // Send the result back to the client. + } + // Send the result back to the clients. + for (const auto &reply_callback : reply_callbacks) { reply_callback.send_reply_callback_(Status::OK(), nullptr, nullptr); } } @@ -1276,21 +1277,24 @@ bool LocalLeaseManager::IsLeaseQueued(const SchedulingClass &scheduling_class, return false; } -void LocalLeaseManager::StoreReplyCallback(const SchedulingClass &scheduling_class, +bool LocalLeaseManager::StoreReplyCallback(const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback, rpc::RequestWorkerLeaseReply *reply) { - for (const auto &work : leases_to_grant_[scheduling_class]) { - if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { - work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply); - return; + if (leases_to_grant_.contains(scheduling_class)) { + for (const auto &work : leases_to_grant_[scheduling_class]) { + if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { + work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply); + return true; + } } } auto it = waiting_leases_index_.find(lease_id); if (it != waiting_leases_index_.end()) { (*it->second)->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply); - return; + return true; } + return false; } } // namespace raylet diff --git a/src/ray/raylet/local_lease_manager.h b/src/ray/raylet/local_lease_manager.h index c71446d6ed01..39da48a00fd3 100644 --- a/src/ray/raylet/local_lease_manager.h +++ b/src/ray/raylet/local_lease_manager.h @@ -192,7 +192,7 @@ class LocalLeaseManager : public LocalLeaseManagerInterface { bool IsLeaseQueued(const SchedulingClass &scheduling_class, const LeaseID &lease_id) const override; - void StoreReplyCallback(const SchedulingClass &scheduling_class, + bool StoreReplyCallback(const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback, rpc::RequestWorkerLeaseReply *reply) override; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 1de2546a8b29..5e115c48f6f7 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1879,21 +1879,21 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques if (cluster_lease_manager_.IsLeaseQueued( lease.GetLeaseSpecification().GetSchedulingClass(), lease_id)) { - cluster_lease_manager_.StoreReplyCallback( + RAY_CHECK(cluster_lease_manager_.StoreReplyCallback( lease.GetLeaseSpecification().GetSchedulingClass(), lease_id, std::move(send_reply_callback_wrapper), - reply); + reply)); return; } if (local_lease_manager_.IsLeaseQueued( lease.GetLeaseSpecification().GetSchedulingClass(), lease_id)) { - local_lease_manager_.StoreReplyCallback( + RAY_CHECK(local_lease_manager_.StoreReplyCallback( lease.GetLeaseSpecification().GetSchedulingClass(), lease_id, std::move(send_reply_callback_wrapper), - reply); + reply)); return; } diff --git a/src/ray/raylet/scheduling/cluster_lease_manager.cc b/src/ray/raylet/scheduling/cluster_lease_manager.cc index 0987a8618751..0f5ee6144683 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager.cc +++ b/src/ray/raylet/scheduling/cluster_lease_manager.cc @@ -506,22 +506,27 @@ bool ClusterLeaseManager::IsLeaseQueued(const SchedulingClass &scheduling_class, return false; } -void ClusterLeaseManager::StoreReplyCallback(const SchedulingClass &scheduling_class, +bool ClusterLeaseManager::StoreReplyCallback(const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback, rpc::RequestWorkerLeaseReply *reply) { - for (const auto &work : leases_to_schedule_[scheduling_class]) { - if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { - work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply); - return; + if (leases_to_schedule_.contains(scheduling_class)) { + for (const auto &work : leases_to_schedule_[scheduling_class]) { + if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { + work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply); + return true; + } } } - for (const auto &work : infeasible_leases_[scheduling_class]) { - if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { - work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply); - return; + if (infeasible_leases_.contains(scheduling_class)) { + for (const auto &work : infeasible_leases_[scheduling_class]) { + if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { + work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply); + return true; + } } } + return false; } } // namespace raylet diff --git a/src/ray/raylet/scheduling/cluster_lease_manager.h b/src/ray/raylet/scheduling/cluster_lease_manager.h index 8ede87931de2..f481929ca370 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager.h +++ b/src/ray/raylet/scheduling/cluster_lease_manager.h @@ -169,7 +169,7 @@ class ClusterLeaseManager : public ClusterLeaseManagerInterface { bool IsLeaseQueued(const SchedulingClass &scheduling_class, const LeaseID &lease_id) const override; - void StoreReplyCallback(const SchedulingClass &scheduling_class, + bool StoreReplyCallback(const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback, rpc::RequestWorkerLeaseReply *reply) override; diff --git a/src/ray/raylet/scheduling/cluster_lease_manager_interface.h b/src/ray/raylet/scheduling/cluster_lease_manager_interface.h index 1b0b4afaeb4e..b2d0cfba08d4 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager_interface.h +++ b/src/ray/raylet/scheduling/cluster_lease_manager_interface.h @@ -133,7 +133,9 @@ class ClusterLeaseManagerInterface { /// \param lease_id: The lease id of the lease. /// \param send_reply_callback: The callback used for the reply. /// \param reply: The reply of the lease request. - virtual void StoreReplyCallback(const SchedulingClass &scheduling_class, + /// + /// \return True if the reply callback was stored successfully. + virtual bool StoreReplyCallback(const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback, rpc::RequestWorkerLeaseReply *reply) = 0; diff --git a/src/ray/raylet/scheduling/local_lease_manager_interface.h b/src/ray/raylet/scheduling/local_lease_manager_interface.h index 733eb08fbe4b..861e6fd7c23f 100644 --- a/src/ray/raylet/scheduling/local_lease_manager_interface.h +++ b/src/ray/raylet/scheduling/local_lease_manager_interface.h @@ -99,7 +99,7 @@ class LocalLeaseManagerInterface { virtual size_t GetNumUnschedulableLeaseSpilled() const = 0; virtual bool IsLeaseQueued(const SchedulingClass &scheduling_class, const LeaseID &lease_id) const = 0; - virtual void StoreReplyCallback(const SchedulingClass &scheduling_class, + virtual bool StoreReplyCallback(const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback, rpc::RequestWorkerLeaseReply *reply) = 0; @@ -187,10 +187,12 @@ class NoopLocalLeaseManager : public LocalLeaseManagerInterface { const LeaseID &lease_id) const override { return false; } - void StoreReplyCallback(const SchedulingClass &scheduling_class, + bool StoreReplyCallback(const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback, - rpc::RequestWorkerLeaseReply *reply) override {} + rpc::RequestWorkerLeaseReply *reply) override { + return false; + } }; } // namespace raylet } // namespace ray From 0c81aed0eb5fba3117b4b32e34a52aa761f3b9fc Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 30 Oct 2025 21:13:20 +0000 Subject: [PATCH 4/7] fix cpp test failure Signed-off-by: joshlee --- src/mock/ray/raylet/local_lease_manager.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mock/ray/raylet/local_lease_manager.h b/src/mock/ray/raylet/local_lease_manager.h index 69522f14cd7e..8acda6822fc5 100644 --- a/src/mock/ray/raylet/local_lease_manager.h +++ b/src/mock/ray/raylet/local_lease_manager.h @@ -83,7 +83,7 @@ class MockLocalLeaseManager : public LocalLeaseManagerInterface { IsLeaseQueued, (const SchedulingClass &scheduling_class, const LeaseID &lease_id), (const, override)); - MOCK_METHOD(void, + MOCK_METHOD(bool, StoreReplyCallback, (const SchedulingClass &scheduling_class, const LeaseID &lease_id, From f5e86f892373894d21993ae6057cebd3bfdb6780 Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 30 Oct 2025 22:52:50 +0000 Subject: [PATCH 5/7] Addressing comments Signed-off-by: joshlee --- src/mock/ray/raylet/local_lease_manager.h | 2 +- src/ray/raylet/local_lease_manager.cc | 8 ++++---- src/ray/raylet/local_lease_manager.h | 10 ++++++---- src/ray/raylet/node_manager.cc | 4 ++-- .../raylet/scheduling/cluster_lease_manager.cc | 8 ++++---- .../raylet/scheduling/cluster_lease_manager.h | 10 ++++++---- .../scheduling/cluster_lease_manager_interface.h | 8 ++++---- src/ray/raylet/scheduling/internal.h | 3 ++- .../scheduling/local_lease_manager_interface.h | 16 ++++++++-------- 9 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/mock/ray/raylet/local_lease_manager.h b/src/mock/ray/raylet/local_lease_manager.h index 8acda6822fc5..050afc39da69 100644 --- a/src/mock/ray/raylet/local_lease_manager.h +++ b/src/mock/ray/raylet/local_lease_manager.h @@ -84,7 +84,7 @@ class MockLocalLeaseManager : public LocalLeaseManagerInterface { (const SchedulingClass &scheduling_class, const LeaseID &lease_id), (const, override)); MOCK_METHOD(bool, - StoreReplyCallback, + AddReplyCallback, (const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback, diff --git a/src/ray/raylet/local_lease_manager.cc b/src/ray/raylet/local_lease_manager.cc index 209bb8b70462..8e181d7d0121 100644 --- a/src/ray/raylet/local_lease_manager.cc +++ b/src/ray/raylet/local_lease_manager.cc @@ -1277,10 +1277,10 @@ bool LocalLeaseManager::IsLeaseQueued(const SchedulingClass &scheduling_class, return false; } -bool LocalLeaseManager::StoreReplyCallback(const SchedulingClass &scheduling_class, - const LeaseID &lease_id, - rpc::SendReplyCallback send_reply_callback, - rpc::RequestWorkerLeaseReply *reply) { +bool LocalLeaseManager::AddReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) { if (leases_to_grant_.contains(scheduling_class)) { for (const auto &work : leases_to_grant_[scheduling_class]) { if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { diff --git a/src/ray/raylet/local_lease_manager.h b/src/ray/raylet/local_lease_manager.h index 39da48a00fd3..ebc2176a64d4 100644 --- a/src/ray/raylet/local_lease_manager.h +++ b/src/ray/raylet/local_lease_manager.h @@ -192,10 +192,12 @@ class LocalLeaseManager : public LocalLeaseManagerInterface { bool IsLeaseQueued(const SchedulingClass &scheduling_class, const LeaseID &lease_id) const override; - bool StoreReplyCallback(const SchedulingClass &scheduling_class, - const LeaseID &lease_id, - rpc::SendReplyCallback send_reply_callback, - rpc::RequestWorkerLeaseReply *reply) override; + /// Add a reply callback to the lease. We don't overwrite the existing reply callback + /// since due to message reordering we may receive the retry before the initial request. + bool AddReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) override; private: struct SchedulingClassInfo; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 5e115c48f6f7..f549d5ed27bd 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1879,7 +1879,7 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques if (cluster_lease_manager_.IsLeaseQueued( lease.GetLeaseSpecification().GetSchedulingClass(), lease_id)) { - RAY_CHECK(cluster_lease_manager_.StoreReplyCallback( + RAY_CHECK(cluster_lease_manager_.AddReplyCallback( lease.GetLeaseSpecification().GetSchedulingClass(), lease_id, std::move(send_reply_callback_wrapper), @@ -1889,7 +1889,7 @@ void NodeManager::HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest reques if (local_lease_manager_.IsLeaseQueued( lease.GetLeaseSpecification().GetSchedulingClass(), lease_id)) { - RAY_CHECK(local_lease_manager_.StoreReplyCallback( + RAY_CHECK(local_lease_manager_.AddReplyCallback( lease.GetLeaseSpecification().GetSchedulingClass(), lease_id, std::move(send_reply_callback_wrapper), diff --git a/src/ray/raylet/scheduling/cluster_lease_manager.cc b/src/ray/raylet/scheduling/cluster_lease_manager.cc index 0f5ee6144683..a27c6e52f30b 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager.cc +++ b/src/ray/raylet/scheduling/cluster_lease_manager.cc @@ -506,10 +506,10 @@ bool ClusterLeaseManager::IsLeaseQueued(const SchedulingClass &scheduling_class, return false; } -bool ClusterLeaseManager::StoreReplyCallback(const SchedulingClass &scheduling_class, - const LeaseID &lease_id, - rpc::SendReplyCallback send_reply_callback, - rpc::RequestWorkerLeaseReply *reply) { +bool ClusterLeaseManager::AddReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) { if (leases_to_schedule_.contains(scheduling_class)) { for (const auto &work : leases_to_schedule_[scheduling_class]) { if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) { diff --git a/src/ray/raylet/scheduling/cluster_lease_manager.h b/src/ray/raylet/scheduling/cluster_lease_manager.h index f481929ca370..b4638b1e8a80 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager.h +++ b/src/ray/raylet/scheduling/cluster_lease_manager.h @@ -169,10 +169,12 @@ class ClusterLeaseManager : public ClusterLeaseManagerInterface { bool IsLeaseQueued(const SchedulingClass &scheduling_class, const LeaseID &lease_id) const override; - bool StoreReplyCallback(const SchedulingClass &scheduling_class, - const LeaseID &lease_id, - rpc::SendReplyCallback send_reply_callback, - rpc::RequestWorkerLeaseReply *reply) override; + /// Add a reply callback to the lease. We don't overwrite the existing reply callback + /// since due to message reordering we may receive the retry before the initial request. + bool AddReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) override; private: void TryScheduleInfeasibleLease(); diff --git a/src/ray/raylet/scheduling/cluster_lease_manager_interface.h b/src/ray/raylet/scheduling/cluster_lease_manager_interface.h index b2d0cfba08d4..4f08cae5a0c4 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager_interface.h +++ b/src/ray/raylet/scheduling/cluster_lease_manager_interface.h @@ -135,10 +135,10 @@ class ClusterLeaseManagerInterface { /// \param reply: The reply of the lease request. /// /// \return True if the reply callback was stored successfully. - virtual bool StoreReplyCallback(const SchedulingClass &scheduling_class, - const LeaseID &lease_id, - rpc::SendReplyCallback send_reply_callback, - rpc::RequestWorkerLeaseReply *reply) = 0; + virtual bool AddReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) = 0; }; } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/scheduling/internal.h b/src/ray/raylet/scheduling/internal.h index 9e01884daf36..d08f00b13291 100644 --- a/src/ray/raylet/scheduling/internal.h +++ b/src/ray/raylet/scheduling/internal.h @@ -51,7 +51,7 @@ enum class UnscheduledWorkCause { /// Work represents all the information needed to make a scheduling decision. /// This includes the lease, the information we need to communicate to -/// dispatch/spillback and the callback to trigger it. +/// dispatch/spillback and the callbacks to trigger it. struct ReplyCallback { ReplyCallback(rpc::SendReplyCallback send_reply_callback, rpc::RequestWorkerLeaseReply *reply) @@ -65,6 +65,7 @@ class Work { RayLease lease_; bool grant_or_reject_; bool is_selected_based_on_locality_; + // All the callbacks will be triggered when the lease is granted or rejected. std::vector reply_callbacks_; std::shared_ptr allocated_instances_; diff --git a/src/ray/raylet/scheduling/local_lease_manager_interface.h b/src/ray/raylet/scheduling/local_lease_manager_interface.h index 861e6fd7c23f..6326d0725af3 100644 --- a/src/ray/raylet/scheduling/local_lease_manager_interface.h +++ b/src/ray/raylet/scheduling/local_lease_manager_interface.h @@ -99,10 +99,10 @@ class LocalLeaseManagerInterface { virtual size_t GetNumUnschedulableLeaseSpilled() const = 0; virtual bool IsLeaseQueued(const SchedulingClass &scheduling_class, const LeaseID &lease_id) const = 0; - virtual bool StoreReplyCallback(const SchedulingClass &scheduling_class, - const LeaseID &lease_id, - rpc::SendReplyCallback send_reply_callback, - rpc::RequestWorkerLeaseReply *reply) = 0; + virtual bool AddReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) = 0; }; /// A noop local lease manager. It is a no-op class. We need this because there's no @@ -187,10 +187,10 @@ class NoopLocalLeaseManager : public LocalLeaseManagerInterface { const LeaseID &lease_id) const override { return false; } - bool StoreReplyCallback(const SchedulingClass &scheduling_class, - const LeaseID &lease_id, - rpc::SendReplyCallback send_reply_callback, - rpc::RequestWorkerLeaseReply *reply) override { + bool AddReplyCallback(const SchedulingClass &scheduling_class, + const LeaseID &lease_id, + rpc::SendReplyCallback send_reply_callback, + rpc::RequestWorkerLeaseReply *reply) override { return false; } }; From bf5cc929cb9667adec1f9b2cbc8a62d0acd2efec Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 30 Oct 2025 22:53:40 +0000 Subject: [PATCH 6/7] nit Signed-off-by: joshlee --- src/ray/raylet/scheduling/internal.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/scheduling/internal.h b/src/ray/raylet/scheduling/internal.h index d08f00b13291..abfc872a4ea4 100644 --- a/src/ray/raylet/scheduling/internal.h +++ b/src/ray/raylet/scheduling/internal.h @@ -65,7 +65,7 @@ class Work { RayLease lease_; bool grant_or_reject_; bool is_selected_based_on_locality_; - // All the callbacks will be triggered when the lease is granted or rejected. + // All the callbacks will be triggered when the lease is scheduled. std::vector reply_callbacks_; std::shared_ptr allocated_instances_; From 72f44aabe470c4cfc56eb0c96c35bd42103a3964 Mon Sep 17 00:00:00 2001 From: joshlee Date: Thu, 30 Oct 2025 22:56:31 +0000 Subject: [PATCH 7/7] nit Signed-off-by: joshlee --- src/ray/raylet/local_lease_manager.h | 2 -- src/ray/raylet/scheduling/cluster_lease_manager.h | 2 -- .../scheduling/cluster_lease_manager_interface.h | 5 +++-- .../raylet/scheduling/local_lease_manager_interface.h | 10 ++++++++++ 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/ray/raylet/local_lease_manager.h b/src/ray/raylet/local_lease_manager.h index ebc2176a64d4..6d038d55886a 100644 --- a/src/ray/raylet/local_lease_manager.h +++ b/src/ray/raylet/local_lease_manager.h @@ -192,8 +192,6 @@ class LocalLeaseManager : public LocalLeaseManagerInterface { bool IsLeaseQueued(const SchedulingClass &scheduling_class, const LeaseID &lease_id) const override; - /// Add a reply callback to the lease. We don't overwrite the existing reply callback - /// since due to message reordering we may receive the retry before the initial request. bool AddReplyCallback(const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback, diff --git a/src/ray/raylet/scheduling/cluster_lease_manager.h b/src/ray/raylet/scheduling/cluster_lease_manager.h index b4638b1e8a80..dafcaf4408f8 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager.h +++ b/src/ray/raylet/scheduling/cluster_lease_manager.h @@ -169,8 +169,6 @@ class ClusterLeaseManager : public ClusterLeaseManagerInterface { bool IsLeaseQueued(const SchedulingClass &scheduling_class, const LeaseID &lease_id) const override; - /// Add a reply callback to the lease. We don't overwrite the existing reply callback - /// since due to message reordering we may receive the retry before the initial request. bool AddReplyCallback(const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback, diff --git a/src/ray/raylet/scheduling/cluster_lease_manager_interface.h b/src/ray/raylet/scheduling/cluster_lease_manager_interface.h index 4f08cae5a0c4..ef68006ea826 100644 --- a/src/ray/raylet/scheduling/cluster_lease_manager_interface.h +++ b/src/ray/raylet/scheduling/cluster_lease_manager_interface.h @@ -127,14 +127,15 @@ class ClusterLeaseManagerInterface { virtual bool IsLeaseQueued(const SchedulingClass &scheduling_class, const LeaseID &lease_id) const = 0; - /// Store the reply callback for a lease. + /// Add a reply callback to the lease. We don't overwrite the existing reply callback + /// since due to message reordering we may receive the retry before the initial request. /// /// \param scheduling_class: The scheduling class of the lease. /// \param lease_id: The lease id of the lease. /// \param send_reply_callback: The callback used for the reply. /// \param reply: The reply of the lease request. /// - /// \return True if the reply callback was stored successfully. + /// \return True if the reply callback is added successfully. virtual bool AddReplyCallback(const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback, diff --git a/src/ray/raylet/scheduling/local_lease_manager_interface.h b/src/ray/raylet/scheduling/local_lease_manager_interface.h index 6326d0725af3..9f1661382c41 100644 --- a/src/ray/raylet/scheduling/local_lease_manager_interface.h +++ b/src/ray/raylet/scheduling/local_lease_manager_interface.h @@ -99,6 +99,16 @@ class LocalLeaseManagerInterface { virtual size_t GetNumUnschedulableLeaseSpilled() const = 0; virtual bool IsLeaseQueued(const SchedulingClass &scheduling_class, const LeaseID &lease_id) const = 0; + + /// Add a reply callback to the lease. We don't overwrite the existing reply callback + /// since due to message reordering we may receive the retry before the initial request. + /// + /// \param scheduling_class: The scheduling class of the lease. + /// \param lease_id: The lease id of the lease. + /// \param send_reply_callback: The callback used for the reply. + /// \param reply: The reply of the lease request. + /// + /// \return True if the reply callback is added successfully. virtual bool AddReplyCallback(const SchedulingClass &scheduling_class, const LeaseID &lease_id, rpc::SendReplyCallback send_reply_callback,