From 9314cd1fb7d76de4eebbd18f9f8a97a468da1977 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Mon, 8 Sep 2025 12:47:32 +0000 Subject: [PATCH] fix attr --- .../datashard_ut_incremental_backup.cpp | 112 ++++++++++++++++++ ...ard__operation_alter_continuous_backup.cpp | 5 + .../schemeshard/schemeshard_path_element.cpp | 3 +- 3 files changed, 119 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 7a640adfc141..251a79d2c193 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -1806,6 +1806,118 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { false, Ydb::StatusIds::SCHEME_ERROR); } + Y_UNIT_TEST(VerifyIncrementalBackupTableAttributes) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + // Insert some initial data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), (2, 20), (3, 30); + )"); + + // Create backup collection with incremental backup enabled + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `TestCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + // Create full backup first + ExecSQL(server, edgeActor, R"(BACKUP `TestCollection`;)", false); + runtime.SimulateSleep(TDuration::Seconds(5)); + + // Modify some data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (2, 200); + DELETE FROM `/Root/Table` WHERE key = 1; + )"); + + // Create incremental backup - this should create the incremental backup implementation tables + ExecSQL(server, edgeActor, R"(BACKUP `TestCollection` INCREMENTAL;)", false); + runtime.SimulateSleep(TDuration::Seconds(10)); // More time for incremental backup to complete + + // Try to find the incremental backup table by iterating through possible timestamps + TString foundIncrementalBackupPath; + bool foundIncrementalBackupTable = false; + + // Common incremental backup paths to try (timestamp-based) + TVector possiblePaths = { + "/Root/.backups/collections/TestCollection/19700101000002Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000003Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000004Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000005Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000006Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000007Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000008Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000009Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000010Z_incremental/Table", + }; + + for (const auto& path : possiblePaths) { + auto request = MakeHolder(); + request->Record.MutableDescribePath()->SetPath(path); + request->Record.MutableDescribePath()->MutableOptions()->SetShowPrivateTable(true); + runtime.Send(new IEventHandle(MakeTxProxyID(), edgeActor, request.Release())); + auto reply = runtime.GrabEdgeEventRethrow(edgeActor); + + if (reply->Get()->GetRecord().GetStatus() == NKikimrScheme::EStatus::StatusSuccess) { + foundIncrementalBackupPath = path; + foundIncrementalBackupTable = true; + Cerr << "Found incremental backup table at: " << path << Endl; + break; + } + } + + UNIT_ASSERT_C(foundIncrementalBackupTable, TStringBuilder() << "Could not find incremental backup table. Tried paths: " << JoinSeq(", ", possiblePaths)); + + // Now check the found incremental backup table attributes + auto request = MakeHolder(); + request->Record.MutableDescribePath()->SetPath(foundIncrementalBackupPath); + request->Record.MutableDescribePath()->MutableOptions()->SetShowPrivateTable(true); + runtime.Send(new IEventHandle(MakeTxProxyID(), edgeActor, request.Release())); + auto reply = runtime.GrabEdgeEventRethrow(edgeActor); + + UNIT_ASSERT_EQUAL(reply->Get()->GetRecord().GetStatus(), NKikimrScheme::EStatus::StatusSuccess); + + const auto& pathDescription = reply->Get()->GetRecord().GetPathDescription(); + UNIT_ASSERT(pathDescription.HasTable()); + + // Verify that incremental backup table has __incremental_backup attribute + bool hasIncrementalBackupAttr = false; + bool hasAsyncReplicaAttr = false; + + for (const auto& attr : pathDescription.GetUserAttributes()) { + Cerr << "Found attribute: " << attr.GetKey() << " = " << attr.GetValue() << Endl; + if (attr.GetKey() == "__incremental_backup") { + hasIncrementalBackupAttr = true; + } + if (attr.GetKey() == "__async_replica") { + hasAsyncReplicaAttr = true; + } + } + + // Verify that we have __incremental_backup but NOT __async_replica + UNIT_ASSERT_C(hasIncrementalBackupAttr, TStringBuilder() << "Incremental backup table at " << foundIncrementalBackupPath << " must have __incremental_backup attribute"); + UNIT_ASSERT_C(!hasAsyncReplicaAttr, TStringBuilder() << "Incremental backup table at " << foundIncrementalBackupPath << " must NOT have __async_replica attribute"); + } + } // Y_UNIT_TEST_SUITE(IncrementalBackup) } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp index 2de69efe8f40..decaaee0be76 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp @@ -57,6 +57,11 @@ void DoCreateIncrBackupTable(const TOperationId& opId, const TPath& dst, NKikimr auto& replicationConfig = *desc.MutableReplicationConfig(); replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY); replicationConfig.SetConsistencyLevel(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_LEVEL_ROW); + + // Set incremental backup config so DataShard can distinguish between async replica and incremental backup + auto& incrementalBackupConfig = *desc.MutableIncrementalBackupConfig(); + incrementalBackupConfig.SetMode(NKikimrSchemeOp::TTableIncrementalBackupConfig::RESTORE_MODE_INCREMENTAL_BACKUP); + incrementalBackupConfig.SetConsistency(NKikimrSchemeOp::TTableIncrementalBackupConfig::CONSISTENCY_WEAK); for (auto& column : *desc.MutableColumns()) { column.SetNotNull(false); diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp index 2f87067439b5..89e081e830b7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp @@ -485,7 +485,8 @@ void TPathElement::SerializeRuntimeAttrs( process(FileStoreSpaceHDD, "__filestore_space_allocated_hdd"); process(FileStoreSpaceSSDSystem, "__filestore_space_allocated_ssd_system"); - if (IsAsyncReplica) { + // Set __async_replica attribute only for true async replica tables, not for incremental backup tables + if (IsAsyncReplica && !IsIncrementalRestoreTable) { auto* attr = userAttrs->Add(); attr->SetKey(ToString(ATTR_ASYNC_REPLICA)); attr->SetValue("true");