diff --git a/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py b/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py index 460327bcf419..1f193638d709 100644 --- a/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py +++ b/ydb/apps/dstool/lib/dstool_cmd_cluster_workload_run.py @@ -28,20 +28,8 @@ def add_options(p): p.add_argument('--enable-hard-switch-piles', action='store_true', help='Enable hard switch pile with setting PRIMARY') p.add_argument('--enable-disconnect-piles', action='store_true', help='Enable disconnect pile') p.add_argument('--fixed-pile-for-disconnect', type=int, help='Pile to disconnect') - - -def fetch_start_time_map(base_config): - start_time_map = {} - for node_id in {pdisk.NodeId for pdisk in base_config.PDisk}: - r = common.fetch_json_info('sysinfo', [node_id]) - if len(r) != 1: - return None - k, v = r.popitem() - assert k == node_id - if 'StartTime' not in v: - return None - start_time_map[node_id] = int(v['StartTime']) - return start_time_map + p.add_argument('--weight-restarts', type=float, default=1.0, help='weight for restart action') + p.add_argument('--weight-kill-tablets', type=float, default=1.0, help='weight for kill tablets action') def make_pdisk_key_config(pdisk_keys, node_id): @@ -97,8 +85,6 @@ def do(args): base_config = common.fetch_base_config() vslot_map = common.build_vslot_map(base_config) node_fqdn_map = common.build_node_fqdn_map(base_config) - if args.enable_pdisk_encryption_keys_changes or not args.disable_restarts: - start_time_map = fetch_start_time_map(base_config) except Exception: if config_retries is None: config_retries = 3 @@ -108,7 +94,21 @@ def do(args): config_retries -= 1 continue - tablets = common.fetch_json_info('tabletinfo') if args.enable_kill_tablets or args.enable_kill_blob_depot else {} + if args.enable_kill_tablets or args.enable_kill_blob_depot: + tablets = { + int(tablet['TabletId']) : tablet + for tablet in common.fetch('viewer/json/tabletinfo', dict(enums=1)).get('TabletStateInfo', []) + } + else: + tablets = {} + sysinfo = { + int(node['NodeId']): node + for node in common.fetch('viewer/json/sysinfo', dict(fields_required=-1, enums=1), cache=False).get('SystemStateInfo', []) + } + start_time_map = { + int(node['NodeId']): int(node['StartTime']) + for node in sysinfo.values() + } config_retries = None @@ -187,11 +187,10 @@ def match(x): return True def do_restart(node_id): - host = node_fqdn_map[node_id] + node = sysinfo[node_id] if args.enable_pdisk_encryption_keys_changes: update_pdisk_key_config(node_fqdn_map, pdisk_keys, node_id) - subprocess.call(['ssh', host, 'sudo', 'killall', '-%s' % args.kill_signal, 'kikimr']) - subprocess.call(['ssh', host, 'sudo', 'killall', '-%s' % args.kill_signal, 'ydbd']) + subprocess.call(['ssh', node['Host'], 'sudo', 'kill', '-%s' % args.kill_signal, node['PID']]) if args.enable_pdisk_encryption_keys_changes: remove_old_pdisk_keys(pdisk_keys, pdisk_key_versions, node_id) @@ -316,9 +315,9 @@ def do_connect_pile(pile_id, pile_id_to_hosts): possible_actions = [] if args.enable_kill_tablets: - possible_actions.append(('kill tablet', (do_kill_tablet,))) + possible_actions.append((args.weight_kill_tablets, 'kill tablet', (do_kill_tablet,))) if args.enable_kill_blob_depot: - possible_actions.append(('kill blob depot', (do_kill_blob_depot,))) + possible_actions.append((1.0, 'kill blob depot', (do_kill_blob_depot,))) evicts = [] wipes = [] @@ -361,19 +360,19 @@ def pick(v): action[0](*action[1:]) if evicts: - possible_actions.append(('evict', (pick, evicts))) + possible_actions.append((1.0, 'evict', (pick, evicts))) if wipes: - possible_actions.append(('wipe', (pick, wipes))) + possible_actions.append((1.0, 'wipe', (pick, wipes))) if readonlies: - possible_actions.append(('readonly', (pick, readonlies))) + possible_actions.append((1.0, 'readonly', (pick, readonlies))) if unreadonlies: - possible_actions.append(('un-readonly', (pick, unreadonlies))) + possible_actions.append((1.0, 'un-readonly', (pick, unreadonlies))) if pdisk_restarts: - possible_actions.append(('restart-pdisk', (pick, pdisk_restarts))) + possible_actions.append((1.0, 'restart-pdisk', (pick, pdisk_restarts))) if make_pdisks_readonly: - possible_actions.append(('make-pdisks-readonly', (pick, make_pdisks_readonly))) + possible_actions.append((1.0, 'make-pdisks-readonly', (pick, make_pdisks_readonly))) if make_pdisks_not_readonly: - possible_actions.append(('make-pdisks-not-readonly', (pick, make_pdisks_not_readonly))) + possible_actions.append((1.0, 'make-pdisks-not-readonly', (pick, make_pdisks_not_readonly))) restarts = [] @@ -385,12 +384,12 @@ def pick(v): nodes_to_restart = nodes_to_restart[:node_count//2] for node_id in nodes_to_restart: if args.enable_pdisk_encryption_keys_changes: - possible_actions.append(('add new pdisk key to node with id: %d' % node_id, (do_add_pdisk_key, node_id))) + possible_actions.append((1.0, 'add new pdisk key to node with id: %d' % node_id, (do_add_pdisk_key, node_id))) if not args.disable_restarts: restarts.append(('restart node with id: %d' % node_id, (do_restart, node_id))) if restarts: - possible_actions.append(('restart', (pick, restarts))) + possible_actions.append((args.weight_restarts, 'restart', (pick, restarts))) has_pile_operations = args.enable_soft_switch_piles or args.enable_hard_switch_piles or args.enable_disconnect_piles if has_pile_operations: @@ -418,14 +417,14 @@ def pick(v): can_hard_switch = (len(synchronized_piles) + len(promoted_piles) > 0) if args.enable_soft_switch_piles and can_soft_switch: - possible_actions.append(('soft-switch-pile', (do_soft_switch_pile, random.choice(synchronized_piles)))) + possible_actions.append((1.0, 'soft-switch-pile', (do_soft_switch_pile, random.choice(synchronized_piles)))) if args.enable_hard_switch_piles and can_hard_switch: - possible_actions.append(('hard-switch-pile', (do_hard_switch_pile, random.choice(promoted_piles + synchronized_piles), [primary_pile] + promoted_piles + synchronized_piles))) + possible_actions.append((1.0, 'hard-switch-pile', (do_hard_switch_pile, random.choice(promoted_piles + synchronized_piles), [primary_pile] + promoted_piles + synchronized_piles))) if len(disconnected_piles) > 0: - possible_actions.append(('connect-pile', (do_connect_pile, random.choice(disconnected_piles), pile_id_to_endpoints))) + possible_actions.append((1.0, 'connect-pile', (do_connect_pile, random.choice(disconnected_piles), pile_id_to_endpoints))) if args.enable_disconnect_piles and len(synchronized_piles) > 0: pile_to_disconnect = args.fixed_pile_for_disconnect if args.fixed_pile_for_disconnect is not None else random.choice([primary_pile] + synchronized_piles) - possible_actions.append(('disconnect-pile', (do_disconnect_pile, pile_to_disconnect, pile_id_to_endpoints))) + possible_actions.append((1.0, 'disconnect-pile', (do_disconnect_pile, pile_to_disconnect, pile_id_to_endpoints))) if not possible_actions: common.print_if_not_quiet(args, 'Waiting for the next round...', file=sys.stdout) @@ -434,7 +433,7 @@ def pick(v): ################################################################################################################ - action_name, action = random.choice(possible_actions) + (_, action_name, action), = random.choices(possible_actions, weights=[w for w, _, _ in possible_actions]) print('%s %s' % (action_name, datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S'))) try: diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 294decc8a321..25e4426ef734 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -1241,6 +1241,7 @@ struct TEvBlobStorage { bool ReportDetailedPartMap = false; bool PhantomCheck = false; bool Decommission = false; // is it generated by decommission actor and should be handled by the underlying proxy? + bool DoNotReportIndexRestoreGetMissingBlobs = false; struct TTabletData { TTabletData() = default; @@ -2604,6 +2605,7 @@ struct TEvBlobStorage { : SkipBlocksUpTo(origin.SkipBlocksUpTo) , SkipBarriersUpTo(origin.SkipBarriersUpTo) , SkipBlobsUpTo(origin.SkipBlobsUpTo) + , IgnoreDecommitState(origin.IgnoreDecommitState) , Reverse(origin.Reverse) {} diff --git a/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp b/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp index db322cbbad83..502eaa812379 100644 --- a/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp +++ b/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp @@ -199,40 +199,27 @@ namespace NKikimr { return CreateWithErrorReason(ev, ev->Status, ev->Id, StatusFlags, self.GroupId, ApproximateFreeSpaceShare); } + std::unique_ptr Combine(TThis& /*self*/, TEvBlobStorage::TEvGetBlockResult *ev, auto *current) { + Y_ABORT_UNLESS(!current || current->TabletId == ev->TabletId); + return CreateWithErrorReason(ev, ev->Status, ev->TabletId, + Max(current ? current->BlockedGeneration : 0, ev->BlockedGeneration)); + } + template std::unique_ptr ProcessFullQuorumResponse(TThis& self, std::unique_ptr ev) { // combine responses - if (CombinedResponse) { - Y_DEBUG_ABORT_UNLESS(dynamic_cast(CombinedResponse.get())); - } + Y_DEBUG_ABORT_UNLESS(!CombinedResponse || dynamic_cast(CombinedResponse.get())); + Y_DEBUG_ABORT_UNLESS(ev->Status != NKikimrProto::NODATA); + const bool readyToReply = !ResponsesPending || ev->Status != NKikimrProto::OK; CombinedResponse = Combine(self, ev.get(), static_cast(CombinedResponse.get())); - const bool readyToReply = !ResponsesPending || - (ev->Status != NKikimrProto::OK && ev->Status != NKikimrProto::NODATA); - return readyToReply - ? std::exchange(CombinedResponse, nullptr) - : nullptr; - } - - template - std::unique_ptr ProcessPrimaryPileResponse(TThis& self, std::unique_ptr ev, - const TBridgeInfo::TPile& pile) { - if (CombinedResponse) { - Y_DEBUG_ABORT_UNLESS(dynamic_cast(CombinedResponse.get())); - } - if (ev->Status != NKikimrProto::OK && ev->Status != NKikimrProto::NODATA) { - // if any pile reports error, we finish with error - return MakeErrorFrom(self, ev.get()); - } - if (pile.IsPrimary) { - return ev; - } - Y_ABORT_UNLESS(ResponsesPending); - return nullptr; + return readyToReply ? std::move(CombinedResponse) : nullptr; } std::unique_ptr ProcessResponse(TThis& self, std::unique_ptr ev, const TBridgeInfo::TPile& /*pile*/, TRequestPayload& payload) { if (IsRestoring) { + Y_ABORT_UNLESS(!std::holds_alternative(State)); + if (ev->Status != NKikimrProto::OK) { // can't restore this blob return std::visit(TOverloaded{ [&](TGetState& state) -> std::unique_ptr { @@ -463,8 +450,8 @@ namespace NKikimr { } std::unique_ptr ProcessResponse(TThis& self, std::unique_ptr ev, - const TBridgeInfo::TPile& pile, TRequestPayload& /*payload*/) { - return ProcessPrimaryPileResponse(self, std::move(ev), pile); + const TBridgeInfo::TPile& /*pile*/, TRequestPayload& /*payload*/) { + return ProcessFullQuorumResponse(self, std::move(ev)); } std::unique_ptr ProcessResponse(TThis& self, std::unique_ptr ev, @@ -876,6 +863,7 @@ namespace NKikimr { STLOG(success ? PRI_INFO : PRI_NOTICE, BS_PROXY_BRIDGE, BPB01, "request finished", (RequestId, request->RequestId), + (Status, common->Status), (Response, response->ToString()), (Passed, TDuration::Seconds(request->Timer.Passed())), (SubrequestTimings, makeSubrequestTimings())); diff --git a/ydb/core/blobstorage/bridge/syncer/syncer.cpp b/ydb/core/blobstorage/bridge/syncer/syncer.cpp index abcf1f11ae77..2bc1f32910c8 100644 --- a/ydb/core/blobstorage/bridge/syncer/syncer.cpp +++ b/ydb/core/blobstorage/bridge/syncer/syncer.cpp @@ -298,8 +298,10 @@ namespace NKikimr::NBridge { for (size_t i = 0; i < numBlobs; ++i) { q[i].Set(*jt++); } - IssueQuery(true, std::make_unique(q, numBlobs, TInstant::Max(), - NKikimrBlobStorage::FastRead, true, true)); + auto ev = std::make_unique(q, numBlobs, TInstant::Max(), + NKikimrBlobStorage::FastRead, true, true); + ev->DoNotReportIndexRestoreGetMissingBlobs = true; // they may be missing, do not report errors in this case + IssueQuery(true, std::move(ev)); RestoreQueue.erase(RestoreQueue.begin(), it); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp index f218de928f26..aa2a48e12852 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp @@ -17,6 +17,7 @@ class TBlobStorageGroupIndexRestoreGetRequest : public TBlobStorageGroupRequestA const TInstant Deadline; const bool IsInternal; const bool Decommission; + const bool DoNotReportIndexRestoreGetMissingBlobs; const std::optional ForceBlockTabletData; THashMap QuorumTracker; @@ -210,10 +211,12 @@ class TBlobStorageGroupIndexRestoreGetRequest : public TBlobStorageGroupRequestA for (ui32 i = 0; i < getResult.ResponseSz; ++i) { TEvBlobStorage::TEvGetResult::TResponse &response = getResult.Responses[i]; if (response.Status != NKikimrProto::OK) { - DSP_LOG_ERROR_S("DSPI07", "Handle TEvGetResult status# " << NKikimrProto::EReplyStatus_Name(status) - << " Response[" << i << "]# " << NKikimrProto::EReplyStatus_Name(response.Status) - << " for tablet# " << TabletId - << " BlobStatus# " << DumpBlobStatus()); + if (!DoNotReportIndexRestoreGetMissingBlobs) { + DSP_LOG_ERROR_S("DSPI07", "Handle TEvGetResult status# " << NKikimrProto::EReplyStatus_Name(status) + << " Response[" << i << "]# " << NKikimrProto::EReplyStatus_Name(response.Status) + << " for tablet# " << TabletId + << " BlobStatus# " << DumpBlobStatus()); + } SetPendingResultResponseStatus(response.Id, response.Status); } } @@ -245,6 +248,7 @@ class TBlobStorageGroupIndexRestoreGetRequest : public TBlobStorageGroupRequestA true /*mustRestoreFirst*/, true /*isIndexOnly*/, std::nullopt /*forceBlockTabletData*/, IsInternal); ev->RestartCounter = counter; ev->Decommission = Decommission; + ev->DoNotReportIndexRestoreGetMissingBlobs = DoNotReportIndexRestoreGetMissingBlobs; return ev; } @@ -264,6 +268,7 @@ class TBlobStorageGroupIndexRestoreGetRequest : public TBlobStorageGroupRequestA , Deadline(params.Common.Event->Deadline) , IsInternal(params.Common.Event->IsInternal) , Decommission(params.Common.Event->Decommission) + , DoNotReportIndexRestoreGetMissingBlobs(params.Common.Event->DoNotReportIndexRestoreGetMissingBlobs) , ForceBlockTabletData(params.Common.Event->ForceBlockTabletData) , VGetsInFlight(0) , GetHandleClass(params.Common.Event->GetHandleClass) diff --git a/ydb/core/blobstorage/vdisk/common/blobstorage_dblogcutter.cpp b/ydb/core/blobstorage/vdisk/common/blobstorage_dblogcutter.cpp index 4e846396ef6f..4d9468ced0c5 100644 --- a/ydb/core/blobstorage/vdisk/common/blobstorage_dblogcutter.cpp +++ b/ydb/core/blobstorage/vdisk/common/blobstorage_dblogcutter.cpp @@ -119,7 +119,7 @@ namespace NKikimr { // only issue command if there is a progress in FreeUpToLsn queue bool progress = false; - for (; FreeUpToLsn && FreeUpToLsn.front() < curLsn; FreeUpToLsn.pop_front()) { + for (; FreeUpToLsn && FreeUpToLsn.front() <= curLsn; FreeUpToLsn.pop_front()) { progress = true; } diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp index ea44d8eddfee..0946a3c7b4f6 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp @@ -349,7 +349,7 @@ namespace NKikimr { bool THullHugeKeeperPersState::WouldNewEntryPointAdvanceLog(ui64 freeUpToLsn, ui64 minInFlightLsn, ui32 itemsAfterCommit) const { - return freeUpToLsn < minInFlightLsn && (PersistentLsn <= freeUpToLsn || itemsAfterCommit > 10000); + return freeUpToLsn <= minInFlightLsn && (!PersistentLsn || PersistentLsn < freeUpToLsn || itemsAfterCommit > 10000); } // initiate commit diff --git a/ydb/core/blobstorage/vdisk/query/assimilation.cpp b/ydb/core/blobstorage/vdisk/query/assimilation.cpp index 1c6e8251be88..57e4d0ac0f8d 100644 --- a/ydb/core/blobstorage/vdisk/query/assimilation.cpp +++ b/ydb/core/blobstorage/vdisk/query/assimilation.cpp @@ -18,6 +18,8 @@ namespace NKikimr { size_t RecordSize; size_t RecordItems = 0; + TIntrusivePtr BarriersEssence; + static constexpr TDuration MaxQuantumTime = TDuration::MilliSeconds(10); static constexpr size_t MaxResultSize = 5'000'000; // may overshoot a little static constexpr size_t MaxResultItems = 10'000; @@ -139,7 +141,7 @@ namespace NKikimr { RecordSize += len; ++RecordItems; - return iter.Next(); + iter.Next(); } template @@ -192,6 +194,14 @@ namespace NKikimr { void Process(TBlobIter& iter) { const TKeyLogoBlob& key = iter; const TMemRecLogoBlob& memRec = iter; + + if (!BarriersEssence) { + BarriersEssence = Snap.BarriersSnap.CreateEssence(Snap.HullCtx); + } + if (auto keep = BarriersEssence->Keep(key, memRec, {}, Snap.HullCtx->AllowKeepFlags, true); !keep.KeepData) { + return iter.Next(); // we are not interested in this blob -- it is going to be collected + } + auto *pb = Result->Record.AddBlobs(); const TLogoBlobID id(key.LogoBlobID()); @@ -227,7 +237,7 @@ namespace NKikimr { RecordSize += len; ++RecordItems; - return iter.Next(); + iter.Next(); } template diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 034d9093985e..4e3962f31438 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -165,8 +165,108 @@ #include +#include + namespace NKikimr { +class TGRpcServersManager : public TActorBootstrapped { + TGRpcServersFactory GRpcServersFactory; + TGRpcServers GRpcServers; + TIntrusivePtr ProcessMemoryInfoProvider; + +public: + enum { + EvStop = EventSpaceBegin(TEvents::ES_PRIVATE), + }; + + struct TEvStop : TEventLocal { + TManualEvent *Event; + + TEvStop(TManualEvent *event) + : Event(event) + {} + }; + +public: + TGRpcServersManager(TGRpcServersFactory grpcServersFactory, + TIntrusivePtr processMemoryInfoProvider) + : GRpcServersFactory(std::move(grpcServersFactory)) + , ProcessMemoryInfoProvider(std::move(processMemoryInfoProvider)) + {} + + void Bootstrap() { + Become(&TThis::StateFunc); + Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryStorageConfig(true)); + Start(); + } + + void Handle(TEvNodeWardenStorageConfig::TPtr ev) { + if (const auto& bridgeInfo = ev->Get()->BridgeInfo) { + if (NBridge::PileStateTraits(bridgeInfo->SelfNodePile->State).RequiresConfigQuorum) { + Start(); + } else { + Stop(); + } + } + } + + void Start() { + if (GRpcServers) { + return; + } + GRpcServers = GRpcServersFactory(); + for (auto& [name, server] : GRpcServers) { + if (!server) { + continue; + } + + server->Start(); + + TString endpoint; + if (server->GetHost() != "[::]") { + endpoint = server->GetHost(); + } + + endpoint += Sprintf(":%d", server->GetPort()); + + Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), + new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateAddEndpoint(name, endpoint)); + + if (ProcessMemoryInfoProvider) { + auto memInfo = ProcessMemoryInfoProvider->Get(); + NKikimrWhiteboard::TSystemStateInfo systemStateInfo; + if (memInfo.CGroupLimit) { + systemStateInfo.SetMemoryLimit(*memInfo.CGroupLimit); + } else if (memInfo.MemTotal) { + systemStateInfo.SetMemoryLimit(*memInfo.MemTotal); + } + Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), + new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(systemStateInfo)); + } + } + } + + void Stop() { + for (auto& [name, server] : GRpcServers) { + if (server) { + server->Stop(); + } + } + GRpcServers.clear(); + } + + void HandleStop(TEvStop::TPtr ev) { + Stop(); + ev->Get()->Event->Signal(); + PassAway(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvNodeWardenStorageConfig, Handle) + hFunc(TEvStop, HandleStop) + ) +}; + class TDomainsInitializer : public IAppDataInitializer { const NKikimrConfig::TAppConfig& Config; @@ -428,15 +528,8 @@ TKikimrRunner::TKikimrRunner(std::shared_ptr factories) } TKikimrRunner::~TKikimrRunner() { - if (!!ActorSystem) { - // Stop ActorSystem first, so no one actor can call any grpc stuff. + if (ActorSystem) { ActorSystem->Stop(); - // After that stop sending any requests to actors - // by destroing grpc subsystem. - for (auto& serv : GRpcServers) { - serv.second.Destroy(); - } - ActorSystem.Destroy(); } } @@ -548,8 +641,14 @@ void TKikimrRunner::InitializeKqpController(const TKikimrRunConfig& runConfig) { } void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { + GRpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); }; +} + +TGRpcServers TKikimrRunner::CreateGRpcServers(const TKikimrRunConfig& runConfig) { const auto& appConfig = runConfig.AppConfig; + TGRpcServers grpcServers; + auto fillFn = [&](const NKikimrConfig::TGRpcConfig& grpcConfig, NYdbGrpc::TGRpcServer& server, NYdbGrpc::TServerOptions& opts) { const auto& services = grpcConfig.GetServices(); const auto& rlServicesEnabled = grpcConfig.GetRatelimiterServicesEnabled(); @@ -1042,15 +1141,15 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { sslData.DoRequestClientCertificate = appConfig.GetClientCertificateAuthorization().GetRequestClientCertificate(); sslOpts.SetSslData(sslData); - GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) }); + grpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) }); - fillFn(grpcConfig, *GRpcServers.back().second, sslOpts); + fillFn(grpcConfig, *grpcServers.back().second, sslOpts); } if (grpcConfig.GetPort()) { - GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) }); + grpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) }); - fillFn(grpcConfig, *GRpcServers.back().second, opts); + fillFn(grpcConfig, *grpcServers.back().second, opts); } for (auto &ex : grpcConfig.GetExtEndpoints()) { @@ -1065,8 +1164,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { xopts.SetEndpointId(ex.GetEndpointId()); } - GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) }); - fillFn(ex, *GRpcServers.back().second, xopts); + grpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) }); + fillFn(ex, *grpcServers.back().second, xopts); } if (ex.HasSslPort() && ex.GetSslPort()) { @@ -1104,11 +1203,13 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { Y_ABORT_UNLESS(xopts.SslData->Cert, "Cert not set"); Y_ABORT_UNLESS(xopts.SslData->Key, "Key not set"); - GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) }); - fillFn(ex, *GRpcServers.back().second, xopts); + grpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) }); + fillFn(ex, *grpcServers.back().second, xopts); } } } + + return grpcServers; } void TKikimrRunner::InitializeAllocator(const TKikimrRunConfig& runConfig) { @@ -1874,30 +1975,7 @@ void TKikimrRunner::KikimrStart() { Monitoring->Start(ActorSystem.Get()); } - for (auto& server : GRpcServers) { - if (server.second) { - server.second->Start(); - - TString endpoint; - if (server.second->GetHost() != "[::]") { - endpoint = server.second->GetHost(); - } - endpoint += Sprintf(":%d", server.second->GetPort()); - ActorSystem->Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ActorSystem->NodeId), - new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateAddEndpoint(server.first, endpoint)); - if (ProcessMemoryInfoProvider) { - auto memInfo = ProcessMemoryInfoProvider->Get(); - NKikimrWhiteboard::TSystemStateInfo systemStateInfo; - if (memInfo.CGroupLimit) { - systemStateInfo.SetMemoryLimit(*memInfo.CGroupLimit); - } else if (memInfo.MemTotal) { - systemStateInfo.SetMemoryLimit(*memInfo.MemTotal); - } - ActorSystem->Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ActorSystem->NodeId), - new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(systemStateInfo)); - } - } - } + GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(std::move(GRpcServersFactory), ProcessMemoryInfoProvider)); if (SqsHttp) { SqsHttp->Start(); @@ -2002,20 +2080,14 @@ void TKikimrRunner::KikimrStop(bool graceful) { } // stop processing grpc requests/response - we must stop feeding ActorSystem - for (auto& server : GRpcServers) { - if (server.second) { - server.second->Stop(); - } - } + TManualEvent event; + ActorSystem->Send(new IEventHandle(GRpcServersManager, {}, new TGRpcServersManager::TEvStop(&event))); + event.WaitI(); if (ActorSystem) { ActorSystem->Stop(); } - for (auto& server : GRpcServers) { - server.second.Destroy(); - } - if (YqSharedResources) { YqSharedResources->Stop(); } @@ -2053,7 +2125,6 @@ void TKikimrRunner::OnTerminate(int) { KikimrShouldContinue.ShouldStop(0); } - void TKikimrRunner::SetSignalHandlers() { #ifdef _unix_ signal(SIGPIPE, SIG_IGN); diff --git a/ydb/core/driver_lib/run/run.h b/ydb/core/driver_lib/run/run.h index 6ffa49daf4b6..8368cfd3eab9 100644 --- a/ydb/core/driver_lib/run/run.h +++ b/ydb/core/driver_lib/run/run.h @@ -26,6 +26,9 @@ namespace NKikimr { +using TGRpcServers = TVector>>; +using TGRpcServersFactory = std::function; + class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage { protected: static TProgramShouldContinue KikimrShouldContinue; @@ -56,8 +59,6 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage { TIntrusivePtr PollerThreads; TAutoPtr AppData; - TVector>> GRpcServers; - TIntrusivePtr LogSettings; std::shared_ptr LogBackend; TAutoPtr ActorSystem; @@ -68,6 +69,9 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage { TKikimrRunner(std::shared_ptr factories = {}); + TGRpcServersFactory GRpcServersFactory; + TActorId GRpcServersManager; + virtual ~TKikimrRunner(); virtual void InitializeRegistries(const TKikimrRunConfig& runConfig); @@ -85,6 +89,7 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage { void InitializeMonitoringLogin(const TKikimrRunConfig& runConfig); void InitializeGRpc(const TKikimrRunConfig& runConfig); + TGRpcServers CreateGRpcServers(const TKikimrRunConfig& runConfig); void InitializeKqpController(const TKikimrRunConfig& runConfig); diff --git a/ydb/core/protos/msgbus.proto b/ydb/core/protos/msgbus.proto index 5c47b1117f63..a8e14518cf6e 100644 --- a/ydb/core/protos/msgbus.proto +++ b/ydb/core/protos/msgbus.proto @@ -687,6 +687,7 @@ message TTestShardControlRequest { optional uint32 PutTraceFractionPPM = 13; optional uint32 PutTraceVerbosity = 14 [default = 15]; optional uint32 SecondsBeforeLoadStart = 15; // number of seconds to wait before starting load + optional uint32 StallCounter = 16; // number of requests before issuing barrier and waiting for all of them to complete } optional uint64 TabletId = 1; diff --git a/ydb/core/protos/node_whiteboard.proto b/ydb/core/protos/node_whiteboard.proto index bf1686978236..ca8c9dadb6bc 100644 --- a/ydb/core/protos/node_whiteboard.proto +++ b/ydb/core/protos/node_whiteboard.proto @@ -367,6 +367,7 @@ message TSystemStateInfo { optional uint64 NetworkWriteThroughput = 42; optional uint32 RealNumberOfCpus = 43; // number of cpus without cgroups limitations repeated TSystemThreadInfo Threads = 44; + optional uint64 PID = 45; } message TEvSystemStateRequest { diff --git a/ydb/core/tablet/node_whiteboard.cpp b/ydb/core/tablet/node_whiteboard.cpp index 16328eaa0581..65e88302d519 100644 --- a/ydb/core/tablet/node_whiteboard.cpp +++ b/ydb/core/tablet/node_whiteboard.cpp @@ -16,6 +16,7 @@ #include #include +#include #include using namespace NActors; @@ -65,6 +66,7 @@ class TNodeWhiteboardService : public TActorBootstrapped } SystemStateInfo.SetStartTime(ctx.Now().MilliSeconds()); + SystemStateInfo.SetPID(GetPID()); ctx.Send(ctx.SelfID, new TEvPrivate::TEvUpdateRuntimeStats()); auto utils = NKikimr::GetServiceCounters(NKikimr::AppData()->Counters, "utils"); diff --git a/ydb/core/test_tablet/load_actor_impl.cpp b/ydb/core/test_tablet/load_actor_impl.cpp index 43ec5379570b..df9c84b84979 100644 --- a/ydb/core/test_tablet/load_actor_impl.cpp +++ b/ydb/core/test_tablet/load_actor_impl.cpp @@ -31,9 +31,18 @@ namespace NKikimr::NTestShard { void TLoadActor::ClearKeys() { for (auto& [key, info] : Keys) { - Y_ABORT_UNLESS(info.ConfirmedState == ::NTestShard::TStateServer::CONFIRMED - ? info.ConfirmedKeyIndex < ConfirmedKeys.size() && ConfirmedKeys[info.ConfirmedKeyIndex] == key - : info.ConfirmedKeyIndex == Max()); + auto makeError = [&] { + TStringBuilder sb; + sb << "Key# " << key << " ConfirmedKeyIndex# " << info.ConfirmedKeyIndex + << " ConfirmedState# " << info.ConfirmedState << " PendingState# " << info.PendingState; + if (info.ConfirmedKeyIndex != Max()) { + sb << " KeyByIndex# " << ConfirmedKeys[info.ConfirmedKeyIndex]; + } + return TString(sb); + }; + Y_VERIFY_S(info.ConfirmedState == ::NTestShard::TStateServer::CONFIRMED + ? info.ConfirmedKeyIndex < ConfirmedKeys.size() && ConfirmedKeys[info.ConfirmedKeyIndex] == key + : info.ConfirmedKeyIndex == Max(), makeError()); info.ConfirmedKeyIndex = Max(); } Keys.clear(); @@ -46,6 +55,7 @@ namespace NKikimr::NTestShard { Send(MakeStateServerInterfaceActorId(), new TEvStateServerConnect(Settings.GetStorageServerHost(), Settings.GetStorageServerPort())); Send(TabletActorId, new TTestShard::TEvSwitchMode(TTestShard::EMode::STATE_SERVER_CONNECT)); + IssuedConnect = true; } else { RunValidation(true); } @@ -53,7 +63,7 @@ namespace NKikimr::NTestShard { } void TLoadActor::PassAway() { - if (Settings.HasStorageServerHost()) { + if (IssuedConnect) { Send(MakeStateServerInterfaceActorId(), new TEvStateServerDisconnect); } if (ValidationActorId) { @@ -63,7 +73,7 @@ namespace NKikimr::NTestShard { } void TLoadActor::HandleWakeup() { - STLOG(PRI_NOTICE, TEST_SHARD, TS00, "voluntary restart", (TabletId, TabletId)); + STLOG(PRI_ERROR, TEST_SHARD, TS00, "voluntary restart", (TabletId, TabletId)); TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, Tablet, TabletActorId, nullptr, 0)); } @@ -71,7 +81,7 @@ namespace NKikimr::NTestShard { if (ValidationActorId) { // do nothing while validation is in progress return; } - if (StallCounter > 500) { + if (Settings.HasStallCounter() && StallCounter > Settings.GetStallCounter()) { if (WritesInFlight.empty() && PatchesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) { StallCounter = 0; @@ -92,7 +102,7 @@ namespace NKikimr::NTestShard { const TMonotonic now = TActivationContext::Monotonic(); bool canWriteMore = false; - if (WritesInFlight.size() + PatchesInFlight.size() < Settings.GetMaxInFlight() && !DisableWrites) { + if (WritesInFlight.size() + PatchesInFlight.size() < Settings.GetMaxInFlight() && !DisableWrites && BytesOfData <= Settings.GetMaxDataBytes()) { if (NextWriteTimestamp <= now) { if (Settings.HasPatchRequestsFractionPPM() && !ConfirmedKeys.empty() && RandomNumber(1'000'000u) < Settings.GetPatchRequestsFractionPPM()) { @@ -102,7 +112,7 @@ namespace NKikimr::NTestShard { } if (WritesInFlight.size() + PatchesInFlight.size() < Settings.GetMaxInFlight() || !Settings.GetResetWritePeriodOnFull()) { NextWriteTimestamp += GenerateRandomInterval(Settings.GetWritePeriods()); - canWriteMore = NextWriteTimestamp <= now; + canWriteMore = NextWriteTimestamp <= now && BytesOfData <= Settings.GetMaxDataBytes(); } else { NextWriteTimestamp = TMonotonic::Max(); } @@ -151,6 +161,7 @@ namespace NKikimr::NTestShard { if (ev->Get()->Connected) { RunValidation(true); } else { + STLOG(PRI_ERROR, TEST_SHARD, TS33, "state server not connected", (TabletId, TabletId)); TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, TabletActorId, SelfId(), nullptr, 0)); PassAway(); } @@ -207,7 +218,7 @@ namespace NKikimr::NTestShard { const TString& key = nh.mapped(); const auto it = Keys.find(key); Y_VERIFY_S(it != Keys.end(), "Key# " << key << " not found in Keys dict"); - STLOG(PRI_WARN, TEST_SHARD, TS27, "patch failed", (TabletId, TabletId), (Key, key)); + STLOG(PRI_WARN, TEST_SHARD, TS34, "patch failed", (TabletId, TabletId), (Key, key)); RegisterTransition(*it, ::NTestShard::TStateServer::WRITE_PENDING, ::NTestShard::TStateServer::DELETED); } if (const auto it = DeletesInFlight.find(record.GetCookie()); it != DeletesInFlight.end()) { diff --git a/ydb/core/test_tablet/load_actor_impl.h b/ydb/core/test_tablet/load_actor_impl.h index 6fafed1b717c..6480cdd15bc9 100644 --- a/ydb/core/test_tablet/load_actor_impl.h +++ b/ydb/core/test_tablet/load_actor_impl.h @@ -14,6 +14,7 @@ namespace NKikimr::NTestShard { const TActorId Tablet; TActorId TabletActorId; const NKikimrClient::TTestShardControlRequest::TCmdInitialize Settings; + bool IssuedConnect = false; ui64 ValidationRunningCount = 0; diff --git a/ydb/core/test_tablet/load_actor_read_validate.cpp b/ydb/core/test_tablet/load_actor_read_validate.cpp index e2674f4198c8..5fdb24858800 100644 --- a/ydb/core/test_tablet/load_actor_read_validate.cpp +++ b/ydb/core/test_tablet/load_actor_read_validate.cpp @@ -326,6 +326,7 @@ namespace NKikimr::NTestShard { Y_FAIL_S("ERROR from StateServer TabletId# " << TabletId); case ::NTestShard::TStateServer::RACE: + STLOG(PRI_ERROR, TEST_SHARD, TS22, "received RACE in TEvStateServerReadResult", (TabletId, TabletId)); TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, TabletActorId, SelfId(), nullptr, 0)); PassAway(); return; @@ -483,6 +484,7 @@ namespace NKikimr::NTestShard { Y_FAIL_S("ERROR from StateServer TabletId# " << TabletId); case ::NTestShard::TStateServer::RACE: + STLOG(PRI_ERROR, TEST_SHARD, TS32, "received RACE in TEvStateServerWriteResult", (TabletId, TabletId)); TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, TabletActorId, SelfId(), nullptr, 0)); PassAway(); return; diff --git a/ydb/core/test_tablet/load_actor_state.cpp b/ydb/core/test_tablet/load_actor_state.cpp index f6e34965d1ab..f5a02a00d336 100644 --- a/ydb/core/test_tablet/load_actor_state.cpp +++ b/ydb/core/test_tablet/load_actor_state.cpp @@ -21,13 +21,15 @@ namespace NKikimr::NTestShard { Y_ABORT_UNLESS(from != ::NTestShard::TStateServer::DELETED); Y_ABORT_UNLESS(to != ::NTestShard::TStateServer::ABSENT); + if (from == ::NTestShard::TStateServer::CONFIRMED) { // key was confirmed, unconfirm it + MakeUnconfirmed(key); + } + if (from == ::NTestShard::TStateServer::WRITE_PENDING && to == ::NTestShard::TStateServer::CONFIRMED) { + BytesOfData += key.second.Len; + } + if (!Settings.HasStorageServerHost()) { - if (from == ::NTestShard::TStateServer::WRITE_PENDING && to == ::NTestShard::TStateServer::CONFIRMED) { - BytesOfData += key.second.Len; - } - if (from == ::NTestShard::TStateServer::CONFIRMED) { - MakeUnconfirmed(key); - } else if (to == ::NTestShard::TStateServer::CONFIRMED) { + if (to == ::NTestShard::TStateServer::CONFIRMED) { MakeConfirmed(key); } if (to == ::NTestShard::TStateServer::DELETED) { @@ -79,6 +81,7 @@ namespace NKikimr::NTestShard { Y_FAIL_S("ERROR from StateServer TabletId# " << TabletId); case ::NTestShard::TStateServer::RACE: + STLOG(PRI_ERROR, TEST_SHARD, TS35, "received RACE in TEvStateServerWriteResult", (TabletId, TabletId)); TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, TabletActorId, SelfId(), nullptr, 0)); PassAway(); return; @@ -94,15 +97,9 @@ namespace NKikimr::NTestShard { // account data bytes if confirming written key Y_ABORT_UNLESS(key.second.ConfirmedState != key.second.PendingState); - if (key.second.ConfirmedState == ::NTestShard::TStateServer::WRITE_PENDING && - key.second.PendingState == ::NTestShard::TStateServer::CONFIRMED) { - BytesOfData += key.second.Len; - } // switch to correct state - if (key.second.ConfirmedState == ::NTestShard::TStateServer::CONFIRMED) { - MakeUnconfirmed(key); - } else if (key.second.PendingState == ::NTestShard::TStateServer::CONFIRMED) { + if (key.second.PendingState == ::NTestShard::TStateServer::CONFIRMED) { MakeConfirmed(key); } key.second.ConfirmedState = key.second.PendingState;