Skip to content

Commit

Permalink
Clean the code
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik committed May 21, 2024
1 parent 82781e8 commit 3dbdd98
Showing 1 changed file with 32 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -837,8 +837,8 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat
return nullptr;
}

void CalcBoundaries(const TTableInfo::TPtr table, TVector<TString>& boundaries) {
const auto& partitions = table->GetPartitions();
void CalcBoundaries(const TTableInfo& table, TVector<TString>& boundaries) {
const auto& partitions = table.GetPartitions();
boundaries.reserve(partitions.size() - 1);

for (ui32 i = 0; i < partitions.size(); ++i) {
Expand All @@ -849,6 +849,24 @@ void CalcBoundaries(const TTableInfo::TPtr table, TVector<TString>& boundaries)
}
}

bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdcStream& op, TVector<TString>& boundaries, TString& errStr) {
if (op.HasTopicPartitions()) {
const auto& keyColumns = table.KeyColumnIds;
const auto& columns = table.Columns;

Y_ABORT_UNLESS(!keyColumns.empty());
Y_ABORT_UNLESS(columns.contains(keyColumns.at(0)));
const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType;

if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) {
return false;
}
} else {
CalcBoundaries(table, boundaries);
}
return true;
}

} // anonymous

std::variant<TStreamPaths, ISubOperation::TPtr> DoNewStreamPathChecks(
Expand Down Expand Up @@ -959,27 +977,25 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
}
}

std::vector<std::pair<TString, TPathId>> indexes;
std::vector<TString> candidates;

if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes && op.GetAllIndexes()) {
indexes.reserve(tablePath->GetChildren().size());
candidates.reserve(tablePath->GetChildren().size());
for (const auto& child : tablePath->GetChildren()) {
indexes.emplace_back(child);
candidates.emplace_back(child.first);
}
} else if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kIndexName) {
auto it = tablePath->GetChildren().find(op.GetIndexName());
if (it == tablePath->GetChildren().end()) {
return {CreateReject(opId, NKikimrScheme::StatusSchemeError,
"requested particular index hasn't been found")};
"requested particular path hasn't been found")};
}
indexes.emplace_back(*it);
candidates.emplace_back(it->first);
}

TVector<ISubOperation::TPtr> result;

for (const auto& child : indexes) {
const auto& name = child.first;

for (const auto& name : candidates) {
const TPath indexPath = tablePath.Child(name);
if (!indexPath.IsTableIndex() || indexPath.IsDeleted()) {
continue;
Expand All @@ -992,7 +1008,7 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
}

Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
auto indexImpltable = context.SS->Tables.at(indexImplPath.Base()->PathId);
auto indexImplTable = context.SS->Tables.at(indexImplPath.Base()->PathId);

const TPath indexStreamPath = indexImplPath.Child(streamName);
if (auto reject = RejectOnCdcChecks(opId, indexStreamPath, acceptExisted)) {
Expand All @@ -1004,23 +1020,12 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
}

TVector<TString> boundaries;
if (op.HasTopicPartitions()) {
const auto& keyColumns = indexImpltable->KeyColumnIds;
const auto& columns = indexImpltable->Columns;

Y_ABORT_UNLESS(!keyColumns.empty());
Y_ABORT_UNLESS(columns.contains(keyColumns.at(0)));
const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType;

if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
}
} else {
CalcBoundaries(indexImpltable, boundaries);
if (!FillBoundaries(*indexImplTable, op, boundaries, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
}

DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result);
DoCreatePqPart(opId, indexStreamPath, streamName, indexImpltable, op, boundaries, acceptExisted, result);
DoCreatePqPart(opId, indexStreamPath, streamName, indexImplTable, op, boundaries, acceptExisted, result);
}

if (initialScan) {
Expand All @@ -1030,19 +1035,8 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
auto table = context.SS->Tables.at(tablePath.Base()->PathId);
TVector<TString> boundaries;
if (op.HasTopicPartitions()) {
const auto& keyColumns = table->KeyColumnIds;
const auto& columns = table->Columns;

Y_ABORT_UNLESS(!keyColumns.empty());
Y_ABORT_UNLESS(columns.contains(keyColumns.at(0)));
const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType;

if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
}
} else {
CalcBoundaries(table, boundaries);
if (!FillBoundaries(*table, op, boundaries, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
}

DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result);
Expand Down

0 comments on commit 3dbdd98

Please sign in to comment.