From 82781e8d0cf8710bb6804c98599f99de12f78a7f Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Fri, 19 Apr 2024 16:35:21 +0000 Subject: [PATCH 1/5] Scheme shard support for cdc stream per index --- .../run/kikimr_services_initializers.cpp | 2 - ydb/core/protos/flat_scheme_op.proto | 5 + ...hemeshard__operation_create_cdc_stream.cpp | 218 ++++++++++++++---- .../schemeshard__operation_create_lock.cpp | 14 +- .../schemeshard/schemeshard_path_element.cpp | 2 +- .../ut_cdc_stream/ut_cdc_stream.cpp | 23 ++ .../ut_cdc_stream_reboots.cpp | 54 ++++- 7 files changed, 254 insertions(+), 64 deletions(-) diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index bf9e9c1d8289..074d8bee4cf4 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -236,8 +236,6 @@ #include -#include - namespace NKikimr { namespace NKikimrServicesInitializers { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index d275c590a612..ae02e266c583 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -827,6 +827,10 @@ message TCreateCdcStream { optional TCdcStreamDescription StreamDescription = 2; optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default optional uint32 TopicPartitions = 4; + oneof IndexMode { + bool AllIndexes = 5; // Create topic per each index + string IndexName = 6; + } } message TAlterCdcStream { @@ -1524,6 +1528,7 @@ message TIndexBuildControl { message TLockConfig { optional string Name = 1; + optional bool AllowIndexImplLock = 2; } message TLockGuard { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 874a2836293e..12a0a47f3c48 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -105,22 +105,38 @@ class TNewCdcStream: public TSubOperation { } } + TString BuildWorkingDir() const { + if (Transaction.GetCreateCdcStream().HasIndexName()) { + return Transaction.GetWorkingDir() + "/" + + Transaction.GetCreateCdcStream().GetIndexName() + "/indexImplTable"; + } else { + return Transaction.GetWorkingDir(); + } + } + public: using TSubOperation::TSubOperation; THolder Propose(const TString& owner, TOperationContext& context) override { - const auto& workingDir = Transaction.GetWorkingDir(); const auto& op = Transaction.GetCreateCdcStream(); const auto& streamDesc = op.GetStreamDescription(); const auto& streamName = streamDesc.GetName(); const auto acceptExisted = !Transaction.GetFailOnExist(); + auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); + + if (op.HasAllIndexes()) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "Illigal part operation with all indexes flag"); + return result; + } + + const auto& workingDir = BuildWorkingDir(); + LOG_N("TNewCdcStream Propose" << ": opId# " << OperationId << ", stream# " << workingDir << "/" << streamName); - auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); - const auto tablePath = TPath::Resolve(workingDir, context.SS); { const auto checks = tablePath.Check(); @@ -130,11 +146,17 @@ class TNewCdcStream: public TSubOperation { .IsAtLocalSchemeShard() .IsResolved() .NotDeleted() - .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting(); + if (op.HasIndexName() && op.GetIndexName()) { + checks.IsInsideTableIndexPath(); + } else { + checks + .IsTable() + .IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -507,17 +529,35 @@ class TNewCdcStreamAtTable: public TSubOperation { } THolder Propose(const TString&, TOperationContext& context) override { - const auto& workingDir = Transaction.GetWorkingDir(); + auto workingDir = Transaction.GetWorkingDir(); const auto& op = Transaction.GetCreateCdcStream(); - const auto& tableName = op.GetTableName(); + auto tableName = op.GetTableName(); const auto& streamName = op.GetStreamDescription().GetName(); + auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); + bool isIndexTable = false; + + if (op.HasAllIndexes()) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "Illigal part operation with all indexes flag"); + return result; + } + + if (op.HasIndexName()) { + if (!op.GetIndexName()) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "Unexpected empty index name"); + return result; + } + isIndexTable = true; + workingDir += ("/" + tableName + "/" + op.GetIndexName()); + tableName = "indexImplTable"; + } + LOG_N("TNewCdcStreamAtTable Propose" << ": opId# " << OperationId << ", stream# " << workingDir << "/" << tableName << "/" << streamName); - auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); - const auto workingDirPath = TPath::Resolve(workingDir, context.SS); { const auto checks = workingDirPath.Check(); @@ -526,10 +566,15 @@ class TNewCdcStreamAtTable: public TSubOperation { .IsAtLocalSchemeShard() .IsResolved() .NotDeleted() - .IsCommonSensePath() .IsLikeDirectory() .NotUnderDeleting(); + if (isIndexTable) { + checks.IsInsideTableIndexPath(); + } else { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -547,10 +592,12 @@ class TNewCdcStreamAtTable: public TSubOperation { .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting(); if (checks) { + if (!isIndexTable) { + checks.IsCommonSensePath(); + } if (InitialScan) { checks.IsUnderTheSameOperation(OperationId.GetTxId()); // lock op } else { @@ -632,17 +679,18 @@ class TNewCdcStreamAtTable: public TSubOperation { private: const bool InitialScan; - }; // TNewCdcStreamAtTable -void DoCreateLock(const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath, +void DoCreateLock(const TOperationId opId, const TPath& workingDirPath, const TPath& tablePath, bool allowIndexImplLock, TVector& result) { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock); outTx.SetFailOnExist(false); outTx.SetInternal(true); - outTx.MutableLockConfig()->SetName(tablePath.LeafName()); + auto cfg = outTx.MutableLockConfig(); + cfg->SetName(tablePath.LeafName()); + cfg->SetAllowIndexImplLock(allowIndexImplLock); result.push_back(CreateLock(NextPartId(opId, result), outTx)); } @@ -704,30 +752,34 @@ void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TSt result.push_back(CreateNewPQ(NextPartId(opId, result), outTx)); } +void FillModifySchemaForCdc(NKikimrSchemeOp::TModifyScheme& outTx, const NKikimrSchemeOp::TCreateCdcStream& op, + const TOperationId& opId, const TString& indexName, bool acceptExisted, bool initialScan) +{ + outTx.SetFailOnExist(!acceptExisted); + outTx.MutableCreateCdcStream()->CopyFrom(op); + if (indexName) { + outTx.MutableCreateCdcStream()->SetIndexName(indexName); + } else { + outTx.MutableCreateCdcStream()->ClearIndexMode(); + } + + if (initialScan) { + outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId())); + } +} + void DoCreateStream(const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath, - const bool acceptExisted, const bool initialScan, TVector& result) + const bool acceptExisted, const bool initialScan, const TString& indexName, TVector& result) { { auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl); - outTx.SetFailOnExist(!acceptExisted); - outTx.MutableCreateCdcStream()->CopyFrom(op); - - if (initialScan) { - outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId())); - } - + FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan); result.push_back(CreateNewCdcStreamImpl(NextPartId(opId, result), outTx)); } { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable); - outTx.SetFailOnExist(!acceptExisted); - outTx.MutableCreateCdcStream()->CopyFrom(op); - - if (initialScan) { - outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId())); - } - + FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan); result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, initialScan)); } } @@ -785,6 +837,18 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat return nullptr; } +void CalcBoundaries(const TTableInfo::TPtr table, TVector& boundaries) { + const auto& partitions = table->GetPartitions(); + boundaries.reserve(partitions.size() - 1); + + for (ui32 i = 0; i < partitions.size(); ++i) { + const auto& partition = partitions.at(i); + if (i != partitions.size() - 1) { + boundaries.push_back(partition.EndOfRange); + } + } +} + } // anonymous std::variant DoNewStreamPathChecks( @@ -889,15 +953,84 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran << "Initial scan is not supported yet")}; } - Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); - auto table = context.SS->Tables.at(tablePath.Base()->PathId); - - TVector boundaries; if (op.HasTopicPartitions()) { if (op.GetTopicPartitions() <= 0) { return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")}; } + } + + std::vector> indexes; + + if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes && op.GetAllIndexes()) { + indexes.reserve(tablePath->GetChildren().size()); + for (const auto& child : tablePath->GetChildren()) { + indexes.emplace_back(child); + } + } else if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kIndexName) { + auto it = tablePath->GetChildren().find(op.GetIndexName()); + if (it == tablePath->GetChildren().end()) { + return {CreateReject(opId, NKikimrScheme::StatusSchemeError, + "requested particular index hasn't been found")}; + } + indexes.emplace_back(*it); + } + + TVector result; + + for (const auto& child : indexes) { + const auto& name = child.first; + const TPath indexPath = tablePath.Child(name); + if (!indexPath.IsTableIndex() || indexPath.IsDeleted()) { + continue; + } + + const TPath indexImplPath = indexPath.Child("indexImplTable"); + if (!indexImplPath) { + return {CreateReject(opId, NKikimrScheme::StatusSchemeError, + "indexImplTable hasn't been found")}; + } + + Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); + auto indexImpltable = context.SS->Tables.at(indexImplPath.Base()->PathId); + + const TPath indexStreamPath = indexImplPath.Child(streamName); + if (auto reject = RejectOnCdcChecks(opId, indexStreamPath, acceptExisted)) { + return {reject}; + } + + if (initialScan) { + DoCreateLock(opId, indexPath, indexImplPath, true, result); + } + + TVector boundaries; + if (op.HasTopicPartitions()) { + const auto& keyColumns = indexImpltable->KeyColumnIds; + const auto& columns = indexImpltable->Columns; + + Y_ABORT_UNLESS(!keyColumns.empty()); + Y_ABORT_UNLESS(columns.contains(keyColumns.at(0))); + const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType; + + if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; + } + } else { + CalcBoundaries(indexImpltable, boundaries); + } + + DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result); + DoCreatePqPart(opId, indexStreamPath, streamName, indexImpltable, op, boundaries, acceptExisted, result); + } + + if (initialScan) { + DoCreateLock(opId, workingDirPath, tablePath, false, result); + } + + Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); + auto table = context.SS->Tables.at(tablePath.Base()->PathId); + TVector boundaries; + if (op.HasTopicPartitions()) { const auto& keyColumns = table->KeyColumnIds; const auto& columns = table->Columns; @@ -909,26 +1042,11 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; } } else { - const auto& partitions = table->GetPartitions(); - boundaries.reserve(partitions.size() - 1); - - for (ui32 i = 0; i < partitions.size(); ++i) { - const auto& partition = partitions.at(i); - if (i != partitions.size() - 1) { - boundaries.push_back(partition.EndOfRange); - } - } + CalcBoundaries(table, boundaries); } - TVector result; - - if (initialScan) { - DoCreateLock(opId, workingDirPath, tablePath, result); - } - - DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, result); + DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result); DoCreatePqPart(opId, streamPath, streamName, table, op, boundaries, acceptExisted, result); - return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp index 1d293cf020f5..8ec987591691 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp @@ -124,9 +124,14 @@ class TCreateLock: public TSubOperation { .IsResolved() .NotDeleted() .NotUnderDeleting() - .IsCommonSensePath() .IsLikeDirectory(); + if (op.GetAllowIndexImplLock()) { + checks.IsInsideTableIndexPath(); + } else { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -143,8 +148,11 @@ class TCreateLock: public TSubOperation { .NotUnderDeleting() .NotUnderOperation() .IsTable() - .NotAsyncReplicaTable() - .IsCommonSensePath(); + .NotAsyncReplicaTable(); + + if (!op.GetAllowIndexImplLock()) { + checks.IsCommonSensePath(); + } if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp index a76144951927..dc11bfecd01e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp @@ -187,7 +187,7 @@ bool TPathElement::IsContainer() const { } bool TPathElement::IsLikeDirectory() const { - return IsDirectory() || IsDomainRoot() || IsOlapStore(); + return IsDirectory() || IsDomainRoot() || IsOlapStore() || IsTableIndex(); } bool TPathElement::HasActiveChanges() const { diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index c07870e51fd4..f2cf5f8d5fb0 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -617,6 +617,29 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "StreamWithIndex" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + IndexName: "NotExistedIndex" + )", {NKikimrScheme::StatusSchemeError}); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "StreamWithIndex" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + IndexName: "Index" + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/StreamWithIndex/streamImpl"), {NLs::PathExist}); + TestDropTable(runtime, ++txId, "/MyRoot", "Table"); env.TestWaitNotification(runtime, txId); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 7310f0baeef1..b65133a291ea 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -6,9 +6,29 @@ using namespace NSchemeShardUT_Private; +static const TString createTableProto = R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] +)"; + +static const TString createTableWithIndexProto = R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "SyncIndex" + KeyColumnNames: ["value"] + } +)"; + Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { template - void CreateStream(const TMaybe& state = Nothing(), bool vt = false) { + void CreateStream(const TMaybe& state = Nothing(), bool vt = false, bool tableWithIndex = false) { T t; t.GetTestEnvOptions().EnableChangefeedInitialScan(true); @@ -16,13 +36,11 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { { TInactiveZone inactive(activeZone); runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true; - - TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( - Name: "Table" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Uint64" } - KeyColumnNames: ["key"] - )"); + if (tableWithIndex) { + TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", createTableWithIndexProto); + } else { + TestCreateTable(runtime, ++t.TxId, "/MyRoot", createTableProto); + } t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -43,6 +61,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", Sprintf(R"( TableName: "Table" StreamDescription { %s } + AllIndexes: true )", strDesc.c_str())); t.TestEnv->TestWaitNotification(runtime, t.TxId); @@ -50,6 +69,13 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { NLs::PathExist, NLs::StreamVirtualTimestamps(vt), }); + + if (tableWithIndex) { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), { + NLs::PathExist, + NLs::StreamVirtualTimestamps(vt), + }); + } }); } @@ -57,14 +83,26 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { CreateStream(); } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamTableWithIndex) { + CreateStream(Nothing(), false, true); + } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamExplicitReady) { CreateStream(NKikimrSchemeOp::ECdcStreamStateReady); } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamExplicitReadyTableWithIndex) { + CreateStream(NKikimrSchemeOp::ECdcStreamStateReady, false, true); + } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithInitialScan) { CreateStream(NKikimrSchemeOp::ECdcStreamStateScan); } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithInitialScanTableWithIndex) { + CreateStream(NKikimrSchemeOp::ECdcStreamStateScan, false, true); + } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithVirtualTimestamps) { CreateStream({}, true); } From 3dbdd9880ce7cf8630351cb86fe04e2a61e39d2e Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 21 May 2024 11:40:11 +0000 Subject: [PATCH 2/5] Clean the code --- ...hemeshard__operation_create_cdc_stream.cpp | 70 +++++++++---------- 1 file changed, 32 insertions(+), 38 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 12a0a47f3c48..32172f6dd327 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -837,8 +837,8 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat return nullptr; } -void CalcBoundaries(const TTableInfo::TPtr table, TVector& boundaries) { - const auto& partitions = table->GetPartitions(); +void CalcBoundaries(const TTableInfo& table, TVector& boundaries) { + const auto& partitions = table.GetPartitions(); boundaries.reserve(partitions.size() - 1); for (ui32 i = 0; i < partitions.size(); ++i) { @@ -849,6 +849,24 @@ void CalcBoundaries(const TTableInfo::TPtr table, TVector& boundaries) } } +bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdcStream& op, TVector& boundaries, TString& errStr) { + if (op.HasTopicPartitions()) { + const auto& keyColumns = table.KeyColumnIds; + const auto& columns = table.Columns; + + Y_ABORT_UNLESS(!keyColumns.empty()); + Y_ABORT_UNLESS(columns.contains(keyColumns.at(0))); + const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType; + + if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) { + return false; + } + } else { + CalcBoundaries(table, boundaries); + } + return true; +} + } // anonymous std::variant DoNewStreamPathChecks( @@ -959,27 +977,25 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran } } - std::vector> indexes; + std::vector candidates; if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes && op.GetAllIndexes()) { - indexes.reserve(tablePath->GetChildren().size()); + candidates.reserve(tablePath->GetChildren().size()); for (const auto& child : tablePath->GetChildren()) { - indexes.emplace_back(child); + candidates.emplace_back(child.first); } } else if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kIndexName) { auto it = tablePath->GetChildren().find(op.GetIndexName()); if (it == tablePath->GetChildren().end()) { return {CreateReject(opId, NKikimrScheme::StatusSchemeError, - "requested particular index hasn't been found")}; + "requested particular path hasn't been found")}; } - indexes.emplace_back(*it); + candidates.emplace_back(it->first); } TVector result; - for (const auto& child : indexes) { - const auto& name = child.first; - + for (const auto& name : candidates) { const TPath indexPath = tablePath.Child(name); if (!indexPath.IsTableIndex() || indexPath.IsDeleted()) { continue; @@ -992,7 +1008,7 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran } Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); - auto indexImpltable = context.SS->Tables.at(indexImplPath.Base()->PathId); + auto indexImplTable = context.SS->Tables.at(indexImplPath.Base()->PathId); const TPath indexStreamPath = indexImplPath.Child(streamName); if (auto reject = RejectOnCdcChecks(opId, indexStreamPath, acceptExisted)) { @@ -1004,23 +1020,12 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran } TVector boundaries; - if (op.HasTopicPartitions()) { - const auto& keyColumns = indexImpltable->KeyColumnIds; - const auto& columns = indexImpltable->Columns; - - Y_ABORT_UNLESS(!keyColumns.empty()); - Y_ABORT_UNLESS(columns.contains(keyColumns.at(0))); - const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType; - - if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) { - return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; - } - } else { - CalcBoundaries(indexImpltable, boundaries); + if (!FillBoundaries(*indexImplTable, op, boundaries, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; } DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result); - DoCreatePqPart(opId, indexStreamPath, streamName, indexImpltable, op, boundaries, acceptExisted, result); + DoCreatePqPart(opId, indexStreamPath, streamName, indexImplTable, op, boundaries, acceptExisted, result); } if (initialScan) { @@ -1030,19 +1035,8 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); auto table = context.SS->Tables.at(tablePath.Base()->PathId); TVector boundaries; - if (op.HasTopicPartitions()) { - const auto& keyColumns = table->KeyColumnIds; - const auto& columns = table->Columns; - - Y_ABORT_UNLESS(!keyColumns.empty()); - Y_ABORT_UNLESS(columns.contains(keyColumns.at(0))); - const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType; - - if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) { - return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; - } - } else { - CalcBoundaries(table, boundaries); + if (!FillBoundaries(*table, op, boundaries, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; } DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result); From 4af112109590097d1226aefde4f95fad178c8c5a Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 21 May 2024 11:45:12 +0000 Subject: [PATCH 3/5] fix --- .../tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 32172f6dd327..cda337d141db 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -988,7 +988,7 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran auto it = tablePath->GetChildren().find(op.GetIndexName()); if (it == tablePath->GetChildren().end()) { return {CreateReject(opId, NKikimrScheme::StatusSchemeError, - "requested particular path hasn't been found")}; + "requested particular path hasn't been found")}; } candidates.emplace_back(it->first); } From 2d63f85d27f8de49b56cb7e7cfb3f37790a0e572 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 21 May 2024 12:53:29 +0000 Subject: [PATCH 4/5] fix --- .../tx/schemeshard/schemeshard__operation_create_cdc_stream.h | 1 + .../schemeshard__operation_create_continuous_backup.cpp | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h index 386bc7817e16..11a921d84168 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h @@ -28,6 +28,7 @@ void DoCreateStream( const TPath& tablePath, const bool acceptExisted, const bool initialScan, + const TString& indexName, TVector& result); void DoCreatePqPart( diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp index ab9f82116217..6bb280316138 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp @@ -68,7 +68,7 @@ TVector CreateNewContinuousBackup(TOperationId opId, const TVector result; - NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false, result); + NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false, {}, result); NCdc::DoCreatePqPart(opId, streamPath, NBackup::CB_CDC_STREAM_NAME, table, createCdcStreamOp, boundaries, acceptExisted, result); return result; From 3b1f52c3dfd1d9dfc3c400d3d1ec80f64b9898e9 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Wed, 22 May 2024 13:46:28 +0000 Subject: [PATCH 5/5] Use empty message instead of bool --- ydb/core/protos/flat_scheme_op.proto | 2 +- .../tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp | 2 +- .../schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index ae02e266c583..e4b73013fbc0 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -828,7 +828,7 @@ message TCreateCdcStream { optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default optional uint32 TopicPartitions = 4; oneof IndexMode { - bool AllIndexes = 5; // Create topic per each index + google.protobuf.Empty AllIndexes = 5; // Create topic per each index string IndexName = 6; } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index cda337d141db..a615d6b68a4a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -979,7 +979,7 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran std::vector candidates; - if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes && op.GetAllIndexes()) { + if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes) { candidates.reserve(tablePath->GetChildren().size()); for (const auto& child : tablePath->GetChildren()) { candidates.emplace_back(child.first); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index b65133a291ea..1224286150a7 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -61,7 +61,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", Sprintf(R"( TableName: "Table" StreamDescription { %s } - AllIndexes: true + AllIndexes {} )", strDesc.c_str())); t.TestEnv->TestWaitNotification(runtime, t.TxId);