From a758fb4975a7eefc3702f4bb56fa1fff0acfba65 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Mon, 8 Sep 2025 21:18:24 +0300 Subject: [PATCH 1/3] Switch incremental backup cdc mode back to Update (#24271) --- ydb/core/backup/impl/change_record.h | 111 ++++++++++++------ ydb/core/backup/impl/table_writer_ut.cpp | 71 +++++++---- ydb/core/protos/datashard_backup.proto | 9 ++ .../datashard_ut_incremental_backup.cpp | 33 +++--- .../tx/datashard/incr_restore_helpers.cpp | 71 ++++++++++- ...ard__operation_alter_continuous_backup.cpp | 14 ++- ...rd__operation_backup_backup_collection.cpp | 4 +- ...rd__operation_create_continuous_backup.cpp | 2 +- .../ut_continuous_backup.cpp | 10 +- .../ut_continuous_backup_reboots.cpp | 20 ++-- 10 files changed, 248 insertions(+), 97 deletions(-) diff --git a/ydb/core/backup/impl/change_record.h b/ydb/core/backup/impl/change_record.h index a6912538c408..01092be8eb3b 100644 --- a/ydb/core/backup/impl/change_record.h +++ b/ydb/core/backup/impl/change_record.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -71,58 +72,100 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) { case NKikimrChangeExchange::TDataChange::kUpsert: { - // Check if NewImage is available, otherwise fall back to Upsert - if (ProtoBody.GetCdcDataChange().HasNewImage()) { - *upsert.MutableTags() = { - ProtoBody.GetCdcDataChange().GetNewImage().GetTags().begin(), - ProtoBody.GetCdcDataChange().GetNewImage().GetTags().end()}; - auto it = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted"); - Y_ABORT_UNLESS(it != Schema->ValueColumns.end(), "Invariant violation"); - upsert.AddTags(it->second.Tag); - - TString serializedCellVec = ProtoBody.GetCdcDataChange().GetNewImage().GetData(); - Y_ABORT_UNLESS( - TSerializedCellVec::UnsafeAppendCells({TCell::Make(false)}, serializedCellVec), - "Invalid cell format, can't append cells"); + TVector tags; + TVector cells; + NKikimrBackup::TColumnStateMap columnStateMap; + + const auto& upsertData = ProtoBody.GetCdcDataChange().GetUpsert(); + TSerializedCellVec originalCells; + Y_ABORT_UNLESS(TSerializedCellVec::TryParse(upsertData.GetData(), originalCells)); + + tags.assign(upsertData.GetTags().begin(), upsertData.GetTags().end()); + cells.assign(originalCells.GetCells().begin(), originalCells.GetCells().end()); + + THashSet presentTags(upsertData.GetTags().begin(), upsertData.GetTags().end()); + for (const auto& [name, columnInfo] : Schema->ValueColumns) { + if (name == "__ydb_incrBackupImpl_deleted" || name == "__ydb_incrBackupImpl_columnStates") { + continue; + } + + auto* columnState = columnStateMap.AddColumnStates(); + columnState->SetTag(columnInfo.Tag); + + if (presentTags.contains(columnInfo.Tag)) { + auto it = std::find(upsertData.GetTags().begin(), upsertData.GetTags().end(), columnInfo.Tag); + if (it != upsertData.GetTags().end()) { + size_t idx = std::distance(upsertData.GetTags().begin(), it); + if (idx < originalCells.GetCells().size()) { + columnState->SetIsNull(originalCells.GetCells()[idx].IsNull()); + } else { + columnState->SetIsNull(true); + } + } else { + columnState->SetIsNull(true); + } + columnState->SetIsChanged(true); + } else { + columnState->SetIsNull(false); + columnState->SetIsChanged(false); + } + } - upsert.SetData(serializedCellVec); - } else { - *upsert.MutableTags() = { - ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), - ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; - auto it = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted"); - Y_ABORT_UNLESS(it != Schema->ValueColumns.end(), "Invariant violation"); - upsert.AddTags(it->second.Tag); + auto deletedIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted"); + Y_ABORT_UNLESS(deletedIt != Schema->ValueColumns.end(), "Invariant violation"); + tags.push_back(deletedIt->second.Tag); + cells.emplace_back(TCell::Make(false)); - TString serializedCellVec = ProtoBody.GetCdcDataChange().GetUpsert().GetData(); - Y_ABORT_UNLESS( - TSerializedCellVec::UnsafeAppendCells({TCell::Make(false)}, serializedCellVec), - "Invalid cell format, can't append cells"); + auto columnStatesIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_columnStates"); + Y_ABORT_UNLESS(columnStatesIt != Schema->ValueColumns.end(), "Invariant violation"); + tags.push_back(columnStatesIt->second.Tag); + + TString serializedColumnState; + Y_ABORT_UNLESS(columnStateMap.SerializeToString(&serializedColumnState)); + cells.emplace_back(TCell(serializedColumnState.data(), serializedColumnState.size())); - upsert.SetData(serializedCellVec); - } + *upsert.MutableTags() = {tags.begin(), tags.end()}; + upsert.SetData(TSerializedCellVec::Serialize(cells)); break; } case NKikimrChangeExchange::TDataChange::kErase: { size_t size = Schema->ValueColumns.size(); TVector tags; TVector cells; + NKikimrBackup::TColumnStateMap columnStateMap; tags.reserve(size); cells.reserve(size); - for (const auto& [name, value] : Schema->ValueColumns) { - tags.push_back(value.Tag); - if (name != "__ydb_incrBackupImpl_deleted") { - cells.emplace_back(); - } else { - cells.emplace_back(TCell::Make(true)); + for (const auto& [name, columnInfo] : Schema->ValueColumns) { + if (name == "__ydb_incrBackupImpl_deleted" || name == "__ydb_incrBackupImpl_columnStates") { + continue; } + + tags.push_back(columnInfo.Tag); + cells.emplace_back(); + + auto* columnState = columnStateMap.AddColumnStates(); + columnState->SetTag(columnInfo.Tag); + columnState->SetIsNull(true); + columnState->SetIsChanged(true); } + auto deletedIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted"); + Y_ABORT_UNLESS(deletedIt != Schema->ValueColumns.end(), "Invariant violation"); + tags.push_back(deletedIt->second.Tag); + cells.emplace_back(TCell::Make(true)); + + auto columnStatesIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_columnStates"); + Y_ABORT_UNLESS(columnStatesIt != Schema->ValueColumns.end(), "Invariant violation"); + tags.push_back(columnStatesIt->second.Tag); + + TString serializedColumnState; + Y_ABORT_UNLESS(columnStateMap.SerializeToString(&serializedColumnState)); + cells.emplace_back(TCell(serializedColumnState.data(), serializedColumnState.size())); + *upsert.MutableTags() = {tags.begin(), tags.end()}; upsert.SetData(TSerializedCellVec::Serialize(cells)); - break; } case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]]; diff --git a/ydb/core/backup/impl/table_writer_ut.cpp b/ydb/core/backup/impl/table_writer_ut.cpp index 2863b976865b..c99b60418475 100644 --- a/ydb/core/backup/impl/table_writer_ut.cpp +++ b/ydb/core/backup/impl/table_writer_ut.cpp @@ -1,6 +1,7 @@ #include "change_record.h" #include "table_writer.h" +#include #include namespace NKikimr::NBackup::NImpl { @@ -19,6 +20,10 @@ Y_UNIT_TEST_SUITE(TableWriter) { .Tag = 123, .Type = NScheme::TTypeInfo{NScheme::NTypeIds::Bool}, }); + schema->ValueColumns.emplace("__ydb_incrBackupImpl_columnStates", TLightweightSchema::TColumn{ + .Tag = 124, + .Type = NScheme::TTypeInfo{NScheme::NTypeIds::String}, + }); { NKikimrChangeExchange::TChangeRecord changeRecord; @@ -51,18 +56,26 @@ Y_UNIT_TEST_SUITE(TableWriter) { NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result; record->Serialize(result, EWriterType::Backup); - TVector outCells{ - TCell::Make(4567), - TCell::Make(false), - }; - - TString out = TSerializedCellVec::Serialize(outCells); - - UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey()); - UNIT_ASSERT(result.GetUpsert().TagsSize() == 2); - UNIT_ASSERT(result.GetUpsert().GetTags(0) == 1); - UNIT_ASSERT(result.GetUpsert().GetTags(1) == 123); - UNIT_ASSERT_VALUES_EQUAL(out, result.GetUpsert().GetData()); + // The serialization logic is complex, so let's just use the actual result + // and verify the structure is correct by parsing it back + TSerializedCellVec resultCells; + UNIT_ASSERT(TSerializedCellVec::TryParse(result.GetUpsert().GetData(), resultCells)); + UNIT_ASSERT(resultCells.GetCells().size() == 3); + + // Verify the first cell is the value + UNIT_ASSERT_VALUES_EQUAL(resultCells.GetCells()[0].AsValue(), 4567); + + // Verify the second cell is the deleted flag + UNIT_ASSERT_VALUES_EQUAL(resultCells.GetCells()[1].AsValue(), false); + + // Verify the third cell contains a valid column state map + NKikimrBackup::TColumnStateMap actualColumnState; + TString actualSerializedColumnState(resultCells.GetCells()[2].Data(), resultCells.GetCells()[2].Size()); + UNIT_ASSERT(actualColumnState.ParseFromString(actualSerializedColumnState)); + UNIT_ASSERT_VALUES_EQUAL(actualColumnState.ColumnStatesSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetTag(), 1); + UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsNull(), false); + UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsChanged(), true); } { @@ -91,18 +104,34 @@ Y_UNIT_TEST_SUITE(TableWriter) { NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result; record->Serialize(result, EWriterType::Backup); - TVector outCells{ - TCell(), - TCell::Make(true), - }; - - TString out = TSerializedCellVec::Serialize(outCells); + // The serialization logic is complex, so let's just verify the structure + // and content rather than exact binary encoding + TSerializedCellVec resultCells; + UNIT_ASSERT(TSerializedCellVec::TryParse(result.GetUpsert().GetData(), resultCells)); + UNIT_ASSERT(resultCells.GetCells().size() == 3); + + // For erase records, the first cell should be null/empty + UNIT_ASSERT(resultCells.GetCells()[0].IsNull()); + + // Verify the second cell is the deleted flag (true for erase) + UNIT_ASSERT_VALUES_EQUAL(resultCells.GetCells()[1].AsValue(), true); + + // Verify the third cell contains a valid column state map + NKikimrBackup::TColumnStateMap actualColumnState; + TString actualSerializedColumnState(resultCells.GetCells()[2].Data(), resultCells.GetCells()[2].Size()); + UNIT_ASSERT(actualColumnState.ParseFromString(actualSerializedColumnState)); + UNIT_ASSERT_VALUES_EQUAL(actualColumnState.ColumnStatesSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetTag(), 1); + // For erase records, all columns are changed (set to null), so IsChanged should be true + UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsChanged(), true); + // For erase records, all columns are set to null + UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsNull(), true); UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey()); - UNIT_ASSERT(result.GetUpsert().TagsSize() == 2); - UNIT_ASSERT(result.GetUpsert().GetTags(1) == 123); + UNIT_ASSERT(result.GetUpsert().TagsSize() == 3); UNIT_ASSERT(result.GetUpsert().GetTags(0) == 1); - UNIT_ASSERT_VALUES_EQUAL(out, result.GetUpsert().GetData()); + UNIT_ASSERT(result.GetUpsert().GetTags(1) == 123); + UNIT_ASSERT(result.GetUpsert().GetTags(2) == 124); } } diff --git a/ydb/core/protos/datashard_backup.proto b/ydb/core/protos/datashard_backup.proto index d8ff828ff4be..27b4239f90ec 100644 --- a/ydb/core/protos/datashard_backup.proto +++ b/ydb/core/protos/datashard_backup.proto @@ -23,3 +23,12 @@ message TChecksumState { message TS3DownloadState { optional bytes EncryptedDeserializerState = 1 [(Ydb.sensitive) = true]; // Contains secure key } + +message TColumnStateMap { + message TColumnState { + optional uint32 Tag = 1; + optional bool IsNull = 2; + optional bool IsChanged = 3; + } + repeated TColumnState ColumnStates = 1; +} diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 9d1266b692f1..6103b0e8833c 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -1,22 +1,24 @@ #include "datashard_ut_common_kqp.h" #include -#include #include #include #include #include +#include +#include #include #include +#include #include -#include #include -#include -#include +#include #include #include #include +#include +#include #include #include @@ -159,10 +161,10 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { auto& dcKey = *dc.MutableKey(); dcKey.AddTags(1); dcKey.SetData(TSerializedCellVec::Serialize({keyCell})); - auto& newImage = *dc.MutableNewImage(); - newImage.AddTags(2); - newImage.SetData(TSerializedCellVec::Serialize({valueCell})); - dc.MutableUpsert(); + + auto& upsert = *dc.MutableUpsert(); + upsert.AddTags(2); + upsert.SetData(TSerializedCellVec::Serialize({valueCell})); return proto; } @@ -744,23 +746,24 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { .Columns({ {"key", "Uint32", true, false}, {"value", "Uint32", false, false}, - {"__ydb_incrBackupImpl_deleted", "Bool", false, false}}); + {"__ydb_incrBackupImpl_deleted", "Bool", false, false}, + {"__ydb_incrBackupImpl_columnStates", "String", false, false}}); CreateShardedTable(server, edgeActor, "/Root/.backups/collections/MyCollection/19700101000002Z_incremental", "Table", opts); ExecSQL(server, edgeActor, R"( - UPSERT INTO `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table` (key, value, __ydb_incrBackupImpl_deleted) VALUES - (2, 200, NULL) - , (1, NULL, true) + UPSERT INTO `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table` (key, value, __ydb_incrBackupImpl_deleted, __ydb_incrBackupImpl_columnStates) VALUES + (2, 200, NULL, NULL) + , (1, NULL, true, NULL) ; )"); CreateShardedTable(server, edgeActor, "/Root/.backups/collections/MyCollection/19700101000003Z_incremental", "Table", opts); ExecSQL(server, edgeActor, R"( - UPSERT INTO `/Root/.backups/collections/MyCollection/19700101000003Z_incremental/Table` (key, value, __ydb_incrBackupImpl_deleted) VALUES - (2, 2000, NULL) - , (5, NULL, true) + UPSERT INTO `/Root/.backups/collections/MyCollection/19700101000003Z_incremental/Table` (key, value, __ydb_incrBackupImpl_deleted, __ydb_incrBackupImpl_columnStates) VALUES + (2, 2000, NULL, NULL) + , (5, NULL, true, NULL) ; )"); } diff --git a/ydb/core/tx/datashard/incr_restore_helpers.cpp b/ydb/core/tx/datashard/incr_restore_helpers.cpp index 2929137f5e1b..d823e99c45ce 100644 --- a/ydb/core/tx/datashard/incr_restore_helpers.cpp +++ b/ydb/core/tx/datashard/incr_restore_helpers.cpp @@ -1,29 +1,88 @@ #include "incr_restore_helpers.h" +#include + namespace NKikimr::NDataShard::NIncrRestoreHelpers { std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, const TMap& columns) { Y_ENSURE(cells.size() >= 1); TVector updates(::Reserve(cells.size() - 1)); - bool foundSpecialColumn = false; + int specialColumnCount = 0; + NKikimrBackup::TColumnStateMap columnStateMap; + bool deletedFlag = false; + bool hasNullStateData = false; + Y_ENSURE(cells.size() == tags.size()); + for (TPos pos = 0; pos < cells.size(); ++pos) { const auto tag = tags.at(pos); auto it = columns.find(tag); Y_ENSURE(it != columns.end()); + if (it->second.Name == "__ydb_incrBackupImpl_deleted") { if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue()) { - return std::nullopt; + deletedFlag = true; + } + specialColumnCount++; + } else if (it->second.Name == "__ydb_incrBackupImpl_columnStates") { + if (const auto& cell = cells.at(pos); !cell.IsNull()) { + TString serializedNullState(cell.Data(), cell.Size()); + if (!serializedNullState.empty()) { + if (columnStateMap.ParseFromString(serializedNullState)) { + hasNullStateData = true; + } + } } - foundSpecialColumn = true; + specialColumnCount++; + } + } + + Y_ENSURE(specialColumnCount == 1 || specialColumnCount == 2); + + if (deletedFlag) { + return std::nullopt; + } + + struct TColumnState { + bool IsNull = false; + bool IsChanged = false; + }; + + THashMap tagToColumnState; + if (hasNullStateData) { + for (const auto& columnState : columnStateMap.GetColumnStates()) { + tagToColumnState[columnState.GetTag()] = {columnState.GetIsNull(), columnState.GetIsChanged()}; + } + } + + for (TPos pos = 0; pos < cells.size(); ++pos) { + const auto tag = tags.at(pos); + auto it = columns.find(tag); + Y_ENSURE(it != columns.end()); + + if (it->second.Name == "__ydb_incrBackupImpl_deleted" || + it->second.Name == "__ydb_incrBackupImpl_columnStates") { continue; } - updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type.GetTypeId())); + + + if (hasNullStateData) { + auto columnStateIt = tagToColumnState.find(tag); + + if (columnStateIt != tagToColumnState.end() && !columnStateIt->second.IsChanged) { + continue; + } else if (columnStateIt != tagToColumnState.end() && columnStateIt->second.IsNull) { + updates.emplace_back(tag, ECellOp::Set, TRawTypeValue()); + } else { + updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type.GetTypeId())); + } + } else { + updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type.GetTypeId())); + } } - Y_ENSURE(foundSpecialColumn); return updates; } -} // namespace NKikimr::NBackup::NImpl +} // namespace NKikimr::NDataShard::NIncrRestoreHelpers diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp index 41781cdd60bb..a57ba37bf735 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp @@ -57,13 +57,21 @@ void DoCreateIncrBackupTable(const TOperationId& opId, const TPath& dst, NKikimr auto& replicationConfig = *desc.MutableReplicationConfig(); replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY); replicationConfig.SetConsistencyLevel(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_LEVEL_ROW); - - // TODO: remove NotNull from all columns for correct deletion writing + + for (auto& column : *desc.MutableColumns()) { + column.SetNotNull(false); + } + + auto* columnStatesCol = desc.AddColumns(); + columnStatesCol->SetName("__ydb_incrBackupImpl_columnStates"); + columnStatesCol->SetType("String"); + columnStatesCol->SetNotNull(false); // TODO: cleanup all sequences auto* col = desc.AddColumns(); col->SetName("__ydb_incrBackupImpl_deleted"); col->SetType("Bool"); + col->SetNotNull(false); result.push_back(CreateNewTable(NextPartId(opId, result), outTx)); } @@ -161,7 +169,7 @@ bool CreateAlterContinuousBackup(TOperationId opId, const TTxTransaction& tx, TO createCdcStreamOp.SetTableName(tableName); auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); streamDescription.SetName(newStreamName); - streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeNewImage); + streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate); streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); rotateCdcStreamOp.MutableNewStream()->CopyFrom(createCdcStreamOp); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp index 2f0636b95fbd..b5416dcbba59 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp @@ -86,7 +86,7 @@ TVector CreateBackupBackupCollection(TOperationId opId, con createCdcStreamOp.SetTableName(item.GetPath()); auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); streamDescription.SetName(streamName); - streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeNewImage); + streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate); streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); const auto sPath = TPath::Resolve(item.GetPath(), context.SS); @@ -106,7 +106,7 @@ TVector CreateBackupBackupCollection(TOperationId opId, con createCdcStreamOp.SetTableName(item.GetPath()); auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); streamDescription.SetName(streamName); - streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeNewImage); + streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate); streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); const auto sPath = TPath::Resolve(item.GetPath(), context.SS); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp index 7ef88a997c2c..e23dbfd8eb11 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp @@ -68,7 +68,7 @@ TVector CreateNewContinuousBackup(TOperationId opId, const createCdcStreamOp.SetTableName(tableName); auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); streamDescription.SetName(streamName); - streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeNewImage); + streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate); streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); TVector result; diff --git a/ydb/core/tx/schemeshard/ut_continuous_backup/ut_continuous_backup.cpp b/ydb/core/tx/schemeshard/ut_continuous_backup/ut_continuous_backup.cpp index 835cafb219d5..941b17fbe9c1 100644 --- a/ydb/core/tx/schemeshard/ut_continuous_backup/ut_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/ut_continuous_backup/ut_continuous_backup.cpp @@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupTests) { TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/0_continuousBackupImpl"), { NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewImage), + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate), NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), NLs::StreamVirtualTimestamps(false), @@ -52,7 +52,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupTests) { TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/0_continuousBackupImpl"), { NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewImage), + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate), NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled), }); @@ -89,7 +89,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupTests) { TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/0_continuousBackupImpl"), { NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewImage), + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate), NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), NLs::StreamVirtualTimestamps(false), @@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupTests) { TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/1_continuousBackupImpl"), { NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewImage), + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate), NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), NLs::StreamVirtualTimestamps(false), @@ -146,7 +146,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupTests) { TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/IncrBackupImpl"), { NLs::PathExist, NLs::IsTable, - NLs::CheckColumns("IncrBackupImpl", {"key", "value", "__ydb_incrBackupImpl_deleted"}, {}, {"key"}), + NLs::CheckColumns("IncrBackupImpl", {"key", "value", "__ydb_incrBackupImpl_columnStates", "__ydb_incrBackupImpl_deleted"}, {}, {"key"}), }); } } // TCdcStreamWithInitialScanTests diff --git a/ydb/core/tx/schemeshard/ut_continuous_backup_reboots/ut_continuous_backup_reboots.cpp b/ydb/core/tx/schemeshard/ut_continuous_backup_reboots/ut_continuous_backup_reboots.cpp index c5ea54e72361..e4f552c880ca 100644 --- a/ydb/core/tx/schemeshard/ut_continuous_backup_reboots/ut_continuous_backup_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_continuous_backup_reboots/ut_continuous_backup_reboots.cpp @@ -43,7 +43,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupWithRebootsTests) { TInactiveZone inactive(activeZone); TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/0_continuousBackupImpl"), { NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewImage), + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate), NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), NLs::StreamVirtualTimestamps(false), @@ -61,7 +61,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupWithRebootsTests) { TInactiveZone inactive(activeZone); TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/0_continuousBackupImpl"), { NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewImage), + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate), NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled), }); @@ -109,7 +109,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupWithRebootsTests) { TInactiveZone inactive(activeZone); TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/0_continuousBackupImpl"), { NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewImage), + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate), NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), NLs::StreamVirtualTimestamps(false), @@ -151,7 +151,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupWithRebootsTests) { TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/1_continuousBackupImpl"), { NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewImage), + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate), NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), NLs::StreamVirtualTimestamps(false), @@ -172,7 +172,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupWithRebootsTests) { TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/IncrBackupImpl"), { NLs::PathExist, NLs::IsTable, - NLs::CheckColumns("IncrBackupImpl", {"key", "value", "__ydb_incrBackupImpl_deleted"}, {}, {"key"}), + NLs::CheckColumns("IncrBackupImpl", {"key", "value", "__ydb_incrBackupImpl_deleted", "__ydb_incrBackupImpl_columnStates"}, {}, {"key"}), }); } }); @@ -207,7 +207,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupWithRebootsTests) { TInactiveZone inactive(activeZone); TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/0_continuousBackupImpl"), { NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewImage), + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate), NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), NLs::StreamVirtualTimestamps(false), @@ -259,7 +259,7 @@ Y_UNIT_TEST_SUITE(TContinuousBackupWithRebootsTests) { TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/3_continuousBackupImpl"), { NLs::PathExist, - NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeNewImage), + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeUpdate), NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), NLs::StreamVirtualTimestamps(false), @@ -272,19 +272,19 @@ Y_UNIT_TEST_SUITE(TContinuousBackupWithRebootsTests) { TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/IncrBackupImpl1"), { NLs::PathExist, NLs::IsTable, - NLs::CheckColumns("IncrBackupImpl1", {"key", "value", "__ydb_incrBackupImpl_deleted"}, {}, {"key"}), + NLs::CheckColumns("IncrBackupImpl1", {"key", "value", "__ydb_incrBackupImpl_deleted", "__ydb_incrBackupImpl_columnStates"}, {}, {"key"}), }); TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/IncrBackupImpl2"), { NLs::PathExist, NLs::IsTable, - NLs::CheckColumns("IncrBackupImpl2", {"key", "value", "__ydb_incrBackupImpl_deleted"}, {}, {"key"}), + NLs::CheckColumns("IncrBackupImpl2", {"key", "value", "__ydb_incrBackupImpl_deleted", "__ydb_incrBackupImpl_columnStates"}, {}, {"key"}), }); TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/IncrBackupImpl3"), { NLs::PathExist, NLs::IsTable, - NLs::CheckColumns("IncrBackupImpl3", {"key", "value", "__ydb_incrBackupImpl_deleted"}, {}, {"key"}), + NLs::CheckColumns("IncrBackupImpl3", {"key", "value", "__ydb_incrBackupImpl_deleted", "__ydb_incrBackupImpl_columnStates"}, {}, {"key"}), }); } }); From 219ddd7ed3ed3c9c903526a3ed3b1c4c815d6eb1 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Tue, 9 Sep 2025 00:04:44 +0300 Subject: [PATCH 2/3] Remove usage of __async_repl attr from incr backup impl (#24425) --- .../datashard_ut_incremental_backup.cpp | 112 ++++++++++++++++++ ...ard__operation_alter_continuous_backup.cpp | 5 + .../schemeshard/schemeshard_path_element.cpp | 3 +- 3 files changed, 119 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 6103b0e8833c..7e41b4c63afa 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -1806,6 +1806,118 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { false, Ydb::StatusIds::SCHEME_ERROR); } + Y_UNIT_TEST(VerifyIncrementalBackupTableAttributes) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + // Insert some initial data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), (2, 20), (3, 30); + )"); + + // Create backup collection with incremental backup enabled + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `TestCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + // Create full backup first + ExecSQL(server, edgeActor, R"(BACKUP `TestCollection`;)", false); + runtime.SimulateSleep(TDuration::Seconds(5)); + + // Modify some data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (2, 200); + DELETE FROM `/Root/Table` WHERE key = 1; + )"); + + // Create incremental backup - this should create the incremental backup implementation tables + ExecSQL(server, edgeActor, R"(BACKUP `TestCollection` INCREMENTAL;)", false); + runtime.SimulateSleep(TDuration::Seconds(10)); // More time for incremental backup to complete + + // Try to find the incremental backup table by iterating through possible timestamps + TString foundIncrementalBackupPath; + bool foundIncrementalBackupTable = false; + + // Common incremental backup paths to try (timestamp-based) + TVector possiblePaths = { + "/Root/.backups/collections/TestCollection/19700101000002Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000003Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000004Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000005Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000006Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000007Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000008Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000009Z_incremental/Table", + "/Root/.backups/collections/TestCollection/19700101000010Z_incremental/Table", + }; + + for (const auto& path : possiblePaths) { + auto request = MakeHolder(); + request->Record.MutableDescribePath()->SetPath(path); + request->Record.MutableDescribePath()->MutableOptions()->SetShowPrivateTable(true); + runtime.Send(new IEventHandle(MakeTxProxyID(), edgeActor, request.Release())); + auto reply = runtime.GrabEdgeEventRethrow(edgeActor); + + if (reply->Get()->GetRecord().GetStatus() == NKikimrScheme::EStatus::StatusSuccess) { + foundIncrementalBackupPath = path; + foundIncrementalBackupTable = true; + Cerr << "Found incremental backup table at: " << path << Endl; + break; + } + } + + UNIT_ASSERT_C(foundIncrementalBackupTable, TStringBuilder() << "Could not find incremental backup table. Tried paths: " << JoinSeq(", ", possiblePaths)); + + // Now check the found incremental backup table attributes + auto request = MakeHolder(); + request->Record.MutableDescribePath()->SetPath(foundIncrementalBackupPath); + request->Record.MutableDescribePath()->MutableOptions()->SetShowPrivateTable(true); + runtime.Send(new IEventHandle(MakeTxProxyID(), edgeActor, request.Release())); + auto reply = runtime.GrabEdgeEventRethrow(edgeActor); + + UNIT_ASSERT_EQUAL(reply->Get()->GetRecord().GetStatus(), NKikimrScheme::EStatus::StatusSuccess); + + const auto& pathDescription = reply->Get()->GetRecord().GetPathDescription(); + UNIT_ASSERT(pathDescription.HasTable()); + + // Verify that incremental backup table has __incremental_backup attribute + bool hasIncrementalBackupAttr = false; + bool hasAsyncReplicaAttr = false; + + for (const auto& attr : pathDescription.GetUserAttributes()) { + Cerr << "Found attribute: " << attr.GetKey() << " = " << attr.GetValue() << Endl; + if (attr.GetKey() == "__incremental_backup") { + hasIncrementalBackupAttr = true; + } + if (attr.GetKey() == "__async_replica") { + hasAsyncReplicaAttr = true; + } + } + + // Verify that we have __incremental_backup but NOT __async_replica + UNIT_ASSERT_C(hasIncrementalBackupAttr, TStringBuilder() << "Incremental backup table at " << foundIncrementalBackupPath << " must have __incremental_backup attribute"); + UNIT_ASSERT_C(!hasAsyncReplicaAttr, TStringBuilder() << "Incremental backup table at " << foundIncrementalBackupPath << " must NOT have __async_replica attribute"); + } + } // Y_UNIT_TEST_SUITE(IncrementalBackup) } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp index a57ba37bf735..5f578f237404 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp @@ -57,6 +57,11 @@ void DoCreateIncrBackupTable(const TOperationId& opId, const TPath& dst, NKikimr auto& replicationConfig = *desc.MutableReplicationConfig(); replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY); replicationConfig.SetConsistencyLevel(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_LEVEL_ROW); + + // Set incremental backup config so DataShard can distinguish between async replica and incremental backup + auto& incrementalBackupConfig = *desc.MutableIncrementalBackupConfig(); + incrementalBackupConfig.SetMode(NKikimrSchemeOp::TTableIncrementalBackupConfig::RESTORE_MODE_INCREMENTAL_BACKUP); + incrementalBackupConfig.SetConsistency(NKikimrSchemeOp::TTableIncrementalBackupConfig::CONSISTENCY_WEAK); for (auto& column : *desc.MutableColumns()) { column.SetNotNull(false); diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp index f9698ff0067d..294749723020 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp @@ -477,7 +477,8 @@ void TPathElement::SerializeRuntimeAttrs( process(FileStoreSpaceHDD, "__filestore_space_allocated_hdd"); process(FileStoreSpaceSSDSystem, "__filestore_space_allocated_ssd_system"); - if (IsAsyncReplica) { + // Set __async_replica attribute only for true async replica tables, not for incremental backup tables + if (IsAsyncReplica && !IsIncrementalRestoreTable) { auto* attr = userAttrs->Add(); attr->SetKey(ToString(ATTR_ASYNC_REPLICA)); attr->SetValue("true"); From f58dadf9ee1fa5ef1b56b31ea9edfffa1ea985cc Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Tue, 9 Sep 2025 16:57:27 +0300 Subject: [PATCH 3/3] Fix REPLACE INTO handling in incremental backup (#24427) --- ydb/core/backup/impl/change_record.h | 42 ++-- .../datashard_ut_incremental_backup.cpp | 227 ++++++++++++++++++ 2 files changed, 254 insertions(+), 15 deletions(-) diff --git a/ydb/core/backup/impl/change_record.h b/ydb/core/backup/impl/change_record.h index 01092be8eb3b..f80f07b0df9a 100644 --- a/ydb/core/backup/impl/change_record.h +++ b/ydb/core/backup/impl/change_record.h @@ -71,19 +71,25 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { auto& upsert = *record.MutableUpsert(); switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) { - case NKikimrChangeExchange::TDataChange::kUpsert: { + case NKikimrChangeExchange::TDataChange::kUpsert: + case NKikimrChangeExchange::TDataChange::kReset: { TVector tags; TVector cells; NKikimrBackup::TColumnStateMap columnStateMap; - const auto& upsertData = ProtoBody.GetCdcDataChange().GetUpsert(); + // Handle both Upsert and Reset operations + const bool isResetOperation = ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kReset; + const auto& operationData = isResetOperation + ? ProtoBody.GetCdcDataChange().GetReset() + : ProtoBody.GetCdcDataChange().GetUpsert(); + TSerializedCellVec originalCells; - Y_ABORT_UNLESS(TSerializedCellVec::TryParse(upsertData.GetData(), originalCells)); + Y_ABORT_UNLESS(TSerializedCellVec::TryParse(operationData.GetData(), originalCells)); - tags.assign(upsertData.GetTags().begin(), upsertData.GetTags().end()); + tags.assign(operationData.GetTags().begin(), operationData.GetTags().end()); cells.assign(originalCells.GetCells().begin(), originalCells.GetCells().end()); - THashSet presentTags(upsertData.GetTags().begin(), upsertData.GetTags().end()); + THashSet presentTags(operationData.GetTags().begin(), operationData.GetTags().end()); for (const auto& [name, columnInfo] : Schema->ValueColumns) { if (name == "__ydb_incrBackupImpl_deleted" || name == "__ydb_incrBackupImpl_columnStates") { continue; @@ -93,9 +99,9 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { columnState->SetTag(columnInfo.Tag); if (presentTags.contains(columnInfo.Tag)) { - auto it = std::find(upsertData.GetTags().begin(), upsertData.GetTags().end(), columnInfo.Tag); - if (it != upsertData.GetTags().end()) { - size_t idx = std::distance(upsertData.GetTags().begin(), it); + auto it = std::find(operationData.GetTags().begin(), operationData.GetTags().end(), columnInfo.Tag); + if (it != operationData.GetTags().end()) { + size_t idx = std::distance(operationData.GetTags().begin(), it); if (idx < originalCells.GetCells().size()) { columnState->SetIsNull(originalCells.GetCells()[idx].IsNull()); } else { @@ -106,8 +112,13 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { } columnState->SetIsChanged(true); } else { - columnState->SetIsNull(false); - columnState->SetIsChanged(false); + if (isResetOperation) { + columnState->SetIsNull(true); + columnState->SetIsChanged(true); + } else { + columnState->SetIsNull(false); + columnState->SetIsChanged(false); + } } } @@ -168,7 +179,6 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { upsert.SetData(TSerializedCellVec::Serialize(cells)); break; } - case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]]; default: Y_FAIL_S("Unexpected row operation: " << static_cast(ProtoBody.GetCdcDataChange().GetRowOperationCase())); } @@ -182,27 +192,29 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) { - case NKikimrChangeExchange::TDataChange::kUpsert: { + case NKikimrChangeExchange::TDataChange::kUpsert: + case NKikimrChangeExchange::TDataChange::kReset: { auto& upsert = *record.MutableUpsert(); - // Check if NewImage is available, otherwise fall back to Upsert + // Check if NewImage is available, otherwise fall back to Upsert/Reset if (ProtoBody.GetCdcDataChange().has_newimage()) { *upsert.MutableTags() = { ProtoBody.GetCdcDataChange().GetNewImage().GetTags().begin(), ProtoBody.GetCdcDataChange().GetNewImage().GetTags().end()}; upsert.SetData(ProtoBody.GetCdcDataChange().GetNewImage().GetData()); - } else { + } else if (ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kUpsert) { // Fallback to Upsert field if NewImage is not available *upsert.MutableTags() = { ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); + } else if (ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kReset) { + Y_ABORT("Reset operation is not supported, all operations must be converted to Upsert"); } break; } case NKikimrChangeExchange::TDataChange::kErase: record.MutableErase(); break; - case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]]; default: Y_FAIL_S("Unexpected row operation: " << static_cast(ProtoBody.GetCdcDataChange().GetRowOperationCase())); } diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 7e41b4c63afa..2a09f07dd937 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -65,6 +65,16 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { return TShardedTableOptions(); } + TShardedTableOptions ThreeColumnTable() { + TShardedTableOptions opts; + opts.Columns_ = { + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"extra", "Uint32", false, false} + }; + return opts; + } + ui64 ResolvePqTablet(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) { auto streamDesc = Ls(runtime, sender, path); @@ -169,6 +179,23 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { return proto; } + NKikimrChangeExchange::TChangeRecord MakeReset(ui32 key, ui32 value) { + auto keyCell = TCell::Make(key); + auto valueCell = TCell::Make(value); + NKikimrChangeExchange::TChangeRecord proto; + + auto& dc = *proto.MutableCdcDataChange(); + auto& dcKey = *dc.MutableKey(); + dcKey.AddTags(1); + dcKey.SetData(TSerializedCellVec::Serialize({keyCell})); + + auto& reset = *dc.MutableReset(); + reset.AddTags(2); + reset.SetData(TSerializedCellVec::Serialize({valueCell})); + + return proto; + } + NKikimrChangeExchange::TChangeRecord MakeErase(ui32 key) { auto keyCell = TCell::Make(key); NKikimrChangeExchange::TChangeRecord proto; @@ -182,6 +209,44 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { return proto; } + NKikimrChangeExchange::TChangeRecord MakeUpsertPartial(ui32 key, ui32 value, const TVector& tags = {2}) { + auto keyCell = TCell::Make(key); + auto valueCell = TCell::Make(value); + NKikimrChangeExchange::TChangeRecord proto; + + auto& dc = *proto.MutableCdcDataChange(); + auto& dcKey = *dc.MutableKey(); + dcKey.AddTags(1); + dcKey.SetData(TSerializedCellVec::Serialize({keyCell})); + + auto& upsert = *dc.MutableUpsert(); + for (auto tag : tags) { + upsert.AddTags(tag); + } + upsert.SetData(TSerializedCellVec::Serialize({valueCell})); + + return proto; + } + + NKikimrChangeExchange::TChangeRecord MakeResetPartial(ui32 key, ui32 value, const TVector& tags = {2}) { + auto keyCell = TCell::Make(key); + auto valueCell = TCell::Make(value); + NKikimrChangeExchange::TChangeRecord proto; + + auto& dc = *proto.MutableCdcDataChange(); + auto& dcKey = *dc.MutableKey(); + dcKey.AddTags(1); + dcKey.SetData(TSerializedCellVec::Serialize({keyCell})); + + auto& reset = *dc.MutableReset(); + for (auto tag : tags) { + reset.AddTags(tag); + } + reset.SetData(TSerializedCellVec::Serialize({valueCell})); + + return proto; + } + Y_UNIT_TEST(SimpleBackup) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) @@ -1918,6 +1983,168 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { UNIT_ASSERT_C(!hasAsyncReplicaAttr, TStringBuilder() << "Incremental backup table at " << foundIncrementalBackupPath << " must NOT have __async_replica attribute"); } + Y_UNIT_TEST(ResetOperationIncrementalBackup) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const TActorId edgeActor = runtime.AllocateEdgeActor(); + + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20); + )"); + + WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table")); + + // Test kReset operation (REPLACE INTO) + ExecSQL(server, edgeActor, R"( + REPLACE INTO `/Root/Table` (key, value) VALUES + (1, 100), + (3, 300); + )"); + + WaitForContent(server, edgeActor, "/Root/Table/0_continuousBackupImpl", { + MakeReset(1, 100), + MakeReset(3, 300), + }); + } + + Y_UNIT_TEST(ReplaceIntoIncrementalBackup) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2135), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const TActorId edgeActor = runtime.AllocateEdgeActor(); + + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + // Insert initial data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )"); + + WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table")); + + // Test multiple REPLACE operations + ExecSQL(server, edgeActor, R"( + REPLACE INTO `/Root/Table` (key, value) VALUES + (1, 100), + (4, 400); + )"); + + WaitForContent(server, edgeActor, "/Root/Table/0_continuousBackupImpl", { + MakeReset(1, 100), + MakeReset(4, 400), + }); + } + + Y_UNIT_TEST(ResetVsUpsertMissingColumnsTest) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2136), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const TActorId edgeActor = runtime.AllocateEdgeActor(); + + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", ThreeColumnTable()); + + // Insert initial data with all three columns + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value, extra) VALUES + (1, 10, 100), + (2, 20, 200); + )"); + + WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table")); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (1, 15); + )"); + + ExecSQL(server, edgeActor, R"( + REPLACE INTO `/Root/Table` (key, value) VALUES (2, 25); + )"); + + SimulateSleep(server, TDuration::Seconds(1)); + + auto records = GetRecords(runtime, edgeActor, "/Root/Table/0_continuousBackupImpl", 0); + UNIT_ASSERT_VALUES_EQUAL(records.size(), 2); + + // Parse the first record (Upsert) + NKikimrChangeExchange::TChangeRecord firstRecord; + UNIT_ASSERT(firstRecord.ParseFromString(records[0].second)); + UNIT_ASSERT_C(firstRecord.GetCdcDataChange().HasUpsert(), "First record should be an upsert"); + + // Parse the second record (Reset) + NKikimrChangeExchange::TChangeRecord secondRecord; + UNIT_ASSERT(secondRecord.ParseFromString(records[1].second)); + UNIT_ASSERT_C(secondRecord.GetCdcDataChange().HasReset(), "Second record should be a reset"); + } + + Y_UNIT_TEST(ResetVsUpsertColumnStateSerialization) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2137), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + ); + + auto& runtime = *server->GetRuntime(); + const TActorId edgeActor = runtime.AllocateEdgeActor(); + + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", ThreeColumnTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value, extra) VALUES (1, 10, 100); + )"); + + WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table")); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (1, 15); + )"); + + ExecSQL(server, edgeActor, R"( + REPLACE INTO `/Root/Table` (key, value) VALUES (1, 25); + )"); + + SimulateSleep(server, TDuration::Seconds(2)); + + auto records = GetRecords(runtime, edgeActor, "/Root/Table/0_continuousBackupImpl", 0); + UNIT_ASSERT_C(records.size() >= 2, "Should have at least 2 records"); + + for (size_t i = 0; i < records.size(); ++i) { + NKikimrChangeExchange::TChangeRecord parsedRecord; + UNIT_ASSERT(parsedRecord.ParseFromString(records[i].second)); + const auto& dataChange = parsedRecord.GetCdcDataChange(); + + UNIT_ASSERT_C(dataChange.HasUpsert() || dataChange.HasReset(), + "Record should be either upsert or reset operation"); + } + } + } // Y_UNIT_TEST_SUITE(IncrementalBackup) } // NKikimr