Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 27 additions & 15 deletions ydb/core/backup/impl/change_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,25 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
auto& upsert = *record.MutableUpsert();

switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) {
case NKikimrChangeExchange::TDataChange::kUpsert: {
case NKikimrChangeExchange::TDataChange::kUpsert:
case NKikimrChangeExchange::TDataChange::kReset: {
TVector<NTable::TTag> tags;
TVector<TCell> cells;
NKikimrBackup::TColumnStateMap columnStateMap;

const auto& upsertData = ProtoBody.GetCdcDataChange().GetUpsert();
// Handle both Upsert and Reset operations
const bool isResetOperation = ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kReset;
const auto& operationData = isResetOperation
? ProtoBody.GetCdcDataChange().GetReset()
: ProtoBody.GetCdcDataChange().GetUpsert();

TSerializedCellVec originalCells;
Y_ABORT_UNLESS(TSerializedCellVec::TryParse(upsertData.GetData(), originalCells));
Y_ABORT_UNLESS(TSerializedCellVec::TryParse(operationData.GetData(), originalCells));

tags.assign(upsertData.GetTags().begin(), upsertData.GetTags().end());
tags.assign(operationData.GetTags().begin(), operationData.GetTags().end());
cells.assign(originalCells.GetCells().begin(), originalCells.GetCells().end());

THashSet<NTable::TTag> presentTags(upsertData.GetTags().begin(), upsertData.GetTags().end());
THashSet<NTable::TTag> presentTags(operationData.GetTags().begin(), operationData.GetTags().end());
for (const auto& [name, columnInfo] : Schema->ValueColumns) {
if (name == "__ydb_incrBackupImpl_deleted" || name == "__ydb_incrBackupImpl_columnStates") {
continue;
Expand All @@ -93,9 +99,9 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
columnState->SetTag(columnInfo.Tag);

if (presentTags.contains(columnInfo.Tag)) {
auto it = std::find(upsertData.GetTags().begin(), upsertData.GetTags().end(), columnInfo.Tag);
if (it != upsertData.GetTags().end()) {
size_t idx = std::distance(upsertData.GetTags().begin(), it);
auto it = std::find(operationData.GetTags().begin(), operationData.GetTags().end(), columnInfo.Tag);
if (it != operationData.GetTags().end()) {
size_t idx = std::distance(operationData.GetTags().begin(), it);
if (idx < originalCells.GetCells().size()) {
columnState->SetIsNull(originalCells.GetCells()[idx].IsNull());
} else {
Expand All @@ -106,8 +112,13 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
}
columnState->SetIsChanged(true);
} else {
columnState->SetIsNull(false);
columnState->SetIsChanged(false);
if (isResetOperation) {
columnState->SetIsNull(true);
columnState->SetIsChanged(true);
} else {
columnState->SetIsNull(false);
columnState->SetIsChanged(false);
}
}
}

Expand Down Expand Up @@ -168,7 +179,6 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
upsert.SetData(TSerializedCellVec::Serialize(cells));
break;
}
case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]];
default:
Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase()));
}
Expand All @@ -182,27 +192,29 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData());

switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) {
case NKikimrChangeExchange::TDataChange::kUpsert: {
case NKikimrChangeExchange::TDataChange::kUpsert:
case NKikimrChangeExchange::TDataChange::kReset: {
auto& upsert = *record.MutableUpsert();
// Check if NewImage is available, otherwise fall back to Upsert
// Check if NewImage is available, otherwise fall back to Upsert/Reset
if (ProtoBody.GetCdcDataChange().has_newimage()) {
*upsert.MutableTags() = {
ProtoBody.GetCdcDataChange().GetNewImage().GetTags().begin(),
ProtoBody.GetCdcDataChange().GetNewImage().GetTags().end()};
upsert.SetData(ProtoBody.GetCdcDataChange().GetNewImage().GetData());
} else {
} else if (ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kUpsert) {
// Fallback to Upsert field if NewImage is not available
*upsert.MutableTags() = {
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData());
} else if (ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kReset) {
Y_ABORT("Reset operation is not supported, all operations must be converted to Upsert");
}
break;
}
case NKikimrChangeExchange::TDataChange::kErase:
record.MutableErase();
break;
case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]];
default:
Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase()));
}
Expand Down
227 changes: 227 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
return TShardedTableOptions();
}

// Table options for a three-column test table: `key`, `value` and `extra`,
// all declared as Uint32.
// NOTE(review): the two trailing booleans are presumably (isKey, notNull) --
// only `key` sets the first one -- confirm against TShardedTableOptions::TColumn.
TShardedTableOptions ThreeColumnTable() {
    TShardedTableOptions options;
    options.Columns_ = {
        {"key", "Uint32", true, false},
        {"value", "Uint32", false, false},
        {"extra", "Uint32", false, false}
    };
    return options;
}

ui64 ResolvePqTablet(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) {
auto streamDesc = Ls(runtime, sender, path);

Expand Down Expand Up @@ -169,6 +179,23 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
return proto;
}

// Builds the change record a test expects for a Reset row operation:
// the key part carries tag 1 with `key` serialized as a single ui32 cell,
// and the Reset part carries tag 2 with `value` serialized the same way.
// NOTE(review): tags 1 and 2 are presumably the column ids of `key` and
// `value` in the test table -- confirm against the table definition.
NKikimrChangeExchange::TChangeRecord MakeReset(ui32 key, ui32 value) {
    NKikimrChangeExchange::TChangeRecord record;
    auto& dataChange = *record.MutableCdcDataChange();

    // Key part: one tag, one serialized cell.
    const auto keyCell = TCell::Make<ui32>(key);
    auto& keyPart = *dataChange.MutableKey();
    keyPart.AddTags(1);
    keyPart.SetData(TSerializedCellVec::Serialize({keyCell}));

    // Reset part: one tag, one serialized cell.
    const auto valueCell = TCell::Make<ui32>(value);
    auto& resetPart = *dataChange.MutableReset();
    resetPart.AddTags(2);
    resetPart.SetData(TSerializedCellVec::Serialize({valueCell}));

    return record;
}

NKikimrChangeExchange::TChangeRecord MakeErase(ui32 key) {
auto keyCell = TCell::Make<ui32>(key);
NKikimrChangeExchange::TChangeRecord proto;
Expand All @@ -182,6 +209,44 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
return proto;
}

// Builds a change record for an Upsert row operation whose tag list is
// caller-supplied (defaults to the single tag 2).
//
// The key part always carries tag 1 with `key` as a single ui32 cell. The
// Upsert part receives every entry of `tags`, but its data is always exactly
// one serialized ui32 cell holding `value`.
// NOTE(review): with more than one entry in `tags` the tag count and the cell
// count differ -- confirm whether that mismatch is intentional.
NKikimrChangeExchange::TChangeRecord MakeUpsertPartial(ui32 key, ui32 value, const TVector<ui32>& tags = {2}) {
    NKikimrChangeExchange::TChangeRecord record;
    auto& dataChange = *record.MutableCdcDataChange();

    // Key part: one tag, one serialized cell.
    const auto keyCell = TCell::Make<ui32>(key);
    auto& keyPart = *dataChange.MutableKey();
    keyPart.AddTags(1);
    keyPart.SetData(TSerializedCellVec::Serialize({keyCell}));

    // Upsert part: all requested tags, a single value cell.
    auto& upsertPart = *dataChange.MutableUpsert();
    for (const ui32 tag : tags) {
        upsertPart.AddTags(tag);
    }
    const auto valueCell = TCell::Make<ui32>(value);
    upsertPart.SetData(TSerializedCellVec::Serialize({valueCell}));

    return record;
}

// Builds a change record for a Reset row operation whose tag list is
// caller-supplied (defaults to the single tag 2).
//
// The key part always carries tag 1 with `key` as a single ui32 cell. The
// Reset part receives every entry of `tags`, but its data is always exactly
// one serialized ui32 cell holding `value`.
// NOTE(review): with more than one entry in `tags` the tag count and the cell
// count differ -- confirm whether that mismatch is intentional.
NKikimrChangeExchange::TChangeRecord MakeResetPartial(ui32 key, ui32 value, const TVector<ui32>& tags = {2}) {
    NKikimrChangeExchange::TChangeRecord record;
    auto& dataChange = *record.MutableCdcDataChange();

    // Key part: one tag, one serialized cell.
    const auto keyCell = TCell::Make<ui32>(key);
    auto& keyPart = *dataChange.MutableKey();
    keyPart.AddTags(1);
    keyPart.SetData(TSerializedCellVec::Serialize({keyCell}));

    // Reset part: all requested tags, a single value cell.
    auto& resetPart = *dataChange.MutableReset();
    for (const ui32 tag : tags) {
        resetPart.AddTags(tag);
    }
    const auto valueCell = TCell::Make<ui32>(value);
    resetPart.SetData(TSerializedCellVec::Serialize({valueCell}));

    return record;
}

Y_UNIT_TEST(SimpleBackup) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
Expand Down Expand Up @@ -1918,6 +1983,168 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
UNIT_ASSERT_C(!hasAsyncReplicaAttr, TStringBuilder() << "Incremental backup table at " << foundIncrementalBackupPath << " must NOT have __async_replica attribute");
}

// Checks that rows written with REPLACE INTO after a continuous backup has
// been created are expected in the backup stream as Reset change records.
Y_UNIT_TEST(ResetOperationIncrementalBackup) {
TPortManager portManager;
// Test server: real threads disabled, domain "Root", changefeed initial scan enabled.
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
.SetEnableChangefeedInitialScan(true)
);

auto& runtime = *server->GetRuntime();
const TActorId edgeActor = runtime.AllocateEdgeActor();

InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());

// Seed rows written before the backup is created; the expected content
// below lists only the rows written afterwards.
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
(1, 10),
(2, 20);
)");

// Create the continuous backup and wait for the transaction notification.
WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table"));

// Test kReset operation (REPLACE INTO)
// Key 1 overwrites an existing row, key 3 is a new row.
ExecSQL(server, edgeActor, R"(
REPLACE INTO `/Root/Table` (key, value) VALUES
(1, 100),
(3, 300);
)");

// Both writes are expected as Reset records, in this order.
WaitForContent(server, edgeActor, "/Root/Table/0_continuousBackupImpl", {
MakeReset(1, 100),
MakeReset(3, 300),
});
}

// Checks that a REPLACE INTO touching one existing key and one new key, issued
// after a continuous backup has been created, is expected in the backup stream
// as two Reset change records.
Y_UNIT_TEST(ReplaceIntoIncrementalBackup) {
TPortManager portManager;
// Test server: real threads disabled, domain "Root", changefeed initial scan enabled.
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2135), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
.SetEnableChangefeedInitialScan(true)
);

auto& runtime = *server->GetRuntime();
const TActorId edgeActor = runtime.AllocateEdgeActor();

InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());

// Insert initial data
// (written before the backup is created; not part of the expected content)
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES
(1, 10),
(2, 20),
(3, 30);
)");

// Create the continuous backup and wait for the transaction notification.
WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table"));

// Test multiple REPLACE operations
// NOTE(review): this is a single REPLACE statement with two rows
// (key 1 already exists, key 4 is new).
ExecSQL(server, edgeActor, R"(
REPLACE INTO `/Root/Table` (key, value) VALUES
(1, 100),
(4, 400);
)");

// Both writes are expected as Reset records, in this order.
WaitForContent(server, edgeActor, "/Root/Table/0_continuousBackupImpl", {
MakeReset(1, 100),
MakeReset(4, 400),
});
}

// On a three-column table, issues an UPSERT and then a REPLACE that each set
// only `value` (leaving `extra` unspecified), and checks that the backup
// stream holds exactly two records: an Upsert followed by a Reset.
// NOTE(review): the assertions check only the operation kind of each record;
// the tags/cells of the omitted `extra` column are not inspected.
Y_UNIT_TEST(ResetVsUpsertMissingColumnsTest) {
TPortManager portManager;
// Test server: real threads disabled, domain "Root", changefeed initial scan enabled.
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2136), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
.SetEnableChangefeedInitialScan(true)
);

auto& runtime = *server->GetRuntime();
const TActorId edgeActor = runtime.AllocateEdgeActor();

InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table", ThreeColumnTable());

// Insert initial data with all three columns
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table` (key, value, extra) VALUES
(1, 10, 100),
(2, 20, 200);
)");

// Create the continuous backup and wait for the transaction notification.
WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table"));

// Partial UPSERT on key 1: only `value` is specified.
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES (1, 15);
)");

// Partial REPLACE on key 2: only `value` is specified.
ExecSQL(server, edgeActor, R"(
REPLACE INTO `/Root/Table` (key, value) VALUES (2, 25);
)");

// Fixed one-second simulated sleep before reading the stream.
// NOTE(review): presumably gives the change records time to be delivered -- confirm.
SimulateSleep(server, TDuration::Seconds(1));

// Read the records of the backup stream (last argument 0 is presumably the partition id -- confirm).
auto records = GetRecords(runtime, edgeActor, "/Root/Table/0_continuousBackupImpl", 0);
UNIT_ASSERT_VALUES_EQUAL(records.size(), 2);

// Parse the first record (Upsert)
NKikimrChangeExchange::TChangeRecord firstRecord;
UNIT_ASSERT(firstRecord.ParseFromString(records[0].second));
UNIT_ASSERT_C(firstRecord.GetCdcDataChange().HasUpsert(), "First record should be an upsert");

// Parse the second record (Reset)
NKikimrChangeExchange::TChangeRecord secondRecord;
UNIT_ASSERT(secondRecord.ParseFromString(records[1].second));
UNIT_ASSERT_C(secondRecord.GetCdcDataChange().HasReset(), "Second record should be a reset");
}

// On a three-column table, applies a partial UPSERT and then a partial REPLACE
// to the same key, and checks that every record read from the backup stream
// parses and is either an Upsert or a Reset.
// NOTE(review): despite the test name, no column-state data is inspected here;
// only the operation kind of each record is asserted.
Y_UNIT_TEST(ResetVsUpsertColumnStateSerialization) {
TPortManager portManager;
// Test server: real threads disabled, domain "Root", changefeed initial scan enabled.
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2137), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
.SetEnableChangefeedInitialScan(true)
);

auto& runtime = *server->GetRuntime();
const TActorId edgeActor = runtime.AllocateEdgeActor();

InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table", ThreeColumnTable());

// Seed a single row with all three columns before the backup is created.
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table` (key, value, extra) VALUES (1, 10, 100);
)");

// Create the continuous backup and wait for the transaction notification.
WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table"));

// Partial UPSERT on key 1: only `value` is specified.
ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES (1, 15);
)");

// Partial REPLACE on the same key: only `value` is specified.
ExecSQL(server, edgeActor, R"(
REPLACE INTO `/Root/Table` (key, value) VALUES (1, 25);
)");

// Fixed two-second simulated sleep before reading the stream.
// NOTE(review): presumably gives the change records time to be delivered -- confirm.
SimulateSleep(server, TDuration::Seconds(2));

// Read the records of the backup stream (last argument 0 is presumably the partition id -- confirm).
auto records = GetRecords(runtime, edgeActor, "/Root/Table/0_continuousBackupImpl", 0);
// Lower bound only: the exact number of records is not pinned.
UNIT_ASSERT_C(records.size() >= 2, "Should have at least 2 records");

// Every record must deserialize and be an Upsert or a Reset.
for (size_t i = 0; i < records.size(); ++i) {
NKikimrChangeExchange::TChangeRecord parsedRecord;
UNIT_ASSERT(parsedRecord.ParseFromString(records[i].second));
const auto& dataChange = parsedRecord.GetCdcDataChange();

UNIT_ASSERT_C(dataChange.HasUpsert() || dataChange.HasReset(),
"Record should be either upsert or reset operation");
}
}

} // Y_UNIT_TEST_SUITE(IncrementalBackup)

} // NKikimr
Loading