Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ydb/core/base/table_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,6 @@ bool IsBuildImplTable(std::string_view tableName) {

namespace NKMeans {

inline constexpr TClusterId PostingParentFlag = (1ull << 63ull);

bool HasPostingParentFlag(TClusterId parent) {
return bool(parent & PostingParentFlag);
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/base/table_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ inline constexpr const char* IdColumnSequence = "__ydb_id_sequence";

inline constexpr const int DefaultKMeansRounds = 3;

inline constexpr TClusterId PostingParentFlag = (1ull << 63ull);

bool HasPostingParentFlag(TClusterId parent);
void EnsureNoPostingParentFlag(TClusterId parent);
TClusterId SetPostingParentFlag(TClusterId parent);
Expand Down
48 changes: 42 additions & 6 deletions ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,6 @@ TExprBase DoRewriteTopSortOverKMeansTree(
YQL_ENSURE(levelTableDesc->Metadata->Name.EndsWith(NTableIndex::NKMeans::LevelTable));
YQL_ENSURE(postingTableDesc->Metadata->Name.EndsWith(NTableIndex::NKMeans::PostingTable));

// TODO(mbkkt) It's kind of strange that almost everything here have same position
const auto pos = match.Pos();

const auto levelTable = BuildTableMeta(*levelTableDesc->Metadata, pos, ctx);
Expand Down Expand Up @@ -717,6 +716,33 @@ TExprBase DoRewriteTopSortOverKMeansTree(
return TExprBase{read};
}

template<typename T>
TExprBase FilterLeafRows(const TExprBase& read, TExprContext& ctx, TPositionHandle pos) {
auto leafFlag = Build<TCoUint64>(ctx, pos)
.Literal()
.Value(std::to_string(NTableIndex::NKMeans::PostingParentFlag)) // "9223372036854775808"
.Build()
.Done();
auto prefixRowArg = ctx.NewArgument(pos, "prefixRow");
auto prefixCluster = Build<TCoMember>(ctx, pos)
.Struct(prefixRowArg)
.Name().Build(NTableIndex::NKMeans::ParentColumn)
.Done();
return Build<TCoFlatMap>(ctx, pos)
.Input(read)
.Lambda()
.Args({prefixRowArg})
.Body<TCoOptionalIf>()
.Predicate<T>()
.Left(prefixCluster)
.Right(leafFlag)
.Build()
.Value(prefixRowArg)
.Build()
.Build()
.Done();
}

TExprBase DoRewriteTopSortOverPrefixedKMeansTree(
const TReadMatch& match, const TCoFlatMap& flatMap, const TExprBase& lambdaArgs, const TExprBase& lambdaBody, const TCoTopBase& top,
TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
Expand All @@ -732,7 +758,6 @@ TExprBase DoRewriteTopSortOverPrefixedKMeansTree(
YQL_ENSURE(postingTableDesc->Metadata->Name.EndsWith(NTableIndex::NKMeans::PostingTable));
YQL_ENSURE(prefixTableDesc->Metadata->Name.EndsWith(NTableIndex::NKMeans::PrefixTable));

// TODO(mbkkt) It's kind of strange that almost everything here have same position
const auto pos = match.Pos();

const auto levelTable = BuildTableMeta(*levelTableDesc->Metadata, pos, ctx);
Expand Down Expand Up @@ -776,18 +801,29 @@ TExprBase DoRewriteTopSortOverPrefixedKMeansTree(
.Lambda(prefixLambda)
.Done().Ptr();

read = Build<TDqPrecompute>(ctx, pos)
.Input(read)
.Done().Ptr();

RemapIdToParent(ctx, pos, read);

auto prefixLeafRows = FilterLeafRows<TCoCmpGreaterOrEqual>(TExprBase(read), ctx, pos);
auto prefixRootRows = FilterLeafRows<TCoCmpLess>(TExprBase(read), ctx, pos);

TKqpStreamLookupSettings settings;
settings.Strategy = EStreamLookupStrategyType::LookupRows;
read = Build<TKqlStreamLookupTable>(ctx, pos)
auto levelRows = Build<TKqlStreamLookupTable>(ctx, pos)
.Table(levelTable)
.LookupKeys(read)
.LookupKeys(prefixRootRows)
.Columns(levelColumns)
.Settings(settings.BuildNode(ctx, pos))
.Done().Ptr();
.Done().Ptr();
VectorReadLevel(indexDesc, ctx, pos, kqpCtx, levelLambda, top, levelTable, levelColumns, levelRows);

VectorReadLevel(indexDesc, ctx, pos, kqpCtx, levelLambda, top, levelTable, levelColumns, read);
read = Build<TCoUnionAll>(ctx, pos)
.Add(levelRows)
.Add(prefixLeafRows)
.Done().Ptr();

VectorReadMain(ctx, pos, postingTable, postingTableDesc->Metadata, mainTable, tableDesc.Metadata, mainColumns, read);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,9 @@ NYql::NNodes::TExprBase BuildVectorIndexPrefixRows(const NYql::TKikimrTableDescr
bool withData, const NYql::TIndexDescription* indexDesc, const NYql::NNodes::TExprBase& inputRows,
TVector<TStringBuf>& indexTableColumns, NYql::TPositionHandle pos, NYql::TExprContext& ctx);

std::pair<NYql::NNodes::TExprBase, NYql::NNodes::TExprBase> BuildVectorIndexPrefixRowsWithNew(
const NYql::TKikimrTableDescription& table, const NYql::TKikimrTableDescription& prefixTable,
const NYql::TIndexDescription* indexDesc, const NYql::NNodes::TExprBase& inputRows,
TVector<TStringBuf>& indexTableColumns, NYql::TPositionHandle pos, NYql::TExprContext& ctx);

} // NKikimr::NKqp::NOpt
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,14 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq
// First resolve prefix IDs using StreamLookup
const auto& prefixTable = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, TStringBuilder() << insert.Table().Path().Value()
<< "/" << indexDesc->Name << "/" << NKikimr::NTableIndex::NKMeans::PrefixTable);
upsertIndexRows = BuildVectorIndexPrefixRows(table, prefixTable, true, indexDesc, upsertIndexRows, indexTableColumns, insert.Pos(), ctx);
if (prefixTable.Metadata->Columns.at(NTableIndex::NKMeans::IdColumn).DefaultKind == NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_SEQUENCE) {
auto res = BuildVectorIndexPrefixRowsWithNew(table, prefixTable, indexDesc, upsertIndexRows, indexTableColumns, insert.Pos(), ctx);
upsertIndexRows = std::move(res.first);
effects.emplace_back(std::move(res.second));
} else {
// Handle old prefixed vector index tables without the sequence
upsertIndexRows = BuildVectorIndexPrefixRows(table, prefixTable, true, indexDesc, upsertIndexRows, indexTableColumns, insert.Pos(), ctx);
}
}
upsertIndexRows = BuildVectorIndexPostingRows(table, insert.Table(), indexDesc->Name, indexTableColumns,
upsertIndexRows, true, insert.Pos(), ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,14 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,

if (indexDesc->Type == TIndexDescription::EType::GlobalSyncVectorKMeansTree) {
if (indexDesc->KeyColumns.size() > 1) {
upsertIndexRows = BuildVectorIndexPrefixRows(table, *prefixTable, true, indexDesc, upsertIndexRows, indexTableColumns, pos, ctx);
if (prefixTable->Metadata->Columns.at(NTableIndex::NKMeans::IdColumn).DefaultKind == NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_SEQUENCE) {
auto res = BuildVectorIndexPrefixRowsWithNew(table, *prefixTable, indexDesc, upsertIndexRows, indexTableColumns, pos, ctx);
upsertIndexRows = std::move(res.first);
effects.emplace_back(std::move(res.second));
} else {
// Handle old prefixed vector index tables without the sequence
upsertIndexRows = BuildVectorIndexPrefixRows(table, *prefixTable, true, indexDesc, upsertIndexRows, indexTableColumns, pos, ctx);
}
}

upsertIndexRows = BuildVectorIndexPostingRows(table, mainTableNode,
Expand Down
Loading
Loading