Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser {
TBatch() = default;
TBatch(std::initializer_list<TRow> rows);

template <typename TIterator>
TBatch(const TIterator& begin, const TIterator& end)
: Rows{begin, end} {
}

TBatch& AddRow(TRow row);

public:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,20 @@ Y_UNIT_TEST_SUITE(TestJsonParser) {
GetMessage(FIRST_OFFSET + 2, R"({"a2": "102"})")
});
}

Y_UNIT_TEST_F(SkipErrors1JsonIn2Messages, TJsonParserFixtureSkipErrors) {
ExpectedBatches = 1;
CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [&](ui64 numberRows, TVector<std::span<NYql::NUdf::TUnboxedValue>> /*result*/) {
UNIT_ASSERT_VALUES_EQUAL(3, numberRows);
}, false));

Parser->ParseMessages({
GetMessage(FIRST_OFFSET, R"({"a1": "hel)"),
GetMessage(FIRST_OFFSET + 1, R"(lo0", "a2": "100"})"),
GetMessage(FIRST_OFFSET + 2, R"({"a1": "hello0", "a2": "100"})"),
GetMessage(FIRST_OFFSET + 3, R"({"a2": "hello2", "a2": "102"})"),
});
}
}

Y_UNIT_TEST_SUITE(TestRawParser) {
Expand Down
109 changes: 51 additions & 58 deletions ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,23 +148,33 @@ class TFixture : public NTests::TBaseFixture {
Runtime.Send(new IEventHandle(TopicSession, readActorId, event.release()));
}

void ExpectMessageBatch(NActors::TActorId readActorId, const TBatch& expected, const std::vector<ui64>& expectedLastOffset = {}) {
Runtime.Send(new IEventHandle(TopicSession, readActorId, new TEvRowDispatcher::TEvGetNextBatch()));
void ExpectMessageBatch(NActors::TActorId readActorId, TList<TRow> expected, bool expectNewDataArrived = true, std::vector<ui64> expectedLastOffset = {}) {
while (!expected.empty()) {
if (expectNewDataArrived) {
ExpectNewDataArrived({readActorId});
}
Runtime.Send(new IEventHandle(TopicSession, readActorId, new TEvRowDispatcher::TEvGetNextBatch()));
auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvMessageBatch>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec));
UNIT_ASSERT(eventHolder.Get() != nullptr);
UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId);

auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvMessageBatch>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec));
UNIT_ASSERT(eventHolder.Get() != nullptr);
UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId);
UNIT_ASSERT_VALUES_EQUAL(1, eventHolder->Get()->Record.MessagesSize());

NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get()->Record.GetMessages(0);
UNIT_ASSERT_VALUES_EQUAL(message.OffsetsSize(), expected.Rows.size());
if (!expectedLastOffset.empty()) {
UNIT_ASSERT_VALUES_EQUAL(expectedLastOffset.size(), message.OffsetsSize());
for (size_t i =0; i < expectedLastOffset.size(); ++i) {
UNIT_ASSERT_VALUES_EQUAL(expectedLastOffset[i], message.GetOffsets().Get(i));
UNIT_ASSERT_VALUES_EQUAL(1, eventHolder->Get()->Record.MessagesSize());
NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get()->Record.GetMessages(0);
UNIT_ASSERT(message.OffsetsSize() <= expected.size());

if (!expectedLastOffset.empty()) {
UNIT_ASSERT_VALUES_EQUAL(expectedLastOffset.size(), message.OffsetsSize());
for (size_t i = 0; i < message.OffsetsSize(); ++i) {
UNIT_ASSERT_VALUES_EQUAL(expectedLastOffset[i], message.GetOffsets().Get(i));
}
expectedLastOffset.erase(expectedLastOffset.begin(), expectedLastOffset.begin() + message.OffsetsSize());
}
auto itEnd = expected.begin();
advance(itEnd, message.OffsetsSize());
TBatch expectedBatch{expected.begin(), itEnd};
expected.erase(expected.begin(), itEnd);
CheckMessageBatch(eventHolder->Get()->GetPayload(message.GetPayloadId()), expectedBatch);
}
CheckMessageBatch(eventHolder->Get()->GetPayload(message.GetPayloadId()), expected);
}

void ExpectSessionError(NActors::TActorId readActorId, TStatusCode statusCode, TString message = "") {
Expand Down Expand Up @@ -296,17 +306,17 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
PQWrite(data);
ExpectNewDataArrived({ReadActorId1, ReadActorId2});

ExpectMessageBatch(ReadActorId1, { JsonMessage(1) });
ExpectMessageBatch(ReadActorId2, { JsonMessage(1) });
ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }, false);
ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }, false);
ExpectStatistics({{ReadActorId1, 1}, {ReadActorId2, 1}});

data = { Json2 };
PQWrite(data);
ExpectNewDataArrived({ReadActorId1, ReadActorId2});

ExpectStatistics({{ReadActorId1, 1}, {ReadActorId2, 1}});
ExpectMessageBatch(ReadActorId1, { JsonMessage(2) });
ExpectMessageBatch(ReadActorId2, { JsonMessage(2) });
ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }, false);
ExpectMessageBatch(ReadActorId2, { JsonMessage(2) }, false);
ExpectStatistics({{ReadActorId1, 2}, {ReadActorId2, 2}});

auto source2 = BuildSource(false, "OtherConsumer");
Expand All @@ -329,10 +339,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
const std::vector<TString> data = { Json1 };
PQWrite(data);
ExpectNewDataArrived({ReadActorId1, ReadActorId2});
Runtime.Send(new IEventHandle(TopicSession, ReadActorId1, new TEvRowDispatcher::TEvGetNextBatch()));
Runtime.Send(new IEventHandle(TopicSession, ReadActorId2, new TEvRowDispatcher::TEvGetNextBatch()));
ExpectMessageBatch(ReadActorId1, { JsonMessage(1) });
ExpectMessageBatch(ReadActorId2, { JsonMessage(1) });
ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }, false);
ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }, false);

StopSession(ReadActorId1, source1);
StopSession(ReadActorId2, source2);
Expand All @@ -350,8 +358,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
const std::vector<TString> data = { Json1 };
PQWrite(data);
ExpectNewDataArrived({ReadActorId1, ReadActorId2});
ExpectMessageBatch(ReadActorId1, { JsonMessage(1) });
ExpectMessageBatch(ReadActorId2, { JsonMessage(1) });
ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }, false);
ExpectMessageBatch(ReadActorId2, { JsonMessage(1) }, false);

StopSession(ReadActorId1, source1);
StopSession(ReadActorId2, source2);
Expand All @@ -366,7 +374,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {

const std::vector<TString> data = { Json1 };
PQWrite(data);
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, { JsonMessage(1) });

StartSession(ReadActorId2, source);
Expand All @@ -375,8 +382,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
PQWrite(data2);
ExpectNewDataArrived({ReadActorId1, ReadActorId2});

ExpectMessageBatch(ReadActorId1, { JsonMessage(2) });
ExpectMessageBatch(ReadActorId2, { JsonMessage(2) });
ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }, false);
ExpectMessageBatch(ReadActorId2, { JsonMessage(2) }, false);

StopSession(ReadActorId1, source);
StopSession(ReadActorId2, source);
Expand All @@ -394,17 +401,14 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
StartSession(ReadActorId2, source, 2);

ExpectNewDataArrived({ReadActorId1, ReadActorId2});
TBatch expected1 = { JsonMessage(2), JsonMessage(3) };
ExpectMessageBatch(ReadActorId1, expected1);

TBatch expected2 = { JsonMessage(3) };
ExpectMessageBatch(ReadActorId2, expected2);
ExpectMessageBatch(ReadActorId1, { JsonMessage(2), JsonMessage(3) }, false);
ExpectMessageBatch(ReadActorId2, { JsonMessage(3) }, false);

const std::vector<TString> data2 = { Json4 };
PQWrite(data2);
ExpectNewDataArrived({ReadActorId1, ReadActorId2});
ExpectMessageBatch(ReadActorId1, { JsonMessage(4) });
ExpectMessageBatch(ReadActorId2, { JsonMessage(4) });
ExpectMessageBatch(ReadActorId1, { JsonMessage(4) }, false);
ExpectMessageBatch(ReadActorId2, { JsonMessage(4) }, false);

StopSession(ReadActorId1, source);
StopSession(ReadActorId2, source);
Expand Down Expand Up @@ -437,12 +441,10 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
StartSession(ReadActorId2, source);

PQWrite({ Json1 });
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, { JsonMessage(1) });
ExpectSessionError(ReadActorId2, EStatusId::PRECONDITION_FAILED, "Failed to parse json messages, found 1 missing values");

PQWrite({ Json2 });
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, { JsonMessage(2) });
ExpectSessionError(ReadActorId2, EStatusId::PRECONDITION_FAILED, "Failed to parse json messages, found 1 missing values");

Expand All @@ -459,7 +461,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {

const std::vector<TString> data = { Json1, Json2, Json3 }; // offset 0, 1, 2
PQWrite(data);
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(2), JsonMessage(3) });

// Restart topic session.
Expand All @@ -469,8 +470,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
PQWrite({ Json4 });
ExpectNewDataArrived({ReadActorId1});

ExpectMessageBatch(ReadActorId1, { JsonMessage(4) });
ExpectMessageBatch(ReadActorId2, { JsonMessage(2), JsonMessage(3), JsonMessage(4) });
ExpectMessageBatch(ReadActorId1, { JsonMessage(4) }, false);
ExpectMessageBatch(ReadActorId2, { JsonMessage(2), JsonMessage(3), JsonMessage(4) }, false);

StopSession(ReadActorId1, source);
StopSession(ReadActorId2, source);
Expand Down Expand Up @@ -548,8 +549,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {

PQWrite({ json1, json2 });
ExpectNewDataArrived({ReadActorId1, ReadActorId2});
ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(2) });
ExpectMessageBatch(ReadActorId2, { JsonMessage(1).AddString("field1"), JsonMessage(2).AddString("field2") });
ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(2) }, false);
ExpectMessageBatch(ReadActorId2, { JsonMessage(1).AddString("field1"), JsonMessage(2).AddString("field2") }, false);

auto source3 = BuildSource();
source3.AddColumns("field2");
Expand All @@ -560,17 +561,16 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
TString json3 = "{\"dt\":300,\"value\":\"value3\", \"field1\":\"value1_field1\", \"field2\":\"value1_field2\"}";
PQWrite({ json3 });
ExpectNewDataArrived({ReadActorId1, ReadActorId2, readActorId3});
ExpectMessageBatch(ReadActorId1, { JsonMessage(3) });
ExpectMessageBatch(ReadActorId2, { JsonMessage(3).AddString("value1_field1") });
ExpectMessageBatch(readActorId3, { JsonMessage(3).AddString("value1_field2") });
ExpectMessageBatch(ReadActorId1, { JsonMessage(3) }, false);
ExpectMessageBatch(ReadActorId2, { JsonMessage(3).AddString("value1_field1") }, false);
ExpectMessageBatch(readActorId3, { JsonMessage(3).AddString("value1_field2") }, false);

StopSession(ReadActorId1, source3);
StopSession(readActorId3, source3);

TString json4 = "{\"dt\":400,\"value\":\"value4\", \"field1\":\"value2_field1\", \"field2\":\"value2_field2\"}";
TString json5 = "{\"dt\":500,\"value\":\"value5\", \"field1\":\"value3_field1\", \"field2\":\"value3_field2\"}";
PQWrite({ json4, json5 });
ExpectNewDataArrived({ReadActorId2});
ExpectMessageBatch(ReadActorId2, { JsonMessage(4).AddString("value2_field1"), JsonMessage(5).AddString("value3_field1") });

StopSession(ReadActorId1, source1);
Expand All @@ -589,7 +589,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {

TString json1 = "{\"dt\":100,\"field1\":\"str\",\"value\":\"value1\"}";
PQWrite({ json1 });
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, { JsonMessage(1).AddString("str", true) });

auto source2 = BuildSource();
Expand All @@ -606,13 +605,11 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
StartSession(ReadActorId1, source);
std::vector<TString> data = { Json1, Json2, Json3 };
PQWrite(data, 1);
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(2), JsonMessage(3) });

StartSession(ReadActorId2, source, 1);
std::vector<TString> data2 = { Json1 };
PQWrite(data2, 1);
ExpectNewDataArrived({ReadActorId2});
ExpectMessageBatch(ReadActorId2, { JsonMessage(1)});

StopSession(ReadActorId2, source);
Expand All @@ -622,7 +619,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {

std::vector<TString> data3 = { Json4 };
PQWrite(data3, 4);
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, { JsonMessage(4) });

PassAway();
Expand All @@ -634,16 +630,15 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
Init(topicName, std::numeric_limits<ui64>::max(), true);
auto source = BuildSource();
StartSession(ReadActorId1, source);
auto writeRead = [&](const std::vector<TString>& input, const TBatch& output) {

auto writeRead = [&](const std::vector<TString>& input, const TList<TRow>& output) {
PQWrite(input);
if (output.Rows.empty()) {
if (output.empty()) {
return;
}
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, output);
};

auto test = [&](const TString& wrongJson) {
writeRead({ wrongJson }, { });
Sleep(TDuration::MilliSeconds(100));
Expand All @@ -661,8 +656,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
test("}");
test("{");
writeRead({ "{\"dt\":100}", "{}\x80", Json3 }, { JsonMessage(3) });
writeRead({Json1 + Json1, Json3 }, { JsonMessage(1), JsonMessage(1) }); // not checked
writeRead({Json1.substr(0, 3), Json1.substr(3), Json2, Json3 }, { JsonMessage(1), JsonMessage(2), JsonMessage(3) });
writeRead({Json1 + Json1 }, { JsonMessage(1) }); // not checked
PassAway();
}

Expand All @@ -675,8 +669,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {

TString wrongJson{"wrong"};
PQWrite({ Json1, wrongJson, wrongJson, Json3 });
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(3) }, {0, 3});
ExpectMessageBatch(ReadActorId1, { JsonMessage(1), JsonMessage(3) }, true, {0, 3});
PassAway();
}
}
Expand Down