From 5a4e0866cedb879a8d6189185da726fb34ca7cba Mon Sep 17 00:00:00 2001 From: Alexander Rutkovsky Date: Mon, 10 Nov 2025 08:34:32 +0000 Subject: [PATCH] Correct distconf quorum for scenario when user adds new nodes to cluster --- .../blobstorage/base/blobstorage_events.cpp | 4 +- .../blobstorage/base/blobstorage_events.h | 4 +- ydb/core/blobstorage/nodewarden/distconf.cpp | 53 ++++--- ydb/core/blobstorage/nodewarden/distconf.h | 39 +++-- .../nodewarden/distconf_binding.cpp | 59 ++++++-- .../blobstorage/nodewarden/distconf_fsm.cpp | 63 +++++--- .../nodewarden/distconf_quorum.cpp | 13 +- .../nodewarden/distconf_scatter_gather.cpp | 143 +++++++++++++----- .../blobstorage/nodewarden/node_warden_impl.h | 5 +- .../nodewarden/node_warden_resource.cpp | 28 +--- .../lib/node_warden_mock_bsc.cpp | 2 +- ydb/core/health_check/health_check_ut.cpp | 6 +- .../bscontroller/ut_bscontroller/main.cpp | 2 +- .../ut_selfheal/node_warden_mock.h | 2 +- ydb/core/mind/hive/hive_ut.cpp | 2 +- .../blobstorage_distributed_config.proto | 1 + ydb/core/util/stlog.h | 32 ++++ 17 files changed, 305 insertions(+), 153 deletions(-) diff --git a/ydb/core/blobstorage/base/blobstorage_events.cpp b/ydb/core/blobstorage/base/blobstorage_events.cpp index cf9a446d3d71..d0240ae5a0f1 100644 --- a/ydb/core/blobstorage/base/blobstorage_events.cpp +++ b/ydb/core/blobstorage/base/blobstorage_events.cpp @@ -4,10 +4,8 @@ namespace NKikimr { TEvNodeWardenStorageConfig::TEvNodeWardenStorageConfig(std::shared_ptr config, - std::shared_ptr proposedConfig, bool selfManagementEnabled, - TBridgeInfo::TPtr bridgeInfo) + bool selfManagementEnabled, TBridgeInfo::TPtr bridgeInfo) : Config(std::move(config)) - , ProposedConfig(std::move(proposedConfig)) , SelfManagementEnabled(selfManagementEnabled) , BridgeInfo(std::move(bridgeInfo)) {} diff --git a/ydb/core/blobstorage/base/blobstorage_events.h b/ydb/core/blobstorage/base/blobstorage_events.h index 2182907c4430..61c6a35c25e3 100644 --- a/ydb/core/blobstorage/base/blobstorage_events.h +++ b/ydb/core/blobstorage/base/blobstorage_events.h @@ -592,13 +592,11 @@ namespace NKikimr { : TEventLocal { std::shared_ptr Config; - std::shared_ptr ProposedConfig; bool SelfManagementEnabled; TBridgeInfo::TPtr BridgeInfo; TEvNodeWardenStorageConfig(std::shared_ptr config, - std::shared_ptr proposedConfig, bool selfManagementEnabled, - TBridgeInfo::TPtr bridgeInfo); + bool selfManagementEnabled, TBridgeInfo::TPtr bridgeInfo); ~TEvNodeWardenStorageConfig(); }; diff --git a/ydb/core/blobstorage/nodewarden/distconf.cpp b/ydb/core/blobstorage/nodewarden/distconf.cpp index 1ae6e2fa3ca5..50d85a96a5ae 100644 --- a/ydb/core/blobstorage/nodewarden/distconf.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf.cpp @@ -130,7 +130,7 @@ namespace NKikimr::NStorage { ProposedStorageConfig.reset(); } - ReportStorageConfigToNodeWarden(0); + ReportStorageConfigToNodeWarden(); if (IsSelfStatic) { PersistConfig({}); @@ -177,15 +177,7 @@ namespace NKikimr::NStorage { } void TDistributedConfigKeeper::HandleConfigConfirm(STATEFN_SIG) { - if (ev->Cookie) { - STLOG(PRI_DEBUG, BS_NODE, NWDC46, "HandleConfigConfirm", (Cookie, ev->Cookie), - (ProposedStorageConfigCookie, ProposedStorageConfigCookie), - (ProposedStorageConfigCookieUsage, ProposedStorageConfigCookieUsage)); - if (ev->Cookie == ProposedStorageConfigCookie && ProposedStorageConfigCookieUsage) { - --ProposedStorageConfigCookieUsage; - } - FinishAsyncOperation(ev->Cookie); - } + Y_UNUSED(ev); } void TDistributedConfigKeeper::SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr ev) { @@ -233,13 +225,22 @@ namespace NKikimr::NStorage { for (const auto& [cookie, task] : ScatterTasks) { for (const ui32 nodeId : task.PendingNodes) { - const auto it = DirectBoundNodes.find(nodeId); - Y_ABORT_UNLESS(it != DirectBoundNodes.end()); - TBoundNode& info = it->second; - Y_ABORT_UNLESS(info.ScatterTasks.contains(cookie)); + if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) { + TBoundNode& info = it->second; + Y_ABORT_UNLESS(info.ScatterTasks.contains(cookie)); + } else { + Y_ABORT_UNLESS(AddedNodesScatterTasks.contains({nodeId, cookie})); + } } } + for (const auto& [nodeId, cookie] : AddedNodesScatterTasks) { + const auto it = ScatterTasks.find(cookie); + Y_ABORT_UNLESS(it != ScatterTasks.end()); + TScatterTask& task = it->second; + Y_ABORT_UNLESS(task.PendingNodes.contains(nodeId)); + } + for (const auto& [nodeId, info] : DirectBoundNodes) { for (const ui64 cookie : info.ScatterTasks) { const auto it = ScatterTasks.find(cookie); @@ -250,10 +251,12 @@ namespace NKikimr::NStorage { } for (const auto& [cookie, task] : ScatterTasks) { - if (task.Origin) { - Y_ABORT_UNLESS(Binding); - Y_ABORT_UNLESS(task.Origin == Binding); - } + std::visit(TOverloaded{ + [&](const TBinding& origin) { Y_ABORT_UNLESS(origin == Binding); }, + [&](const TActorId& /*actorId*/) { Y_ABORT_UNLESS(!Binding); }, + [&](const TScatterTaskOriginFsm&) {}, + [&](const TScatterTaskOriginTargeted&) {} + }, task.Origin); } for (const auto& [nodeId, subs] : SubscribedSessions) { @@ -277,6 +280,10 @@ namespace NKikimr::NStorage { if (UnsubscribeQueue.contains(nodeId)) { okay = true; } + if (!okay) { + const auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0}); + okay = it != AddedNodesScatterTasks.end() && std::get<0>(*it) == nodeId; + } Y_ABORT_UNLESS(okay); if (subs.SubscriptionCookie) { const auto it = SubscriptionCookieMap.find(subs.SubscriptionCookie); @@ -377,16 +384,12 @@ namespace NKikimr::NStorage { } } - void TDistributedConfigKeeper::ReportStorageConfigToNodeWarden(ui64 cookie) { + void TDistributedConfigKeeper::ReportStorageConfigToNodeWarden() { Y_ABORT_UNLESS(StorageConfig); const TActorId wardenId = MakeBlobStorageNodeWardenID(SelfId().NodeId()); const auto& config = SelfManagementEnabled ? StorageConfig : BaseConfig; - auto proposedConfig = ProposedStorageConfig && SelfManagementEnabled - ? std::make_shared(*ProposedStorageConfig) - : nullptr; - auto ev = std::make_unique(config, std::move(proposedConfig), SelfManagementEnabled, - BridgeInfo); - Send(wardenId, ev.release(), 0, cookie); + auto ev = std::make_unique(config, SelfManagementEnabled, BridgeInfo); + Send(wardenId, ev.release()); } STFUNC(TDistributedConfigKeeper::StateFunc) { diff --git a/ydb/core/blobstorage/nodewarden/distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h index 69072167546f..22b3fc5fe419 100644 --- a/ydb/core/blobstorage/nodewarden/distconf.h +++ b/ydb/core/blobstorage/nodewarden/distconf.h @@ -172,10 +172,29 @@ namespace NKikimr::NStorage { ui64 Id = RandomNumber(); // unique id }; + struct TScatterTaskOriginFsm { + TString ToString() const { return "fsm"; } + }; + + struct TScatterTaskOriginTargeted { + TActorId Sender; + ui64 Cookie; + TActorId InterconnectSessionId; + + TString ToString() const { return TStringBuilder() << "{Sender# " << Sender << " Cookie# " << Cookie + << " InterconnectSessionId# " << InterconnectSessionId << '}'; } + }; + + using TScatterTaskOrigin = std::variant< + TBinding, // when scatter is received from root + TActorId, // locally generated by invoke processor + TScatterTaskOriginFsm, // locally generated by configuration change FSM + TScatterTaskOriginTargeted // when targeted scatter is issued by cluster leader to newly added nodes + >; + struct TScatterTask { - const std::optional Origin; + const TScatterTaskOrigin Origin; const ui64 ScepterCounter; - const TActorId ActorId; THashSet PendingNodes; ui32 AsyncOperationsPending = 0; @@ -183,11 +202,9 @@ namespace NKikimr::NStorage { TEvGather Response; std::vector CollectedResponses; // from bound nodes - TScatterTask(const std::optional& origin, TEvScatter&& request, - ui64 scepterCounter, TActorId actorId) - : Origin(origin) + TScatterTask(TScatterTaskOrigin&& origin, TEvScatter&& request, ui64 scepterCounter) + : Origin(std::move(origin)) , ScepterCounter(scepterCounter) - , ActorId(actorId) { Request.Swap(&request); if (Request.HasCookie()) { @@ -218,8 +235,7 @@ namespace NKikimr::NStorage { // proposed storage configuration of the cluster std::optional ProposedStorageConfig; // proposed one - ui64 ProposedStorageConfigCookie; // if set, then this configuration is being written right now - ui32 ProposedStorageConfigCookieUsage = 0; + std::optional ProposedStorageConfigCookie; // if set, then this configuration is being written right now // most relevant proposed config using TPersistCallback = std::function; @@ -266,6 +282,7 @@ namespace NKikimr::NStorage { ui64 NextScatterCookie = RandomNumber(); using TScatterTasks = THashMap; TScatterTasks ScatterTasks; + std::set> AddedNodesScatterTasks; std::optional StateStorageSelfHealActor; @@ -284,6 +301,7 @@ namespace NKikimr::NStorage { NKikimrBlobStorage::TStorageConfig StorageConfig; // storage config being proposed TActorId ActorId; // actor id waiting for this operation to complete bool MindPrev; // mind previous configuration quorum + std::vector AddedNodes; // a list of nodes being added in this configuration change }; std::optional CurrentProposition; @@ -348,7 +366,7 @@ namespace NKikimr::NStorage { void Halt(); // cease any distconf activity, unbind and reject any bindings bool ApplyStorageConfig(const NKikimrBlobStorage::TStorageConfig& config, bool fromBinding = false); void HandleConfigConfirm(STATEFN_SIG); - void ReportStorageConfigToNodeWarden(ui64 cookie); + void ReportStorageConfigToNodeWarden(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // PDisk configuration retrieval and storing @@ -411,6 +429,7 @@ namespace NKikimr::NStorage { void BecomeRoot(); void UnbecomeRoot(); void HandleErrorTimeout(); + void UndoCurrentPropositionNodeChange(TProposition& proposition); void ProcessGather(TEvGather *res); bool HasConnectedNodeQuorum(const NKikimrBlobStorage::TStorageConfig& config, bool local) const; @@ -487,7 +506,7 @@ namespace NKikimr::NStorage { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Scatter/gather logic - void IssueScatterTask(std::optional actorId, TEvScatter&& request); + void IssueScatterTask(TScatterTaskOrigin&& origin, TEvScatter&& request, std::span addedNodes = {}); void CheckCompleteScatterTask(TScatterTasks::iterator it); void FinishAsyncOperation(ui64 cookie); void IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& task); diff --git a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp index 02d6a87defa0..9b7ff740b874 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp @@ -7,6 +7,11 @@ namespace NKikimr::NStorage { void TDistributedConfigKeeper::Handle(TEvInterconnect::TEvNodesInfo::TPtr ev) { STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo"); + if (SelfManagementEnabled) { + // we obtain node list only from current StorageConfig + return; + } + std::vector> newNodeList; for (const auto& item : ev->Get()->Nodes) { if (item.IsStatic) { @@ -18,6 +23,8 @@ namespace NKikimr::NStorage { } void TDistributedConfigKeeper::ApplyNewNodeList(std::span> newNodeList) { + STLOG(PRI_DEBUG, BS_NODE, NWDC13, "ApplyNewNodeList", (NewNodeList, newNodeList)); + // do not start configuration negotiation for dynamic nodes if (!IsSelfStatic) { std::optional expectedBridgePileName; @@ -49,7 +56,8 @@ namespace NKikimr::NStorage { break; } } - Y_ABORT_UNLESS(found); + auto fn = [](const auto& x) { return TStringBuilder() << std::get<0>(x); }; + Y_VERIFY_S(found, "SelfNodeId# " << selfNodeId << " NewNodeList# " << FormatList(newNodeList | std::views::transform(fn))); // process all other nodes, find bindable ones (from our current pile) and build list of all nodes AllNodeIds.clear(); @@ -61,7 +69,7 @@ namespace NKikimr::NStorage { for (const auto& [item, location] : newNodeList) { const ui32 nodeId = item.NodeId(); - AllNodeIds.insert(item.NodeId()); + AllNodeIds.insert(nodeId); // check if node is from the same pile (as this one) if (location.GetBridgePileName() == SelfNodeBridgePileName) { @@ -297,6 +305,13 @@ namespace NKikimr::NStorage { if (Binding && Binding->NodeId == nodeId) { AbortBinding("disconnection", false); } + + // abort scatter tasks issued to newly added nodes + for (auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0}); it != AddedNodesScatterTasks.end() && + std::get<0>(*it) == nodeId; it = AddedNodesScatterTasks.erase(it)) { + const auto& [nodeId, cookie] = *it; + AbortScatterTask(cookie, nodeId); + } } void TDistributedConfigKeeper::UnsubscribeInterconnect(ui32 nodeId) { @@ -309,6 +324,10 @@ namespace NKikimr::NStorage { if (ConnectedDynamicNodes.contains(nodeId)) { return; } + if (const auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0}); it != AddedNodesScatterTasks.end() && + std::get<0>(*it) == nodeId) { + return; + } if (const auto it = SubscribedSessions.find(nodeId); it != SubscribedSessions.end()) { TSessionSubscription& subs = it->second; STLOG(PRI_DEBUG, BS_NODE, NWDC55, "UnsubscribeInterconnect", (NodeId, nodeId), (Subscription, subs)); @@ -319,11 +338,17 @@ namespace NKikimr::NStorage { Y_ABORT_UNLESS(jt != SubscriptionCookieMap.end()); Y_ABORT_UNLESS(jt->second == nodeId); SubscriptionCookieMap.erase(jt); + if (!AllNodeIds.contains(nodeId)) { + TActivationContext::Send(new IEventHandle(TEvInterconnect::EvDisconnect, 0, + TActivationContext::InterconnectProxy(nodeId), {}, nullptr, 0)); + } } else { // we already had TEvNodeConnected, so we have to unsubscribe Y_ABORT_UNLESS(subs.SessionId); - TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, subs.SessionId, SelfId(), - nullptr, 0)); + ui32 event = AllNodeIds.contains(nodeId) + ? TEvents::TSystem::Unsubscribe + : TEvents::TSystem::Poison; + TActivationContext::Send(new IEventHandle(event, 0, subs.SessionId, SelfId(), nullptr, 0)); } SubscribedSessions.erase(it); } @@ -403,7 +428,8 @@ namespace NKikimr::NStorage { return; // possible race with unbinding } - Y_ABORT_UNLESS(Binding->RootNodeId || ScatterTasks.empty()); + auto isTargeted = [](const TScatterTaskOrigin& origin) { return std::holds_alternative(origin); }; + Y_ABORT_UNLESS(Binding->RootNodeId || std::ranges::all_of(ScatterTasks | std::views::values, isTargeted, &TScatterTask::Origin)); // check if this binding was accepted and if it is acceptable from our point of view bool bindingUpdate = false; @@ -546,13 +572,10 @@ namespace NKikimr::NStorage { Y_ABORT_UNLESS(senderNodeId != SelfId().NodeId()); auto& record = ev->Get()->Record; - STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush", (NodeId, senderNodeId), (Cookie, ev->Cookie), - (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record), - (RootNodeId, GetRootNodeId())); - // check if we have to send our current config to the peer const NKikimrBlobStorage::TStorageConfig *configToPeer = nullptr; std::optional requestStorageConfigGeneration; + const bool knownNode = AllNodeIds.contains(senderNodeId); if (StorageConfig) { for (const auto& item : record.GetBoundNodes()) { if (item.GetNodeId().GetNodeId() == senderNodeId) { @@ -566,9 +589,19 @@ namespace NKikimr::NStorage { } } - if (!AllNodeIds.contains(senderNodeId)) { + STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush", + (NodeId, senderNodeId), + (Cookie, ev->Cookie), + (SessionId, ev->InterconnectSession), + (Binding, Binding), + (Record, record), + (RootNodeId, GetRootNodeId()), + (StorageConfigGeneration, StorageConfig ? (i64)StorageConfig->GetGeneration() : -1), + (KnownNode, knownNode)); + + if (!knownNode) { // node has been already deleted from the config, but new subscription is coming through -- ignoring it - SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(configToPeer)); + SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(nullptr)); return; } @@ -577,7 +610,7 @@ namespace NKikimr::NStorage { STLOG(PRI_DEBUG, BS_NODE, NWDC28, "TEvNodeConfigPush rejected", (NodeId, senderNodeId), (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record)); - SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(configToPeer)); + SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(nullptr)); return; } @@ -589,7 +622,7 @@ namespace NKikimr::NStorage { // nodes AND this is the root one } else { // this is either not the root node, or no quorum for connection - auto response = TEvNodeConfigReversePush::MakeRejected(configToPeer); + auto response = TEvNodeConfigReversePush::MakeRejected(nullptr); if (Binding && Binding->RootNodeId) { // command peer to join this specific node response->Record.SetRootNodeId(Binding->RootNodeId); diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp index 1bc3261d58cd..a783e8ab4f8d 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp @@ -108,7 +108,10 @@ namespace NKikimr::NStorage { RootState = ERootState::ERROR_TIMEOUT; ErrorReason = reason; OpQueueOnError(reason); - CurrentProposition.reset(); + if (CurrentProposition) { + UndoCurrentPropositionNodeChange(*CurrentProposition); + CurrentProposition.reset(); + } CurrentSelfAssemblyUUID.reset(); ApplyConfigUpdateToDynamicNodes(true); AbortAllScatterTasks(std::nullopt); @@ -129,6 +132,15 @@ namespace NKikimr::NStorage { } } + void TDistributedConfigKeeper::UndoCurrentPropositionNodeChange(TProposition& proposition) { + if (!proposition.AddedNodes.empty()) { + Send(GetNameserviceActorId(), new TEvNodeWardenStorageConfig(StorageConfig, SelfManagementEnabled, BridgeInfo)); + for (const auto& nodeId : proposition.AddedNodes) { + UnsubscribeInterconnect(nodeId.NodeId()); + } + } + } + void TDistributedConfigKeeper::ProcessGather(TEvGather *res) { STLOG(PRI_DEBUG, BS_NODE, NWDC27, "ProcessGather", (RootState, RootState), (Res, *res)); @@ -460,9 +472,9 @@ namespace NKikimr::NStorage { // reset current proposition in advance auto proposition = *std::exchange(CurrentProposition, std::nullopt); + Y_ABORT_UNLESS(proposition.ActorId); auto finishWithError = [&](TString error) { - Y_ABORT_UNLESS(proposition.ActorId); Send(proposition.ActorId, new TEvPrivate::TEvConfigProposed(std::move(error))); }; @@ -480,7 +492,6 @@ namespace NKikimr::NStorage { ApplyStorageConfig(proposition.StorageConfig); // this proposition came from actor -- we notify that actor and finish operation - Y_ABORT_UNLESS(proposition.ActorId); Send(proposition.ActorId, new TEvPrivate::TEvConfigProposed(std::nullopt)); } else { STLOG(PRI_DEBUG, BS_NODE, NWDC47, "no quorum for ProposedStorageConfig", (Record, *res), @@ -490,6 +501,9 @@ namespace NKikimr::NStorage { finishWithError(TStringBuilder() << "no quorum for ProposedStorageConfig:" << err.Str()); } + // update nameservice configuration with correct values + UndoCurrentPropositionNodeChange(proposition); + // if this proposition was made by an actor, but it has died, then we have to return state to correct one if (DeadActorWaitingForProposition) { Y_ABORT_UNLESS(proposition.ActorId); @@ -520,7 +534,7 @@ namespace NKikimr::NStorage { } case TEvScatter::kProposeStorageConfig: - if (ProposedStorageConfigCookieUsage) { + if (ProposedStorageConfigCookie) { auto *status = task.Response.MutableProposeStorageConfig()->AddStatus(); SelfNode.Serialize(status->MutableNodeId()); status->SetStatus(TEvGather::TProposeStorageConfig::RACE); @@ -537,21 +551,13 @@ namespace NKikimr::NStorage { } else if (proposed.HasClusterState() && (!BridgeInfo || !NBridge::PileStateTraits(proposed.GetClusterState().GetPerPileState(BridgeInfo->SelfNodePile->BridgePileId.GetPileIndex())).RequiresConfigQuorum)) { // won't persist propsed config when this node is not part of the quorum } else { - ProposedStorageConfigCookie = cookie; + ProposedStorageConfigCookie.emplace(cookie); ProposedStorageConfig.emplace(proposed); - // issue notification to node warden - if (StorageConfig && StorageConfig->GetGeneration() && - StorageConfig->GetGeneration() < ProposedStorageConfig->GetGeneration()) { - ReportStorageConfigToNodeWarden(cookie); - ++task.AsyncOperationsPending; - ++ProposedStorageConfigCookieUsage; - } - PersistConfig([this, cookie](TEvPrivate::TEvStorageConfigStored& msg) { - Y_ABORT_UNLESS(ProposedStorageConfigCookieUsage); - Y_ABORT_UNLESS(cookie == ProposedStorageConfigCookie); - --ProposedStorageConfigCookieUsage; + Y_ABORT_UNLESS(ProposedStorageConfigCookie); + Y_ABORT_UNLESS(cookie == *ProposedStorageConfigCookie); + ProposedStorageConfigCookie.reset(); if (auto it = ScatterTasks.find(cookie); it != ScatterTasks.end()) { TScatterTask& task = it->second; @@ -578,7 +584,6 @@ namespace NKikimr::NStorage { }); ++task.AsyncOperationsPending; - ++ProposedStorageConfigCookieUsage; } break; @@ -772,11 +777,33 @@ namespace NKikimr::NStorage { .MindPrev = mindPrev, }); + Y_ABORT_UNLESS(StorageConfig); + const auto& currentNodesProto = StorageConfig->GetAllNodes(); + std::vector currentNodes(currentNodesProto.begin(), currentNodesProto.end()); + std::ranges::sort(currentNodes); + + const auto& newNodesProto = configToPropose->GetAllNodes(); + std::vector newNodes(newNodesProto.begin(), newNodesProto.end()); + std::ranges::sort(newNodes); + + std::vector addedNodes; + std::ranges::set_difference(newNodes, currentNodes, std::back_inserter(addedNodes)); + if (!addedNodes.empty()) { + Send(GetNameserviceActorId(), new TEvNodeWardenStorageConfig( + std::make_shared(CurrentProposition->StorageConfig), + SelfManagementEnabled, + Cfg->BridgeConfig + ? GenerateBridgeInfo(CurrentProposition->StorageConfig) + : nullptr + )); + CurrentProposition->AddedNodes = {addedNodes.begin(), addedNodes.end()}; + } + // issue scatter task TEvScatter task; task.SetTaskId(RandomNumber()); task.MutableProposeStorageConfig()->MutableConfig()->Swap(configToPropose); - IssueScatterTask(TActorId(), std::move(task)); + IssueScatterTask(TScatterTaskOriginFsm{}, std::move(task), addedNodes); return std::nullopt; } diff --git a/ydb/core/blobstorage/nodewarden/distconf_quorum.cpp b/ydb/core/blobstorage/nodewarden/distconf_quorum.cpp index 14c9b5c43e17..f1c73331d0ca 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_quorum.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_quorum.cpp @@ -350,15 +350,10 @@ namespace NKikimr::NStorage { bool HasConfigQuorum(const NKikimrBlobStorage::TStorageConfig& config, std::span successful, const THashMap& bridgePileNameMap, TBridgePileId singleBridgePileId, const TNodeWardenConfig& nwConfig, bool mindPrev, TStringStream *out) { - auto getQuorum = [&](auto& config, const char *name, bool allowUnformatted) { - // config quorum goes first -- if we have (or don't have one) -- we return it; if we don't know (because - // we have no static groups) -- then we use disk-wise quorum (more than 1/2 nodes with more than 1/2 disks) - auto q = HasStorageQuorum(config, successful, bridgePileNameMap, singleBridgePileId, nwConfig, - allowUnformatted, out, name); - return q ? *q : HasDiskQuorum(config, successful, bridgePileNameMap, singleBridgePileId, out, name); - }; - return getQuorum(config, "new", true) && (!mindPrev || !config.HasPrevConfig() || - getQuorum(config.GetPrevConfig(), "prev", false)); + return HasDiskQuorum(config, successful, bridgePileNameMap, singleBridgePileId, out, "new") && + HasStorageQuorum(config, successful, bridgePileNameMap, singleBridgePileId, nwConfig, true, out, "new").value_or(true) && + (!mindPrev || !config.HasPrevConfig() || HasStorageQuorum(config.GetPrevConfig(), successful, + bridgePileNameMap, singleBridgePileId, nwConfig, false, out, "prev").value_or(true)); } } // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp b/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp index deb44a96cdfb..b25116dd78c6 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_scatter_gather.cpp @@ -2,20 +2,42 @@ namespace NKikimr::NStorage { - void TDistributedConfigKeeper::IssueScatterTask(std::optional actorId, TEvScatter&& request) { + void TDistributedConfigKeeper::IssueScatterTask(TScatterTaskOrigin&& origin, TEvScatter&& request, + std::span addedNodes) { ui64 cookie = NextScatterCookie++; if (cookie == 0) { cookie = NextScatterCookie++; } - STLOG(PRI_DEBUG, BS_NODE, NWDC21, "IssueScatterTask", (Request, request), (Cookie, cookie), (ActorId, actorId), - (Binding, Binding), (Scepter, Scepter ? std::make_optional(Scepter->Id) : std::nullopt)); - const auto [it, inserted] = ScatterTasks.try_emplace(cookie, Binding, std::move(request), ScepterCounter, - actorId.value_or(TActorId())); + if (std::holds_alternative(origin)) { + Y_ABORT_UNLESS(std::get(origin)); + } + STLOG(PRI_DEBUG, BS_NODE, NWDC21, "IssueScatterTask", (Request, request), (Cookie, cookie), (Origin, origin), + (Binding, Binding), (Scepter, Scepter ? std::make_optional(Scepter->Id) : std::nullopt), + (AddedNodes, addedNodes)); + const auto [it, inserted] = ScatterTasks.try_emplace(cookie, std::move(origin), std::move(request), ScepterCounter); Y_ABORT_UNLESS(inserted); TScatterTask& task = it->second; PrepareScatterTask(cookie, task); - for (auto& [nodeId, info] : DirectBoundNodes) { - IssueScatterTaskForNode(nodeId, info, cookie, task); + if (!std::holds_alternative(task.Origin)) { // issue scatter task downward unless this one is targeted + for (auto& [nodeId, info] : DirectBoundNodes) { + IssueScatterTaskForNode(nodeId, info, cookie, task); + } + for (const TNodeIdentifier& nodeId : addedNodes) { + auto ev = std::make_unique(); + ev->Record.CopyFrom(task.Request); + ev->Record.SetCookie(cookie); + ev->Record.SetTargeted(true); + auto interconnectSessionId = SubscribeToPeerNode(nodeId.NodeId(), TActorId()); + auto handle = std::make_unique(MakeBlobStorageNodeWardenID(nodeId.NodeId()), SelfId(), + ev.release(), 0, 0); + if (interconnectSessionId) { + handle->Rewrite(TEvInterconnect::EvForward, interconnectSessionId); + } + TActivationContext::Send(handle.release()); + const auto [it, inserted] = task.PendingNodes.insert(nodeId.NodeId()); + Y_ABORT_UNLESS(inserted); + AddedNodesScatterTasks.emplace(nodeId.NodeId(), cookie); + } } CheckCompleteScatterTask(it); } @@ -51,31 +73,43 @@ namespace NKikimr::NStorage { void TDistributedConfigKeeper::CompleteScatterTask(TScatterTask& task) { STLOG(PRI_DEBUG, BS_NODE, NWDC22, "CompleteScatterTask", (Request, task.Request)); - if (task.Origin) { + if (std::holds_alternative(task.Origin)) { Y_ABORT_UNLESS(Binding); // when binding is dropped, all scatter tasks must be dropped too - Y_ABORT_UNLESS(Binding == task.Origin); // binding must not change + Y_ABORT_UNLESS(Binding == std::get(task.Origin)); // binding must not change } PerformScatterTask(task); // do the local part - if (task.Origin) { - auto reply = std::make_unique(); - task.Response.Swap(&reply->Record); - SendEvent(*Binding, std::move(reply)); - } else if (task.ActorId) { - auto ev = std::make_unique(); - task.Response.Swap(&ev->Record); - Send(task.ActorId, ev.release()); - } else { - ProcessGather(task.ScepterCounter == ScepterCounter ? &task.Response : nullptr); - } + std::visit(TOverloaded{ + [&](const TBinding& binding) { + auto reply = std::make_unique(); + task.Response.Swap(&reply->Record); + SendEvent(binding, std::move(reply)); + }, + [&](const TActorId& actorId) { + auto ev = std::make_unique(); + task.Response.Swap(&ev->Record); + Send(actorId, ev.release()); + }, + [&](const TScatterTaskOriginFsm&) { + ProcessGather(task.ScepterCounter == ScepterCounter ? &task.Response : nullptr); + }, + [&](const TScatterTaskOriginTargeted& origin) { + auto reply = std::make_unique(); + task.Response.Swap(&reply->Record); + auto handle = std::make_unique(origin.Sender, SelfId(), reply.release(), 0, origin.Cookie); + Y_ABORT_UNLESS(origin.InterconnectSessionId); + handle->Rewrite(TEvInterconnect::EvForward, origin.InterconnectSessionId); + TActivationContext::Send(handle.release()); + } + }, task.Origin); } void TDistributedConfigKeeper::AbortScatterTask(ui64 cookie, ui32 nodeId) { STLOG(PRI_DEBUG, BS_NODE, NWDC23, "AbortScatterTask", (Cookie, cookie), (NodeId, nodeId)); const auto it = ScatterTasks.find(cookie); - Y_ABORT_UNLESS(it != ScatterTasks.end()); + Y_VERIFY_S(it != ScatterTasks.end(), "Cookie# " << cookie << " NodeId# " << nodeId); TScatterTask& task = it->second; const size_t n = task.PendingNodes.erase(nodeId); @@ -87,29 +121,53 @@ namespace NKikimr::NStorage { STLOG(PRI_DEBUG, BS_NODE, NWDC24, "AbortAllScatterTasks", (Binding, binding)); for (auto& [cookie, task] : std::exchange(ScatterTasks, {})) { - Y_ABORT_UNLESS(task.Origin == binding); - if (task.ActorId) { // terminate the task prematurely -- and notify actor + auto getAborted = [&] { auto ev = std::make_unique(); task.Response.SetAborted(true); task.Response.Swap(&ev->Record); - Send(task.ActorId, ev.release()); - } + return ev.release(); + }; + std::visit(TOverloaded{ + [&](const TBinding& origin) { + Y_ABORT_UNLESS(origin == binding); + }, + [&](const TActorId& actorId) { // terminate the task prematurely -- and notify actor + Y_ABORT_UNLESS(!binding); + Send(actorId, getAborted()); + }, + [&](const TScatterTaskOriginFsm&) { + }, + [&](const TScatterTaskOriginTargeted& origin) { // notify operation aborted + auto handle = std::make_unique(origin.Sender, SelfId(), getAborted(), 0, origin.Cookie); + Y_ABORT_UNLESS(origin.InterconnectSessionId); + handle->Rewrite(TEvInterconnect::EvForward, origin.InterconnectSessionId); + TActivationContext::Send(handle.release()); + } + }, task.Origin); + for (const ui32 nodeId : task.PendingNodes) { - const auto it = DirectBoundNodes.find(nodeId); - Y_ABORT_UNLESS(it != DirectBoundNodes.end()); - TBoundNode& info = it->second; - const size_t n = info.ScatterTasks.erase(cookie); - Y_ABORT_UNLESS(n == 1); + if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) { + TBoundNode& info = it->second; + const size_t n = info.ScatterTasks.erase(cookie); + Y_ABORT_UNLESS(n == 1); + } else { + const size_t n = AddedNodesScatterTasks.erase({nodeId, cookie}); + Y_ABORT_UNLESS(n == 1); + } } } + + Y_ABORT_UNLESS(AddedNodesScatterTasks.empty()); } void TDistributedConfigKeeper::Handle(TEvNodeConfigScatter::TPtr ev) { STLOG(PRI_DEBUG, BS_NODE, NWDC25, "TEvNodeConfigScatter", (Binding, Binding), (Sender, ev->Sender), (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Record, ev->Get()->Record)); - if (Binding && Binding->Expected(*ev)) { - IssueScatterTask(std::nullopt, std::move(ev->Get()->Record)); + if (auto& record = ev->Get()->Record; Binding && Binding->Expected(*ev)) { + IssueScatterTask(*Binding, std::move(record)); + } else if (record.GetTargeted()) { + IssueScatterTask(TScatterTaskOriginTargeted{ev->Sender, ev->Cookie, ev->InterconnectSession}, std::move(record)); } } @@ -117,22 +175,27 @@ namespace NKikimr::NStorage { STLOG(PRI_DEBUG, BS_NODE, NWDC26, "TEvNodeConfigGather", (Sender, ev->Sender), (Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Record, ev->Get()->Record)); + auto& record = ev->Get()->Record; + const ui64 cookie = record.GetCookie(); + const ui32 senderNodeId = ev->Sender.NodeId(); - if (const auto it = DirectBoundNodes.find(senderNodeId); it != DirectBoundNodes.end() && it->second.Expected(*ev)) { - TBoundNode& info = it->second; - auto& record = ev->Get()->Record; - const ui64 cookie = record.GetCookie(); + const auto it = DirectBoundNodes.find(senderNodeId); + + if (it != DirectBoundNodes.end() ? it->second.Expected(*ev) : AddedNodesScatterTasks.erase({senderNodeId, cookie})) { if (const auto jt = ScatterTasks.find(cookie); jt != ScatterTasks.end()) { - const size_t n = info.ScatterTasks.erase(cookie); - Y_ABORT_UNLESS(n == 1); + if (it != DirectBoundNodes.end()) { + const size_t n = it->second.ScatterTasks.erase(cookie); + Y_ABORT_UNLESS(n == 1); + } TScatterTask& task = jt->second; record.Swap(&task.CollectedResponses.emplace_back()); const size_t m = task.PendingNodes.erase(senderNodeId); Y_ABORT_UNLESS(m == 1); CheckCompleteScatterTask(jt); - } else { - Y_DEBUG_ABORT_UNLESS(!info.ScatterTasks.contains(cookie)); + } + if (it == DirectBoundNodes.end()) { // maybe we don't need this node subscribed anymore + UnsubscribeInterconnect(senderNodeId); } } } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index 6b1422e56893..fea0598d637c 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -715,9 +715,8 @@ namespace NKikimr::NStorage { void Handle(TEvNodeWardenQueryStorageConfig::TPtr ev); void Handle(TEvNodeWardenStorageConfig::TPtr ev); void HandleUnsubscribe(STATEFN_SIG); - void ApplyStorageConfig(const NKikimrBlobStorage::TNodeWardenServiceSet& current, - const NKikimrBlobStorage::TNodeWardenServiceSet *proposed); - void ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConfig *proposed); + void ApplyStorageConfig(const NKikimrBlobStorage::TNodeWardenServiceSet& current); + void ApplyStateStorageConfig(); void ApplyStaticServiceSet(const NKikimrBlobStorage::TNodeWardenServiceSet& ss); void Handle(TEventHandle::TPtr ev); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp index 7fa97a664c2f..00d50420730a 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp @@ -83,7 +83,7 @@ void TNodeWarden::Handle(TEvNodeWardenQueryStorageConfig::TPtr ev) { return; } - Send(ev->Sender, new TEvNodeWardenStorageConfig(StorageConfig, nullptr, SelfManagementEnabled, BridgeInfo)); + Send(ev->Sender, new TEvNodeWardenStorageConfig(StorageConfig, SelfManagementEnabled, BridgeInfo)); if (ev->Get()->Subscribe) { StorageConfigSubscribers.insert(ev->Sender); } @@ -98,30 +98,20 @@ void TNodeWarden::Handle(TEvNodeWardenStorageConfig::TPtr ev) { if (StorageConfig->HasBlobStorageConfig()) { const auto& bsConfig = StorageConfig->GetBlobStorageConfig(); if (bsConfig.HasServiceSet()) { - const NKikimrBlobStorage::TNodeWardenServiceSet *proposed = nullptr; - if (const auto& proposedConfig = ev->Get()->ProposedConfig) { - Y_VERIFY_S(StorageConfig->GetGeneration() < proposedConfig->GetGeneration(), - "StorageConfig.Generation# " << StorageConfig->GetGeneration() - << " ProposedConfig.Generation# " << proposedConfig->GetGeneration()); - Y_ABORT_UNLESS(proposedConfig->HasBlobStorageConfig()); // must have the BlobStorageConfig and the ServiceSet - const auto& proposedBsConfig = proposedConfig->GetBlobStorageConfig(); - Y_ABORT_UNLESS(proposedBsConfig.HasServiceSet()); - proposed = &proposedBsConfig.GetServiceSet(); - } - ApplyStorageConfig(bsConfig.GetServiceSet(), proposed); + ApplyStorageConfig(bsConfig.GetServiceSet()); } SyncRateQuoter->UpdateBytesPerSecond(bsConfig.GetBridgeSyncRateBytesPerSecond()); } if (StorageConfig->HasStateStorageConfig() && StorageConfig->HasStateStorageBoardConfig() && StorageConfig->HasSchemeBoardConfig()) { - ApplyStateStorageConfig(ev->Get()->ProposedConfig.get()); + ApplyStateStorageConfig(); } else { Y_ABORT_UNLESS(!StorageConfig->HasStateStorageConfig() && !StorageConfig->HasStateStorageBoardConfig() && !StorageConfig->HasSchemeBoardConfig()); } for (const TActorId& subscriber : StorageConfigSubscribers) { - Send(subscriber, new TEvNodeWardenStorageConfig(StorageConfig, nullptr, SelfManagementEnabled, BridgeInfo)); + Send(subscriber, new TEvNodeWardenStorageConfig(StorageConfig, SelfManagementEnabled, BridgeInfo)); } if (StorageConfig->HasConfigComposite()) { @@ -172,17 +162,11 @@ void TNodeWarden::HandleUnsubscribe(STATEFN_SIG) { StorageConfigSubscribers.erase(ev->Sender); } -void TNodeWarden::ApplyStorageConfig(const NKikimrBlobStorage::TNodeWardenServiceSet& current, - const NKikimrBlobStorage::TNodeWardenServiceSet *proposed) { - if (!proposed) { // just start the required services - // wipe out obsolete VSlots from running PDisks from current.Prev; however, it is not synchronous - return ApplyStaticServiceSet(current); - } - +void TNodeWarden::ApplyStorageConfig(const NKikimrBlobStorage::TNodeWardenServiceSet& current) { ApplyStaticServiceSet(current); } -void TNodeWarden::ApplyStateStorageConfig(const NKikimrBlobStorage::TStorageConfig* /*proposed*/) { +void TNodeWarden::ApplyStateStorageConfig() { if (!Cfg->DomainsConfig) { return; // no state storage management } diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/node_warden_mock_bsc.cpp b/ydb/core/blobstorage/ut_blobstorage/lib/node_warden_mock_bsc.cpp index 9fdd33ed725d..af6650ea0130 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/node_warden_mock_bsc.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/lib/node_warden_mock_bsc.cpp @@ -227,7 +227,7 @@ void TNodeWardenMockActor::Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpd void TNodeWardenMockActor::Handle(TEvNodeWardenQueryStorageConfig::TPtr ev) { Send(ev->Sender, new TEvNodeWardenStorageConfig(std::make_shared(), - nullptr, false, nullptr)); + false, nullptr)); } void TNodeWardenMockActor::HandleUnsubscribe(STATEFN_SIG) { diff --git a/ydb/core/health_check/health_check_ut.cpp b/ydb/core/health_check/health_check_ut.cpp index 12caa2e1201e..38414196550a 100644 --- a/ydb/core/health_check/health_check_ut.cpp +++ b/ydb/core/health_check/health_check_ut.cpp @@ -634,7 +634,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) { bridgeInfo->StaticNodeIdToPile[runtime.GetNodeId(0)] = &bridgeInfo->Piles[0]; bridgeInfo->StaticNodeIdToPile[runtime.GetNodeId(1)] = &bridgeInfo->Piles[1]; - auto* nodeWardenStorageConfig = new TEvNodeWardenStorageConfig(std::make_shared(), nullptr, false, nullptr); + auto* nodeWardenStorageConfig = new TEvNodeWardenStorageConfig(std::make_shared(), false, nullptr); nodeWardenStorageConfig->BridgeInfo = bridgeInfo; runtime.Send(new IEventHandle(GetNameserviceActorId(), runtime.AllocateEdgeActor(), nodeWardenStorageConfig)); @@ -2238,7 +2238,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) { bridgeInfo->SelfNodePile = bridgeInfo->Piles.data(); bridgeInfo->StaticNodeIdToPile[runtime.GetNodeId(0)] = &bridgeInfo->Piles[0]; - auto* nodeWardenStorageConfig = new TEvNodeWardenStorageConfig(std::make_shared(), nullptr, false, nullptr); + auto* nodeWardenStorageConfig = new TEvNodeWardenStorageConfig(std::make_shared(), false, nullptr); nodeWardenStorageConfig->BridgeInfo = bridgeInfo; runtime.Send(new IEventHandle(GetNameserviceActorId(), runtime.AllocateEdgeActor(), nodeWardenStorageConfig)); @@ -3044,7 +3044,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) { bridgeInfo->StaticNodeIdToPile[runtime.GetNodeId(2)] = &bridgeInfo->Piles[1]; bridgeInfo->StaticNodeIdToPile[runtime.GetNodeId(3)] = &bridgeInfo->Piles[1]; - auto* nodeWardenStorageConfig = new TEvNodeWardenStorageConfig(std::make_shared(), nullptr, false, nullptr); + auto* nodeWardenStorageConfig = new TEvNodeWardenStorageConfig(std::make_shared(), false, nullptr); nodeWardenStorageConfig->BridgeInfo = bridgeInfo; runtime.Send(new IEventHandle(GetNameserviceActorId(), runtime.AllocateEdgeActor(), nodeWardenStorageConfig)); diff --git a/ydb/core/mind/bscontroller/ut_bscontroller/main.cpp b/ydb/core/mind/bscontroller/ut_bscontroller/main.cpp index 2ac7d0f89287..3fada687d7e8 100644 --- a/ydb/core/mind/bscontroller/ut_bscontroller/main.cpp +++ b/ydb/core/mind/bscontroller/ut_bscontroller/main.cpp @@ -244,7 +244,7 @@ struct TEnvironmentSetup { void Handle(TEvNodeWardenQueryStorageConfig::TPtr ev) { Send(ev->Sender, new TEvNodeWardenStorageConfig(std::make_shared(), - nullptr, false, nullptr)); + false, nullptr)); } STATEFN(StateFunc) { diff --git a/ydb/core/mind/bscontroller/ut_selfheal/node_warden_mock.h b/ydb/core/mind/bscontroller/ut_selfheal/node_warden_mock.h index deabdefeaa2b..9639b70404ac 100644 --- a/ydb/core/mind/bscontroller/ut_selfheal/node_warden_mock.h +++ b/ydb/core/mind/bscontroller/ut_selfheal/node_warden_mock.h @@ -181,7 +181,7 @@ class TNodeWardenMock : public TActorBootstrapped { void Handle(TEvNodeWardenQueryStorageConfig::TPtr ev) { Send(ev->Sender, new TEvNodeWardenStorageConfig(std::make_shared(), - nullptr, false, nullptr)); + false, nullptr)); } STRICT_STFUNC(StateFunc, { diff --git a/ydb/core/mind/hive/hive_ut.cpp b/ydb/core/mind/hive/hive_ut.cpp index feec5c767c61..d5df13f3eda1 100644 --- a/ydb/core/mind/hive/hive_ut.cpp +++ b/ydb/core/mind/hive/hive_ut.cpp @@ -8463,7 +8463,7 @@ Y_UNIT_TEST_SUITE(THiveTest) { void Notify() { for (auto subscriber : Subscribers) { auto pile = (subscriber.NodeId() - Runtime.GetNodeId(0)) % 2; - auto ev = std::make_unique(nullptr, nullptr, true, BridgeInfos[pile]); + auto ev = std::make_unique(nullptr, true, BridgeInfos[pile]); Runtime.Send(new IEventHandle(subscriber, subscriber, ev.release())); } } diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto index 859db75d9072..554e68bea7b4 100644 --- a/ydb/core/protos/blobstorage_distributed_config.proto +++ b/ydb/core/protos/blobstorage_distributed_config.proto @@ -106,6 +106,7 @@ message TEvNodeConfigScatter { optional uint64 Cookie = 1; optional fixed64 TaskId = 4; // random (and not necessarily unique) identifier + optional bool Targeted = 5; // set when issuing updates to newly added nodes oneof Request { TCollectConfigs CollectConfigs = 2; diff --git a/ydb/core/util/stlog.h b/ydb/core/util/stlog.h index 7fe82adab8e7..da47c0c95e32 100644 --- a/ydb/core/util/stlog.h +++ b/ydb/core/util/stlog.h @@ -96,6 +96,7 @@ namespace NKikimr::NStLog { }; template struct TIsIterable { static constexpr bool value = false; }; + template struct TIsIterable> { static constexpr bool value = true; }; template struct TIsIterable> { static constexpr bool value = true; }; template struct TIsIterable> { static constexpr bool value = true; }; template struct TIsIterable> { static constexpr bool value = true; }; @@ -113,6 +114,12 @@ namespace NKikimr::NStLog { template struct TIsIdWrapper { static constexpr bool value = false; }; template struct TIsIdWrapper> { static constexpr bool value = true; }; + template struct TIsVariant { static constexpr bool value = false; }; + template struct TIsVariant> { static constexpr bool value = true; }; + + template struct TIsTuple { static constexpr bool value = false; }; + template struct TIsTuple> { static constexpr bool value = true; }; + template class TBoundParam : public Base { T Value; @@ -199,11 +206,24 @@ namespace NKikimr::NStLog { OutputParam(s, v); } s << '}'; + } else if constexpr (TIsVariant::value) { + std::visit([&](auto& x) { OutputParam(s, x); }, value); + } else if constexpr (TIsTuple::value) { + s << '['; + std::apply([&](const auto&... args) { OutputParam(s, args...); }, value); + s << ']'; } else { s << value; } } + template + static void OutputParam(IOutputStream& s, const TValue& first, const TRest&... rest) { + OutputParam(s, first); + s << ':'; + OutputParam(s, rest...); + } + template static void OutputParam(NJson::TJsonWriter& json, const TValue& value) { using Tx = std::decay_t; @@ -246,12 +266,24 @@ namespace NKikimr::NStLog { json.Write(value.GetRawId()); } else if constexpr (std::is_constructible_v) { json.Write(value); + } else if constexpr (TIsVariant::value) { + std::visit([&](auto& x) { OutputParam(json, x); }, value); + } else if constexpr (TIsTuple::value) { + json.OpenArray(); + std::apply([&](const auto&... args) { OutputParam(json, args...); }, value); + json.CloseArray(); } else { TStringStream stream; OutputParam(stream, value); json.Write(stream.Str()); } } + + template + static void OutputParam(NJson::TJsonWriter& json, const TValue& first, const TRest&... rest) { + OutputParam(json, first); + OutputParam(json, rest...); + } }; class TUnboundParam {