Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCS-Based actor management implementation #6763

Merged
merged 15 commits into from Apr 13, 2020
46 changes: 45 additions & 1 deletion BUILD.bazel
Expand Up @@ -325,6 +325,9 @@ cc_library(
deps = [
":gcs",
":gcs_service_rpc",
":node_manager_rpc",
":raylet_lib",
":worker_rpc",
],
)

Expand Down Expand Up @@ -725,6 +728,45 @@ cc_test(
],
)

cc_test(
name = "gcs_node_manager_test",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc",
"src/ray/gcs/gcs_server/test/gcs_test_util.h",
],
copts = COPTS,
deps = [
":gcs_server_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "gcs_actor_scheduler_test",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc",
"src/ray/gcs/gcs_server/test/gcs_test_util.h",
],
copts = COPTS,
deps = [
":gcs_server_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "gcs_actor_manager_test",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc",
"src/ray/gcs/gcs_server/test/gcs_test_util.h",
],
copts = COPTS,
deps = [
":gcs_server_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "service_based_gcs_client_lib",
srcs = glob(
Expand All @@ -739,7 +781,8 @@ cc_library(
),
copts = COPTS,
deps = [
":gcs_server_lib",
":gcs",
":gcs_service_rpc",
],
)

Expand All @@ -754,6 +797,7 @@ cc_test(
"//:redis-server",
],
deps = [
":gcs_server_lib",
":service_based_gcs_client_lib",
"@com_google_googletest//:gtest_main",
],
Expand Down
Expand Up @@ -40,7 +40,7 @@ public boolean wasCurrentActorReconstructed() {
return false;
}

return runtime.getGcsClient().actorExists(getCurrentActorId());
return runtime.getGcsClient().wasCurrentActorReconstructed(getCurrentActorId());
}

@Override
Expand Down
26 changes: 24 additions & 2 deletions java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java
Expand Up @@ -9,6 +9,7 @@
import io.ray.api.id.TaskId;
import io.ray.api.id.UniqueId;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.generated.Gcs;
import io.ray.runtime.generated.Gcs.ActorCheckpointIdData;
import io.ray.runtime.generated.Gcs.GcsNodeInfo;
Expand All @@ -27,9 +28,7 @@
* An implementation of GcsClient.
*/
public class GcsClient {

private static Logger LOGGER = LoggerFactory.getLogger(GcsClient.class);

private RedisClient primary;

private List<RedisClient> shards;
Expand Down Expand Up @@ -126,6 +125,29 @@ public boolean actorExists(ActorId actorId) {
return primary.exists(key);
}

public boolean wasCurrentActorReconstructed(ActorId actorId) {
byte[] key = ArrayUtils.addAll(TablePrefix.ACTOR.toString().getBytes(), actorId.getBytes());
if (!RayConfig.getInstance().gcsServiceEnabled) {
return primary.exists(key);
}

// TODO(ZhuSenlin): Get the actor table data from CoreWorker later.
byte[] value = primary.get(key);
if (value == null) {
return false;
}
Gcs.ActorTableData actorTableData = null;
try {
actorTableData = Gcs.ActorTableData.parseFrom(value);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Received invalid protobuf data from GCS.");
}

long maxReconstructions = actorTableData.getMaxReconstructions();
long remainingReconstructions = actorTableData.getRemainingReconstructions();
return maxReconstructions - remainingReconstructions != 0;
}

/**
* Query whether the raylet task exists in Gcs.
*/
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_metrics.py
Expand Up @@ -227,7 +227,7 @@ def getpid(self):
if child_actor_info["state"] == -1:
assert child_actor_info["requiredResources"]["CustomResource"] == 1
else:
assert child_actor_info["state"] == 0
assert child_actor_info["state"] == 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this change? Was it wrong before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this change? Was it wrong before?

That is because a new enum value 'ActorTableData::::ActorState::PENDING' is added. The value 0 is refers to ActorTableData::::ActorState::PENDING while ActorTableData::::ActorState::ALIVE is expected in this unit test.

  enum ActorState {
    // Actor is pending.
    PENDING = 0;
    // Actor is alive.
    ALIVE = 1;
    // Actor is dead, now being reconstructed.
    // After reconstruction finishes, the state will become alive again.
    RECONSTRUCTING = 2;
    // Actor is already dead and won't be reconstructed.
    DEAD = 3;
  }

assert len(child_actor_info["children"]) == 0
assert child_actor_info["usedResources"]["CPU"] == 1

Expand Down
4 changes: 4 additions & 0 deletions src/ray/common/ray_config_def.h
Expand Up @@ -263,6 +263,10 @@ RAY_CONFIG(int64_t, internal_gcs_service_connect_wait_milliseconds, 100)
/// The interval at which the gcs server will check if redis has gone down.
/// When this happens, gcs server will kill itself.
RAY_CONFIG(int64_t, gcs_redis_heartbeat_interval_milliseconds, 100)
/// Duration to wait between retries for leasing worker in gcs server.
RAY_CONFIG(uint32_t, gcs_lease_worker_retry_interval_ms, 200)
/// Duration to wait between retries for creating actor in gcs server.
RAY_CONFIG(uint32_t, gcs_create_actor_retry_interval_ms, 200)

/// Maximum number of times to retry putting an object when the plasma store is full.
/// Can be set to -1 to enable unlimited retries.
Expand Down
23 changes: 18 additions & 5 deletions src/ray/core_worker/core_worker.cc
Expand Up @@ -410,6 +410,16 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
return std::shared_ptr<raylet::RayletClient>(
new raylet::RayletClient(std::move(grpc_client)));
};

std::function<Status(const TaskSpecification &, const gcs::StatusCallback &)>
actor_create_callback = nullptr;
if (RayConfig::instance().gcs_service_enabled()) {
actor_create_callback = [this](const TaskSpecification &task_spec,
const gcs::StatusCallback &callback) {
return gcs_client_->Actors().AsyncCreateActor(task_spec, callback);
};
}

direct_actor_submitter_ = std::unique_ptr<CoreWorkerDirectActorTaskSubmitter>(
new CoreWorkerDirectActorTaskSubmitter(rpc_address_, client_factory, memory_store_,
task_manager_));
Expand All @@ -418,7 +428,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
std::unique_ptr<CoreWorkerDirectTaskSubmitter>(new CoreWorkerDirectTaskSubmitter(
rpc_address_, local_raylet_client_, client_factory, raylet_client_factory,
memory_store_, task_manager_, local_raylet_id,
RayConfig::instance().worker_lease_timeout_milliseconds()));
RayConfig::instance().worker_lease_timeout_milliseconds(),
std::move(actor_create_callback)));
future_resolver_.reset(new FutureResolver(memory_store_, client_factory));
// Unfortunately the raylet client has to be constructed after the receivers.
if (direct_task_receiver_ != nullptr) {
Expand Down Expand Up @@ -624,7 +635,6 @@ void CoreWorker::RegisterToGcs() {
RAY_CHECK_OK(gcs_client_->Workers().AsyncRegisterWorker(options_.worker_type, worker_id,
worker_info, nullptr));
}

void CoreWorker::CheckForRayletFailure(const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
return;
Expand Down Expand Up @@ -1244,7 +1254,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
// Register a callback to handle actor notifications.
auto actor_notification_callback = [this](const ActorID &actor_id,
const gcs::ActorTableData &actor_data) {
if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) {
if (actor_data.state() == gcs::ActorTableData::PENDING) {
// The actor is being created and not yet ready, just ignore!
} else if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) {
absl::MutexLock lock(&actor_handles_mutex_);
auto it = actor_handles_.find(actor_id);
RAY_CHECK(it != actor_handles_.end());
Expand All @@ -1265,8 +1277,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
direct_actor_submitter_->ConnectActor(actor_id, actor_data.address());
}

RAY_LOG(INFO) << "received notification on actor, state="
<< static_cast<int>(actor_data.state()) << ", actor_id: " << actor_id
const auto &actor_state = gcs::ActorTableData::ActorState_Name(actor_data.state());
RAY_LOG(INFO) << "received notification on actor, state: " << actor_state
<< ", actor_id: " << actor_id
<< ", ip address: " << actor_data.address().ip_address()
<< ", port: " << actor_data.address().port() << ", worker_id: "
<< WorkerID::FromBinary(actor_data.address().worker_id())
Expand Down
25 changes: 24 additions & 1 deletion src/ray/core_worker/transport/direct_task_transport.cc
Expand Up @@ -15,14 +15,37 @@
#include "ray/core_worker/transport/direct_task_transport.h"

#include "ray/core_worker/transport/dependency_resolver.h"
#include "ray/core_worker/transport/direct_actor_transport.h"

namespace ray {

Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RAY_LOG(DEBUG) << "Submit task " << task_spec.TaskId();
resolver_.ResolveDependencies(task_spec, [this, task_spec]() {
RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId();
if (actor_create_callback_ && task_spec.IsActorCreationTask()) {
// If gcs actor management is enabled, the actor creation task will be sent to gcs
// server directly after the in-memory dependent objects are resolved.
// For more details please see the protocol of actor management based on gcs.
// https://docs.google.com/document/d/1EAWide-jy05akJp6OMtDn58XOK7bUyruWMia4E-fV28/edit?usp=sharing
auto actor_id = task_spec.ActorCreationId();
auto task_id = task_spec.TaskId();
RAY_LOG(INFO) << "Submitting actor creation task to GCS: " << actor_id;
auto status =
actor_create_callback_(task_spec, [this, actor_id, task_id](Status status) {
// If GCS is failed, GcsRpcClient may receive IOError status but it will not
// trigger this callback, because GcsRpcClient has retry logic at the
// bottom. So if this callback is invoked with an error there must be
// something wrong with the protocol of gcs-based actor management.
// So just check `status.ok()` here.
RAY_CHECK_OK(status);
RAY_LOG(INFO) << "Actor creation task submitted to GCS: " << actor_id;
task_finisher_->CompletePendingTask(task_id, rpc::PushTaskReply(),
rpc::Address());
});
RAY_CHECK_OK(status);
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just remembered, we should probably also call back to the TaskManager here, as we do for the non-GCS service path. This is used to indicate that the actor creation task has finished. We can also call the failure handler if the status returned an error, instead of crashing here.

There is a good example of what to do here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just remembered, we should probably also call back to the TaskManager here, as we do for the non-GCS service path. This is used to indicate that the actor creation task has finished. We can also call the failure handler if the status returned an error, instead of crashing here.

There is a good example of what to do here.

GcsRpcClient is a reliable client, it will retry at the bottom and ensure that the message must be delivered to the GCS Server, even if the GCS Server is restarted. So I assert here that status must be OK, if not then there must be a serious bug in the system.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand, I meant that we need to notify the TaskManager that the actor creation task is done. Otherwise, the TaskManager will still think that the creation task is pending. You have to call something like task_finisher_->CompletePendingTask (see the linked code for an example).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand, I meant that we need to notify the TaskManager that the actor creation task is done. Otherwise, the TaskManager will still think that the creation task is pending. You have to call something like task_finisher_->CompletePendingTask (see the linked code for an example).

Okey, I know what you mean, I will fix it.

}

absl::MutexLock lock(&mu_);
// Note that the dependencies in the task spec are mutated to only contain
// plasma dependencies after ResolveDependencies finishes.
Expand Down
25 changes: 17 additions & 8 deletions src/ray/core_worker/transport/direct_task_transport.h
Expand Up @@ -49,21 +49,23 @@ using SchedulingKey = std::tuple<SchedulingClass, std::vector<ObjectID>, ActorID
// This class is thread-safe.
class CoreWorkerDirectTaskSubmitter {
public:
CoreWorkerDirectTaskSubmitter(rpc::Address rpc_address,
std::shared_ptr<WorkerLeaseInterface> lease_client,
rpc::ClientFactoryFn client_factory,
LeaseClientFactoryFn lease_client_factory,
std::shared_ptr<CoreWorkerMemoryStore> store,
std::shared_ptr<TaskFinisherInterface> task_finisher,
ClientID local_raylet_id, int64_t lease_timeout_ms)
explicit CoreWorkerDirectTaskSubmitter(
rpc::Address rpc_address, std::shared_ptr<WorkerLeaseInterface> lease_client,
rpc::ClientFactoryFn client_factory, LeaseClientFactoryFn lease_client_factory,
std::shared_ptr<CoreWorkerMemoryStore> store,
std::shared_ptr<TaskFinisherInterface> task_finisher, ClientID local_raylet_id,
int64_t lease_timeout_ms,
std::function<Status(const TaskSpecification &, const gcs::StatusCallback &)>
actor_create_callback = nullptr)
: rpc_address_(rpc_address),
local_lease_client_(lease_client),
client_factory_(client_factory),
lease_client_factory_(lease_client_factory),
resolver_(store, task_finisher),
task_finisher_(task_finisher),
lease_timeout_ms_(lease_timeout_ms),
local_raylet_id_(local_raylet_id),
lease_timeout_ms_(lease_timeout_ms) {}
actor_create_callback_(std::move(actor_create_callback)) {}

/// Schedule a task for direct submission to a worker.
///
Expand Down Expand Up @@ -148,6 +150,13 @@ class CoreWorkerDirectTaskSubmitter {
/// if a remote raylet tells us to spill the task back to the local raylet.
const ClientID local_raylet_id_;

/// A function to override actor creation. The callback will be called once the actor
/// creation task has been accepted for submission, but the actor may not be created
/// yet.
std::function<Status(const TaskSpecification &task_spec,
const gcs::StatusCallback &callback)>
actor_create_callback_;

// Protects task submission state below.
absl::Mutex mu_;

Expand Down
9 changes: 9 additions & 0 deletions src/ray/gcs/accessor.h
Expand Up @@ -16,6 +16,7 @@
#define RAY_GCS_ACCESSOR_H

#include "ray/common/id.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/entry_change_notification.h"
#include "ray/protobuf/gcs.pb.h"
Expand Down Expand Up @@ -46,6 +47,14 @@ class ActorInfoAccessor {
virtual Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorTableData> &callback) = 0;

/// Create an actor to GCS asynchronously.
///
/// \param task_spec The specification for the actor creation task.
/// \param callback Callback that will be called after the actor info is written to GCS.
/// \return Status
virtual Status AsyncCreateActor(const TaskSpecification &task_spec,
const StatusCallback &callback) = 0;

/// Register an actor to GCS asynchronously.
///
/// \param data_ptr The actor that will be registered to the GCS.
Expand Down
18 changes: 17 additions & 1 deletion src/ray/gcs/gcs_client/service_based_accessor.cc
Expand Up @@ -79,7 +79,7 @@ ServiceBasedActorInfoAccessor::ServiceBasedActorInfoAccessor(
ServiceBasedGcsClient *client_impl)
: subscribe_id_(ClientID::FromRandom()),
client_impl_(client_impl),
actor_sub_executor_(client_impl->GetRedisGcsClient().log_based_actor_table()) {}
actor_sub_executor_(client_impl->GetRedisGcsClient().actor_table()) {}

Status ServiceBasedActorInfoAccessor::GetAll(
std::vector<ActorTableData> *actor_table_data_list) {
Expand All @@ -106,6 +106,22 @@ Status ServiceBasedActorInfoAccessor::AsyncGet(
return Status::OK();
}

Status ServiceBasedActorInfoAccessor::AsyncCreateActor(
const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) {
RAY_CHECK(task_spec.IsActorCreationTask() && callback);
rpc::CreateActorRequest request;
request.mutable_task_spec()->CopyFrom(task_spec.GetMessage());
client_impl_->GetGcsRpcClient().CreateActor(
request, [callback](const Status &, const rpc::CreateActorReply &reply) {
auto status =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic to convert gRPC statuses to Ray statuses should probably be in a utility function somewhere (I think it already is?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic to convert gRPC statuses to Ray statuses should probably be in a utility function somewhere (I think it already is?)

I've searched the code. Similar code only exists in gcs_rpc_client.h. I think these two pieces are not quite same. One is the logic of returning reply on the server side, and the other is the logic of converting after receiving reply on the client side.

reply.status().code() == (int)StatusCode::OK
? Status()
: Status(StatusCode(reply.status().code()), reply.status().message());
callback(status);
wumuzi520 marked this conversation as resolved.
Show resolved Hide resolved
});
return Status::OK();
}

Status ServiceBasedActorInfoAccessor::AsyncRegister(
const std::shared_ptr<rpc::ActorTableData> &data_ptr,
const StatusCallback &callback) {
Expand Down
6 changes: 5 additions & 1 deletion src/ray/gcs/gcs_client/service_based_accessor.h
Expand Up @@ -15,6 +15,7 @@
#ifndef RAY_GCS_SERVICE_BASED_ACCESSOR_H
#define RAY_GCS_SERVICE_BASED_ACCESSOR_H

#include <ray/common/task/task_spec.h>
#include "ray/gcs/accessor.h"
#include "ray/gcs/subscription_executor.h"
#include "ray/util/sequencer.h"
Expand Down Expand Up @@ -63,6 +64,9 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorTableData> &callback) override;

Status AsyncCreateActor(const TaskSpecification &task_spec,
const StatusCallback &callback) override;

Status AsyncRegister(const std::shared_ptr<rpc::ActorTableData> &data_ptr,
const StatusCallback &callback) override;

Expand Down Expand Up @@ -97,7 +101,7 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
private:
ServiceBasedGcsClient *client_impl_;

typedef SubscriptionExecutor<ActorID, ActorTableData, LogBasedActorTable>
typedef SubscriptionExecutor<ActorID, ActorTableData, ActorTable>
ActorSubscriptionExecutor;
ActorSubscriptionExecutor actor_sub_executor_;

Expand Down
15 changes: 15 additions & 0 deletions src/ray/gcs/gcs_server/actor_info_handler_impl.cc
Expand Up @@ -18,6 +18,21 @@
namespace ray {
namespace rpc {

void DefaultActorInfoHandler::HandleCreateActor(
const ray::rpc::CreateActorRequest &request, ray::rpc::CreateActorReply *reply,
ray::rpc::SendReplyCallback send_reply_callback) {
RAY_CHECK(request.task_spec().type() == TaskType::ACTOR_CREATION_TASK);
auto actor_id =
ActorID::FromBinary(request.task_spec().actor_creation_task_spec().actor_id());
wumuzi520 marked this conversation as resolved.
Show resolved Hide resolved

RAY_LOG(INFO) << "Registering actor, actor id = " << actor_id;
gcs_actor_manager_.RegisterActor(request, [reply, send_reply_callback, actor_id](
std::shared_ptr<gcs::GcsActor> actor) {
RAY_LOG(INFO) << "Registered actor, actor id = " << actor_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
});
}

void DefaultActorInfoHandler::HandleGetActorInfo(
const rpc::GetActorInfoRequest &request, rpc::GetActorInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) {
Expand Down