From f2286b859bc4451bfebd361bf7ba4b2284bd9913 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Sat, 1 Nov 2025 13:35:07 +0000 Subject: [PATCH 01/15] add test --- ydb/core/fq/libs/actors/nodes_manager.cpp | 6 +- .../fq/libs/row_dispatcher/coordinator.cpp | 2 +- ydb/tests/fq/yds/test_row_dispatcher.py | 55 ++++++++++++++++--- ydb/tests/tools/fq_runner/kikimr_runner.py | 6 +- 4 files changed, 55 insertions(+), 14 deletions(-) diff --git a/ydb/core/fq/libs/actors/nodes_manager.cpp b/ydb/core/fq/libs/actors/nodes_manager.cpp index 3bd981829c3d..ca03bc8dba93 100644 --- a/ydb/core/fq/libs/actors/nodes_manager.cpp +++ b/ydb/core/fq/libs/actors/nodes_manager.cpp @@ -297,7 +297,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped(); response->NodeIds.reserve(Peers.size()); for (const auto& info : Peers) { @@ -347,6 +347,8 @@ class TNodesManagerActor : public NActors::TActorBootstrappedGet()->Status.GetStatus(); if (!ev->Get()->Status.IsSuccess()) { @@ -356,6 +358,8 @@ class TNodesManagerActor : public NActors::TActorBootstrapped>(); nodesInfo->reserve(res.nodes().size()); + LOG_T("Nodes count " << res.nodes().size()); + Peers.clear(); std::set nodeIds; // may be not unique diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index 7d03ecc108bf..7dcb68e0fc17 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -59,7 +59,7 @@ class TActorCoordinator : public TActorBootstrapped { static constexpr ui64 PrintStatePeriodSec = 300; static constexpr ui64 PrintStateToLogSplitSize = 64000; - static constexpr TDuration NodesManagerRetryPeriod = TDuration::Seconds(1); + static constexpr TDuration NodesManagerRetryPeriod = TDuration::Seconds(10); struct TTopicKey { TString Endpoint; diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index d60f91606e1a..e841e07f9b50 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -6,6 +6,8 @@ import logging import time import json +import random +import string from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase @@ -72,11 +74,12 @@ def wait_actor_count(kikimr, activity, expected_count): count = 0 for node_index in kikimr.compute_plane.kikimr_cluster.nodes: count = count + kikimr.compute_plane.get_actor_count(node_index, activity) - if count == expected_count: - break + if count == expected_count: + return node_index # return any node assert time.time() < deadline, f"Waiting actor {activity} count failed, current count {count}" time.sleep(1) - pass + return None + def wait_row_dispatcher_sensor_value(kikimr, sensor, expected_count, exact_match=True): @@ -627,8 +630,8 @@ def test_start_new_query(self, kikimr, client): wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) @yq_v1 - def test_stop_start(self, kikimr, client): - self.init(client, "test_stop_start") + def test_stop_start777(self, kikimr, client): + self.init(client, "test_stop_start", 10) sql1 = Rf''' INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` @@ -636,12 +639,12 @@ def test_stop_start(self, kikimr, client): WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL));''' query_id = start_yds_query(kikimr, client, sql1) - wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) + #wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) data = ['{"time": 101}', '{"time": 102}'] self.write_stream(data) expected = ['101', '102'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) kikimr.compute_plane.wait_completed_checkpoints( query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2 @@ -663,7 +666,7 @@ def test_stop_start(self, kikimr, client): self.write_stream(data) expected = ['103', '104'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) stop_yds_query(client, query_id) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) @@ -678,7 +681,7 @@ def test_stop_start2(self, kikimr, client): wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) self.write_stream(['{"time": 101}', '{"time": 102}']) expected = ['101', '102'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected) kikimr.compute_plane.wait_completed_checkpoints(query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 2) stop_yds_query(client, query_id1) @@ -1226,3 +1229,37 @@ def test_json_errors(self, kikimr, client): assert time.time() < deadline, f"Waiting sensor ParsingErrors value failed, current count {count}" time.sleep(1) stop_yds_query(client, query_id) + + @yq_v1 + def test_redistribute_partition_after_timeout(self, kikimr, client): + self.init(client, "redistribute", partitions=10) + sql = Rf''' + --PRAGMA dq.Scheduler=@@{{"type": "single_node"}}@@; + INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` + SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}` + WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL, data String NOT NULL));''' + + query_id = start_yds_query(kikimr, client, sql) + session_node_index = wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 10) + logging.debug(f"Topic session node: {session_node_index}") + + message_count = 20 + for i in range(message_count): + self.write_stream(['{"time": 100, "data": "hello"}'], topic_path=None, partition_key=(''.join(random.choices(string.digits, k=8)))) + self.read_stream(message_count, topic_path=self.output_topic) + kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) + + data = ['{"time": 101, "data": "hello"}'] + self.write_stream(data) + expected = ['hello'] + assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + + logging.debug(f"Stopping node: {session_node_index}") + kikimr.compute_plane.kikimr_cluster.nodes[session_node_index].stop() + + #time.sleep(10) + + data = ['{"time": 101, "data": "Relativitätstheorie"}'] + self.write_stream(data) + expected = ['Relativitätstheorie'] + assert self.read_stream(len(expected), topic_path=self.output_topic) == expected \ No newline at end of file diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index 920ec10caa54..9214ad3cbad0 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -363,15 +363,15 @@ def get_completed_checkpoints(self, query_id, expect_counters_exist=False): return self.get_checkpoint_coordinator_metric(query_id, "CompletedCheckpoints", expect_counters_exist=expect_counters_exist) - def wait_completed_checkpoints(self, query_id, checkpoints_count, + def wait_completed_checkpoints(self, query_id, expected, timeout=plain_or_under_sanitizer_wrapper(30, 150), expect_counters_exist=False): deadline = time.time() + timeout while True: completed = self.get_completed_checkpoints(query_id, expect_counters_exist=expect_counters_exist) - if completed >= checkpoints_count: + if completed >= expected: break - assert time.time() < deadline, "Wait zero checkpoint failed, actual completed: " + str(completed) + assert time.time() < deadline, "Wait checkpoint failed, actual current: " + str(completed) + ", expected" + str(expected) time.sleep(plain_or_under_sanitizer_wrapper(0.5, 2)) def wait_zero_checkpoint(self, query_id, timeout=plain_or_under_sanitizer_wrapper(30, 150), From 84be8c69e984932275bed291bd784340e2d0f0d9 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 5 Nov 2025 11:22:14 +0000 Subject: [PATCH 02/15] add coordinator tests --- .../libs/config/protos/row_dispatcher.proto | 3 +- .../common/row_dispatcher_settings.cpp | 1 + .../common/row_dispatcher_settings.h | 1 + .../fq/libs/row_dispatcher/coordinator.cpp | 97 ++++++++++++++++--- .../libs/row_dispatcher/events/data_plane.h | 6 ++ .../libs/row_dispatcher/protos/events.proto | 3 + .../libs/row_dispatcher/ut/coordinator_ut.cpp | 42 +++++++- .../pq/async_io/dq_pq_rd_read_actor.cpp | 8 ++ .../pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp | 17 ++++ ydb/tests/fq/yds/test_row_dispatcher.py | 21 ++-- 10 files changed, 174 insertions(+), 25 deletions(-) diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index e1878cd8a88d..79366e27b847 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -16,7 +16,8 @@ message TRowDispatcherCoordinatorConfig { // Topic partitions will be distributed uniformly up to TopicPartitionsLimitPerNode // if (number nodes) * TopicPartitionsLimitPerNode < (number topic partitions) // Request will hang up infinitely, disabled by default - uint64 TopicPartitionsLimitPerNode = 4; + uint64 TopicPartitionsLimitPerNode = 4; // deprecated + uint64 RebalancingTimeoutSec = 5; // Automatic rebalancing partition after new nodes connected / nodes disconnected. } message TJsonParserConfig { diff --git a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp index de1ec4ddd0e7..672fa9864859 100644 --- a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp +++ b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp @@ -29,6 +29,7 @@ TRowDispatcherSettings::TCoordinatorSettings::TCoordinatorSettings(const NConfig : LocalMode(config.GetLocalMode()) , Database(config.GetDatabase()) , CoordinationNodePath(config.GetCoordinationNodePath()) + , RebalancingTimeout(TDuration::Seconds(config.GetRebalancingTimeoutSec())) {} TRowDispatcherSettings::TCoordinatorSettings::TCoordinatorSettings(const NKikimrConfig::TStreamingQueriesConfig::TExternalStorageConfig& config) diff --git a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h index 56367dfcc424..4d435be42d3b 100644 --- a/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h +++ b/ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h @@ -55,6 +55,7 @@ class TRowDispatcherSettings { YDB_ACCESSOR(bool, LocalMode, false); YDB_ACCESSOR_MUTABLE(TExternalStorageSettings, Database, {}); YDB_ACCESSOR_DEF(TString, CoordinationNodePath); + YDB_ACCESSOR(TDuration, RebalancingTimeout, TDuration::Seconds(120)); }; enum class EConsumerMode { diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index 7dcb68e0fc17..63eba1a4f410 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -48,11 +48,13 @@ struct TEvPrivate { EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvPrintState = EvBegin, EvListNodes, + EvRebalancing, EvEnd }; static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); struct TEvPrintState : public NActors::TEventLocal {}; struct TEvListNodes : public NActors::TEventLocal {}; + struct TEvRebalancing : public NActors::TEventLocal {}; }; class TActorCoordinator : public TActorBootstrapped { @@ -106,12 +108,19 @@ class TActorCoordinator : public TActorBootstrapped { } }; + enum class ENodeState { + Initializing, // wait timeout after connected + Started + }; + struct RowDispatcherInfo { - RowDispatcherInfo(bool connected, bool isLocal) + RowDispatcherInfo(bool connected, ENodeState state, bool isLocal) : Connected(connected) + , State(state) , IsLocal(isLocal) {} - bool Connected = false; - bool IsLocal = false; + bool Connected = false; + ENodeState State; + bool IsLocal = false; THashSet Locations; }; @@ -186,10 +195,12 @@ class TActorCoordinator : public TActorBootstrapped { THashMap PartitionLocations; THashMap TopicsInfo; std::unordered_map PendingReadActors; + std::unordered_set KnownReadActors; TCoordinatorMetrics Metrics; THashSet InterconnectSessions; ui64 NodesCount = 0; NActors::TActorId NodesManagerId; + bool RebalancingScheduled = false; public: TActorCoordinator( @@ -211,6 +222,7 @@ class TActorCoordinator : public TActorBootstrapped { void Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev); void Handle(TEvPrivate::TEvPrintState::TPtr&); void Handle(TEvPrivate::TEvListNodes::TPtr&); + void Handle(TEvPrivate::TEvRebalancing::TPtr&); void Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult::TPtr&); void Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr&); @@ -224,6 +236,7 @@ class TActorCoordinator : public TActorBootstrapped { hFunc(NFq::TEvRowDispatcher::TEvCoordinatorRequest, Handle); hFunc(TEvPrivate::TEvPrintState, Handle); hFunc(TEvPrivate::TEvListNodes, Handle); + hFunc(TEvPrivate::TEvRebalancing, Handle); hFunc(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult, Handle); hFunc(NFq::TEvNodesManager::TEvGetNodesResponse, Handle); }) @@ -238,7 +251,7 @@ class TActorCoordinator : public TActorBootstrapped { void UpdatePendingReadActors(); void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession); TString GetInternalState(); - bool IsReady() const; + bool IsReadyPartitionDistribution() const; void SendError(TActorId readActorId, const TCoordinatorRequest& request, const TString& message); void ScheduleNodeInfoRequest() const; }; @@ -273,6 +286,7 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal auto it = RowDispatchers.find(actorId); if (it != RowDispatchers.end()) { it->second.Connected = true; + //it->second.State = ENodeState::Connected; UpdatePendingReadActors(); return; } @@ -287,14 +301,19 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal PartitionLocations[key] = actorId; } info.Connected = true; + // info.State = ENodeState::Connected; auto node = RowDispatchers.extract(oldActorId); node.key() = actorId; RowDispatchers.insert(std::move(node)); UpdatePendingReadActors(); return; } - - RowDispatchers.emplace(actorId, RowDispatcherInfo{true, isLocal}); + auto state = ENodeState::Initializing; + if (!IsReadyPartitionDistribution()) { + state = ENodeState::Started; // + } + LOG_ROW_DISPATCHER_TRACE("Add new rd to map (state " << static_cast(state)); + RowDispatchers.emplace(actorId, RowDispatcherInfo{true, state, isLocal}); UpdatePendingReadActors(); } @@ -316,6 +335,14 @@ void TActorCoordinator::Handle(NActors::TEvents::TEvPing::TPtr& ev) { AddRowDispatcher(ev->Sender, false); LOG_ROW_DISPATCHER_TRACE("Send TEvPong to " << ev->Sender); Send(ev->Sender, new NActors::TEvents::TEvPong(), IEventHandle::FlagTrackDelivery); + + + if (!RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + + } } TString TActorCoordinator::GetInternalState() { @@ -323,7 +350,7 @@ TString TActorCoordinator::GetInternalState() { str << "Known row dispatchers:\n"; for (const auto& [actorId, info] : RowDispatchers) { - str << " " << actorId << ", connected " << info.Connected << "\n"; + str << " " << actorId << ", state " << static_cast(info.State) << "\n"; } str << "\nLocations:\n"; @@ -361,6 +388,9 @@ void TActorCoordinator::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected: } Y_ENSURE(!info.IsLocal, "EvNodeDisconnected from local row dispatcher"); info.Connected = false; + // if (info.State == ENodeState::Connected) { + // info.State = ENodeState::Disconnected; + // } } } @@ -378,6 +408,9 @@ void TActorCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { continue; } info.Connected = false; + // if (info.State == ENodeState::Connected) { + // info.State = ENodeState::Disconnected; + // } return; } } @@ -399,11 +432,12 @@ TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TTo } std::optional TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key, const TSet& filteredNodeIds) { + LOG_ROW_DISPATCHER_INFO("GetAndUpdateLocation"); Y_ENSURE(!PartitionLocations.contains(key)); auto& topicInfo = GetOrCreateTopicInfo(key.Topic); - if (!IsReady()) { + if (!IsReadyPartitionDistribution()) { topicInfo.AddPendingPartition(key); return std::nullopt; } @@ -411,10 +445,13 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition TActorId bestLocation; ui64 bestNumberPartitions = std::numeric_limits::max(); for (auto& [location, info] : RowDispatchers) { - if (!info.Connected) { + LOG_ROW_DISPATCHER_INFO(" rd " << location); + if (info.State != ENodeState::Started) { + LOG_ROW_DISPATCHER_INFO(" State not connected"); continue; } if (!filteredNodeIds.empty() && !filteredNodeIds.contains(location.NodeId())) { + LOG_ROW_DISPATCHER_INFO(" filteredNodeIds"); continue; } ui64 numberPartitions = 0; @@ -428,6 +465,8 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition } } if (!bestLocation) { + LOG_ROW_DISPATCHER_INFO("Not found location"); + return std::nullopt; } @@ -445,6 +484,8 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) { const auto& source = ev->Get()->Record.GetSource(); + KnownReadActors.insert(ev->Sender); + UpdateInterconnectSessions(ev->InterconnectSession); TStringStream str; @@ -509,7 +550,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC } void TActorCoordinator::UpdatePendingReadActors() { - if (!IsReady()) { + if (!IsReadyPartitionDistribution()) { return; } for (auto readActorIt = PendingReadActors.begin(); readActorIt != PendingReadActors.end();) { @@ -547,14 +588,46 @@ void TActorCoordinator::Handle(TEvPrivate::TEvListNodes::TPtr&) { } } -bool TActorCoordinator::IsReady() const { +void TActorCoordinator::Handle(TEvPrivate::TEvRebalancing::TPtr&) { + LOG_ROW_DISPATCHER_DEBUG("Rebalancing..."); + RebalancingScheduled = false; + + bool isUpdated = false; + for (auto& [actorId, info] : RowDispatchers) { + if (info.State != ENodeState::Initializing) { + continue; + } + LOG_ROW_DISPATCHER_DEBUG("Move rd (actorId " << actorId << ") to Started state"); + if (info.Connected) { + info.State = ENodeState::Started; + isUpdated = true; + } else { + // Schedue + } + } + if (isUpdated) { + for (const auto& readActorId : KnownReadActors) { + LOG_ROW_DISPATCHER_TRACE("Send TEvCoordinatorDistributionReset to " << readActorId); + Send(readActorId, new TEvRowDispatcher::TEvCoordinatorDistributionReset(), IEventHandle::FlagTrackDelivery); + } + + PendingReadActors.clear(); + PartitionLocations.clear(); + TopicsInfo.clear(); + KnownReadActors.clear(); + + //UpdatePendingReadActors(); + } +} + +bool TActorCoordinator::IsReadyPartitionDistribution() const { if (Config.GetLocalMode()) { return true; } if (!NodesCount) { return false; } - return RowDispatchers.size() >= NodesCount - 1; + return RowDispatchers.size() >= NodesCount;// TOOD add timeout } void TActorCoordinator::SendError(TActorId readActorId, const TCoordinatorRequest& request, const TString& message) { diff --git a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h index 17474e0d2498..39bd6cf33db0 100644 --- a/ydb/core/fq/libs/row_dispatcher/events/data_plane.h +++ b/ydb/core/fq/libs/row_dispatcher/events/data_plane.h @@ -59,6 +59,7 @@ struct TEvRowDispatcher { EvPurecalcCompileRequest, EvPurecalcCompileResponse, EvPurecalcCompileAbort, + EvCoordinatorDistributionReset, EvEnd, }; @@ -91,6 +92,11 @@ struct TEvRowDispatcher { TEvCoordinatorResult() = default; }; + struct TEvCoordinatorDistributionReset : public NActors::TEventPB { + TEvCoordinatorDistributionReset() = default; + }; + // Session events (with seqNo checks) struct TEvStartSession : public NActors::TEventPB(readActorId, TDuration::Seconds(5)); + UNIT_ASSERT(eventPtr.Get() != nullptr); + } + void ProcessNodesManagerRequest(ui64 nodesCount) { TVector nodes; nodes.reserve(nodesCount); @@ -195,7 +201,7 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { Y_UNIT_TEST_F(WaitNodesConnected, TFixture) { ExpectCoordinatorChangesSubscribe(); - ProcessNodesManagerRequest(4); + ProcessNodesManagerRequest(3); Ping(RowDispatcher1Id); MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); @@ -224,6 +230,40 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { actorId = ActorIdFromProto(result2.GetPartitions(0).GetActorId()); UNIT_ASSERT_VALUES_EQUAL(actorId.NodeId(), RowDispatcher2Id.NodeId()); } + + Y_UNIT_TEST_F(RebalanceAfterNewNodeConnected, TFixture) { + ExpectCoordinatorChangesSubscribe(); + ProcessNodesManagerRequest(1); + TSet rowDispatcherIds{LocalRowDispatcherId}; + for (auto id : rowDispatcherIds) { + Ping(id); + } + MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); + auto rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId()); + MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1}); + auto rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId()); + UNIT_ASSERT_VALUES_EQUAL(rdActor1, rdActor2); + + Ping(RowDispatcher1Id); + ExpectDistributionReset(ReadActor1); + ExpectDistributionReset(ReadActor2); + + MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0}); + rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId()); + MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1}); + rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId()); + UNIT_ASSERT(rdActor1 != rdActor2); + } + + Y_UNIT_TEST_F(RebalanceAfterNodeDisconnected, TFixture) { + ExpectCoordinatorChangesSubscribe(); + ProcessNodesManagerRequest(2); + TSet rowDispatcherIds{RowDispatcher1Id, RowDispatcher2Id, LocalRowDispatcherId}; + for (auto id : rowDispatcherIds) { + Ping(id); + } + + } } } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 36650b42bc52..8e3acf1e79d7 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -352,6 +352,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev); void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev); + void Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev); void HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev); @@ -377,6 +378,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle); hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle); hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, Handle); + hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle); hFunc(NActors::TEvents::TEvPong, Handle); hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected); @@ -404,6 +406,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: hFunc(NFq::TEvRowDispatcher::TEvSessionError, ReplyNoSession); hFunc(NFq::TEvRowDispatcher::TEvStatistics, ReplyNoSession); hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, ReplyNoSession); + hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle); hFunc(NActors::TEvents::TEvPong, Handle); hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected); @@ -914,6 +917,11 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest: Send(ev->Sender, response.release(), 0, ev->Cookie); } +void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev) { + SRC_LOG_T("Received TEvCoordinatorDistributionReset from " << ev->Sender); + // check coordinaotor is active +} + void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) { auto partitionId = ev->Get()->Record.GetPartitionId(); const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp index ba991a76c8a7..9c02ffbc576b 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp @@ -350,6 +350,13 @@ class TFixture : public TPqIoTestFixture { AssertDataWithWatermarks(expected, actual); } + void MockCoordinatorDistributionReset(NActors::TActorId coordinatorId) const { + CaSetup->Execute([&](TFakeActor& actor) { + auto event = new NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset(); + CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, coordinatorId, event, 0, cookie)); + }); + } + public: NYql::NPq::NProto::TDqPqTopicSource Settings = BuildPqTopicSourceSettings( "topic", @@ -888,6 +895,16 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { f.ReadMessages(expected); } } + + Y_UNIT_TEST_F(RebalanceAfterDistributionReset, TFixture) { + StartSession(Settings); + MockCoordinatorDistributionReset(CoordinatorId1); + + auto req = ExpectCoordinatorRequest(CoordinatorId1); + MockCoordinatorResult(CoordinatorId1, {{RowDispatcherId1, PartitionId1}}, req->Cookie); + ExpectStartSession({}, RowDispatcherId1, 1); + MockAck(RowDispatcherId1, 1, PartitionId1); + } } } // namespace NYql::NDq diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index e841e07f9b50..391115d658c6 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -1244,22 +1244,21 @@ def test_redistribute_partition_after_timeout(self, kikimr, client): logging.debug(f"Topic session node: {session_node_index}") message_count = 20 + expected = "hello" for i in range(message_count): self.write_stream(['{"time": 100, "data": "hello"}'], topic_path=None, partition_key=(''.join(random.choices(string.digits, k=8)))) - self.read_stream(message_count, topic_path=self.output_topic) + self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) - data = ['{"time": 101, "data": "hello"}'] - self.write_stream(data) - expected = ['hello'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + # data = ['{"time": 101, "data": "hello"}'] + # self.write_stream(data) + # expected = ['hello'] + # assert self.read_stream(len(expected), topic_path=self.output_topic) == expected logging.debug(f"Stopping node: {session_node_index}") kikimr.compute_plane.kikimr_cluster.nodes[session_node_index].stop() - #time.sleep(10) - - data = ['{"time": 101, "data": "Relativitätstheorie"}'] - self.write_stream(data) - expected = ['Relativitätstheorie'] - assert self.read_stream(len(expected), topic_path=self.output_topic) == expected \ No newline at end of file + expected = "Relativitätstheorie" + for i in range(message_count): + self.write_stream(['{"time": 101, "data": "Relativitätstheorie"}'], topic_path=None, partition_key=(''.join(random.choices(string.digits, k=8)))) + assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count \ No newline at end of file From 1b90ce170e7862685122cd7e86f1b92f42f00236 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 5 Nov 2025 14:59:37 +0000 Subject: [PATCH 03/15] update test --- ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp index 6c5d46f43d09..c43dfb4684e4 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp @@ -263,6 +263,17 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { Ping(id); } + MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2}); + auto result1 = ExpectResult(ReadActor1); + UNIT_ASSERT(result1.PartitionsSize() == 3); + + auto event = new NActors::TEvInterconnect::TEvNodeDisconnected(RowDispatcher2Id.NodeId()); + Runtime.Send(new NActors::IEventHandle(Coordinator, RowDispatcher2Id, event)); + + ExpectDistributionReset(ReadActor1); + + MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2}); + UNIT_ASSERT(result1.PartitionsSize() == 2); } } From b3f695d9d734e71ec5273ed516bfb3192d97101e Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 6 Nov 2025 18:45:59 +0000 Subject: [PATCH 04/15] wip --- .../fq/libs/row_dispatcher/coordinator.cpp | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index 63eba1a4f410..22180a9696e4 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -388,6 +388,12 @@ void TActorCoordinator::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected: } Y_ENSURE(!info.IsLocal, "EvNodeDisconnected from local row dispatcher"); info.Connected = false; + + if (!RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + } // if (info.State == ENodeState::Connected) { // info.State = ENodeState::Disconnected; // } @@ -408,6 +414,12 @@ void TActorCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { continue; } info.Connected = false; + + if (!RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; + } // if (info.State == ENodeState::Connected) { // info.State = ENodeState::Disconnected; // } @@ -593,18 +605,22 @@ void TActorCoordinator::Handle(TEvPrivate::TEvRebalancing::TPtr&) { RebalancingScheduled = false; bool isUpdated = false; + TSet toDelete; + for (auto& [actorId, info] : RowDispatchers) { - if (info.State != ENodeState::Initializing) { - continue; - } LOG_ROW_DISPATCHER_DEBUG("Move rd (actorId " << actorId << ") to Started state"); - if (info.Connected) { + if (info.State == ENodeState::Initializing && info.Connected) { info.State = ENodeState::Started; isUpdated = true; - } else { + } else (info.State == ENodeState::Started && !info.Connected) { // Schedue + toDelete.insert(actorId); + isUpdated = true; } } + for (const auto& actorId : toDelete) { + RowDispatchers.erase(actorId); + } if (isUpdated) { for (const auto& readActorId : KnownReadActors) { LOG_ROW_DISPATCHER_TRACE("Send TEvCoordinatorDistributionReset to " << readActorId); @@ -615,9 +631,9 @@ void TActorCoordinator::Handle(TEvPrivate::TEvRebalancing::TPtr&) { PartitionLocations.clear(); TopicsInfo.clear(); KnownReadActors.clear(); - - //UpdatePendingReadActors(); } + + } bool TActorCoordinator::IsReadyPartitionDistribution() const { From d5c64f57bcf6c430a694a3c7b40fc7dd340d5bd4 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Fri, 7 Nov 2025 13:41:36 +0000 Subject: [PATCH 05/15] fix coordinator tests --- .../fq/libs/row_dispatcher/coordinator.cpp | 126 +++++++++++------- .../libs/row_dispatcher/ut/coordinator_ut.cpp | 3 +- 2 files changed, 83 insertions(+), 46 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index 22180a9696e4..5cc4dbfa7a1f 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -49,12 +49,14 @@ struct TEvPrivate { EvPrintState = EvBegin, EvListNodes, EvRebalancing, + EvStartingTimeout, EvEnd }; static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); struct TEvPrintState : public NActors::TEventLocal {}; struct TEvListNodes : public NActors::TEventLocal {}; struct TEvRebalancing : public NActors::TEventLocal {}; + struct TEvStartingTimeout : public NActors::TEventLocal {}; }; class TActorCoordinator : public TActorBootstrapped { @@ -118,9 +120,9 @@ class TActorCoordinator : public TActorBootstrapped { : Connected(connected) , State(state) , IsLocal(isLocal) {} - bool Connected = false; - ENodeState State; - bool IsLocal = false; + bool Connected = false; + ENodeState State; + bool IsLocal = false; THashSet Locations; }; @@ -201,6 +203,7 @@ class TActorCoordinator : public TActorBootstrapped { ui64 NodesCount = 0; NActors::TActorId NodesManagerId; bool RebalancingScheduled = false; + ENodeState State = ENodeState::Initializing; public: TActorCoordinator( @@ -223,6 +226,7 @@ class TActorCoordinator : public TActorBootstrapped { void Handle(TEvPrivate::TEvPrintState::TPtr&); void Handle(TEvPrivate::TEvListNodes::TPtr&); void Handle(TEvPrivate::TEvRebalancing::TPtr&); + void Handle(TEvPrivate::TEvStartingTimeout::TPtr&); void Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult::TPtr&); void Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr&); @@ -237,13 +241,14 @@ class TActorCoordinator : public TActorBootstrapped { hFunc(TEvPrivate::TEvPrintState, Handle); hFunc(TEvPrivate::TEvListNodes, Handle); hFunc(TEvPrivate::TEvRebalancing, Handle); + hFunc(TEvPrivate::TEvStartingTimeout, Handle); hFunc(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult, Handle); hFunc(NFq::TEvNodesManager::TEvGetNodesResponse, Handle); }) private: - void AddRowDispatcher(NActors::TActorId actorId, bool isLocal); + void UpdateKnownRowDispatchers(NActors::TActorId actorId, bool isLocal); void PrintInternalState(); TTopicInfo& GetOrCreateTopicInfo(const TTopicKey& topic); std::optional GetAndUpdateLocation(const TPartitionKey& key, const TSet& filteredNodeIds); // std::nullopt if TopicPartitionsLimitPerNode reached @@ -254,6 +259,7 @@ class TActorCoordinator : public TActorBootstrapped { bool IsReadyPartitionDistribution() const; void SendError(TActorId readActorId, const TCoordinatorRequest& request, const TString& message); void ScheduleNodeInfoRequest() const; + void UpdateGlobalState(); }; TActorCoordinator::TActorCoordinator( @@ -269,24 +275,26 @@ TActorCoordinator::TActorCoordinator( , Metrics(counters) , NodesManagerId(nodesManagerId) { - AddRowDispatcher(localRowDispatcherId, true); + UpdateKnownRowDispatchers(localRowDispatcherId, true); } void TActorCoordinator::Bootstrap() { Become(&TActorCoordinator::StateFunc); Send(LocalRowDispatcherId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); ScheduleNodeInfoRequest(); + Schedule(TDuration::Seconds(1), new TEvPrivate::TEvStartingTimeout()); // Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); // Logs (InternalState) is too big LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId() << ", NodesManagerId " << NodesManagerId); auto nodeGroup = Metrics.Counters->GetSubgroup("node", ToString(SelfId().NodeId())); Metrics.IsActive = nodeGroup->GetCounter("IsActive"); } -void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal) { +void TActorCoordinator::UpdateKnownRowDispatchers(NActors::TActorId actorId, bool isLocal) { + LOG_ROW_DISPATCHER_TRACE("UpdateKnownRowDispatchers " << actorId.ToString()); + auto it = RowDispatchers.find(actorId); if (it != RowDispatchers.end()) { it->second.Connected = true; - //it->second.State = ENodeState::Connected; UpdatePendingReadActors(); return; } @@ -301,19 +309,24 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal PartitionLocations[key] = actorId; } info.Connected = true; - // info.State = ENodeState::Connected; auto node = RowDispatchers.extract(oldActorId); node.key() = actorId; RowDispatchers.insert(std::move(node)); UpdatePendingReadActors(); return; } - auto state = ENodeState::Initializing; - if (!IsReadyPartitionDistribution()) { - state = ENodeState::Started; // + auto nodeState = State == ENodeState::Initializing ? ENodeState::Started : ENodeState::Initializing; + + LOG_ROW_DISPATCHER_TRACE("Add new row dispatcher to map (state " << static_cast(nodeState) << ")"); + RowDispatchers.emplace(actorId, RowDispatcherInfo{true, nodeState, isLocal}); + UpdateGlobalState(); + + if (nodeState == ENodeState::Initializing && !RebalancingScheduled) { + LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); + Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvRebalancing()); + RebalancingScheduled = true; } - LOG_ROW_DISPATCHER_TRACE("Add new rd to map (state " << static_cast(state)); - RowDispatchers.emplace(actorId, RowDispatcherInfo{true, state, isLocal}); + UpdatePendingReadActors(); } @@ -332,17 +345,9 @@ void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& inte void TActorCoordinator::Handle(NActors::TEvents::TEvPing::TPtr& ev) { LOG_ROW_DISPATCHER_TRACE("TEvPing received, " << ev->Sender); UpdateInterconnectSessions(ev->InterconnectSession); - AddRowDispatcher(ev->Sender, false); + UpdateKnownRowDispatchers(ev->Sender, false); LOG_ROW_DISPATCHER_TRACE("Send TEvPong to " << ev->Sender); Send(ev->Sender, new NActors::TEvents::TEvPong(), IEventHandle::FlagTrackDelivery); - - - if (!RebalancingScheduled) { - LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); - Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvRebalancing()); - RebalancingScheduled = true; - - } } TString TActorCoordinator::GetInternalState() { @@ -457,7 +462,6 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition TActorId bestLocation; ui64 bestNumberPartitions = std::numeric_limits::max(); for (auto& [location, info] : RowDispatchers) { - LOG_ROW_DISPATCHER_INFO(" rd " << location); if (info.State != ENodeState::Started) { LOG_ROW_DISPATCHER_INFO(" State not connected"); continue; @@ -587,6 +591,7 @@ void TActorCoordinator::Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult } LOG_ROW_DISPATCHER_INFO("Updated node info, node count: " << ev->Get()->AssignedNodes.size() << ", AssignedNodes: " << JoinSeq(", ", ev->Get()->AssignedNodes)); NodesCount = ev->Get()->AssignedNodes.size(); + UpdateGlobalState(); UpdatePendingReadActors(); } @@ -604,46 +609,70 @@ void TActorCoordinator::Handle(TEvPrivate::TEvRebalancing::TPtr&) { LOG_ROW_DISPATCHER_DEBUG("Rebalancing..."); RebalancingScheduled = false; - bool isUpdated = false; + bool needRebalance = false; TSet toDelete; + auto printState = [&](const TString& str){ + LOG_ROW_DISPATCHER_DEBUG(str); + for (auto& [actorId, info] : RowDispatchers) { + LOG_ROW_DISPATCHER_DEBUG(" node " << actorId.NodeId() << " (" << actorId << ") state " << (info.State == ENodeState::Initializing ? "Initializing" : "Started") << " connected " << info.Connected << " partitions count " << info.Locations.size()); + } + }; + + printState("Current state (rebalancing):"); + for (auto& [actorId, info] : RowDispatchers) { - LOG_ROW_DISPATCHER_DEBUG("Move rd (actorId " << actorId << ") to Started state"); - if (info.State == ENodeState::Initializing && info.Connected) { - info.State = ENodeState::Started; - isUpdated = true; - } else (info.State == ENodeState::Started && !info.Connected) { - // Schedue - toDelete.insert(actorId); - isUpdated = true; + if (info.State == ENodeState::Initializing) { + if (info.Connected) { + info.State = ENodeState::Started; + needRebalance = true; + } else { + toDelete.insert(actorId); + } + } else { // Started + if (!info.Connected) { + toDelete.insert(actorId); + if (!info.Locations.empty()) { + needRebalance = true; + } + } } } for (const auto& actorId : toDelete) { RowDispatchers.erase(actorId); } - if (isUpdated) { - for (const auto& readActorId : KnownReadActors) { - LOG_ROW_DISPATCHER_TRACE("Send TEvCoordinatorDistributionReset to " << readActorId); - Send(readActorId, new TEvRowDispatcher::TEvCoordinatorDistributionReset(), IEventHandle::FlagTrackDelivery); - } + if (!needRebalance) { + return; + } - PendingReadActors.clear(); - PartitionLocations.clear(); - TopicsInfo.clear(); - KnownReadActors.clear(); + for (const auto& readActorId : KnownReadActors) { + LOG_ROW_DISPATCHER_TRACE("Send TEvCoordinatorDistributionReset to " << readActorId); + Send(readActorId, new TEvRowDispatcher::TEvCoordinatorDistributionReset(), IEventHandle::FlagTrackDelivery); } + for (auto& [actorId, info] : RowDispatchers) { + info.Locations.clear(); + } + PendingReadActors.clear(); + PartitionLocations.clear(); + TopicsInfo.clear(); + KnownReadActors.clear(); + printState("Current state (after rebalancing):"); +} + +void TActorCoordinator::Handle(TEvPrivate::TEvStartingTimeout::TPtr&) { + if (State != ENodeState::Started) { + LOG_ROW_DISPATCHER_TRACE("Change global state to Started (by timeout)"); + State = ENodeState::Started; + } } bool TActorCoordinator::IsReadyPartitionDistribution() const { if (Config.GetLocalMode()) { return true; } - if (!NodesCount) { - return false; - } - return RowDispatchers.size() >= NodesCount;// TOOD add timeout + return State == ENodeState::Started; } void TActorCoordinator::SendError(TActorId readActorId, const TCoordinatorRequest& request, const TString& message) { @@ -660,12 +689,19 @@ void TActorCoordinator::ScheduleNodeInfoRequest() const { void TActorCoordinator::Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr& ev) { NodesCount = ev->Get()->NodeIds.size(); LOG_ROW_DISPATCHER_INFO("Updated node info, node count: " << NodesCount); + UpdateGlobalState(); if (!NodesCount) { ScheduleNodeInfoRequest(); } UpdatePendingReadActors(); } +void TActorCoordinator::UpdateGlobalState() { + if (State != ENodeState::Started && NodesCount && RowDispatchers.size() >= NodesCount) { + LOG_ROW_DISPATCHER_TRACE("Change global state to Started (by nodes count)"); + State = ENodeState::Started; + } +} } // anonymous namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp index c43dfb4684e4..a567e17a07b2 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp @@ -257,7 +257,7 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { Y_UNIT_TEST_F(RebalanceAfterNodeDisconnected, TFixture) { ExpectCoordinatorChangesSubscribe(); - ProcessNodesManagerRequest(2); + ProcessNodesManagerRequest(3); TSet rowDispatcherIds{RowDispatcher1Id, RowDispatcher2Id, LocalRowDispatcherId}; for (auto id : rowDispatcherIds) { Ping(id); @@ -273,6 +273,7 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) { ExpectDistributionReset(ReadActor1); MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2}); + result1 = ExpectResult(ReadActor1); UNIT_ASSERT(result1.PartitionsSize() == 2); } } From 355576cb6ffdd09ebefe500c36139ab6fc0c465b Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Fri, 7 Nov 2025 15:12:28 +0000 Subject: [PATCH 06/15] try to fix test --- .../yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp | 9 +++++++-- ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp | 8 ++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 8e3acf1e79d7..ac25470c70c4 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -918,8 +918,13 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest: } void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev) { - SRC_LOG_T("Received TEvCoordinatorDistributionReset from " << ev->Sender); - // check coordinaotor is active + if (CoordinatorActorId != ev->Sender) { + SRC_LOG_I("Ignore TEvCoordinatorDistributionReset, sender is not active coordinator (sender " << ev->Sender << ", current coordinator " << CoordinatorActorId << ")"); + return; + } + SRC_LOG_I("Received TEvCoordinatorDistributionReset from " << ev->Sender); + ReInit("Distribution changed"); + ScheduleProcessState(); } void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) { diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp index 9c02ffbc576b..757ff49e9821 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp @@ -353,7 +353,7 @@ class TFixture : public TPqIoTestFixture { void MockCoordinatorDistributionReset(NActors::TActorId coordinatorId) const { CaSetup->Execute([&](TFakeActor& actor) { auto event = new NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset(); - CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, coordinatorId, event, 0, cookie)); + CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, coordinatorId, event, 0)); }); } @@ -901,9 +901,9 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { MockCoordinatorDistributionReset(CoordinatorId1); auto req = ExpectCoordinatorRequest(CoordinatorId1); - MockCoordinatorResult(CoordinatorId1, {{RowDispatcherId1, PartitionId1}}, req->Cookie); - ExpectStartSession({}, RowDispatcherId1, 1); - MockAck(RowDispatcherId1, 1, PartitionId1); + MockCoordinatorResult(CoordinatorId1, {{RowDispatcherId2, PartitionId1}}, req->Cookie); + ExpectStartSession({}, RowDispatcherId2, 2); + MockAck(RowDispatcherId2, 2, PartitionId1); } } From 87a453a86f2addecb403398fbe3fe13af35b6dbe Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Fri, 7 Nov 2025 19:12:06 +0000 Subject: [PATCH 07/15] add timeout to config --- .../fq/libs/row_dispatcher/coordinator.cpp | 6 +++-- ydb/tests/fq/yds/test_row_dispatcher.py | 27 ++++++++----------- ydb/tests/tools/fq_runner/kikimr_runner.py | 1 + 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index 5cc4dbfa7a1f..e76869f108e2 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -282,9 +282,11 @@ void TActorCoordinator::Bootstrap() { Become(&TActorCoordinator::StateFunc); Send(LocalRowDispatcherId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); ScheduleNodeInfoRequest(); - Schedule(TDuration::Seconds(1), new TEvPrivate::TEvStartingTimeout()); + Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvStartingTimeout()); // Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); // Logs (InternalState) is too big - LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId() << ", NodesManagerId " << NodesManagerId); + LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId() + << ", NodesManagerId " << NodesManagerId + << ", rebalancing timeout " << Config.GetRebalancingTimeout()); auto nodeGroup = Metrics.Counters->GetSubgroup("node", ToString(SelfId().NodeId())); Metrics.IsActive = nodeGroup->GetCounter("IsActive"); } diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 391115d658c6..e9fd0e52edaa 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -75,13 +75,12 @@ def wait_actor_count(kikimr, activity, expected_count): for node_index in kikimr.compute_plane.kikimr_cluster.nodes: count = count + kikimr.compute_plane.get_actor_count(node_index, activity) if count == expected_count: - return node_index # return any node + return node_index # return any node assert time.time() < deadline, f"Waiting actor {activity} count failed, current count {count}" time.sleep(1) return None - def wait_row_dispatcher_sensor_value(kikimr, sensor, expected_count, exact_match=True): deadline = time.time() + 60 while True: @@ -630,7 +629,7 @@ def test_start_new_query(self, kikimr, client): wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) @yq_v1 - def test_stop_start777(self, kikimr, client): + def test_stop_start(self, kikimr, client): self.init(client, "test_stop_start", 10) sql1 = Rf''' @@ -639,7 +638,7 @@ def test_stop_start777(self, kikimr, client): WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL));''' query_id = start_yds_query(kikimr, client, sql1) - #wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 10) data = ['{"time": 101}', '{"time": 102}'] self.write_stream(data) @@ -1232,7 +1231,8 @@ def test_json_errors(self, kikimr, client): @yq_v1 def test_redistribute_partition_after_timeout(self, kikimr, client): - self.init(client, "redistribute", partitions=10) + partitions_count = 3 + self.init(client, "redistribute", partitions=partitions_count) sql = Rf''' --PRAGMA dq.Scheduler=@@{{"type": "single_node"}}@@; INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` @@ -1240,25 +1240,20 @@ def test_redistribute_partition_after_timeout(self, kikimr, client): WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL, data String NOT NULL));''' query_id = start_yds_query(kikimr, client, sql) - session_node_index = wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 10) - logging.debug(f"Topic session node: {session_node_index}") + session_node_index = wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", partitions_count) + kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) - message_count = 20 + message_count = 10 expected = "hello" for i in range(message_count): - self.write_stream(['{"time": 100, "data": "hello"}'], topic_path=None, partition_key=(''.join(random.choices(string.digits, k=8)))) + self.write_stream(['{"time": 100, "data": "hello"}'], topic_path=None, partition_key=str(i)) self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) - # data = ['{"time": 101, "data": "hello"}'] - # self.write_stream(data) - # expected = ['hello'] - # assert self.read_stream(len(expected), topic_path=self.output_topic) == expected - logging.debug(f"Stopping node: {session_node_index}") kikimr.compute_plane.kikimr_cluster.nodes[session_node_index].stop() expected = "Relativitätstheorie" for i in range(message_count): - self.write_stream(['{"time": 101, "data": "Relativitätstheorie"}'], topic_path=None, partition_key=(''.join(random.choices(string.digits, k=8)))) - assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count \ No newline at end of file + self.write_stream(['{"time": 101, "data": "Relativitätstheorie"}'], topic_path=None, partition_key=str(i)) + assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index 9214ad3cbad0..25625cdc05e7 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -537,6 +537,7 @@ def fill_config(self, control_plane): 'max_session_used_memory': 1000000, 'without_consumer': True} fq_config['row_dispatcher']['coordinator'] = {'coordination_node_path': "row_dispatcher"} + fq_config['row_dispatcher']['coordinator'] = {'rebalancing_timeout_sec': "5"} fq_config['row_dispatcher']['coordinator']['database'] = {} self.fill_storage_config(fq_config['row_dispatcher']['coordinator']['database'], "RowDispatcher_" + self.uuid) From f5468f302fbaa7ae017e2258942dbb818ae971bf Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Sun, 9 Nov 2025 19:26:35 +0000 Subject: [PATCH 08/15] edit test --- ydb/tests/fq/yds/test_row_dispatcher.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index e9fd0e52edaa..c8c4bc1ec6b5 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -1234,15 +1234,16 @@ def test_redistribute_partition_after_timeout(self, kikimr, client): partitions_count = 3 self.init(client, "redistribute", partitions=partitions_count) sql = Rf''' - --PRAGMA dq.Scheduler=@@{{"type": "single_node"}}@@; + PRAGMA dq.Scheduler=@@{{"type": "single_node"}}@@; INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}` WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL, data String NOT NULL));''' + time.sleep(10) query_id = start_yds_query(kikimr, client, sql) session_node_index = wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", partitions_count) kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) - + message_count = 10 expected = "hello" for i in range(message_count): From cbe51154afab3531660d0e0ff731d107546413b3 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Mon, 10 Nov 2025 06:18:03 +0000 Subject: [PATCH 09/15] style fix --- ydb/tests/fq/yds/test_row_dispatcher.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index c8c4bc1ec6b5..3ceae887fd61 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -6,8 +6,6 @@ import logging import time import json -import random -import string from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1 from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase @@ -75,7 +73,7 @@ def wait_actor_count(kikimr, activity, expected_count): for node_index in kikimr.compute_plane.kikimr_cluster.nodes: count = count + kikimr.compute_plane.get_actor_count(node_index, activity) if count == expected_count: - return node_index # return any node + return node_index # return any node assert time.time() < deadline, f"Waiting actor {activity} count failed, current count {count}" time.sleep(1) return None @@ -1243,7 +1241,7 @@ def test_redistribute_partition_after_timeout(self, kikimr, client): query_id = start_yds_query(kikimr, client, sql) session_node_index = wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", partitions_count) kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) - + message_count = 10 expected = "hello" for i in range(message_count): From ec8344f822b1c861a7a0fe2821327eac008c5607 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Mon, 10 Nov 2025 12:54:54 +0000 Subject: [PATCH 10/15] remove comments --- ydb/core/fq/libs/actors/nodes_manager.cpp | 4 ---- .../fq/libs/row_dispatcher/coordinator.cpp | 19 ++++++------------- ydb/tests/fq/yds/test_row_dispatcher.py | 3 ++- 3 files changed, 8 insertions(+), 18 deletions(-) diff --git a/ydb/core/fq/libs/actors/nodes_manager.cpp b/ydb/core/fq/libs/actors/nodes_manager.cpp index ca03bc8dba93..9bfdba23dbfe 100644 --- a/ydb/core/fq/libs/actors/nodes_manager.cpp +++ b/ydb/core/fq/libs/actors/nodes_manager.cpp @@ -347,8 +347,6 @@ class TNodesManagerActor : public NActors::TActorBootstrappedGet()->Status.GetStatus(); if (!ev->Get()->Status.IsSuccess()) { @@ -358,8 +356,6 @@ class TNodesManagerActor : public NActors::TActorBootstrapped>(); nodesInfo->reserve(res.nodes().size()); - LOG_T("Nodes count " << res.nodes().size()); - Peers.clear(); std::set nodeIds; // may be not unique diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index e76869f108e2..cc44b252a499 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -33,14 +33,14 @@ struct TCoordinatorMetrics { : Counters(counters) { IncomingRequests = Counters->GetCounter("IncomingRequests", true); LeaderChanged = Counters->GetCounter("LeaderChanged", true); - PartitionsLimitPerNode = Counters->GetCounter("PartitionsLimitPerNode"); + KnownRowDispatchers = Counters->GetCounter("KnownRowDispatchers"); } ::NMonitoring::TDynamicCounterPtr Counters; ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests; ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChanged; ::NMonitoring::TDynamicCounters::TCounterPtr IsActive; - ::NMonitoring::TDynamicCounters::TCounterPtr PartitionsLimitPerNode; + ::NMonitoring::TDynamicCounters::TCounterPtr KnownRowDispatchers; }; struct TEvPrivate { @@ -318,6 +318,9 @@ void TActorCoordinator::UpdateKnownRowDispatchers(NActors::TActorId actorId, boo return; } auto nodeState = State == ENodeState::Initializing ? ENodeState::Started : ENodeState::Initializing; + if (PartitionLocations.empty()) { + nodeState = ENodeState::Started; + } LOG_ROW_DISPATCHER_TRACE("Add new row dispatcher to map (state " << static_cast(nodeState) << ")"); RowDispatchers.emplace(actorId, RowDispatcherInfo{true, nodeState, isLocal}); @@ -330,6 +333,7 @@ void TActorCoordinator::UpdateKnownRowDispatchers(NActors::TActorId actorId, boo } UpdatePendingReadActors(); + Metrics.KnownRowDispatchers->Set(RowDispatchers.size()); } void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) { @@ -401,9 +405,6 @@ void TActorCoordinator::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected: Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvRebalancing()); RebalancingScheduled = true; } - // if (info.State == ENodeState::Connected) { - // info.State = ENodeState::Disconnected; - // } } } @@ -427,9 +428,6 @@ void TActorCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvRebalancing()); RebalancingScheduled = true; } - // if (info.State == ENodeState::Connected) { - // info.State = ENodeState::Disconnected; - // } return; } } @@ -451,7 +449,6 @@ TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TTo } std::optional TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key, const TSet& filteredNodeIds) { - LOG_ROW_DISPATCHER_INFO("GetAndUpdateLocation"); Y_ENSURE(!PartitionLocations.contains(key)); auto& topicInfo = GetOrCreateTopicInfo(key.Topic); @@ -465,11 +462,9 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition ui64 bestNumberPartitions = std::numeric_limits::max(); for (auto& [location, info] : RowDispatchers) { if (info.State != ENodeState::Started) { - LOG_ROW_DISPATCHER_INFO(" State not connected"); continue; } if (!filteredNodeIds.empty() && !filteredNodeIds.contains(location.NodeId())) { - LOG_ROW_DISPATCHER_INFO(" filteredNodeIds"); continue; } ui64 numberPartitions = 0; @@ -483,8 +478,6 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition } } if (!bestLocation) { - LOG_ROW_DISPATCHER_INFO("Not found location"); - return std::nullopt; } diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 3ceae887fd61..5ab0ad6c092f 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -1231,13 +1231,14 @@ def test_json_errors(self, kikimr, client): def test_redistribute_partition_after_timeout(self, kikimr, client): partitions_count = 3 self.init(client, "redistribute", partitions=partitions_count) + wait_row_dispatcher_sensor_value(kikimr, "KnownRowDispatchers", 2 * COMPUTE_NODE_COUNT - 1) + sql = Rf''' PRAGMA dq.Scheduler=@@{{"type": "single_node"}}@@; INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}` WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL, data String NOT NULL));''' - time.sleep(10) query_id = start_yds_query(kikimr, client, sql) session_node_index = wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", partitions_count) kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) From d26b08b3ae3347b29a13f1f59007a4e696133b54 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Mon, 10 Nov 2025 16:32:52 +0300 Subject: [PATCH 11/15] Update ydb/tests/fq/yds/test_row_dispatcher.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- ydb/tests/fq/yds/test_row_dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 5ab0ad6c092f..730cfb527fcd 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -1247,7 +1247,7 @@ def test_redistribute_partition_after_timeout(self, kikimr, client): expected = "hello" for i in range(message_count): self.write_stream(['{"time": 100, "data": "hello"}'], topic_path=None, partition_key=str(i)) - self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count + assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2) logging.debug(f"Stopping node: {session_node_index}") From 587703b012e84dfaf07d81b46ec168f212b274a7 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Mon, 10 Nov 2025 16:33:15 +0300 Subject: [PATCH 12/15] Update ydb/tests/tools/fq_runner/kikimr_runner.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- ydb/tests/tools/fq_runner/kikimr_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index 25625cdc05e7..98bc27c07252 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -371,7 +371,7 @@ def wait_completed_checkpoints(self, query_id, expected, completed = self.get_completed_checkpoints(query_id, expect_counters_exist=expect_counters_exist) if completed >= expected: break - assert time.time() < deadline, "Wait checkpoint failed, actual current: " + str(completed) + ", expected" + str(expected) + assert time.time() < deadline, "Wait checkpoint failed, actual current: " + str(completed) + ", expected " + str(expected) time.sleep(plain_or_under_sanitizer_wrapper(0.5, 2)) def wait_zero_checkpoint(self, query_id, timeout=plain_or_under_sanitizer_wrapper(30, 150), From 6bcdcbaec696c90872a005c60bb25dfd0e11880b Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Mon, 10 Nov 2025 18:34:47 +0000 Subject: [PATCH 13/15] fixes by review --- .../fq/libs/row_dispatcher/coordinator.cpp | 22 +++++++++++-------- ydb/tests/fq/yds/test_row_dispatcher.py | 1 - 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index cc44b252a499..673574cde642 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -26,6 +26,8 @@ using NYql::TIssues; namespace { +const ui64 DefaultRebalancingTimeoutSec = 60; + //////////////////////////////////////////////////////////////////////////////// struct TCoordinatorMetrics { @@ -115,8 +117,8 @@ class TActorCoordinator : public TActorBootstrapped { Started }; - struct RowDispatcherInfo { - RowDispatcherInfo(bool connected, ENodeState state, bool isLocal) + struct TRowDispatcherInfo { + TRowDispatcherInfo(bool connected, ENodeState state, bool isLocal) : Connected(connected) , State(state) , IsLocal(isLocal) {} @@ -193,7 +195,7 @@ class TActorCoordinator : public TActorBootstrapped { TActorId LocalRowDispatcherId; const TString LogPrefix; const TString Tenant; - TMap RowDispatchers; + TMap RowDispatchers; THashMap PartitionLocations; THashMap TopicsInfo; std::unordered_map PendingReadActors; @@ -204,6 +206,7 @@ class TActorCoordinator : public TActorBootstrapped { NActors::TActorId NodesManagerId; bool RebalancingScheduled = false; ENodeState State = ENodeState::Initializing; + TDuration RebalancingTimeout; public: TActorCoordinator( @@ -274,6 +277,7 @@ TActorCoordinator::TActorCoordinator( , Tenant(tenant) , Metrics(counters) , NodesManagerId(nodesManagerId) + , RebalancingTimeout(Config.GetRebalancingTimeout() ? Config.GetRebalancingTimeout() : DefaultRebalancingTimeoutSec) { UpdateKnownRowDispatchers(localRowDispatcherId, true); } @@ -282,11 +286,11 @@ void TActorCoordinator::Bootstrap() { Become(&TActorCoordinator::StateFunc); Send(LocalRowDispatcherId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); ScheduleNodeInfoRequest(); - Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvStartingTimeout()); + Schedule(RebalancingTimeout, new TEvPrivate::TEvStartingTimeout()); // Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); // Logs (InternalState) is too big LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId() << ", NodesManagerId " << NodesManagerId - << ", rebalancing timeout " << Config.GetRebalancingTimeout()); + << ", rebalancing timeout " << RebalancingTimeout); auto nodeGroup = Metrics.Counters->GetSubgroup("node", ToString(SelfId().NodeId())); Metrics.IsActive = nodeGroup->GetCounter("IsActive"); } @@ -323,12 +327,12 @@ void TActorCoordinator::UpdateKnownRowDispatchers(NActors::TActorId actorId, boo } LOG_ROW_DISPATCHER_TRACE("Add new row dispatcher to map (state " << static_cast(nodeState) << ")"); - RowDispatchers.emplace(actorId, RowDispatcherInfo{true, nodeState, isLocal}); + RowDispatchers.emplace(actorId, TRowDispatcherInfo{true, nodeState, isLocal}); UpdateGlobalState(); if (nodeState == ENodeState::Initializing && !RebalancingScheduled) { LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); - Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvRebalancing()); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); RebalancingScheduled = true; } @@ -402,7 +406,7 @@ void TActorCoordinator::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected: if (!RebalancingScheduled) { LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); - Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvRebalancing()); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); RebalancingScheduled = true; } } @@ -425,7 +429,7 @@ void TActorCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { if (!RebalancingScheduled) { LOG_ROW_DISPATCHER_TRACE("Schedule TEvRebalancing"); - Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvRebalancing()); + Schedule(RebalancingTimeout, new TEvPrivate::TEvRebalancing()); RebalancingScheduled = true; } return; diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 730cfb527fcd..1d97c16ba745 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -76,7 +76,6 @@ def wait_actor_count(kikimr, activity, expected_count): return node_index # return any node assert time.time() < deadline, f"Waiting actor {activity} count failed, current count {count}" time.sleep(1) - return None def wait_row_dispatcher_sensor_value(kikimr, sensor, expected_count, exact_match=True): From fe52a7900b39e03882dac1a891e7daced4760efc Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Mon, 10 Nov 2025 18:48:40 +0000 Subject: [PATCH 14/15] try to fix build --- ydb/core/fq/libs/row_dispatcher/coordinator.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index 673574cde642..0be5c465b6ac 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -26,7 +26,7 @@ using NYql::TIssues; namespace { -const ui64 DefaultRebalancingTimeoutSec = 60; +const ui64 DefaultRebalancingTimeoutSec = 120; //////////////////////////////////////////////////////////////////////////////// @@ -277,7 +277,7 @@ TActorCoordinator::TActorCoordinator( , Tenant(tenant) , Metrics(counters) , NodesManagerId(nodesManagerId) - , RebalancingTimeout(Config.GetRebalancingTimeout() ? Config.GetRebalancingTimeout() : DefaultRebalancingTimeoutSec) + , RebalancingTimeout(Config.GetRebalancingTimeout() ? Config.GetRebalancingTimeout() : TDuration::Seconds(DefaultRebalancingTimeoutSec)) { UpdateKnownRowDispatchers(localRowDispatcherId, true); } From 5999ba4c6637d65a699664692e54656d866f4792 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Wed, 12 Nov 2025 13:25:52 +0000 Subject: [PATCH 15/15] fix style --- ydb/tests/fq/yds/test_row_dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index c7e971dd6700..77987adb85f9 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -1287,7 +1287,7 @@ def test_redistribute_partition_after_timeout(self, kikimr, client): partitions_count = 3 self.init(client, "redistribute", partitions=partitions_count) wait_row_dispatcher_sensor_value(kikimr, "KnownRowDispatchers", 2 * COMPUTE_NODE_COUNT - 1) - + sql = Rf''' PRAGMA dq.Scheduler=@@{{"type": "single_node"}}@@; INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`