Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TTopicFilters : public ITopicFilters {
, Counters_(std::move(counters))
{}

void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) override {
void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) override {
LOG_ROW_DISPATCHER_TRACE("ProcessData for " << RunHandlers_.size() << " clients, number rows: " << numberRows);

if (!numberRows) {
Expand Down Expand Up @@ -220,18 +220,18 @@ class TTopicFilters : public ITopicFilters {
RunHandlers_.erase(iter);
}

void PushToRunner(IProgramRunHandler::TPtr programRunHandler, const TVector<ui64>& /* offsets */, const TVector<ui64>& columnIndex, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) {
void PushToRunner(IProgramRunHandler::TPtr programRunHandler, const TVector<ui64>& /* offsets */, const TVector<ui64>& columnIndex, const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) {
const auto consumer = programRunHandler->GetConsumer();
const auto& columnIds = consumer->GetColumnIds();

TVector<const TVector<NYql::NUdf::TUnboxedValue>*> result;
TVector<std::span<NYql::NUdf::TUnboxedValue>> result;
result.reserve(columnIds.size());
for (ui64 columnId : columnIds) {
Y_ENSURE(columnId < columnIndex.size(), "Unexpected column id " << columnId << ", it is larger than index array size " << columnIndex.size());
const ui64 index = columnIndex[columnId];

Y_ENSURE(index < values.size(), "Unexpected column index " << index << ", it is larger than values array size " << values.size());
if (const auto value = values[index]) {
if (const auto value = values[index]; !value.empty()) {
result.emplace_back(value);
} else {
LOG_ROW_DISPATCHER_TRACE("Ignore processing for " << consumer->GetClientId() << ", client got parsing error for column " << columnId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ITopicFilters : public TThrRefBase, public TNonCopyable {

public:
// columnIndex - mapping from stable column id to index in values array
virtual void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) = 0;
virtual void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) = 0;
virtual void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr& ev) = 0;

virtual TStatus AddPrograms(IProcessedDataConsumer::TPtr consumer, std::unordered_map<TString, IProgramHolder::TPtr> programHolders) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ NYT::TNode MakeWatermarkOutputSchema() {
}

struct TInputType {
const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& Values;
const TVector<std::span<NYql::NUdf::TUnboxedValue>>& Values;
ui64 NumberRows;
};

Expand Down Expand Up @@ -156,8 +156,9 @@ class TInputConsumer : public NYql::NPureCalc::IConsumer<TInputType> {

items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(rowId);

for (ui64 fieldId = 0; const auto column : input.Values) {
items[FieldsPositions[fieldId++]] = column->at(rowId);
for (ui64 fieldId = 0; const auto& column : input.Values) {
Y_DEBUG_ABORT_UNLESS(column.size() > rowId);
items[FieldsPositions[fieldId++]] = column[rowId];
}

Worker->Push(std::move(result));
Expand Down Expand Up @@ -419,7 +420,7 @@ class TProgramRunHandler final : public IProgramRunHandler, public TNonCopyable
ActiveFilters_->Dec();
}

void ProcessData(const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) const override {
void ProcessData(const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) const override {
LOG_ROW_DISPATCHER_TRACE("ProcessData for " << numberRows << " rows");

if (!ProgramHolder_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class IProgramRunHandler : public TThrRefBase {
return ProgramHolder_;
}

virtual void ProcessData(const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) const = 0;
virtual void ProcessData(const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) const = 0;

protected:
TString Name_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
void OnParsedData(ui64 numberRows) override {
LOG_ROW_DISPATCHER_TRACE("Got parsed data, number rows: " << numberRows);

Self.ParsedData.assign(ParerSchema.size(), nullptr);
Self.ParsedData.assign(ParerSchema.size(), std::span<NYql::NUdf::TUnboxedValue>());
for (size_t i = 0; i < ParerSchema.size(); ++i) {
auto columnStatus = Self.Parser->GetParsedColumn(i);
if (Y_LIKELY(columnStatus.IsSuccess())) {
Expand Down Expand Up @@ -267,8 +267,11 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
};

for (size_t i = 0; const ui64 columnId : ColumnsIds) {
auto& parsedData = Self.ParsedData[Self.ParserSchemaIndex[columnId]];
Y_DEBUG_ABORT_UNLESS(parsedData.size() > rowId);

// All data was locked in parser, so copy is safe
FilteredRow[i++] = Self.ParsedData[Self.ParserSchemaIndex[columnId]]->at(rowId);
FilteredRow[i++] = parsedData[rowId];
}
DataPacker->AddWideItem(FilteredRow.data(), FilteredRow.size());

Expand Down Expand Up @@ -661,7 +664,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public

// Parsed data
const TVector<ui64>* Offsets;
TVector<const TVector<NYql::NUdf::TUnboxedValue>*> ParsedData;
TVector<std::span<NYql::NUdf::TUnboxedValue>> ParsedData;
bool RefreshScheduled = false;

// Metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace {
if (Y_UNLIKELY(error)) \

struct TJsonParserBuffer {
size_t NumberValues = 0;
ui16 NumberValues = 0;
bool Finished = false;
TInstant CreationStartTime = TInstant::Now();
TVector<ui64> Offsets = {};
Expand Down Expand Up @@ -84,32 +84,33 @@ class TColumnParser {
TString TypeYson;

public:
TStatus InitParser(const TString& name, const TString& typeYson, ui64 maxNumberRows, const NKikimr::NMiniKQL::TType* typeMkql) {
TStatus InitParser(const TString& name, const TString& typeYson, std::span<ui16> parsedRows, const NKikimr::NMiniKQL::TType* typeMkql) {
Name = name;
TypeYson = typeYson;
IsOptional = false;
Status = TStatus::Success();

if (2 * ParsedRows.capacity() < maxNumberRows) {
ParsedRows.reserve(maxNumberRows);
}

ParsedRowsCount = 0;
ParsedRows = parsedRows;
return Status = ExtractDataSlot(typeMkql);
}

const TVector<ui64>& GetParsedRows() const {
ui16 GetParsedRowsCount() const {
return ParsedRowsCount;
}

const std::span<ui16>& GetParsedRows() const {
return ParsedRows;
}

// Returns a copy of this column parser's current status: success, or the
// failure recorded while initializing the parser or validating the number
// of parsed values.
TStatus GetStatus() const {
return Status;
}

void ParseJsonValue(ui64 offset, ui64 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) {
void ParseJsonValue(ui64 offset, ui16 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) {
if (Y_UNLIKELY(Status.IsFail())) {
return;
}
ParsedRows.emplace_back(rowId);
ParsedRows[ParsedRowsCount++] = rowId;

if (DataSlot != NYql::NUdf::EDataSlot::Json) {
ParseDataType(std::move(jsonValue), resultValue, Status);
Expand All @@ -126,17 +127,17 @@ class TColumnParser {
}
}

void ValidateNumberValues(size_t expectedNumberValues, const TVector<ui64>& offsets) {
void ValidateNumberValues(ui16 expectedNumberValues, const TVector<ui64>& offsets) {
if (Status.IsFail()) {
return;
}
if (Y_UNLIKELY(!IsOptional && ParsedRows.size() < expectedNumberValues)) {
Status = TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRows.size() << " missing values in non optional column '" << Name << "' with type " << TypeYson << ", buffered offsets: " << JoinSeq(' ' , offsets));
if (Y_UNLIKELY(!IsOptional && ParsedRowsCount < expectedNumberValues)) {
Status = TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRowsCount << " missing values in non optional column '" << Name << "' with type " << TypeYson << ", buffered offsets: " << JoinSeq(' ' , offsets));
}
}

void ClearParsedRows() {
ParsedRows.clear();
ParsedRowsCount = 0;
Status = TStatus::Success();
}

Expand Down Expand Up @@ -318,7 +319,8 @@ class TColumnParser {
TString DataTypeName;
bool IsOptional = false;

TVector<ui64> ParsedRows;
ui16 ParsedRowsCount = 0;
std::span<ui16> ParsedRows;
TStatus Status = TStatus::Success();
};

Expand All @@ -327,6 +329,9 @@ class TJsonParser : public TTopicParserBase {
using TBase = TTopicParserBase;
using TPtr = TIntrusivePtr<TJsonParser>;

// Upper bound applied to the per-batch row count in CalculateMaxNumberRows().
// Row counts and row indices are kept in ui16 variables, so the limit is
// checked at compile time to fit into a 16-bit unsigned integer.
static constexpr ui64 NUMBER_ROWS_LIMIT = 1000;
static_assert(NUMBER_ROWS_LIMIT <= Max<uint16_t>());

public:
TJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters)
: TBase(std::move(consumer), __LOCATION__, config.FunctionRegistry, counters)
Expand All @@ -343,6 +348,10 @@ class TJsonParser : public TTopicParserBase {

TStatus InitColumnsParsers() {
const auto& consumerColumns = Consumer->GetColumns();

ParsedRowsIdxBuffer.resize(consumerColumns.size() * MaxNumberRows);
const std::span parsedRowsIdxSpan(ParsedRowsIdxBuffer);

Columns.resize(consumerColumns.size());
for (ui64 i = 0; i < consumerColumns.size(); ++i) {
const auto& name = consumerColumns[i].Name;
Expand All @@ -352,10 +361,11 @@ class TJsonParser : public TTopicParserBase {
return TStatus(typeStatus).AddParentIssue(TStringBuilder() << "Failed to parse column '" << name << "' type " << typeYson);
}

if (auto status = Columns[i].InitParser(name, typeYson, MaxNumberRows, typeStatus.DetachResult()); status.IsFail()) {
if (auto status = Columns[i].InitParser(name, typeYson, parsedRowsIdxSpan.subspan(i * MaxNumberRows, MaxNumberRows), typeStatus.DetachResult()); status.IsFail()) {
return status.AddParentIssue(TStringBuilder() << "Failed to create parser for column '" << name << "' with type " << typeYson);
}
}

return TStatus::Success();
}

Expand Down Expand Up @@ -413,11 +423,11 @@ class TJsonParser : public TTopicParserBase {
return Buffer.Offsets;
}

TValueStatus<const TVector<NYql::NUdf::TUnboxedValue>*> GetParsedColumn(ui64 columnId) const override {
TValueStatus<std::span<NYql::NUdf::TUnboxedValue>> GetParsedColumn(ui64 columnId) override {
if (auto status = Columns[columnId].GetStatus(); status.IsFail()) {
return status;
}
return &ParsedValues[columnId];
return ParsedValues[columnId];
}

protected:
Expand Down Expand Up @@ -445,7 +455,7 @@ class TJsonParser : public TTopicParserBase {
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)) << ", buffered offsets: " << JoinSeq(' ', GetOffsets()));
}

size_t rowId = 0;
ui16 rowId = 0;
for (auto document : documents) {
if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) {
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1 << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)) << ", buffered offsets: " << JoinSeq(' ', GetOffsets()));
Expand Down Expand Up @@ -486,11 +496,17 @@ class TJsonParser : public TTopicParserBase {
void ClearBuffer() override {
for (size_t i = 0; i < Columns.size(); ++i) {
auto& parsedColumn = ParsedValues[i];
for (size_t rowId : Columns[i].GetParsedRows()) {
ClearObject(parsedColumn[rowId]);

auto& column = Columns[i];
const auto parsedRows = column.GetParsedRows();
const auto parsedRowsCount = column.GetParsedRowsCount();
for (ui16 rowId = 0; rowId < parsedRowsCount; ++rowId) {
ClearObject(parsedColumn[parsedRows[rowId]]);
}
Columns[i].ClearParsedRows();

column.ClearParsedRows();
}

Buffer.Clear();
}

Expand All @@ -507,27 +523,32 @@ class TJsonParser : public TTopicParserBase {
ColumnsIndex.emplace(std::string_view(consumerColumns[i].Name), i);
}

ParsedValuesBuffer.resize(consumerColumns.size() * MaxNumberRows);
const std::span valuesBufferSpan(ParsedValuesBuffer);

ParsedValues.resize(consumerColumns.size());
for (auto& parseBuffer : ParsedValues) {
parseBuffer.resize(MaxNumberRows);
for (ui64 i = 0; i < consumerColumns.size(); ++i) {
ParsedValues[i] = valuesBufferSpan.subspan(i * MaxNumberRows, MaxNumberRows);
}
}

ui64 CalculateMaxNumberRows() const {
return (Config.BufferCellCount - 1) / Consumer->GetColumns().size() + 1;
ui16 CalculateMaxNumberRows() const {
return std::min((Config.BufferCellCount - 1) / Consumer->GetColumns().size() + 1, NUMBER_ROWS_LIMIT);
}

private:
const TJsonParserConfig Config;
ui64 MaxNumberRows = 0;
ui16 MaxNumberRows = 0;
const TString LogPrefix;

TVector<TColumnParser> Columns;
TVector<ui16> ParsedRowsIdxBuffer;
absl::flat_hash_map<std::string_view, size_t> ColumnsIndex;

TJsonParserBuffer Buffer;
simdjson::ondemand::parser Parser;
TVector<TVector<NYql::NUdf::TUnboxedValue>> ParsedValues;
TVector<NYql::NUdf::TUnboxedValue> ParsedValuesBuffer;
TVector<std::span<NYql::NUdf::TUnboxedValue>> ParsedValues;
};

} // anonymous namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ struct TJsonParserConfig {
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
ui64 BatchSize = 1_MB;
TDuration LatencyLimit;
ui64 BufferCellCount = 1000000; // (number rows) * (number columns) limit, amount memory size is O(BufferCellCount * log(BufferCellCount))
ui64 BufferCellCount = 1000000; // (number rows) * (number columns) limit
};

TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ITopicParser : public TThrRefBase, public TNonCopyable {
virtual TStatus ChangeConsumer(IParsedDataConsumer::TPtr consumer) = 0;

virtual const TVector<ui64>& GetOffsets() const = 0;
virtual TValueStatus<const TVector<NYql::NUdf::TUnboxedValue>*> GetParsedColumn(ui64 columnId) const = 0;
virtual TValueStatus<std::span<NYql::NUdf::TUnboxedValue>> GetParsedColumn(ui64 columnId) = 0;

virtual void FillStatistics(TFormatHandlerStatistic& statistic) = 0;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ class TRawParser : public TTopicParserBase {
return Offsets;
}

TValueStatus<const TVector<NYql::NUdf::TUnboxedValue>*> GetParsedColumn(ui64 columnId) const override {
TValueStatus<std::span<NYql::NUdf::TUnboxedValue>> GetParsedColumn(ui64 columnId) override {
Y_ENSURE(columnId == 0, "Invalid column id for raw parser");
return &ParsedColumn;
return std::span<NYql::NUdf::TUnboxedValue>(ParsedColumn);
}

protected:
Expand Down
Loading
Loading