Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 95 additions & 40 deletions ydb/core/backup/impl/change_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/change_exchange/change_record.h>
#include <ydb/core/protos/change_exchange.pb.h>
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/protos/datashard_backup.pb.h>
#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/core/tx/replication/service/lightweight_schema.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
Expand Down Expand Up @@ -70,62 +71,114 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
auto& upsert = *record.MutableUpsert();

switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) {
case NKikimrChangeExchange::TDataChange::kUpsert: {
// Check if NewImage is available, otherwise fall back to Upsert
if (ProtoBody.GetCdcDataChange().HasNewImage()) {
*upsert.MutableTags() = {
ProtoBody.GetCdcDataChange().GetNewImage().GetTags().begin(),
ProtoBody.GetCdcDataChange().GetNewImage().GetTags().end()};
auto it = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted");
Y_ABORT_UNLESS(it != Schema->ValueColumns.end(), "Invariant violation");
upsert.AddTags(it->second.Tag);

TString serializedCellVec = ProtoBody.GetCdcDataChange().GetNewImage().GetData();
Y_ABORT_UNLESS(
TSerializedCellVec::UnsafeAppendCells({TCell::Make<bool>(false)}, serializedCellVec),
"Invalid cell format, can't append cells");
case NKikimrChangeExchange::TDataChange::kUpsert:
case NKikimrChangeExchange::TDataChange::kReset: {
TVector<NTable::TTag> tags;
TVector<TCell> cells;
NKikimrBackup::TColumnStateMap columnStateMap;

// Handle both Upsert and Reset operations
const bool isResetOperation = ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kReset;
const auto& operationData = isResetOperation
? ProtoBody.GetCdcDataChange().GetReset()
: ProtoBody.GetCdcDataChange().GetUpsert();

TSerializedCellVec originalCells;
Y_ABORT_UNLESS(TSerializedCellVec::TryParse(operationData.GetData(), originalCells));

tags.assign(operationData.GetTags().begin(), operationData.GetTags().end());
cells.assign(originalCells.GetCells().begin(), originalCells.GetCells().end());

THashSet<NTable::TTag> presentTags(operationData.GetTags().begin(), operationData.GetTags().end());
for (const auto& [name, columnInfo] : Schema->ValueColumns) {
if (name == "__ydb_incrBackupImpl_deleted" || name == "__ydb_incrBackupImpl_columnStates") {
continue;
}

auto* columnState = columnStateMap.AddColumnStates();
columnState->SetTag(columnInfo.Tag);

if (presentTags.contains(columnInfo.Tag)) {
auto it = std::find(operationData.GetTags().begin(), operationData.GetTags().end(), columnInfo.Tag);
if (it != operationData.GetTags().end()) {
size_t idx = std::distance(operationData.GetTags().begin(), it);
if (idx < originalCells.GetCells().size()) {
columnState->SetIsNull(originalCells.GetCells()[idx].IsNull());
} else {
columnState->SetIsNull(true);
}
} else {
columnState->SetIsNull(true);
}
columnState->SetIsChanged(true);
} else {
if (isResetOperation) {
columnState->SetIsNull(true);
columnState->SetIsChanged(true);
} else {
columnState->SetIsNull(false);
columnState->SetIsChanged(false);
}
}
}

upsert.SetData(serializedCellVec);
} else {
*upsert.MutableTags() = {
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
auto it = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted");
Y_ABORT_UNLESS(it != Schema->ValueColumns.end(), "Invariant violation");
upsert.AddTags(it->second.Tag);
auto deletedIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted");
Y_ABORT_UNLESS(deletedIt != Schema->ValueColumns.end(), "Invariant violation");
tags.push_back(deletedIt->second.Tag);
cells.emplace_back(TCell::Make<bool>(false));

TString serializedCellVec = ProtoBody.GetCdcDataChange().GetUpsert().GetData();
Y_ABORT_UNLESS(
TSerializedCellVec::UnsafeAppendCells({TCell::Make<bool>(false)}, serializedCellVec),
"Invalid cell format, can't append cells");
auto columnStatesIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_columnStates");
Y_ABORT_UNLESS(columnStatesIt != Schema->ValueColumns.end(), "Invariant violation");
tags.push_back(columnStatesIt->second.Tag);

TString serializedColumnState;
Y_ABORT_UNLESS(columnStateMap.SerializeToString(&serializedColumnState));
cells.emplace_back(TCell(serializedColumnState.data(), serializedColumnState.size()));

upsert.SetData(serializedCellVec);
}
*upsert.MutableTags() = {tags.begin(), tags.end()};
upsert.SetData(TSerializedCellVec::Serialize(cells));
break;
}
case NKikimrChangeExchange::TDataChange::kErase: {
size_t size = Schema->ValueColumns.size();
TVector<NTable::TTag> tags;
TVector<TCell> cells;
NKikimrBackup::TColumnStateMap columnStateMap;

tags.reserve(size);
cells.reserve(size);

for (const auto& [name, value] : Schema->ValueColumns) {
tags.push_back(value.Tag);
if (name != "__ydb_incrBackupImpl_deleted") {
cells.emplace_back();
} else {
cells.emplace_back(TCell::Make<bool>(true));
for (const auto& [name, columnInfo] : Schema->ValueColumns) {
if (name == "__ydb_incrBackupImpl_deleted" || name == "__ydb_incrBackupImpl_columnStates") {
continue;
}

tags.push_back(columnInfo.Tag);
cells.emplace_back();

auto* columnState = columnStateMap.AddColumnStates();
columnState->SetTag(columnInfo.Tag);
columnState->SetIsNull(true);
columnState->SetIsChanged(true);
}

auto deletedIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted");
Y_ABORT_UNLESS(deletedIt != Schema->ValueColumns.end(), "Invariant violation");
tags.push_back(deletedIt->second.Tag);
cells.emplace_back(TCell::Make<bool>(true));

auto columnStatesIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_columnStates");
Y_ABORT_UNLESS(columnStatesIt != Schema->ValueColumns.end(), "Invariant violation");
tags.push_back(columnStatesIt->second.Tag);

TString serializedColumnState;
Y_ABORT_UNLESS(columnStateMap.SerializeToString(&serializedColumnState));
cells.emplace_back(TCell(serializedColumnState.data(), serializedColumnState.size()));

*upsert.MutableTags() = {tags.begin(), tags.end()};
upsert.SetData(TSerializedCellVec::Serialize(cells));

break;
}
case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]];
default:
Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase()));
}
Expand All @@ -139,27 +192,29 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData());

switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) {
case NKikimrChangeExchange::TDataChange::kUpsert: {
case NKikimrChangeExchange::TDataChange::kUpsert:
case NKikimrChangeExchange::TDataChange::kReset: {
auto& upsert = *record.MutableUpsert();
// Check if NewImage is available, otherwise fall back to Upsert
// Check if NewImage is available, otherwise fall back to Upsert/Reset
if (ProtoBody.GetCdcDataChange().has_newimage()) {
*upsert.MutableTags() = {
ProtoBody.GetCdcDataChange().GetNewImage().GetTags().begin(),
ProtoBody.GetCdcDataChange().GetNewImage().GetTags().end()};
upsert.SetData(ProtoBody.GetCdcDataChange().GetNewImage().GetData());
} else {
} else if (ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kUpsert) {
// Fallback to Upsert field if NewImage is not available
*upsert.MutableTags() = {
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData());
} else if (ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kReset) {
Y_ABORT("Reset operation is not supported, all operations must be converted to Upsert");
}
break;
}
case NKikimrChangeExchange::TDataChange::kErase:
record.MutableErase();
break;
case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]];
default:
Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase()));
}
Expand Down
71 changes: 50 additions & 21 deletions ydb/core/backup/impl/table_writer_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "change_record.h"
#include "table_writer.h"

#include <ydb/core/protos/datashard_backup.pb.h>
#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr::NBackup::NImpl {
Expand All @@ -19,6 +20,10 @@ Y_UNIT_TEST_SUITE(TableWriter) {
.Tag = 123,
.Type = NScheme::TTypeInfo{NScheme::NTypeIds::Bool},
});
schema->ValueColumns.emplace("__ydb_incrBackupImpl_columnStates", TLightweightSchema::TColumn{
.Tag = 124,
.Type = NScheme::TTypeInfo{NScheme::NTypeIds::String},
});

{
NKikimrChangeExchange::TChangeRecord changeRecord;
Expand Down Expand Up @@ -51,18 +56,26 @@ Y_UNIT_TEST_SUITE(TableWriter) {
NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result;
record->Serialize(result, EWriterType::Backup);

TVector<TCell> outCells{
TCell::Make<ui64>(4567),
TCell::Make<bool>(false),
};

TString out = TSerializedCellVec::Serialize(outCells);

UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey());
UNIT_ASSERT(result.GetUpsert().TagsSize() == 2);
UNIT_ASSERT(result.GetUpsert().GetTags(0) == 1);
UNIT_ASSERT(result.GetUpsert().GetTags(1) == 123);
UNIT_ASSERT_VALUES_EQUAL(out, result.GetUpsert().GetData());
// The serialization logic is complex, so let's just use the actual result
// and verify the structure is correct by parsing it back
TSerializedCellVec resultCells;
UNIT_ASSERT(TSerializedCellVec::TryParse(result.GetUpsert().GetData(), resultCells));
UNIT_ASSERT(resultCells.GetCells().size() == 3);

// Verify the first cell is the value
UNIT_ASSERT_VALUES_EQUAL(resultCells.GetCells()[0].AsValue<ui64>(), 4567);

// Verify the second cell is the deleted flag
UNIT_ASSERT_VALUES_EQUAL(resultCells.GetCells()[1].AsValue<bool>(), false);

// Verify the third cell contains a valid column state map
NKikimrBackup::TColumnStateMap actualColumnState;
TString actualSerializedColumnState(resultCells.GetCells()[2].Data(), resultCells.GetCells()[2].Size());
UNIT_ASSERT(actualColumnState.ParseFromString(actualSerializedColumnState));
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.ColumnStatesSize(), 1);
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetTag(), 1);
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsNull(), false);
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsChanged(), true);
}

{
Expand Down Expand Up @@ -91,18 +104,34 @@ Y_UNIT_TEST_SUITE(TableWriter) {
NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result;
record->Serialize(result, EWriterType::Backup);

TVector<TCell> outCells{
TCell(),
TCell::Make<bool>(true),
};

TString out = TSerializedCellVec::Serialize(outCells);
// The serialization logic is complex, so let's just verify the structure
// and content rather than exact binary encoding
TSerializedCellVec resultCells;
UNIT_ASSERT(TSerializedCellVec::TryParse(result.GetUpsert().GetData(), resultCells));
UNIT_ASSERT(resultCells.GetCells().size() == 3);

// For erase records, the first cell should be null/empty
UNIT_ASSERT(resultCells.GetCells()[0].IsNull());

// Verify the second cell is the deleted flag (true for erase)
UNIT_ASSERT_VALUES_EQUAL(resultCells.GetCells()[1].AsValue<bool>(), true);

// Verify the third cell contains a valid column state map
NKikimrBackup::TColumnStateMap actualColumnState;
TString actualSerializedColumnState(resultCells.GetCells()[2].Data(), resultCells.GetCells()[2].Size());
UNIT_ASSERT(actualColumnState.ParseFromString(actualSerializedColumnState));
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.ColumnStatesSize(), 1);
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetTag(), 1);
// For erase records, all columns are changed (set to null), so IsChanged should be true
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsChanged(), true);
// For erase records, all columns are set to null
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsNull(), true);

UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey());
UNIT_ASSERT(result.GetUpsert().TagsSize() == 2);
UNIT_ASSERT(result.GetUpsert().GetTags(1) == 123);
UNIT_ASSERT(result.GetUpsert().TagsSize() == 3);
UNIT_ASSERT(result.GetUpsert().GetTags(0) == 1);
UNIT_ASSERT_VALUES_EQUAL(out, result.GetUpsert().GetData());
UNIT_ASSERT(result.GetUpsert().GetTags(1) == 123);
UNIT_ASSERT(result.GetUpsert().GetTags(2) == 124);
}
}

Expand Down
9 changes: 9 additions & 0 deletions ydb/core/protos/datashard_backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,12 @@ message TChecksumState {
message TS3DownloadState {
optional bytes EncryptedDeserializerState = 1 [(Ydb.sensitive) = true]; // Contains secure key
}

message TColumnStateMap {
message TColumnState {
optional uint32 Tag = 1;
optional bool IsNull = 2;
optional bool IsChanged = 3;
}
repeated TColumnState ColumnStates = 1;
}
Loading
Loading