diff --git a/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp b/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp index 91630776f4df..db322cbbad83 100644 --- a/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp +++ b/ydb/core/blobstorage/bridge/proxy/bridge_proxy.cpp @@ -639,7 +639,7 @@ namespace NKikimr { TBridgeInfo::TPtr BridgeInfo; std::deque> PendingQ; - std::map>> PendingByGeneration; + std::deque>> PendingForNextGeneration; public: TBridgedBlobStorageProxyActor(TIntrusivePtr info) @@ -650,6 +650,32 @@ namespace NKikimr { void Bootstrap() { Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryStorageConfig(/*subscribe=*/ true)); Become(&TThis::StateWaitBridgeInfo); + HandleWakeup(); + } + + void HandleWakeup() { + TMonotonic dropBefore = TActivationContext::Monotonic() - TDuration::Seconds(2); + while (!PendingForNextGeneration.empty()) { + if (auto& [timestamp, ev] = PendingForNextGeneration.front(); timestamp < dropBefore) { + switch (ev->GetTypeRewrite()) { +#define MAKE_ERROR(TYPE) \ + case TYPE::EventType: \ + Send(ev->Sender, static_cast(ev->GetBase())->MakeErrorResponse(NKikimrProto::ERROR, \ + "bridge request timed out", GroupId), 0, ev->Cookie); \ + break; + + DSPROXY_ENUM_EVENTS(MAKE_ERROR) +#undef MAKE_ERROR + default: + Y_ABORT(); + } + PendingForNextGeneration.pop_front(); + } else { + break; + } + } + TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), + {}, nullptr, 0)); } void PassAway() override { @@ -815,7 +841,7 @@ namespace NKikimr { const ui32 myGeneration = bridgeGroupState.GetPile(pile.BridgePileId.GetPileIndex()).GetGroupGeneration(); if (myGeneration < msg->RacingGeneration) { - PendingByGeneration[Info->GroupGeneration + 1].push_back(std::move(handle)); + PendingForNextGeneration.emplace_back(TActivationContext::Monotonic(), std::move(handle)); } else if (msg->RacingGeneration < myGeneration) { // our generation is higher than the recipient's; we have to route this message through node warden // to ensure proxy's configuration gets in place @@ -879,17 +905,13 @@ namespace NKikimr { } void Handle(TEvBlobStorage::TEvConfigureProxy::TPtr ev) { - Info = std::move(ev->Get()->Info); - while (!PendingByGeneration.empty()) { - auto it = PendingByGeneration.begin(); - auto& [requiredGeneration, events] = *it; - if (Info->GroupGeneration < requiredGeneration) { - break; - } - for (auto& ev : events) { + auto prevInfo = std::exchange(Info, std::move(ev->Get()->Info)); + Y_ABORT_UNLESS(prevInfo); + Y_ABORT_UNLESS(Info); + if (prevInfo->GroupGeneration < Info->GroupGeneration) { + for (auto& [timestamp, ev] : std::exchange(PendingForNextGeneration, {})) { TActivationContext::Send(ev.release()); } - PendingByGeneration.erase(it); } } @@ -923,6 +945,7 @@ namespace NKikimr { hFunc(TEvBlobStorage::TEvConfigureProxy, Handle) hFunc(TEvNodeWardenStorageConfig, Handle) cFunc(TEvents::TSystem::Poison, PassAway) + cFunc(TEvents::TSystem::Wakeup, HandleWakeup) ) #undef HANDLE_RESULT diff --git a/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp b/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp index 93b26badc7a1..0ed34b331d5d 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp @@ -260,51 +260,46 @@ namespace NKikimr::NStorage { return TStringBuilder() << "peer storage config invalid: " << *error; } else if (auto error = ValidateClusterState(*StorageConfig)) { return TStringBuilder() << "local storage config invalid: " << *error; - } - - // local/peer side is returning from DISCONNECTED state, validate cluster histories to ensure there were no - // definite split brain - if (auto error = ValidateClusterStateDetails(config)) { + } else if (auto error = ValidateClusterStateDetails(config)) { return TStringBuilder() << "peer cluster state history invalid: " << *error; } else if (auto error = ValidateClusterStateDetails(*StorageConfig)) { return TStringBuilder() << "local cluster state history invalid: " << *error; - } else if (auto error = CheckHistoryCompatibility(*StorageConfig, config)) { - // histories are incompatible, connection won't ever be possible - return error; } - const auto *peerPile = BridgeInfo->GetPile(peerBridgePileId); - if (peerPile == BridgeInfo->SelfNodePile) { - // no extra checks when connecting nodes from the same pile - return std::nullopt; - } + std::optional error; - if (StorageConfig->GetGeneration() == config.GetGeneration() && StorageConfig->GetFingerprint() != config.GetFingerprint()) { - return "config fingerprint mismatch"; - } + const auto *peerPile = BridgeInfo->GetPile(peerBridgePileId); + if (peerPile != BridgeInfo->SelfNodePile) { // extra checks when connecting between different piles + if (auto error = CheckHistoryCompatibility(*StorageConfig, config)) { + // histories are incompatible, connection won't ever be possible + return error; + } - const NKikimrBlobStorage::TStorageConfig& newerConfig = - StorageConfig->GetGeneration() < config.GetGeneration() - ? config - : *StorageConfig; + if (StorageConfig->GetGeneration() == config.GetGeneration() && StorageConfig->GetFingerprint() != config.GetFingerprint()) { + return "config fingerprint mismatch"; + } - const auto& cs = newerConfig.GetClusterState(); + const NKikimrBlobStorage::TStorageConfig& newerConfig = + StorageConfig->GetGeneration() < config.GetGeneration() + ? config + : *StorageConfig; - std::optional error; + const auto& cs = newerConfig.GetClusterState(); - if (!NBridge::PileStateTraits(cs.GetPerPileState(peerBridgePileId.GetPileIndex())).AllowsConnection) { - error = "peer is not allowed to connect"; - } else if (!NBridge::PileStateTraits(cs.GetPerPileState(SelfBridgePileId.GetPileIndex())).AllowsConnection) { - error = "local node is not allowed to accept peer"; - } + if (!NBridge::PileStateTraits(cs.GetPerPileState(peerBridgePileId.GetPileIndex())).AllowsConnection) { + error = "peer is not allowed to connect"; + } else if (!NBridge::PileStateTraits(cs.GetPerPileState(SelfBridgePileId.GetPileIndex())).AllowsConnection) { + error = "local node is not allowed to accept peer"; + } - if (!error) { - const auto& myClusterState = StorageConfig->GetClusterState(); - const auto& peerClusterState = config.GetClusterState(); - if (myClusterState.GetGeneration() < peerClusterState.GetGeneration()) { - error = "local cluster state is obsolete"; - } else if (peerClusterState.GetGeneration() < myClusterState.GetGeneration()) { - error = "peer cluster state is obsolete"; + if (!error) { + const auto& myClusterState = StorageConfig->GetClusterState(); + const auto& peerClusterState = config.GetClusterState(); + if (myClusterState.GetGeneration() < peerClusterState.GetGeneration()) { + error = "local cluster state is obsolete"; + } else if (peerClusterState.GetGeneration() < myClusterState.GetGeneration()) { + error = "peer cluster state is obsolete"; + } } } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp index 83d7f459fce1..5a4933d38da0 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp @@ -576,7 +576,7 @@ namespace NKikimr::NStorage { Y_ABORT_UNLESS(PDiskKey.NodeId == SelfId().NodeId()); Send(MakeBlobStoragePDiskID(PDiskKey.NodeId, PDiskKey.PDiskId), ConvertedEv.release(), IEventHandle::FlagTrackDelivery); - Become(&TThis::StateFunc, TDuration::Seconds(10), new TEvents::TEvWakeup); + Become(&TThis::StateFunc, TDuration::Seconds(30), new TEvents::TEvWakeup); } void Handle(TEvents::TEvUndelivered::TPtr /*ev*/) {