From 89d8f9eff9043f8a16aa4da0282c0eb99adee957 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Mon, 8 Sep 2025 13:28:24 +0000 Subject: [PATCH 1/3] fix replace into --- ydb/core/backup/impl/change_record.h | 36 +++++--- .../datashard_ut_incremental_backup.cpp | 89 +++++++++++++++++++ 2 files changed, 112 insertions(+), 13 deletions(-) diff --git a/ydb/core/backup/impl/change_record.h b/ydb/core/backup/impl/change_record.h index 01092be8eb3b..34afab02081b 100644 --- a/ydb/core/backup/impl/change_record.h +++ b/ydb/core/backup/impl/change_record.h @@ -71,19 +71,24 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { auto& upsert = *record.MutableUpsert(); switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) { - case NKikimrChangeExchange::TDataChange::kUpsert: { + case NKikimrChangeExchange::TDataChange::kUpsert: + case NKikimrChangeExchange::TDataChange::kReset: { TVector tags; TVector cells; NKikimrBackup::TColumnStateMap columnStateMap; - const auto& upsertData = ProtoBody.GetCdcDataChange().GetUpsert(); + // Handle both Upsert and Reset operations + const auto& operationData = ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kUpsert + ? ProtoBody.GetCdcDataChange().GetUpsert() + : ProtoBody.GetCdcDataChange().GetReset(); + TSerializedCellVec originalCells; - Y_ABORT_UNLESS(TSerializedCellVec::TryParse(upsertData.GetData(), originalCells)); + Y_ABORT_UNLESS(TSerializedCellVec::TryParse(operationData.GetData(), originalCells)); - tags.assign(upsertData.GetTags().begin(), upsertData.GetTags().end()); + tags.assign(operationData.GetTags().begin(), operationData.GetTags().end()); cells.assign(originalCells.GetCells().begin(), originalCells.GetCells().end()); - THashSet presentTags(upsertData.GetTags().begin(), upsertData.GetTags().end()); + THashSet presentTags(operationData.GetTags().begin(), operationData.GetTags().end()); for (const auto& [name, columnInfo] : Schema->ValueColumns) { if (name == "__ydb_incrBackupImpl_deleted" || name == "__ydb_incrBackupImpl_columnStates") { continue; @@ -93,9 +98,9 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { columnState->SetTag(columnInfo.Tag); if (presentTags.contains(columnInfo.Tag)) { - auto it = std::find(upsertData.GetTags().begin(), upsertData.GetTags().end(), columnInfo.Tag); - if (it != upsertData.GetTags().end()) { - size_t idx = std::distance(upsertData.GetTags().begin(), it); + auto it = std::find(operationData.GetTags().begin(), operationData.GetTags().end(), columnInfo.Tag); + if (it != operationData.GetTags().end()) { + size_t idx = std::distance(operationData.GetTags().begin(), it); if (idx < originalCells.GetCells().size()) { columnState->SetIsNull(originalCells.GetCells()[idx].IsNull()); } else { @@ -168,7 +173,6 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { upsert.SetData(TSerializedCellVec::Serialize(cells)); break; } - case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]]; default: Y_FAIL_S("Unexpected row operation: " << static_cast(ProtoBody.GetCdcDataChange().GetRowOperationCase())); } @@ -182,27 +186,33 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) { - case NKikimrChangeExchange::TDataChange::kUpsert: { + case NKikimrChangeExchange::TDataChange::kUpsert: + case NKikimrChangeExchange::TDataChange::kReset: { auto& upsert = *record.MutableUpsert(); - // Check if NewImage is available, otherwise fall back to Upsert + // Check if NewImage is available, otherwise fall back to Upsert/Reset if (ProtoBody.GetCdcDataChange().has_newimage()) { *upsert.MutableTags() = { ProtoBody.GetCdcDataChange().GetNewImage().GetTags().begin(), ProtoBody.GetCdcDataChange().GetNewImage().GetTags().end()}; upsert.SetData(ProtoBody.GetCdcDataChange().GetNewImage().GetData()); - } else { + } else if (ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kUpsert) { // Fallback to Upsert field if NewImage is not available *upsert.MutableTags() = { ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); + } else if (ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kReset) { + // Handle Reset operation + *upsert.MutableTags() = { + ProtoBody.GetCdcDataChange().GetReset().GetTags().begin(), + ProtoBody.GetCdcDataChange().GetReset().GetTags().end()}; + upsert.SetData(ProtoBody.GetCdcDataChange().GetReset().GetData()); } break; } case NKikimrChangeExchange::TDataChange::kErase: record.MutableErase(); break; - case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]]; default: Y_FAIL_S("Unexpected row operation: " << static_cast(ProtoBody.GetCdcDataChange().GetRowOperationCase())); } diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 251a79d2c193..f6fa836af5e2 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -169,6 +169,23 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { return proto; } + NKikimrChangeExchange::TChangeRecord MakeReset(ui32 key, ui32 value) { + auto keyCell = TCell::Make(key); + auto valueCell = TCell::Make(value); + NKikimrChangeExchange::TChangeRecord proto; + + auto& dc = *proto.MutableCdcDataChange(); + auto& dcKey = *dc.MutableKey(); + dcKey.AddTags(1); + dcKey.SetData(TSerializedCellVec::Serialize({keyCell})); + + auto& reset = *dc.MutableReset(); + reset.AddTags(2); + reset.SetData(TSerializedCellVec::Serialize({valueCell})); + + return proto; + } + NKikimrChangeExchange::TChangeRecord MakeErase(ui32 key) { auto keyCell = TCell::Make(key); NKikimrChangeExchange::TChangeRecord proto; @@ -1918,6 +1935,78 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { UNIT_ASSERT_C(!hasAsyncReplicaAttr, TStringBuilder() << "Incremental backup table at " << foundIncrementalBackupPath << " must NOT have __async_replica attribute"); } + Y_UNIT_TEST(ResetOperationIncrementalBackup) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const TActorId edgeActor = runtime.AllocateEdgeActor(); + + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20); + )"); + + WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table")); + + // Test kReset operation (REPLACE INTO) + ExecSQL(server, edgeActor, R"( + REPLACE INTO `/Root/Table` (key, value) VALUES + (1, 100), + (3, 300); + )"); + + WaitForContent(server, edgeActor, "/Root/Table/0_continuousBackupImpl", { + MakeReset(1, 100), + MakeReset(3, 300), + }); + } + + Y_UNIT_TEST(ReplaceIntoIncrementalBackup) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2135), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const TActorId edgeActor = runtime.AllocateEdgeActor(); + + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + // Insert initial data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )"); + + WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table")); + + // Test multiple REPLACE operations + ExecSQL(server, edgeActor, R"( + REPLACE INTO `/Root/Table` (key, value) VALUES + (1, 100), + (4, 400); + )"); + + WaitForContent(server, edgeActor, "/Root/Table/0_continuousBackupImpl", { + MakeReset(1, 100), + MakeReset(4, 400), + }); + } + } // Y_UNIT_TEST_SUITE(IncrementalBackup) } // NKikimr From 1eef59f4fa07f6aec37d5b43802fe29f0a0844f2 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Tue, 9 Sep 2025 09:33:54 +0000 Subject: [PATCH 2/3] fix --- ydb/core/backup/impl/change_record.h | 16 +- .../datashard_ut_incremental_backup.cpp | 138 ++++++++++++++++++ 2 files changed, 149 insertions(+), 5 deletions(-) diff --git a/ydb/core/backup/impl/change_record.h b/ydb/core/backup/impl/change_record.h index 34afab02081b..331087826d3d 100644 --- a/ydb/core/backup/impl/change_record.h +++ b/ydb/core/backup/impl/change_record.h @@ -78,9 +78,10 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { NKikimrBackup::TColumnStateMap columnStateMap; // Handle both Upsert and Reset operations - const auto& operationData = ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kUpsert - ? ProtoBody.GetCdcDataChange().GetUpsert() - : ProtoBody.GetCdcDataChange().GetReset(); + const bool isResetOperation = ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kReset; + const auto& operationData = isResetOperation + ? ProtoBody.GetCdcDataChange().GetReset() + : ProtoBody.GetCdcDataChange().GetUpsert(); TSerializedCellVec originalCells; Y_ABORT_UNLESS(TSerializedCellVec::TryParse(operationData.GetData(), originalCells)); @@ -111,8 +112,13 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { } columnState->SetIsChanged(true); } else { - columnState->SetIsNull(false); - columnState->SetIsChanged(false); + if (isResetOperation) { + columnState->SetIsNull(true); + columnState->SetIsChanged(true); + } else { + columnState->SetIsNull(false); + columnState->SetIsChanged(false); + } } } diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index f6fa836af5e2..d5c83823f9cf 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -65,6 +65,16 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { return TShardedTableOptions(); } + TShardedTableOptions ThreeColumnTable() { + TShardedTableOptions opts; + opts.Columns_ = { + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"extra", "Uint32", false, false} + }; + return opts; + } + ui64 ResolvePqTablet(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) { auto streamDesc = Ls(runtime, sender, path); @@ -199,6 +209,44 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { return proto; } + NKikimrChangeExchange::TChangeRecord MakeUpsertPartial(ui32 key, ui32 value, const TVector& tags = {2}) { + auto keyCell = TCell::Make(key); + auto valueCell = TCell::Make(value); + NKikimrChangeExchange::TChangeRecord proto; + + auto& dc = *proto.MutableCdcDataChange(); + auto& dcKey = *dc.MutableKey(); + dcKey.AddTags(1); + dcKey.SetData(TSerializedCellVec::Serialize({keyCell})); + + auto& upsert = *dc.MutableUpsert(); + for (auto tag : tags) { + upsert.AddTags(tag); + } + upsert.SetData(TSerializedCellVec::Serialize({valueCell})); + + return proto; + } + + NKikimrChangeExchange::TChangeRecord MakeResetPartial(ui32 key, ui32 value, const TVector& tags = {2}) { + auto keyCell = TCell::Make(key); + auto valueCell = TCell::Make(value); + NKikimrChangeExchange::TChangeRecord proto; + + auto& dc = *proto.MutableCdcDataChange(); + auto& dcKey = *dc.MutableKey(); + dcKey.AddTags(1); + dcKey.SetData(TSerializedCellVec::Serialize({keyCell})); + + auto& reset = *dc.MutableReset(); + for (auto tag : tags) { + reset.AddTags(tag); + } + reset.SetData(TSerializedCellVec::Serialize({valueCell})); + + return proto; + } + Y_UNIT_TEST(SimpleBackup) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) @@ -2007,6 +2055,96 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { }); } + Y_UNIT_TEST(ResetVsUpsertMissingColumnsTest) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2136), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const TActorId edgeActor = runtime.AllocateEdgeActor(); + + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", ThreeColumnTable()); + + // Insert initial data with all three columns + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value, extra) VALUES + (1, 10, 100), + (2, 20, 200); + )"); + + WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table")); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (1, 15); + )"); + + ExecSQL(server, edgeActor, R"( + REPLACE INTO `/Root/Table` (key, value) VALUES (2, 25); + )"); + + SimulateSleep(server, TDuration::Seconds(1)); + + auto records = GetRecords(runtime, edgeActor, "/Root/Table/0_continuousBackupImpl", 0); + UNIT_ASSERT_VALUES_EQUAL(records.size(), 2); + + // Parse the first record (Upsert) + NKikimrChangeExchange::TChangeRecord firstRecord; + UNIT_ASSERT(firstRecord.ParseFromString(records[0].second)); + UNIT_ASSERT_C(firstRecord.GetCdcDataChange().HasUpsert(), "First record should be an upsert"); + + // Parse the second record (Reset) + NKikimrChangeExchange::TChangeRecord secondRecord; + UNIT_ASSERT(secondRecord.ParseFromString(records[1].second)); + UNIT_ASSERT_C(secondRecord.GetCdcDataChange().HasReset(), "Second record should be a reset"); + } + + Y_UNIT_TEST(ResetVsUpsertColumnStateSerialization) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2137), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const TActorId edgeActor = runtime.AllocateEdgeActor(); + + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", ThreeColumnTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value, extra) VALUES (1, 10, 100); + )"); + + WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table")); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (1, 15); + )"); + + ExecSQL(server, edgeActor, R"( + REPLACE INTO `/Root/Table` (key, value) VALUES (1, 25); + )"); + + SimulateSleep(server, TDuration::Seconds(2)); + + auto records = GetRecords(runtime, edgeActor, "/Root/Table/0_continuousBackupImpl", 0); + UNIT_ASSERT_C(records.size() >= 2, "Should have at least 2 records"); + + for (size_t i = 0; i < records.size(); ++i) { + NKikimrChangeExchange::TChangeRecord parsedRecord; + UNIT_ASSERT(parsedRecord.ParseFromString(records[i].second)); + const auto& dataChange = parsedRecord.GetCdcDataChange(); + + UNIT_ASSERT_C(dataChange.HasUpsert() || dataChange.HasReset(), + "Record should be either upsert or reset operation"); + } + } + } // Y_UNIT_TEST_SUITE(IncrementalBackup) } // NKikimr From 6cab62498b3be525097a950c2eaffeb03ec5cd55 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Tue, 9 Sep 2025 11:11:03 +0000 Subject: [PATCH 3/3] fix --- ydb/core/backup/impl/change_record.h | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/ydb/core/backup/impl/change_record.h b/ydb/core/backup/impl/change_record.h index 331087826d3d..f80f07b0df9a 100644 --- a/ydb/core/backup/impl/change_record.h +++ b/ydb/core/backup/impl/change_record.h @@ -208,11 +208,7 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); } else if (ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kReset) { - // Handle Reset operation - *upsert.MutableTags() = { - ProtoBody.GetCdcDataChange().GetReset().GetTags().begin(), - ProtoBody.GetCdcDataChange().GetReset().GetTags().end()}; - upsert.SetData(ProtoBody.GetCdcDataChange().GetReset().GetData()); + Y_ABORT("Reset operation is not supported, all operations must be converted to Upsert"); } break; }