diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h index ce54150b4608..d2767c2ff7d1 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h @@ -42,6 +42,11 @@ class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser { TBatch() = default; TBatch(std::initializer_list rows); + template + TBatch(const TIterator& begin, const TIterator& end) + : Rows{begin, end} { + } + TBatch& AddRow(TRow row); public: diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp index c06cf95e1873..bdd1e135503d 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp @@ -564,6 +564,20 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { GetMessage(FIRST_OFFSET + 2, R"({"a2": "102"})") }); } + + Y_UNIT_TEST_F(SkipErrors1JsonIn2Messages, TJsonParserFixtureSkipErrors) { + ExpectedBatches = 1; + CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [&](ui64 numberRows, TVector> /*result*/) { + UNIT_ASSERT_VALUES_EQUAL(3, numberRows); + }, false)); + + Parser->ParseMessages({ + GetMessage(FIRST_OFFSET, R"({"a1": "hel)"), + GetMessage(FIRST_OFFSET + 1, R"(lo0", "a2": "100"})"), + GetMessage(FIRST_OFFSET + 2, R"({"a1": "hello0", "a2": "100"})"), + GetMessage(FIRST_OFFSET + 3, R"({"a2": "hello2", "a2": "102"})"), + }); + } } Y_UNIT_TEST_SUITE(TestRawParser) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 16d8fcdae2e0..c2729b0a87b0 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -148,23 +148,33 @@ class TFixture : public NTests::TBaseFixture { Runtime.Send(new IEventHandle(TopicSession, readActorId, event.release())); } - void ExpectMessageBatch(NActors::TActorId readActorId, const TBatch& expected, const std::vector& expectedLastOffset = {}) { - Runtime.Send(new IEventHandle(TopicSession, readActorId, new TEvRowDispatcher::TEvGetNextBatch())); + void ExpectMessageBatch(NActors::TActorId readActorId, TList expected, bool expectNewDataArrived = true, std::vector expectedLastOffset = {}) { + while (!expected.empty()) { + if (expectNewDataArrived) { + ExpectNewDataArrived({readActorId}); + } + Runtime.Send(new IEventHandle(TopicSession, readActorId, new TEvRowDispatcher::TEvGetNextBatch())); + auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); + UNIT_ASSERT(eventHolder.Get() != nullptr); + UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); - auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); - UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); - UNIT_ASSERT_VALUES_EQUAL(1, eventHolder->Get()->Record.MessagesSize()); - - NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get()->Record.GetMessages(0); - UNIT_ASSERT_VALUES_EQUAL(message.OffsetsSize(), expected.Rows.size()); - if (!expectedLastOffset.empty()) { - UNIT_ASSERT_VALUES_EQUAL(expectedLastOffset.size(), message.OffsetsSize()); - for (size_t i =0; i < expectedLastOffset.size(); ++i) { - UNIT_ASSERT_VALUES_EQUAL(expectedLastOffset[i], message.GetOffsets().Get(i)); + UNIT_ASSERT_VALUES_EQUAL(1, eventHolder->Get()->Record.MessagesSize()); + NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get()->Record.GetMessages(0); + UNIT_ASSERT(message.OffsetsSize() <= expected.size()); + + if (!expectedLastOffset.empty()) { + UNIT_ASSERT_VALUES_EQUAL(expectedLastOffset.size(), message.OffsetsSize()); + for (size_t i = 0; i < message.OffsetsSize(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(expectedLastOffset[i], message.GetOffsets().Get(i)); + } + expectedLastOffset.erase(expectedLastOffset.begin(), expectedLastOffset.begin() + message.OffsetsSize()); } + auto itEnd = expected.begin(); + advance(itEnd, message.OffsetsSize()); + TBatch expectedBatch{expected.begin(), itEnd}; + expected.erase(expected.begin(), itEnd); + CheckMessageBatch(eventHolder->Get()->GetPayload(message.GetPayloadId()), expectedBatch); } - CheckMessageBatch(eventHolder->Get()->GetPayload(message.GetPayloadId()), expected); } void ExpectSessionError(NActors::TActorId readActorId, TStatusCode statusCode, TString message = "") { @@ -296,8 +306,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { PQWrite(data); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); - ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }); - ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }); + ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }, false); + ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }, false); ExpectStatistics({{ReadActorId1, 1}, {ReadActorId2, 1}}); data = { Json2 }; @@ -305,8 +315,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { ExpectNewDataArrived({ReadActorId1, ReadActorId2}); ExpectStatistics({{ReadActorId1, 1}, {ReadActorId2, 1}}); - ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }); - ExpectMessageBatch(ReadActorId2, { JsonMessage(2) }); + ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }, false); + ExpectMessageBatch(ReadActorId2, { JsonMessage(2) }, false); ExpectStatistics({{ReadActorId1, 2}, {ReadActorId2, 2}}); auto source2 = BuildSource(false, "OtherConsumer"); @@ -329,10 +339,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { const std::vector data = { Json1 }; PQWrite(data); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); - Runtime.Send(new IEventHandle(TopicSession, ReadActorId1, new TEvRowDispatcher::TEvGetNextBatch())); - Runtime.Send(new IEventHandle(TopicSession, ReadActorId2, new TEvRowDispatcher::TEvGetNextBatch())); - ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }); - ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }); + ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }, false); + ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }, false); StopSession(ReadActorId1, source1); StopSession(ReadActorId2, source2); @@ -350,8 +358,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { const std::vector data = { Json1 }; PQWrite(data); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); - ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }); - ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }); + ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }, false); + ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }, false); StopSession(ReadActorId1, source1); StopSession(ReadActorId2, source2); @@ -366,7 +374,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { const std::vector data = { Json1 }; PQWrite(data); - ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }); StartSession(ReadActorId2, source); @@ -375,8 +382,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { PQWrite(data2); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); - ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }); - ExpectMessageBatch(ReadActorId2, { JsonMessage(2) }); + ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }, false); + ExpectMessageBatch(ReadActorId2, { JsonMessage(2) }, false); StopSession(ReadActorId1, source); StopSession(ReadActorId2, source); @@ -394,17 +401,14 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StartSession(ReadActorId2, source, 2); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); - TBatch expected1 = { JsonMessage(2), JsonMessage(3) }; - ExpectMessageBatch(ReadActorId1, expected1); - - TBatch expected2 = { JsonMessage(3) }; - ExpectMessageBatch(ReadActorId2, expected2); + ExpectMessageBatch(ReadActorId1, { JsonMessage(2), JsonMessage(3) }, false); + ExpectMessageBatch(ReadActorId2, { JsonMessage(3) }, false); const std::vector data2 = { Json4 }; PQWrite(data2); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); - ExpectMessageBatch(ReadActorId1, { JsonMessage(4) }); - ExpectMessageBatch(ReadActorId2, { JsonMessage(4) }); + ExpectMessageBatch(ReadActorId1, { JsonMessage(4) }, false); + ExpectMessageBatch(ReadActorId2, { JsonMessage(4) }, false); StopSession(ReadActorId1, source); StopSession(ReadActorId2, source); @@ -437,12 +441,10 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StartSession(ReadActorId2, source); PQWrite({ Json1 }); - ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }); ExpectSessionError(ReadActorId2, EStatusId::PRECONDITION_FAILED, "Failed to parse json messages, found 1 missing values"); PQWrite({ Json2 }); - ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }); ExpectSessionError(ReadActorId2, EStatusId::PRECONDITION_FAILED, "Failed to parse json messages, found 1 missing values"); @@ -459,7 +461,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { const std::vector data = { Json1, Json2, Json3 }; // offset 0, 1, 2 PQWrite(data); - ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(2), JsonMessage(3) }); // Restart topic session. @@ -469,8 +470,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { PQWrite({ Json4 }); ExpectNewDataArrived({ReadActorId1}); - ExpectMessageBatch(ReadActorId1, { JsonMessage(4) }); - ExpectMessageBatch(ReadActorId2, { JsonMessage(2), JsonMessage(3), JsonMessage(4) }); + ExpectMessageBatch(ReadActorId1, { JsonMessage(4) }, false); + ExpectMessageBatch(ReadActorId2, { JsonMessage(2), JsonMessage(3), JsonMessage(4) }, false); StopSession(ReadActorId1, source); StopSession(ReadActorId2, source); @@ -548,8 +549,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { PQWrite({ json1, json2 }); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); - ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(2) }); - ExpectMessageBatch(ReadActorId2, { JsonMessage(1).AddString("field1"), JsonMessage(2).AddString("field2") }); + ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(2) }, false); + ExpectMessageBatch(ReadActorId2, { JsonMessage(1).AddString("field1"), JsonMessage(2).AddString("field2") }, false); auto source3 = BuildSource(); source3.AddColumns("field2"); @@ -560,9 +561,9 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { TString json3 = "{\"dt\":300,\"value\":\"value3\", \"field1\":\"value1_field1\", \"field2\":\"value1_field2\"}"; PQWrite({ json3 }); ExpectNewDataArrived({ReadActorId1, ReadActorId2, readActorId3}); - ExpectMessageBatch(ReadActorId1, { JsonMessage(3) }); - ExpectMessageBatch(ReadActorId2, { JsonMessage(3).AddString("value1_field1") }); - ExpectMessageBatch(readActorId3, { JsonMessage(3).AddString("value1_field2") }); + ExpectMessageBatch(ReadActorId1, { JsonMessage(3) }, false); + ExpectMessageBatch(ReadActorId2, { JsonMessage(3).AddString("value1_field1") }, false); + ExpectMessageBatch(readActorId3, { JsonMessage(3).AddString("value1_field2") }, false); StopSession(ReadActorId1, source3); StopSession(readActorId3, source3); @@ -570,7 +571,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { TString json4 = "{\"dt\":400,\"value\":\"value4\", \"field1\":\"value2_field1\", \"field2\":\"value2_field2\"}"; TString json5 = "{\"dt\":500,\"value\":\"value5\", \"field1\":\"value3_field1\", \"field2\":\"value3_field2\"}"; PQWrite({ json4, json5 }); - ExpectNewDataArrived({ReadActorId2}); ExpectMessageBatch(ReadActorId2, { JsonMessage(4).AddString("value2_field1"), JsonMessage(5).AddString("value3_field1") }); StopSession(ReadActorId1, source1); @@ -589,7 +589,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { TString json1 = "{\"dt\":100,\"field1\":\"str\",\"value\":\"value1\"}"; PQWrite({ json1 }); - ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1).AddString("str", true) }); auto source2 = BuildSource(); @@ -606,13 +605,11 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StartSession(ReadActorId1, source); std::vector data = { Json1, Json2, Json3 }; PQWrite(data, 1); - ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(2), JsonMessage(3) }); StartSession(ReadActorId2, source, 1); std::vector data2 = { Json1 }; PQWrite(data2, 1); - ExpectNewDataArrived({ReadActorId2}); ExpectMessageBatch(ReadActorId2, { JsonMessage(1)}); StopSession(ReadActorId2, source); @@ -622,7 +619,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { std::vector data3 = { Json4 }; PQWrite(data3, 4); - ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(4) }); PassAway(); @@ -634,16 +630,15 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { Init(topicName, std::numeric_limits::max(), true); auto source = BuildSource(); StartSession(ReadActorId1, source); - - auto writeRead = [&](const std::vector& input, const TBatch& output) { + + auto writeRead = [&](const std::vector& input, const TList& output) { PQWrite(input); - if (output.Rows.empty()) { + if (output.empty()) { return; } - ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, output); }; - + auto test = [&](const TString& wrongJson) { writeRead({ wrongJson }, { }); Sleep(TDuration::MilliSeconds(100)); @@ -661,8 +656,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { test("}"); test("{"); writeRead({ "{\"dt\":100}", "{}\x80", Json3 }, { JsonMessage(3) }); - writeRead({Json1 + Json1, Json3 }, { JsonMessage(1), JsonMessage(1) }); // not checked - writeRead({Json1.substr(0, 3), Json1.substr(3), Json2, Json3 }, { JsonMessage(1), JsonMessage(2), JsonMessage(3) }); + writeRead({Json1 + Json1 }, { JsonMessage(1) }); // not checked PassAway(); } @@ -675,8 +669,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { TString wrongJson{"wrong"}; PQWrite({ Json1, wrongJson, wrongJson, Json3 }); - ExpectNewDataArrived({ReadActorId1}); - ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(3) }, {0, 3}); + ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(3) }, true, {0, 3}); PassAway(); } }