Skip to content

Commit

Permalink
[GCS]GCS adapts to job table pub sub (#8145)
Browse files Browse the repository at this point in the history
  • Loading branch information
ffbin committed Apr 24, 2020
1 parent 2298f6f commit 713e375
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 30 deletions.
2 changes: 2 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ cc_library(
copts = COPTS,
deps = [
":gcs",
":gcs_pub_sub_lib",
":gcs_service_rpc",
":node_manager_rpc",
":raylet_lib",
Expand Down Expand Up @@ -862,6 +863,7 @@ cc_library(
copts = COPTS,
deps = [
":gcs",
":gcs_pub_sub_lib",
":gcs_service_rpc",
":redis_store_client",
],
Expand Down
11 changes: 6 additions & 5 deletions src/ray/gcs/gcs_client/service_based_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ namespace gcs {

ServiceBasedJobInfoAccessor::ServiceBasedJobInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl),
job_sub_executor_(client_impl->GetRedisGcsClient().job_table()) {}
: client_impl_(client_impl) {}

Status ServiceBasedJobInfoAccessor::AsyncAdd(
const std::shared_ptr<JobTableData> &data_ptr, const StatusCallback &callback) {
Expand Down Expand Up @@ -64,13 +63,15 @@ Status ServiceBasedJobInfoAccessor::AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, JobTableData> &subscribe, const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing finished job.";
RAY_CHECK(subscribe != nullptr);
auto on_subscribe = [subscribe](const JobID &job_id, const JobTableData &job_data) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
JobTableData job_data;
job_data.ParseFromString(data);
if (job_data.is_dead()) {
subscribe(job_id, job_data);
subscribe(JobID::FromBinary(id), job_data);
}
};
Status status =
job_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done);
client_impl_->GetGcsPubSub().SubscribeAll(JOB_CHANNEL, on_subscribe, done);
RAY_LOG(DEBUG) << "Finished subscribing finished job.";
return status;
}
Expand Down
3 changes: 0 additions & 3 deletions src/ray/gcs/gcs_client/service_based_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ class ServiceBasedJobInfoAccessor : public JobInfoAccessor {

private:
ServiceBasedGcsClient *client_impl_;

typedef SubscriptionExecutor<JobID, JobTableData, JobTable> JobSubscriptionExecutor;
JobSubscriptionExecutor job_sub_executor_;
};

/// \class ServiceBasedActorInfoAccessor
Expand Down
9 changes: 6 additions & 3 deletions src/ray/gcs/gcs_client/service_based_gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
return Status::Invalid("gcs service address is invalid!");
}

// Connect to gcs
// Connect to gcs.
redis_gcs_client_.reset(new RedisGcsClient(options_));
RAY_CHECK_OK(redis_gcs_client_->Connect(io_service));

// Get gcs service address
// Init gcs pub sub instance.
gcs_pub_sub_.reset(new GcsPubSub(redis_gcs_client_->GetRedisClient()));

// Get gcs service address.
auto get_server_address = [this]() {
std::pair<std::string, int> address;
GetGcsServerAddressFromRedis(redis_gcs_client_->primary_context()->sync_context(),
Expand All @@ -44,7 +47,7 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
};
std::pair<std::string, int> address = get_server_address();

// Connect to gcs service
// Connect to gcs service.
client_call_manager_.reset(new rpc::ClientCallManager(io_service));
gcs_rpc_client_.reset(new rpc::GcsRpcClient(address.first, address.second,
*client_call_manager_, get_server_address));
Expand Down
9 changes: 6 additions & 3 deletions src/ray/gcs/gcs_client/service_based_gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RAY_GCS_SERVICE_BASED_GCS_CLIENT_H
#define RAY_GCS_SERVICE_BASED_GCS_CLIENT_H

#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_client.h"

Expand All @@ -23,14 +24,14 @@ namespace gcs {

class RAY_EXPORT ServiceBasedGcsClient : public GcsClient {
public:
ServiceBasedGcsClient(const GcsClientOptions &options);

ServiceBasedGcsClient(RedisGcsClient *redis_gcs_client);
explicit ServiceBasedGcsClient(const GcsClientOptions &options);

Status Connect(boost::asio::io_service &io_service) override;

void Disconnect() override;

GcsPubSub &GetGcsPubSub() { return *gcs_pub_sub_; }

RedisGcsClient &GetRedisGcsClient() { return *redis_gcs_client_; }

rpc::GcsRpcClient &GetGcsRpcClient() { return *gcs_rpc_client_; }
Expand All @@ -46,6 +47,8 @@ class RAY_EXPORT ServiceBasedGcsClient : public GcsClient {

std::unique_ptr<RedisGcsClient> redis_gcs_client_;

std::unique_ptr<GcsPubSub> gcs_pub_sub_;

// Gcs rpc client
std::unique_ptr<rpc::GcsRpcClient> gcs_rpc_client_;
std::unique_ptr<rpc::ClientCallManager> client_call_manager_;
Expand Down
13 changes: 8 additions & 5 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,18 @@ void GcsServer::Start() {
// Init backend client.
InitBackendClient();

// Init gcs node_manager
// Init gcs node_manager.
InitGcsNodeManager();

// Init gcs detector
// Init gcs pub sub instance.
gcs_pub_sub_ = std::make_shared<gcs::GcsPubSub>(redis_gcs_client_->GetRedisClient());

// Init gcs detector.
gcs_redis_failure_detector_ = std::make_shared<GcsRedisFailureDetector>(
main_service_, redis_gcs_client_->primary_context(), [this]() { Stop(); });
gcs_redis_failure_detector_->Start();

// Init gcs actor manager
// Init gcs actor manager.
InitGcsActorManager();

// Register rpc service.
Expand Down Expand Up @@ -93,7 +96,7 @@ void GcsServer::Start() {
// Run rpc server.
rpc_server_.Run();

// Store gcs rpc server address in redis
// Store gcs rpc server address in redis.
StoreGcsServerAddressInRedis();
is_started_ = true;

Expand Down Expand Up @@ -138,7 +141,7 @@ void GcsServer::InitGcsActorManager() {

std::unique_ptr<rpc::JobInfoHandler> GcsServer::InitJobInfoHandler() {
return std::unique_ptr<rpc::DefaultJobInfoHandler>(
new rpc::DefaultJobInfoHandler(*redis_gcs_client_));
new rpc::DefaultJobInfoHandler(*redis_gcs_client_, gcs_pub_sub_));
}

std::unique_ptr<rpc::ActorInfoHandler> GcsServer::InitActorInfoHandler() {
Expand Down
3 changes: 3 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RAY_GCS_GCS_SERVER_H
#define RAY_GCS_GCS_SERVER_H

#include <ray/gcs/pubsub/gcs_pub_sub.h>
#include <ray/gcs/redis_gcs_client.h>
#include <ray/rpc/gcs_server/gcs_rpc_server.h>
#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h"
Expand Down Expand Up @@ -146,6 +147,8 @@ class GcsServer {
std::unique_ptr<rpc::WorkerInfoGrpcService> worker_info_service_;
/// Backend client
std::shared_ptr<RedisGcsClient> redis_gcs_client_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// Gcs service state flag, which is used for ut.
bool is_started_ = false;
bool is_stopped_ = false;
Expand Down
24 changes: 16 additions & 8 deletions src/ray/gcs/gcs_server/job_info_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,25 @@
// limitations under the License.

#include "job_info_handler_impl.h"
#include "ray/gcs/pb_util.h"

namespace ray {
namespace rpc {
void DefaultJobInfoHandler::HandleAddJob(const rpc::AddJobRequest &request,
rpc::AddJobReply *reply,
rpc::SendReplyCallback send_reply_callback) {
JobID job_id = JobID::FromBinary(request.data().job_id());
RAY_LOG(DEBUG) << "Adding job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
RAY_LOG(INFO) << "Adding job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
auto job_table_data = std::make_shared<JobTableData>();
job_table_data->CopyFrom(request.data());
auto on_done = [job_id, request, reply, send_reply_callback](Status status) {
auto on_done = [job_id, request, reply, send_reply_callback](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
} else {
RAY_LOG(INFO) << "Finished adding job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Expand All @@ -36,18 +40,23 @@ void DefaultJobInfoHandler::HandleAddJob(const rpc::AddJobRequest &request,
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished adding job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
}

void DefaultJobInfoHandler::HandleMarkJobFinished(
const rpc::MarkJobFinishedRequest &request, rpc::MarkJobFinishedReply *reply,
rpc::SendReplyCallback send_reply_callback) {
JobID job_id = JobID::FromBinary(request.job_id());
RAY_LOG(DEBUG) << "Marking job state, job id = " << job_id;
auto on_done = [job_id, reply, send_reply_callback](Status status) {
RAY_LOG(INFO) << "Marking job state, job id = " << job_id;
auto job_table_data =
gcs::CreateJobTableData(job_id, /*is_dead*/ true, std::time(nullptr), "", -1);
auto on_done = [this, job_id, job_table_data, reply,
send_reply_callback](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id;
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Binary(),
job_table_data->SerializeAsString(), nullptr));
RAY_LOG(INFO) << "Finished marking job state, job id = " << job_id;
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Expand All @@ -56,7 +65,6 @@ void DefaultJobInfoHandler::HandleMarkJobFinished(
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished marking job state, job id = " << job_id;
}
} // namespace rpc
} // namespace ray
7 changes: 5 additions & 2 deletions src/ray/gcs/gcs_server/job_info_handler_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RAY_GCS_JOB_INFO_HANDLER_IMPL_H
#define RAY_GCS_JOB_INFO_HANDLER_IMPL_H

#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"

Expand All @@ -24,8 +25,9 @@ namespace rpc {
/// This implementation class of `JobInfoHandler`.
class DefaultJobInfoHandler : public rpc::JobInfoHandler {
public:
explicit DefaultJobInfoHandler(gcs::RedisGcsClient &gcs_client)
: gcs_client_(gcs_client) {}
explicit DefaultJobInfoHandler(gcs::RedisGcsClient &gcs_client,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub)
: gcs_client_(gcs_client), gcs_pub_sub_(gcs_pub_sub) {}

void HandleAddJob(const AddJobRequest &request, AddJobReply *reply,
SendReplyCallback send_reply_callback) override;
Expand All @@ -36,6 +38,7 @@ class DefaultJobInfoHandler : public rpc::JobInfoHandler {

private:
gcs::RedisGcsClient &gcs_client_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
};

} // namespace rpc
Expand Down
2 changes: 2 additions & 0 deletions src/ray/gcs/pubsub/gcs_pub_sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
namespace ray {
namespace gcs {

#define JOB_CHANNEL "JOB"

/// \class GcsPubSub
///
/// GcsPubSub supports publishing, subscription and unsubscribing of data.
Expand Down
4 changes: 3 additions & 1 deletion src/ray/gcs/redis_gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
return redis_client_->GetPrimaryContext();
}

std::shared_ptr<RedisClient> GetRedisClient() const { return redis_client_; }

/// The following xxx_table methods implement the Accessor interfaces.
/// Implements the Actors() interface.
LogBasedActorTable &log_based_actor_table();
Expand Down Expand Up @@ -113,7 +115,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
// might be used, if available.
CommandType command_type_{CommandType::kUnknown};

std::unique_ptr<RedisClient> redis_client_;
std::shared_ptr<RedisClient> redis_client_;

std::unique_ptr<ObjectTable> object_table_;
std::unique_ptr<raylet::TaskTable> raylet_task_table_;
Expand Down

0 comments on commit 713e375

Please sign in to comment.