2 changes: 1 addition & 1 deletion ydb/core/fq/libs/actors/nodes_manager.cpp
@@ -297,7 +297,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
     }
 
     void Handle(NFq::TEvNodesManager::TEvGetNodesRequest::TPtr& ev) {
-        LOG_D("TNodesManagerActor::TEvGetNodesRequest");
+        LOG_T("Received TNodesManagerActor::TEvGetNodesRequest");
         auto response = MakeHolder<NFq::TEvNodesManager::TEvGetNodesResponse>();
         response->NodeIds.reserve(Peers.size());
         for (const auto& info : Peers) {
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/config/protos/row_dispatcher.proto
@@ -16,7 +16,8 @@ message TRowDispatcherCoordinatorConfig {
     // Topic partitions will be distributed uniformly up to TopicPartitionsLimitPerNode
     // if (number nodes) * TopicPartitionsLimitPerNode < (number topic partitions)
     // Request will hang up infinitely, disabled by default
-    uint64 TopicPartitionsLimitPerNode = 4;
+    uint64 TopicPartitionsLimitPerNode = 4; // deprecated
+    uint64 RebalancingTimeoutSec = 5; // Automatically rebalance partitions after nodes connect or disconnect.
 }
 
 message TJsonParserConfig {
223 changes: 192 additions & 31 deletions ydb/core/fq/libs/row_dispatcher/coordinator.cpp

Large diffs are not rendered by default.
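Because the coordinator diff is collapsed, here is a minimal sketch of the rebalancing flow implied by the new config field, the TEvCoordinatorDistributionReset event, and the tests below; the same timer is armed when a new row dispatcher node connects. This is an illustration under assumptions, not the PR's actual code: the member and helper names (RowDispatchers, ReadActors, PartitionDistribution, RebalancingScheduled, ScheduleRebalancing, TEvPrivate::TEvRebalance) are hypothetical.

// Hedged sketch only -- the real coordinator.cpp changes are not shown above.
// All names below except the event types are assumptions inferred from the
// config field and the unit tests in this PR.

void TActorCoordinator::HandleDisconnected(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
    RowDispatchers.erase(ev->Get()->NodeId);   // forget the lost node
    ScheduleRebalancing();                     // the connect path would call this too
}

void TActorCoordinator::ScheduleRebalancing() {
    if (RebalancingScheduled || !Config.GetRebalancingTimeoutSec()) {
        return;   // timer already armed, or rebalancing disabled (timeout == 0)
    }
    RebalancingScheduled = true;
    Schedule(TDuration::Seconds(Config.GetRebalancingTimeoutSec()), new TEvPrivate::TEvRebalance());
}

void TActorCoordinator::Handle(TEvPrivate::TEvRebalance::TPtr&) {
    RebalancingScheduled = false;
    PartitionDistribution.clear();   // drop the current partition-to-node mapping
    for (const auto& readActorId : ReadActors) {
        // Read actors react by re-requesting partition addresses
        // (TEvCoordinatorRequest), answered against the fresh set of nodes.
        Send(readActorId, new NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset());
    }
}

The RebalanceAfterNewNodeConnected and RebalanceAfterNodeDisconnected tests, and the read-actor handler in dq_pq_rd_read_actor.cpp below, exercise this contract.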

6 changes: 6 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/events/data_plane.h
@@ -59,6 +59,7 @@ struct TEvRowDispatcher {
         EvPurecalcCompileRequest,
         EvPurecalcCompileResponse,
         EvPurecalcCompileAbort,
+        EvCoordinatorDistributionReset,
         EvEnd,
     };
 
@@ -91,6 +92,11 @@ struct TEvRowDispatcher {
         TEvCoordinatorResult() = default;
     };
 
+    struct TEvCoordinatorDistributionReset : public NActors::TEventPB<TEvCoordinatorDistributionReset,
+        NFq::NRowDispatcherProto::TEvCoordinatorDistributionReset, EEv::EvCoordinatorDistributionReset> {
+        TEvCoordinatorDistributionReset() = default;
+    };
+
     // Session events (with seqNo checks)
 
     struct TEvStartSession : public NActors::TEventPB<TEvStartSession,
3 changes: 3 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/protos/events.proto
@@ -26,6 +26,9 @@ message TEvGetAddressResponse {
     repeated Ydb.Issue.IssueMessage Issues = 2;
 }
 
+message TEvCoordinatorDistributionReset {
+}
+
 message TPartitionOffset {
     uint32 PartitionId = 1;
     uint64 Offset = 2;
54 changes: 53 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp
@@ -43,6 +43,7 @@ class TFixture : public NUnitTest::TBaseFixture {
         database.SetEndpoint("YDB_ENDPOINT");
         database.SetDatabase("YDB_DATABASE");
         database.SetToken("");
+        config.SetRebalancingTimeoutSec(1);
 
         Coordinator = Runtime.Register(NewCoordinator(
             LocalRowDispatcherId,
@@ -109,6 +110,11 @@ class TFixture : public NUnitTest::TBaseFixture {
         return result;
     }
 
+    void ExpectDistributionReset(NActors::TActorId readActorId) {
+        auto eventPtr = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset>(readActorId, TDuration::Seconds(5));
+        UNIT_ASSERT(eventPtr.Get() != nullptr);
+    }
+
     void ProcessNodesManagerRequest(ui64 nodesCount) {
         auto eventHolder = Runtime.GrabEdgeEvent<NFq::TEvNodesManager::TEvGetNodesRequest>(NodesManager, TDuration::Seconds(5));
         UNIT_ASSERT(eventHolder.Get() != nullptr);
@@ -200,7 +206,7 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) {
 
     Y_UNIT_TEST_F(WaitNodesConnected, TFixture) {
         ExpectCoordinatorChangesSubscribe();
-        ProcessNodesManagerRequest(4);
+        ProcessNodesManagerRequest(3);
         Ping(RowDispatcher1Id);
 
         MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0});
@@ -229,6 +235,52 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) {
         actorId = ActorIdFromProto(result2.GetPartitions(0).GetActorId());
         UNIT_ASSERT_VALUES_EQUAL(actorId.NodeId(), RowDispatcher2Id.NodeId());
     }
+
+    Y_UNIT_TEST_F(RebalanceAfterNewNodeConnected, TFixture) {
+        ExpectCoordinatorChangesSubscribe();
+        ProcessNodesManagerRequest(1);
+        TSet<NActors::TActorId> rowDispatcherIds{LocalRowDispatcherId};
+        for (auto id : rowDispatcherIds) {
+            Ping(id);
+        }
+        MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0});
+        auto rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId());
+        MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1});
+        auto rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId());
+        UNIT_ASSERT_VALUES_EQUAL(rdActor1, rdActor2);
+
+        Ping(RowDispatcher1Id);
+        ExpectDistributionReset(ReadActor1);
+        ExpectDistributionReset(ReadActor2);
+
+        MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0});
+        rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId());
+        MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1});
+        rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId());
+        UNIT_ASSERT(rdActor1 != rdActor2);
+    }
+
+    Y_UNIT_TEST_F(RebalanceAfterNodeDisconnected, TFixture) {
+        ExpectCoordinatorChangesSubscribe();
+        ProcessNodesManagerRequest(3);
+        TSet<NActors::TActorId> rowDispatcherIds{RowDispatcher1Id, RowDispatcher2Id, LocalRowDispatcherId};
+        for (auto id : rowDispatcherIds) {
+            Ping(id);
+        }
+
+        MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2});
+        auto result1 = ExpectResult(ReadActor1);
+        UNIT_ASSERT(result1.PartitionsSize() == 3);
+
+        auto event = new NActors::TEvInterconnect::TEvNodeDisconnected(RowDispatcher2Id.NodeId());
+        Runtime.Send(new NActors::IEventHandle(Coordinator, RowDispatcher2Id, event));
+
+        ExpectDistributionReset(ReadActor1);
+
+        MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2});
+        result1 = ExpectResult(ReadActor1);
+        UNIT_ASSERT(result1.PartitionsSize() == 2);
+    }
 }
 
 }
1 change: 1 addition & 0 deletions ydb/core/fq/libs/row_dispatcher/ya.make
@@ -20,6 +20,7 @@ PEERDIR(
     ydb/core/fq/libs/shared_resources
     ydb/core/fq/libs/ydb
     ydb/core/mon
+    ydb/core/mind
 
     ydb/library/actors/core
     ydb/library/security
13 changes: 13 additions & 0 deletions ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
@@ -351,6 +351,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
     void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev);
     void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev);
     void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev);
+    void Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev);
 
     void HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
     void HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev);
@@ -376,6 +377,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
         hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
         hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle);
         hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, Handle);
+        hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle);
 
         hFunc(NActors::TEvents::TEvPong, Handle);
         hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected);
@@ -403,6 +405,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
         hFunc(NFq::TEvRowDispatcher::TEvSessionError, ReplyNoSession);
         hFunc(NFq::TEvRowDispatcher::TEvStatistics, ReplyNoSession);
         hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, ReplyNoSession);
+        hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle);
 
         hFunc(NActors::TEvents::TEvPong, Handle);
         hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected);
@@ -909,6 +912,16 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest:
     Send(ev->Sender, response.release(), 0, ev->Cookie);
 }
 
+void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev) {
+    if (CoordinatorActorId != ev->Sender) {
+        SRC_LOG_I("Ignore TEvCoordinatorDistributionReset, sender is not active coordinator (sender " << ev->Sender << ", current coordinator " << CoordinatorActorId << ")");
+        return;
+    }
+    SRC_LOG_I("Received TEvCoordinatorDistributionReset from " << ev->Sender);
+    ReInit("Distribution changed");
+    ScheduleProcessState();
+}
+
 void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) {
     auto partitionId = ev->Get()->Record.GetPartitionId();
     const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
17 changes: 17 additions & 0 deletions ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp
@@ -351,6 +351,13 @@ class TFixture : public TPqIoTestFixture {
         AssertDataWithWatermarks(expected, actual);
     }
 
+    void MockCoordinatorDistributionReset(NActors::TActorId coordinatorId) const {
+        CaSetup->Execute([&](TFakeActor& actor) {
+            auto event = new NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset();
+            CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, coordinatorId, event, 0));
+        });
+    }
+
 public:
     NYql::NPq::NProto::TDqPqTopicSource Settings = BuildPqTopicSourceSettings(
         "topic",
@@ -889,6 +896,16 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
             f.ReadMessages(expected);
         }
     }
+
+    Y_UNIT_TEST_F(RebalanceAfterDistributionReset, TFixture) {
+        StartSession(Settings);
+        MockCoordinatorDistributionReset(CoordinatorId1);
+
+        auto req = ExpectCoordinatorRequest(CoordinatorId1);
+        MockCoordinatorResult(CoordinatorId1, {{RowDispatcherId2, PartitionId1}}, req->Cookie);
+        ExpectStartSession({}, RowDispatcherId2, 2);
+        MockAck(RowDispatcherId2, 2, PartitionId1);
+    }
 }
 
 } // namespace NYql::NDq
46 changes: 38 additions & 8 deletions ydb/tests/fq/yds/test_row_dispatcher.py
@@ -61,11 +61,10 @@ def wait_actor_count(kikimr, activity, expected_count):
         count = 0
         for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
             count = count + kikimr.compute_plane.get_actor_count(node_index, activity)
-            if count == expected_count:
-                break
+        if count == expected_count:
+            return node_index  # return any node
         assert time.time() < deadline, f"Waiting actor {activity} count failed, current count {count}"
         time.sleep(1)
-    pass
 
 
 def wait_row_dispatcher_sensor_value(kikimr, sensor, expected_count, exact_match=True):
@@ -617,20 +616,20 @@ def test_start_new_query(self, kikimr, client):
 
     @yq_v1
     def test_stop_start(self, kikimr, client):
-        self.init(client, "test_stop_start")
+        self.init(client, "test_stop_start", 10)
 
         sql1 = Rf'''
            INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
            SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
            WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL));'''
 
         query_id = start_yds_query(kikimr, client, sql1)
-        wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
+        wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 10)
 
         data = ['{"time": 101}', '{"time": 102}']
         self.write_stream(data)
         expected = ['101', '102']
-        assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
+        assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected)
 
         kikimr.compute_plane.wait_completed_checkpoints(
             query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
@@ -652,7 +651,7 @@ def test_stop_start(self, kikimr, client):
 
         self.write_stream(data)
         expected = ['103', '104']
-        assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
+        assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected)
 
         stop_yds_query(client, query_id)
         wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
@@ -667,7 +666,7 @@ def test_stop_start2(self, kikimr, client):
 
         wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
         self.write_stream(['{"time": 101}', '{"time": 102}'])
         expected = ['101', '102']
-        assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
+        assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected)
 
         kikimr.compute_plane.wait_completed_checkpoints(query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 2)
         stop_yds_query(client, query_id1)
@@ -1233,3 +1232,34 @@ def test_json_errors(self, kikimr, client, use_binding):
             assert time.time() < deadline, f"Waiting sensor ParsingErrors value failed, current count {count}"
             time.sleep(1)
         stop_yds_query(client, query_id)
+
+    @yq_v1
+    def test_redistribute_partition_after_timeout(self, kikimr, client):
+        partitions_count = 3
+        self.init(client, "redistribute", partitions=partitions_count)
+        wait_row_dispatcher_sensor_value(kikimr, "KnownRowDispatchers", 2 * COMPUTE_NODE_COUNT - 1)
+
+        sql = Rf'''
+            PRAGMA dq.Scheduler=@@{{"type": "single_node"}}@@;
+            INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
+            SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}`
+            WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL, data String NOT NULL));'''
+
+        query_id = start_yds_query(kikimr, client, sql)
+        session_node_index = wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", partitions_count)
+        kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2)
+
+        message_count = 10
+        expected = "hello"
+        for i in range(message_count):
+            self.write_stream(['{"time": 100, "data": "hello"}'], topic_path=None, partition_key=str(i))
+        assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count
+        kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2)
+
+        logging.debug(f"Stopping node: {session_node_index}")
+        kikimr.compute_plane.kikimr_cluster.nodes[session_node_index].stop()
+
+        expected = "Relativitätstheorie"
+        for i in range(message_count):
+            self.write_stream(['{"time": 101, "data": "Relativitätstheorie"}'], topic_path=None, partition_key=str(i))
+        assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count
7 changes: 4 additions & 3 deletions ydb/tests/tools/fq_runner/kikimr_runner.py
@@ -363,15 +363,15 @@ def get_completed_checkpoints(self, query_id, expect_counters_exist=False):
         return self.get_checkpoint_coordinator_metric(query_id, "CompletedCheckpoints",
                                                       expect_counters_exist=expect_counters_exist)
 
-    def wait_completed_checkpoints(self, query_id, checkpoints_count,
+    def wait_completed_checkpoints(self, query_id, expected,
                                    timeout=plain_or_under_sanitizer_wrapper(30, 150),
                                    expect_counters_exist=False):
         deadline = time.time() + timeout
         while True:
             completed = self.get_completed_checkpoints(query_id, expect_counters_exist=expect_counters_exist)
-            if completed >= checkpoints_count:
+            if completed >= expected:
                 break
-            assert time.time() < deadline, "Wait zero checkpoint failed, actual completed: " + str(completed)
+            assert time.time() < deadline, "Wait checkpoint failed, actual completed: " + str(completed) + ", expected: " + str(expected)
             time.sleep(plain_or_under_sanitizer_wrapper(0.5, 2))
 
     def wait_zero_checkpoint(self, query_id, timeout=plain_or_under_sanitizer_wrapper(30, 150),
Expand Down Expand Up @@ -537,6 +537,7 @@ def fill_config(self, control_plane):
'max_session_used_memory': 1000000,
'without_consumer': True}
fq_config['row_dispatcher']['coordinator'] = {'coordination_node_path': "row_dispatcher"}
fq_config['row_dispatcher']['coordinator'] = {'rebalancing_timeout_sec': "5"}
fq_config['row_dispatcher']['coordinator']['database'] = {}
self.fill_storage_config(fq_config['row_dispatcher']['coordinator']['database'],
"RowDispatcher_" + self.uuid)
Expand Down