diff --git a/ydb/core/fq/libs/actors/nodes_manager.cpp b/ydb/core/fq/libs/actors/nodes_manager.cpp index 3bd981829c3d..9bfdba23dbfe 100644 --- a/ydb/core/fq/libs/actors/nodes_manager.cpp +++ b/ydb/core/fq/libs/actors/nodes_manager.cpp @@ -297,7 +297,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped(); response->NodeIds.reserve(Peers.size()); for (const auto& info : Peers) { diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index e05dd9b82edc..68c0e3619cac 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -16,7 +16,8 @@ message TRowDispatcherCoordinatorConfig { // Topic partitions will be distributed uniformly up to TopicPartitionsLimitPerNode // if (number nodes) * TopicPartitionsLimitPerNode < (number topic partitions) // Request will hang up infinitely, disabled by default - uint64 TopicPartitionsLimitPerNode = 4; + uint64 TopicPartitionsLimitPerNode = 4; // deprecated + uint64 RebalancingTimeoutSec = 5; // Timeout before partitions are automatically rebalanced after nodes connect or disconnect. } message TJsonParserConfig { diff --git a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp index 4bfa8dff23a3..dd2fcdde46fb 100644 --- a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp +++ b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp @@ -28,6 +28,7 @@ TRowDispatcherSettings::TCoordinatorSettings::TCoordinatorSettings(const NConfig : LocalMode(config.GetLocalMode()) , Database(config.GetDatabase()) , CoordinationNodePath(config.GetCoordinationNodePath()) + , RebalancingTimeout(TDuration::Seconds(config.GetRebalancingTimeoutSec())) {} TRowDispatcherSettings::TCoordinatorSettings::TCoordinatorSettings(const NKikimrConfig::TStreamingQueriesConfig::TExternalStorageConfig& config) diff --git a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h index d8eadaec9843..9c733c44588f 100644 --- a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h +++ b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h @@ -54,6 +54,7 @@ class TRowDispatcherSettings { YDB_ACCESSOR(bool, LocalMode, false); YDB_ACCESSOR_MUTABLE(TExternalStorageSettings, Database, {}); YDB_ACCESSOR_DEF(TString, CoordinationNodePath); + YDB_ACCESSOR(TDuration, RebalancingTimeout, TDuration::Seconds(120)); }; enum class EConsumerMode { diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index 7d03ecc108bf..0be5c465b6ac 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -26,6 +26,8 @@ using NYql::TIssues; namespace { +const ui64 DefaultRebalancingTimeoutSec = 120; + //////////////////////////////////////////////////////////////////////////////// struct TCoordinatorMetrics { @@ -33,14 +35,14 @@ struct TCoordinatorMetrics { : Counters(counters) { IncomingRequests = Counters->GetCounter("IncomingRequests", true); LeaderChanged = Counters->GetCounter("LeaderChanged", true); - PartitionsLimitPerNode = Counters->GetCounter("PartitionsLimitPerNode"); + KnownRowDispatchers = Counters->GetCounter("KnownRowDispatchers"); } ::NMonitoring::TDynamicCounterPtr Counters; ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests;
::NMonitoring::TDynamicCounters::TCounterPtr LeaderChanged; ::NMonitoring::TDynamicCounters::TCounterPtr IsActive; - ::NMonitoring::TDynamicCounters::TCounterPtr PartitionsLimitPerNode; + ::NMonitoring::TDynamicCounters::TCounterPtr KnownRowDispatchers; }; struct TEvPrivate { @@ -48,18 +50,22 @@ struct TEvPrivate { EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvPrintState = EvBegin, EvListNodes, + EvRebalancing, + EvStartingTimeout, EvEnd }; static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); struct TEvPrintState : public NActors::TEventLocal {}; struct TEvListNodes : public NActors::TEventLocal {}; + struct TEvRebalancing : public NActors::TEventLocal {}; + struct TEvStartingTimeout : public NActors::TEventLocal {}; }; class TActorCoordinator : public TActorBootstrapped { static constexpr ui64 PrintStatePeriodSec = 300; static constexpr ui64 PrintStateToLogSplitSize = 64000; - static constexpr TDuration NodesManagerRetryPeriod = TDuration::Seconds(1); + static constexpr TDuration NodesManagerRetryPeriod = TDuration::Seconds(10); struct TTopicKey { TString Endpoint; @@ -106,11 +112,18 @@ class TActorCoordinator : public TActorBootstrapped { } }; - struct RowDispatcherInfo { - RowDispatcherInfo(bool connected, bool isLocal) + enum class ENodeState { + Initializing, // wait timeout after connected + Started + }; + + struct TRowDispatcherInfo { + TRowDispatcherInfo(bool connected, ENodeState state, bool isLocal) : Connected(connected) + , State(state) , IsLocal(isLocal) {} bool Connected = false; + ENodeState State; bool IsLocal = false; THashSet Locations; }; @@ -182,14 +195,18 @@ class TActorCoordinator : public TActorBootstrapped { TActorId LocalRowDispatcherId; const TString LogPrefix; const TString Tenant; - TMap RowDispatchers; + TMap RowDispatchers; THashMap PartitionLocations; THashMap TopicsInfo; std::unordered_map PendingReadActors; + std::unordered_set KnownReadActors; TCoordinatorMetrics Metrics; THashSet InterconnectSessions; ui64 NodesCount = 0; NActors::TActorId NodesManagerId; + bool RebalancingScheduled = false; + ENodeState State = ENodeState::Initializing; + TDuration RebalancingTimeout; public: TActorCoordinator( @@ -211,6 +228,8 @@ class TActorCoordinator : public TActorBootstrapped { void Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev); void Handle(TEvPrivate::TEvPrintState::TPtr&); void Handle(TEvPrivate::TEvListNodes::TPtr&); + void Handle(TEvPrivate::TEvRebalancing::TPtr&); + void Handle(TEvPrivate::TEvStartingTimeout::TPtr&); void Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult::TPtr&); void Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr&); @@ -224,13 +243,15 @@ class TActorCoordinator : public TActorBootstrapped { hFunc(NFq::TEvRowDispatcher::TEvCoordinatorRequest, Handle); hFunc(TEvPrivate::TEvPrintState, Handle); hFunc(TEvPrivate::TEvListNodes, Handle); + hFunc(TEvPrivate::TEvRebalancing, Handle); + hFunc(TEvPrivate::TEvStartingTimeout, Handle); hFunc(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult, Handle); hFunc(NFq::TEvNodesManager::TEvGetNodesResponse, Handle); }) private: - void AddRowDispatcher(NActors::TActorId actorId, bool isLocal); + void UpdateKnownRowDispatchers(NActors::TActorId actorId, bool isLocal); void PrintInternalState(); TTopicInfo& GetOrCreateTopicInfo(const TTopicKey& topic); std::optional GetAndUpdateLocation(const TPartitionKey& key, const TSet& filteredNodeIds); // std::nullopt if TopicPartitionsLimitPerNode reached 
@@ -238,9 +259,10 @@ class TActorCoordinator : public TActorBootstrapped { void UpdatePendingReadActors(); void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession); TString GetInternalState(); - bool IsReady() const; + bool IsReadyPartitionDistribution() const; void SendError(TActorId readActorId, const TCoordinatorRequest& request, const TString& message); void ScheduleNodeInfoRequest() const; + void UpdateGlobalState(); }; TActorCoordinator::TActorCoordinator( @@ -255,21 +277,27 @@ TActorCoordinator::TActorCoordinator( , Tenant(tenant) , Metrics(counters) , NodesManagerId(nodesManagerId) + , RebalancingTimeout(Config.GetRebalancingTimeout() ? Config.GetRebalancingTimeout() : TDuration::Seconds(DefaultRebalancingTimeoutSec)) { - AddRowDispatcher(localRowDispatcherId, true); + UpdateKnownRowDispatchers(localRowDispatcherId, true); } void TActorCoordinator::Bootstrap() { Become(&TActorCoordinator::StateFunc); Send(LocalRowDispatcherId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); ScheduleNodeInfoRequest(); + Schedule(RebalancingTimeout, new TEvPrivate::TEvStartingTimeout()); // Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); // Logs (InternalState) is too big - LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId() << ", NodesManagerId " << NodesManagerId); + LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId() + << ", NodesManagerId " << NodesManagerId + << ", rebalancing timeout " << RebalancingTimeout); auto nodeGroup = Metrics.Counters->GetSubgroup("node", ToString(SelfId().NodeId())); Metrics.IsActive = nodeGroup->GetCounter("IsActive"); } -void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal) { +void TActorCoordinator::UpdateKnownRowDispatchers(NActors::TActorId actorId, bool isLocal) { + LOG_ROW_DISPATCHER_TRACE("UpdateKnownRowDispatchers " << actorId.ToString()); + auto it = RowDispatchers.find(actorId); if (it != RowDispatchers.end()) { it->second.Connected = true; @@ -293,9 +321,23 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal UpdatePendingReadActors(); return; } + auto nodeState = State == ENodeState::Initializing ? 
ENodeState::Started : ENodeState::Initializing; + if (PartitionLocations.empty()) { + nodeState = ENodeState::Started; + } + + LOG_ROW_DISPATCHER_TRACE("Add new row dispatcher to map (state " << static_cast(nodeState) << ")"); + RowDispatchers.emplace(actorId, TRowDispatcherInfo{true, nodeState, isLocal}); + UpdateGlobalState(); + + if (nodeState == ENodeState::Initializing && !RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + } - RowDispatchers.emplace(actorId, RowDispatcherInfo{true, isLocal}); UpdatePendingReadActors(); + Metrics.KnownRowDispatchers->Set(RowDispatchers.size()); } void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) { @@ -313,7 +355,7 @@ void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& inte void TActorCoordinator::Handle(NActors::TEvents::TEvPing::TPtr& ev) { LOG_ROW_DISPATCHER_TRACE("TEvPing received, " << ev->Sender); UpdateInterconnectSessions(ev->InterconnectSession); - AddRowDispatcher(ev->Sender, false); + UpdateKnownRowDispatchers(ev->Sender, false); LOG_ROW_DISPATCHER_TRACE("Send TEvPong to " << ev->Sender); Send(ev->Sender, new NActors::TEvents::TEvPong(), IEventHandle::FlagTrackDelivery); } @@ -323,7 +365,7 @@ TString TActorCoordinator::GetInternalState() { str << "Known row dispatchers:\n"; for (const auto& [actorId, info] : RowDispatchers) { - str << " " << actorId << ", connected " << info.Connected << "\n"; + str << " " << actorId << ", state " << static_cast(info.State) << "\n"; } str << "\nLocations:\n"; @@ -361,6 +403,12 @@ void TActorCoordinator::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected: } Y_ENSURE(!info.IsLocal, "EvNodeDisconnected from local row dispatcher"); info.Connected = false; + + if (!RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + } } } @@ -378,6 +426,12 @@ void TActorCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { continue; } info.Connected = false; + + if (!RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + } return; } } @@ -403,7 +457,7 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition auto& topicInfo = GetOrCreateTopicInfo(key.Topic); - if (!IsReady()) { + if (!IsReadyPartitionDistribution()) { topicInfo.AddPendingPartition(key); return std::nullopt; } @@ -411,7 +465,7 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition TActorId bestLocation; ui64 bestNumberPartitions = std::numeric_limits::max(); for (auto& [location, info] : RowDispatchers) { - if (!info.Connected) { + if (info.State != ENodeState::Started) { continue; } if (!filteredNodeIds.empty() && !filteredNodeIds.contains(location.NodeId())) { @@ -445,6 +499,8 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) { const auto& source = ev->Get()->Record.GetSource(); + KnownReadActors.insert(ev->Sender); + UpdateInterconnectSessions(ev->InterconnectSession); TStringStream str; @@ -509,7 +565,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC } void TActorCoordinator::UpdatePendingReadActors() { - if (!IsReady()) { + if 
(!IsReadyPartitionDistribution()) { return; } for (auto readActorIt = PendingReadActors.begin(); readActorIt != PendingReadActors.end();) { @@ -534,6 +590,7 @@ void TActorCoordinator::Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult } LOG_ROW_DISPATCHER_INFO("Updated node info, node count: " << ev->Get()->AssignedNodes.size() << ", AssignedNodes: " << JoinSeq(", ", ev->Get()->AssignedNodes)); NodesCount = ev->Get()->AssignedNodes.size(); + UpdateGlobalState(); UpdatePendingReadActors(); } @@ -547,14 +604,74 @@ void TActorCoordinator::Handle(TEvPrivate::TEvListNodes::TPtr&) { } } -bool TActorCoordinator::IsReady() const { +void TActorCoordinator::Handle(TEvPrivate::TEvRebalancing::TPtr&) { + LOG_ROW_DISPATCHER_DEBUG("Rebalancing..."); + RebalancingScheduled = false; + + bool needRebalance = false; + TSet toDelete; + + auto printState = [&](const TString& str){ + LOG_ROW_DISPATCHER_DEBUG(str); + for (auto& [actorId, info] : RowDispatchers) { + LOG_ROW_DISPATCHER_DEBUG(" node " << actorId.NodeId() << " (" << actorId << ") state " << (info.State == ENodeState::Initializing ? "Initializing" : "Started") << " connected " << info.Connected << " partitions count " << info.Locations.size()); + } + }; + + printState("Current state (rebalancing):"); + + for (auto& [actorId, info] : RowDispatchers) { + if (info.State == ENodeState::Initializing) { + if (info.Connected) { + info.State = ENodeState::Started; + needRebalance = true; + } else { + toDelete.insert(actorId); + } + } else { // Started + if (!info.Connected) { + toDelete.insert(actorId); + if (!info.Locations.empty()) { + needRebalance = true; + } + } + } + } + for (const auto& actorId : toDelete) { + RowDispatchers.erase(actorId); + } + if (!needRebalance) { + return; + } + + for (const auto& readActorId : KnownReadActors) { + LOG_ROW_DISPATCHER_TRACE("Send TEvCoordinatorDistributionReset to " << readActorId); + Send(readActorId, new TEvRowDispatcher::TEvCoordinatorDistributionReset(), IEventHandle::FlagTrackDelivery); + } + + for (auto& [actorId, info] : RowDispatchers) { + info.Locations.clear(); + } + PendingReadActors.clear(); + PartitionLocations.clear(); + TopicsInfo.clear(); + KnownReadActors.clear(); + + printState("Current state (after rebalancing):"); +} + +void TActorCoordinator::Handle(TEvPrivate::TEvStartingTimeout::TPtr&) { + if (State != ENodeState::Started) { + LOG_ROW_DISPATCHER_TRACE("Change global state to Started (by timeout)"); + State = ENodeState::Started; + } +} + +bool TActorCoordinator::IsReadyPartitionDistribution() const { if (Config.GetLocalMode()) { return true; } - if (!NodesCount) { - return false; - } - return RowDispatchers.size() >= NodesCount - 1; + return State == ENodeState::Started; } void TActorCoordinator::SendError(TActorId readActorId, const TCoordinatorRequest& request, const TString& message) { @@ -571,12 +688,19 @@ void TActorCoordinator::ScheduleNodeInfoRequest() const { void TActorCoordinator::Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr& ev) { NodesCount = ev->Get()->NodeIds.size(); LOG_ROW_DISPATCHER_INFO("Updated node info, node count: " << NodesCount); + UpdateGlobalState(); if (!NodesCount) { ScheduleNodeInfoRequest(); } UpdatePendingReadActors(); } +void TActorCoordinator::UpdateGlobalState() { + if (State != ENodeState::Started && NodesCount && RowDispatchers.size() >= NodesCount) { + LOG_ROW_DISPATCHER_TRACE("Change global state to Started (by nodes count)"); + State = ENodeState::Started; + } +} } // anonymous namespace 
//////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h index 17474e0d2498..39bd6cf33db0 100644 --- a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h +++ b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h @@ -59,6 +59,7 @@ struct TEvRowDispatcher { EvPurecalcCompileRequest, EvPurecalcCompileResponse, EvPurecalcCompileAbort, + EvCoordinatorDistributionReset, EvEnd, }; @@ -91,6 +92,11 @@ struct TEvRowDispatcher { TEvCoordinatorResult() = default; }; + struct TEvCoordinatorDistributionReset : public NActors::TEventPB { + TEvCoordinatorDistributionReset() = default; + }; + // Session events (with seqNo checks) struct TEvStartSession : public NActors::TEventPB(readActorId, TDuration::Seconds(5)); + UNIT_ASSERT(eventPtr.Get() != nullptr); + } + void ProcessNodesManagerRequest(ui64 nodesCount) { TVector nodes; nodes.reserve(nodesCount); @@ -195,7 +201,7 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { Y_UNIT_TEST_F(WaitNodesConnected, TFixture) { ExpectCoordinatorChangesSubscribe(); - ProcessNodesManagerRequest(4); + ProcessNodesManagerRequest(3); Ping(RowDispatcher1Id); MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); @@ -224,6 +230,52 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { actorId = ActorIdFromProto(result2.GetPartitions(0).GetActorId()); UNIT_ASSERT_VALUES_EQUAL(actorId.NodeId(), RowDispatcher2Id.NodeId()); } + + Y_UNIT_TEST_F(RebalanceAfterNewNodeConnected, TFixture) { + ExpectCoordinatorChangesSubscribe(); + ProcessNodesManagerRequest(1); + TSet rowDispatcherIds{LocalRowDispatcherId}; + for (auto id : rowDispatcherIds) { + Ping(id); + } + MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); + auto rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId()); + MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1}); + auto rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId()); + UNIT_ASSERT_VALUES_EQUAL(rdActor1, rdActor2); + + Ping(RowDispatcher1Id); + ExpectDistributionReset(ReadActor1); + ExpectDistributionReset(ReadActor2); + + MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); + rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId()); + MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1}); + rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId()); + UNIT_ASSERT(rdActor1 != rdActor2); + } + + Y_UNIT_TEST_F(RebalanceAfterNodeDisconnected, TFixture) { + ExpectCoordinatorChangesSubscribe(); + ProcessNodesManagerRequest(3); + TSet rowDispatcherIds{RowDispatcher1Id, RowDispatcher2Id, LocalRowDispatcherId}; + for (auto id : rowDispatcherIds) { + Ping(id); + } + + MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2}); + auto result1 = ExpectResult(ReadActor1); + UNIT_ASSERT(result1.PartitionsSize() == 3); + + auto event = new NActors::TEvInterconnect::TEvNodeDisconnected(RowDispatcher2Id.NodeId()); + Runtime.Send(new NActors::IEventHandle(Coordinator, RowDispatcher2Id, event)); + + ExpectDistributionReset(ReadActor1); + + MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2}); + result1 = ExpectResult(ReadActor1); + UNIT_ASSERT(result1.PartitionsSize() == 2); + } } } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index a845a9bff68a..52462acc3ac3 100644 --- 
a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -352,6 +352,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev); + void Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev); void HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev); @@ -377,6 +378,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle); hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle); hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, Handle); + hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle); hFunc(NActors::TEvents::TEvPong, Handle); hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected); @@ -404,6 +406,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: hFunc(NFq::TEvRowDispatcher::TEvSessionError, ReplyNoSession); hFunc(NFq::TEvRowDispatcher::TEvStatistics, ReplyNoSession); hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, ReplyNoSession); + hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle); hFunc(NActors::TEvents::TEvPong, Handle); hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected); @@ -916,6 +919,16 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest: Send(ev->Sender, response.release(), 0, ev->Cookie); } +void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev) { + if (CoordinatorActorId != ev->Sender) { + SRC_LOG_I("Ignore TEvCoordinatorDistributionReset, sender is not active coordinator (sender " << ev->Sender << ", current coordinator " << CoordinatorActorId << ")"); + return; + } + SRC_LOG_I("Received TEvCoordinatorDistributionReset from " << ev->Sender); + ReInit("Distribution changed"); + ScheduleProcessState(); +} + void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) { auto partitionId = ev->Get()->Record.GetPartitionId(); const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp index ba991a76c8a7..757ff49e9821 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp @@ -350,6 +350,13 @@ class TFixture : public TPqIoTestFixture { AssertDataWithWatermarks(expected, actual); } + void MockCoordinatorDistributionReset(NActors::TActorId coordinatorId) const { + CaSetup->Execute([&](TFakeActor& actor) { + auto event = new NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset(); + CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, coordinatorId, event, 0)); + }); + } + public: NYql::NPq::NProto::TDqPqTopicSource Settings = BuildPqTopicSourceSettings( "topic", @@ -888,6 +895,16 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { f.ReadMessages(expected); } } + + Y_UNIT_TEST_F(RebalanceAfterDistributionReset, TFixture) { + StartSession(Settings); + MockCoordinatorDistributionReset(CoordinatorId1); + + auto req = ExpectCoordinatorRequest(CoordinatorId1); + MockCoordinatorResult(CoordinatorId1, {{RowDispatcherId2, PartitionId1}}, 
req->Cookie); + ExpectStartSession({}, RowDispatcherId2, 2); + MockAck(RowDispatcherId2, 2, PartitionId1); + } } } // namespace NYql::NDq diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 43869c3e5a61..77987adb85f9 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -61,11 +61,10 @@ def wait_actor_count(kikimr, activity, expected_count): count = 0 for node_index in kikimr.compute_plane.kikimr_cluster.nodes: count = count + kikimr.compute_plane.get_actor_count(node_index, activity) - if count == expected_count: - break + if count == expected_count: + return node_index # return any node assert time.time() < deadline, f"Waiting actor {activity} count failed, current count {count}" time.sleep(1) - pass def wait_row_dispatcher_sensor_value(kikimr, sensor, expected_count, exact_match=True): @@ -617,7 +616,7 @@ def test_start_new_query(self, kikimr, client): @yq_v1 def test_stop_start(self, kikimr, client): - self.init(client, "test_stop_start") + self.init(client, "test_stop_start", 10) sql1 = Rf''' INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` @@ -625,12 +624,12 @@ def test_stop_start(self, kikimr, client): WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL));''' query_id = start_yds_query(kikimr, client, sql1) - wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 10) data = ['{"time": 101}', '{"time": 102}'] self.write_stream(data) expected = ['101', '102'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) kikimr.compute_plane.wait_completed_checkpoints( query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2 @@ -652,7 +651,7 @@ def test_stop_start(self, kikimr, client): self.write_stream(data) expected = ['103', '104'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) stop_yds_query(client, query_id) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) @@ -667,7 +666,7 @@ def test_stop_start2(self, kikimr, client): wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) self.write_stream(['{"time": 101}', '{"time": 102}']) expected = ['101', '102'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) kikimr.compute_plane.wait_completed_checkpoints(query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 2) stop_yds_query(client, query_id1) @@ -1282,3 +1281,34 @@ def test_delete_topic(self, kikimr, client): self.write_stream(data) expected = ['104'] assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + + @yq_v1 + def test_redistribute_partition_after_timeout(self, kikimr, client): + partitions_count = 3 + self.init(client, "redistribute", partitions=partitions_count) + wait_row_dispatcher_sensor_value(kikimr, "KnownRowDispatchers", 2 * COMPUTE_NODE_COUNT - 1) + + sql = Rf''' + PRAGMA dq.Scheduler=@@{{"type": "single_node"}}@@; + INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` + SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}` + WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL, data String NOT NULL));''' + + query_id = start_yds_query(kikimr, client, sql) + 
session_node_index = wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", partitions_count) + kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) + + message_count = 10 + expected = "hello" + for i in range(message_count): + self.write_stream(['{"time": 100, "data": "hello"}'], topic_path=None, partition_key=str(i)) + assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count + kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) + + logging.debug(f"Stopping node: {session_node_index}") + kikimr.compute_plane.kikimr_cluster.nodes[session_node_index].stop() + + expected = "Relativitätstheorie" + for i in range(message_count): + self.write_stream(['{"time": 101, "data": "Relativitätstheorie"}'], topic_path=None, partition_key=str(i)) + assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index 920ec10caa54..98bc27c07252 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -363,15 +363,15 @@ def get_completed_checkpoints(self, query_id, expect_counters_exist=False): return self.get_checkpoint_coordinator_metric(query_id, "CompletedCheckpoints", expect_counters_exist=expect_counters_exist) - def wait_completed_checkpoints(self, query_id, checkpoints_count, + def wait_completed_checkpoints(self, query_id, expected, timeout=plain_or_under_sanitizer_wrapper(30, 150), expect_counters_exist=False): deadline = time.time() + timeout while True: completed = self.get_completed_checkpoints(query_id, expect_counters_exist=expect_counters_exist) - if completed >= checkpoints_count: + if completed >= expected: break - assert time.time() < deadline, "Wait zero checkpoint failed, actual completed: " + str(completed) + assert time.time() < deadline, "Wait checkpoint failed, actual completed: " + str(completed) + ", expected " + str(expected) time.sleep(plain_or_under_sanitizer_wrapper(0.5, 2)) def wait_zero_checkpoint(self, query_id, timeout=plain_or_under_sanitizer_wrapper(30, 150), @@ -537,6 +537,7 @@ def fill_config(self, control_plane): 'max_session_used_memory': 1000000, 'without_consumer': True} fq_config['row_dispatcher']['coordinator'] = {'coordination_node_path': "row_dispatcher"} + fq_config['row_dispatcher']['coordinator']['rebalancing_timeout_sec'] = "5" fq_config['row_dispatcher']['coordinator']['database'] = {} self.fill_storage_config(fq_config['row_dispatcher']['coordinator']['database'], "RowDispatcher_" + self.uuid)