diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index b3ba05ac4239..04ba0a8c6e86 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -169,8 +169,108 @@ #include +#include + namespace NKikimr { +class TGRpcServersManager : public TActorBootstrapped { + TGRpcServersFactory GRpcServersFactory; + TGRpcServers GRpcServers; + TIntrusivePtr ProcessMemoryInfoProvider; + +public: + enum { + EvStop = EventSpaceBegin(TEvents::ES_PRIVATE), + }; + + struct TEvStop : TEventLocal { + TManualEvent *Event; + + TEvStop(TManualEvent *event) + : Event(event) + {} + }; + +public: + TGRpcServersManager(TGRpcServersFactory grpcServersFactory, + TIntrusivePtr processMemoryInfoProvider) + : GRpcServersFactory(std::move(grpcServersFactory)) + , ProcessMemoryInfoProvider(std::move(processMemoryInfoProvider)) + {} + + void Bootstrap() { + Become(&TThis::StateFunc); + Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryStorageConfig(true)); + Start(); + } + + void Handle(TEvNodeWardenStorageConfig::TPtr ev) { + if (const auto& bridgeInfo = ev->Get()->BridgeInfo) { + if (NBridge::PileStateTraits(bridgeInfo->SelfNodePile->State).RequiresConfigQuorum) { + Start(); + } else { + Stop(); + } + } + } + + void Start() { + if (GRpcServers) { + return; + } + GRpcServers = GRpcServersFactory(); + for (auto& [name, server] : GRpcServers) { + if (!server) { + continue; + } + + server->Start(); + + TString endpoint; + if (server->GetHost() != "[::]") { + endpoint = server->GetHost(); + } + + endpoint += Sprintf(":%d", server->GetPort()); + + Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), + new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateAddEndpoint(name, endpoint)); + + if (ProcessMemoryInfoProvider) { + auto memInfo = ProcessMemoryInfoProvider->Get(); + NKikimrWhiteboard::TSystemStateInfo systemStateInfo; + if (memInfo.CGroupLimit) { + systemStateInfo.SetMemoryLimit(*memInfo.CGroupLimit); + } else if (memInfo.MemTotal) { + systemStateInfo.SetMemoryLimit(*memInfo.MemTotal); + } + Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), + new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(systemStateInfo)); + } + } + } + + void Stop() { + for (auto& [name, server] : GRpcServers) { + if (server) { + server->Stop(); + } + } + GRpcServers.clear(); + } + + void HandleStop(TEvStop::TPtr ev) { + Stop(); + ev->Get()->Event->Signal(); + PassAway(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvNodeWardenStorageConfig, Handle) + hFunc(TEvStop, HandleStop) + ) +}; + class TDomainsInitializer : public IAppDataInitializer { const NKikimrConfig::TAppConfig& Config; @@ -432,15 +532,8 @@ TKikimrRunner::TKikimrRunner(std::shared_ptr factories) } TKikimrRunner::~TKikimrRunner() { - if (!!ActorSystem) { - // Stop ActorSystem first, so no one actor can call any grpc stuff. + if (ActorSystem) { ActorSystem->Stop(); - // After that stop sending any requests to actors - // by destroing grpc subsystem. - for (auto& serv : GRpcServers) { - serv.second.Destroy(); - } - ActorSystem.Destroy(); } } @@ -552,8 +645,14 @@ void TKikimrRunner::InitializeKqpController(const TKikimrRunConfig& runConfig) { } void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { + GRpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); }; +} + +TGRpcServers TKikimrRunner::CreateGRpcServers(const TKikimrRunConfig& runConfig) { const auto& appConfig = runConfig.AppConfig; + TGRpcServers grpcServers; + auto fillFn = [&](const NKikimrConfig::TGRpcConfig& grpcConfig, NYdbGrpc::TGRpcServer& server, NYdbGrpc::TServerOptions& opts) { const auto& services = grpcConfig.GetServices(); const auto& rlServicesEnabled = grpcConfig.GetRatelimiterServicesEnabled(); @@ -1046,15 +1145,15 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { sslData.DoRequestClientCertificate = appConfig.GetClientCertificateAuthorization().GetRequestClientCertificate(); sslOpts.SetSslData(sslData); - GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) }); + grpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) }); - fillFn(grpcConfig, *GRpcServers.back().second, sslOpts); + fillFn(grpcConfig, *grpcServers.back().second, sslOpts); } if (grpcConfig.GetPort()) { - GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) }); + grpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) }); - fillFn(grpcConfig, *GRpcServers.back().second, opts); + fillFn(grpcConfig, *grpcServers.back().second, opts); } for (auto &ex : grpcConfig.GetExtEndpoints()) { @@ -1069,8 +1168,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { xopts.SetEndpointId(ex.GetEndpointId()); } - GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) }); - fillFn(ex, *GRpcServers.back().second, xopts); + grpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) }); + fillFn(ex, *grpcServers.back().second, xopts); } if (ex.HasSslPort() && ex.GetSslPort()) { @@ -1108,11 +1207,13 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { Y_ABORT_UNLESS(xopts.SslData->Cert, "Cert not set"); Y_ABORT_UNLESS(xopts.SslData->Key, "Key not set"); - GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) }); - fillFn(ex, *GRpcServers.back().second, xopts); + grpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) }); + fillFn(ex, *grpcServers.back().second, xopts); } } } + + return grpcServers; } void TKikimrRunner::InitializeAllocator(const TKikimrRunConfig& runConfig) { @@ -1889,30 +1990,7 @@ void TKikimrRunner::KikimrStart() { Monitoring->Start(ActorSystem.Get()); } - for (auto& server : GRpcServers) { - if (server.second) { - server.second->Start(); - - TString endpoint; - if (server.second->GetHost() != "[::]") { - endpoint = server.second->GetHost(); - } - endpoint += Sprintf(":%d", server.second->GetPort()); - ActorSystem->Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ActorSystem->NodeId), - new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateAddEndpoint(server.first, endpoint)); - if (ProcessMemoryInfoProvider) { - auto memInfo = ProcessMemoryInfoProvider->Get(); - NKikimrWhiteboard::TSystemStateInfo systemStateInfo; - if (memInfo.CGroupLimit) { - systemStateInfo.SetMemoryLimit(*memInfo.CGroupLimit); - } else if (memInfo.MemTotal) { - systemStateInfo.SetMemoryLimit(*memInfo.MemTotal); - } - ActorSystem->Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ActorSystem->NodeId), - new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(systemStateInfo)); - } - } - } + GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(std::move(GRpcServersFactory), ProcessMemoryInfoProvider)); if (SqsHttp) { SqsHttp->Start(); @@ -2017,20 +2095,14 @@ void TKikimrRunner::KikimrStop(bool graceful) { } // stop processing grpc requests/response - we must stop feeding ActorSystem - for (auto& server : GRpcServers) { - if (server.second) { - server.second->Stop(); - } - } + TManualEvent event; + ActorSystem->Send(new IEventHandle(GRpcServersManager, {}, new TGRpcServersManager::TEvStop(&event))); + event.WaitI(); if (ActorSystem) { ActorSystem->Stop(); } - for (auto& server : GRpcServers) { - server.second.Destroy(); - } - if (YqSharedResources) { YqSharedResources->Stop(); } @@ -2068,7 +2140,6 @@ void TKikimrRunner::OnTerminate(int) { KikimrShouldContinue.ShouldStop(0); } - void TKikimrRunner::SetSignalHandlers() { #ifdef _unix_ signal(SIGPIPE, SIG_IGN); diff --git a/ydb/core/driver_lib/run/run.h b/ydb/core/driver_lib/run/run.h index 6ffa49daf4b6..8368cfd3eab9 100644 --- a/ydb/core/driver_lib/run/run.h +++ b/ydb/core/driver_lib/run/run.h @@ -26,6 +26,9 @@ namespace NKikimr { +using TGRpcServers = TVector>>; +using TGRpcServersFactory = std::function; + class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage { protected: static TProgramShouldContinue KikimrShouldContinue; @@ -56,8 +59,6 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage { TIntrusivePtr PollerThreads; TAutoPtr AppData; - TVector>> GRpcServers; - TIntrusivePtr LogSettings; std::shared_ptr LogBackend; TAutoPtr ActorSystem; @@ -68,6 +69,9 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage { TKikimrRunner(std::shared_ptr factories = {}); + TGRpcServersFactory GRpcServersFactory; + TActorId GRpcServersManager; + virtual ~TKikimrRunner(); virtual void InitializeRegistries(const TKikimrRunConfig& runConfig); @@ -85,6 +89,7 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage { void InitializeMonitoringLogin(const TKikimrRunConfig& runConfig); void InitializeGRpc(const TKikimrRunConfig& runConfig); + TGRpcServers CreateGRpcServers(const TKikimrRunConfig& runConfig); void InitializeKqpController(const TKikimrRunConfig& runConfig);