diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 51ad1ecf66df..1aca65d4b34d 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -3012,6 +3012,89 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { UNIT_ASSERT_C(!indexBackupExists, "Index backup should NOT exist when OmitIndexes flag is set"); } + Y_UNIT_TEST(CdcVersionSync) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + // Create first table with index + CreateShardedTable(server, edgeActor, "/Root", "Table1", + TShardedTableOptions() + .Columns({ + {"key", "Uint32", true, false}, + {"val1", "Uint32", false, false} + }) + .Indexes({ + {"idx1", {"val1"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + })); + + // Create second table with different index + CreateShardedTable(server, edgeActor, "/Root", "Table2", + TShardedTableOptions() + .Columns({ + {"key", "Uint32", true, false}, + {"val2", "Uint32", false, false} + }) + .Indexes({ + {"idx2", {"val2"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + })); + + // Insert data into both tables + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table1` (key, val1) VALUES (1, 100), (2, 200); + UPSERT INTO `/Root/Table2` (key, val2) VALUES (1, 1000), (2, 2000); + )"); + + // Create backup collection with both tables + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MultiTableCollection` + ( TABLE `/Root/Table1` + , TABLE `/Root/Table2` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + // Full backup + ExecSQL(server, edgeActor, R"(BACKUP `MultiTableCollection`;)", false); + SimulateSleep(server, TDuration::Seconds(1)); + + // Modify both tables + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table1` (key, val1) VALUES (3, 300); + UPSERT INTO `/Root/Table2` (key, val2) VALUES (3, 3000); + )"); + + // Incremental backup + ExecSQL(server, edgeActor, R"(BACKUP `MultiTableCollection` INCREMENTAL;)", false); + SimulateSleep(server, TDuration::Seconds(5)); + + // Capture expected states + ExecSQL(server, edgeActor, R"( + SELECT key, val1 FROM `/Root/Table1` ORDER BY key + )"); + + ExecSQL(server, edgeActor, R"( + SELECT key, val2 FROM `/Root/Table2` ORDER BY key + )"); + + // Drop both tables + ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/Table1`;)", false); + } + } // Y_UNIT_TEST_SUITE(IncrementalBackup) } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp index d8e4566bfebb..f1690f31f9b6 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp @@ -9,6 +9,8 @@ namespace NKikimr::NSchemeShard::NCdcStreamState { namespace { +constexpr const char* CONTINUOUS_BACKUP_SUFFIX = "_continuousBackupImpl"; + bool IsExpectedTxType(TTxState::ETxType txType) { switch (txType) { case TTxState::TxCreateCdcStreamAtTable: @@ -24,11 +26,206 @@ bool IsExpectedTxType(TTxState::ETxType txType) { } } +bool IsContinuousBackupStream(const TString& streamName) { + return streamName.EndsWith(CONTINUOUS_BACKUP_SUFFIX); +} + +struct TTableVersionContext { + TPathId PathId; + TPathId ParentPathId; + TPathId GrandParentPathId; + bool IsIndexImplTable = false; + bool IsContinuousBackupStream = false; + bool IsPartOfContinuousBackup = false; +}; + +bool DetectContinuousBackupStream(const TTxState& txState, TOperationContext& context) { + if (!txState.CdcPathId || !context.SS->PathsById.contains(txState.CdcPathId)) { + return false; + } + + auto cdcPath = context.SS->PathsById.at(txState.CdcPathId); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Checking CDC stream name" + << ", cdcPathId: " << txState.CdcPathId + << ", streamName: " << cdcPath->Name + << ", at schemeshard: " << context.SS->SelfTabletId()); + + return IsContinuousBackupStream(cdcPath->Name); +} + +bool DetectIndexImplTable(TPathElement::TPtr path, TOperationContext& context, TPathId& outGrandParentPathId) { + const TPathId& parentPathId = path->ParentPathId; + if (!parentPathId || !context.SS->PathsById.contains(parentPathId)) { + return false; + } + + auto parentPath = context.SS->PathsById.at(parentPathId); + if (parentPath->IsTableIndex()) { + outGrandParentPathId = parentPath->ParentPathId; + return true; + } + + return false; +} + +bool HasParentContinuousBackup(const TPathId& grandParentPathId, TOperationContext& context) { + if (!grandParentPathId || !context.SS->PathsById.contains(grandParentPathId)) { + return false; + } + + auto grandParentPath = context.SS->PathsById.at(grandParentPathId); + for (const auto& [childName, childPathId] : grandParentPath->GetChildren()) { + auto childPath = context.SS->PathsById.at(childPathId); + if (childPath->IsCdcStream() && IsContinuousBackupStream(childName)) { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Detected continuous backup via parent table CDC stream" + << ", parentTablePathId: " << grandParentPathId + << ", cdcStreamName: " << childName + << ", at schemeshard: " << context.SS->SelfTabletId()); + return true; + } + } + + return false; +} + +TTableVersionContext BuildTableVersionContext( + const TTxState& txState, + TPathElement::TPtr path, + TOperationContext& context) +{ + TTableVersionContext ctx; + ctx.PathId = txState.TargetPathId; + ctx.ParentPathId = path->ParentPathId; + ctx.IsContinuousBackupStream = DetectContinuousBackupStream(txState, context); + ctx.IsIndexImplTable = DetectIndexImplTable(path, context, ctx.GrandParentPathId); + + // Check if impl table is part of continuous backup + if (ctx.IsIndexImplTable) { + ctx.IsPartOfContinuousBackup = HasParentContinuousBackup(ctx.GrandParentPathId, context); + } else { + ctx.IsPartOfContinuousBackup = ctx.IsContinuousBackupStream; + } + + return ctx; +} + +void SyncImplTableVersion( + const TTableVersionContext& versionCtx, + TTableInfo::TPtr& table, + TOperationContext& context) +{ + Y_ABORT_UNLESS(context.SS->Tables.contains(versionCtx.GrandParentPathId)); + auto parentTable = context.SS->Tables.at(versionCtx.GrandParentPathId); + + ui64 currentImplVersion = table->AlterVersion; + ui64 currentParentVersion = parentTable->AlterVersion; + + if (currentImplVersion <= currentParentVersion) { + table->AlterVersion = currentParentVersion; + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Synchronized index impl table version to parent table" + << ", implTablePathId: " << versionCtx.PathId + << ", parentTablePathId: " << versionCtx.GrandParentPathId + << ", oldImplVersion: " << currentImplVersion + << ", parentVersion: " << currentParentVersion + << ", newImplVersion: " << table->AlterVersion + << ", at schemeshard: " << context.SS->SelfTabletId()); + } else { + table->AlterVersion += 1; + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "WARNING: Impl table version ahead of parent, incrementing" + << ", implTablePathId: " << versionCtx.PathId + << ", implVersion: " << currentImplVersion + << ", parentVersion: " << currentParentVersion + << ", newImplVersion: " << table->AlterVersion + << ", at schemeshard: " << context.SS->SelfTabletId()); + } +} + +void SyncIndexEntityVersion( + const TPathId& indexPathId, + ui64 targetVersion, + TOperationId operationId, + TOperationContext& context, + NIceDb::TNiceDb& db) +{ + if (!context.SS->Indexes.contains(indexPathId)) { + return; + } + + auto index = context.SS->Indexes.at(indexPathId); + index->AlterVersion = targetVersion; + + context.SS->PersistTableIndexAlterVersion(db, indexPathId, index); + + auto indexPath = context.SS->PathsById.at(indexPathId); + context.SS->ClearDescribePathCaches(indexPath); + context.OnComplete.PublishToSchemeBoard(operationId, indexPathId); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Synced index entity version" + << ", indexPathId: " << indexPathId + << ", newVersion: " << index->AlterVersion + << ", at schemeshard: " << context.SS->SelfTabletId()); +} + +void SyncChildIndexes( + TPathElement::TPtr parentPath, + ui64 targetVersion, + TOperationId operationId, + TOperationContext& context, + NIceDb::TNiceDb& db) +{ + for (const auto& [childName, childPathId] : parentPath->GetChildren()) { + auto childPath = context.SS->PathsById.at(childPathId); + + // Skip non-index children and deleted indexes + if (!childPath->IsTableIndex() || childPath->Dropped()) { + continue; + } + + SyncIndexEntityVersion(childPathId, targetVersion, operationId, context, db); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Synced parent index version with parent table" + << ", parentTable: " << parentPath->Name + << ", indexName: " << childName + << ", indexPathId: " << childPathId + << ", newVersion: " << targetVersion + << ", at schemeshard: " << context.SS->SelfTabletId()); + } +} + +void UpdateTableVersion( + const TTableVersionContext& versionCtx, + TTableInfo::TPtr& table, + TOperationId operationId, + TOperationContext& context, + NIceDb::TNiceDb& db) +{ + if (versionCtx.IsPartOfContinuousBackup && versionCtx.IsIndexImplTable && + versionCtx.GrandParentPathId && context.SS->Tables.contains(versionCtx.GrandParentPathId)) { + + SyncImplTableVersion(versionCtx, table, context); + + SyncIndexEntityVersion(versionCtx.ParentPathId, table->AlterVersion, operationId, context, db); + } else { + table->AlterVersion += 1; + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Incremented table version" + << ", pathId: " << versionCtx.PathId + << ", newVersion: " << table->AlterVersion + << ", isIndexImpl: " << (versionCtx.IsIndexImplTable ? "yes" : "no") + << ", isContinuousBackup: " << (versionCtx.IsPartOfContinuousBackup ? "yes" : "no") + << ", at schemeshard: " << context.SS->SelfTabletId()); + } +} + } // namespace anonymous -// NCdcStreamState::TConfigurePartsAtTable -// TConfigurePartsAtTable::TConfigurePartsAtTable(TOperationId id) : OperationId(id) { @@ -80,8 +277,6 @@ bool TConfigurePartsAtTable::HandleReply(TEvDataShard::TEvProposeTransactionResu } -// NCdcStreamState::TProposeAtTable -// TProposeAtTable::TProposeAtTable(TOperationId id) : OperationId(id) { @@ -124,59 +319,16 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera Y_ABORT_UNLESS(context.SS->Tables.contains(pathId)); auto table = context.SS->Tables.at(pathId); - table->AlterVersion += 1; - NIceDb::TNiceDb db(context.GetDB()); - bool isContinuousBackupStream = false; - if (txState->CdcPathId && context.SS->PathsById.contains(txState->CdcPathId)) { - auto cdcPath = context.SS->PathsById.at(txState->CdcPathId); - LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " Checking CDC stream name" - << ", cdcPathId: " << txState->CdcPathId - << ", streamName: " << cdcPath->Name - << ", at schemeshard: " << context.SS->SelfTabletId()); - if (cdcPath->Name.EndsWith("_continuousBackupImpl")) { - isContinuousBackupStream = true; - } - } else { - LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " CdcPathId not found" - << ", cdcPathId: " << txState->CdcPathId - << ", at schemeshard: " << context.SS->SelfTabletId()); - } - - // Check if this is an index implementation table - // If so, we need to sync the parent index version to match the impl table version - // Do this ONLY for continuous backup operations - TPathId parentPathId = path->ParentPathId; - if (parentPathId && context.SS->PathsById.contains(parentPathId) && isContinuousBackupStream) { - auto parentPath = context.SS->PathsById.at(parentPathId); - if (parentPath->IsTableIndex()) { - Y_ABORT_UNLESS(context.SS->Indexes.contains(parentPathId)); - auto index = context.SS->Indexes.at(parentPathId); - - index->AlterVersion = table->AlterVersion; - - // Persist the index version update directly to database - db.Table().Key(parentPathId.LocalPathId).Update( - NIceDb::TUpdate(index->AlterVersion) - ); - - context.SS->ClearDescribePathCaches(parentPath); - context.OnComplete.PublishToSchemeBoard(OperationId, parentPathId); + auto versionCtx = BuildTableVersionContext(*txState, path, context); + UpdateTableVersion(versionCtx, table, OperationId, context, db); - LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - DebugHint() << " Synced parent index version with impl table" - << ", indexPathId: " << parentPathId - << ", indexName: " << parentPath->Name - << ", newVersion: " << index->AlterVersion - << ", at schemeshard: " << context.SS->SelfTabletId()); - } + if (versionCtx.IsContinuousBackupStream && !versionCtx.IsIndexImplTable) { + SyncChildIndexes(path, table->AlterVersion, OperationId, context, db); } context.SS->PersistTableAlterVersion(db, pathId, table); - context.SS->ClearDescribePathCaches(path); context.OnComplete.PublishToSchemeBoard(OperationId, pathId); @@ -195,8 +347,6 @@ bool TProposeAtTable::HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOpe } -// NCdcStreamState::TProposeAtTableDropSnapshot -// bool TProposeAtTableDropSnapshot::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) { TProposeAtTable::HandleReply(ev, context); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 0ced223da83a..fddcd9572ff0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1959,6 +1959,12 @@ void TSchemeShard::PersistTableIndexAlterData(NIceDb::TNiceDb& db, const TPathId } } +void TSchemeShard::PersistTableIndexAlterVersion(NIceDb::TNiceDb& db, const TPathId& pathId, const TTableIndexInfo::TPtr indexInfo) { + db.Table().Key(pathId.LocalPathId).Update( + NIceDb::TUpdate(indexInfo->AlterVersion) + ); +} + void TSchemeShard::PersistCdcStream(NIceDb::TNiceDb& db, const TPathId& pathId) { Y_ABORT_UNLESS(PathsById.contains(pathId)); auto path = PathsById.at(pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 226abf154171..64617ce216a2 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -725,6 +725,7 @@ class TSchemeShard // table index void PersistTableIndex(NIceDb::TNiceDb& db, const TPathId& pathId); void PersistTableIndexAlterData(NIceDb::TNiceDb& db, const TPathId& pathId); + void PersistTableIndexAlterVersion(NIceDb::TNiceDb& db, const TPathId& pathId, const TTableIndexInfo::TPtr indexInfo); // cdc stream void PersistCdcStream(NIceDb::TNiceDb& db, const TPathId& pathId);