diff --git a/ydb/core/fq/libs/actors/nodes_manager.cpp b/ydb/core/fq/libs/actors/nodes_manager.cpp index 3bd981829c3d..9bfdba23dbfe 100644 --- a/ydb/core/fq/libs/actors/nodes_manager.cpp +++ b/ydb/core/fq/libs/actors/nodes_manager.cpp @@ -297,7 +297,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped(); response->NodeIds.reserve(Peers.size()); for (const auto& info : Peers) { diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index e05dd9b82edc..68c0e3619cac 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -16,7 +16,8 @@ message TRowDispatcherCoordinatorConfig { // Topic partitions will be distributed uniformly up to TopicPartitionsLimitPerNode // if (number nodes) * TopicPartitionsLimitPerNode < (number topic partitions) // Request will hang up infinitely, disabled by default - uint64 TopicPartitionsLimitPerNode = 4; + uint64 TopicPartitionsLimitPerNode = 4; // deprecated + uint64 RebalancingTimeoutSec = 5; // Automatic rebalancing partition after new nodes connected / nodes disconnected. } message TJsonParserConfig { diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index dcec2158bd83..ad0a0e99c6f4 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -16,6 +16,8 @@ #include #include +#include + namespace NFq { using namespace NActors; @@ -25,6 +27,8 @@ using NYql::TIssues; namespace { +const ui64 DefaultRebalancingTimeoutSec = 120; + //////////////////////////////////////////////////////////////////////////////// struct TCoordinatorMetrics { @@ -32,31 +36,37 @@ struct TCoordinatorMetrics { : Counters(counters) { IncomingRequests = Counters->GetCounter("IncomingRequests", true); LeaderChanged = Counters->GetCounter("LeaderChanged", true); - PartitionsLimitPerNode = Counters->GetCounter("PartitionsLimitPerNode"); + KnownRowDispatchers = Counters->GetCounter("KnownRowDispatchers"); } ::NMonitoring::TDynamicCounterPtr Counters; ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests; ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChanged; ::NMonitoring::TDynamicCounters::TCounterPtr IsActive; - ::NMonitoring::TDynamicCounters::TCounterPtr PartitionsLimitPerNode; + ::NMonitoring::TDynamicCounters::TCounterPtr KnownRowDispatchers; }; struct TEvPrivate { enum EEv : ui32 { EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvPrintState = EvBegin, + EvListNodes, + EvRebalancing, + EvStartingTimeout, EvEnd }; static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); struct TEvPrintState : public NActors::TEventLocal {}; + struct TEvListNodes : public NActors::TEventLocal {}; + struct TEvRebalancing : public NActors::TEventLocal {}; + struct TEvStartingTimeout : public NActors::TEventLocal {}; }; class TActorCoordinator : public TActorBootstrapped { static constexpr ui64 PrintStatePeriodSec = 300; static constexpr ui64 PrintStateToLogSplitSize = 64000; - static constexpr TDuration NodesManagerRetryPeriod = TDuration::Seconds(1); + static constexpr TDuration NodesManagerRetryPeriod = TDuration::Seconds(10); struct TTopicKey { TString Endpoint; @@ -103,11 +113,18 @@ class TActorCoordinator : public TActorBootstrapped { } }; - struct RowDispatcherInfo { - RowDispatcherInfo(bool connected, bool isLocal) + enum class ENodeState { + 
Initializing, // wait timeout after connected + Started + }; + + struct TRowDispatcherInfo { + TRowDispatcherInfo(bool connected, ENodeState state, bool isLocal) : Connected(connected) + , State(state) , IsLocal(isLocal) {} bool Connected = false; + ENodeState State; bool IsLocal = false; THashSet Locations; }; @@ -178,16 +195,20 @@ class TActorCoordinator : public TActorBootstrapped { NConfig::TRowDispatcherCoordinatorConfig Config; TYqSharedResources::TPtr YqSharedResources; TActorId LocalRowDispatcherId; - NActors::TActorId NodesManagerId; const TString LogPrefix; const TString Tenant; - TMap RowDispatchers; + TMap RowDispatchers; THashMap PartitionLocations; THashMap TopicsInfo; std::unordered_map PendingReadActors; + std::unordered_set KnownReadActors; TCoordinatorMetrics Metrics; THashSet InterconnectSessions; ui64 NodesCount = 0; + NActors::TActorId NodesManagerId; + bool RebalancingScheduled = false; + ENodeState State = ENodeState::Initializing; + TDuration RebalancingTimeout; public: TActorCoordinator( @@ -209,6 +230,10 @@ class TActorCoordinator : public TActorBootstrapped { void Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev); void Handle(TEvPrivate::TEvPrintState::TPtr&); + void Handle(TEvPrivate::TEvListNodes::TPtr&); + void Handle(TEvPrivate::TEvRebalancing::TPtr&); + void Handle(TEvPrivate::TEvStartingTimeout::TPtr&); + void Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult::TPtr&); void Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr&); STRICT_STFUNC( @@ -220,12 +245,16 @@ class TActorCoordinator : public TActorBootstrapped { hFunc(NFq::TEvRowDispatcher::TEvCoordinatorChanged, Handle); hFunc(NFq::TEvRowDispatcher::TEvCoordinatorRequest, Handle); hFunc(TEvPrivate::TEvPrintState, Handle); + hFunc(TEvPrivate::TEvListNodes, Handle); + hFunc(TEvPrivate::TEvRebalancing, Handle); + hFunc(TEvPrivate::TEvStartingTimeout, Handle); + hFunc(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult, Handle); hFunc(NFq::TEvNodesManager::TEvGetNodesResponse, Handle); }) private: - void AddRowDispatcher(NActors::TActorId actorId, bool isLocal); + void UpdateKnownRowDispatchers(NActors::TActorId actorId, bool isLocal); void PrintInternalState(); TTopicInfo& GetOrCreateTopicInfo(const TTopicKey& topic); std::optional GetAndUpdateLocation(const TPartitionKey& key, const TSet& filteredNodeIds); // std::nullopt if TopicPartitionsLimitPerNode reached @@ -233,8 +262,10 @@ class TActorCoordinator : public TActorBootstrapped { void UpdatePendingReadActors(); void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession); TString GetInternalState(); - bool IsReady() const; + bool IsReadyPartitionDistribution() const; void SendError(TActorId readActorId, const TCoordinatorRequest& request, const TString& message); + void ScheduleNodeInfoRequest() const; + void UpdateGlobalState(); }; TActorCoordinator::TActorCoordinator( @@ -247,26 +278,31 @@ TActorCoordinator::TActorCoordinator( : Config(config) , YqSharedResources(yqSharedResources) , LocalRowDispatcherId(localRowDispatcherId) - , NodesManagerId(nodesManagerId) , LogPrefix("Coordinator: ") , Tenant(tenant) , Metrics(counters) + , NodesManagerId(nodesManagerId) + , RebalancingTimeout(TDuration::Seconds(Config.GetRebalancingTimeoutSec() ? 
Config.GetRebalancingTimeoutSec() : DefaultRebalancingTimeoutSec)) { - AddRowDispatcher(localRowDispatcherId, true); + UpdateKnownRowDispatchers(localRowDispatcherId, true); } void TActorCoordinator::Bootstrap() { Become(&TActorCoordinator::StateFunc); Send(LocalRowDispatcherId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); - Send(NodesManagerId, new NFq::TEvNodesManager::TEvGetNodesRequest(), IEventHandle::FlagTrackDelivery); - + ScheduleNodeInfoRequest(); + Schedule(RebalancingTimeout, new TEvPrivate::TEvStartingTimeout()); // Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); // Logs (InternalState) is too big - LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId()); + LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId() + << ", NodesManagerId " << NodesManagerId + << ", rebalancing timeout " << RebalancingTimeout); auto nodeGroup = Metrics.Counters->GetSubgroup("node", ToString(SelfId().NodeId())); Metrics.IsActive = nodeGroup->GetCounter("IsActive"); } -void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal) { +void TActorCoordinator::UpdateKnownRowDispatchers(NActors::TActorId actorId, bool isLocal) { + LOG_ROW_DISPATCHER_TRACE("UpdateKnownRowDispatchers " << actorId.ToString()); + auto it = RowDispatchers.find(actorId); if (it != RowDispatchers.end()) { it->second.Connected = true; @@ -290,9 +326,23 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal UpdatePendingReadActors(); return; } + auto nodeState = State == ENodeState::Initializing ? ENodeState::Started : ENodeState::Initializing; + if (PartitionLocations.empty()) { + nodeState = ENodeState::Started; + } + + LOG_ROW_DISPATCHER_TRACE("Add new row dispatcher to map (state " << static_cast(nodeState) << ")"); + RowDispatchers.emplace(actorId, TRowDispatcherInfo{true, nodeState, isLocal}); + UpdateGlobalState(); + + if (nodeState == ENodeState::Initializing && !RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + } - RowDispatchers.emplace(actorId, RowDispatcherInfo{true, isLocal}); UpdatePendingReadActors(); + Metrics.KnownRowDispatchers->Set(RowDispatchers.size()); } void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) { @@ -310,7 +360,7 @@ void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& inte void TActorCoordinator::Handle(NActors::TEvents::TEvPing::TPtr& ev) { LOG_ROW_DISPATCHER_TRACE("TEvPing received, " << ev->Sender); UpdateInterconnectSessions(ev->InterconnectSession); - AddRowDispatcher(ev->Sender, false); + UpdateKnownRowDispatchers(ev->Sender, false); LOG_ROW_DISPATCHER_TRACE("Send TEvPong to " << ev->Sender); Send(ev->Sender, new NActors::TEvents::TEvPong(), IEventHandle::FlagTrackDelivery); } @@ -320,7 +370,7 @@ TString TActorCoordinator::GetInternalState() { str << "Known row dispatchers:\n"; for (const auto& [actorId, info] : RowDispatchers) { - str << " " << actorId << ", connected " << info.Connected << "\n"; + str << " " << actorId << ", state " << static_cast(info.State) << "\n"; } str << "\nLocations:\n"; @@ -358,6 +408,12 @@ void TActorCoordinator::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected: } Y_ENSURE(!info.IsLocal, "EvNodeDisconnected from local row dispatcher"); info.Connected = false; + + if (!RebalancingScheduled) { + 
LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + } } } @@ -375,6 +431,12 @@ void TActorCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { continue; } info.Connected = false; + + if (!RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + } return; } } @@ -400,7 +462,7 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition auto& topicInfo = GetOrCreateTopicInfo(key.Topic); - if (!IsReady()) { + if (!IsReadyPartitionDistribution()) { topicInfo.AddPendingPartition(key); return std::nullopt; } @@ -408,7 +470,7 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition TActorId bestLocation; ui64 bestNumberPartitions = std::numeric_limits::max(); for (auto& [location, info] : RowDispatchers) { - if (!info.Connected) { + if (info.State != ENodeState::Started) { continue; } if (!filteredNodeIds.empty() && !filteredNodeIds.contains(location.NodeId())) { @@ -442,6 +504,8 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) { const auto& source = ev->Get()->Record.GetSource(); + KnownReadActors.insert(ev->Sender); + UpdateInterconnectSessions(ev->InterconnectSession); TStringStream str; @@ -506,7 +570,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC } void TActorCoordinator::UpdatePendingReadActors() { - if (!IsReady()) { + if (!IsReadyPartitionDistribution()) { return; } for (auto readActorIt = PendingReadActors.begin(); readActorIt != PendingReadActors.end();) { @@ -523,20 +587,96 @@ void TActorCoordinator::Handle(TEvPrivate::TEvPrintState::TPtr&) { PrintInternalState(); } -void TActorCoordinator::Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr& ev) { - NodesCount = ev->Get()->NodeIds.size(); - LOG_ROW_DISPATCHER_DEBUG("TEvGetNodesResponse, nodes count " << NodesCount); - if (!NodesCount) { - NActors::TActivationContext::Schedule(NodesManagerRetryPeriod, new IEventHandle(NodesManagerId, SelfId(), new NFq::TEvNodesManager::TEvGetNodesRequest(), IEventHandle::FlagTrackDelivery)); +void TActorCoordinator::Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult::TPtr& ev) { + if (!ev->Get()->Success) { + LOG_ROW_DISPATCHER_ERROR("Failed to get TEvLookupResult, try later..."); + ScheduleNodeInfoRequest(); + return; } + LOG_ROW_DISPATCHER_INFO("Updated node info, node count: " << ev->Get()->AssignedNodes.size() << ", AssignedNodes: " << JoinSeq(", ", ev->Get()->AssignedNodes)); + NodesCount = ev->Get()->AssignedNodes.size(); + UpdateGlobalState(); UpdatePendingReadActors(); } -bool TActorCoordinator::IsReady() const { - if (!NodesCount) { - return false; +void TActorCoordinator::Handle(TEvPrivate::TEvListNodes::TPtr&) { + if (NodesManagerId) { + LOG_ROW_DISPATCHER_DEBUG("Send TEvGetNodesRequest to NodesManager"); + Send(NodesManagerId, new NFq::TEvNodesManager::TEvGetNodesRequest(), IEventHandle::FlagTrackDelivery); + } else { + LOG_ROW_DISPATCHER_DEBUG("Send NodeEnumerationLookup request"); + Register(NKikimr::CreateTenantNodeEnumerationLookup(SelfId(), Tenant)); + } +} + +void TActorCoordinator::Handle(TEvPrivate::TEvRebalancing::TPtr&) { + LOG_ROW_DISPATCHER_DEBUG("Rebalancing..."); + RebalancingScheduled = false; + + bool needRebalance = false; + TSet toDelete; + + auto 
printState = [&](const TString& str){ + LOG_ROW_DISPATCHER_DEBUG(str); + for (auto& [actorId, info] : RowDispatchers) { + LOG_ROW_DISPATCHER_DEBUG(" node " << actorId.NodeId() << " (" << actorId << ") state " << (info.State == ENodeState::Initializing ? "Initializing" : "Started") << " connected " << info.Connected << " partitions count " << info.Locations.size()); + } + }; + + printState("Current state (rebalancing):"); + + for (auto& [actorId, info] : RowDispatchers) { + if (info.State == ENodeState::Initializing) { + if (info.Connected) { + info.State = ENodeState::Started; + needRebalance = true; + } else { + toDelete.insert(actorId); + } + } else { // Started + if (!info.Connected) { + toDelete.insert(actorId); + if (!info.Locations.empty()) { + needRebalance = true; + } + } + } + } + for (const auto& actorId : toDelete) { + RowDispatchers.erase(actorId); + } + if (!needRebalance) { + return; } - return RowDispatchers.size() >= NodesCount - 1; + + for (const auto& readActorId : KnownReadActors) { + LOG_ROW_DISPATCHER_TRACE("Send TEvCoordinatorDistributionReset to " << readActorId); + Send(readActorId, new TEvRowDispatcher::TEvCoordinatorDistributionReset(), IEventHandle::FlagTrackDelivery); + } + + for (auto& [actorId, info] : RowDispatchers) { + info.Locations.clear(); + } + PendingReadActors.clear(); + PartitionLocations.clear(); + TopicsInfo.clear(); + KnownReadActors.clear(); + + printState("Current state (after rebalancing):"); +} + +void TActorCoordinator::Handle(TEvPrivate::TEvStartingTimeout::TPtr&) { + if (State != ENodeState::Started) { + LOG_ROW_DISPATCHER_TRACE("Change global state to Started (by timeout)"); + State = ENodeState::Started; + } +} + +bool TActorCoordinator::IsReadyPartitionDistribution() const { + if (Config.GetLocalMode()) { + return true; + } + return State == ENodeState::Started; } void TActorCoordinator::SendError(TActorId readActorId, const TCoordinatorRequest& request, const TString& message) { @@ -546,7 +686,28 @@ void TActorCoordinator::SendError(TActorId readActorId, const TCoordinatorReques Send(readActorId, response.release(), IEventHandle::FlagTrackDelivery, request.Cookie); } -} // namespace +void TActorCoordinator::ScheduleNodeInfoRequest() const { + Schedule(NodesManagerRetryPeriod, new TEvPrivate::TEvListNodes()); +} + +void TActorCoordinator::Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr& ev) { + NodesCount = ev->Get()->NodeIds.size(); + LOG_ROW_DISPATCHER_INFO("Updated node info, node count: " << NodesCount); + UpdateGlobalState(); + if (!NodesCount) { + ScheduleNodeInfoRequest(); + } + UpdatePendingReadActors(); +} + +void TActorCoordinator::UpdateGlobalState() { + if (State != ENodeState::Started && NodesCount && RowDispatchers.size() >= NodesCount) { + LOG_ROW_DISPATCHER_TRACE("Change global state to Started (by nodes count)"); + State = ENodeState::Started; + } +} + +} // anonymous namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h index 17474e0d2498..39bd6cf33db0 100644 --- a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h +++ b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h @@ -59,6 +59,7 @@ struct TEvRowDispatcher { EvPurecalcCompileRequest, EvPurecalcCompileResponse, EvPurecalcCompileAbort, + EvCoordinatorDistributionReset, EvEnd, }; @@ -91,6 +92,11 @@ struct TEvRowDispatcher { TEvCoordinatorResult() = default; }; + struct TEvCoordinatorDistributionReset : 
public NActors::TEventPB { + TEvCoordinatorDistributionReset() = default; + }; + // Session events (with seqNo checks) struct TEvStartSession : public NActors::TEventPB(readActorId, TDuration::Seconds(5)); + UNIT_ASSERT(eventPtr.Get() != nullptr); + } + void ProcessNodesManagerRequest(ui64 nodesCount) { auto eventHolder = Runtime.GrabEdgeEvent(NodesManager, TDuration::Seconds(5)); UNIT_ASSERT(eventHolder.Get() != nullptr); @@ -200,7 +206,7 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { Y_UNIT_TEST_F(WaitNodesConnected, TFixture) { ExpectCoordinatorChangesSubscribe(); - ProcessNodesManagerRequest(4); + ProcessNodesManagerRequest(3); Ping(RowDispatcher1Id); MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); @@ -229,6 +235,52 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { actorId = ActorIdFromProto(result2.GetPartitions(0).GetActorId()); UNIT_ASSERT_VALUES_EQUAL(actorId.NodeId(), RowDispatcher2Id.NodeId()); } + + Y_UNIT_TEST_F(RebalanceAfterNewNodeConnected, TFixture) { + ExpectCoordinatorChangesSubscribe(); + ProcessNodesManagerRequest(1); + TSet rowDispatcherIds{LocalRowDispatcherId}; + for (auto id : rowDispatcherIds) { + Ping(id); + } + MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); + auto rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId()); + MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1}); + auto rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId()); + UNIT_ASSERT_VALUES_EQUAL(rdActor1, rdActor2); + + Ping(RowDispatcher1Id); + ExpectDistributionReset(ReadActor1); + ExpectDistributionReset(ReadActor2); + + MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); + rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId()); + MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1}); + rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId()); + UNIT_ASSERT(rdActor1 != rdActor2); + } + + Y_UNIT_TEST_F(RebalanceAfterNodeDisconnected, TFixture) { + ExpectCoordinatorChangesSubscribe(); + ProcessNodesManagerRequest(3); + TSet rowDispatcherIds{RowDispatcher1Id, RowDispatcher2Id, LocalRowDispatcherId}; + for (auto id : rowDispatcherIds) { + Ping(id); + } + + MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2}); + auto result1 = ExpectResult(ReadActor1); + UNIT_ASSERT(result1.PartitionsSize() == 3); + + auto event = new NActors::TEvInterconnect::TEvNodeDisconnected(RowDispatcher2Id.NodeId()); + Runtime.Send(new NActors::IEventHandle(Coordinator, RowDispatcher2Id, event)); + + ExpectDistributionReset(ReadActor1); + + MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2}); + result1 = ExpectResult(ReadActor1); + UNIT_ASSERT(result1.PartitionsSize() == 2); + } } } diff --git a/ydb/core/fq/libs/row_dispatcher/ya.make b/ydb/core/fq/libs/row_dispatcher/ya.make index 1fa6af603693..ff1bbda62ee9 100644 --- a/ydb/core/fq/libs/row_dispatcher/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ya.make @@ -20,6 +20,7 @@ PEERDIR( ydb/core/fq/libs/shared_resources ydb/core/fq/libs/ydb ydb/core/mon + ydb/core/mind ydb/library/actors/core ydb/library/security diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 58bf7d046afe..f174f1e4979f 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -351,6 +351,7 @@ class TDqPqRdReadActor : 
public NActors::TActor, public NYql:: void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev); + void Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev); void HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev); @@ -376,6 +377,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle); hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle); hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, Handle); + hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle); hFunc(NActors::TEvents::TEvPong, Handle); hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected); @@ -403,6 +405,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: hFunc(NFq::TEvRowDispatcher::TEvSessionError, ReplyNoSession); hFunc(NFq::TEvRowDispatcher::TEvStatistics, ReplyNoSession); hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, ReplyNoSession); + hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle); hFunc(NActors::TEvents::TEvPong, Handle); hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected); @@ -909,6 +912,16 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest: Send(ev->Sender, response.release(), 0, ev->Cookie); } +void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev) { + if (CoordinatorActorId != ev->Sender) { + SRC_LOG_I("Ignore TEvCoordinatorDistributionReset, sender is not active coordinator (sender " << ev->Sender << ", current coordinator " << CoordinatorActorId << ")"); + return; + } + SRC_LOG_I("Received TEvCoordinatorDistributionReset from " << ev->Sender); + ReInit("Distribution changed"); + ScheduleProcessState(); +} + void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) { auto partitionId = ev->Get()->Record.GetPartitionId(); const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp index 4179c74a5db4..8f5c342fa461 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp @@ -351,6 +351,13 @@ class TFixture : public TPqIoTestFixture { AssertDataWithWatermarks(expected, actual); } + void MockCoordinatorDistributionReset(NActors::TActorId coordinatorId) const { + CaSetup->Execute([&](TFakeActor& actor) { + auto event = new NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset(); + CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, coordinatorId, event, 0)); + }); + } + public: NYql::NPq::NProto::TDqPqTopicSource Settings = BuildPqTopicSourceSettings( "topic", @@ -889,6 +896,16 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { f.ReadMessages(expected); } } + + Y_UNIT_TEST_F(RebalanceAfterDistributionReset, TFixture) { + StartSession(Settings); + MockCoordinatorDistributionReset(CoordinatorId1); + + auto req = ExpectCoordinatorRequest(CoordinatorId1); + MockCoordinatorResult(CoordinatorId1, {{RowDispatcherId2, PartitionId1}}, req->Cookie); + ExpectStartSession({}, RowDispatcherId2, 2); + MockAck(RowDispatcherId2, 2, PartitionId1); + } } } // namespace NYql::NDq diff --git 
a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index c65a5dd47aea..5adab6ef36d2 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -61,11 +61,10 @@ def wait_actor_count(kikimr, activity, expected_count): count = 0 for node_index in kikimr.compute_plane.kikimr_cluster.nodes: count = count + kikimr.compute_plane.get_actor_count(node_index, activity) - if count == expected_count: - break + if count == expected_count: + return node_index # return any node assert time.time() < deadline, f"Waiting actor {activity} count failed, current count {count}" time.sleep(1) - pass def wait_row_dispatcher_sensor_value(kikimr, sensor, expected_count, exact_match=True): @@ -617,7 +616,7 @@ def test_start_new_query(self, kikimr, client): @yq_v1 def test_stop_start(self, kikimr, client): - self.init(client, "test_stop_start") + self.init(client, "test_stop_start", 10) sql1 = Rf''' INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` @@ -625,12 +624,12 @@ def test_stop_start(self, kikimr, client): WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL));''' query_id = start_yds_query(kikimr, client, sql1) - wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 10) data = ['{"time": 101}', '{"time": 102}'] self.write_stream(data) expected = ['101', '102'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) kikimr.compute_plane.wait_completed_checkpoints( query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2 @@ -652,7 +651,7 @@ def test_stop_start(self, kikimr, client): self.write_stream(data) expected = ['103', '104'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) stop_yds_query(client, query_id) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) @@ -667,7 +666,7 @@ def test_stop_start2(self, kikimr, client): wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) self.write_stream(['{"time": 101}', '{"time": 102}']) expected = ['101', '102'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) kikimr.compute_plane.wait_completed_checkpoints(query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 2) stop_yds_query(client, query_id1) @@ -1233,3 +1232,34 @@ def test_json_errors(self, kikimr, client, use_binding): assert time.time() < deadline, f"Waiting sensor ParsingErrors value failed, current count {count}" time.sleep(1) stop_yds_query(client, query_id) + + @yq_v1 + def test_redistribute_partition_after_timeout(self, kikimr, client): + partitions_count = 3 + self.init(client, "redistribute", partitions=partitions_count) + wait_row_dispatcher_sensor_value(kikimr, "KnownRowDispatchers", 2 * COMPUTE_NODE_COUNT - 1) + + sql = Rf''' + PRAGMA dq.Scheduler=@@{{"type": "single_node"}}@@; + INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` + SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}` + WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL, data String NOT NULL));''' + + query_id = start_yds_query(kikimr, client, sql) + session_node_index = wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", partitions_count) + 
kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2)
+
+        message_count = 10
+        expected = "hello"
+        for i in range(message_count):
+            self.write_stream(['{"time": 100, "data": "hello"}'], topic_path=None, partition_key=str(i))
+        assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count
+        kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2)
+
+        logging.debug(f"Stopping node: {session_node_index}")
+        kikimr.compute_plane.kikimr_cluster.nodes[session_node_index].stop()
+
+        expected = "Relativitätstheorie"
+        for i in range(message_count):
+            self.write_stream(['{"time": 101, "data": "Relativitätstheorie"}'], topic_path=None, partition_key=str(i))
+        assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count
diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py
index 920ec10caa54..98bc27c07252 100644
--- a/ydb/tests/tools/fq_runner/kikimr_runner.py
+++ b/ydb/tests/tools/fq_runner/kikimr_runner.py
@@ -363,15 +363,15 @@ def get_completed_checkpoints(self, query_id, expect_counters_exist=False):
         return self.get_checkpoint_coordinator_metric(query_id, "CompletedCheckpoints", expect_counters_exist=expect_counters_exist)
 
-    def wait_completed_checkpoints(self, query_id, checkpoints_count,
+    def wait_completed_checkpoints(self, query_id, expected,
                                    timeout=plain_or_under_sanitizer_wrapper(30, 150),
                                    expect_counters_exist=False):
         deadline = time.time() + timeout
         while True:
             completed = self.get_completed_checkpoints(query_id, expect_counters_exist=expect_counters_exist)
-            if completed >= checkpoints_count:
+            if completed >= expected:
                 break
-            assert time.time() < deadline, "Wait zero checkpoint failed, actual completed: " + str(completed)
+            assert time.time() < deadline, "Wait checkpoint failed, currently completed: " + str(completed) + ", expected " + str(expected)
             time.sleep(plain_or_under_sanitizer_wrapper(0.5, 2))
 
     def wait_zero_checkpoint(self, query_id, timeout=plain_or_under_sanitizer_wrapper(30, 150),
@@ -537,6 +537,7 @@ def fill_config(self, control_plane):
                                                  'max_session_used_memory': 1000000,
                                                  'without_consumer': True}
         fq_config['row_dispatcher']['coordinator'] = {'coordination_node_path': "row_dispatcher"}
+        fq_config['row_dispatcher']['coordinator']['rebalancing_timeout_sec'] = "5"
         fq_config['row_dispatcher']['coordinator']['database'] = {}
         self.fill_storage_config(fq_config['row_dispatcher']['coordinator']['database'], "RowDispatcher_" + self.uuid)
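
Taken together, the changes above work as follows: when a row dispatcher node connects or disconnects, the coordinator schedules a TEvRebalancing event after RebalancingTimeoutSec; when it fires, stale dispatchers are dropped, all partition locations are cleared, and TEvCoordinatorDistributionReset is sent to every known read actor, which then re-requests its partition distribution via TEvCoordinatorRequest. The sketch below (an illustration, not part of the patch) shows how the new knob is surfaced through the fq_runner test config, mirroring fill_config above; the key name rebalancing_timeout_sec maps to TRowDispatcherCoordinatorConfig.RebalancingTimeoutSec, and leaving the field unset falls back to DefaultRebalancingTimeoutSec (120 seconds) in coordinator.cpp.

# Illustration only: enabling coordinator rebalancing in the fq_runner config dict.
fq_config['row_dispatcher']['coordinator'] = {'coordination_node_path': "row_dispatcher"}
# Short timeout so tests observe redistribution quickly; when 0 or unset the
# coordinator uses DefaultRebalancingTimeoutSec (120 s).
fq_config['row_dispatcher']['coordinator']['rebalancing_timeout_sec'] = "5"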