diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp index a34c6c0bd9c6..bdbd49831d75 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp @@ -30,7 +30,7 @@ class TTopicFilters : public ITopicFilters { , Counters_(std::move(counters)) {} - void ProcessData(const TVector& columnIndex, const TVector& offsets, const TVector*>& values, ui64 numberRows) override { + void ProcessData(const TVector& columnIndex, const TVector& offsets, const TVector>& values, ui64 numberRows) override { LOG_ROW_DISPATCHER_TRACE("ProcessData for " << RunHandlers_.size() << " clients, number rows: " << numberRows); if (!numberRows) { @@ -220,18 +220,18 @@ class TTopicFilters : public ITopicFilters { RunHandlers_.erase(iter); } - void PushToRunner(IProgramRunHandler::TPtr programRunHandler, const TVector& /* offsets */, const TVector& columnIndex, const TVector*>& values, ui64 numberRows) { + void PushToRunner(IProgramRunHandler::TPtr programRunHandler, const TVector& /* offsets */, const TVector& columnIndex, const TVector>& values, ui64 numberRows) { const auto consumer = programRunHandler->GetConsumer(); const auto& columnIds = consumer->GetColumnIds(); - TVector*> result; + TVector> result; result.reserve(columnIds.size()); for (ui64 columnId : columnIds) { Y_ENSURE(columnId < columnIndex.size(), "Unexpected column id " << columnId << ", it is larger than index array size " << columnIndex.size()); const ui64 index = columnIndex[columnId]; Y_ENSURE(index < values.size(), "Unexpected column index " << index << ", it is larger than values array size " << values.size()); - if (const auto value = values[index]) { + if (const auto value = values[index]; !value.empty()) { result.emplace_back(value); } else { LOG_ROW_DISPATCHER_TRACE("Ignore processing for " << consumer->GetClientId() << ", client got parsing error for column " << columnId); diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h index 74ef42b0f3ed..eaeacd63d5bb 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h @@ -15,7 +15,7 @@ class ITopicFilters : public TThrRefBase, public TNonCopyable { public: // columnIndex - mapping from stable column id to index in values array - virtual void ProcessData(const TVector& columnIndex, const TVector& offsets, const TVector*>& values, ui64 numberRows) = 0; + virtual void ProcessData(const TVector& columnIndex, const TVector& offsets, const TVector>& values, ui64 numberRows) = 0; virtual void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr& ev) = 0; virtual TStatus AddPrograms(IProcessedDataConsumer::TPtr consumer, std::unordered_map programHolders) = 0; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp index b83b75a51095..44c147fc662d 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp @@ -79,7 +79,7 @@ NYT::TNode MakeWatermarkOutputSchema() { } struct TInputType { - const TVector*>& Values; + const TVector>& Values; ui64 NumberRows; }; @@ -156,8 +156,9 @@ class TInputConsumer : public NYql::NPureCalc::IConsumer { items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(rowId); - for (ui64 fieldId = 0; const auto column : input.Values) { - items[FieldsPositions[fieldId++]] = column->at(rowId); + for (ui64 fieldId = 0; const auto& column : input.Values) { + Y_DEBUG_ABORT_UNLESS(column.size() > rowId); + items[FieldsPositions[fieldId++]] = column[rowId]; } Worker->Push(std::move(result)); @@ -419,7 +420,7 @@ class TProgramRunHandler final : public IProgramRunHandler, public TNonCopyable ActiveFilters_->Dec(); } - void ProcessData(const TVector*>& values, ui64 numberRows) const override { + void ProcessData(const TVector>& values, ui64 numberRows) const override { LOG_ROW_DISPATCHER_TRACE("ProcessData for " << numberRows << " rows"); if (!ProgramHolder_) { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.h b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.h index 9c0c69353f72..d7971aad4b96 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.h @@ -73,7 +73,7 @@ class IProgramRunHandler : public TThrRefBase { return ProgramHolder_; } - virtual void ProcessData(const TVector*>& values, ui64 numberRows) const = 0; + virtual void ProcessData(const TVector>& values, ui64 numberRows) const = 0; protected: TString Name_; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index 1927c218798c..570df2a3efe2 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -79,7 +79,7 @@ class TTopicFormatHandler : public NActors::TActor, public void OnParsedData(ui64 numberRows) override { LOG_ROW_DISPATCHER_TRACE("Got parsed data, number rows: " << numberRows); - Self.ParsedData.assign(ParerSchema.size(), nullptr); + Self.ParsedData.assign(ParerSchema.size(), std::span()); for (size_t i = 0; i < ParerSchema.size(); ++i) { auto columnStatus = Self.Parser->GetParsedColumn(i); if (Y_LIKELY(columnStatus.IsSuccess())) { @@ -267,8 +267,11 @@ class TTopicFormatHandler : public NActors::TActor, public }; for (size_t i = 0; const ui64 columnId : ColumnsIds) { + auto& parsedData = Self.ParsedData[Self.ParserSchemaIndex[columnId]]; + Y_DEBUG_ABORT_UNLESS(parsedData.size() > rowId); + // All data was locked in parser, so copy is safe - FilteredRow[i++] = Self.ParsedData[Self.ParserSchemaIndex[columnId]]->at(rowId); + FilteredRow[i++] = parsedData[rowId]; } DataPacker->AddWideItem(FilteredRow.data(), FilteredRow.size()); @@ -661,7 +664,7 @@ class TTopicFormatHandler : public NActors::TActor, public // Parsed data const TVector* Offsets; - TVector*> ParsedData; + TVector> ParsedData; bool RefreshScheduled = false; // Metrics diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp index cdc7f7152d9d..10844d2e862b 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp @@ -26,7 +26,7 @@ namespace { if (Y_UNLIKELY(error)) \ struct TJsonParserBuffer { - size_t NumberValues = 0; + ui16 NumberValues = 0; bool Finished = false; TInstant CreationStartTime = TInstant::Now(); TVector Offsets = {}; @@ -84,20 +84,21 @@ class TColumnParser { TString TypeYson; public: - TStatus InitParser(const TString& name, const TString& typeYson, ui64 maxNumberRows, const NKikimr::NMiniKQL::TType* typeMkql) { + TStatus InitParser(const TString& name, const TString& typeYson, std::span parsedRows, const NKikimr::NMiniKQL::TType* typeMkql) { Name = name; TypeYson = typeYson; IsOptional = false; Status = TStatus::Success(); - - if (2 * ParsedRows.capacity() < maxNumberRows) { - ParsedRows.reserve(maxNumberRows); - } - + ParsedRowsCount = 0; + ParsedRows = parsedRows; return Status = ExtractDataSlot(typeMkql); } - const TVector& GetParsedRows() const { + ui16 GetParsedRowsCount() const { + return ParsedRowsCount; + } + + const std::span& GetParsedRows() const { return ParsedRows; } @@ -105,11 +106,11 @@ class TColumnParser { return Status; } - void ParseJsonValue(ui64 offset, ui64 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { + void ParseJsonValue(ui64 offset, ui16 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { if (Y_UNLIKELY(Status.IsFail())) { return; } - ParsedRows.emplace_back(rowId); + ParsedRows[ParsedRowsCount++] = rowId; if (DataSlot != NYql::NUdf::EDataSlot::Json) { ParseDataType(std::move(jsonValue), resultValue, Status); @@ -126,17 +127,17 @@ class TColumnParser { } } - void ValidateNumberValues(size_t expectedNumberValues, const TVector& offsets) { + void ValidateNumberValues(ui16 expectedNumberValues, const TVector& offsets) { if (Status.IsFail()) { return; } - if (Y_UNLIKELY(!IsOptional && ParsedRows.size() < expectedNumberValues)) { - Status = TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRows.size() << " missing values in non optional column '" << Name << "' with type " << TypeYson << ", buffered offsets: " << JoinSeq(' ' , offsets)); + if (Y_UNLIKELY(!IsOptional && ParsedRowsCount < expectedNumberValues)) { + Status = TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRowsCount << " missing values in non optional column '" << Name << "' with type " << TypeYson << ", buffered offsets: " << JoinSeq(' ' , offsets)); } } void ClearParsedRows() { - ParsedRows.clear(); + ParsedRowsCount = 0; Status = TStatus::Success(); } @@ -318,7 +319,8 @@ class TColumnParser { TString DataTypeName; bool IsOptional = false; - TVector ParsedRows; + ui16 ParsedRowsCount = 0; + std::span ParsedRows; TStatus Status = TStatus::Success(); }; @@ -327,6 +329,9 @@ class TJsonParser : public TTopicParserBase { using TBase = TTopicParserBase; using TPtr = TIntrusivePtr; + static constexpr ui64 NUMBER_ROWS_LIMIT = 1000; + static_assert(NUMBER_ROWS_LIMIT <= Max()); + public: TJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters) : TBase(std::move(consumer), __LOCATION__, config.FunctionRegistry, counters) @@ -343,6 +348,10 @@ class TJsonParser : public TTopicParserBase { TStatus InitColumnsParsers() { const auto& consumerColumns = Consumer->GetColumns(); + + ParsedRowsIdxBuffer.resize(consumerColumns.size() * MaxNumberRows); + const std::span parsedRowsIdxSpan(ParsedRowsIdxBuffer); + Columns.resize(consumerColumns.size()); for (ui64 i = 0; i < consumerColumns.size(); ++i) { const auto& name = consumerColumns[i].Name; @@ -352,10 +361,11 @@ class TJsonParser : public TTopicParserBase { return TStatus(typeStatus).AddParentIssue(TStringBuilder() << "Failed to parse column '" << name << "' type " << typeYson); } - if (auto status = Columns[i].InitParser(name, typeYson, MaxNumberRows, typeStatus.DetachResult()); status.IsFail()) { + if (auto status = Columns[i].InitParser(name, typeYson, parsedRowsIdxSpan.subspan(i * MaxNumberRows, MaxNumberRows), typeStatus.DetachResult()); status.IsFail()) { return status.AddParentIssue(TStringBuilder() << "Failed to create parser for column '" << name << "' with type " << typeYson); } } + return TStatus::Success(); } @@ -413,11 +423,11 @@ class TJsonParser : public TTopicParserBase { return Buffer.Offsets; } - TValueStatus*> GetParsedColumn(ui64 columnId) const override { + TValueStatus> GetParsedColumn(ui64 columnId) override { if (auto status = Columns[columnId].GetStatus(); status.IsFail()) { return status; } - return &ParsedValues[columnId]; + return ParsedValues[columnId]; } protected: @@ -445,7 +455,7 @@ class TJsonParser : public TTopicParserBase { return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)) << ", buffered offsets: " << JoinSeq(' ', GetOffsets())); } - size_t rowId = 0; + ui16 rowId = 0; for (auto document : documents) { if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) { return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1 << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)) << ", buffered offsets: " << JoinSeq(' ', GetOffsets())); @@ -486,11 +496,17 @@ class TJsonParser : public TTopicParserBase { void ClearBuffer() override { for (size_t i = 0; i < Columns.size(); ++i) { auto& parsedColumn = ParsedValues[i]; - for (size_t rowId : Columns[i].GetParsedRows()) { - ClearObject(parsedColumn[rowId]); + + auto& column = Columns[i]; + const auto parsedRows = column.GetParsedRows(); + const auto parsedRowsCount = column.GetParsedRowsCount(); + for (ui16 rowId = 0; rowId < parsedRowsCount; ++rowId) { + ClearObject(parsedColumn[parsedRows[rowId]]); } - Columns[i].ClearParsedRows(); + + column.ClearParsedRows(); } + Buffer.Clear(); } @@ -507,27 +523,32 @@ class TJsonParser : public TTopicParserBase { ColumnsIndex.emplace(std::string_view(consumerColumns[i].Name), i); } + ParsedValuesBuffer.resize(consumerColumns.size() * MaxNumberRows); + const std::span valuesBufferSpan(ParsedValuesBuffer); + ParsedValues.resize(consumerColumns.size()); - for (auto& parseBuffer : ParsedValues) { - parseBuffer.resize(MaxNumberRows); + for (ui64 i = 0; i < consumerColumns.size(); ++i) { + ParsedValues[i] = valuesBufferSpan.subspan(i * MaxNumberRows, MaxNumberRows); } } - ui64 CalculateMaxNumberRows() const { - return (Config.BufferCellCount - 1) / Consumer->GetColumns().size() + 1; + ui16 CalculateMaxNumberRows() const { + return std::min((Config.BufferCellCount - 1) / Consumer->GetColumns().size() + 1, NUMBER_ROWS_LIMIT); } private: const TJsonParserConfig Config; - ui64 MaxNumberRows = 0; + ui16 MaxNumberRows = 0; const TString LogPrefix; TVector Columns; + TVector ParsedRowsIdxBuffer; absl::flat_hash_map ColumnsIndex; TJsonParserBuffer Buffer; simdjson::ondemand::parser Parser; - TVector> ParsedValues; + TVector ParsedValuesBuffer; + TVector> ParsedValues; }; } // anonymous namespace diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h index e078ac43e884..adc4dd474b5f 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h @@ -18,7 +18,7 @@ struct TJsonParserConfig { const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; ui64 BatchSize = 1_MB; TDuration LatencyLimit; - ui64 BufferCellCount = 1000000; // (number rows) * (number columns) limit, amount memory size is O(BufferCellCount * log(BufferCellCount)) + ui64 BufferCellCount = 1000000; // (number rows) * (number columns) limit }; TValueStatus CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters); diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_abstract.h b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_abstract.h index c2cc1dfe0e0a..253c2c41af7f 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_abstract.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_abstract.h @@ -30,7 +30,7 @@ class ITopicParser : public TThrRefBase, public TNonCopyable { virtual TStatus ChangeConsumer(IParsedDataConsumer::TPtr consumer) = 0; virtual const TVector& GetOffsets() const = 0; - virtual TValueStatus*> GetParsedColumn(ui64 columnId) const = 0; + virtual TValueStatus> GetParsedColumn(ui64 columnId) = 0; virtual void FillStatistics(TFormatHandlerStatistic& statistic) = 0; }; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp index 0283a3458b8b..076f947df226 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp @@ -83,9 +83,9 @@ class TRawParser : public TTopicParserBase { return Offsets; } - TValueStatus*> GetParsedColumn(ui64 columnId) const override { + TValueStatus> GetParsedColumn(ui64 columnId) override { Y_ENSURE(columnId == 0, "Invalid column id for raw parser"); - return &ParsedColumn; + return std::span(ParsedColumn); } protected: diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp index 29c3052961b6..3f4b5c8a996f 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp @@ -172,41 +172,41 @@ class TFilterFixture : public TBaseFixture { Consumer.Reset(); } - void Push(const TVector*>& values, ui64 numberRows = 0) { + void Push(const TVector>& values, ui64 numberRows = 0) { for (const auto& [name, runHandler] : RunHandlers) { - runHandler->ProcessData(values, numberRows ? numberRows : values.front()->size()); + runHandler->ProcessData(values, numberRows ? numberRows : values.front().size()); } if (Consumer) { Consumer->OnBatchFinish(); } } - const TVector* MakeVector(size_t size, std::function valueCreator) { + std::span MakeVector(size_t size, std::function valueCreator) { with_lock (Alloc) { auto& holder = Holders.emplace_front(); for (size_t i = 0; i < size; ++i) { holder.emplace_back(LockObject(valueCreator(i))); } - return &holder; + return holder; } } template - const TVector* MakeVector(const TVector& values, bool optional = false) { + std::span MakeVector(const TVector& values, bool optional = false) { return MakeVector(values.size(), [&](size_t i) { NYql::NUdf::TUnboxedValuePod unboxedValue = NYql::NUdf::TUnboxedValuePod(values[i]); return optional ? unboxedValue.MakeOptional() : unboxedValue; }); } - const TVector* MakeStringVector(const TVector& values, bool optional = false) { + std::span MakeStringVector(const TVector& values, bool optional = false) { return MakeVector(values.size(), [&](size_t i) { NYql::NUdf::TUnboxedValuePod stringValue = NKikimr::NMiniKQL::MakeString(values[i]); return optional ? stringValue.MakeOptional() : stringValue; }); } - const TVector* MakeEmptyVector(size_t size) { + std::span MakeEmptyVector(size_t size) { return MakeVector(size, [&](size_t) { return NYql::NUdf::TUnboxedValuePod(); }); @@ -310,8 +310,8 @@ class TFilterSetFixture : public TFilterFixture { Consumer.Reset(); } - void ProcessData(const TVector& columnIndex, const TVector*>& values, ui64 numberRows = 0) { - numberRows = numberRows ? numberRows : values.front()->size(); + void ProcessData(const TVector& columnIndex, const TVector>& values, ui64 numberRows = 0) { + numberRows = numberRows ? numberRows : values.front().size(); FiltersSet->ProcessData(columnIndex, TVector(numberRows, std::numeric_limits::max()), values, numberRows); } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp index 7593b4b44c2b..9dce3d661887 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp @@ -11,7 +11,7 @@ class TBaseParserFixture : public TBaseFixture { static constexpr ui64 FIRST_OFFSET = 42; using TBase = TBaseFixture; - using TCallback = std::function*> result)>; + using TCallback = std::function> result)>; class TParsedDataConsumer : public IParsedDataConsumer { public: @@ -60,7 +60,7 @@ class TBaseParserFixture : public TBaseFixture { CurrentOffset++; } - TVector*> result(Columns.size(), nullptr); + TVector> result(Columns.size()); for (ui64 i = 0; i < Columns.size(); ++i) { if (const auto it = ExpectedErrors.find(i); it != ExpectedErrors.end()) { CheckError(Self.Parser->GetParsedColumn(i), it->second.first, it->second.second); @@ -118,11 +118,11 @@ class TBaseParserFixture : public TBaseFixture { } TStatus MakeParser(TVector columnNames, TString columnType) { - return MakeParser(columnNames, columnType, [](ui64, TVector*>) {}); + return MakeParser(columnNames, columnType, [](ui64, TVector>) {}); } TStatus MakeParser(TVector columns) { - return MakeParser(columns, [](ui64, TVector*>) {}); + return MakeParser(columns, [](ui64, TVector>) {}); } void PushToParser(ui64 offset, const TString& data) { @@ -185,41 +185,41 @@ class TRawParserFixture : public TBaseParserFixture { Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(Simple1, TJsonParserFixture) { - CheckSuccess(MakeParser({{"a1", "[DataType; String]"}, {"a2", "[OptionalType; [DataType; Uint64]]"}}, [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"a1", "[DataType; String]"}, {"a2", "[OptionalType; [DataType; Uint64]]"}}, [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL(101, result[1]->at(0).GetOptionalValue().Get()); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(101, result[1][0].GetOptionalValue().Get()); })); PushToParser(FIRST_OFFSET, R"({"a1": "hello1", "a2": 101, "event": "event1"})"); } Y_UNIT_TEST_F(Simple2, TJsonParserFixture) { - CheckSuccess(MakeParser({"a2", "a1"}, "[DataType; String]", [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"a2", "a1"}, "[DataType; String]", [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); })); PushToParser(FIRST_OFFSET, R"({"a1": "hello1", "a2": "101", "event": "event1"})"); } Y_UNIT_TEST_F(Simple3, TJsonParserFixture) { - CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); })); PushToParser(FIRST_OFFSET,R"({"a2": "hello1", "a1": "101", "event": "event1"})"); } Y_UNIT_TEST_F(Simple4, TJsonParserFixture) { - CheckSuccess(MakeParser({"a2", "a1"}, "[DataType; String]", [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"a2", "a1"}, "[DataType; String]", [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[1]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[1][0].AsStringRef())); })); PushToParser(FIRST_OFFSET, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); } @@ -229,11 +229,11 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; - CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(2, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0]->at(1).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][1].AsStringRef())); })); const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; @@ -246,12 +246,12 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(ManyValues, TJsonParserFixture) { ExpectedBatches = 1; - CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"a1", "a2"}, "[DataType; String]", [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(3, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); for (size_t i = 0; i < numberRows; ++i) { - UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(result[0]->at(i).AsStringRef()), i); - UNIT_ASSERT_VALUES_EQUAL_C("101", TString(result[1]->at(i).AsStringRef()), i); + UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(result[0][i].AsStringRef()), i); + UNIT_ASSERT_VALUES_EQUAL_C("101", TString(result[1][i].AsStringRef()), i); } })); @@ -265,20 +265,20 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(MissingFields, TJsonParserFixture) { ExpectedBatches = 1; - CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}, {"a2", "[OptionalType; [DataType; Uint64]]"}}, [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}, {"a2", "[OptionalType; [DataType; Uint64]]"}}, [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(3, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); for (size_t i = 0; i < numberRows; ++i) { if (i == 2) { - UNIT_ASSERT_C(!result[0]->at(i), i); + UNIT_ASSERT_C(!result[0][i], i); } else { - NYql::NUdf::TUnboxedValue value = result[0]->at(i).GetOptionalValue(); + NYql::NUdf::TUnboxedValue value = result[0][i].GetOptionalValue(); UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(value.AsStringRef()), i); } if (i == 1) { - UNIT_ASSERT_C(!result[1]->at(i), i); + UNIT_ASSERT_C(!result[1][i], i); } else { - UNIT_ASSERT_VALUES_EQUAL_C(101, result[1]->at(i).GetOptionalValue().Get(), i); + UNIT_ASSERT_VALUES_EQUAL_C(101, result[1][i].GetOptionalValue().Get(), i); } } })); @@ -293,21 +293,21 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(NestedTypes, TJsonParserFixture) { ExpectedBatches = 1; - CheckSuccess(MakeParser({{"nested", "[OptionalType; [DataType; Json]]"}, {"a1", "[DataType; String]"}}, [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"nested", "[OptionalType; [DataType; Json]]"}, {"a1", "[DataType; String]"}}, [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(4, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", TString(result[0]->at(0).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", TString(result[0]->at(1).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello2", TString(result[1]->at(1).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", TString(result[0][1].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello2", TString(result[1][1].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("\"some string\"", TString(result[0]->at(2).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello3", TString(result[1]->at(2).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("\"some string\"", TString(result[0][2].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello3", TString(result[1][2].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("123456", TString(result[0]->at(3).AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello4", TString(result[1]->at(3).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("123456", TString(result[0][3].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello4", TString(result[1][3].AsStringRef())); })); Parser->ParseMessages({ @@ -321,12 +321,12 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(SimpleBooleans, TJsonParserFixture) { ExpectedBatches = 1; - CheckSuccess(MakeParser({{"a", "[DataType; Bool]"}}, [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"a", "[DataType; Bool]"}}, [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(2, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(true, result[0]->at(0).Get()); - UNIT_ASSERT_VALUES_EQUAL(false, result[0]->at(1).Get()); + UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(false, result[0][1].Get()); })); Parser->ParseMessages({ @@ -338,12 +338,12 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(ChangeParserSchema, TJsonParserFixture) { ExpectedBatches = 1; - CheckSuccess(MakeParser({{"a", "[DataType; Bool]"}}, [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"a", "[DataType; Bool]"}}, [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(2, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(true, result[0]->at(0).Get()); - UNIT_ASSERT_VALUES_EQUAL(false, result[0]->at(1).Get()); + UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(false, result[0][1].Get()); })); Parser->ParseMessages({ @@ -354,14 +354,14 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { CheckSuccess(Parser->ChangeConsumer(MakeIntrusive( *this, TVector{{"a", "[DataType; Bool]"}, {"b", "[DataType; Int64]"}}, - [&](ui64 numberRows, TVector*> result) { + [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(2, numberRows); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL(true, result[0]->at(0).Get()); - UNIT_ASSERT_VALUES_EQUAL(false, result[0]->at(1).Get()); - UNIT_ASSERT_VALUES_EQUAL(42, result[1]->at(0).Get()); - UNIT_ASSERT_VALUES_EQUAL(84, result[1]->at(1).Get()); + UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(false, result[0][1].Get()); + UNIT_ASSERT_VALUES_EQUAL(42, result[1][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(84, result[1][1].Get()); } ))); @@ -373,12 +373,12 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { CheckSuccess(Parser->ChangeConsumer(MakeIntrusive( *this, TVector{{"b", "[DataType; Int64]"}}, - [&](ui64 numberRows, TVector*> result) { + [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(2, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(42, result[0]->at(0).Get()); - UNIT_ASSERT_VALUES_EQUAL(84, result[0]->at(1).Get()); + UNIT_ASSERT_VALUES_EQUAL(42, result[0][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(84, result[0][1].Get()); } ))); @@ -393,10 +393,10 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Config.BufferCellCount = 1; const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; - CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); })); const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; @@ -411,10 +411,10 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Config.BatchSize = 10; const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; - CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"col"}, "[DataType; String]", [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); })); const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; @@ -480,11 +480,11 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_SUITE(TestRawParser) { Y_UNIT_TEST_F(Simple, TRawParserFixture) { - CheckSuccess(MakeParser({{"data", "[OptionalType; [DataType; String]]"}}, [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"data", "[OptionalType; [DataType; String]]"}}, [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - NYql::NUdf::TUnboxedValue value = result[0]->at(0).GetOptionalValue(); + NYql::NUdf::TUnboxedValue value = result[0][0].GetOptionalValue(); UNIT_ASSERT_VALUES_EQUAL(R"({"a1": "hello1__large_str", "a2": 101, "event": "event1"})", TString(value.AsStringRef())); })); PushToParser(FIRST_OFFSET, R"({"a1": "hello1__large_str", "a2": 101, "event": "event1"})"); @@ -499,10 +499,10 @@ Y_UNIT_TEST_SUITE(TestRawParser) { ExpectedBatches = data.size(); int i = 0; - CheckSuccess(MakeParser({"a1"}, "[DataType; String]", [&](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({"a1"}, "[DataType; String]", [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(data[i], TString(result[0]->at(0).AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(data[i], TString(result[0][0].AsStringRef())); i++; })); @@ -514,11 +514,11 @@ Y_UNIT_TEST_SUITE(TestRawParser) { } Y_UNIT_TEST_F(ChangeParserSchema, TRawParserFixture) { - CheckSuccess(MakeParser({{"data", "[OptionalType; [DataType; String]]"}}, [](ui64 numberRows, TVector*> result) { + CheckSuccess(MakeParser({{"data", "[OptionalType; [DataType; String]]"}}, [](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - NYql::NUdf::TUnboxedValue value = result[0]->at(0).GetOptionalValue(); + NYql::NUdf::TUnboxedValue value = result[0][0].GetOptionalValue(); UNIT_ASSERT_VALUES_EQUAL(R"({"a1": "hello1__large_str", "a2": 101, "event": "event1"})", TString(value.AsStringRef())); })); @@ -528,11 +528,11 @@ Y_UNIT_TEST_SUITE(TestRawParser) { CheckSuccess(Parser->ChangeConsumer(MakeIntrusive( *this, TVector{{"data", "[DataType; String]"}}, - [&](ui64 numberRows, TVector*> result) { + [&](ui64 numberRows, TVector> result) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - NYql::NUdf::TUnboxedValue value = result[0]->at(0); + NYql::NUdf::TUnboxedValue value = result[0][0]; UNIT_ASSERT_VALUES_EQUAL(R"({"a1": "hello2__large_str", "a2": 101, "event": "event2"})", TString(value.AsStringRef())); } )));