Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 121 additions & 50 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,108 @@

#include <ydb/core/tracing/tablet_info.h>

#include <ydb/core/blobstorage/base/blobstorage_events.h>

namespace NKikimr {

// Actor that owns this node's gRPC servers and drives their lifecycle.
//
// On bootstrap it queries the node warden for the storage config and starts the
// servers right away. For every TEvNodeWardenStorageConfig that carries bridge
// info, the servers are started when the state of this node's pile requires
// config quorum and stopped otherwise. TEvStop stops the servers, signals the
// supplied event and terminates the actor.
class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
    // Produces a fresh set of servers; invoked each time the servers are (re)started.
    TGRpcServersFactory GRpcServersFactory;
    // Servers created by the last Start(); empty after Stop().
    TGRpcServers GRpcServers;
    // Optional source of the memory limit reported to the whiteboard; may be null.
    TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> ProcessMemoryInfoProvider;

public:
    enum {
        EvStop = EventSpaceBegin(TEvents::ES_PRIVATE),
    };

    // Local request to stop all servers and terminate the manager.
    // Event is signalled by the manager once the servers have been stopped.
    struct TEvStop : TEventLocal<TEvStop, EvStop> {
        TManualEvent *Event;

        explicit TEvStop(TManualEvent *event)
            : Event(event)
        {}
    };

public:
    TGRpcServersManager(TGRpcServersFactory grpcServersFactory,
            TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> processMemoryInfoProvider)
        : GRpcServersFactory(std::move(grpcServersFactory))
        , ProcessMemoryInfoProvider(std::move(processMemoryInfoProvider))
    {}

    // Enters the working state, asks the node warden for the storage config and
    // starts the servers without waiting for the reply.
    void Bootstrap() {
        Become(&TThis::StateFunc);
        // NOTE(review): the `true` argument presumably subscribes to further config
        // updates -- confirm against TEvNodeWardenQueryStorageConfig.
        Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryStorageConfig(true));
        Start();
    }

    // Reacts to a storage config: configs without bridge info are ignored; otherwise
    // the servers run only while this node's pile state requires config quorum.
    void Handle(TEvNodeWardenStorageConfig::TPtr ev) {
        if (const auto& bridgeInfo = ev->Get()->BridgeInfo) {
            if (NBridge::PileStateTraits(bridgeInfo->SelfNodePile->State).RequiresConfigQuorum) {
                Start();
            } else {
                Stop();
            }
        }
    }

    // Creates the servers through the factory, starts each non-null one and
    // publishes its endpoint to the node whiteboard. Does nothing if servers
    // already exist.
    void Start() {
        if (GRpcServers) {
            return;
        }
        GRpcServers = GRpcServersFactory();

        const auto whiteboardId = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId());
        bool anyServerStarted = false;

        for (auto& [name, server] : GRpcServers) {
            if (!server) {
                continue;
            }

            server->Start();
            anyServerStarted = true;

            // The wildcard host "[::]" is omitted, leaving just ":<port>".
            TString endpoint;
            if (server->GetHost() != "[::]") {
                endpoint = server->GetHost();
            }

            endpoint += Sprintf(":%d", server->GetPort());

            Send(whiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateAddEndpoint(name, endpoint));
        }

        // The memory limit does not depend on any particular server, so it is
        // reported once per start instead of once per server. As before, nothing
        // is reported when no server was started.
        if (anyServerStarted) {
            ReportMemoryLimit(whiteboardId);
        }
    }

    // Stops every non-null server and drops all of them, so that a subsequent
    // Start() creates a new set.
    void Stop() {
        for (auto& [name, server] : GRpcServers) {
            if (server) {
                server->Stop();
            }
        }
        GRpcServers.clear();
    }

    // Stops the servers, signals the event carried by the request and terminates the actor.
    void HandleStop(TEvStop::TPtr ev) {
        Stop();
        ev->Get()->Event->Signal();
        PassAway();
    }

    STRICT_STFUNC(StateFunc,
        hFunc(TEvNodeWardenStorageConfig, Handle)
        hFunc(TEvStop, HandleStop)
    )

private:
    // Sends the process memory limit to the whiteboard: the cgroup limit when it is
    // set, otherwise total memory when it is set. No-op without a provider.
    template <typename TWhiteboardId>
    void ReportMemoryLimit(const TWhiteboardId& whiteboardId) {
        if (!ProcessMemoryInfoProvider) {
            return;
        }
        auto memInfo = ProcessMemoryInfoProvider->Get();
        NKikimrWhiteboard::TSystemStateInfo systemStateInfo;
        if (memInfo.CGroupLimit) {
            systemStateInfo.SetMemoryLimit(*memInfo.CGroupLimit);
        } else if (memInfo.MemTotal) {
            systemStateInfo.SetMemoryLimit(*memInfo.MemTotal);
        }
        Send(whiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(systemStateInfo));
    }
};

class TDomainsInitializer : public IAppDataInitializer {
const NKikimrConfig::TAppConfig& Config;

Expand Down Expand Up @@ -432,15 +532,8 @@ TKikimrRunner::TKikimrRunner(std::shared_ptr<TModuleFactories> factories)
}

TKikimrRunner::~TKikimrRunner() {
if (!!ActorSystem) {
// Stop ActorSystem first, so that no actor can call any grpc stuff.
if (ActorSystem) {
ActorSystem->Stop();
// After that stop sending any requests to actors
// by destroying grpc subsystem.
for (auto& serv : GRpcServers) {
serv.second.Destroy();
}

ActorSystem.Destroy();
}
}
Expand Down Expand Up @@ -552,8 +645,14 @@ void TKikimrRunner::InitializeKqpController(const TKikimrRunConfig& runConfig) {
}

void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
GRpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); };
}

TGRpcServers TKikimrRunner::CreateGRpcServers(const TKikimrRunConfig& runConfig) {
const auto& appConfig = runConfig.AppConfig;

TGRpcServers grpcServers;

auto fillFn = [&](const NKikimrConfig::TGRpcConfig& grpcConfig, NYdbGrpc::TGRpcServer& server, NYdbGrpc::TServerOptions& opts) {
const auto& services = grpcConfig.GetServices();
const auto& rlServicesEnabled = grpcConfig.GetRatelimiterServicesEnabled();
Expand Down Expand Up @@ -1046,15 +1145,15 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
sslData.DoRequestClientCertificate = appConfig.GetClientCertificateAuthorization().GetRequestClientCertificate();
sslOpts.SetSslData(sslData);

GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) });
grpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) });

fillFn(grpcConfig, *GRpcServers.back().second, sslOpts);
fillFn(grpcConfig, *grpcServers.back().second, sslOpts);
}

if (grpcConfig.GetPort()) {
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) });
grpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) });

fillFn(grpcConfig, *GRpcServers.back().second, opts);
fillFn(grpcConfig, *grpcServers.back().second, opts);
}

for (auto &ex : grpcConfig.GetExtEndpoints()) {
Expand All @@ -1069,8 +1168,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
xopts.SetEndpointId(ex.GetEndpointId());
}

GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) });
fillFn(ex, *GRpcServers.back().second, xopts);
grpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) });
fillFn(ex, *grpcServers.back().second, xopts);
}

if (ex.HasSslPort() && ex.GetSslPort()) {
Expand Down Expand Up @@ -1108,11 +1207,13 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
Y_ABORT_UNLESS(xopts.SslData->Cert, "Cert not set");
Y_ABORT_UNLESS(xopts.SslData->Key, "Key not set");

GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) });
fillFn(ex, *GRpcServers.back().second, xopts);
grpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) });
fillFn(ex, *grpcServers.back().second, xopts);
}
}
}

return grpcServers;
}

void TKikimrRunner::InitializeAllocator(const TKikimrRunConfig& runConfig) {
Expand Down Expand Up @@ -1889,30 +1990,7 @@ void TKikimrRunner::KikimrStart() {
Monitoring->Start(ActorSystem.Get());
}

for (auto& server : GRpcServers) {
if (server.second) {
server.second->Start();

TString endpoint;
if (server.second->GetHost() != "[::]") {
endpoint = server.second->GetHost();
}
endpoint += Sprintf(":%d", server.second->GetPort());
ActorSystem->Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ActorSystem->NodeId),
new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateAddEndpoint(server.first, endpoint));
if (ProcessMemoryInfoProvider) {
auto memInfo = ProcessMemoryInfoProvider->Get();
NKikimrWhiteboard::TSystemStateInfo systemStateInfo;
if (memInfo.CGroupLimit) {
systemStateInfo.SetMemoryLimit(*memInfo.CGroupLimit);
} else if (memInfo.MemTotal) {
systemStateInfo.SetMemoryLimit(*memInfo.MemTotal);
}
ActorSystem->Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ActorSystem->NodeId),
new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(systemStateInfo));
}
}
}
GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(std::move(GRpcServersFactory), ProcessMemoryInfoProvider));

if (SqsHttp) {
SqsHttp->Start();
Expand Down Expand Up @@ -2017,20 +2095,14 @@ void TKikimrRunner::KikimrStop(bool graceful) {
}

// stop processing grpc requests/response - we must stop feeding ActorSystem
for (auto& server : GRpcServers) {
if (server.second) {
server.second->Stop();
}
}
TManualEvent event;
ActorSystem->Send(new IEventHandle(GRpcServersManager, {}, new TGRpcServersManager::TEvStop(&event)));
event.WaitI();

if (ActorSystem) {
ActorSystem->Stop();
}

for (auto& server : GRpcServers) {
server.second.Destroy();
}

if (YqSharedResources) {
YqSharedResources->Stop();
}
Expand Down Expand Up @@ -2068,7 +2140,6 @@ void TKikimrRunner::OnTerminate(int) {
KikimrShouldContinue.ShouldStop(0);
}


void TKikimrRunner::SetSignalHandlers() {
#ifdef _unix_
signal(SIGPIPE, SIG_IGN);
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/driver_lib/run/run.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

namespace NKikimr {

// Ordered list of gRPC servers; each entry pairs a name string with a server
// owned through TAutoPtr.
using TGRpcServers = TVector<std::pair<TString, TAutoPtr<NYdbGrpc::TGRpcServer>>>;
// Callable that produces a set of gRPC servers on demand.
using TGRpcServersFactory = std::function<TGRpcServers()>;

class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
protected:
static TProgramShouldContinue KikimrShouldContinue;
Expand Down Expand Up @@ -56,8 +59,6 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
TIntrusivePtr<NInterconnect::TPollerThreads> PollerThreads;
TAutoPtr<TAppData> AppData;

TVector<std::pair<TString, TAutoPtr<NYdbGrpc::TGRpcServer>>> GRpcServers;

TIntrusivePtr<NActors::NLog::TSettings> LogSettings;
std::shared_ptr<TLogBackend> LogBackend;
TAutoPtr<TActorSystem> ActorSystem;
Expand All @@ -68,6 +69,9 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {

TKikimrRunner(std::shared_ptr<TModuleFactories> factories = {});

TGRpcServersFactory GRpcServersFactory;
TActorId GRpcServersManager;

virtual ~TKikimrRunner();

virtual void InitializeRegistries(const TKikimrRunConfig& runConfig);
Expand All @@ -85,6 +89,7 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
void InitializeMonitoringLogin(const TKikimrRunConfig& runConfig);

void InitializeGRpc(const TKikimrRunConfig& runConfig);
TGRpcServers CreateGRpcServers(const TKikimrRunConfig& runConfig);

void InitializeKqpController(const TKikimrRunConfig& runConfig);

Expand Down
Loading