Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ydb/core/kqp/ut/scheme/kqp_scheme_fulltext_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ Y_UNIT_TEST_SUITE(KqpSchemeFulltext) {
WITH (layout=flat, tokenizer=whitespace, use_filter_lowercase=true)
)";
auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
// TODO: implement build index
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ void TSchemeShard::Handle(TEvDataShard::TEvValidateUniqueIndexResponse::TPtr& ev
Execute(CreateTxReply(ev), ctx);
}

void TSchemeShard::Handle(TEvDataShard::TEvBuildFulltextIndexResponse::TPtr& ev, const TActorContext& ctx) {
Execute(CreateTxReply(ev), ctx);
}

void TSchemeShard::Handle(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev, const TActorContext& ctx) {
Execute(CreateTxBilling(ev), ctx);
}
Expand Down
143 changes: 109 additions & 34 deletions ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,20 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
: request.GetTabletId());
}

void FillBuildInfoColumns(TIndexBuildInfo& buildInfo, TTableColumns&& columns) {
buildInfo.FillIndexColumns.clear();
buildInfo.FillIndexColumns.reserve(columns.Keys.size());
for (const auto& x: columns.Keys) {
buildInfo.FillIndexColumns.emplace_back(x);
columns.Columns.erase(x);
}
buildInfo.FillDataColumns.clear();
buildInfo.FillDataColumns.reserve(columns.Columns.size());
for (const auto& x: columns.Columns) {
buildInfo.FillDataColumns.emplace_back(x);
}
}

void SendSampleKRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
Y_ENSURE(buildInfo.IsBuildVectorIndex());
auto ev = MakeHolder<TEvDataShard::TEvSampleKRequest>();
Expand Down Expand Up @@ -821,19 +835,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
buildInfo.TargetName = implTable.PathString();

const auto& implTableInfo = Self->Tables.at(implTable.Base()->PathId);
auto implTableColumns = NTableIndex::ExtractInfo(implTableInfo);
buildInfo.FillIndexColumns.clear();
buildInfo.FillIndexColumns.reserve(implTableColumns.Keys.size());
for (const auto& x: implTableColumns.Keys) {
buildInfo.FillIndexColumns.emplace_back(x);
implTableColumns.Columns.erase(x);
}
// TODO(mbkkt) why order doesn't matter?
buildInfo.FillDataColumns.clear();
buildInfo.FillDataColumns.reserve(implTableColumns.Columns.size());
for (const auto& x: implTableColumns.Columns) {
buildInfo.FillDataColumns.emplace_back(x);
}
FillBuildInfoColumns(buildInfo, NTableIndex::ExtractInfo(implTableInfo));
}
*ev->Record.MutableIndexColumns() = {
buildInfo.FillIndexColumns.begin(),
Expand Down Expand Up @@ -899,6 +901,34 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
LOG_N("TTxBuildProgress: TUploadSampleK: " << buildInfo);
}

void SendBuildFulltextIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
auto ev = MakeHolder<TEvDataShard::TEvBuildFulltextIndexRequest>();
ev->Record.SetId(ui64(BuildId));

buildInfo.TablePathId.ToProto(ev->Record.MutablePathId());

if (buildInfo.TargetName.empty()) {
TPath implTable = GetBuildPath(Self, buildInfo, NTableIndex::ImplTable);
buildInfo.TargetName = implTable.PathString();

const auto& implTableInfo = Self->Tables.at(implTable.Base()->PathId);
FillBuildInfoColumns(buildInfo, NTableIndex::ExtractInfo(implTableInfo));
}
ev->Record.SetIndexName(buildInfo.TargetName);
*ev->Record.MutableSettings() = std::get<NKikimrSchemeOp::TFulltextIndexDescription>(
buildInfo.SpecializedIndexDescription).GetSettings();
*ev->Record.MutableDataColumns() = {
buildInfo.FillDataColumns.begin(),
buildInfo.FillDataColumns.end()
};

auto shardId = FillScanRequestCommon(ev->Record, shardIdx, buildInfo);

LOG_N("TTxBuildProgress: TEvBuildFulltextIndexRequest: " << ev->Record.ShortDebugString());

ToTabletSend.emplace(shardId, std::move(ev));
}

void ClearAfterFill(const TActorContext& ctx, TIndexBuildInfo& buildInfo) {
buildInfo.DoneShards = {};
buildInfo.InProgressShards = {};
Expand Down Expand Up @@ -1328,11 +1358,27 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
return true;
}

bool FillFulltextIndex(TIndexBuildInfo& buildInfo) {
LOG_D("FillFulltextIndex Start");

if (NoShardsAdded(buildInfo)) {
AddAllShards(buildInfo);
}

auto done = SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendBuildFulltextIndexRequest(shardIdx, buildInfo); }) &&
buildInfo.DoneShards.size() == buildInfo.Shards.size();

if (done) {
LOG_D("FillFulltextIndex Done");
}

return done;
}

bool FillIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
// About Level == 1, for now build index impl tables don't need snapshot,
// for now build index impl tables don't need snapshot,
// because they're used only by build index
if (buildInfo.KMeans.Level == 1 && !buildInfo.SnapshotTxId) {
Y_ENSURE(!buildInfo.SnapshotStep);
if (!buildInfo.SnapshotTxId && GetShardsPath(buildInfo)->PathId == buildInfo.TablePathId) {
Y_ENSURE(Self->TablesWithSnapshots.contains(buildInfo.TablePathId));
Y_ENSURE(Self->TablesWithSnapshots.at(buildInfo.TablePathId) == buildInfo.InitiateTxId);

Expand All @@ -1357,8 +1403,10 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
return FillVectorIndex(txc, buildInfo);
case TIndexBuildInfo::EBuildKind::BuildPrefixedVectorIndex:
return FillPrefixedVectorIndex(txc, buildInfo);
case TIndexBuildInfo::EBuildKind::BuildFulltext:
return FillFulltextIndex(buildInfo);
default:
Y_ENSURE(false);
Y_ENSURE(false, buildInfo.InvalidBuildKind());
return true;
}
}
Expand Down Expand Up @@ -1541,7 +1589,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
}
break;
case TIndexBuildInfo::EState::LockBuild:
Y_ENSURE(buildInfo.IsBuildVectorIndex() || buildInfo.IsValidatingUniqueIndex());
Y_ENSURE(buildInfo.IsBuildVectorIndex() && buildInfo.KMeans.Level > 1 || buildInfo.IsValidatingUniqueIndex());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А это почему, тут же вроде про фултекст пр?

Copy link
Member Author

@kunga kunga Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это я выносил GetShardsPath в котором внезапно случай вторичного индекса попадал в проверку buildInfo.KMeans.Level == 1

table = Self->Tables.at(buildInfo.TablePathId);

И искал другие такие сомнительные места, в итоге чуть подвинул где применяется переход в LockBuild, там теперь

https://github.com/kunga/ydb/blob/59de1ca68ec15687c9dbb38927b1f87365c1e7e9/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp#L1816

TPath path = GetShardsPath(buildInfo);
if (!path.IsLocked()) { // lock is needed to prevent table shards from being split
    Y_ENSURE(buildInfo.IsBuildVectorIndex() && buildInfo.KMeans.Level > 1);
    ChangeState(buildInfo.Id, TIndexBuildInfo::EState::LockBuild);
    Progress(buildInfo.Id);
    return false;
}

Поэтому тут симметричная проверка добавилась

if (buildInfo.ApplyTxId == InvalidTxId) {
AllocateTxId(BuildId);
} else if (buildInfo.ApplyTxStatus == NKikimrScheme::StatusSuccess) {
Expand Down Expand Up @@ -1722,6 +1770,28 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
return TSerializedTableRange{{&from, 1}, false, {&to, 1}, true};
}

TPath GetShardsPath(TIndexBuildInfo& buildInfo) {
switch (buildInfo.BuildKind) {
case TIndexBuildInfo::EBuildKind::BuildSecondaryIndex:
case TIndexBuildInfo::EBuildKind::BuildColumns:
case TIndexBuildInfo::EBuildKind::BuildFulltext:
return TPath::Init(buildInfo.TablePathId, Self);
case TIndexBuildInfo::EBuildKind::BuildSecondaryUniqueIndex:
return buildInfo.IsValidatingUniqueIndex()
? GetBuildPath(Self, buildInfo, NTableIndex::ImplTable)
: TPath::Init(buildInfo.TablePathId, Self);
case TIndexBuildInfo::EBuildKind::BuildVectorIndex:
case TIndexBuildInfo::EBuildKind::BuildPrefixedVectorIndex:
if (buildInfo.KMeans.Level == 1) {
return TPath::Init(buildInfo.TablePathId, Self);
} else {
return GetBuildPath(Self, buildInfo, buildInfo.KMeans.ReadFrom());
}
default:
Y_ENSURE(false, buildInfo.InvalidBuildKind());
}
}

bool InitiateShards(NIceDb::TNiceDb& db, TIndexBuildInfo& buildInfo) {
LOG_D("InitiateShards " << buildInfo.DebugString());

Expand All @@ -1730,23 +1800,17 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
Y_ENSURE(buildInfo.InProgressShards.empty());
Y_ENSURE(buildInfo.DoneShards.empty());

TTableInfo::TPtr table;
if (buildInfo.IsValidatingUniqueIndex()) {
auto path = GetBuildPath(Self, buildInfo, NTableIndex::ImplTable);
table = Self->Tables.at(path->PathId);
} else if (buildInfo.KMeans.Level == 1) {
table = Self->Tables.at(buildInfo.TablePathId);
} else {
auto path = GetBuildPath(Self, buildInfo, buildInfo.KMeans.ReadFrom());
table = Self->Tables.at(path->PathId);

if (!path.IsLocked()) { // lock is needed to prevent table shards from being split
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::LockBuild);
Progress(buildInfo.Id);
return false;
}
Y_ENSURE(path.LockedBy() == buildInfo.LockTxId);
TPath path = GetShardsPath(buildInfo);
if (!path.IsLocked()) { // lock is needed to prevent table shards from being split
Y_ENSURE(buildInfo.IsBuildVectorIndex() && buildInfo.KMeans.Level > 1);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::LockBuild);
Progress(buildInfo.Id);
return false;
}
Y_ENSURE(path.LockedBy() == buildInfo.LockTxId);

TTableInfo::TPtr table = Self->Tables.at(path->PathId);

auto tableColumns = NTableIndex::ExtractInfo(table); // skip dropped columns
// In case of unique index validation the real range will arrive after index validation for each shard:
// it will describe the first and the last index keys for further validation.
Expand Down Expand Up @@ -2259,6 +2323,13 @@ struct TSchemeShard::TIndexBuilder::TTxReplyValidateUniqueIndex: public TTxShard
}
};

struct TSchemeShard::TIndexBuilder::TTxReplyFulltextIndex: public TTxShardReply<TEvDataShard::TEvBuildFulltextIndexResponse> {
TTxReplyFulltextIndex(TSelf* self, TEvDataShard::TEvBuildFulltextIndexResponse::TPtr& response)
: TTxShardReply(self, TIndexBuildId(response->Get()->Record.GetId()), response)
{
}
};

struct TSchemeShard::TIndexBuilder::TTxReplyProgress: public TTxShardReply<TEvDataShard::TEvBuildIndexProgressResponse> {
explicit TTxReplyProgress(TSelf* self, TEvDataShard::TEvBuildIndexProgressResponse::TPtr& response)
: TTxShardReply(self, TIndexBuildId(response->Get()->Record.GetId()), response)
Expand Down Expand Up @@ -2750,6 +2821,10 @@ ITransaction* TSchemeShard::CreateTxReply(TEvDataShard::TEvValidateUniqueIndexRe
return new TIndexBuilder::TTxReplyValidateUniqueIndex(this, response);
}

ITransaction* TSchemeShard::CreateTxReply(TEvDataShard::TEvBuildFulltextIndexResponse::TPtr& response) {
return new TIndexBuilder::TTxReplyFulltextIndex(this, response);
}

ITransaction* TSchemeShard::CreatePipeRetry(TIndexBuildId indexBuildId, TTabletId tabletId) {
return new TIndexBuilder::TTxReplyRetry(this, indexBuildId, tabletId);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5378,6 +5378,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvDataShard::TEvPrefixKMeansResponse, Handle);
HFuncTraced(TEvIndexBuilder::TEvUploadSampleKResponse, Handle);
HFuncTraced(TEvDataShard::TEvValidateUniqueIndexResponse, Handle);
HFuncTraced(TEvDataShard::TEvBuildFulltextIndexResponse, Handle);
// } // NIndexBuilder

//namespace NCdcStreamScan {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,7 @@ class TSchemeShard
struct TTxReplyPrefixKMeans;
struct TTxReplyUploadSample;
struct TTxReplyValidateUniqueIndex;
struct TTxReplyFulltextIndex;

struct TTxPipeReset;
struct TTxBilling;
Expand All @@ -1590,7 +1591,8 @@ class TSchemeShard
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvLocalKMeansResponse::TPtr& local);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvPrefixKMeansResponse::TPtr& prefix);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& upload);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvValidateUniqueIndexResponse::TPtr& progress);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvValidateUniqueIndexResponse::TPtr& response);
NTabletFlatExecutor::ITransaction* CreateTxReply(TEvDataShard::TEvBuildFulltextIndexResponse::TPtr& response);
NTabletFlatExecutor::ITransaction* CreatePipeRetry(TIndexBuildId indexBuildId, TTabletId tabletId);
NTabletFlatExecutor::ITransaction* CreateTxBilling(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev);

Expand All @@ -1608,6 +1610,7 @@ class TSchemeShard
void Handle(TEvDataShard::TEvPrefixKMeansResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvIndexBuilder::TEvUploadSampleKResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvValidateUniqueIndexResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvBuildFulltextIndexResponse::TPtr& ev, const TActorContext& ctx);

void Handle(TEvPrivate::TEvIndexBuildingMakeABill::TPtr& ev, const TActorContext& ctx);

Expand Down
36 changes: 23 additions & 13 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -3433,19 +3433,20 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
std::unique_ptr<NKikimr::NKMeans::IClusters> Clusters;

TString DebugString() const {
auto result = TStringBuilder() << BuildKind;
auto result = TStringBuilder() << BuildKind << " " << State << "/" << SubState << " ";

if (IsBuildVectorIndex()) {
result << " "
<< KMeans.DebugString() << ", "
result << KMeans.DebugString() << ", "
<< "{ Rows = " << Sample.Rows.size()
<< ", Sample = " << Sample.State
<< ", Clusters = " << Clusters->GetClusters().size() << " }, "
<< "{ Done = " << DoneShards.size()
<< ", ToUpload = " << ToUploadShards.size()
<< ", InProgress = " << InProgressShards.size() << " }";
<< ", Clusters = " << Clusters->GetClusters().size() << " }, ";
}

result
<< "{ Done = " << DoneShards.size()
<< ", ToUpload = " << ToUploadShards.size()
<< ", InProgress = " << InProgressShards.size() << " }";

return result;
}

Expand Down Expand Up @@ -3613,7 +3614,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
indexInfo->Billed.SetReadRows(row.template GetValueOrDefault<Schema::IndexBuild::ReadRowsBilled>(0));
indexInfo->Billed.SetReadBytes(row.template GetValueOrDefault<Schema::IndexBuild::ReadBytesBilled>(0));
indexInfo->Billed.SetCpuTimeUs(row.template GetValueOrDefault<Schema::IndexBuild::CpuTimeUsBilled>(0));
if (indexInfo->IsFillBuildIndex()) {
if (indexInfo->IsOldBuildIndex()) {
TMeteringStatsHelper::TryFixOldFormat(indexInfo->Billed);
}

Expand All @@ -3622,7 +3623,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
indexInfo->Processed.SetReadRows(row.template GetValueOrDefault<Schema::IndexBuild::ReadRowsProcessed>(0));
indexInfo->Processed.SetReadBytes(row.template GetValueOrDefault<Schema::IndexBuild::ReadBytesProcessed>(0));
indexInfo->Processed.SetCpuTimeUs(row.template GetValueOrDefault<Schema::IndexBuild::CpuTimeUsProcessed>(0));
if (indexInfo->IsFillBuildIndex()) {
if (indexInfo->IsOldBuildIndex()) {
TMeteringStatsHelper::TryFixOldFormat(indexInfo->Processed);
}

Expand Down Expand Up @@ -3702,7 +3703,7 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
shardStatus.Processed.SetReadRows(row.template GetValueOrDefault<Schema::IndexBuildShardStatus::ReadRowsProcessed>(0));
shardStatus.Processed.SetReadBytes(row.template GetValueOrDefault<Schema::IndexBuildShardStatus::ReadBytesProcessed>(0));
shardStatus.Processed.SetCpuTimeUs(row.template GetValueOrDefault<Schema::IndexBuildShardStatus::CpuTimeUsProcessed>(0));
if (IsFillBuildIndex()) {
if (IsOldBuildIndex()) {
TMeteringStatsHelper::TryFixOldFormat(shardStatus.Processed);
}
Processed += shardStatus.Processed;
Expand All @@ -3712,8 +3713,13 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
return CancelRequested;
}

bool IsFillBuildIndex() const {
return IsBuildSecondaryIndex() || IsBuildSecondaryUniqueIndex() || IsBuildColumns();
bool IsOldBuildIndex() const {
return IsBuildSecondaryIndex() || IsBuildColumns();
}

TString InvalidBuildKind() {
return TStringBuilder() << "Invalid index build kind " << static_cast<int>(BuildKind)
<< " for index type " << static_cast<int>(IndexType);
}

bool IsBuildSecondaryIndex() const {
Expand All @@ -3732,8 +3738,12 @@ struct TIndexBuildInfo: public TSimpleRefCount<TIndexBuildInfo> {
return BuildKind == EBuildKind::BuildVectorIndex || IsBuildPrefixedVectorIndex();
}

bool IsBuildFulltextIndex() const {
return BuildKind == EBuildKind::BuildFulltext;
}

bool IsBuildIndex() const {
return IsBuildSecondaryIndex() || IsBuildSecondaryUniqueIndex() || IsBuildVectorIndex();
return IsBuildSecondaryIndex() || IsBuildSecondaryUniqueIndex() || IsBuildVectorIndex() || IsBuildFulltextIndex();
}

bool IsBuildColumns() const {
Expand Down
Loading
Loading