diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index bf9e9c1d8289..074d8bee4cf4 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -236,8 +236,6 @@ #include -#include - namespace NKikimr { namespace NKikimrServicesInitializers { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index d275c590a612..e4b73013fbc0 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -827,6 +827,10 @@ message TCreateCdcStream { optional TCdcStreamDescription StreamDescription = 2; optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default optional uint32 TopicPartitions = 4; + oneof IndexMode { + google.protobuf.Empty AllIndexes = 5; // Create topic per each index + string IndexName = 6; + } } message TAlterCdcStream { @@ -1524,6 +1528,7 @@ message TIndexBuildControl { message TLockConfig { optional string Name = 1; + optional bool AllowIndexImplLock = 2; } message TLockGuard { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 874a2836293e..a615d6b68a4a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -105,22 +105,38 @@ class TNewCdcStream: public TSubOperation { } } + TString BuildWorkingDir() const { + if (Transaction.GetCreateCdcStream().HasIndexName()) { + return Transaction.GetWorkingDir() + "/" + + Transaction.GetCreateCdcStream().GetIndexName() + "/indexImplTable"; + } else { + return Transaction.GetWorkingDir(); + } + } + public: using TSubOperation::TSubOperation; THolder Propose(const TString& owner, TOperationContext& context) override { - const auto& workingDir = Transaction.GetWorkingDir(); const auto& op = Transaction.GetCreateCdcStream(); const auto& streamDesc = op.GetStreamDescription(); const auto& streamName = streamDesc.GetName(); const auto acceptExisted = !Transaction.GetFailOnExist(); + auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); + + if (op.HasAllIndexes()) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "Illigal part operation with all indexes flag"); + return result; + } + + const auto& workingDir = BuildWorkingDir(); + LOG_N("TNewCdcStream Propose" << ": opId# " << OperationId << ", stream# " << workingDir << "/" << streamName); - auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); - const auto tablePath = TPath::Resolve(workingDir, context.SS); { const auto checks = tablePath.Check(); @@ -130,11 +146,17 @@ class TNewCdcStream: public TSubOperation { .IsAtLocalSchemeShard() .IsResolved() .NotDeleted() - .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting(); + if (op.HasIndexName() && op.GetIndexName()) { + checks.IsInsideTableIndexPath(); + } else { + checks + .IsTable() + .IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -507,17 +529,35 @@ class TNewCdcStreamAtTable: public TSubOperation { } THolder Propose(const TString&, TOperationContext& context) override { - const auto& workingDir = Transaction.GetWorkingDir(); + auto workingDir = Transaction.GetWorkingDir(); const auto& op = Transaction.GetCreateCdcStream(); - const auto& tableName = op.GetTableName(); + auto tableName = op.GetTableName(); const auto& streamName = op.GetStreamDescription().GetName(); + auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); + bool isIndexTable = false; + + if (op.HasAllIndexes()) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "Illigal part operation with all indexes flag"); + return result; + } + + if (op.HasIndexName()) { + if (!op.GetIndexName()) { + result->SetError(NKikimrScheme::StatusInvalidParameter, + "Unexpected empty index name"); + return result; + } + isIndexTable = true; + workingDir += ("/" + tableName + "/" + op.GetIndexName()); + tableName = "indexImplTable"; + } + LOG_N("TNewCdcStreamAtTable Propose" << ": opId# " << OperationId << ", stream# " << workingDir << "/" << tableName << "/" << streamName); - auto result = MakeHolder(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); - const auto workingDirPath = TPath::Resolve(workingDir, context.SS); { const auto checks = workingDirPath.Check(); @@ -526,10 +566,15 @@ class TNewCdcStreamAtTable: public TSubOperation { .IsAtLocalSchemeShard() .IsResolved() .NotDeleted() - .IsCommonSensePath() .IsLikeDirectory() .NotUnderDeleting(); + if (isIndexTable) { + checks.IsInsideTableIndexPath(); + } else { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -547,10 +592,12 @@ class TNewCdcStreamAtTable: public TSubOperation { .NotDeleted() .IsTable() .NotAsyncReplicaTable() - .IsCommonSensePath() .NotUnderDeleting(); if (checks) { + if (!isIndexTable) { + checks.IsCommonSensePath(); + } if (InitialScan) { checks.IsUnderTheSameOperation(OperationId.GetTxId()); // lock op } else { @@ -632,17 +679,18 @@ class TNewCdcStreamAtTable: public TSubOperation { private: const bool InitialScan; - }; // TNewCdcStreamAtTable -void DoCreateLock(const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath, +void DoCreateLock(const TOperationId opId, const TPath& workingDirPath, const TPath& tablePath, bool allowIndexImplLock, TVector& result) { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock); outTx.SetFailOnExist(false); outTx.SetInternal(true); - outTx.MutableLockConfig()->SetName(tablePath.LeafName()); + auto cfg = outTx.MutableLockConfig(); + cfg->SetName(tablePath.LeafName()); + cfg->SetAllowIndexImplLock(allowIndexImplLock); result.push_back(CreateLock(NextPartId(opId, result), outTx)); } @@ -704,30 +752,34 @@ void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TSt result.push_back(CreateNewPQ(NextPartId(opId, result), outTx)); } +void FillModifySchemaForCdc(NKikimrSchemeOp::TModifyScheme& outTx, const NKikimrSchemeOp::TCreateCdcStream& op, + const TOperationId& opId, const TString& indexName, bool acceptExisted, bool initialScan) +{ + outTx.SetFailOnExist(!acceptExisted); + outTx.MutableCreateCdcStream()->CopyFrom(op); + if (indexName) { + outTx.MutableCreateCdcStream()->SetIndexName(indexName); + } else { + outTx.MutableCreateCdcStream()->ClearIndexMode(); + } + + if (initialScan) { + outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId())); + } +} + void DoCreateStream(const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath, - const bool acceptExisted, const bool initialScan, TVector& result) + const bool acceptExisted, const bool initialScan, const TString& indexName, TVector& result) { { auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl); - outTx.SetFailOnExist(!acceptExisted); - outTx.MutableCreateCdcStream()->CopyFrom(op); - - if (initialScan) { - outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId())); - } - + FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan); result.push_back(CreateNewCdcStreamImpl(NextPartId(opId, result), outTx)); } { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable); - outTx.SetFailOnExist(!acceptExisted); - outTx.MutableCreateCdcStream()->CopyFrom(op); - - if (initialScan) { - outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId())); - } - + FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan); result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, initialScan)); } } @@ -785,6 +837,36 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat return nullptr; } +void CalcBoundaries(const TTableInfo& table, TVector& boundaries) { + const auto& partitions = table.GetPartitions(); + boundaries.reserve(partitions.size() - 1); + + for (ui32 i = 0; i < partitions.size(); ++i) { + const auto& partition = partitions.at(i); + if (i != partitions.size() - 1) { + boundaries.push_back(partition.EndOfRange); + } + } +} + +bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdcStream& op, TVector& boundaries, TString& errStr) { + if (op.HasTopicPartitions()) { + const auto& keyColumns = table.KeyColumnIds; + const auto& columns = table.Columns; + + Y_ABORT_UNLESS(!keyColumns.empty()); + Y_ABORT_UNLESS(columns.contains(keyColumns.at(0))); + const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType; + + if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) { + return false; + } + } else { + CalcBoundaries(table, boundaries); + } + return true; +} + } // anonymous std::variant DoNewStreamPathChecks( @@ -889,46 +971,76 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran << "Initial scan is not supported yet")}; } - Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); - auto table = context.SS->Tables.at(tablePath.Base()->PathId); - - TVector boundaries; if (op.HasTopicPartitions()) { if (op.GetTopicPartitions() <= 0) { return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")}; } + } - const auto& keyColumns = table->KeyColumnIds; - const auto& columns = table->Columns; - - Y_ABORT_UNLESS(!keyColumns.empty()); - Y_ABORT_UNLESS(columns.contains(keyColumns.at(0))); - const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType; + std::vector candidates; - if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) { - return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; + if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes) { + candidates.reserve(tablePath->GetChildren().size()); + for (const auto& child : tablePath->GetChildren()) { + candidates.emplace_back(child.first); } - } else { - const auto& partitions = table->GetPartitions(); - boundaries.reserve(partitions.size() - 1); - - for (ui32 i = 0; i < partitions.size(); ++i) { - const auto& partition = partitions.at(i); - if (i != partitions.size() - 1) { - boundaries.push_back(partition.EndOfRange); - } + } else if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kIndexName) { + auto it = tablePath->GetChildren().find(op.GetIndexName()); + if (it == tablePath->GetChildren().end()) { + return {CreateReject(opId, NKikimrScheme::StatusSchemeError, + "requested particular path hasn't been found")}; } + candidates.emplace_back(it->first); } TVector result; + for (const auto& name : candidates) { + const TPath indexPath = tablePath.Child(name); + if (!indexPath.IsTableIndex() || indexPath.IsDeleted()) { + continue; + } + + const TPath indexImplPath = indexPath.Child("indexImplTable"); + if (!indexImplPath) { + return {CreateReject(opId, NKikimrScheme::StatusSchemeError, + "indexImplTable hasn't been found")}; + } + + Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); + auto indexImplTable = context.SS->Tables.at(indexImplPath.Base()->PathId); + + const TPath indexStreamPath = indexImplPath.Child(streamName); + if (auto reject = RejectOnCdcChecks(opId, indexStreamPath, acceptExisted)) { + return {reject}; + } + + if (initialScan) { + DoCreateLock(opId, indexPath, indexImplPath, true, result); + } + + TVector boundaries; + if (!FillBoundaries(*indexImplTable, op, boundaries, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; + } + + DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result); + DoCreatePqPart(opId, indexStreamPath, streamName, indexImplTable, op, boundaries, acceptExisted, result); + } + if (initialScan) { - DoCreateLock(opId, workingDirPath, tablePath, result); + DoCreateLock(opId, workingDirPath, tablePath, false, result); } - DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, result); - DoCreatePqPart(opId, streamPath, streamName, table, op, boundaries, acceptExisted, result); + Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); + auto table = context.SS->Tables.at(tablePath.Base()->PathId); + TVector boundaries; + if (!FillBoundaries(*table, op, boundaries, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; + } + DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result); + DoCreatePqPart(opId, streamPath, streamName, table, op, boundaries, acceptExisted, result); return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h index 386bc7817e16..11a921d84168 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h @@ -28,6 +28,7 @@ void DoCreateStream( const TPath& tablePath, const bool acceptExisted, const bool initialScan, + const TString& indexName, TVector& result); void DoCreatePqPart( diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp index ab9f82116217..6bb280316138 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_continuous_backup.cpp @@ -68,7 +68,7 @@ TVector CreateNewContinuousBackup(TOperationId opId, const TVector result; - NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false, result); + NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, tablePath, acceptExisted, false, {}, result); NCdc::DoCreatePqPart(opId, streamPath, NBackup::CB_CDC_STREAM_NAME, table, createCdcStreamOp, boundaries, acceptExisted, result); return result; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp index 1d293cf020f5..8ec987591691 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_lock.cpp @@ -124,9 +124,14 @@ class TCreateLock: public TSubOperation { .IsResolved() .NotDeleted() .NotUnderDeleting() - .IsCommonSensePath() .IsLikeDirectory(); + if (op.GetAllowIndexImplLock()) { + checks.IsInsideTableIndexPath(); + } else { + checks.IsCommonSensePath(); + } + if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); return result; @@ -143,8 +148,11 @@ class TCreateLock: public TSubOperation { .NotUnderDeleting() .NotUnderOperation() .IsTable() - .NotAsyncReplicaTable() - .IsCommonSensePath(); + .NotAsyncReplicaTable(); + + if (!op.GetAllowIndexImplLock()) { + checks.IsCommonSensePath(); + } if (!checks) { result->SetError(checks.GetStatus(), checks.GetError()); diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp index a76144951927..dc11bfecd01e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp @@ -187,7 +187,7 @@ bool TPathElement::IsContainer() const { } bool TPathElement::IsLikeDirectory() const { - return IsDirectory() || IsDomainRoot() || IsOlapStore(); + return IsDirectory() || IsDomainRoot() || IsOlapStore() || IsTableIndex(); } bool TPathElement::HasActiveChanges() const { diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index c07870e51fd4..f2cf5f8d5fb0 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -617,6 +617,29 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "StreamWithIndex" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + IndexName: "NotExistedIndex" + )", {NKikimrScheme::StatusSchemeError}); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "StreamWithIndex" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + IndexName: "Index" + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Index/indexImplTable/StreamWithIndex/streamImpl"), {NLs::PathExist}); + TestDropTable(runtime, ++txId, "/MyRoot", "Table"); env.TestWaitNotification(runtime, txId); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 7310f0baeef1..1224286150a7 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -6,9 +6,29 @@ using namespace NSchemeShardUT_Private; +static const TString createTableProto = R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] +)"; + +static const TString createTableWithIndexProto = R"( + TableDescription { + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "SyncIndex" + KeyColumnNames: ["value"] + } +)"; + Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { template - void CreateStream(const TMaybe& state = Nothing(), bool vt = false) { + void CreateStream(const TMaybe& state = Nothing(), bool vt = false, bool tableWithIndex = false) { T t; t.GetTestEnvOptions().EnableChangefeedInitialScan(true); @@ -16,13 +36,11 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { { TInactiveZone inactive(activeZone); runtime.GetAppData().DisableCdcAutoSwitchingToReadyStateForTests = true; - - TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( - Name: "Table" - Columns { Name: "key" Type: "Uint64" } - Columns { Name: "value" Type: "Uint64" } - KeyColumnNames: ["key"] - )"); + if (tableWithIndex) { + TestCreateIndexedTable(runtime, ++t.TxId, "/MyRoot", createTableWithIndexProto); + } else { + TestCreateTable(runtime, ++t.TxId, "/MyRoot", createTableProto); + } t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -43,6 +61,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", Sprintf(R"( TableName: "Table" StreamDescription { %s } + AllIndexes {} )", strDesc.c_str())); t.TestEnv->TestWaitNotification(runtime, t.TxId); @@ -50,6 +69,13 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { NLs::PathExist, NLs::StreamVirtualTimestamps(vt), }); + + if (tableWithIndex) { + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/SyncIndex/indexImplTable/Stream"), { + NLs::PathExist, + NLs::StreamVirtualTimestamps(vt), + }); + } }); } @@ -57,14 +83,26 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { CreateStream(); } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamTableWithIndex) { + CreateStream(Nothing(), false, true); + } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamExplicitReady) { CreateStream(NKikimrSchemeOp::ECdcStreamStateReady); } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamExplicitReadyTableWithIndex) { + CreateStream(NKikimrSchemeOp::ECdcStreamStateReady, false, true); + } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithInitialScan) { CreateStream(NKikimrSchemeOp::ECdcStreamStateScan); } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithInitialScanTableWithIndex) { + CreateStream(NKikimrSchemeOp::ECdcStreamStateScan, false, true); + } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithVirtualTimestamps) { CreateStream({}, true); }