Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1806,6 +1806,118 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
false, Ydb::StatusIds::SCHEME_ERROR);
}

Y_UNIT_TEST(VerifyIncrementalBackupTableAttributes) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
.SetEnableChangefeedInitialScan(true)
.SetEnableBackupService(true)
);

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();

SetupLogging(runtime);
InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());

// Insert some initial data
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
(1, 10), (2, 20), (3, 30);
)");

// Create backup collection with incremental backup enabled
ExecSQL(server, edgeActor, R"(
CREATE BACKUP COLLECTION `TestCollection`
( TABLE `/Root/Table`
)
WITH
( STORAGE = 'cluster'
, INCREMENTAL_BACKUP_ENABLED = 'true'
);
)", false);

// Create full backup first
ExecSQL(server, edgeActor, R"(BACKUP `TestCollection`;)", false);
runtime.SimulateSleep(TDuration::Seconds(5));

// Modify some data
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES (2, 200);
DELETE FROM `/Root/Table` WHERE key = 1;
)");

// Create incremental backup - this should create the incremental backup implementation tables
ExecSQL(server, edgeActor, R"(BACKUP `TestCollection` INCREMENTAL;)", false);
runtime.SimulateSleep(TDuration::Seconds(10)); // More time for incremental backup to complete

// Try to find the incremental backup table by iterating through possible timestamps
TString foundIncrementalBackupPath;
bool foundIncrementalBackupTable = false;

// Common incremental backup paths to try (timestamp-based)
TVector<TString> possiblePaths = {
"/Root/.backups/collections/TestCollection/19700101000002Z_incremental/Table",
"/Root/.backups/collections/TestCollection/19700101000003Z_incremental/Table",
"/Root/.backups/collections/TestCollection/19700101000004Z_incremental/Table",
"/Root/.backups/collections/TestCollection/19700101000005Z_incremental/Table",
"/Root/.backups/collections/TestCollection/19700101000006Z_incremental/Table",
"/Root/.backups/collections/TestCollection/19700101000007Z_incremental/Table",
"/Root/.backups/collections/TestCollection/19700101000008Z_incremental/Table",
"/Root/.backups/collections/TestCollection/19700101000009Z_incremental/Table",
"/Root/.backups/collections/TestCollection/19700101000010Z_incremental/Table",
};

for (const auto& path : possiblePaths) {
auto request = MakeHolder<TEvTxUserProxy::TEvNavigate>();
request->Record.MutableDescribePath()->SetPath(path);
request->Record.MutableDescribePath()->MutableOptions()->SetShowPrivateTable(true);
runtime.Send(new IEventHandle(MakeTxProxyID(), edgeActor, request.Release()));
auto reply = runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>(edgeActor);

if (reply->Get()->GetRecord().GetStatus() == NKikimrScheme::EStatus::StatusSuccess) {
foundIncrementalBackupPath = path;
foundIncrementalBackupTable = true;
Cerr << "Found incremental backup table at: " << path << Endl;
break;
}
}

UNIT_ASSERT_C(foundIncrementalBackupTable, TStringBuilder() << "Could not find incremental backup table. Tried paths: " << JoinSeq(", ", possiblePaths));

// Now check the found incremental backup table attributes
auto request = MakeHolder<TEvTxUserProxy::TEvNavigate>();
request->Record.MutableDescribePath()->SetPath(foundIncrementalBackupPath);
request->Record.MutableDescribePath()->MutableOptions()->SetShowPrivateTable(true);
runtime.Send(new IEventHandle(MakeTxProxyID(), edgeActor, request.Release()));
auto reply = runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>(edgeActor);

UNIT_ASSERT_EQUAL(reply->Get()->GetRecord().GetStatus(), NKikimrScheme::EStatus::StatusSuccess);

const auto& pathDescription = reply->Get()->GetRecord().GetPathDescription();
UNIT_ASSERT(pathDescription.HasTable());

// Verify that incremental backup table has __incremental_backup attribute
bool hasIncrementalBackupAttr = false;
bool hasAsyncReplicaAttr = false;

for (const auto& attr : pathDescription.GetUserAttributes()) {
Cerr << "Found attribute: " << attr.GetKey() << " = " << attr.GetValue() << Endl;
if (attr.GetKey() == "__incremental_backup") {
hasIncrementalBackupAttr = true;
}
if (attr.GetKey() == "__async_replica") {
hasAsyncReplicaAttr = true;
}
}

// Verify that we have __incremental_backup but NOT __async_replica
UNIT_ASSERT_C(hasIncrementalBackupAttr, TStringBuilder() << "Incremental backup table at " << foundIncrementalBackupPath << " must have __incremental_backup attribute");
UNIT_ASSERT_C(!hasAsyncReplicaAttr, TStringBuilder() << "Incremental backup table at " << foundIncrementalBackupPath << " must NOT have __async_replica attribute");
}

} // Y_UNIT_TEST_SUITE(IncrementalBackup)

} // NKikimr
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ void DoCreateIncrBackupTable(const TOperationId& opId, const TPath& dst, NKikimr
auto& replicationConfig = *desc.MutableReplicationConfig();
replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY);
replicationConfig.SetConsistencyLevel(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_LEVEL_ROW);

// Set incremental backup config so DataShard can distinguish between async replica and incremental backup
auto& incrementalBackupConfig = *desc.MutableIncrementalBackupConfig();
incrementalBackupConfig.SetMode(NKikimrSchemeOp::TTableIncrementalBackupConfig::RESTORE_MODE_INCREMENTAL_BACKUP);
incrementalBackupConfig.SetConsistency(NKikimrSchemeOp::TTableIncrementalBackupConfig::CONSISTENCY_WEAK);

for (auto& column : *desc.MutableColumns()) {
column.SetNotNull(false);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_path_element.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ void TPathElement::SerializeRuntimeAttrs(
process(FileStoreSpaceHDD, "__filestore_space_allocated_hdd");
process(FileStoreSpaceSSDSystem, "__filestore_space_allocated_ssd_system");

if (IsAsyncReplica) {
// Set __async_replica attribute only for true async replica tables, not for incremental backup tables
if (IsAsyncReplica && !IsIncrementalRestoreTable) {
auto* attr = userAttrs->Add();
attr->SetKey(ToString(ATTR_ASYNC_REPLICA));
attr->SetValue("true");
Expand Down
Loading