Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions ydb/core/base/table_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,24 @@ TVector<TString>::const_iterator IsUniq(const TVector<TString>& names) {
return names.end();
}

bool Contains(const TVector<TString>& names, TString str) {
return std::find(names.begin(), names.end(), str) != names.end();
}

namespace NKikimr {
namespace NTableIndex {

TTableColumns CalcTableImplDescription(const TTableColumns& table, const TIndexColumns& index) {
{
TString explain;
Y_ABORT_UNLESS(IsCompatibleIndex(table, index, explain), "explain is %s", explain.c_str());
}

TTableColumns CalcTableImplDescription(const NKikimrSchemeOp::EIndexType indexType, const TTableColumns& table, const TIndexColumns& index) {
TTableColumns result;

for (const auto& ik: index.KeyColumns) {
result.Keys.push_back(ik);
result.Columns.insert(ik);
if (indexType == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree) {
result.Keys.push_back(NTableVectorKmeansTreeIndex::PostingTable_ParentIdColumn);
result.Columns.insert(NTableVectorKmeansTreeIndex::PostingTable_ParentIdColumn);
} else {
for (const auto& ik: index.KeyColumns) {
result.Keys.push_back(ik);
result.Columns.insert(ik);
}
}

for (const auto& tk: table.Keys) {
Expand All @@ -43,7 +47,9 @@ TTableColumns CalcTableImplDescription(const TTableColumns& table, const TIndexC
return result;
}

bool IsCompatibleIndex(const TTableColumns& table, const TIndexColumns& index, TString& explain) {
bool IsCompatibleIndex(const NKikimrSchemeOp::EIndexType indexType, const TTableColumns& table, const TIndexColumns& index, TString& explain) {
const bool isVectorIndex = indexType == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree;

{
auto brokenAt = IsUniq(table.Keys);
if (brokenAt != table.Keys.end()) {
Expand Down Expand Up @@ -71,6 +77,23 @@ bool IsCompatibleIndex(const TTableColumns& table, const TIndexColumns& index, T
}
}

if (isVectorIndex) {
if (index.KeyColumns.size() != 1) {
explain = "Only single key column is supported for vector index";
return false;
}

if (Contains(index.KeyColumns, NTableVectorKmeansTreeIndex::PostingTable_ParentIdColumn)) {
explain = TStringBuilder() << "Key column should not have a reserved name: " << NTableVectorKmeansTreeIndex::PostingTable_ParentIdColumn;
return false;
}

if (Contains(index.DataColumns, NTableVectorKmeansTreeIndex::PostingTable_ParentIdColumn)) {
explain = TStringBuilder() << "Data column should not have a reserved name: " << NTableVectorKmeansTreeIndex::PostingTable_ParentIdColumn;
return false;
}
}

THashSet<TString> indexKeys;

for (const auto& tableKeyName: table.Keys) {
Expand All @@ -84,7 +107,8 @@ bool IsCompatibleIndex(const TTableColumns& table, const TIndexColumns& index, T
}

for (const auto& indexKeyName: index.KeyColumns) {
indexKeys.insert(indexKeyName);
if (!isVectorIndex)
indexKeys.insert(indexKeyName);
if (!table.Columns.contains(indexKeyName)) {
explain = TStringBuilder()
<< "all index keys should be in table columns"
Expand All @@ -93,9 +117,9 @@ bool IsCompatibleIndex(const TTableColumns& table, const TIndexColumns& index, T
}
}

if (index.KeyColumns == table.Keys) {
if (index.KeyColumns == table.Keys && !isVectorIndex) {
explain = TStringBuilder()
<< "table and index keys are the same";
<< "table and index keys are the same";
return false;
}

Expand Down
7 changes: 5 additions & 2 deletions ydb/core/base/table_index.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#pragma once

#include "table_vector_index.h"
#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <util/generic/hash_set.h>
#include <util/generic/vector.h>
#include <util/generic/string.h>
Expand All @@ -18,8 +21,8 @@ struct TIndexColumns {
TVector<TString> DataColumns;
};

bool IsCompatibleIndex(const TTableColumns& table, const TIndexColumns& index, TString& explain);
TTableColumns CalcTableImplDescription(const TTableColumns& table, const TIndexColumns &index);
bool IsCompatibleIndex(const NKikimrSchemeOp::EIndexType indexType, const TTableColumns& table, const TIndexColumns& index, TString& explain);
TTableColumns CalcTableImplDescription(const NKikimrSchemeOp::EIndexType indexType, const TTableColumns& table, const TIndexColumns& index);

}
}
19 changes: 19 additions & 0 deletions ydb/core/base/table_vector_index.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once

namespace NKikimr::NTableIndex::NTableVectorKmeansTreeIndex {

// Vector KmeansTree index tables description

// Levels table
inline constexpr const char* LevelTable = "indexImplLevelTable";
inline constexpr const char* LevelTable_ParentIdColumn = "-parent";
inline constexpr const char* LevelTable_IdColumn = "-id";
inline constexpr const char* LevelTable_EmbeddingColumn = "-embedding";

// Posting table
inline constexpr const char* PostingTable = "indexImplPostingTable";
inline constexpr const char* PostingTable_ParentIdColumn = "-parent";


}

13 changes: 1 addition & 12 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,18 +572,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
for (const auto& index : metadata->Indexes) {
auto indexDesc = schemeTx.MutableCreateIndexedTable()->AddIndexDescription();
indexDesc->SetName(index.Name);
switch (index.Type) {
case NYql::TIndexDescription::EType::GlobalSync:
indexDesc->SetType(NKikimrSchemeOp::EIndexType::EIndexTypeGlobal);
break;
case NYql::TIndexDescription::EType::GlobalAsync:
indexDesc->SetType(NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync);
break;
case NYql::TIndexDescription::EType::GlobalSyncUnique:
indexDesc->SetType(NKikimrSchemeOp::EIndexType::EIndexTypeGlobalUnique);
break;
}

indexDesc->SetType(TIndexDescription::ConvertIndexType(index.Type));
indexDesc->SetState(static_cast<::NKikimrSchemeOp::EIndexState>(index.State));
for (const auto& col : index.KeyColumns) {
indexDesc->AddKeyColumnNames(col);
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/kqp/provider/yql_kikimr_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,14 @@ void IKikimrGateway::BuildIndexMetadata(TTableMetadataResult& loadTableMetadataR
for (size_t i = 0; i < indexesCount; i++) {
const auto& index = tableMetadata->Indexes[i];
auto indexTablePath = NKikimr::NKqp::NSchemeHelpers::CreateIndexTablePath(tableName, index.Name);
NKikimr::NTableIndex::TTableColumns indexTableColumns = NKikimr::NTableIndex::CalcTableImplDescription(
tableColumns,
NKikimr::NTableIndex::TIndexColumns{index.KeyColumns, {}});

NKikimr::NTableIndex::TIndexColumns indexColumns{index.KeyColumns, {}};

TString error;
NKikimrSchemeOp::EIndexType indexType = TIndexDescription::ConvertIndexType(index.Type);
YQL_ENSURE(IsCompatibleIndex(indexType, tableColumns, indexColumns, error), "Index is not compatible: " << error);

NKikimr::NTableIndex::TTableColumns indexTableColumns = NKikimr::NTableIndex::CalcTableImplDescription(indexType, tableColumns, indexColumns);

TKikimrTableMetadataPtr indexTableMetadata = new TKikimrTableMetadata(cluster, indexTablePath);
indexTableMetadata->DoesExist = true;
Expand Down
36 changes: 28 additions & 8 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct TIndexDescription {
GlobalSync = 0,
GlobalAsync = 1,
GlobalSyncUnique = 2,
GlobalSyncVectorKMeansTree = 3
};

// Index states here must be in sync with NKikimrSchemeOp::EIndexState protobuf
Expand Down Expand Up @@ -99,7 +100,7 @@ struct TIndexDescription {
: Name(index.GetName())
, KeyColumns(index.GetKeyColumnNames().begin(), index.GetKeyColumnNames().end())
, DataColumns(index.GetDataColumnNames().begin(), index.GetDataColumnNames().end())
, Type(ConvertIndexType(index))
, Type(ConvertIndexType(index.GetType()))
, State(static_cast<EIndexState>(index.GetState()))
, SchemaVersion(index.GetSchemaVersion())
, LocalPathId(index.GetLocalPathId())
Expand All @@ -117,15 +118,32 @@ struct TIndexDescription {
, PathOwnerId(message->GetPathOwnerId())
{}

static TIndexDescription::EType ConvertIndexType(const NKikimrSchemeOp::TIndexDescription& index) {
auto type = NYql::TIndexDescription::EType::GlobalSync;
if (index.GetType() == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync) {
type = NYql::TIndexDescription::EType::GlobalAsync;
} else if (index.GetType() == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalUnique) {
type = NYql::TIndexDescription::EType::GlobalSyncUnique;
static TIndexDescription::EType ConvertIndexType(const NKikimrSchemeOp::EIndexType indexType) {
switch (indexType) {
case NKikimrSchemeOp::EIndexType::EIndexTypeGlobal:
return TIndexDescription::EType::GlobalSync;
case NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync:
return TIndexDescription::EType::GlobalAsync;
case NKikimrSchemeOp::EIndexType::EIndexTypeGlobalUnique:
return TIndexDescription::EType::GlobalSyncUnique;
case NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree:
return TIndexDescription::EType::GlobalSyncVectorKMeansTree;
default:
YQL_ENSURE(false, "Unexpected NKikimrSchemeOp::EIndexType::EIndexTypeInvalid");
}
}

return type;
static NKikimrSchemeOp::EIndexType ConvertIndexType(const TIndexDescription::EType indexType) {
switch (indexType) {
case TIndexDescription::EType::GlobalSync:
return NKikimrSchemeOp::EIndexType::EIndexTypeGlobal;
case TIndexDescription::EType::GlobalAsync:
return NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync;
case TIndexDescription::EType::GlobalSyncUnique:
return NKikimrSchemeOp::EIndexType::EIndexTypeGlobalUnique;
case NYql::TIndexDescription::EType::GlobalSyncVectorKMeansTree:
return NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree;
}
}

void ToMessage(NKikimrKqp::TIndexDescriptionProto* message) const {
Expand Down Expand Up @@ -160,6 +178,8 @@ struct TIndexDescription {
return true;
case EType::GlobalAsync:
return false;
case EType::GlobalSyncVectorKMeansTree:
return true;
}
}
};
Expand Down
85 changes: 85 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2433,6 +2433,21 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto indexDesc = describe.GetTableDescription();
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPartitioningSettings().GetMinPartitionsCount(), minPartitionsCount);
}

constexpr int partitionSizeMb = 555;
{
auto result = session.ExecuteSchemeQuery(Sprintf(R"(
ALTER TABLE `/Root/SecondaryKeys` ALTER INDEX Index SET AUTO_PARTITIONING_PARTITION_SIZE_MB %d;
)", partitionSizeMb)
).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
{
auto describe = session.DescribeTable("/Root/SecondaryKeys/Index/indexImplTable").GetValueSync();
UNIT_ASSERT_C(describe.IsSuccess(), describe.GetIssues().ToString());
auto indexDesc = describe.GetTableDescription();
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPartitioningSettings().GetPartitionSizeMb(), partitionSizeMb);
}
}

Y_UNIT_TEST(AlterIndexImplTable) {
Expand Down Expand Up @@ -2638,6 +2653,76 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}

Y_UNIT_TEST(CreateTableWithVectorIndexPublicApi) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
{
auto builder = TTableBuilder()
.AddNullableColumn("Key", EPrimitiveType::Uint64)
.AddNullableColumn("Embedding", EPrimitiveType::String)
.SetPrimaryKeyColumn("Key")
.AddVectorKMeansTreeSecondaryIndex("vector_idx", {"Embedding"},
{ NYdb::NTable::TVectorIndexSettings::EDistance::Cosine,
NYdb::NTable::TVectorIndexSettings::EVectorType::Float,
1024});

auto result = session.CreateTable("/Root/TestTable", builder.Build()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
{
auto result = session.DescribeTable("/Root/TestTable").ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);

UNIT_ASSERT_VALUES_EQUAL(result.GetTableDescription().GetIndexDescriptions().size(), 1);
auto indexDesc = result.GetTableDescription().GetIndexDescriptions()[0];
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetIndexName(), "vector_idx");
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetIndexType(), EIndexType::GlobalVectorKMeansTree);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetIndexColumns().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetIndexColumns()[0], "Embedding");
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetDataColumns().size(), 0);
UNIT_ASSERT_VALUES_EQUAL(std::get<NYdb::NTable::TVectorIndexSettings::EDistance>(indexDesc.GetVectorIndexSettings()->Metric), NYdb::NTable::TVectorIndexSettings::EDistance::Cosine);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetVectorIndexSettings()->VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Float);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetVectorIndexSettings()->VectorDimension, 1024);
}
}

Y_UNIT_TEST(CreateTableWithVectorIndexCoveredPublicApi) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
{
auto builder = TTableBuilder()
.AddNullableColumn("Key", EPrimitiveType::Uint64)
.AddNullableColumn("Embedding", EPrimitiveType::String)
.AddNullableColumn("Covered", EPrimitiveType::String)
.SetPrimaryKeyColumn("Key")
.AddVectorKMeansTreeSecondaryIndex("vector_idx", {"Embedding"}, {"Covered"},
{ NYdb::NTable::TVectorIndexSettings::EDistance::Cosine,
NYdb::NTable::TVectorIndexSettings::EVectorType::Float,
1024});

auto result = session.CreateTable("/Root/TestTable", builder.Build()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
{
auto result = session.DescribeTable("/Root/TestTable").ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);

UNIT_ASSERT_VALUES_EQUAL(result.GetTableDescription().GetIndexDescriptions().size(), 1);
auto indexDesc = result.GetTableDescription().GetIndexDescriptions()[0];
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetIndexName(), "vector_idx");
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetIndexType(), EIndexType::GlobalVectorKMeansTree);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetIndexColumns().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetIndexColumns()[0], "Embedding");
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetDataColumns().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetDataColumns()[0], "Covered");
UNIT_ASSERT_VALUES_EQUAL(std::get<NYdb::NTable::TVectorIndexSettings::EDistance>(indexDesc.GetVectorIndexSettings()->Metric), NYdb::NTable::TVectorIndexSettings::EDistance::Cosine);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetVectorIndexSettings()->VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Float);
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetVectorIndexSettings()->VectorDimension, 1024);
}
}

Y_UNIT_TEST(AlterTableWithDecimalColumn) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import "ydb/core/protos/follower_group.proto";
import "ydb/core/protos/blob_depot_config.proto";
import "ydb/public/api/protos/ydb_coordination.proto";
import "ydb/public/api/protos/ydb_export.proto";
import "ydb/public/api/protos/ydb_table.proto";
import "ydb/public/api/protos/ydb_value.proto";
import "ydb/library/actors/protos/actors.proto";
import "ydb/library/mkql_proto/protos/minikql.proto";
Expand Down Expand Up @@ -973,6 +974,7 @@ enum EIndexType {
EIndexTypeGlobal = 1;
EIndexTypeGlobalAsync = 2;
EIndexTypeGlobalUnique = 3;
EIndexTypeGlobalVectorKmeansTree = 4;
}

enum EIndexState {
Expand All @@ -982,6 +984,10 @@ enum EIndexState {
EIndexStateWriteOnly = 3;
}

message TVectorIndexKmeansTreeDescription {
optional Ydb.Table.VectorIndexSettings Settings = 1;
}

message TIndexDescription {
optional string Name = 1;
optional uint64 LocalPathId = 2;
Expand All @@ -1000,6 +1006,10 @@ message TIndexDescription {
// DataSize + IndexSize of indexImplTable
optional uint64 DataSize = 9;
repeated TTableDescription IndexImplTableDescriptions = 10;

oneof SpecializedIndexDescription {
TVectorIndexKmeansTreeDescription VectorIndexKmeansTreeDescription = 11;
}
}

message TIndexCreationConfig {
Expand All @@ -1009,6 +1019,9 @@ message TIndexCreationConfig {
repeated TTableDescription IndexImplTableDescriptions = 4; //description for index impl tables
optional EIndexState State = 5; //state of index at the creation time
repeated string DataColumnNames = 6; //columns to be denormalized to read data just from index
oneof SpecializedIndexDescription {
TVectorIndexKmeansTreeDescription VectorIndexKmeansTreeDescription = 7;
}
}

message TIndexAlteringConfig {
Expand Down Expand Up @@ -1849,6 +1862,7 @@ enum EPathSubType {
EPathSubTypeSyncIndexImplTable = 1;
EPathSubTypeAsyncIndexImplTable = 2;
EPathSubTypeStreamImpl = 3;
EPathSubTypeVectorKmeansTreeIndexImplTable = 4;
}

enum EPathState {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_user_table.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <ydb/core/base/storage_pools.h>
#include <ydb/core/base/table_vector_index.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/tablet_flat/flat_database.h>
#include <ydb/core/tablet_flat/flat_stat_table.h>
Expand Down
Loading