Skip to content

Commit

Permalink
Merge bc90b90 into c709dc0
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd committed Mar 25, 2024
2 parents c709dc0 + bc90b90 commit ab34b07
Show file tree
Hide file tree
Showing 15 changed files with 245 additions and 19 deletions.
6 changes: 6 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,11 @@ message TMoveIndex {
optional bool AllowOverwrite = 4;
}

message TSetVal {
optional sint64 NextValue = 1;
optional bool NextUsed = 2;
}

message TSequenceDescription {
optional string Name = 1; // mandatory
optional NKikimrProto.TPathID PathId = 2; // sequence path id, assigned by schemeshard
Expand All @@ -1305,6 +1310,7 @@ message TSequenceDescription {
optional uint64 Cache = 8; // number of items to cache, defaults to 1
optional sint64 Increment = 9; // increment at each call, defaults to 1
optional bool Cycle = 10; // true when cycle on overflow is allowed
optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating
}

message TSequenceSharding {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/protos/tx_sequenceshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ message TEvMarkSchemeShardPipe {
uint64 Round = 3;
}

message TSetVal {
sint64 NextValue = 1;
bool NextUsed = 2;
}

message TEvCreateSequence {
NKikimrProto.TPathID PathId = 1;
uint64 TxId = 2;
Expand All @@ -40,6 +45,9 @@ message TEvCreateSequence {
bool Cycle = 9;
}
bool Frozen = 10; // defaults to false
oneof OptionalSetVal {
TSetVal SetVal = 11;
}
}

message TEvCreateSequenceResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
result.push_back(CreateCopyTable(NextPartId(nextId, result),
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences));

if (descr.GetOmitIndexes()) {
continue;
}

TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (const auto& child: srcPath.Base()->GetChildren()) {
const auto& name = child.first;
Expand All @@ -160,6 +156,10 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
continue;
}

if (descr.GetOmitIndexes()) {
continue;
}

if (!srcIndexPath.IsTableIndex()) {
continue;
}
Expand All @@ -185,9 +185,11 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
scheme.SetFailOnExist(true);

auto* copySequence = scheme.MutableCopySequence();
copySequence->SetCopyFrom(srcPath.PathString() + "/" + sequenceDescription.GetName());
*scheme.MutableSequence() = std::move(sequenceDescription);

result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
result.push_back(CreateCopySequence(NextPartId(nextId, result), scheme));
}
}

Expand Down
27 changes: 24 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TConfigureParts : public TSubOperationState {
event->Record.SetFrozen(true);

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCoptSequence TConfigureParts ProgressState"
"TCopySequence TConfigureParts ProgressState"
<< " sending TEvCreateSequence to tablet " << tabletId
<< " operationId# " << OperationId
<< " at tablet " << ssId);
Expand Down Expand Up @@ -274,6 +274,18 @@ class TProposedCopySequence : public TSubOperationState {
<< " operationId#" << OperationId;
}

void UpdateSequenceDescription(NKikimrSchemeOp::TSequenceDescription& descr) {
descr.SetStartValue(GetSequenceResult.GetStartValue());
descr.SetMinValue(GetSequenceResult.GetMinValue());
descr.SetMaxValue(GetSequenceResult.GetMaxValue());
descr.SetCache(GetSequenceResult.GetCache());
descr.SetIncrement(GetSequenceResult.GetIncrement());
descr.SetCycle(GetSequenceResult.GetCycle());
auto* setValMsg = descr.MutableSetVal();
setValMsg->SetNextValue(GetSequenceResult.GetNextValue());
setValMsg->SetNextUsed(GetSequenceResult.GetNextUsed());
}

public:
TProposedCopySequence(TOperationId id)
: OperationId(id)
Expand Down Expand Up @@ -333,7 +345,15 @@ class TProposedCopySequence : public TSubOperationState {
return false;
}

TPathId pathId = txState->TargetPathId;

NIceDb::TNiceDb db(context.GetDB());

auto sequenceInfo = context.SS->Sequences.at(pathId);
UpdateSequenceDescription(sequenceInfo->Description);

context.SS->PersistSequence(db, pathId, *sequenceInfo);

context.SS->ChangeTxState(db, OperationId, TTxState::Done);
context.OnComplete.ActivateTx(OperationId);
return true;
Expand Down Expand Up @@ -387,7 +407,7 @@ class TProposedCopySequence : public TSubOperationState {
return false;
}

auto getSequenceResult = ev->Get()->Record;
GetSequenceResult = ev->Get()->Record;

Y_ABORT_UNLESS(txState->Shards.size() == 1);
for (auto shard : txState->Shards) {
Expand All @@ -397,7 +417,8 @@ class TProposedCopySequence : public TSubOperationState {
Y_ABORT_UNLESS(currentTabletId != InvalidTabletId);

auto event = MakeHolder<NSequenceShard::TEvSequenceShard::TEvRestoreSequence>(
txState->TargetPathId, getSequenceResult);
txState->TargetPathId, GetSequenceResult);

event->Record.SetTxId(ui64(OperationId.GetTxId()));
event->Record.SetTxPartId(OperationId.GetSubTxId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ class TConfigureParts : public TSubOperationState {
if (alterData->Description.HasCycle()) {
event->Record.SetCycle(alterData->Description.GetCycle());
}
if (alterData->Description.HasSetVal()) {
event->Record.MutableSetVal()->SetNextValue(alterData->Description.GetSetVal().GetNextValue());
event->Record.MutableSetVal()->SetNextUsed(alterData->Description.GetSetVal().GetNextUsed());
}

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCreateSequence TConfigureParts ProgressState"
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
task.SetNeedToBill(!exportInfo->UserSID || !ss->SystemBackupSIDs.contains(*exportInfo->UserSID));

const TPath sourcePath = TPath::Init(exportInfo->Items[itemIdx].SourcePathId, ss);
if (sourcePath.IsResolved()) {
task.MutableTable()->CopyFrom(GetTableDescription(ss, sourcePath.Base()->PathId));
const TPath exportPathItem = exportPath.Child(ToString(itemIdx));
if (sourcePath.IsResolved() && exportPathItem.IsResolved()) {
auto exportDescription = GetTableDescription(ss, exportPathItem.Base()->PathId);
exportDescription.MutableTable()->SetName(sourcePath.LeafName());
task.MutableTable()->CopyFrom(exportDescription);
}

task.SetSnapshotStep(exportInfo->SnapshotStep);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
if (fromSequence.has_cycle()) {
seqDesc->SetCycle(fromSequence.cycle());
}
if (fromSequence.has_set_val()) {
auto* setVal = seqDesc->MutableSetVal();
setVal->SetNextUsed(fromSequence.set_val().next_used());
setVal->SetNextValue(fromSequence.set_val().next_value());
}

break;
}
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2362,16 +2362,17 @@ namespace NSchemeShardUT_Private {
runtime.Send(new IEventHandle(NSequenceProxy::MakeSequenceProxyServiceID(), sender, request.Release()));
}

i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender) {
i64 WaitNextValResult(
TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus) {
auto ev = runtime.GrabEdgeEventRethrow<NSequenceProxy::TEvSequenceProxy::TEvNextValResult>(sender);
auto* msg = ev->Get();
UNIT_ASSERT_VALUES_EQUAL(msg->Status, Ydb::StatusIds::SUCCESS);
UNIT_ASSERT_VALUES_EQUAL(msg->Status, expectedStatus);
return msg->Value;
}

i64 DoNextVal(TTestActorRuntime& runtime, const TString& path) {
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path, Ydb::StatusIds::StatusCode expectedStatus) {
auto sender = runtime.AllocateEdgeActor(0);
SendNextValRequest(runtime, sender, path);
return WaitNextValResult(runtime, sender);
return WaitNextValResult(runtime, sender, expectedStatus);
}
}
8 changes: 6 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,11 @@ namespace NSchemeShardUT_Private {
void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true);

void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path);
i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender);
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path);
i64 WaitNextValResult(
TTestActorRuntime& runtime, const TActorId& sender,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);
i64 DoNextVal(
TTestActorRuntime& runtime, const TString& path,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);

} //NSchemeShardUT_Private
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ PEERDIR(
ydb/core/tx
ydb/core/tx/datashard
ydb/core/tx/schemeshard
ydb/core/tx/sequenceproxy
ydb/core/tx/tx_allocator
ydb/core/tx/tx_proxy
ydb/public/lib/scheme_types
Expand Down
150 changes: 150 additions & 0 deletions ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,156 @@ value {
UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
}

Y_UNIT_TEST(ShouldRestoreSequence) {
TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TTestBasicRuntime runtime;
TTestEnv env(runtime);

ui64 txId = 100;

runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);

TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
TableDescription {
Name: "Original"
Columns { Name: "key" Type: "Uint64" DefaultFromSequence: "myseq" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
}
SequenceDescription {
Name: "myseq"
}
)");
env.TestWaitNotification(runtime, txId);

i64 value = DoNextVal(runtime, "/MyRoot/Original/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 1);

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Original"
destination_prefix: ""
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: ""
destination_path: "/MyRoot/Restored"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");

const auto desc = DescribePath(runtime, "/MyRoot/Restored", true, true);
UNIT_ASSERT_VALUES_EQUAL(desc.GetStatus(), NKikimrScheme::StatusSuccess);

const auto& table = desc.GetPathDescription().GetTable();

value = DoNextVal(runtime, "/MyRoot/Restored/myseq", Ydb::StatusIds::SCHEME_ERROR);
UNIT_ASSERT_VALUES_EQUAL(value, 2);

UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
}

Y_UNIT_TEST(ShouldRestoreSequenceWithOverflow) {
TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TTestBasicRuntime runtime;
TTestEnv env(runtime);

ui64 txId = 100;

runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);

TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
TableDescription {
Name: "Original"
Columns { Name: "key" Type: "Uint64" DefaultFromSequence: "myseq" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
}
SequenceDescription {
Name: "myseq"
MinValue: 1
MaxValue: 2
}
)");
env.TestWaitNotification(runtime, txId);

i64 value = DoNextVal(runtime, "/MyRoot/Original/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 1);

value = DoNextVal(runtime, "/MyRoot/Original/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 2);

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Original"
destination_prefix: ""
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: ""
destination_path: "/MyRoot/Restored"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");

const auto desc = DescribePath(runtime, "/MyRoot/Restored", true, true);
UNIT_ASSERT_VALUES_EQUAL(desc.GetStatus(), NKikimrScheme::StatusSuccess);

const auto& table = desc.GetPathDescription().GetTable();

value = DoNextVal(runtime, "/MyRoot/Restored/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 2);

value = DoNextVal(runtime, "/MyRoot/Restored/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 2);

UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
}

Y_UNIT_TEST(ExportImportPg) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/ut_restore/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ PEERDIR(
ydb/core/wrappers/ut_helpers
ydb/core/ydb_convert
ydb/library/yql/sql/pg
ydb/library/yql/parser/pg_wrapper
ydb/library/yql/parser/pg_wrapper
)

SRCS(
Expand Down
Loading

0 comments on commit ab34b07

Please sign in to comment.