Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3012,6 +3012,89 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
UNIT_ASSERT_C(!indexBackupExists, "Index backup should NOT exist when OmitIndexes flag is set");
}

Y_UNIT_TEST(CdcVersionSync) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
.SetEnableChangefeedInitialScan(true)
.SetEnableBackupService(true)
.SetEnableRealSystemViewPaths(false)
);

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();

SetupLogging(runtime);
InitRoot(server, edgeActor);

// Create first table with index
CreateShardedTable(server, edgeActor, "/Root", "Table1",
TShardedTableOptions()
.Columns({
{"key", "Uint32", true, false},
{"val1", "Uint32", false, false}
})
.Indexes({
{"idx1", {"val1"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}
}));

// Create second table with different index
CreateShardedTable(server, edgeActor, "/Root", "Table2",
TShardedTableOptions()
.Columns({
{"key", "Uint32", true, false},
{"val2", "Uint32", false, false}
})
.Indexes({
{"idx2", {"val2"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}
}));

// Insert data into both tables
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table1` (key, val1) VALUES (1, 100), (2, 200);
UPSERT INTO `/Root/Table2` (key, val2) VALUES (1, 1000), (2, 2000);
)");

// Create backup collection with both tables
ExecSQL(server, edgeActor, R"(
CREATE BACKUP COLLECTION `MultiTableCollection`
( TABLE `/Root/Table1`
, TABLE `/Root/Table2`
)
WITH
( STORAGE = 'cluster'
, INCREMENTAL_BACKUP_ENABLED = 'true'
);
)", false);

// Full backup
ExecSQL(server, edgeActor, R"(BACKUP `MultiTableCollection`;)", false);
SimulateSleep(server, TDuration::Seconds(1));

// Modify both tables
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table1` (key, val1) VALUES (3, 300);
UPSERT INTO `/Root/Table2` (key, val2) VALUES (3, 3000);
)");

// Incremental backup
ExecSQL(server, edgeActor, R"(BACKUP `MultiTableCollection` INCREMENTAL;)", false);
SimulateSleep(server, TDuration::Seconds(5));

// Capture expected states
ExecSQL(server, edgeActor, R"(
SELECT key, val1 FROM `/Root/Table1` ORDER BY key
)");

ExecSQL(server, edgeActor, R"(
SELECT key, val2 FROM `/Root/Table2` ORDER BY key
)");

// Drop both tables
ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/Table1`;)", false);
}

} // Y_UNIT_TEST_SUITE(IncrementalBackup)

} // NKikimr
256 changes: 203 additions & 53 deletions ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace NKikimr::NSchemeShard::NCdcStreamState {

namespace {

constexpr const char* CONTINUOUS_BACKUP_SUFFIX = "_continuousBackupImpl";

bool IsExpectedTxType(TTxState::ETxType txType) {
switch (txType) {
case TTxState::TxCreateCdcStreamAtTable:
Expand All @@ -24,11 +26,206 @@ bool IsExpectedTxType(TTxState::ETxType txType) {
}
}

bool IsContinuousBackupStream(const TString& streamName) {
return streamName.EndsWith(CONTINUOUS_BACKUP_SUFFIX);
}

struct TTableVersionContext {
TPathId PathId;
TPathId ParentPathId;
TPathId GrandParentPathId;
bool IsIndexImplTable = false;
bool IsContinuousBackupStream = false;
bool IsPartOfContinuousBackup = false;
};

bool DetectContinuousBackupStream(const TTxState& txState, TOperationContext& context) {
if (!txState.CdcPathId || !context.SS->PathsById.contains(txState.CdcPathId)) {
return false;
}

auto cdcPath = context.SS->PathsById.at(txState.CdcPathId);
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Checking CDC stream name"
<< ", cdcPathId: " << txState.CdcPathId
<< ", streamName: " << cdcPath->Name
<< ", at schemeshard: " << context.SS->SelfTabletId());

return IsContinuousBackupStream(cdcPath->Name);
}

bool DetectIndexImplTable(TPathElement::TPtr path, TOperationContext& context, TPathId& outGrandParentPathId) {
const TPathId& parentPathId = path->ParentPathId;
if (!parentPathId || !context.SS->PathsById.contains(parentPathId)) {
return false;
}

auto parentPath = context.SS->PathsById.at(parentPathId);
if (parentPath->IsTableIndex()) {
outGrandParentPathId = parentPath->ParentPathId;
return true;
}

return false;
}

bool HasParentContinuousBackup(const TPathId& grandParentPathId, TOperationContext& context) {
if (!grandParentPathId || !context.SS->PathsById.contains(grandParentPathId)) {
return false;
}

auto grandParentPath = context.SS->PathsById.at(grandParentPathId);
for (const auto& [childName, childPathId] : grandParentPath->GetChildren()) {
auto childPath = context.SS->PathsById.at(childPathId);
if (childPath->IsCdcStream() && IsContinuousBackupStream(childName)) {
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Detected continuous backup via parent table CDC stream"
<< ", parentTablePathId: " << grandParentPathId
<< ", cdcStreamName: " << childName
<< ", at schemeshard: " << context.SS->SelfTabletId());
return true;
}
}

return false;
}

TTableVersionContext BuildTableVersionContext(
const TTxState& txState,
TPathElement::TPtr path,
TOperationContext& context)
{
TTableVersionContext ctx;
ctx.PathId = txState.TargetPathId;
ctx.ParentPathId = path->ParentPathId;
ctx.IsContinuousBackupStream = DetectContinuousBackupStream(txState, context);
ctx.IsIndexImplTable = DetectIndexImplTable(path, context, ctx.GrandParentPathId);

// Check if impl table is part of continuous backup
if (ctx.IsIndexImplTable) {
ctx.IsPartOfContinuousBackup = HasParentContinuousBackup(ctx.GrandParentPathId, context);
} else {
ctx.IsPartOfContinuousBackup = ctx.IsContinuousBackupStream;
}

return ctx;
}

void SyncImplTableVersion(
const TTableVersionContext& versionCtx,
TTableInfo::TPtr& table,
TOperationContext& context)
{
Y_ABORT_UNLESS(context.SS->Tables.contains(versionCtx.GrandParentPathId));
auto parentTable = context.SS->Tables.at(versionCtx.GrandParentPathId);

ui64 currentImplVersion = table->AlterVersion;
ui64 currentParentVersion = parentTable->AlterVersion;

if (currentImplVersion <= currentParentVersion) {
table->AlterVersion = currentParentVersion;
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Synchronized index impl table version to parent table"
<< ", implTablePathId: " << versionCtx.PathId
<< ", parentTablePathId: " << versionCtx.GrandParentPathId
<< ", oldImplVersion: " << currentImplVersion
<< ", parentVersion: " << currentParentVersion
<< ", newImplVersion: " << table->AlterVersion
<< ", at schemeshard: " << context.SS->SelfTabletId());
} else {
table->AlterVersion += 1;
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"WARNING: Impl table version ahead of parent, incrementing"
<< ", implTablePathId: " << versionCtx.PathId
<< ", implVersion: " << currentImplVersion
<< ", parentVersion: " << currentParentVersion
<< ", newImplVersion: " << table->AlterVersion
<< ", at schemeshard: " << context.SS->SelfTabletId());
}
}

void SyncIndexEntityVersion(
const TPathId& indexPathId,
ui64 targetVersion,
TOperationId operationId,
TOperationContext& context,
NIceDb::TNiceDb& db)
{
if (!context.SS->Indexes.contains(indexPathId)) {
return;
}

auto index = context.SS->Indexes.at(indexPathId);
index->AlterVersion = targetVersion;

context.SS->PersistTableIndexAlterVersion(db, indexPathId, index);

auto indexPath = context.SS->PathsById.at(indexPathId);
context.SS->ClearDescribePathCaches(indexPath);
context.OnComplete.PublishToSchemeBoard(operationId, indexPathId);

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Synced index entity version"
<< ", indexPathId: " << indexPathId
<< ", newVersion: " << index->AlterVersion
<< ", at schemeshard: " << context.SS->SelfTabletId());
}

void SyncChildIndexes(
TPathElement::TPtr parentPath,
ui64 targetVersion,
TOperationId operationId,
TOperationContext& context,
NIceDb::TNiceDb& db)
{
for (const auto& [childName, childPathId] : parentPath->GetChildren()) {
auto childPath = context.SS->PathsById.at(childPathId);

// Skip non-index children and deleted indexes
if (!childPath->IsTableIndex() || childPath->Dropped()) {
continue;
}

SyncIndexEntityVersion(childPathId, targetVersion, operationId, context, db);

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Synced parent index version with parent table"
<< ", parentTable: " << parentPath->Name
<< ", indexName: " << childName
<< ", indexPathId: " << childPathId
<< ", newVersion: " << targetVersion
<< ", at schemeshard: " << context.SS->SelfTabletId());
}
}

void UpdateTableVersion(
const TTableVersionContext& versionCtx,
TTableInfo::TPtr& table,
TOperationId operationId,
TOperationContext& context,
NIceDb::TNiceDb& db)
{
if (versionCtx.IsPartOfContinuousBackup && versionCtx.IsIndexImplTable &&
versionCtx.GrandParentPathId && context.SS->Tables.contains(versionCtx.GrandParentPathId)) {
Comment on lines +208 to +209
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The condition on line 208-209 is complex with multiple boolean checks. Consider extracting this into a descriptive helper function like ShouldSyncWithParentTable to improve readability and maintainability.

Copilot uses AI. Check for mistakes.

SyncImplTableVersion(versionCtx, table, context);

SyncIndexEntityVersion(versionCtx.ParentPathId, table->AlterVersion, operationId, context, db);
} else {
table->AlterVersion += 1;
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Incremented table version"
<< ", pathId: " << versionCtx.PathId
<< ", newVersion: " << table->AlterVersion
<< ", isIndexImpl: " << (versionCtx.IsIndexImplTable ? "yes" : "no")
<< ", isContinuousBackup: " << (versionCtx.IsPartOfContinuousBackup ? "yes" : "no")
<< ", at schemeshard: " << context.SS->SelfTabletId());
}
}

} // namespace anonymous


// NCdcStreamState::TConfigurePartsAtTable
//
TConfigurePartsAtTable::TConfigurePartsAtTable(TOperationId id)
: OperationId(id)
{
Expand Down Expand Up @@ -80,8 +277,6 @@ bool TConfigurePartsAtTable::HandleReply(TEvDataShard::TEvProposeTransactionResu
}


// NCdcStreamState::TProposeAtTable
//
TProposeAtTable::TProposeAtTable(TOperationId id)
: OperationId(id)
{
Expand Down Expand Up @@ -124,59 +319,16 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera
Y_ABORT_UNLESS(context.SS->Tables.contains(pathId));
auto table = context.SS->Tables.at(pathId);

table->AlterVersion += 1;

NIceDb::TNiceDb db(context.GetDB());

bool isContinuousBackupStream = false;
if (txState->CdcPathId && context.SS->PathsById.contains(txState->CdcPathId)) {
auto cdcPath = context.SS->PathsById.at(txState->CdcPathId);
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " Checking CDC stream name"
<< ", cdcPathId: " << txState->CdcPathId
<< ", streamName: " << cdcPath->Name
<< ", at schemeshard: " << context.SS->SelfTabletId());
if (cdcPath->Name.EndsWith("_continuousBackupImpl")) {
isContinuousBackupStream = true;
}
} else {
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " CdcPathId not found"
<< ", cdcPathId: " << txState->CdcPathId
<< ", at schemeshard: " << context.SS->SelfTabletId());
}

// Check if this is an index implementation table
// If so, we need to sync the parent index version to match the impl table version
// Do this ONLY for continuous backup operations
TPathId parentPathId = path->ParentPathId;
if (parentPathId && context.SS->PathsById.contains(parentPathId) && isContinuousBackupStream) {
auto parentPath = context.SS->PathsById.at(parentPathId);
if (parentPath->IsTableIndex()) {
Y_ABORT_UNLESS(context.SS->Indexes.contains(parentPathId));
auto index = context.SS->Indexes.at(parentPathId);

index->AlterVersion = table->AlterVersion;

// Persist the index version update directly to database
db.Table<Schema::TableIndex>().Key(parentPathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::TableIndex::AlterVersion>(index->AlterVersion)
);

context.SS->ClearDescribePathCaches(parentPath);
context.OnComplete.PublishToSchemeBoard(OperationId, parentPathId);
auto versionCtx = BuildTableVersionContext(*txState, path, context);
UpdateTableVersion(versionCtx, table, OperationId, context, db);

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " Synced parent index version with impl table"
<< ", indexPathId: " << parentPathId
<< ", indexName: " << parentPath->Name
<< ", newVersion: " << index->AlterVersion
<< ", at schemeshard: " << context.SS->SelfTabletId());
}
if (versionCtx.IsContinuousBackupStream && !versionCtx.IsIndexImplTable) {
SyncChildIndexes(path, table->AlterVersion, OperationId, context, db);
}

context.SS->PersistTableAlterVersion(db, pathId, table);

context.SS->ClearDescribePathCaches(path);
context.OnComplete.PublishToSchemeBoard(OperationId, pathId);

Expand All @@ -195,8 +347,6 @@ bool TProposeAtTable::HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOpe
}


// NCdcStreamState::TProposeAtTableDropSnapshot
//
bool TProposeAtTableDropSnapshot::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) {
TProposeAtTable::HandleReply(ev, context);

Expand Down
Loading
Loading