Skip to content

Commit

Permalink
Merge 46e1e9c into 52ef3f3
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd committed Mar 26, 2024
2 parents 52ef3f3 + 46e1e9c commit f28e33e
Show file tree
Hide file tree
Showing 16 changed files with 251 additions and 25 deletions.
6 changes: 6 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,10 @@ message TMoveIndex {
}

message TSequenceDescription {
message TSetVal {
optional sint64 NextValue = 1;
optional bool NextUsed = 2;
}
optional string Name = 1; // mandatory
optional NKikimrProto.TPathID PathId = 2; // sequence path id, assigned by schemeshard
optional uint64 Version = 3; // incremented every time sequence is altered
Expand All @@ -1305,6 +1309,7 @@ message TSequenceDescription {
optional uint64 Cache = 8; // number of items to cache, defaults to 1
optional sint64 Increment = 9; // increment at each call, defaults to 1
optional bool Cycle = 10; // true when cycle on overflow is allowed
optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating
}

message TSequenceSharding {
Expand Down Expand Up @@ -1613,6 +1618,7 @@ message TDescribeOptions {
optional bool ShowPrivateTable = 7 [default = false];
optional bool ReturnChannelsBinding = 8 [default = false];
optional bool ReturnRangeKey = 9 [default = true];
optional bool ReturnSetVal = 10 [default = false];
}

// Request to read scheme for a specific path
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/protos/tx_sequenceshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ message TEvMarkSchemeShardPipe {
}

message TEvCreateSequence {
message TSetVal {
sint64 NextValue = 1;
bool NextUsed = 2;
}

NKikimrProto.TPathID PathId = 1;
uint64 TxId = 2;
uint64 TxPartId = 3;
Expand All @@ -40,6 +45,7 @@ message TEvCreateSequence {
bool Cycle = 9;
}
bool Frozen = 10; // defaults to false
TSetVal SetVal = 11;
}

message TEvCreateSequenceResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
result.push_back(CreateCopyTable(NextPartId(nextId, result),
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences));

if (descr.GetOmitIndexes()) {
continue;
}

TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (const auto& child: srcPath.Base()->GetChildren()) {
const auto& name = child.first;
Expand All @@ -160,6 +156,10 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
continue;
}

if (descr.GetOmitIndexes()) {
continue;
}

if (!srcIndexPath.IsTableIndex()) {
continue;
}
Expand All @@ -185,9 +185,11 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
scheme.SetFailOnExist(true);

auto* copySequence = scheme.MutableCopySequence();
copySequence->SetCopyFrom(srcPath.PathString() + "/" + sequenceDescription.GetName());
*scheme.MutableSequence() = std::move(sequenceDescription);

result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
result.push_back(CreateCopySequence(NextPartId(nextId, result), scheme));
}
}

Expand Down
27 changes: 24 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TConfigureParts : public TSubOperationState {
event->Record.SetFrozen(true);

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCoptSequence TConfigureParts ProgressState"
"TCopySequence TConfigureParts ProgressState"
<< " sending TEvCreateSequence to tablet " << tabletId
<< " operationId# " << OperationId
<< " at tablet " << ssId);
Expand Down Expand Up @@ -274,6 +274,18 @@ class TProposedCopySequence : public TSubOperationState {
<< " operationId#" << OperationId;
}

void UpdateSequenceDescription(NKikimrSchemeOp::TSequenceDescription& descr) {
descr.SetStartValue(GetSequenceResult.GetStartValue());
descr.SetMinValue(GetSequenceResult.GetMinValue());
descr.SetMaxValue(GetSequenceResult.GetMaxValue());
descr.SetCache(GetSequenceResult.GetCache());
descr.SetIncrement(GetSequenceResult.GetIncrement());
descr.SetCycle(GetSequenceResult.GetCycle());
auto* setValMsg = descr.MutableSetVal();
setValMsg->SetNextValue(GetSequenceResult.GetNextValue());
setValMsg->SetNextUsed(GetSequenceResult.GetNextUsed());
}

public:
TProposedCopySequence(TOperationId id)
: OperationId(id)
Expand Down Expand Up @@ -333,7 +345,15 @@ class TProposedCopySequence : public TSubOperationState {
return false;
}

TPathId pathId = txState->TargetPathId;

NIceDb::TNiceDb db(context.GetDB());

auto sequenceInfo = context.SS->Sequences.at(pathId);
UpdateSequenceDescription(sequenceInfo->Description);

context.SS->PersistSequence(db, pathId, *sequenceInfo);

context.SS->ChangeTxState(db, OperationId, TTxState::Done);
context.OnComplete.ActivateTx(OperationId);
return true;
Expand Down Expand Up @@ -387,7 +407,7 @@ class TProposedCopySequence : public TSubOperationState {
return false;
}

auto getSequenceResult = ev->Get()->Record;
GetSequenceResult = ev->Get()->Record;

Y_ABORT_UNLESS(txState->Shards.size() == 1);
for (auto shard : txState->Shards) {
Expand All @@ -397,7 +417,8 @@ class TProposedCopySequence : public TSubOperationState {
Y_ABORT_UNLESS(currentTabletId != InvalidTabletId);

auto event = MakeHolder<NSequenceShard::TEvSequenceShard::TEvRestoreSequence>(
txState->TargetPathId, getSequenceResult);
txState->TargetPathId, GetSequenceResult);

event->Record.SetTxId(ui64(OperationId.GetTxId()));
event->Record.SetTxPartId(OperationId.GetSubTxId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ class TConfigureParts : public TSubOperationState {
if (alterData->Description.HasCycle()) {
event->Record.SetCycle(alterData->Description.GetCycle());
}
if (alterData->Description.HasSetVal()) {
event->Record.MutableSetVal()->SetNextValue(alterData->Description.GetSetVal().GetNextValue());
event->Record.MutableSetVal()->SetNextUsed(alterData->Description.GetSetVal().GetNextUsed());
}

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCreateSequence TConfigureParts ProgressState"
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ static NKikimrSchemeOp::TPathDescription GetTableDescription(TSchemeShard* ss, c
opts.SetReturnPartitioningInfo(false);
opts.SetReturnPartitionConfig(true);
opts.SetReturnBoundaries(true);
opts.SetReturnSetVal(true);

auto desc = DescribePath(ss, TlsActivationContext->AsActorContext(), pathId, opts);
auto record = desc->GetRecord();
Expand Down Expand Up @@ -106,8 +107,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
task.SetNeedToBill(!exportInfo->UserSID || !ss->SystemBackupSIDs.contains(*exportInfo->UserSID));

const TPath sourcePath = TPath::Init(exportInfo->Items[itemIdx].SourcePathId, ss);
if (sourcePath.IsResolved()) {
task.MutableTable()->CopyFrom(GetTableDescription(ss, sourcePath.Base()->PathId));
const TPath exportPathItem = exportPath.Child(ToString(itemIdx));
if (sourcePath.IsResolved() && exportPathItem.IsResolved()) {
auto exportDescription = GetTableDescription(ss, exportPathItem.Base()->PathId);
exportDescription.MutableTable()->SetName(sourcePath.LeafName());
task.MutableTable()->CopyFrom(exportDescription);
}

task.SetSnapshotStep(exportInfo->SnapshotStep);
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -992,8 +992,10 @@ class TSchemeShard
void DescribeTableIndex(const TPathId& pathId, const TString& name, TTableIndexInfo::TPtr indexInfo, NKikimrSchemeOp::TIndexDescription& entry);
void DescribeCdcStream(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TCdcStreamDescription& desc);
void DescribeCdcStream(const TPathId& pathId, const TString& name, TCdcStreamInfo::TPtr info, NKikimrSchemeOp::TCdcStreamDescription& desc);
void DescribeSequence(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TSequenceDescription& desc);
void DescribeSequence(const TPathId& pathId, const TString& name, TSequenceInfo::TPtr info, NKikimrSchemeOp::TSequenceDescription& desc);
void DescribeSequence(const TPathId& pathId, const TString& name,
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal = false);
void DescribeSequence(const TPathId& pathId, const TString& name, TSequenceInfo::TPtr info,
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal = false);
void DescribeReplication(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TReplicationDescription& desc);
void DescribeReplication(const TPathId& pathId, const TString& name, TReplicationInfo::TPtr info, NKikimrSchemeOp::TReplicationDescription& desc);
void DescribeBlobDepot(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TBlobDepotDescription& desc);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
if (fromSequence.has_cycle()) {
seqDesc->SetCycle(fromSequence.cycle());
}
if (fromSequence.has_set_val()) {
auto* setVal = seqDesc->MutableSetVal();
setVal->SetNextUsed(fromSequence.set_val().next_used());
setVal->SetNextValue(fromSequence.set_val().next_value());
}

break;
}
Expand Down
13 changes: 9 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa
bool returnBackupInfo = Params.GetBackupInfo();
bool returnBoundaries = false;
bool returnRangeKey = true;
bool returnSetVal = Params.GetOptions().GetReturnSetVal();
if (Params.HasOptions()) {
returnConfig = Params.GetOptions().GetReturnPartitionConfig();
returnPartitioning = Params.GetOptions().GetReturnPartitioningInfo();
Expand Down Expand Up @@ -361,7 +362,7 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa
Self->DescribeCdcStream(childPathId, childName, *entry->AddCdcStreams());
break;
case NKikimrSchemeOp::EPathTypeSequence:
Self->DescribeSequence(childPathId, childName, *entry->AddSequences());
Self->DescribeSequence(childPathId, childName, *entry->AddSequences(), returnSetVal);
break;
default:
Y_FAIL_S("Unexpected table's child"
Expand Down Expand Up @@ -1241,24 +1242,28 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name,
}

void TSchemeShard::DescribeSequence(const TPathId& pathId, const TString& name,
NKikimrSchemeOp::TSequenceDescription& desc)
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal)
{
auto it = Sequences.find(pathId);
Y_VERIFY_S(it != Sequences.end(), "Sequence not found"
<< " pathId# " << pathId
<< " name# " << name);
DescribeSequence(pathId, name, it->second, desc);
DescribeSequence(pathId, name, it->second, desc, fillSetVal);
}

void TSchemeShard::DescribeSequence(const TPathId& pathId, const TString& name, TSequenceInfo::TPtr info,
NKikimrSchemeOp::TSequenceDescription& desc)
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal)
{
Y_VERIFY_S(info, "Empty sequence info"
<< " pathId# " << pathId
<< " name# " << name);

desc = info->Description;

if (!fillSetVal) {
desc.ClearSetVal();
}

desc.SetName(name);
PathIdFromPathId(pathId, desc.MutablePathId());
desc.SetVersion(info->AlterVersion);
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2362,16 +2362,17 @@ namespace NSchemeShardUT_Private {
runtime.Send(new IEventHandle(NSequenceProxy::MakeSequenceProxyServiceID(), sender, request.Release()));
}

i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender) {
i64 WaitNextValResult(
TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus) {
auto ev = runtime.GrabEdgeEventRethrow<NSequenceProxy::TEvSequenceProxy::TEvNextValResult>(sender);
auto* msg = ev->Get();
UNIT_ASSERT_VALUES_EQUAL(msg->Status, Ydb::StatusIds::SUCCESS);
UNIT_ASSERT_VALUES_EQUAL(msg->Status, expectedStatus);
return msg->Value;
}

i64 DoNextVal(TTestActorRuntime& runtime, const TString& path) {
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path, Ydb::StatusIds::StatusCode expectedStatus) {
auto sender = runtime.AllocateEdgeActor(0);
SendNextValRequest(runtime, sender, path);
return WaitNextValResult(runtime, sender);
return WaitNextValResult(runtime, sender, expectedStatus);
}
}
8 changes: 6 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,11 @@ namespace NSchemeShardUT_Private {
void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true);

void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path);
i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender);
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path);
i64 WaitNextValResult(
TTestActorRuntime& runtime, const TActorId& sender,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);
i64 DoNextVal(
TTestActorRuntime& runtime, const TString& path,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);

} //NSchemeShardUT_Private
Loading

0 comments on commit f28e33e

Please sign in to comment.