Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove static concurrency limit from gRPC server #7544

Merged
merged 2 commits into from Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
111 changes: 50 additions & 61 deletions src/ray/rpc/gcs_server/gcs_rpc_server.h
Expand Up @@ -9,31 +9,28 @@
namespace ray {
namespace rpc {

#define JOB_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(JobInfoGcsService, HANDLER, CONCURRENCY)
#define JOB_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(JobInfoGcsService, HANDLER)

#define ACTOR_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(ActorInfoGcsService, HANDLER, CONCURRENCY)
#define ACTOR_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(ActorInfoGcsService, HANDLER)

#define NODE_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(NodeInfoGcsService, HANDLER, CONCURRENCY)
#define NODE_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(NodeInfoGcsService, HANDLER)

#define OBJECT_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(ObjectInfoGcsService, HANDLER, CONCURRENCY)
#define OBJECT_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(ObjectInfoGcsService, HANDLER)

#define TASK_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(TaskInfoGcsService, HANDLER, CONCURRENCY)
#define TASK_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(TaskInfoGcsService, HANDLER)

#define STATS_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(StatsGcsService, HANDLER, CONCURRENCY)
#define STATS_SERVICE_RPC_HANDLER(HANDLER) RPC_SERVICE_HANDLER(StatsGcsService, HANDLER)

#define ERROR_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(ErrorInfoGcsService, HANDLER, CONCURRENCY)
#define ERROR_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(ErrorInfoGcsService, HANDLER)

#define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER, CONCURRENCY)

#define SERVER_CALL_CONCURRENCY 9999
#define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER)

class JobInfoGcsServiceHandler {
public:
Expand Down Expand Up @@ -62,10 +59,9 @@ class JobInfoGrpcService : public GrpcService {

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
JOB_INFO_SERVICE_RPC_HANDLER(AddJob, SERVER_CALL_CONCURRENCY);
JOB_INFO_SERVICE_RPC_HANDLER(MarkJobFinished, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
JOB_INFO_SERVICE_RPC_HANDLER(AddJob);
JOB_INFO_SERVICE_RPC_HANDLER(MarkJobFinished);
}

private:
Expand Down Expand Up @@ -119,14 +115,13 @@ class ActorInfoGrpcService : public GrpcService {

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo, SERVER_CALL_CONCURRENCY);
ACTOR_INFO_SERVICE_RPC_HANDLER(RegisterActorInfo, SERVER_CALL_CONCURRENCY);
ACTOR_INFO_SERVICE_RPC_HANDLER(UpdateActorInfo, SERVER_CALL_CONCURRENCY);
ACTOR_INFO_SERVICE_RPC_HANDLER(AddActorCheckpoint, SERVER_CALL_CONCURRENCY);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpoint, SERVER_CALL_CONCURRENCY);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpointID, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(RegisterActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(UpdateActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(AddActorCheckpoint);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpoint);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpointID);
}

private:
Expand Down Expand Up @@ -188,16 +183,15 @@ class NodeInfoGrpcService : public GrpcService {

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
NODE_INFO_SERVICE_RPC_HANDLER(RegisterNode, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(ReportBatchHeartbeat, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(GetResources, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(UpdateResources, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(DeleteResources, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
NODE_INFO_SERVICE_RPC_HANDLER(RegisterNode);
NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode);
NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo);
NODE_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat);
NODE_INFO_SERVICE_RPC_HANDLER(ReportBatchHeartbeat);
NODE_INFO_SERVICE_RPC_HANDLER(GetResources);
NODE_INFO_SERVICE_RPC_HANDLER(UpdateResources);
NODE_INFO_SERVICE_RPC_HANDLER(DeleteResources);
}

private:
Expand Down Expand Up @@ -239,11 +233,10 @@ class ObjectInfoGrpcService : public GrpcService {

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
OBJECT_INFO_SERVICE_RPC_HANDLER(GetObjectLocations, SERVER_CALL_CONCURRENCY);
OBJECT_INFO_SERVICE_RPC_HANDLER(AddObjectLocation, SERVER_CALL_CONCURRENCY);
OBJECT_INFO_SERVICE_RPC_HANDLER(RemoveObjectLocation, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
OBJECT_INFO_SERVICE_RPC_HANDLER(GetObjectLocations);
OBJECT_INFO_SERVICE_RPC_HANDLER(AddObjectLocation);
OBJECT_INFO_SERVICE_RPC_HANDLER(RemoveObjectLocation);
}

private:
Expand Down Expand Up @@ -291,13 +284,12 @@ class TaskInfoGrpcService : public GrpcService {

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
TASK_INFO_SERVICE_RPC_HANDLER(AddTask, SERVER_CALL_CONCURRENCY);
TASK_INFO_SERVICE_RPC_HANDLER(GetTask, SERVER_CALL_CONCURRENCY);
TASK_INFO_SERVICE_RPC_HANDLER(DeleteTasks, SERVER_CALL_CONCURRENCY);
TASK_INFO_SERVICE_RPC_HANDLER(AddTaskLease, SERVER_CALL_CONCURRENCY);
TASK_INFO_SERVICE_RPC_HANDLER(AttemptTaskReconstruction, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
TASK_INFO_SERVICE_RPC_HANDLER(AddTask);
TASK_INFO_SERVICE_RPC_HANDLER(GetTask);
TASK_INFO_SERVICE_RPC_HANDLER(DeleteTasks);
TASK_INFO_SERVICE_RPC_HANDLER(AddTaskLease);
TASK_INFO_SERVICE_RPC_HANDLER(AttemptTaskReconstruction);
}

private:
Expand Down Expand Up @@ -331,9 +323,8 @@ class StatsGrpcService : public GrpcService {

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
STATS_SERVICE_RPC_HANDLER(AddProfileData, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
STATS_SERVICE_RPC_HANDLER(AddProfileData);
}

private:
Expand Down Expand Up @@ -367,9 +358,8 @@ class ErrorInfoGrpcService : public GrpcService {

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
ERROR_INFO_SERVICE_RPC_HANDLER(ReportJobError, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
ERROR_INFO_SERVICE_RPC_HANDLER(ReportJobError);
}

private:
Expand Down Expand Up @@ -403,9 +393,8 @@ class WorkerInfoGrpcService : public GrpcService {

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
WORKER_INFO_SERVICE_RPC_HANDLER(ReportWorkerFailure, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
WORKER_INFO_SERVICE_RPC_HANDLER(ReportWorkerFailure);
}

private:
Expand Down
9 changes: 4 additions & 5 deletions src/ray/rpc/grpc_server.cc
Expand Up @@ -50,10 +50,9 @@ void GrpcServer::Run() {
RAY_LOG(INFO) << name_ << " server started, listening on port " << port_ << ".";

// Create calls for all the server call factories.
for (auto &entry : server_call_factories_and_concurrencies_) {
for (int i = 0; i < entry.second; i++) {
// Create and request calls from the factory.
entry.first->CreateCall();
for (auto &entry : server_call_factories_) {
for (int i = 0; i < num_threads_; i++) {
entry->CreateCall();
}
}
// Start threads that poll incoming requests.
Expand All @@ -68,7 +67,7 @@ void GrpcServer::RegisterService(GrpcService &service) {
services_.emplace_back(service.GetGrpcService());

for (int i = 0; i < num_threads_; i++) {
service.InitServerCallFactories(cqs_[i], &server_call_factories_and_concurrencies_);
service.InitServerCallFactories(cqs_[i], &server_call_factories_);
}
}

Expand Down
16 changes: 6 additions & 10 deletions src/ray/rpc/grpc_server.h
Expand Up @@ -13,14 +13,13 @@
namespace ray {
namespace rpc {

#define RPC_SERVICE_HANDLER(SERVICE, HANDLER, CONCURRENCY) \
#define RPC_SERVICE_HANDLER(SERVICE, HANDLER) \
std::unique_ptr<ServerCallFactory> HANDLER##_call_factory( \
new ServerCallFactoryImpl<SERVICE, SERVICE##Handler, HANDLER##Request, \
HANDLER##Reply>( \
service_, &SERVICE::AsyncService::Request##HANDLER, service_handler_, \
&SERVICE##Handler::Handle##HANDLER, cq, main_service_)); \
server_call_factories_and_concurrencies->emplace_back( \
std::move(HANDLER##_call_factory), CONCURRENCY);
server_call_factories->emplace_back(std::move(HANDLER##_call_factory));

// Define a void RPC client method.
#define DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(METHOD) \
Expand Down Expand Up @@ -95,10 +94,8 @@ class GrpcServer {
bool is_closed_;
/// The `grpc::Service` objects which should be registered to `ServerBuilder`.
std::vector<std::reference_wrapper<grpc::Service>> services_;
/// The `ServerCallFactory` objects, and the maximum number of concurrent requests that
/// this gRPC server can handle.
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
server_call_factories_and_concurrencies_;
/// The `ServerCallFactory` objects.
std::vector<std::unique_ptr<ServerCallFactory>> server_call_factories_;
/// The number of completion queues the server is polling from.
int num_threads_;
/// The `ServerCompletionQueue` object used for polling events.
Expand Down Expand Up @@ -135,12 +132,11 @@ class GrpcService {
/// server can handle.
///
/// \param[in] cq The grpc completion queue.
/// \param[out] server_call_factories_and_concurrencies The `ServerCallFactory` objects,
/// \param[out] server_call_factories The vector that receives one
/// `ServerCallFactory` object per RPC handler of this service.
virtual void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) = 0;
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) = 0;

/// The main event loop, to which the service handler functions will be posted.
boost::asio::io_service &main_service_;
Expand Down
17 changes: 8 additions & 9 deletions src/ray/rpc/node_manager/node_manager_server.h
Expand Up @@ -10,13 +10,13 @@ namespace ray {
namespace rpc {

/// NOTE: See src/ray/core_worker/core_worker.h on how to add a new grpc handler.
#define RAY_NODE_MANAGER_RPC_HANDLERS \
RPC_SERVICE_HANDLER(NodeManagerService, RequestWorkerLease, 100) \
RPC_SERVICE_HANDLER(NodeManagerService, ReturnWorker, 100) \
RPC_SERVICE_HANDLER(NodeManagerService, ForwardTask, 100) \
RPC_SERVICE_HANDLER(NodeManagerService, PinObjectIDs, 100) \
RPC_SERVICE_HANDLER(NodeManagerService, GetNodeStats, 1) \
RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC, 1)
#define RAY_NODE_MANAGER_RPC_HANDLERS \
RPC_SERVICE_HANDLER(NodeManagerService, RequestWorkerLease) \
RPC_SERVICE_HANDLER(NodeManagerService, ReturnWorker) \
RPC_SERVICE_HANDLER(NodeManagerService, ForwardTask) \
RPC_SERVICE_HANDLER(NodeManagerService, PinObjectIDs) \
RPC_SERVICE_HANDLER(NodeManagerService, GetNodeStats) \
RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC)

/// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`.
class NodeManagerServiceHandler {
Expand Down Expand Up @@ -72,8 +72,7 @@ class NodeManagerGrpcService : public GrpcService {

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
RAY_NODE_MANAGER_RPC_HANDLERS
}

Expand Down
11 changes: 5 additions & 6 deletions src/ray/rpc/object_manager/object_manager_server.h
Expand Up @@ -10,10 +10,10 @@
namespace ray {
namespace rpc {

#define RAY_OBJECT_MANAGER_RPC_HANDLERS \
RPC_SERVICE_HANDLER(ObjectManagerService, Push, 5) \
RPC_SERVICE_HANDLER(ObjectManagerService, Pull, 5) \
RPC_SERVICE_HANDLER(ObjectManagerService, FreeObjects, 2)
#define RAY_OBJECT_MANAGER_RPC_HANDLERS \
RPC_SERVICE_HANDLER(ObjectManagerService, Push) \
RPC_SERVICE_HANDLER(ObjectManagerService, Pull) \
RPC_SERVICE_HANDLER(ObjectManagerService, FreeObjects)

/// Implementations of the `ObjectManagerGrpcService`, check interface in
/// `src/ray/protobuf/object_manager.proto`.
Expand Down Expand Up @@ -53,8 +53,7 @@ class ObjectManagerGrpcService : public GrpcService {

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
RAY_OBJECT_MANAGER_RPC_HANDLERS
}

Expand Down
7 changes: 2 additions & 5 deletions src/ray/rpc/server_call.h
Expand Up @@ -78,8 +78,6 @@ class ServerCallFactory {
public:
/// Create a new `ServerCall` and request gRPC runtime to start accepting the
/// corresponding type of requests.
///
/// \return Pointer to the `ServerCall` object.
virtual void CreateCall() const = 0;

virtual ~ServerCallFactory() = default;
Expand Down Expand Up @@ -141,6 +139,8 @@ class ServerCallImpl : public ServerCall {
// NOTE(hchen): This `factory` local variable is needed because `SendReply` runs in
// a different thread and will cause `this` to be deleted.
const auto &factory = factory_;
// Create a new `ServerCall` to accept the next incoming request.
factory.CreateCall();
edoakes marked this conversation as resolved.
Show resolved Hide resolved
(service_handler_.*handle_request_function_)(
request_, &reply_,
[this](Status status, std::function<void()> success,
Expand All @@ -155,9 +155,6 @@ class ServerCallImpl : public ServerCall {
// this server call might be deleted
SendReply(status);
});
// We've finished handling this request,
// create a new `ServerCall` to accept the next incoming request.
factory.CreateCall();
}

void OnReplySent() override {
Expand Down
25 changes: 12 additions & 13 deletions src/ray/rpc/worker/core_worker_server.h
Expand Up @@ -14,17 +14,17 @@ class CoreWorker;
namespace rpc {

/// NOTE: See src/ray/core_worker/core_worker.h on how to add a new grpc handler.
#define RAY_CORE_WORKER_RPC_HANDLERS \
RPC_SERVICE_HANDLER(CoreWorkerService, AssignTask, 5) \
RPC_SERVICE_HANDLER(CoreWorkerService, PushTask, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, DirectActorCallArgWaitComplete, 100) \
RPC_SERVICE_HANDLER(CoreWorkerService, GetObjectStatus, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForObjectEviction, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForRefRemoved, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, KillActor, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, GetCoreWorkerStats, 100) \
RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC, 100) \
RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady, 9999)
#define RAY_CORE_WORKER_RPC_HANDLERS \
RPC_SERVICE_HANDLER(CoreWorkerService, AssignTask) \
RPC_SERVICE_HANDLER(CoreWorkerService, PushTask) \
RPC_SERVICE_HANDLER(CoreWorkerService, DirectActorCallArgWaitComplete) \
RPC_SERVICE_HANDLER(CoreWorkerService, GetObjectStatus) \
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForObjectEviction) \
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForRefRemoved) \
RPC_SERVICE_HANDLER(CoreWorkerService, KillActor) \
RPC_SERVICE_HANDLER(CoreWorkerService, GetCoreWorkerStats) \
RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC) \
RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady)

#define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignTask) \
Expand Down Expand Up @@ -70,8 +70,7 @@ class CoreWorkerGrpcService : public GrpcService {

void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
RAY_CORE_WORKER_RPC_HANDLERS
}

Expand Down