From 3dbdd9880ce7cf8630351cb86fe04e2a61e39d2e Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 21 May 2024 11:40:11 +0000 Subject: [PATCH] Clean the code --- ...hemeshard__operation_create_cdc_stream.cpp | 70 +++++++++---------- 1 file changed, 32 insertions(+), 38 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 12a0a47f3c48..32172f6dd327 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -837,8 +837,8 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat return nullptr; } -void CalcBoundaries(const TTableInfo::TPtr table, TVector& boundaries) { - const auto& partitions = table->GetPartitions(); +void CalcBoundaries(const TTableInfo& table, TVector& boundaries) { + const auto& partitions = table.GetPartitions(); boundaries.reserve(partitions.size() - 1); for (ui32 i = 0; i < partitions.size(); ++i) { @@ -849,6 +849,24 @@ void CalcBoundaries(const TTableInfo::TPtr table, TVector& boundaries) } } +bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdcStream& op, TVector& boundaries, TString& errStr) { + if (op.HasTopicPartitions()) { + const auto& keyColumns = table.KeyColumnIds; + const auto& columns = table.Columns; + + Y_ABORT_UNLESS(!keyColumns.empty()); + Y_ABORT_UNLESS(columns.contains(keyColumns.at(0))); + const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType; + + if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) { + return false; + } + } else { + CalcBoundaries(table, boundaries); + } + return true; +} + } // anonymous std::variant DoNewStreamPathChecks( @@ -959,27 +977,25 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran } } - std::vector> indexes; + std::vector candidates; if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes && op.GetAllIndexes()) { - indexes.reserve(tablePath->GetChildren().size()); + candidates.reserve(tablePath->GetChildren().size()); for (const auto& child : tablePath->GetChildren()) { - indexes.emplace_back(child); + candidates.emplace_back(child.first); } } else if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kIndexName) { auto it = tablePath->GetChildren().find(op.GetIndexName()); if (it == tablePath->GetChildren().end()) { return {CreateReject(opId, NKikimrScheme::StatusSchemeError, - "requested particular index hasn't been found")}; + "requested particular path hasn't been found")}; } - indexes.emplace_back(*it); + candidates.emplace_back(it->first); } TVector result; - for (const auto& child : indexes) { - const auto& name = child.first; - + for (const auto& name : candidates) { const TPath indexPath = tablePath.Child(name); if (!indexPath.IsTableIndex() || indexPath.IsDeleted()) { continue; @@ -992,7 +1008,7 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran } Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); - auto indexImpltable = context.SS->Tables.at(indexImplPath.Base()->PathId); + auto indexImplTable = context.SS->Tables.at(indexImplPath.Base()->PathId); const TPath indexStreamPath = indexImplPath.Child(streamName); if (auto reject = RejectOnCdcChecks(opId, indexStreamPath, acceptExisted)) { @@ -1004,23 +1020,12 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran } TVector boundaries; - if (op.HasTopicPartitions()) { - const auto& keyColumns = indexImpltable->KeyColumnIds; - const auto& columns = indexImpltable->Columns; - - Y_ABORT_UNLESS(!keyColumns.empty()); - Y_ABORT_UNLESS(columns.contains(keyColumns.at(0))); - const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType; - - if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) { - return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; - } - } else { - CalcBoundaries(indexImpltable, boundaries); + if (!FillBoundaries(*indexImplTable, op, boundaries, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; } DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result); - DoCreatePqPart(opId, indexStreamPath, streamName, indexImpltable, op, boundaries, acceptExisted, result); + DoCreatePqPart(opId, indexStreamPath, streamName, indexImplTable, op, boundaries, acceptExisted, result); } if (initialScan) { @@ -1030,19 +1035,8 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); auto table = context.SS->Tables.at(tablePath.Base()->PathId); TVector boundaries; - if (op.HasTopicPartitions()) { - const auto& keyColumns = table->KeyColumnIds; - const auto& columns = table->Columns; - - Y_ABORT_UNLESS(!keyColumns.empty()); - Y_ABORT_UNLESS(columns.contains(keyColumns.at(0))); - const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType; - - if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) { - return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; - } - } else { - CalcBoundaries(table, boundaries); + if (!FillBoundaries(*table, op, boundaries, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; } DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result);