diff --git a/BUILD.bazel b/BUILD.bazel index f59963167c564..4d5f2d96db1a2 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -325,6 +325,9 @@ cc_library( deps = [ ":gcs", ":gcs_service_rpc", + ":node_manager_rpc", + ":raylet_lib", + ":worker_rpc", ], ) @@ -725,6 +728,45 @@ cc_test( ], ) +cc_test( + name = "gcs_node_manager_test", + srcs = [ + "src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc", + "src/ray/gcs/gcs_server/test/gcs_test_util.h", + ], + copts = COPTS, + deps = [ + ":gcs_server_lib", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "gcs_actor_scheduler_test", + srcs = [ + "src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc", + "src/ray/gcs/gcs_server/test/gcs_test_util.h", + ], + copts = COPTS, + deps = [ + ":gcs_server_lib", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "gcs_actor_manager_test", + srcs = [ + "src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc", + "src/ray/gcs/gcs_server/test/gcs_test_util.h", + ], + copts = COPTS, + deps = [ + ":gcs_server_lib", + "@com_google_googletest//:gtest_main", + ], +) + cc_library( name = "service_based_gcs_client_lib", srcs = glob( @@ -739,7 +781,8 @@ cc_library( ), copts = COPTS, deps = [ - ":gcs_server_lib", + ":gcs", + ":gcs_service_rpc", ], ) @@ -754,6 +797,7 @@ cc_test( "//:redis-server", ], deps = [ + ":gcs_server_lib", ":service_based_gcs_client_lib", "@com_google_googletest//:gtest_main", ], diff --git a/java/runtime/src/main/java/io/ray/runtime/context/RuntimeContextImpl.java b/java/runtime/src/main/java/io/ray/runtime/context/RuntimeContextImpl.java index 95eaa83e824a1..b944e4ece38c6 100644 --- a/java/runtime/src/main/java/io/ray/runtime/context/RuntimeContextImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/context/RuntimeContextImpl.java @@ -40,7 +40,7 @@ public boolean wasCurrentActorReconstructed() { return false; } - return runtime.getGcsClient().actorExists(getCurrentActorId()); + return runtime.getGcsClient().wasCurrentActorReconstructed(getCurrentActorId()); } @Override diff --git a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java index 24b49f640633b..f1882c404dc0d 100644 --- a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java +++ b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java @@ -9,6 +9,7 @@ import io.ray.api.id.TaskId; import io.ray.api.id.UniqueId; import io.ray.api.runtimecontext.NodeInfo; +import io.ray.runtime.config.RayConfig; import io.ray.runtime.generated.Gcs; import io.ray.runtime.generated.Gcs.ActorCheckpointIdData; import io.ray.runtime.generated.Gcs.GcsNodeInfo; @@ -27,9 +28,7 @@ * An implementation of GcsClient. */ public class GcsClient { - private static Logger LOGGER = LoggerFactory.getLogger(GcsClient.class); - private RedisClient primary; private List shards; @@ -126,6 +125,29 @@ public boolean actorExists(ActorId actorId) { return primary.exists(key); } + public boolean wasCurrentActorReconstructed(ActorId actorId) { + byte[] key = ArrayUtils.addAll(TablePrefix.ACTOR.toString().getBytes(), actorId.getBytes()); + if (!RayConfig.getInstance().gcsServiceEnabled) { + return primary.exists(key); + } + + // TODO(ZhuSenlin): Get the actor table data from CoreWorker later. 
+ byte[] value = primary.get(key); + if (value == null) { + return false; + } + Gcs.ActorTableData actorTableData = null; + try { + actorTableData = Gcs.ActorTableData.parseFrom(value); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Received invalid protobuf data from GCS."); + } + + long maxReconstructions = actorTableData.getMaxReconstructions(); + long remainingReconstructions = actorTableData.getRemainingReconstructions(); + return maxReconstructions - remainingReconstructions != 0; + } + /** * Query whether the raylet task exists in Gcs. */ diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index c4ef6be427488..e074595c13011 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -227,7 +227,7 @@ def getpid(self): if child_actor_info["state"] == -1: assert child_actor_info["requiredResources"]["CustomResource"] == 1 else: - assert child_actor_info["state"] == 0 + assert child_actor_info["state"] == 1 assert len(child_actor_info["children"]) == 0 assert child_actor_info["usedResources"]["CPU"] == 1 diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 1e7982a606a29..d9adc019de3f6 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -263,6 +263,10 @@ RAY_CONFIG(int64_t, internal_gcs_service_connect_wait_milliseconds, 100) /// The interval at which the gcs server will check if redis has gone down. /// When this happens, gcs server will kill itself. RAY_CONFIG(int64_t, gcs_redis_heartbeat_interval_milliseconds, 100) +/// Duration to wait between retries for leasing worker in gcs server. +RAY_CONFIG(uint32_t, gcs_lease_worker_retry_interval_ms, 200) +/// Duration to wait between retries for creating actor in gcs server. +RAY_CONFIG(uint32_t, gcs_create_actor_retry_interval_ms, 200) /// Maximum number of times to retry putting an object when the plasma store is full. /// Can be set to -1 to enable unlimited retries. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 260b8118a5721..3d54c0f4e39f7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -410,6 +410,16 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ return std::shared_ptr( new raylet::RayletClient(std::move(grpc_client))); }; + + std::function + actor_create_callback = nullptr; + if (RayConfig::instance().gcs_service_enabled()) { + actor_create_callback = [this](const TaskSpecification &task_spec, + const gcs::StatusCallback &callback) { + return gcs_client_->Actors().AsyncCreateActor(task_spec, callback); + }; + } + direct_actor_submitter_ = std::unique_ptr( new CoreWorkerDirectActorTaskSubmitter(rpc_address_, client_factory, memory_store_, task_manager_)); @@ -418,7 +428,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ std::unique_ptr(new CoreWorkerDirectTaskSubmitter( rpc_address_, local_raylet_client_, client_factory, raylet_client_factory, memory_store_, task_manager_, local_raylet_id, - RayConfig::instance().worker_lease_timeout_milliseconds())); + RayConfig::instance().worker_lease_timeout_milliseconds(), + std::move(actor_create_callback))); future_resolver_.reset(new FutureResolver(memory_store_, client_factory)); // Unfortunately the raylet client has to be constructed after the receivers. 
if (direct_task_receiver_ != nullptr) { @@ -624,7 +635,6 @@ void CoreWorker::RegisterToGcs() { RAY_CHECK_OK(gcs_client_->Workers().AsyncRegisterWorker(options_.worker_type, worker_id, worker_info, nullptr)); } - void CoreWorker::CheckForRayletFailure(const boost::system::error_code &error) { if (error == boost::asio::error::operation_aborted) { return; @@ -1244,7 +1254,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, // Register a callback to handle actor notifications. auto actor_notification_callback = [this](const ActorID &actor_id, const gcs::ActorTableData &actor_data) { - if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) { + if (actor_data.state() == gcs::ActorTableData::PENDING) { + // The actor is being created and not yet ready, just ignore! + } else if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) { absl::MutexLock lock(&actor_handles_mutex_); auto it = actor_handles_.find(actor_id); RAY_CHECK(it != actor_handles_.end()); @@ -1265,8 +1277,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, direct_actor_submitter_->ConnectActor(actor_id, actor_data.address()); } - RAY_LOG(INFO) << "received notification on actor, state=" - << static_cast(actor_data.state()) << ", actor_id: " << actor_id + const auto &actor_state = gcs::ActorTableData::ActorState_Name(actor_data.state()); + RAY_LOG(INFO) << "received notification on actor, state: " << actor_state + << ", actor_id: " << actor_id << ", ip address: " << actor_data.address().ip_address() << ", port: " << actor_data.address().port() << ", worker_id: " << WorkerID::FromBinary(actor_data.address().worker_id()) diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index 043e68bbc2018..b5b98b6e4a940 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -15,7 +15,6 @@ #include "ray/core_worker/transport/direct_task_transport.h" #include "ray/core_worker/transport/dependency_resolver.h" -#include "ray/core_worker/transport/direct_actor_transport.h" namespace ray { @@ -23,6 +22,30 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) { RAY_LOG(DEBUG) << "Submit task " << task_spec.TaskId(); resolver_.ResolveDependencies(task_spec, [this, task_spec]() { RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId(); + if (actor_create_callback_ && task_spec.IsActorCreationTask()) { + // If gcs actor management is enabled, the actor creation task will be sent to gcs + // server directly after the in-memory dependent objects are resolved. + // For more details please see the protocol of actor management based on gcs. + // https://docs.google.com/document/d/1EAWide-jy05akJp6OMtDn58XOK7bUyruWMia4E-fV28/edit?usp=sharing + auto actor_id = task_spec.ActorCreationId(); + auto task_id = task_spec.TaskId(); + RAY_LOG(INFO) << "Submitting actor creation task to GCS: " << actor_id; + auto status = + actor_create_callback_(task_spec, [this, actor_id, task_id](Status status) { + // If GCS is failed, GcsRpcClient may receive IOError status but it will not + // trigger this callback, because GcsRpcClient has retry logic at the + // bottom. So if this callback is invoked with an error there must be + // something wrong with the protocol of gcs-based actor management. + // So just check `status.ok()` here. 
+ RAY_CHECK_OK(status); + RAY_LOG(INFO) << "Actor creation task submitted to GCS: " << actor_id; + task_finisher_->CompletePendingTask(task_id, rpc::PushTaskReply(), + rpc::Address()); + }); + RAY_CHECK_OK(status); + return; + } + absl::MutexLock lock(&mu_); // Note that the dependencies in the task spec are mutated to only contain // plasma dependencies after ResolveDependencies finishes. diff --git a/src/ray/core_worker/transport/direct_task_transport.h b/src/ray/core_worker/transport/direct_task_transport.h index 163e9279cd6f3..6f4ec27df40d3 100644 --- a/src/ray/core_worker/transport/direct_task_transport.h +++ b/src/ray/core_worker/transport/direct_task_transport.h @@ -49,21 +49,23 @@ using SchedulingKey = std::tuple, ActorID // This class is thread-safe. class CoreWorkerDirectTaskSubmitter { public: - CoreWorkerDirectTaskSubmitter(rpc::Address rpc_address, - std::shared_ptr lease_client, - rpc::ClientFactoryFn client_factory, - LeaseClientFactoryFn lease_client_factory, - std::shared_ptr store, - std::shared_ptr task_finisher, - ClientID local_raylet_id, int64_t lease_timeout_ms) + explicit CoreWorkerDirectTaskSubmitter( + rpc::Address rpc_address, std::shared_ptr lease_client, + rpc::ClientFactoryFn client_factory, LeaseClientFactoryFn lease_client_factory, + std::shared_ptr store, + std::shared_ptr task_finisher, ClientID local_raylet_id, + int64_t lease_timeout_ms, + std::function + actor_create_callback = nullptr) : rpc_address_(rpc_address), local_lease_client_(lease_client), client_factory_(client_factory), lease_client_factory_(lease_client_factory), resolver_(store, task_finisher), task_finisher_(task_finisher), + lease_timeout_ms_(lease_timeout_ms), local_raylet_id_(local_raylet_id), - lease_timeout_ms_(lease_timeout_ms) {} + actor_create_callback_(std::move(actor_create_callback)) {} /// Schedule a task for direct submission to a worker. /// @@ -148,6 +150,13 @@ class CoreWorkerDirectTaskSubmitter { /// if a remote raylet tells us to spill the task back to the local raylet. const ClientID local_raylet_id_; + /// A function to override actor creation. The callback will be called once the actor + /// creation task has been accepted for submission, but the actor may not be created + /// yet. + std::function + actor_create_callback_; + // Protects task submission state below. absl::Mutex mu_; diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 3cdf30598f034..10ea7edae4447 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -16,6 +16,7 @@ #define RAY_GCS_ACCESSOR_H #include "ray/common/id.h" +#include "ray/common/task/task_spec.h" #include "ray/gcs/callback.h" #include "ray/gcs/entry_change_notification.h" #include "ray/protobuf/gcs.pb.h" @@ -46,6 +47,14 @@ class ActorInfoAccessor { virtual Status AsyncGet(const ActorID &actor_id, const OptionalItemCallback &callback) = 0; + /// Create an actor to GCS asynchronously. + /// + /// \param task_spec The specification for the actor creation task. + /// \param callback Callback that will be called after the actor info is written to GCS. + /// \return Status + virtual Status AsyncCreateActor(const TaskSpecification &task_spec, + const StatusCallback &callback) = 0; + /// Register an actor to GCS asynchronously. /// /// \param data_ptr The actor that will be registered to the GCS. 
diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index b3dada08b7404..ee5ff43ba0ced 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -79,7 +79,7 @@ ServiceBasedActorInfoAccessor::ServiceBasedActorInfoAccessor( ServiceBasedGcsClient *client_impl) : subscribe_id_(ClientID::FromRandom()), client_impl_(client_impl), - actor_sub_executor_(client_impl->GetRedisGcsClient().log_based_actor_table()) {} + actor_sub_executor_(client_impl->GetRedisGcsClient().actor_table()) {} Status ServiceBasedActorInfoAccessor::GetAll( std::vector *actor_table_data_list) { @@ -106,6 +106,22 @@ Status ServiceBasedActorInfoAccessor::AsyncGet( return Status::OK(); } +Status ServiceBasedActorInfoAccessor::AsyncCreateActor( + const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) { + RAY_CHECK(task_spec.IsActorCreationTask() && callback); + rpc::CreateActorRequest request; + request.mutable_task_spec()->CopyFrom(task_spec.GetMessage()); + client_impl_->GetGcsRpcClient().CreateActor( + request, [callback](const Status &, const rpc::CreateActorReply &reply) { + auto status = + reply.status().code() == (int)StatusCode::OK + ? Status() + : Status(StatusCode(reply.status().code()), reply.status().message()); + callback(status); + }); + return Status::OK(); +} + Status ServiceBasedActorInfoAccessor::AsyncRegister( const std::shared_ptr &data_ptr, const StatusCallback &callback) { diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 714327949d076..ea6599554edc2 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -15,6 +15,7 @@ #ifndef RAY_GCS_SERVICE_BASED_ACCESSOR_H #define RAY_GCS_SERVICE_BASED_ACCESSOR_H +#include #include "ray/gcs/accessor.h" #include "ray/gcs/subscription_executor.h" #include "ray/util/sequencer.h" @@ -63,6 +64,9 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor { Status AsyncGet(const ActorID &actor_id, const OptionalItemCallback &callback) override; + Status AsyncCreateActor(const TaskSpecification &task_spec, + const StatusCallback &callback) override; + Status AsyncRegister(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; @@ -97,7 +101,7 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor { private: ServiceBasedGcsClient *client_impl_; - typedef SubscriptionExecutor + typedef SubscriptionExecutor ActorSubscriptionExecutor; ActorSubscriptionExecutor actor_sub_executor_; diff --git a/src/ray/gcs/gcs_server/actor_info_handler_impl.cc b/src/ray/gcs/gcs_server/actor_info_handler_impl.cc index c6a667dbee0c4..68866f4b62eac 100644 --- a/src/ray/gcs/gcs_server/actor_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/actor_info_handler_impl.cc @@ -18,6 +18,21 @@ namespace ray { namespace rpc { +void DefaultActorInfoHandler::HandleCreateActor( + const ray::rpc::CreateActorRequest &request, ray::rpc::CreateActorReply *reply, + ray::rpc::SendReplyCallback send_reply_callback) { + RAY_CHECK(request.task_spec().type() == TaskType::ACTOR_CREATION_TASK); + auto actor_id = + ActorID::FromBinary(request.task_spec().actor_creation_task_spec().actor_id()); + + RAY_LOG(INFO) << "Registering actor, actor id = " << actor_id; + gcs_actor_manager_.RegisterActor(request, [reply, send_reply_callback, actor_id]( + std::shared_ptr actor) { + RAY_LOG(INFO) << "Registered actor, actor id = " 
<< actor_id; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + }); +} + void DefaultActorInfoHandler::HandleGetActorInfo( const rpc::GetActorInfoRequest &request, rpc::GetActorInfoReply *reply, rpc::SendReplyCallback send_reply_callback) { diff --git a/src/ray/gcs/gcs_server/actor_info_handler_impl.h b/src/ray/gcs/gcs_server/actor_info_handler_impl.h index d5d47f66872af..4cbefd4af453c 100644 --- a/src/ray/gcs/gcs_server/actor_info_handler_impl.h +++ b/src/ray/gcs/gcs_server/actor_info_handler_impl.h @@ -15,17 +15,22 @@ #ifndef RAY_GCS_ACTOR_INFO_HANDLER_IMPL_H #define RAY_GCS_ACTOR_INFO_HANDLER_IMPL_H +#include "gcs_actor_manager.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h" namespace ray { -namespace rpc { +namespace rpc { /// This implementation class of `ActorInfoHandler`. class DefaultActorInfoHandler : public rpc::ActorInfoHandler { public: - explicit DefaultActorInfoHandler(gcs::RedisGcsClient &gcs_client) - : gcs_client_(gcs_client) {} + explicit DefaultActorInfoHandler(gcs::RedisGcsClient &gcs_client, + gcs::GcsActorManager &gcs_actor_manager) + : gcs_client_(gcs_client), gcs_actor_manager_(gcs_actor_manager) {} + + void HandleCreateActor(const CreateActorRequest &request, CreateActorReply *reply, + SendReplyCallback send_reply_callback) override; void HandleGetActorInfo(const GetActorInfoRequest &request, GetActorInfoReply *reply, SendReplyCallback send_reply_callback) override; @@ -52,6 +57,7 @@ class DefaultActorInfoHandler : public rpc::ActorInfoHandler { private: gcs::RedisGcsClient &gcs_client_; + gcs::GcsActorManager &gcs_actor_manager_; }; } // namespace rpc diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc new file mode 100644 index 0000000000000..8b36f742f3e02 --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -0,0 +1,283 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "gcs_actor_manager.h" +#include + +#include + +namespace ray { +namespace gcs { + +ClientID GcsActor::GetNodeID() const { + const auto &raylet_id_binary = actor_table_data_.address().raylet_id(); + if (raylet_id_binary.empty()) { + return ClientID::Nil(); + } + return ClientID::FromBinary(raylet_id_binary); +} + +void GcsActor::UpdateAddress(const rpc::Address &address) { + actor_table_data_.mutable_address()->CopyFrom(address); +} + +const rpc::Address &GcsActor::GetAddress() const { return actor_table_data_.address(); } + +WorkerID GcsActor::GetWorkerID() const { + const auto &address = actor_table_data_.address(); + if (address.worker_id().empty()) { + return WorkerID::Nil(); + } + return WorkerID::FromBinary(address.worker_id()); +} + +void GcsActor::UpdateState(rpc::ActorTableData::ActorState state) { + actor_table_data_.set_state(state); +} + +rpc::ActorTableData::ActorState GcsActor::GetState() const { + return actor_table_data_.state(); +} + +ActorID GcsActor::GetActorID() const { + return ActorID::FromBinary(actor_table_data_.actor_id()); +} + +TaskSpecification GcsActor::GetCreationTaskSpecification() const { + const auto &task_spec = actor_table_data_.task_spec(); + return TaskSpecification(task_spec); +} + +const rpc::ActorTableData &GcsActor::GetActorTableData() const { + return actor_table_data_; +} + +rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; } + +///////////////////////////////////////////////////////////////////////////////////////// +GcsActorManager::GcsActorManager(boost::asio::io_context &io_context, + gcs::ActorInfoAccessor &actor_info_accessor, + gcs::GcsNodeManager &gcs_node_manager, + LeaseClientFactoryFn lease_client_factory, + rpc::ClientFactoryFn client_factory) + : actor_info_accessor_(actor_info_accessor), + gcs_actor_scheduler_(new gcs::GcsActorScheduler( + io_context, actor_info_accessor, gcs_node_manager, + /*schedule_failure_handler=*/ + [this](std::shared_ptr actor) { + // When there are no available nodes to schedule the actor the + // gcs_actor_scheduler will treat it as failed and invoke this handler. In + // this case, the actor should be appended to the `pending_actors_` and wait + // for the registration of new node. + pending_actors_.emplace_back(std::move(actor)); + }, + /*schedule_success_handler=*/ + [this](std::shared_ptr actor) { + OnActorCreateSuccess(std::move(actor)); + }, + std::move(lease_client_factory), std::move(client_factory))) { + RAY_LOG(INFO) << "Initializing GcsActorManager."; + gcs_node_manager.AddNodeAddedListener( + [this](const std::shared_ptr &) { + // Because a new node has been added, we need to try to schedule the pending + // actors. + SchedulePendingActors(); + }); + + gcs_node_manager.AddNodeRemovedListener([this](std::shared_ptr node) { + // All of the related actors should be reconstructed when a node is removed from the + // GCS. 
+ ReconstructActorsOnNode(ClientID::FromBinary(node->node_id())); + }); + RAY_LOG(INFO) << "Finished initialing GcsActorManager."; +} + +void GcsActorManager::RegisterActor( + const ray::rpc::CreateActorRequest &request, + std::function)> callback) { + RAY_CHECK(callback); + const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec(); + auto actor_id = ActorID::FromBinary(actor_creation_task_spec.actor_id()); + + auto iter = registered_actors_.find(actor_id); + if (iter != registered_actors_.end()) { + // When the network fails, Driver/Worker is not sure whether GcsServer has received + // the request of actor creation task, so Driver/Worker will try again and again until + // receiving the reply from GcsServer. If the actor is already records on the GCS + // Server side, the GCS Server will be responsible for creating or reconstructing the + // actor regardless of whether the Driver/Worker sends the request to create the actor + // again, so we just need fast reply OK to the Driver/Worker that the actor is already + // recorded by GCS Server. + callback(iter->second); + return; + } + + auto pending_register_iter = actor_to_register_callbacks_.find(actor_id); + if (pending_register_iter != actor_to_register_callbacks_.end()) { + // It is a duplicate message, just mark the callback as pending and invoke it after + // the related actor is flushed. + pending_register_iter->second.emplace_back(std::move(callback)); + return; + } + + // Mark the callback as pending and invoke it after the related actor is flushed. + actor_to_register_callbacks_[actor_id].emplace_back(std::move(callback)); + + auto actor = std::make_shared(request); + auto actor_table_data = + std::make_shared(actor->GetActorTableData()); + // The backend storage is reliable in the future, so the status must be ok. + RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate( + actor_id, actor_table_data, [this, actor](Status status) { + RAY_CHECK_OK(status); + RAY_CHECK(registered_actors_.emplace(actor->GetActorID(), actor).second); + // Invoke all callbacks for all registration requests of this actor (duplicated + // requests are included) and remove all of them from + // actor_to_register_callbacks_. + auto iter = actor_to_register_callbacks_.find(actor->GetActorID()); + RAY_CHECK(iter != actor_to_register_callbacks_.end() && !iter->second.empty()); + for (auto &callback : iter->second) { + callback(actor); + } + actor_to_register_callbacks_.erase(iter); + gcs_actor_scheduler_->Schedule(actor); + })); +} + +void GcsActorManager::ReconstructActorOnWorker(const ray::ClientID &node_id, + const ray::WorkerID &worker_id, + bool need_reschedule) { + std::shared_ptr actor; + // Cancel the scheduling of the related actor. + auto actor_id = gcs_actor_scheduler_->CancelOnWorker(node_id, worker_id); + if (!actor_id.IsNil()) { + auto iter = registered_actors_.find(actor_id); + RAY_CHECK(iter != registered_actors_.end()); + actor = iter->second; + } else { + // Find from worker_to_created_actor_. + auto iter = worker_to_created_actor_.find(worker_id); + if (iter != worker_to_created_actor_.end()) { + actor = std::move(iter->second); + // Remove the created actor from worker_to_created_actor_. + worker_to_created_actor_.erase(iter); + // remove the created actor from node_to_created_actors_. 
+ auto node_iter = node_to_created_actors_.find(node_id); + RAY_CHECK(node_iter != node_to_created_actors_.end()); + RAY_CHECK(node_iter->second.erase(actor->GetActorID()) != 0); + if (node_iter->second.empty()) { + node_to_created_actors_.erase(node_iter); + } + } + } + if (actor != nullptr) { + // Reconstruct the actor. + ReconstructActor(actor, need_reschedule); + } +} + +void GcsActorManager::ReconstructActorsOnNode(const ClientID &node_id) { + // Cancel the scheduling of all related actors. + auto scheduling_actor_ids = gcs_actor_scheduler_->CancelOnNode(node_id); + for (auto &actor_id : scheduling_actor_ids) { + auto iter = registered_actors_.find(actor_id); + if (iter != registered_actors_.end()) { + // Reconstruct the canceled actor. + ReconstructActor(iter->second); + } + } + + // Find all actors that were created on this node. + auto iter = node_to_created_actors_.find(node_id); + if (iter != node_to_created_actors_.end()) { + auto created_actors = std::move(iter->second); + // Remove all created actors from node_to_created_actors_. + node_to_created_actors_.erase(iter); + for (auto &entry : created_actors) { + // Remove the actor from worker_to_created_actor_. + RAY_CHECK(worker_to_created_actor_.erase(entry.second->GetWorkerID()) != 0); + // Reconstruct the removed actor. + ReconstructActor(std::move(entry.second)); + } + } +} + +void GcsActorManager::ReconstructActor(std::shared_ptr actor, + bool need_reschedule) { + RAY_CHECK(actor != nullptr); + auto node_id = actor->GetNodeID(); + auto worker_id = actor->GetWorkerID(); + actor->UpdateAddress(rpc::Address()); + auto mutable_actor_table_data = actor->GetMutableActorTableData(); + // If the need_reschedule is set to false, then set the `remaining_reconstructions` to 0 + // so that the actor will never be rescheduled. + auto remaining_reconstructions = + need_reschedule ? mutable_actor_table_data->remaining_reconstructions() : 0; + RAY_LOG(WARNING) << "Actor is failed " << actor->GetActorID() << " on worker " + << worker_id << " at node " << node_id + << ", need_reschedule = " << need_reschedule + << ", remaining_reconstructions = " << remaining_reconstructions; + + if (remaining_reconstructions > 0) { + mutable_actor_table_data->set_remaining_reconstructions(--remaining_reconstructions); + mutable_actor_table_data->set_state(rpc::ActorTableData::RECONSTRUCTING); + auto actor_table_data = + std::make_shared(*mutable_actor_table_data); + // The backend storage is reliable in the future, so the status must be ok. + RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data, + [this, actor](Status status) { + RAY_CHECK_OK(status); + gcs_actor_scheduler_->Schedule(actor); + })); + } else { + mutable_actor_table_data->set_state(rpc::ActorTableData::DEAD); + auto actor_table_data = + std::make_shared(*mutable_actor_table_data); + // The backend storage is reliable in the future, so the status must be ok. 
+ RAY_CHECK_OK( + actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data, nullptr)); + } +} + +void GcsActorManager::OnActorCreateSuccess(std::shared_ptr actor) { + auto worker_id = actor->GetWorkerID(); + RAY_CHECK(!worker_id.IsNil()); + RAY_CHECK(worker_to_created_actor_.emplace(worker_id, actor).second); + + auto actor_id = actor->GetActorID(); + auto node_id = actor->GetNodeID(); + RAY_CHECK(!node_id.IsNil()); + RAY_CHECK(node_to_created_actors_[node_id].emplace(actor_id, actor).second); + + actor->UpdateState(rpc::ActorTableData::ALIVE); + auto actor_table_data = + std::make_shared(actor->GetActorTableData()); + // The backend storage is reliable in the future, so the status must be ok. + RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(actor_id, actor_table_data, nullptr)); +} + +void GcsActorManager::SchedulePendingActors() { + if (pending_actors_.empty()) { + return; + } + + RAY_LOG(DEBUG) << "Scheduling actor creation tasks, size = " << pending_actors_.size(); + auto actors = std::move(pending_actors_); + for (auto &actor : actors) { + gcs_actor_scheduler_->Schedule(std::move(actor)); + } +} + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h new file mode 100644 index 0000000000000..6e999654e156d --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -0,0 +1,193 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RAY_GCS_ACTOR_MANAGER_H +#define RAY_GCS_ACTOR_MANAGER_H + +#include +#include +#include +#include +#include + +#include "absl/container/flat_hash_map.h" +#include "gcs_actor_scheduler.h" + +namespace ray { +namespace gcs { + +/// GcsActor just wraps `ActorTableData` and provides some convenient interfaces to access +/// the fields inside `ActorTableData`. +/// This class is not thread-safe. +class GcsActor { + public: + /// Create a GcsActor by actor_table_data. + /// + /// \param actor_table_data Data of the actor (see gcs.proto). + explicit GcsActor(rpc::ActorTableData actor_table_data) + : actor_table_data_(std::move(actor_table_data)) {} + + /// Create a GcsActor by CreateActorRequest. + /// + /// \param request Contains the actor creation task specification. 
+ explicit GcsActor(const ray::rpc::CreateActorRequest &request) { + RAY_CHECK(request.task_spec().type() == TaskType::ACTOR_CREATION_TASK); + const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec(); + actor_table_data_.set_actor_id(actor_creation_task_spec.actor_id()); + actor_table_data_.set_job_id(request.task_spec().job_id()); + actor_table_data_.set_max_reconstructions( + actor_creation_task_spec.max_actor_reconstructions()); + actor_table_data_.set_remaining_reconstructions( + actor_creation_task_spec.max_actor_reconstructions()); + + auto dummy_object = + TaskSpecification(request.task_spec()).ActorDummyObject().Binary(); + actor_table_data_.set_actor_creation_dummy_object_id(dummy_object); + + actor_table_data_.set_is_detached(actor_creation_task_spec.is_detached()); + actor_table_data_.mutable_owner_address()->CopyFrom( + request.task_spec().caller_address()); + + actor_table_data_.set_state(rpc::ActorTableData::PENDING); + actor_table_data_.mutable_task_spec()->CopyFrom(request.task_spec()); + + actor_table_data_.mutable_address()->set_raylet_id(ClientID::Nil().Binary()); + actor_table_data_.mutable_address()->set_worker_id(WorkerID::Nil().Binary()); + } + + /// Get the node id on which this actor is created. + ClientID GetNodeID() const; + /// Get the id of the worker on which this actor is created. + WorkerID GetWorkerID() const; + + /// Update the `Address` of this actor (see gcs.proto). + void UpdateAddress(const rpc::Address &address); + /// Get the `Address` of this actor. + const rpc::Address &GetAddress() const; + + /// Update the state of this actor. + void UpdateState(rpc::ActorTableData::ActorState state); + /// Get the state of this gcs actor. + rpc::ActorTableData::ActorState GetState() const; + + /// Get the id of this actor. + ActorID GetActorID() const; + /// Get the task specification of this actor. + TaskSpecification GetCreationTaskSpecification() const; + + /// Get the immutable ActorTableData of this actor. + const rpc::ActorTableData &GetActorTableData() const; + /// Get the mutable ActorTableData of this actor. + rpc::ActorTableData *GetMutableActorTableData(); + + private: + /// The actor meta data which contains the task specification as well as the state of + /// the gcs actor and so on (see gcs.proto). + rpc::ActorTableData actor_table_data_; +}; + +using RegisterActorCallback = std::function)>; +/// GcsActorManager is responsible for managing the lifecycle of all actors. +/// This class is not thread-safe. +class GcsActorManager { + public: + /// Create a GcsActorManager + /// + /// \param io_context The main event loop. + /// \param actor_info_accessor Used to flush actor data to storage. + /// \param gcs_node_manager The actor manager needs to listen to the node change events + /// inside gcs_node_manager. + /// \param lease_client_factory Factory to create remote lease client, it will be passed + /// through to the constructor of gcs_actor_scheduler, the gcs_actor_scheduler will use + /// default factory inside itself if it is not set. + /// \param client_factory Factory to create remote core worker client, it will be passed + /// through to the constructor of gcs_actor_scheduler, the gcs_actor_scheduler will use + /// default factory inside itself if it is not set. 
+ explicit GcsActorManager(boost::asio::io_context &io_context, + gcs::ActorInfoAccessor &actor_info_accessor, + gcs::GcsNodeManager &gcs_node_manager, + LeaseClientFactoryFn lease_client_factory = nullptr, + rpc::ClientFactoryFn client_factory = nullptr); + + virtual ~GcsActorManager() = default; + + /// Register actor asynchronously. + /// + /// \param request Contains the meta info to create the actor. + /// \param callback Will be invoked after the meta info is flushed to the storage or be + /// invoked immediately if the meta info already exists. + void RegisterActor(const rpc::CreateActorRequest &request, + RegisterActorCallback callback); + + /// Reconstruct all actors associated with the specified node id, including actors which + /// are scheduled or have been created on this node. Triggered when the given node goes + /// down. + /// + /// \param node_id The specified node id. + void ReconstructActorsOnNode(const ClientID &node_id); + + /// Reconstruct actor associated with the specified node_id and worker_id. + /// The actor may be pending or already created. + /// + /// \param node_id ID of the node where the worker is located + /// \param worker_id ID of the worker that the actor is creating/created on + /// \param need_reschedule Whether to reschedule the actor creation task, sometimes + /// users want to kill an actor intentionally and don't want it to be rescheduled + /// again. + void ReconstructActorOnWorker(const ClientID &node_id, const WorkerID &worker_id, + bool need_reschedule = true); + + protected: + /// Schedule actors in the `pending_actors_` queue. + /// This method is triggered when new nodes are registered or resources change. + void SchedulePendingActors(); + + /// Reconstruct the specified actor. + /// + /// \param actor The target actor to be reconstructed. + /// \param need_reschedule Whether to reschedule the actor creation task, sometimes + /// users want to kill an actor intentionally and don't want it to be reconstructed + /// again. + void ReconstructActor(std::shared_ptr actor, bool need_reschedule = true); + + /// This method is a callback of gcs_actor_scheduler when actor is created successfully. + /// It will update the state of actor as well as the worker_to_created_actor_ and + /// node_to_created_actors_ and flush the actor data to the storage. + void OnActorCreateSuccess(std::shared_ptr actor); + + protected: + /// Callbacks of actor registration requests that are not yet flushed. + /// This map is used to filter duplicated messages from a Driver/Worker caused by some + /// network problems. + absl::flat_hash_map> + actor_to_register_callbacks_; + /// All registered actors (pending actors are also included). + absl::flat_hash_map> registered_actors_; + /// The pending actors which will not be scheduled until there's a resource change. + std::vector> pending_actors_; + /// Map contains the relationship of worker and created actor. + absl::flat_hash_map> worker_to_created_actor_; + /// Map contains the relationship of node and created actors. + absl::flat_hash_map>> + node_to_created_actors_; + /// The access info accessor. + gcs::ActorInfoAccessor &actor_info_accessor_; + /// The scheduler to schedule all registered actors. 
+ std::unique_ptr gcs_actor_scheduler_; +}; + +} // namespace gcs +} // namespace ray + +#endif // RAY_GCS_ACTOR_MANAGER_H diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc new file mode 100644 index 0000000000000..9fe965decbcb5 --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -0,0 +1,382 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "gcs_actor_scheduler.h" +#include +#include +#include +#include "gcs_actor_manager.h" + +namespace ray { +namespace gcs { + +GcsActorScheduler::GcsActorScheduler( + boost::asio::io_context &io_context, gcs::ActorInfoAccessor &actor_info_accessor, + const gcs::GcsNodeManager &gcs_node_manager, + std::function)> schedule_failure_handler, + std::function)> schedule_success_handler, + LeaseClientFactoryFn lease_client_factory, rpc::ClientFactoryFn client_factory) + : io_context_(io_context), + client_call_manager_(io_context_), + actor_info_accessor_(actor_info_accessor), + gcs_node_manager_(gcs_node_manager), + schedule_failure_handler_(std::move(schedule_failure_handler)), + schedule_success_handler_(std::move(schedule_success_handler)), + lease_client_factory_(std::move(lease_client_factory)), + client_factory_(std::move(client_factory)) { + RAY_CHECK(schedule_failure_handler_ != nullptr && schedule_success_handler_ != nullptr); + if (lease_client_factory_ == nullptr) { + lease_client_factory_ = [this](const rpc::Address &address) { + auto node_manager_worker_client = rpc::NodeManagerWorkerClient::make( + address.ip_address(), address.port(), client_call_manager_); + return std::make_shared( + std::move(node_manager_worker_client)); + }; + } + if (client_factory_ == nullptr) { + client_factory_ = [this](const rpc::Address &address) { + return std::make_shared(address, client_call_manager_); + }; + } +} + +void GcsActorScheduler::Schedule(std::shared_ptr actor) { + auto node_id = actor->GetNodeID(); + if (!node_id.IsNil()) { + if (auto node = gcs_node_manager_.GetNode(node_id)) { + // If the actor is already tied to a node and the node is available, then record + // the relationship of the node and actor and then lease worker directly from the + // node. + RAY_CHECK(node_to_actors_when_leasing_[actor->GetNodeID()] + .emplace(actor->GetActorID()) + .second); + LeaseWorkerFromNode(actor, node); + return; + } + + // The actor is already tied to a node which is unavailable now, so we should reset + // the address. + actor->UpdateAddress(rpc::Address()); + } + + // Select a node to lease worker for the actor. + auto node = SelectNodeRandomly(); + if (node == nullptr) { + // There are no available nodes to schedule the actor, so just trigger the failed + // handler. + schedule_failure_handler_(std::move(actor)); + return; + } + + // Update the address of the actor as it is tied to a new node. 
+ rpc::Address address; + address.set_raylet_id(node->node_id()); + actor->UpdateAddress(address); + auto actor_table_data = + std::make_shared(actor->GetActorTableData()); + // The backend storage is reliable in the future, so the status must be ok. + RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data, + [this, actor](Status status) { + RAY_CHECK_OK(status); + // There is no promise that the node the + // actor tied to is still alive as the + // flush is asynchronously, so just + // invoke `Schedule` which will lease + // worker directly if the node is still + // available or select a new one if not. + Schedule(actor); + })); +} + +std::vector GcsActorScheduler::CancelOnNode(const ClientID &node_id) { + // Remove all the actors from the map associated with this node, and return them as they + // will be reconstructed later. + std::vector actor_ids; + + // Remove all actors in phase of leasing. + { + auto iter = node_to_actors_when_leasing_.find(node_id); + if (iter != node_to_actors_when_leasing_.end()) { + actor_ids.insert(actor_ids.end(), iter->second.begin(), iter->second.end()); + node_to_actors_when_leasing_.erase(iter); + } + } + + // Remove all actors in phase of creating. + { + auto iter = node_to_workers_when_creating_.find(node_id); + if (iter != node_to_workers_when_creating_.end()) { + for (auto &entry : iter->second) { + actor_ids.emplace_back(entry.second->GetAssignedActorID()); + // Remove core worker client. + RAY_CHECK(core_worker_clients_.erase(entry.first) != 0); + } + node_to_workers_when_creating_.erase(iter); + } + } + + // Remove the related remote lease client from remote_lease_clients_. + // There is no need to check in this place, because it is possible that there are no + // workers leased on this node. + remote_lease_clients_.erase(node_id); + + return actor_ids; +} + +ActorID GcsActorScheduler::CancelOnWorker(const ClientID &node_id, + const WorkerID &worker_id) { + // Remove the worker from creating map and return ID of the actor associated with the + // removed worker if exist, else return NilID. + ActorID assigned_actor_id; + auto iter = node_to_workers_when_creating_.find(node_id); + if (iter != node_to_workers_when_creating_.end()) { + auto actor_iter = iter->second.find(worker_id); + if (actor_iter != iter->second.end()) { + assigned_actor_id = actor_iter->second->GetAssignedActorID(); + // Remove core worker client. + RAY_CHECK(core_worker_clients_.erase(worker_id) != 0); + iter->second.erase(actor_iter); + if (iter->second.empty()) { + node_to_workers_when_creating_.erase(iter); + } + } + } + return assigned_actor_id; +} + +void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr actor, + std::shared_ptr node) { + RAY_CHECK(actor && node); + + auto node_id = ClientID::FromBinary(node->node_id()); + RAY_LOG(INFO) << "Start leasing worker from node " << node_id << " for actor " + << actor->GetActorID(); + + rpc::Address remote_address; + remote_address.set_raylet_id(node->node_id()); + remote_address.set_ip_address(node->node_manager_address()); + remote_address.set_port(node->node_manager_port()); + auto lease_client = GetOrConnectLeaseClient(remote_address); + auto status = lease_client->RequestWorkerLease( + actor->GetCreationTaskSpecification(), + [this, node_id, actor, node](const Status &status, + const rpc::RequestWorkerLeaseReply &reply) { + // If the actor is still in the leasing map and the status is ok, remove the actor + // from the leasing map and handle the reply. 
Otherwise, lease again, because it + // may be a network exception. + // If the actor is not in the leasing map, it means that the actor has been + // cancelled as the node is dead, just do nothing in this case because the + // gcs_actor_manager will reconstruct it again. + auto iter = node_to_actors_when_leasing_.find(node_id); + if (iter != node_to_actors_when_leasing_.end()) { + // If the node is still available, the actor must be still in the leasing map as + // it is erased from leasing map only when `CancelOnNode` or the + // `RequestWorkerLeaseReply` is received from the node, so try lease again. + auto actor_iter = iter->second.find(actor->GetActorID()); + RAY_CHECK(actor_iter != iter->second.end()); + if (status.ok()) { + // Remove the actor from the leasing map as the reply is returned from the + // remote node. + iter->second.erase(actor_iter); + if (iter->second.empty()) { + node_to_actors_when_leasing_.erase(iter); + } + RAY_LOG(INFO) << "Finished leasing worker from " << node_id << " for actor " + << actor->GetActorID(); + HandleWorkerLeasedReply(actor, reply); + } else { + RetryLeasingWorkerFromNode(actor, node); + } + } + }); + + if (!status.ok()) { + RetryLeasingWorkerFromNode(actor, node); + } +} + +void GcsActorScheduler::RetryLeasingWorkerFromNode( + std::shared_ptr actor, std::shared_ptr node) { + execute_after(io_context_, + [this, node, actor] { DoRetryLeasingWorkerFromNode(actor, node); }, + RayConfig::instance().gcs_lease_worker_retry_interval_ms()); +} + +void GcsActorScheduler::DoRetryLeasingWorkerFromNode( + std::shared_ptr actor, std::shared_ptr node) { + auto iter = node_to_actors_when_leasing_.find(actor->GetNodeID()); + if (iter != node_to_actors_when_leasing_.end()) { + // If the node is still available, the actor must be still in the + // leasing map as it is erased from leasing map only when + // `CancelOnNode` or the `RequestWorkerLeaseReply` is received from + // the node, so try leasing again. + RAY_CHECK(iter->second.count(actor->GetActorID()) != 0); + RAY_LOG(INFO) << "Retry leasing worker from " << actor->GetNodeID() << " for actor " + << actor->GetActorID(); + LeaseWorkerFromNode(actor, node); + } +} + +void GcsActorScheduler::HandleWorkerLeasedReply( + std::shared_ptr actor, const ray::rpc::RequestWorkerLeaseReply &reply) { + const auto &retry_at_raylet_address = reply.retry_at_raylet_address(); + const auto &worker_address = reply.worker_address(); + if (worker_address.raylet_id().empty()) { + // The worker did not succeed in the lease, but the specified node returned a new + // node, and then try again on the new node. + RAY_CHECK(!retry_at_raylet_address.raylet_id().empty()); + actor->UpdateAddress(retry_at_raylet_address); + auto actor_table_data = + std::make_shared(actor->GetActorTableData()); + // The backend storage is reliable in the future, so the status must be ok. + RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data, + [this, actor](Status status) { + RAY_CHECK_OK(status); + Schedule(actor); + })); + } else { + // The worker is leased successfully from the specified node. 
+ std::vector resources; + for (auto &resource : reply.resource_mapping()) { + resources.emplace_back(resource); + } + auto leased_worker = std::make_shared( + worker_address, std::move(resources), actor->GetActorID()); + auto node_id = leased_worker->GetNodeID(); + RAY_CHECK(node_to_workers_when_creating_[node_id] + .emplace(leased_worker->GetWorkerID(), leased_worker) + .second); + actor->UpdateAddress(leased_worker->GetAddress()); + CreateActorOnWorker(actor, leased_worker); + } +} + +void GcsActorScheduler::CreateActorOnWorker(std::shared_ptr actor, + std::shared_ptr worker) { + RAY_CHECK(actor && worker); + RAY_LOG(INFO) << "Start creating actor " << actor->GetActorID() << " on worker " + << worker->GetWorkerID() << " at node " << actor->GetNodeID(); + + std::unique_ptr request(new rpc::PushTaskRequest()); + request->set_intended_worker_id(worker->GetWorkerID().Binary()); + request->mutable_task_spec()->CopyFrom( + actor->GetCreationTaskSpecification().GetMessage()); + google::protobuf::RepeatedPtrField resources; + for (auto resource : worker->GetLeasedResources()) { + resources.Add(std::move(resource)); + } + request->mutable_resource_mapping()->CopyFrom(resources); + + auto client = GetOrConnectCoreWorkerClient(worker->GetAddress()); + auto status = client->PushNormalTask( + std::move(request), + [this, actor, worker](Status status, const rpc::PushTaskReply &reply) { + RAY_UNUSED(reply); + // If the actor is still in the creating map and the status is ok, remove the + // actor from the creating map and invoke the schedule_success_handler_. + // Otherwise, create again, because it may be a network exception. + // If the actor is not in the creating map, it means that the actor has been + // cancelled as the worker or node is dead, just do nothing in this case because + // the gcs_actor_manager will reconstruct it again. + auto iter = node_to_workers_when_creating_.find(actor->GetNodeID()); + if (iter != node_to_workers_when_creating_.end()) { + auto worker_iter = iter->second.find(actor->GetWorkerID()); + if (worker_iter != iter->second.end()) { + // The worker is still in the creating map. + if (status.ok()) { + // Remove related core worker client. + RAY_CHECK(core_worker_clients_.erase(actor->GetWorkerID()) != 0); + // Remove related worker in phase of creating. + iter->second.erase(worker_iter); + if (iter->second.empty()) { + node_to_workers_when_creating_.erase(iter); + } + RAY_LOG(INFO) << "Succeeded in creating actor " << actor->GetActorID() + << " on worker " << worker->GetWorkerID() << " at node " + << actor->GetNodeID(); + schedule_success_handler_(actor); + } else { + RetryCreatingActorOnWorker(actor, worker); + } + } + } + }); + if (!status.ok()) { + RetryCreatingActorOnWorker(actor, worker); + } +} + +void GcsActorScheduler::RetryCreatingActorOnWorker( + std::shared_ptr actor, std::shared_ptr worker) { + execute_after(io_context_, + [this, actor, worker] { DoRetryCreatingActorOnWorker(actor, worker); }, + RayConfig::instance().gcs_create_actor_retry_interval_ms()); +} + +void GcsActorScheduler::DoRetryCreatingActorOnWorker( + std::shared_ptr actor, std::shared_ptr worker) { + auto iter = node_to_workers_when_creating_.find(actor->GetNodeID()); + if (iter != node_to_workers_when_creating_.end()) { + auto worker_iter = iter->second.find(actor->GetWorkerID()); + if (worker_iter != iter->second.end()) { + // The worker is still in the creating map, try create again. 
+ // The worker is erased from creating map only when `CancelOnNode` + // or `CancelOnWorker` or the actor is created successfully. + RAY_LOG(INFO) << "Retry creating actor " << actor->GetActorID() << " on worker " + << worker->GetWorkerID() << " at node " << actor->GetNodeID(); + CreateActorOnWorker(actor, worker); + } + } +} + +std::shared_ptr GcsActorScheduler::SelectNodeRandomly() const { + auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes(); + if (alive_nodes.empty()) { + return nullptr; + } + + static std::mt19937_64 gen_( + std::chrono::high_resolution_clock::now().time_since_epoch().count()); + std::uniform_int_distribution distribution(0, alive_nodes.size() - 1); + int key_index = distribution(gen_); + int index = 0; + auto iter = alive_nodes.begin(); + for (; index != key_index && iter != alive_nodes.end(); ++index, ++iter) + ; + return iter->second; +} + +std::shared_ptr GcsActorScheduler::GetOrConnectLeaseClient( + const rpc::Address &raylet_address) { + auto node_id = ClientID::FromBinary(raylet_address.raylet_id()); + auto iter = remote_lease_clients_.find(node_id); + if (iter == remote_lease_clients_.end()) { + auto lease_client = lease_client_factory_(raylet_address); + iter = remote_lease_clients_.emplace(node_id, std::move(lease_client)).first; + } + return iter->second; +} + +std::shared_ptr +GcsActorScheduler::GetOrConnectCoreWorkerClient(const rpc::Address &worker_address) { + auto worker_id = WorkerID::FromBinary(worker_address.worker_id()); + auto iter = core_worker_clients_.find(worker_id); + if (iter == core_worker_clients_.end()) { + iter = core_worker_clients_.emplace(worker_id, client_factory_(worker_address)).first; + } + return iter->second; +} + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h new file mode 100644 index 0000000000000..4022a4a8cda0c --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h @@ -0,0 +1,243 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RAY_GCS_ACTOR_SCHEDULER_H +#define RAY_GCS_ACTOR_SCHEDULER_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" +#include "gcs_node_manager.h" + +namespace ray { +namespace gcs { + +using LeaseClientFactoryFn = + std::function(const rpc::Address &address)>; + +class GcsActor; +/// GcsActorScheduler is responsible for scheduling actors registered to GcsActorManager. +/// This class is not thread-safe. +class GcsActorScheduler { + public: + /// Create a GcsActorScheduler + /// + /// \param io_context The main event loop. + /// \param actor_info_accessor Used to flush actor info to storage. + /// \param gcs_node_manager The node manager which is used when scheduling. + /// \param schedule_failure_handler Invoked when there are no available nodes to + /// schedule actors. 
+ /// \param schedule_success_handler Invoked when actors are created on the worker + /// successfully. + /// \param lease_client_factory Factory to create remote lease client, default factor + /// will be used if not set. + /// \param client_factory Factory to create remote core worker client, default factor + /// will be used if not set. + explicit GcsActorScheduler( + boost::asio::io_context &io_context, gcs::ActorInfoAccessor &actor_info_accessor, + const GcsNodeManager &gcs_node_manager, + std::function)> schedule_failure_handler, + std::function)> schedule_success_handler, + LeaseClientFactoryFn lease_client_factory = nullptr, + rpc::ClientFactoryFn client_factory = nullptr); + virtual ~GcsActorScheduler() = default; + + /// Schedule the specified actor. + /// If there is no available nodes then the `schedule_failed_handler_` will be + /// triggered, otherwise the actor will be scheduled until succeed or canceled. + /// + /// \param actor to be scheduled. + void Schedule(std::shared_ptr actor); + + /// Cancel all actors that are being scheduled to the specified node. + /// + /// \param node_id ID of the node where the worker is located. + /// \return ID list of actors associated with the specified node id. + std::vector CancelOnNode(const ClientID &node_id); + + /// Cancel the actor that is being scheduled to the specified worker. + /// + /// \param node_id ID of the node where the worker is located. + /// \param worker_id ID of the worker that the actor is creating on. + /// \return ID of actor associated with the specified node id and worker id. + ActorID CancelOnWorker(const ClientID &node_id, const WorkerID &worker_id); + + protected: + /// The GcsLeasedWorker is kind of abstraction of remote leased worker inside raylet. It + /// contains the address of remote leased worker as well as the leased resources and the + /// ID of the actor associated with this worker. Through this class, we can easily get + /// the WorkerID, Endpoint, NodeID and the associated ActorID of the remote worker. + class GcsLeasedWorker { + public: + /// Create a GcsLeasedWorker + /// + /// \param address the Address of the remote leased worker. + /// \param resources the resources that leased from the remote node(raylet). + /// \param actor_id ID of the actor associated with this leased worker. + explicit GcsLeasedWorker(rpc::Address address, + std::vector resources, + const ActorID &actor_id) + : address_(std::move(address)), + resources_(std::move(resources)), + assigned_actor_id_(actor_id) {} + virtual ~GcsLeasedWorker() = default; + + /// Get the Address of this leased worker. + const rpc::Address &GetAddress() const { return address_; } + + /// Get the ip address of this leased worker. + const std::string &GetIpAddress() const { return address_.ip_address(); } + + /// Get the listening port of the leased worker at remote side. + uint16_t GetPort() const { return address_.port(); } + + /// Get the WorkerID of this leased worker. + WorkerID GetWorkerID() const { return WorkerID::FromBinary(address_.worker_id()); } + + /// Get the NodeID of this leased worker. + ClientID GetNodeID() const { return ClientID::FromBinary(address_.raylet_id()); } + + /// Get the id of the actor which is assigned to this leased worker. + ActorID GetAssignedActorID() const { return assigned_actor_id_; } + + /// Get the leased resources. + const std::vector &GetLeasedResources() const { + return resources_; + } + + protected: + /// The address of the remote leased worker. 
+ protected:
+  /// GcsLeasedWorker is an abstraction of a worker leased from a remote raylet. It
+  /// holds the address of the leased worker as well as the leased resources and the
+  /// ID of the actor associated with this worker. Through this class, we can easily get
+  /// the WorkerID, endpoint, NodeID and the associated ActorID of the remote worker.
+  class GcsLeasedWorker {
+   public:
+    /// Create a GcsLeasedWorker
+    ///
+    /// \param address the Address of the remote leased worker.
+    /// \param resources the resources leased from the remote node (raylet).
+    /// \param actor_id ID of the actor associated with this leased worker.
+    explicit GcsLeasedWorker(rpc::Address address,
+                             std::vector<rpc::ResourceMapEntry> resources,
+                             const ActorID &actor_id)
+        : address_(std::move(address)),
+          resources_(std::move(resources)),
+          assigned_actor_id_(actor_id) {}
+    virtual ~GcsLeasedWorker() = default;
+
+    /// Get the Address of this leased worker.
+    const rpc::Address &GetAddress() const { return address_; }
+
+    /// Get the ip address of this leased worker.
+    const std::string &GetIpAddress() const { return address_.ip_address(); }
+
+    /// Get the listening port of the leased worker on the remote side.
+    uint16_t GetPort() const { return address_.port(); }
+
+    /// Get the WorkerID of this leased worker.
+    WorkerID GetWorkerID() const { return WorkerID::FromBinary(address_.worker_id()); }
+
+    /// Get the NodeID of this leased worker.
+    ClientID GetNodeID() const { return ClientID::FromBinary(address_.raylet_id()); }
+
+    /// Get the id of the actor which is assigned to this leased worker.
+    ActorID GetAssignedActorID() const { return assigned_actor_id_; }
+
+    /// Get the leased resources.
+    const std::vector<rpc::ResourceMapEntry> &GetLeasedResources() const {
+      return resources_;
+    }
+
+   protected:
+    /// The address of the remote leased worker.
+    rpc::Address address_;
+    /// The resources leased from the remote node.
+    std::vector<rpc::ResourceMapEntry> resources_;
+    /// Id of the actor assigned to this worker.
+    ActorID assigned_actor_id_;
+  };
+
+  /// Lease a worker from the specified node for the specified actor.
+  ///
+  /// \param actor A description of the actor to create. This object has the resource
+  /// specification needed to lease workers from the specified node.
+  /// \param node The node that the worker will be leased from.
+  void LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
+                           std::shared_ptr<rpc::GcsNodeInfo> node);
+
+  /// Retry leasing a worker from the specified node for the specified actor.
+  /// This is a virtual method so that the io_context_ delay can be mocked out in tests.
+  ///
+  /// \param actor A description of the actor to create. This object has the resource
+  /// specification needed to lease workers from the specified node.
+  /// \param node The node that the worker will be leased from.
+  virtual void RetryLeasingWorkerFromNode(std::shared_ptr<GcsActor> actor,
+                                          std::shared_ptr<rpc::GcsNodeInfo> node);
+
+  /// This method is only invoked inside `RetryLeasingWorkerFromNode`; it exists to
+  /// make it easy to write unit tests.
+  ///
+  /// \param actor A description of the actor to create. This object has the resource
+  /// specification needed to lease workers from the specified node.
+  /// \param node The node that the worker will be leased from.
+  void DoRetryLeasingWorkerFromNode(std::shared_ptr<GcsActor> actor,
+                                    std::shared_ptr<rpc::GcsNodeInfo> node);
+
+  /// Handler to process a granted lease.
+  ///
+  /// \param actor Contains the resources needed to lease workers from the specified node.
+  /// \param reply The reply of `RequestWorkerLeaseRequest`.
+  void HandleWorkerLeasedReply(std::shared_ptr<GcsActor> actor,
+                               const rpc::RequestWorkerLeaseReply &reply);
+
+  /// Create the specified actor on the specified worker.
+  ///
+  /// \param actor The actor to be created.
+  /// \param worker The worker that the actor will be created on.
+  void CreateActorOnWorker(std::shared_ptr<GcsActor> actor,
+                           std::shared_ptr<GcsLeasedWorker> worker);
+
+  /// Retry creating the specified actor on the specified worker asynchronously.
+  /// This is a virtual method so that the io_context_ delay can be mocked out in tests.
+  ///
+  /// \param actor The actor to be created.
+  /// \param worker The worker that the actor will be created on.
+  virtual void RetryCreatingActorOnWorker(std::shared_ptr<GcsActor> actor,
+                                          std::shared_ptr<GcsLeasedWorker> worker);
+
+  /// This method is only invoked inside `RetryCreatingActorOnWorker`; it exists to
+  /// make it easy to write unit tests.
+  ///
+  /// \param actor The actor to be created.
+  /// \param worker The worker that the actor will be created on.
+  void DoRetryCreatingActorOnWorker(std::shared_ptr<GcsActor> actor,
+                                    std::shared_ptr<GcsLeasedWorker> worker);
+
+  /// Select a node from the alive nodes randomly.
+  std::shared_ptr<rpc::GcsNodeInfo> SelectNodeRandomly() const;
+
+  /// Get an existing lease client or connect a new one.
+  std::shared_ptr<WorkerLeaseInterface> GetOrConnectLeaseClient(
+      const rpc::Address &raylet_address);
+
+  /// Get or create a CoreWorkerClient to communicate with the remote leased worker.
+  std::shared_ptr<rpc::CoreWorkerClientInterface> GetOrConnectCoreWorkerClient(
+      const rpc::Address &worker_address);
+
+ protected:
+  /// The io loop which is used to construct `client_call_manager_` and delay
+  /// execution of tasks (e.g. execute_after).
+  boost::asio::io_context &io_context_;
+  /// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s.
+  rpc::ClientCallManager client_call_manager_;
+  /// The actor info accessor.
+  gcs::ActorInfoAccessor &actor_info_accessor_;
+  /// Map from node ID to the set of actors for whom we are trying to acquire a lease from
+  /// that node. This is needed so that we can retry lease requests from the node until we
+  /// receive a reply or the node is removed.
+  absl::flat_hash_map<ClientID, absl::flat_hash_set<ActorID>>
+      node_to_actors_when_leasing_;
+  /// Map from node ID to the workers on which we are trying to create actors. This is
+  /// needed so that we can cancel actor creation requests if the worker is removed.
+  absl::flat_hash_map<ClientID,
+                      absl::flat_hash_map<WorkerID, std::shared_ptr<GcsLeasedWorker>>>
+      node_to_workers_when_creating_;
+  /// The cached node clients which are used to communicate with raylets to lease workers.
+  absl::flat_hash_map<ClientID, std::shared_ptr<WorkerLeaseInterface>>
+      remote_lease_clients_;
+  /// The cached core worker clients which are used to communicate with leased workers.
+  absl::flat_hash_map<WorkerID, std::shared_ptr<rpc::CoreWorkerClientInterface>>
+      core_worker_clients_;
+  /// Reference of GcsNodeManager.
+  const GcsNodeManager &gcs_node_manager_;
+  /// The handler to handle the scheduling failures.
+  std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler_;
+  /// The handler to handle the successful scheduling.
+  std::function<void(std::shared_ptr<GcsActor>)> schedule_success_handler_;
+  /// Factory for producing new clients to request leases from remote nodes.
+  LeaseClientFactoryFn lease_client_factory_;
+  /// Factory for producing new core worker clients.
+  rpc::ClientFactoryFn client_factory_;
+};
+
+}  // namespace gcs
+}  // namespace ray
+
+#endif  // RAY_GCS_ACTOR_SCHEDULER_H
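The `Retry*`/`DoRetry*` split above exists so unit tests can intercept retries without waiting on the io_context timer. A minimal sketch of the pattern follows; the subclass name is illustrative, and the real mock lives in gcs_test_util.h further down in this diff:

    class CountingScheduler : public ray::gcs::GcsActorScheduler {
     public:
      using ray::gcs::GcsActorScheduler::GcsActorScheduler;
      int num_lease_retries = 0;

     protected:
      void RetryLeasingWorkerFromNode(
          std::shared_ptr<ray::gcs::GcsActor> actor,
          std::shared_ptr<ray::rpc::GcsNodeInfo> node) override {
        ++num_lease_retries;
        // Bypass the retry delay and re-send the lease request immediately.
        DoRetryLeasingWorkerFromNode(std::move(actor), std::move(node));
      }
    };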
diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc
index a8b541f42f1ea..30f7a04a09628 100644
--- a/src/ray/gcs/gcs_server/gcs_node_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc
@@ -1,16 +1,29 @@
+// Copyright 2017 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 #include "gcs_node_manager.h"
 #include
 #include
-#include
-#include "ray/gcs/redis_gcs_client.h"
+#include <ray/gcs/pb_util.h>

 namespace ray {
 namespace gcs {
-
 GcsNodeManager::GcsNodeManager(boost::asio::io_service &io_service,
-                               std::shared_ptr<RedisGcsClient> gcs_client)
-    : client_call_manager_(io_service),
-      gcs_client_(std::move(gcs_client)),
+                               gcs::NodeInfoAccessor &node_info_accessor,
+                               gcs::ErrorInfoAccessor &error_info_accessor)
+    : node_info_accessor_(node_info_accessor),
+      error_info_accessor_(error_info_accessor),
       num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()),
       heartbeat_timer_(io_service) {
   Start();
@@ -24,26 +37,26 @@ void GcsNodeManager::HandleHeartbeat(const ClientID &node_id,
 void GcsNodeManager::Start() {
   RAY_LOG(INFO) << "Starting gcs node manager.";
-  const auto lookup_callback = [this](Status status,
-                                      const std::vector<GcsNodeInfo> &node_info_list) {
-    for (const auto &node_info : node_info_list) {
-      if (node_info.state() != rpc::GcsNodeInfo::DEAD) {
-        // If there're any existing alive clients in client table, add them to
-        // our `heartbeats_` cache. Thus, if they died before monitor starts,
-        // we can also detect their death.
-        // Use `emplace` instead of `operator []` because we just want to add this
-        // client to `heartbeats_` only if it has not yet received heartbeat event.
-        // Besides, it is not necessary to add an empty `HeartbeatTableData`
-        // to `heartbeat_buffer_` as it doesn't make sense to broadcast an empty
-        // message to the cluster and it's ok to add it when actually receive
-        // its heartbeat event.
-        heartbeats_.emplace(ClientID::FromBinary(node_info.node_id()),
-                            num_heartbeats_timeout_);
-      }
-    }
-    Tick();
-  };
-  RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAll(lookup_callback));
+  const auto lookup_callback =
+      [this](Status status, const std::vector<GcsNodeInfo> &node_info_list) {
+        for (const auto &node_info : node_info_list) {
+          if (node_info.state() != rpc::GcsNodeInfo::DEAD) {
+            // If there're any existing alive clients in client table, add them to
+            // our `heartbeats_` cache. Thus, if they died before monitor starts,
+            // we can also detect their death.
+            // Use `emplace` instead of `operator []` because we just want to add this
+            // client to `heartbeats_` only if it has not yet received heartbeat event.
+            // Besides, it is not necessary to add an empty `HeartbeatTableData`
+            // to `heartbeat_buffer_` as it doesn't make sense to broadcast an empty
+            // message to the cluster and it's ok to add it when actually receive
+            // its heartbeat event.
+            heartbeats_.emplace(ClientID::FromBinary(node_info.node_id()),
+                                num_heartbeats_timeout_);
+          }
+        }
+        Tick();
+      };
+  RAY_CHECK_OK(node_info_accessor_.AsyncGetAll(lookup_callback));
 }

 /// A periodic timer that checks for timed out clients.
@@ -55,55 +68,57 @@ void GcsNodeManager::Tick() {

 void GcsNodeManager::DetectDeadNodes() {
   for (auto it = heartbeats_.begin(); it != heartbeats_.end();) {
-    it->second = it->second - 1;
-    if (it->second == 0) {
-      if (dead_nodes_.count(it->first) == 0) {
-        auto node_id = it->first;
+    auto current = it++;
+    current->second = current->second - 1;
+    if (current->second == 0) {
+      if (dead_nodes_.count(current->first) == 0) {
+        auto node_id = current->first;
         RAY_LOG(WARNING) << "Node timed out: " << node_id;
-        auto lookup_callback = [this, node_id](Status status,
-                                               const std::vector<GcsNodeInfo> &all_node) {
+        auto lookup_callback = [this, node_id](
+                                   Status status,
+                                   const std::vector<GcsNodeInfo> &all_node) {
           RAY_CHECK_OK(status);
           bool marked = false;
           for (const auto &node : all_node) {
-            if (node_id.Binary() == node.node_id() && node.state() == GcsNodeInfo::DEAD) {
+            if (node_id.Binary() == node.node_id() &&
+                node.state() == rpc::GcsNodeInfo::DEAD) {
               // The node has been marked dead by itself.
               marked = true;
               break;
             }
           }
           if (!marked) {
-            RAY_CHECK_OK(gcs_client_->Nodes().AsyncUnregister(node_id, nullptr));
+            RemoveNode(node_id);
+            RAY_CHECK_OK(node_info_accessor_.AsyncUnregister(node_id, nullptr));
             // Broadcast a warning to all of the drivers indicating that the node
             // has been marked as dead.
             // TODO(rkn): Define this constant somewhere else.
             std::string type = "node_removed";
             std::ostringstream error_message;
-            error_message << "The node with node ID " << node_id
+            error_message << "The node with node id " << node_id
                           << " has been marked dead because the monitor"
                           << " has missed too many heartbeats from it.";
             auto error_data_ptr =
                 gcs::CreateErrorTableData(type, error_message.str(), current_time_ms());
             RAY_CHECK_OK(
-                gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr));
+                error_info_accessor_.AsyncReportJobError(error_data_ptr, nullptr));
           }
         };
-        RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAll(lookup_callback));
+        RAY_CHECK_OK(node_info_accessor_.AsyncGetAll(lookup_callback));
         dead_nodes_.insert(node_id);
       }
-      it = heartbeats_.erase(it);
-    } else {
-      it++;
+      heartbeats_.erase(current);
     }
   }
 }
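The loop above was restructured because the rewritten code erases `current` inside the timed-out branch: the iterator must be advanced before any erase so that `it` never refers to a removed entry. The same idiom in isolation, as a runnable sketch against a std::map standing in for `heartbeats_`:

    #include <map>

    int main() {
      std::map<int, int> heartbeats{{1, 1}, {2, 3}, {3, 1}};
      for (auto it = heartbeats.begin(); it != heartbeats.end();) {
        auto current = it++;           // advance first, then inspect/erase
        if (--current->second == 0) {
          heartbeats.erase(current);   // safe: `it` already points past it
        }
      }
    }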

 void GcsNodeManager::SendBatchedHeartbeat() {
   if (!heartbeat_buffer_.empty()) {
-    auto batch = std::make_shared<HeartbeatBatchTableData>();
+    auto batch = std::make_shared<rpc::HeartbeatBatchTableData>();
     for (const auto &heartbeat : heartbeat_buffer_) {
       batch->add_batch()->CopyFrom(heartbeat.second);
     }
-    RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportBatchHeartbeat(batch, nullptr));
+    RAY_CHECK_OK(node_info_accessor_.AsyncReportBatchHeartbeat(batch, nullptr));
     heartbeat_buffer_.clear();
   }
 }
@@ -123,5 +138,42 @@ void GcsNodeManager::ScheduleTick() {
   });
 }

+std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::GetNode(
+    const ray::ClientID &node_id) const {
+  auto iter = alive_nodes_.find(node_id);
+  if (iter == alive_nodes_.end()) {
+    return nullptr;
+  }
+
+  return iter->second;
+}
+
+const absl::flat_hash_map<ClientID, std::shared_ptr<rpc::GcsNodeInfo>>
+    &GcsNodeManager::GetAllAliveNodes() const {
+  return alive_nodes_;
+}
+
+void GcsNodeManager::AddNode(std::shared_ptr<rpc::GcsNodeInfo> node) {
+  auto node_id = ClientID::FromBinary(node->node_id());
+  auto iter = alive_nodes_.find(node_id);
+  if (iter == alive_nodes_.end()) {
+    alive_nodes_.emplace(node_id, node);
+    for (auto &listener : node_added_listeners_) {
+      listener(node);
+    }
+  }
+}
+
+void GcsNodeManager::RemoveNode(const ray::ClientID &node_id) {
+  auto iter = alive_nodes_.find(node_id);
+  if (iter != alive_nodes_.end()) {
+    auto node = std::move(iter->second);
+    alive_nodes_.erase(iter);
+    for (auto &listener : node_removed_listeners_) {
+      listener(node);
+    }
+  }
+}
+
 } // namespace gcs
-} // namespace ray
\ No newline at end of file
+} // namespace ray
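With the new Add/Remove/listener API in place, a consumer such as the actor manager can key its scheduling off node lifecycle events. A hedged sketch of that wiring; `scheduler`, `SchedulePendingActors`, and the requeue handling are illustrative assumptions, not the PR's exact GcsActorManager code:

    node_manager.AddNodeAddedListener(
        [&](std::shared_ptr<rpc::GcsNodeInfo> node) {
          // A fresh node may be able to host actors that failed to schedule.
          SchedulePendingActors();
        });
    node_manager.AddNodeRemovedListener(
        [&](std::shared_ptr<rpc::GcsNodeInfo> node) {
          // Abort in-flight leasing/creating on the dead node, then requeue.
          auto node_id = ClientID::FromBinary(node->node_id());
          for (const auto &actor_id : scheduler.CancelOnNode(node_id)) {
            // reschedule actor_id on another node ...
          }
        });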
diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h
index 2a389908714cf..2e84e4dae418e 100644
--- a/src/ray/gcs/gcs_server/gcs_node_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_node_manager.h
@@ -1,23 +1,82 @@
+// Copyright 2017 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 #ifndef RAY_GCS_NODE_MANAGER_H
 #define RAY_GCS_NODE_MANAGER_H

 #include
+#include
 #include
 #include
+#include "absl/container/flat_hash_map.h"
+#include "absl/container/flat_hash_set.h"

 namespace ray {
-
 namespace gcs {
-class RedisGcsClient;

 /// GcsNodeManager is responsible for managing and monitoring nodes.
+/// This class is not thread-safe.
 class GcsNodeManager {
  public:
   /// Create a GcsNodeManager.
   ///
   /// \param io_service The event loop to run the monitor on.
-  /// \param gcs_client The client of gcs to access/pub/sub data.
+  /// \param node_info_accessor The node info accessor.
+  /// \param error_info_accessor The error info accessor, which is used to report errors
+  /// when detecting the death of nodes.
   explicit GcsNodeManager(boost::asio::io_service &io_service,
-                          std::shared_ptr<RedisGcsClient> gcs_client);
+                          gcs::NodeInfoAccessor &node_info_accessor,
+                          gcs::ErrorInfoAccessor &error_info_accessor);
+
+  /// Add an alive node.
+  ///
+  /// \param node The info of the node to be added.
+  void AddNode(std::shared_ptr<rpc::GcsNodeInfo> node);
+
+  /// Remove a node from the alive nodes.
+  ///
+  /// \param node_id The ID of the node to be removed.
+  void RemoveNode(const ClientID &node_id);
+
+  /// Get an alive node by ID.
+  ///
+  /// \param node_id The ID of the node.
+  /// \return The node if it is alive, otherwise nullptr.
+  std::shared_ptr<rpc::GcsNodeInfo> GetNode(const ClientID &node_id) const;
+
+  /// Get all alive nodes.
+  ///
+  /// \return All alive nodes.
+  const absl::flat_hash_map<ClientID, std::shared_ptr<rpc::GcsNodeInfo>>
+      &GetAllAliveNodes() const;
+
+  /// Add a listener that is notified when a node is removed.
+  ///
+  /// \param listener The handler which processes the removal of a node.
+  void AddNodeRemovedListener(
+      std::function<void(std::shared_ptr<rpc::GcsNodeInfo>)> listener) {
+    RAY_CHECK(listener);
+    node_removed_listeners_.emplace_back(std::move(listener));
+  }
+
+  /// Add a listener that is notified when a node is added.
+  ///
+  /// \param listener The handler which processes the addition of a node.
+  void AddNodeAddedListener(
+      std::function<void(std::shared_ptr<rpc::GcsNodeInfo>)> listener) {
+    RAY_CHECK(listener);
+    node_added_listeners_.emplace_back(std::move(listener));
+  }

   /// Handle a heartbeat from a Raylet.
   ///
@@ -47,20 +106,29 @@ class GcsNodeManager {
   void ScheduleTick();

  private:
-  rpc::ClientCallManager client_call_manager_;
-  /// A client to the GCS, through which heartbeats are received.
-  std::shared_ptr<RedisGcsClient> gcs_client_;
+  /// Alive nodes.
+  absl::flat_hash_map<ClientID, std::shared_ptr<rpc::GcsNodeInfo>> alive_nodes_;
+  /// Node info accessor.
+  gcs::NodeInfoAccessor &node_info_accessor_;
+  /// Error info accessor.
+  gcs::ErrorInfoAccessor &error_info_accessor_;
   /// The number of heartbeats that can be missed before a node is removed.
   int64_t num_heartbeats_timeout_;
   /// A timer that ticks every heartbeat_timeout_ms_ milliseconds.
   boost::asio::deadline_timer heartbeat_timer_;
   /// For each Raylet that we receive a heartbeat from, the number of ticks
   /// that may pass before the Raylet will be declared dead.
-  std::unordered_map<ClientID, int64_t> heartbeats_;
+  absl::flat_hash_map<ClientID, int64_t> heartbeats_;
   /// The Raylets that have been marked as dead in gcs.
-  std::unordered_set<ClientID> dead_nodes_;
+  absl::flat_hash_set<ClientID> dead_nodes_;
   /// A buffer containing heartbeats received from node managers in the last tick.
-  std::unordered_map<ClientID, rpc::HeartbeatTableData> heartbeat_buffer_;
+  absl::flat_hash_map<ClientID, rpc::HeartbeatTableData> heartbeat_buffer_;
+  /// Listeners which monitor the addition of nodes.
+  std::vector<std::function<void(std::shared_ptr<rpc::GcsNodeInfo>)>>
+      node_added_listeners_;
+  /// Listeners which monitor the removal of nodes.
+  std::vector<std::function<void(std::shared_ptr<rpc::GcsNodeInfo>)>>
+      node_removed_listeners_;
 };

 } // namespace gcs
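Since the class documents itself as not thread-safe, any caller outside the owning event loop would have to marshal onto it first. A one-line sketch of that usage assumption, using plain boost.asio:

    io_service.post([&node_manager, node]() { node_manager.AddNode(node); });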
diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc
index c455024f785a8..91ac168a508e6 100644
--- a/src/ray/gcs/gcs_server/gcs_server.cc
+++ b/src/ray/gcs/gcs_server/gcs_server.cc
@@ -15,6 +15,7 @@
 #include "gcs_server.h"
 #include "actor_info_handler_impl.h"
 #include "error_info_handler_impl.h"
+#include "gcs_actor_manager.h"
 #include "gcs_node_manager.h"
 #include "job_info_handler_impl.h"
 #include "node_info_handler_impl.h"
@@ -45,6 +46,9 @@ void GcsServer::Start() {
       main_service_, redis_gcs_client_->primary_context(), [this]() { Stop(); });
   gcs_redis_failure_detector_->Start();

+  // Init gcs actor manager.
+  InitGcsActorManager();
+
   // Register rpc service.
   job_info_handler_ = InitJobInfoHandler();
   job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *job_info_handler_));
@@ -119,7 +123,15 @@ void GcsServer::InitBackendClient() {
 }

 void GcsServer::InitGcsNodeManager() {
-  gcs_node_manager_ = std::make_shared<GcsNodeManager>(main_service_, redis_gcs_client_);
+  RAY_CHECK(redis_gcs_client_ != nullptr);
+  gcs_node_manager_ = std::make_shared<GcsNodeManager>(
+      main_service_, redis_gcs_client_->Nodes(), redis_gcs_client_->Errors());
+}
+
+void GcsServer::InitGcsActorManager() {
+  RAY_CHECK(redis_gcs_client_ != nullptr && gcs_node_manager_ != nullptr);
+  gcs_actor_manager_ = std::make_shared<GcsActorManager>(
+      main_service_, redis_gcs_client_->Actors(), *gcs_node_manager_);
 }

 std::unique_ptr<rpc::JobInfoHandler> GcsServer::InitJobInfoHandler() {
@@ -129,7 +141,7 @@ std::unique_ptr<rpc::JobInfoHandler> GcsServer::InitJobInfoHandler() {

 std::unique_ptr<rpc::ActorInfoHandler> GcsServer::InitActorInfoHandler() {
   return std::unique_ptr<rpc::ActorInfoHandler>(
-      new rpc::DefaultActorInfoHandler(*redis_gcs_client_));
+      new rpc::DefaultActorInfoHandler(*redis_gcs_client_, *gcs_actor_manager_));
 }

 std::unique_ptr<rpc::NodeInfoHandler> GcsServer::InitNodeInfoHandler() {
@@ -198,7 +210,7 @@ std::unique_ptr<rpc::ErrorInfoHandler> GcsServer::InitErrorInfoHandler() {

 std::unique_ptr<rpc::WorkerInfoHandler> GcsServer::InitWorkerInfoHandler() {
   return std::unique_ptr<rpc::WorkerInfoHandler>(
-      new rpc::DefaultWorkerInfoHandler(*redis_gcs_client_));
+      new rpc::DefaultWorkerInfoHandler(*redis_gcs_client_, *gcs_actor_manager_));
 }

 } // namespace gcs
diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h
index b367253250493..0c789c67d13a9 100644
--- a/src/ray/gcs/gcs_server/gcs_server.h
+++ b/src/ray/gcs/gcs_server/gcs_server.h
@@ -34,6 +34,7 @@ struct GcsServerConfig {
 };

 class GcsNodeManager;
+class GcsActorManager;

 /// The GcsServer will take over all requests from ServiceBasedGcsClient and transparently
 /// transmit the commands to the backend reliable storage for the time being.
@@ -72,6 +73,9 @@ class GcsServer {
   /// cluster.
   virtual void InitGcsNodeManager();

+  /// Initialize the gcs actor manager.
+  virtual void InitGcsActorManager();
+
   /// The job info handler
   virtual std::unique_ptr<rpc::JobInfoHandler> InitJobInfoHandler();

@@ -114,6 +118,8 @@ class GcsServer {
   std::shared_ptr<GcsNodeManager> gcs_node_manager_;
   /// The gcs redis failure detector.
std::shared_ptr gcs_redis_failure_detector_; + /// The gcs actor manager + std::shared_ptr gcs_actor_manager_; /// Job info handler and service std::unique_ptr job_info_handler_; std::unique_ptr job_info_service_; diff --git a/src/ray/gcs/gcs_server/node_info_handler_impl.cc b/src/ray/gcs/gcs_server/node_info_handler_impl.cc index 2e16b55a2556e..b2c15b00097d0 100644 --- a/src/ray/gcs/gcs_server/node_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/node_info_handler_impl.cc @@ -23,7 +23,7 @@ void DefaultNodeInfoHandler::HandleRegisterNode( rpc::SendReplyCallback send_reply_callback) { ClientID node_id = ClientID::FromBinary(request.node_info().node_id()); RAY_LOG(DEBUG) << "Registering node info, node id = " << node_id; - + gcs_node_manager_.AddNode(std::make_shared(request.node_info())); auto on_done = [node_id, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to register node info: " << status.ToString() @@ -44,6 +44,7 @@ void DefaultNodeInfoHandler::HandleUnregisterNode( rpc::SendReplyCallback send_reply_callback) { ClientID node_id = ClientID::FromBinary(request.node_id()); RAY_LOG(DEBUG) << "Unregistering node info, node id = " << node_id; + gcs_node_manager_.RemoveNode(node_id); auto on_done = [node_id, reply, send_reply_callback](Status status) { if (!status.ok()) { diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc new file mode 100644 index 0000000000000..4dc5ccc880ffd --- /dev/null +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -0,0 +1,173 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include "gtest/gtest.h" + +namespace ray { + +class MockedGcsActorManager : public gcs::GcsActorManager { + public: + explicit MockedGcsActorManager(boost::asio::io_context &io_context, + gcs::ActorInfoAccessor &actor_info_accessor, + gcs::GcsNodeManager &gcs_node_manager, + gcs::LeaseClientFactoryFn lease_client_factory = nullptr, + rpc::ClientFactoryFn client_factory = nullptr) + : gcs::GcsActorManager(io_context, actor_info_accessor, gcs_node_manager, + lease_client_factory, client_factory) { + gcs_actor_scheduler_.reset(new Mocker::MockedGcsActorScheduler( + io_context, actor_info_accessor, gcs_node_manager, + /*schedule_failure_handler=*/ + [this](std::shared_ptr actor) { + // When there are no available nodes to schedule the actor the + // gcs_actor_scheduler will treat it as failed and invoke this handler. In + // this case, the actor should be appended to the `pending_actors_` and wait + // for the registration of new node. 
+ pending_actors_.emplace_back(std::move(actor)); + }, + /*schedule_success_handler=*/ + [this](std::shared_ptr actor) { + OnActorCreateSuccess(std::move(actor)); + }, + std::move(lease_client_factory), std::move(client_factory))); + } + + public: + void ResetLeaseClientFactory(gcs::LeaseClientFactoryFn lease_client_factory) { + auto gcs_actor_scheduler = + dynamic_cast(gcs_actor_scheduler_.get()); + gcs_actor_scheduler->ResetLeaseClientFactory(std::move(lease_client_factory)); + } + + void ResetClientFactory(rpc::ClientFactoryFn client_factory) { + auto gcs_actor_scheduler = + dynamic_cast(gcs_actor_scheduler_.get()); + gcs_actor_scheduler->ResetClientFactory(std::move(client_factory)); + } + + const absl::flat_hash_map> + &GetAllRegisteredActors() const { + return registered_actors_; + } + + const std::vector> &GetAllPendingActors() const { + return pending_actors_; + } +}; + +class GcsActorManagerTest : public ::testing::Test { + public: + void SetUp() override { + raylet_client_ = std::make_shared(); + worker_client_ = std::make_shared(); + gcs_node_manager_ = std::make_shared( + io_service_, node_info_accessor_, error_info_accessor_); + gcs_actor_manager_ = std::make_shared( + io_service_, actor_info_accessor_, *gcs_node_manager_, + /*lease_client_factory=*/ + [this](const rpc::Address &address) { return raylet_client_; }, + /*client_factory=*/ + [this](const rpc::Address &address) { return worker_client_; }); + } + + protected: + boost::asio::io_service io_service_; + Mocker::MockedActorInfoAccessor actor_info_accessor_; + Mocker::MockedNodeInfoAccessor node_info_accessor_; + Mocker::MockedErrorInfoAccessor error_info_accessor_; + + std::shared_ptr raylet_client_; + std::shared_ptr worker_client_; + std::shared_ptr gcs_node_manager_; + std::shared_ptr gcs_actor_manager_; +}; + +TEST_F(GcsActorManagerTest, TestNormalFlow) { + gcs_actor_manager_->ResetLeaseClientFactory([this](const rpc::Address &address) { + raylet_client_->auto_grant_node_id = ClientID::FromBinary(address.raylet_id()); + return raylet_client_; + }); + gcs_actor_manager_->ResetClientFactory([this](const rpc::Address &address) { + worker_client_->enable_auto_reply = true; + return worker_client_; + }); + + auto job_id = JobID::FromInt(1); + auto create_actor_request = + Mocker::GenCreateActorRequest(job_id, /*max_reconstructions=*/2); + std::vector> registered_actors; + gcs_actor_manager_->RegisterActor( + create_actor_request, [®istered_actors](std::shared_ptr actor) { + registered_actors.emplace_back(actor); + }); + + ASSERT_EQ(1, registered_actors.size()); + ASSERT_EQ(1, gcs_actor_manager_->GetAllRegisteredActors().size()); + ASSERT_EQ(1, gcs_actor_manager_->GetAllPendingActors().size()); + + auto actor = registered_actors.front(); + ASSERT_EQ(rpc::ActorTableData::PENDING, actor->GetState()); + + // Add node_1 and then check if the actor is in state `ALIVE` + auto node_1 = Mocker::GenNodeInfo(); + auto node_id_1 = ClientID::FromBinary(node_1->node_id()); + gcs_node_manager_->AddNode(node_1); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + ASSERT_EQ(0, gcs_actor_manager_->GetAllPendingActors().size()); + ASSERT_EQ(rpc::ActorTableData::ALIVE, actor->GetState()); + ASSERT_EQ(node_id_1, actor->GetNodeID()); + + // Remove node_1 and then check if the actor is in state `RECONSTRUCTING` + gcs_node_manager_->RemoveNode(node_id_1); + ASSERT_EQ(0, gcs_node_manager_->GetAllAliveNodes().size()); + ASSERT_EQ(1, gcs_actor_manager_->GetAllPendingActors().size()); + ASSERT_EQ(rpc::ActorTableData::RECONSTRUCTING, 
actor->GetState());
+
+  // Add node_2 and then check if the actor is alive again.
+  auto node_2 = Mocker::GenNodeInfo();
+  auto node_id_2 = ClientID::FromBinary(node_2->node_id());
+  gcs_node_manager_->AddNode(node_2);
+  ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
+  ASSERT_EQ(0, gcs_actor_manager_->GetAllPendingActors().size());
+  ASSERT_EQ(rpc::ActorTableData::ALIVE, actor->GetState());
+  ASSERT_EQ(node_id_2, actor->GetNodeID());
+
+  // Add node_3.
+  auto node_3 = Mocker::GenNodeInfo();
+  auto node_id_3 = ClientID::FromBinary(node_3->node_id());
+  gcs_node_manager_->AddNode(node_3);
+  ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size());
+
+  // Remove node_2 and then check if the actor drifts to node_3.
+  gcs_node_manager_->RemoveNode(node_id_2);
+  ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
+  ASSERT_EQ(0, gcs_actor_manager_->GetAllPendingActors().size());
+  ASSERT_EQ(rpc::ActorTableData::ALIVE, actor->GetState());
+  ASSERT_EQ(node_id_3, actor->GetNodeID());
+
+  // Remove node_3 and then check if the actor is dead.
+  gcs_node_manager_->RemoveNode(node_id_3);
+  ASSERT_EQ(0, gcs_node_manager_->GetAllAliveNodes().size());
+  ASSERT_EQ(0, gcs_actor_manager_->GetAllPendingActors().size());
+  ASSERT_EQ(rpc::ActorTableData::DEAD, actor->GetState());
+}
+
+} // namespace ray
+
+int main(int argc, char **argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
new file mode 100644
index 0000000000000..2c4b95c22592a
--- /dev/null
+++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
@@ -0,0 +1,364 @@
+// Copyright 2017 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
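A note on how the following tests drive the scheduler: `MockRayletClient::GrantWorkerLease` (defined in gcs_test_util.h, further down in this diff) replays the oldest queued `RequestWorkerLease` callback with either a granted worker or a spillback address, and `MockWorkerClient::ReplyPushTask` replays the oldest queued `PushNormalTask` callback. Each test can therefore step the lease/create/reply protocol deterministically, without any real RPC.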
+ +#include + +#include +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace ray { + +class GcsActorSchedulerTest : public ::testing::Test { + public: + void SetUp() override { + raylet_client_ = std::make_shared(); + worker_client_ = std::make_shared(); + gcs_node_manager_ = std::make_shared( + io_service_, node_info_accessor_, error_info_accessor_); + gcs_actor_scheduler_ = std::make_shared( + io_service_, actor_info_accessor_, *gcs_node_manager_, + /*schedule_failure_handler=*/ + [this](std::shared_ptr actor) { + failure_actors_.emplace_back(std::move(actor)); + }, + /*schedule_success_handler=*/ + [this](std::shared_ptr actor) { + success_actors_.emplace_back(std::move(actor)); + }, + /*lease_client_factory=*/ + [this](const rpc::Address &address) { return raylet_client_; }, + /*client_factory=*/ + [this](const rpc::Address &address) { return worker_client_; }); + } + + protected: + boost::asio::io_service io_service_; + Mocker::MockedActorInfoAccessor actor_info_accessor_; + Mocker::MockedNodeInfoAccessor node_info_accessor_; + Mocker::MockedErrorInfoAccessor error_info_accessor_; + + std::shared_ptr raylet_client_; + std::shared_ptr worker_client_; + std::shared_ptr gcs_node_manager_; + std::shared_ptr gcs_actor_scheduler_; + std::vector> success_actors_; + std::vector> failure_actors_; +}; + +TEST_F(GcsActorSchedulerTest, TestScheduleFailedWithZeroNode) { + ASSERT_EQ(0, gcs_node_manager_->GetAllAliveNodes().size()); + + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto actor = std::make_shared(create_actor_request); + + // Schedule the actor with zero node. + gcs_actor_scheduler_->Schedule(actor); + + // The lease request should not be send and the scheduling of actor should fail as there + // are no available nodes. + ASSERT_EQ(raylet_client_->num_workers_requested, 0); + ASSERT_EQ(0, success_actors_.size()); + ASSERT_EQ(1, failure_actors_.size()); + ASSERT_EQ(actor, failure_actors_.front()); +} + +TEST_F(GcsActorSchedulerTest, TestScheduleActorSuccess) { + auto node = Mocker::GenNodeInfo(); + auto node_id = ClientID::FromBinary(node->node_id()); + gcs_node_manager_->AddNode(node); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto actor = std::make_shared(create_actor_request); + + // Schedule the actor with 1 available node, and the lease request should be send to the + // node. + gcs_actor_scheduler_->Schedule(actor); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + + // Grant a worker, then the actor creation request should be send to the worker. + ASSERT_TRUE(raylet_client_->GrantWorkerLease( + node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(), + node_id, ClientID::Nil())); + ASSERT_EQ(0, raylet_client_->callbacks.size()); + ASSERT_EQ(1, worker_client_->callbacks.size()); + + // Reply the actor creation request, then the actor should be scheduled successfully. 
+ ASSERT_TRUE(worker_client_->ReplyPushTask()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + ASSERT_EQ(0, failure_actors_.size()); + ASSERT_EQ(1, success_actors_.size()); + ASSERT_EQ(actor, success_actors_.front()); +} + +TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenLeasing) { + auto node = Mocker::GenNodeInfo(); + auto node_id = ClientID::FromBinary(node->node_id()); + gcs_node_manager_->AddNode(node); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto actor = std::make_shared(create_actor_request); + + // Schedule the actor with 1 available node, and the lease request should be send to the + // node. + gcs_actor_scheduler_->Schedule(actor); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + ASSERT_EQ(0, gcs_actor_scheduler_->num_retry_leasing_count_); + + // Mock a IOError reply, then the lease request will retry again. + ASSERT_TRUE(raylet_client_->GrantWorkerLease( + node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(), + node_id, ClientID::Nil(), Status::IOError(""))); + ASSERT_EQ(1, gcs_actor_scheduler_->num_retry_leasing_count_); + ASSERT_EQ(2, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + + // Grant a worker, then the actor creation request should be send to the worker. + ASSERT_TRUE(raylet_client_->GrantWorkerLease( + node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(), + node_id, ClientID::Nil())); + ASSERT_EQ(0, raylet_client_->callbacks.size()); + ASSERT_EQ(1, worker_client_->callbacks.size()); + + // Reply the actor creation request, then the actor should be scheduled successfully. + ASSERT_TRUE(worker_client_->ReplyPushTask()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + ASSERT_EQ(0, failure_actors_.size()); + ASSERT_EQ(1, success_actors_.size()); + ASSERT_EQ(actor, success_actors_.front()); +} + +TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenCreating) { + auto node = Mocker::GenNodeInfo(); + auto node_id = ClientID::FromBinary(node->node_id()); + gcs_node_manager_->AddNode(node); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto actor = std::make_shared(create_actor_request); + + // Schedule the actor with 1 available node, and the lease request should be send to the + // node. + gcs_actor_scheduler_->Schedule(actor); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + + // Grant a worker, then the actor creation request should be send to the worker. + ASSERT_TRUE(raylet_client_->GrantWorkerLease( + node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(), + node_id, ClientID::Nil())); + ASSERT_EQ(0, raylet_client_->callbacks.size()); + ASSERT_EQ(1, worker_client_->callbacks.size()); + ASSERT_EQ(0, gcs_actor_scheduler_->num_retry_creating_count_); + + // Reply a IOError, then the actor creation request will retry again. 
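(The failed status delivered by `ReplyPushTask(Status::IOError(""))` below makes the scheduler retry: the mocked `RetryCreatingActorOnWorker` increments `num_retry_creating_count_` and immediately calls `DoRetryCreatingActorOnWorker`, which re-queues the push-task request, so `callbacks.size()` stays at 1.)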
+ ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); + ASSERT_EQ(1, gcs_actor_scheduler_->num_retry_creating_count_); + ASSERT_EQ(1, worker_client_->callbacks.size()); + + // Reply the actor creation request, then the actor should be scheduled successfully. + ASSERT_TRUE(worker_client_->ReplyPushTask()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + ASSERT_EQ(0, failure_actors_.size()); + ASSERT_EQ(1, success_actors_.size()); + ASSERT_EQ(actor, success_actors_.front()); +} + +TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenLeasing) { + auto node = Mocker::GenNodeInfo(); + auto node_id = ClientID::FromBinary(node->node_id()); + gcs_node_manager_->AddNode(node); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto actor = std::make_shared(create_actor_request); + + // Schedule the actor with 1 available node, and the lease request should be send to the + // node. + gcs_actor_scheduler_->Schedule(actor); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + + // Remove the node and cancel the scheduling on this node, the scheduling should be + // interrupted. + gcs_node_manager_->RemoveNode(node_id); + ASSERT_EQ(0, gcs_node_manager_->GetAllAliveNodes().size()); + auto actor_ids = gcs_actor_scheduler_->CancelOnNode(node_id); + ASSERT_EQ(1, actor_ids.size()); + ASSERT_EQ(actor->GetActorID(), actor_ids.front()); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + + // Grant a worker, which will influence nothing. + ASSERT_TRUE(raylet_client_->GrantWorkerLease( + node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(), + node_id, ClientID::Nil())); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(0, raylet_client_->callbacks.size()); + ASSERT_EQ(0, gcs_actor_scheduler_->num_retry_leasing_count_); + + ASSERT_EQ(0, success_actors_.size()); + ASSERT_EQ(0, failure_actors_.size()); +} + +TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenCreating) { + auto node = Mocker::GenNodeInfo(); + auto node_id = ClientID::FromBinary(node->node_id()); + gcs_node_manager_->AddNode(node); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto actor = std::make_shared(create_actor_request); + + // Schedule the actor with 1 available node, and the lease request should be send to the + // node. + gcs_actor_scheduler_->Schedule(actor); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + + // Grant a worker, then the actor creation request should be send to the worker. + ASSERT_TRUE(raylet_client_->GrantWorkerLease( + node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(), + node_id, ClientID::Nil())); + ASSERT_EQ(0, raylet_client_->callbacks.size()); + ASSERT_EQ(1, worker_client_->callbacks.size()); + + // Remove the node and cancel the scheduling on this node, the scheduling should be + // interrupted. 
+ gcs_node_manager_->RemoveNode(node_id); + ASSERT_EQ(0, gcs_node_manager_->GetAllAliveNodes().size()); + auto actor_ids = gcs_actor_scheduler_->CancelOnNode(node_id); + ASSERT_EQ(1, actor_ids.size()); + ASSERT_EQ(actor->GetActorID(), actor_ids.front()); + ASSERT_EQ(1, worker_client_->callbacks.size()); + + // Reply the actor creation request, which will influence nothing. + ASSERT_TRUE(worker_client_->ReplyPushTask()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + ASSERT_EQ(0, gcs_actor_scheduler_->num_retry_creating_count_); + + ASSERT_EQ(0, success_actors_.size()); + ASSERT_EQ(0, failure_actors_.size()); +} + +TEST_F(GcsActorSchedulerTest, TestWorkerFailedWhenCreating) { + auto node = Mocker::GenNodeInfo(); + auto node_id = ClientID::FromBinary(node->node_id()); + gcs_node_manager_->AddNode(node); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto actor = std::make_shared(create_actor_request); + + // Schedule the actor with 1 available node, and the lease request should be send to the + // node. + gcs_actor_scheduler_->Schedule(actor); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + + // Grant a worker, then the actor creation request should be send to the worker. + auto worker_id = WorkerID::FromRandom(); + ASSERT_TRUE(raylet_client_->GrantWorkerLease(node->node_manager_address(), + node->node_manager_port(), worker_id, + node_id, ClientID::Nil())); + ASSERT_EQ(0, raylet_client_->callbacks.size()); + ASSERT_EQ(1, worker_client_->callbacks.size()); + + // Cancel the scheduling on this node, the scheduling should be interrupted. + ASSERT_EQ(actor->GetActorID(), + gcs_actor_scheduler_->CancelOnWorker(node_id, worker_id)); + ASSERT_EQ(1, worker_client_->callbacks.size()); + + // Reply the actor creation request, which will influence nothing. + ASSERT_TRUE(worker_client_->ReplyPushTask()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + ASSERT_EQ(0, gcs_actor_scheduler_->num_retry_creating_count_); + + ASSERT_EQ(0, success_actors_.size()); + ASSERT_EQ(0, failure_actors_.size()); +} + +TEST_F(GcsActorSchedulerTest, TestSpillback) { + auto node1 = Mocker::GenNodeInfo(); + auto node_id_1 = ClientID::FromBinary(node1->node_id()); + gcs_node_manager_->AddNode(node1); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto actor = std::make_shared(create_actor_request); + + // Schedule the actor with 1 available node, and the lease request should be send to the + // node. + gcs_actor_scheduler_->Schedule(actor); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + + // Add another node. + auto node2 = Mocker::GenNodeInfo(); + auto node_id_2 = ClientID::FromBinary(node2->node_id()); + gcs_node_manager_->AddNode(node2); + ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size()); + + // Grant with a spillback node(node2), and the lease request should be send to the + // node2. 
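(Spillback here means the raylet does not grant a worker but instead fills in `retry_at_raylet_address`, pointing the scheduler at node2; the scheduler must then issue a second lease request to node2, which is what the next assertions verify. `MockRayletClient::GrantWorkerLease` in gcs_test_util.h builds exactly this reply when `retry_at_raylet_id` is non-nil.)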
+ ASSERT_TRUE(raylet_client_->GrantWorkerLease(node2->node_manager_address(), + node2->node_manager_port(), + WorkerID::Nil(), node_id_1, node_id_2)); + ASSERT_EQ(2, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + + // Grant a worker, then the actor creation request should be send to the worker. + ASSERT_TRUE(raylet_client_->GrantWorkerLease( + node2->node_manager_address(), node2->node_manager_port(), WorkerID::FromRandom(), + node_id_2, ClientID::Nil())); + ASSERT_EQ(0, raylet_client_->callbacks.size()); + ASSERT_EQ(1, worker_client_->callbacks.size()); + + // Reply the actor creation request, then the actor should be scheduled successfully. + ASSERT_TRUE(worker_client_->ReplyPushTask()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + + ASSERT_EQ(node_id_2, actor->GetNodeID()); + + ASSERT_EQ(0, failure_actors_.size()); + ASSERT_EQ(1, success_actors_.size()); + ASSERT_EQ(actor, success_actors_.front()); +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc new file mode 100644 index 0000000000000..6f2d96e2e0a70 --- /dev/null +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -0,0 +1,85 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include "gtest/gtest.h" + +namespace ray { +class GcsNodeManagerTest : public ::testing::Test {}; + +TEST_F(GcsNodeManagerTest, TestManagement) { + boost::asio::io_service io_service; + auto node_info_accessor = Mocker::MockedNodeInfoAccessor(); + auto error_info_accessor = Mocker::MockedErrorInfoAccessor(); + gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor); + // Test Add/Get/Remove functionality. + auto node = Mocker::GenNodeInfo(); + auto node_id = ClientID::FromBinary(node->node_id()); + + node_manager.AddNode(node); + ASSERT_EQ(node, node_manager.GetNode(node_id)); + + node_manager.RemoveNode(node_id); + ASSERT_EQ(nullptr, node_manager.GetNode(node_id)); +} + +TEST_F(GcsNodeManagerTest, TestListener) { + boost::asio::io_service io_service; + auto node_info_accessor = Mocker::MockedNodeInfoAccessor(); + auto error_info_accessor = Mocker::MockedErrorInfoAccessor(); + gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor); + // Test AddNodeAddedListener. + int node_count = 1000; + std::vector> added_nodes; + node_manager.AddNodeAddedListener( + [&added_nodes](std::shared_ptr node) { + added_nodes.emplace_back(std::move(node)); + }); + for (int i = 0; i < node_count; ++i) { + auto node = Mocker::GenNodeInfo(); + node_manager.AddNode(node); + } + ASSERT_EQ(node_count, added_nodes.size()); + + // Test GetAllAliveNodes. 
+ auto &alive_nodes = node_manager.GetAllAliveNodes(); + ASSERT_EQ(added_nodes.size(), alive_nodes.size()); + for (const auto &node : added_nodes) { + ASSERT_EQ(1, alive_nodes.count(ClientID::FromBinary(node->node_id()))); + } + + // Test AddNodeRemovedListener. + std::vector> removed_nodes; + node_manager.AddNodeRemovedListener( + [&removed_nodes](std::shared_ptr node) { + removed_nodes.emplace_back(std::move(node)); + }); + for (int i = 0; i < node_count; ++i) { + node_manager.RemoveNode(ClientID::FromBinary(added_nodes[i]->node_id())); + } + ASSERT_EQ(node_count, removed_nodes.size()); + ASSERT_TRUE(node_manager.GetAllAliveNodes().empty()); + for (int i = 0; i < node_count; ++i) { + ASSERT_EQ(added_nodes[i], removed_nodes[i]); + } +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/gcs/gcs_server/test/gcs_test_util.h b/src/ray/gcs/gcs_server/test/gcs_test_util.h new file mode 100644 index 0000000000000..158cf9e13b533 --- /dev/null +++ b/src/ray/gcs/gcs_server/test/gcs_test_util.h @@ -0,0 +1,392 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RAY_GCS_TEST_UTIL_H +#define RAY_GCS_TEST_UTIL_H + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace ray { + +struct Mocker { + static TaskSpecification GenActorCreationTask(const JobID &job_id, + int max_reconstructions = 100) { + TaskSpecBuilder builder; + rpc::Address empty_address; + ray::FunctionDescriptor empty_descriptor = + ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""); + auto actor_id = ActorID::Of(job_id, RandomTaskId(), 0); + auto task_id = TaskID::ForActorCreationTask(actor_id); + builder.SetCommonTaskSpec(task_id, Language::PYTHON, empty_descriptor, job_id, + TaskID::Nil(), 0, TaskID::Nil(), empty_address, 1, {}, {}); + builder.SetActorCreationTaskSpec(actor_id, max_reconstructions); + return builder.Build(); + } + + static rpc::CreateActorRequest GenCreateActorRequest(const JobID &job_id, + int max_reconstructions = 100) { + rpc::CreateActorRequest request; + auto actor_creation_task_spec = GenActorCreationTask(job_id, max_reconstructions); + request.mutable_task_spec()->CopyFrom(actor_creation_task_spec.GetMessage()); + return request; + } + + static std::shared_ptr GenNodeInfo(uint16_t port = 0) { + auto node = std::make_shared(); + node->set_node_id(ClientID::FromRandom().Binary()); + node->set_node_manager_port(port); + node->set_node_manager_address("127.0.0.1"); + return node; + } + + class MockWorkerClient : public rpc::CoreWorkerClientInterface { + public: + ray::Status PushNormalTask( + std::unique_ptr request, + const rpc::ClientCallback &callback) override { + callbacks.push_back(callback); + if (enable_auto_reply) { + ReplyPushTask(); + } + return Status::OK(); + } + + bool ReplyPushTask(Status status = Status::OK(), bool exit = false) { + if (callbacks.size() == 0) { + return false; + } + auto 
callback = callbacks.front(); + auto reply = rpc::PushTaskReply(); + if (exit) { + reply.set_worker_exiting(true); + } + callback(status, reply); + callbacks.pop_front(); + return true; + } + + bool enable_auto_reply = false; + std::list> callbacks; + }; + + class MockRayletClient : public WorkerLeaseInterface { + public: + ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id, + bool disconnect_worker) override { + if (disconnect_worker) { + num_workers_disconnected++; + } else { + num_workers_returned++; + } + return Status::OK(); + } + + ray::Status RequestWorkerLease( + const ray::TaskSpecification &resource_spec, + const rpc::ClientCallback &callback) override { + num_workers_requested += 1; + callbacks.push_back(callback); + if (!auto_grant_node_id.IsNil()) { + GrantWorkerLease("", 0, WorkerID::FromRandom(), auto_grant_node_id, + ClientID::Nil()); + } + return Status::OK(); + } + + ray::Status CancelWorkerLease( + const TaskID &task_id, + const rpc::ClientCallback &callback) override { + num_leases_canceled += 1; + cancel_callbacks.push_back(callback); + return Status::OK(); + } + + // Trigger reply to RequestWorkerLease. + bool GrantWorkerLease(const std::string &address, int port, const WorkerID &worker_id, + const ClientID &raylet_id, const ClientID &retry_at_raylet_id, + Status status = Status::OK()) { + rpc::RequestWorkerLeaseReply reply; + if (!retry_at_raylet_id.IsNil()) { + reply.mutable_retry_at_raylet_address()->set_ip_address(address); + reply.mutable_retry_at_raylet_address()->set_port(port); + reply.mutable_retry_at_raylet_address()->set_raylet_id( + retry_at_raylet_id.Binary()); + } else { + reply.mutable_worker_address()->set_ip_address(address); + reply.mutable_worker_address()->set_port(port); + reply.mutable_worker_address()->set_raylet_id(raylet_id.Binary()); + reply.mutable_worker_address()->set_worker_id(worker_id.Binary()); + } + if (callbacks.size() == 0) { + return false; + } else { + auto callback = callbacks.front(); + callback(status, reply); + callbacks.pop_front(); + return true; + } + } + + bool ReplyCancelWorkerLease(bool success = true) { + rpc::CancelWorkerLeaseReply reply; + reply.set_success(success); + if (cancel_callbacks.size() == 0) { + return false; + } else { + auto callback = cancel_callbacks.front(); + callback(Status::OK(), reply); + cancel_callbacks.pop_front(); + return true; + } + } + + ~MockRayletClient() {} + + int num_workers_requested = 0; + int num_workers_returned = 0; + int num_workers_disconnected = 0; + int num_leases_canceled = 0; + ClientID auto_grant_node_id; + std::list> callbacks = {}; + std::list> cancel_callbacks = {}; + }; + + class MockedGcsActorScheduler : public gcs::GcsActorScheduler { + public: + using gcs::GcsActorScheduler::GcsActorScheduler; + + void ResetLeaseClientFactory(gcs::LeaseClientFactoryFn lease_client_factory) { + lease_client_factory_ = std::move(lease_client_factory); + } + + void ResetClientFactory(rpc::ClientFactoryFn client_factory) { + client_factory_ = std::move(client_factory); + } + + protected: + void RetryLeasingWorkerFromNode(std::shared_ptr actor, + std::shared_ptr node) override { + ++num_retry_leasing_count_; + DoRetryLeasingWorkerFromNode(actor, node); + } + + void RetryCreatingActorOnWorker(std::shared_ptr actor, + std::shared_ptr worker) override { + ++num_retry_creating_count_; + DoRetryCreatingActorOnWorker(actor, worker); + } + + public: + int num_retry_leasing_count_ = 0; + int num_retry_creating_count_ = 0; + }; + + class MockedActorInfoAccessor : public 
gcs::ActorInfoAccessor { + public: + Status GetAll(std::vector *actor_table_data_list) override { + return Status::NotImplemented(""); + } + + Status AsyncGet( + const ActorID &actor_id, + const gcs::OptionalItemCallback &callback) override { + return Status::NotImplemented(""); + } + + Status AsyncCreateActor(const TaskSpecification &task_spec, + const gcs::StatusCallback &callback) override { + return Status::NotImplemented(""); + } + + Status AsyncRegister(const std::shared_ptr &data_ptr, + const gcs::StatusCallback &callback) override { + return Status::NotImplemented(""); + } + + Status AsyncUpdate(const ActorID &actor_id, + const std::shared_ptr &data_ptr, + const gcs::StatusCallback &callback) override { + if (callback) { + callback(Status::OK()); + } + return Status::OK(); + } + + Status AsyncSubscribeAll( + const gcs::SubscribeCallback &subscribe, + const gcs::StatusCallback &done) override { + return Status::NotImplemented(""); + } + + Status AsyncSubscribe( + const ActorID &actor_id, + const gcs::SubscribeCallback &subscribe, + const gcs::StatusCallback &done) override { + return Status::NotImplemented(""); + } + + Status AsyncUnsubscribe(const ActorID &actor_id, + const gcs::StatusCallback &done) override { + return Status::NotImplemented(""); + } + + Status AsyncAddCheckpoint(const std::shared_ptr &data_ptr, + const gcs::StatusCallback &callback) override { + return Status::NotImplemented(""); + } + + Status AsyncGetCheckpoint( + const ActorCheckpointID &checkpoint_id, const ActorID &actor_id, + const gcs::OptionalItemCallback &callback) override { + return Status::NotImplemented(""); + } + + Status AsyncGetCheckpointID( + const ActorID &actor_id, + const gcs::OptionalItemCallback &callback) override { + return Status::NotImplemented(""); + } + }; + + class MockedNodeInfoAccessor : public gcs::NodeInfoAccessor { + public: + Status RegisterSelf(const rpc::GcsNodeInfo &local_node_info) override { + return Status::NotImplemented(""); + } + + Status UnregisterSelf() override { return Status::NotImplemented(""); } + + const ClientID &GetSelfId() const override { + static ClientID node_id; + return node_id; + } + + const rpc::GcsNodeInfo &GetSelfInfo() const override { + static rpc::GcsNodeInfo node_info; + return node_info; + } + + Status AsyncRegister(const rpc::GcsNodeInfo &node_info, + const gcs::StatusCallback &callback) override { + return Status::NotImplemented(""); + } + + Status AsyncUnregister(const ClientID &node_id, + const gcs::StatusCallback &callback) override { + if (callback) { + callback(Status::OK()); + } + return Status::OK(); + } + + Status AsyncGetAll( + const gcs::MultiItemCallback &callback) override { + if (callback) { + callback(Status::OK(), {}); + } + return Status::OK(); + } + + Status AsyncSubscribeToNodeChange( + const gcs::SubscribeCallback &subscribe, + const gcs::StatusCallback &done) override { + return Status::NotImplemented(""); + } + + boost::optional Get(const ClientID &node_id) const override { + return boost::none; + } + + const std::unordered_map &GetAll() const override { + static std::unordered_map node_info_list; + return node_info_list; + } + + bool IsRemoved(const ClientID &node_id) const override { return false; } + + Status AsyncGetResources( + const ClientID &node_id, + const gcs::OptionalItemCallback &callback) override { + return Status::NotImplemented(""); + } + + Status AsyncUpdateResources(const ClientID &node_id, const ResourceMap &resources, + const gcs::StatusCallback &callback) override { + return Status::NotImplemented(""); 
+ } + + Status AsyncDeleteResources(const ClientID &node_id, + const std::vector &resource_names, + const gcs::StatusCallback &callback) override { + return Status::NotImplemented(""); + } + + Status AsyncSubscribeToResources( + const gcs::SubscribeCallback + &subscribe, + const gcs::StatusCallback &done) override { + return Status::NotImplemented(""); + } + + Status AsyncReportHeartbeat(const std::shared_ptr &data_ptr, + const gcs::StatusCallback &callback) override { + return Status::NotImplemented(""); + } + + Status AsyncSubscribeHeartbeat( + const gcs::SubscribeCallback &subscribe, + const gcs::StatusCallback &done) override { + return Status::NotImplemented(""); + } + + Status AsyncReportBatchHeartbeat( + const std::shared_ptr &data_ptr, + const gcs::StatusCallback &callback) override { + if (callback) { + callback(Status::OK()); + } + return Status::OK(); + } + + Status AsyncSubscribeBatchHeartbeat( + const gcs::ItemCallback &subscribe, + const gcs::StatusCallback &done) override { + return Status::NotImplemented(""); + } + }; + + class MockedErrorInfoAccessor : public gcs::ErrorInfoAccessor { + public: + Status AsyncReportJobError(const std::shared_ptr &data_ptr, + const gcs::StatusCallback &callback) override { + if (callback) { + callback(Status::OK()); + } + return Status::OK(); + } + }; +}; + +} // namespace ray + +#endif // RAY_GCS_TEST_UTIL_H diff --git a/src/ray/gcs/gcs_server/worker_info_handler_impl.cc b/src/ray/gcs/gcs_server/worker_info_handler_impl.cc index bfe148503be7c..072af9156d4d5 100644 --- a/src/ray/gcs/gcs_server/worker_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/worker_info_handler_impl.cc @@ -24,6 +24,11 @@ void DefaultWorkerInfoHandler::HandleReportWorkerFailure( RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString(); auto worker_failure_data = std::make_shared(); worker_failure_data->CopyFrom(request.worker_failure()); + auto need_reschedule = !worker_failure_data->intentional_disconnect(); + auto node_id = ClientID::FromBinary(worker_address.raylet_id()); + auto worker_id = WorkerID::FromBinary(worker_address.worker_id()); + gcs_actor_manager_.ReconstructActorOnWorker(node_id, worker_id, need_reschedule); + auto on_done = [worker_address, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to report worker failure, " diff --git a/src/ray/gcs/gcs_server/worker_info_handler_impl.h b/src/ray/gcs/gcs_server/worker_info_handler_impl.h index 2929c4ecc36f9..0c508638b85a7 100644 --- a/src/ray/gcs/gcs_server/worker_info_handler_impl.h +++ b/src/ray/gcs/gcs_server/worker_info_handler_impl.h @@ -15,6 +15,7 @@ #ifndef RAY_GCS_WORKER_INFO_HANDLER_IMPL_H #define RAY_GCS_WORKER_INFO_HANDLER_IMPL_H +#include "gcs_actor_manager.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h" @@ -24,8 +25,9 @@ namespace rpc { /// This implementation class of `WorkerInfoHandler`. 
 class DefaultWorkerInfoHandler : public rpc::WorkerInfoHandler {
  public:
-  explicit DefaultWorkerInfoHandler(gcs::RedisGcsClient &gcs_client)
-      : gcs_client_(gcs_client) {}
+  explicit DefaultWorkerInfoHandler(gcs::RedisGcsClient &gcs_client,
+                                    gcs::GcsActorManager &gcs_actor_manager)
+      : gcs_client_(gcs_client), gcs_actor_manager_(gcs_actor_manager) {}
 
   void HandleReportWorkerFailure(const ReportWorkerFailureRequest &request,
                                  ReportWorkerFailureReply *reply,
@@ -37,6 +39,7 @@ class DefaultWorkerInfoHandler : public rpc::WorkerInfoHandler {
 
  private:
   gcs::RedisGcsClient &gcs_client_;
+  gcs::GcsActorManager &gcs_actor_manager_;
 };
 
 }  // namespace rpc
diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h
index a930e4bc355fa..94b416967e5ca 100644
--- a/src/ray/gcs/pb_util.h
+++ b/src/ray/gcs/pb_util.h
@@ -84,13 +84,15 @@ inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(
 
 /// Helper function to produce worker failure data.
 inline std::shared_ptr<ray::rpc::WorkerFailureData> CreateWorkerFailureData(
     const ClientID &raylet_id, const WorkerID &worker_id, const std::string &address,
-    int32_t port, int64_t timestamp = std::time(nullptr)) {
+    int32_t port, int64_t timestamp = std::time(nullptr),
+    bool intentional_disconnect = false) {
   auto worker_failure_info_ptr = std::make_shared<ray::rpc::WorkerFailureData>();
   worker_failure_info_ptr->mutable_worker_address()->set_raylet_id(raylet_id.Binary());
   worker_failure_info_ptr->mutable_worker_address()->set_worker_id(worker_id.Binary());
   worker_failure_info_ptr->mutable_worker_address()->set_ip_address(address);
   worker_failure_info_ptr->mutable_worker_address()->set_port(port);
   worker_failure_info_ptr->set_timestamp(timestamp);
+  worker_failure_info_ptr->set_intentional_disconnect(intentional_disconnect);
   return worker_failure_info_ptr;
 }
 
diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc
index 9a1ff44455b54..1eeb191efa114 100644
--- a/src/ray/gcs/redis_accessor.cc
+++ b/src/ray/gcs/redis_accessor.cc
@@ -63,6 +63,14 @@ Status RedisLogBasedActorInfoAccessor::AsyncGet(
       on_done);
 }
 
+Status RedisLogBasedActorInfoAccessor::AsyncCreateActor(
+    const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) {
+  const std::string error_msg =
+      "Unsupported method of AsyncCreateActor in RedisLogBasedActorInfoAccessor.";
+  RAY_LOG(FATAL) << error_msg;
+  return Status::Invalid(error_msg);
+}
+
 Status RedisLogBasedActorInfoAccessor::AsyncRegister(
     const std::shared_ptr<rpc::ActorTableData> &data_ptr, const StatusCallback &callback) {
   auto on_success = [callback](RedisGcsClient *client, const ActorID &actor_id,
diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h
index 50cdeabc1518d..bd4a0ab9b214d 100644
--- a/src/ray/gcs/redis_accessor.h
+++ b/src/ray/gcs/redis_accessor.h
@@ -15,6 +15,7 @@
 #ifndef RAY_GCS_REDIS_ACCESSOR_H
 #define RAY_GCS_REDIS_ACCESSOR_H
 
+#include
 #include "ray/common/id.h"
 #include "ray/gcs/accessor.h"
 #include "ray/gcs/callback.h"
@@ -41,6 +42,9 @@ class RedisLogBasedActorInfoAccessor : public ActorInfoAccessor {
   Status AsyncGet(const ActorID &actor_id,
                   const OptionalItemCallback<rpc::ActorTableData> &callback) override;
 
+  Status AsyncCreateActor(const TaskSpecification &task_spec,
+                          const StatusCallback &callback) override;
+
   Status AsyncRegister(const std::shared_ptr<rpc::ActorTableData> &data_ptr,
                        const StatusCallback &callback) override;
 
diff --git a/src/ray/gcs/redis_gcs_client.cc b/src/ray/gcs/redis_gcs_client.cc
index 1500b6d62fae6..f8eedd8c725dd 100644
--- a/src/ray/gcs/redis_gcs_client.cc
+++ b/src/ray/gcs/redis_gcs_client.cc
@@ -70,7 +70,11 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
   resource_table_.reset(new DynamicResourceTable({primary_context}, this));
   worker_failure_table_.reset(new WorkerFailureTable(shard_contexts, this));
 
-  actor_accessor_.reset(new RedisLogBasedActorInfoAccessor(this));
+  if (RayConfig::instance().gcs_service_enabled()) {
+    actor_accessor_.reset(new RedisActorInfoAccessor(this));
+  } else {
+    actor_accessor_.reset(new RedisLogBasedActorInfoAccessor(this));
+  }
   job_accessor_.reset(new RedisJobInfoAccessor(this));
   object_accessor_.reset(new RedisObjectInfoAccessor(this));
   node_accessor_.reset(new RedisNodeInfoAccessor(this));
diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto
index c137fbf25280a..e0dcb0997227c 100644
--- a/src/ray/protobuf/gcs.proto
+++ b/src/ray/protobuf/gcs.proto
@@ -99,13 +99,15 @@ message TaskTableData {
 message ActorTableData {
   // State of an actor.
   enum ActorState {
+    // Actor is pending.
+    PENDING = 0;
     // Actor is alive.
-    ALIVE = 0;
+    ALIVE = 1;
     // Actor is dead, now being reconstructed.
     // After reconstruction finishes, the state will become alive again.
-    RECONSTRUCTING = 1;
+    RECONSTRUCTING = 2;
     // Actor is already dead and won't be reconstructed.
-    DEAD = 2;
+    DEAD = 3;
   }
   // The ID of the actor that was created.
   bytes actor_id = 1;
@@ -131,6 +133,8 @@ message ActorTableData {
   bool is_detached = 11;
   // Timestamp that the actor is created or reconstructed.
   double timestamp = 12;
+  // The task specification of this actor's creation task.
+  TaskSpec task_spec = 13;
 }
 
 message ErrorTableData {
@@ -295,6 +299,8 @@ message WorkerFailureData {
   Address worker_address = 1;
   // The UNIX timestamp at which the worker failed.
   int64 timestamp = 3;
+  // Whether the worker disconnected intentionally.
+  bool intentional_disconnect = 4;
 }
 
 // This enum type is used as object's metadata to indicate the object's creating
diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto
index 8f3a5150ba162..b71974186e013 100644
--- a/src/ray/protobuf/gcs_service.proto
+++ b/src/ray/protobuf/gcs_service.proto
@@ -108,6 +108,8 @@ message GetActorCheckpointIDReply {
 
 // Service for actor info access.
 service ActorInfoGcsService {
+  // Create actor via GCS Service.
+  rpc CreateActor(CreateActorRequest) returns (CreateActorReply);
   // Get actor data from GCS Service.
   rpc GetActorInfo(GetActorInfoRequest) returns (GetActorInfoReply);
   // Register an actor to GCS Service.
@@ -366,3 +368,11 @@ service WorkerInfoGcsService {
   // Register a worker to GCS Service.
   rpc RegisterWorker(RegisterWorkerRequest) returns (RegisterWorkerReply);
 }
+
+message CreateActorRequest {
+  TaskSpec task_spec = 1;
+}
+
+message CreateActorReply {
+  GcsStatus status = 1;
+}
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 292f653eefa77..e83c867d23463 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -805,21 +805,25 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
   if (it == actor_registry_.end()) {
     it = actor_registry_.emplace(actor_id, actor_registration).first;
   } else {
-    // Only process the state transition if it is to a later state than ours.
-    if (actor_registration.GetState() > it->second.GetState() &&
-        actor_registration.GetRemainingReconstructions() ==
-            it->second.GetRemainingReconstructions()) {
-      // The new state is later than ours if it is about the same lifetime, but
-      // a greater state.
-      it->second = actor_registration;
-    } else if (actor_registration.GetRemainingReconstructions() <
-               it->second.GetRemainingReconstructions()) {
-      // The new state is also later than ours it is about a later lifetime of
-      // the actor.
+    if (RayConfig::instance().gcs_service_enabled()) {
       it->second = actor_registration;
     } else {
-      // Our state is already at or past the update, so skip the update.
-      return;
+      // Only process the state transition if it is to a later state than ours.
+      if (actor_registration.GetState() > it->second.GetState() &&
+          actor_registration.GetRemainingReconstructions() ==
+              it->second.GetRemainingReconstructions()) {
+        // The new state is later than ours if it is about the same lifetime, but
+        // a greater state.
+        it->second = actor_registration;
+      } else if (actor_registration.GetRemainingReconstructions() <
+                 it->second.GetRemainingReconstructions()) {
+        // The new state is also later than ours if it is about a later lifetime of
+        // the actor.
+        it->second = actor_registration;
+      } else {
+        // Our state is already at or past the update, so skip the update.
+        return;
+      }
     }
   }
   RAY_LOG(DEBUG) << "Actor notification received: actor_id = " << actor_id
@@ -868,13 +872,14 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
     for (auto const &task : removed_tasks) {
       TreatTaskAsFailed(task, ErrorType::ACTOR_DIED);
     }
-  } else {
-    RAY_CHECK(actor_registration.GetState() == ActorTableData::RECONSTRUCTING);
+  } else if (actor_registration.GetState() == ActorTableData::RECONSTRUCTING) {
     RAY_LOG(DEBUG) << "Actor is being reconstructed: " << actor_id;
-    // The actor is dead and needs reconstruction. Attempting to reconstruct its
-    // creation task.
-    reconstruction_policy_.ListenAndMaybeReconstruct(
-        actor_registration.GetActorCreationDependency());
+    if (!RayConfig::instance().gcs_service_enabled()) {
+      // The actor is dead and needs reconstruction. Attempt to reconstruct its
+      // creation task.
+      reconstruction_policy_.ListenAndMaybeReconstruct(
+          actor_registration.GetActorCreationDependency());
+    }
 
     // When an actor fails but can be reconstructed, resubmit all of the queued
     // tasks for that actor. This will mark the tasks as waiting for actor
@@ -884,6 +889,9 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
     for (auto const &task : removed_tasks) {
       SubmitTask(task, Lineage());
     }
+  } else {
+    RAY_CHECK(actor_registration.GetState() == ActorTableData::PENDING);
+    // Do nothing.
   }
 }
 
@@ -1119,6 +1127,11 @@ void NodeManager::ProcessRegisterClientRequestMessage(
 
 void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_local,
                                           bool intentional_disconnect) {
+  if (RayConfig::instance().gcs_service_enabled()) {
+    // If GCS actor management is enabled, the GCS takes over the state
+    // transitions of all actors.
+    return;
+  }
   auto actor_entry = actor_registry_.find(actor_id);
   RAY_CHECK(actor_entry != actor_registry_.end());
   auto &actor_registration = actor_entry->second;
@@ -1224,6 +1237,9 @@ void NodeManager::ProcessDisconnectClientMessage(
   // particular, we are no longer waiting for their dependencies.
   if (worker) {
     if (is_worker && worker->IsDead()) {
+      // If the worker was killed by us because the driver exited,
+      // treat it as intentionally disconnected.
+      intentional_disconnect = true;
       // Don't need to unblock the client if it's a worker and is already dead.
       // Because in this case, its task is already cleaned up.
RAY_LOG(DEBUG) << "Skip unblocking worker because it's already dead."; @@ -1245,19 +1261,12 @@ void NodeManager::ProcessDisconnectClientMessage( // Publish the worker failure. auto worker_failure_data_ptr = gcs::CreateWorkerFailureData( self_node_id_, worker->WorkerId(), initial_config_.node_manager_address, - worker->Port()); + worker->Port(), time(nullptr), intentional_disconnect); RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr)); } if (is_worker) { - // The client is a worker. - if (worker->IsDead()) { - // If the worker was killed by us because the driver exited, - // treat it as intentionally disconnected. - intentional_disconnect = true; - } - const ActorID &actor_id = worker->GetActorId(); if (!actor_id.IsNil()) { // If the worker was an actor, update actor state, reconstruct the actor if needed, @@ -1569,6 +1578,8 @@ void NodeManager::DispatchScheduledTasksToWorkers() { } worker->SetOwnerAddress(spec.CallerAddress()); if (spec.IsActorCreationTask()) { + // The actor belongs to this worker now. + worker->AssignActorId(spec.ActorCreationId()); worker->SetLifetimeAllocatedInstances(allocated_instances); } else { worker->SetAllocatedInstances(allocated_instances); @@ -1647,7 +1658,7 @@ void NodeManager::WaitForTaskArgsRequests(std::pair &work) { } else { tasks_to_dispatch_.push_back(work); } -}; +} void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest &request, rpc::RequestWorkerLeaseReply *reply, @@ -2667,6 +2678,15 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { worker.MarkDetachedActor(); } + if (RayConfig::instance().gcs_service_enabled()) { + // Gcs server is responsible for notifying other nodes of the changes of actor + // status, and thus raylet doesn't need to handle this anymore. + // And if `new_scheduler_enabled_` is true, this function `FinishAssignedActorTask` + // will not be called because raylet is not aware of the actual task when receiving + // a worker lease request. + return; + } + // Lookup the parent actor id. auto parent_task_id = task_spec.ParentTaskId(); int port = worker.Port(); diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index df9c6bf98fec6..0e746bec25d70 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -96,6 +96,9 @@ class GcsRpcClient { /// Mark job as finished to gcs server. VOID_GCS_RPC_CLIENT_METHOD(JobInfoGcsService, MarkJobFinished, job_info_grpc_client_, ) + /// Create actor via GCS Service. + VOID_RPC_CLIENT_METHOD(ActorInfoGcsService, CreateActor, actor_info_grpc_client_, ) + /// Get actor data from GCS Service. 
   VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, GetActorInfo, actor_info_grpc_client_, )
 
diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h
index d12340814d793..edf46c6d5818e 100644
--- a/src/ray/rpc/gcs_server/gcs_rpc_server.h
+++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h
@@ -94,6 +94,10 @@ class ActorInfoGcsServiceHandler {
  public:
   virtual ~ActorInfoGcsServiceHandler() = default;
 
+  virtual void HandleCreateActor(const CreateActorRequest &request,
+                                 CreateActorReply *reply,
+                                 SendReplyCallback send_reply_callback) = 0;
+
   virtual void HandleGetActorInfo(const GetActorInfoRequest &request,
                                   GetActorInfoReply *reply,
                                   SendReplyCallback send_reply_callback) = 0;
@@ -135,6 +139,7 @@ class ActorInfoGrpcService : public GrpcService {
   void InitServerCallFactories(
       const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
       std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
+    ACTOR_INFO_SERVICE_RPC_HANDLER(CreateActor);
     ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo);
     ACTOR_INFO_SERVICE_RPC_HANDLER(RegisterActorInfo);
     ACTOR_INFO_SERVICE_RPC_HANDLER(UpdateActorInfo);
diff --git a/src/ray/util/asio_util.h b/src/ray/util/asio_util.h
new file mode 100644
index 0000000000000..7e70916054364
--- /dev/null
+++ b/src/ray/util/asio_util.h
@@ -0,0 +1,31 @@
+// Copyright 2017 The Ray Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RAY_ASIO_UTIL_H
+#define RAY_ASIO_UTIL_H
+
+#include <boost/asio.hpp>
+
+inline void execute_after(boost::asio::io_context &io_context,
+                          const std::function<void()> &fn, uint32_t delay_milliseconds) {
+  auto timer = std::make_shared<boost::asio::deadline_timer>(io_context);
+  timer->expires_from_now(boost::posix_time::milliseconds(delay_milliseconds));
+  timer->async_wait([timer, fn](const boost::system::error_code &error) {
+    if (error != boost::system::errc::operation_canceled && fn) {
+      fn();
+    }
+  });
+}
+
+#endif  // RAY_ASIO_UTIL_H
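
Note: a minimal usage sketch for the new execute_after() helper above, added for illustration and not part of the patch. The 200 ms delay is an arbitrary value echoing the new gcs_lease_worker_retry_interval_ms default, and the callback body is hypothetical.

    #include <iostream>

    #include "ray/util/asio_util.h"

    int main() {
      boost::asio::io_context io_context;
      // Schedule a deferred retry, e.g. re-attempting a worker lease after 200 ms.
      // The timer is kept alive by the lambda inside execute_after(), so no
      // extra bookkeeping is needed here.
      execute_after(
          io_context,
          []() { std::cout << "retrying worker lease..." << std::endl; },
          /*delay_milliseconds=*/200);
      // run() returns once the timer has fired and the callback has executed.
      io_context.run();
      return 0;
    }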