Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheme shard support for cdc stream per index #4650

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@

#include <util/system/hostname.h>

#include <thread>

namespace NKikimr {

namespace NKikimrServicesInitializers {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,10 @@ message TCreateCdcStream {
optional TCdcStreamDescription StreamDescription = 2;
optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default
optional uint32 TopicPartitions = 4;
oneof IndexMode {
bool AllIndexes = 5; // Create topic per each index
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Есть смысл передавать AllIndexes = false? Чем это отличается от IndexMode == NOT_SET?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ничем, но просто поле enum нельзя положить в oneof, или как ты предлагаешь?

Copy link
Member

@CyberROFL CyberROFL May 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Предлагаю google.protobuf.Empty AllIndexes, если достаточно (и всегда будет достаточно) самого факта. Или хотя бы TAllIndexes AllIndexes, где TAllIndexes — это пустое сообщение (если его предполагается когда-нибудь расширять).

string IndexName = 6;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не очень понял, для чего нужен этот режим?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ну, мы можем создать стримы или для всех индексов, или только для определенных. Кроме того, нам надо передать TCreateCdcStream в сабоперацию, и удобно иметь тут же информацию об индексе, для которого создана сабоперация. Есть идеи, как это выразить лучше?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Какой сценарий создания стрима только для определенного (даже не списка) индекса?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ну, к примеру, у тебя часть индексов используется для местной аналитики и не нужна на реплике, а часть — для OLTP-запросов. Или же наоборот. Список, наверное, тут лучше.

}
}

message TAlterCdcStream {
Expand Down Expand Up @@ -1524,6 +1528,7 @@ message TIndexBuildControl {

// Configuration carried by an ESchemeOpCreateLock transaction.
message TLockConfig {
// Name of the path to lock; callers fill it with the leaf name of the
// target table path, relative to the transaction's working dir.
optional string Name = 1;
// Set to true by the CDC stream code when the lock targets an index
// implementation table and to false for a regular table.
// NOTE(review): presumably lets the lock operation accept an
// indexImplTable path; the consumer is not visible here - confirm.
optional bool AllowIndexImplLock = 2;
}

message TLockGuard {
Expand Down
218 changes: 165 additions & 53 deletions ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,22 +105,38 @@ class TNewCdcStream: public TSubOperation {
}
}

// Returns the path this sub-operation works on.
// Without an index name in the CreateCdcStream request this is the
// transaction's working dir as is; with one it is
// "<working dir>/<index name>/indexImplTable".
TString BuildWorkingDir() const {
    const auto& cdcOp = Transaction.GetCreateCdcStream();

    if (!cdcOp.HasIndexName()) {
        return Transaction.GetWorkingDir();
    }

    return Transaction.GetWorkingDir() + "/"
        + cdcOp.GetIndexName() + "/indexImplTable";
}

public:
using TSubOperation::TSubOperation;

THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override {
const auto& workingDir = Transaction.GetWorkingDir();
const auto& op = Transaction.GetCreateCdcStream();
const auto& streamDesc = op.GetStreamDescription();
const auto& streamName = streamDesc.GetName();
const auto acceptExisted = !Transaction.GetFailOnExist();

auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());

if (op.HasAllIndexes()) {
result->SetError(NKikimrScheme::StatusInvalidParameter,
"Illigal part operation with all indexes flag");
return result;
}

const auto& workingDir = BuildWorkingDir();

LOG_N("TNewCdcStream Propose"
<< ": opId# " << OperationId
<< ", stream# " << workingDir << "/" << streamName);

auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());

const auto tablePath = TPath::Resolve(workingDir, context.SS);
{
const auto checks = tablePath.Check();
Expand All @@ -130,11 +146,17 @@ class TNewCdcStream: public TSubOperation {
.IsAtLocalSchemeShard()
.IsResolved()
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
.IsCommonSensePath()
.NotUnderDeleting();

if (op.HasIndexName() && op.GetIndexName()) {
checks.IsInsideTableIndexPath();
} else {
checks
.IsTable()
.IsCommonSensePath();
}

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
Expand Down Expand Up @@ -507,17 +529,35 @@ class TNewCdcStreamAtTable: public TSubOperation {
}

THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
const auto& workingDir = Transaction.GetWorkingDir();
auto workingDir = Transaction.GetWorkingDir();
const auto& op = Transaction.GetCreateCdcStream();
const auto& tableName = op.GetTableName();
auto tableName = op.GetTableName();
const auto& streamName = op.GetStreamDescription().GetName();

auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());
bool isIndexTable = false;

if (op.HasAllIndexes()) {
result->SetError(NKikimrScheme::StatusInvalidParameter,
"Illigal part operation with all indexes flag");
return result;
}

if (op.HasIndexName()) {
if (!op.GetIndexName()) {
result->SetError(NKikimrScheme::StatusInvalidParameter,
"Unexpected empty index name");
return result;
}
isIndexTable = true;
workingDir += ("/" + tableName + "/" + op.GetIndexName());
tableName = "indexImplTable";
}

LOG_N("TNewCdcStreamAtTable Propose"
<< ": opId# " << OperationId
<< ", stream# " << workingDir << "/" << tableName << "/" << streamName);

auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID());

const auto workingDirPath = TPath::Resolve(workingDir, context.SS);
{
const auto checks = workingDirPath.Check();
Expand All @@ -526,10 +566,15 @@ class TNewCdcStreamAtTable: public TSubOperation {
.IsAtLocalSchemeShard()
.IsResolved()
.NotDeleted()
.IsCommonSensePath()
.IsLikeDirectory()
.NotUnderDeleting();

if (isIndexTable) {
checks.IsInsideTableIndexPath();
} else {
checks.IsCommonSensePath();
}

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
Expand All @@ -547,10 +592,12 @@ class TNewCdcStreamAtTable: public TSubOperation {
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
.IsCommonSensePath()
.NotUnderDeleting();

if (checks) {
if (!isIndexTable) {
checks.IsCommonSensePath();
}
if (InitialScan) {
checks.IsUnderTheSameOperation(OperationId.GetTxId()); // lock op
} else {
Expand Down Expand Up @@ -632,17 +679,18 @@ class TNewCdcStreamAtTable: public TSubOperation {

private:
const bool InitialScan;

}; // TNewCdcStreamAtTable

void DoCreateLock(const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath,
void DoCreateLock(const TOperationId opId, const TPath& workingDirPath, const TPath& tablePath, bool allowIndexImplLock,
TVector<ISubOperation::TPtr>& result)
{
auto outTx = TransactionTemplate(workingDirPath.PathString(),
NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock);
outTx.SetFailOnExist(false);
outTx.SetInternal(true);
outTx.MutableLockConfig()->SetName(tablePath.LeafName());
auto cfg = outTx.MutableLockConfig();
cfg->SetName(tablePath.LeafName());
cfg->SetAllowIndexImplLock(allowIndexImplLock);

result.push_back(CreateLock(NextPartId(opId, result), outTx));
}
Expand Down Expand Up @@ -704,30 +752,34 @@ void DoCreatePqPart(const TOperationId& opId, const TPath& streamPath, const TSt
result.push_back(CreateNewPQ(NextPartId(opId, result), outTx));
}

// Populates the parts of a ModifyScheme transaction that are shared by the
// CDC stream sub-operations.
//   outTx         - transaction to fill
//   op            - original CreateCdcStream request, copied into outTx
//   opId          - operation id; its TxId becomes the lock guard owner
//   indexName     - non-empty: stored as IndexName in the copied request;
//                   empty: the IndexMode oneof of the copy is cleared
//   acceptExisted - inverted into FailOnExist
//   initialScan   - when true, a lock guard owned by opId's TxId is attached
void FillModifySchemaForCdc(NKikimrSchemeOp::TModifyScheme& outTx, const NKikimrSchemeOp::TCreateCdcStream& op,
    const TOperationId& opId, const TString& indexName, bool acceptExisted, bool initialScan)
{
    outTx.SetFailOnExist(!acceptExisted);

    auto* cdcStream = outTx.MutableCreateCdcStream();
    cdcStream->CopyFrom(op);

    // The copy inherits whatever IndexMode the request had; normalize it so
    // the sub-operation sees either a concrete index name or nothing.
    if (indexName.empty()) {
        cdcStream->ClearIndexMode();
    } else {
        cdcStream->SetIndexName(indexName);
    }

    if (!initialScan) {
        return;
    }

    outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId()));
}

void DoCreateStream(const NKikimrSchemeOp::TCreateCdcStream& op, const TOperationId& opId, const TPath& workingDirPath, const TPath& tablePath,
const bool acceptExisted, const bool initialScan, TVector<ISubOperation::TPtr>& result)
const bool acceptExisted, const bool initialScan, const TString& indexName, TVector<ISubOperation::TPtr>& result)
{
{
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl);
outTx.SetFailOnExist(!acceptExisted);
outTx.MutableCreateCdcStream()->CopyFrom(op);

if (initialScan) {
outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId()));
}

FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan);
result.push_back(CreateNewCdcStreamImpl(NextPartId(opId, result), outTx));
}

{
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable);
outTx.SetFailOnExist(!acceptExisted);
outTx.MutableCreateCdcStream()->CopyFrom(op);

if (initialScan) {
outTx.MutableLockGuard()->SetOwnerTxId(ui64(opId.GetTxId()));
}

FillModifySchemaForCdc(outTx, op, opId, indexName, acceptExisted, initialScan);
result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, initialScan));
}
}
Expand Down Expand Up @@ -785,6 +837,36 @@ ISubOperation::TPtr RejectOnTablePathChecks(const TOperationId& opId, const TPat
return nullptr;
}

void CalcBoundaries(const TTableInfo& table, TVector<TString>& boundaries) {
const auto& partitions = table.GetPartitions();
boundaries.reserve(partitions.size() - 1);

for (ui32 i = 0; i < partitions.size(); ++i) {
const auto& partition = partitions.at(i);
if (i != partitions.size() - 1) {
boundaries.push_back(partition.EndOfRange);
}
}
}

// Computes topic partition boundaries for a CDC stream over `table`.
// If the request specifies TopicPartitions, boundaries are produced by
// TSchemeShard::FillUniformPartitioning from the type of the first key
// column; otherwise they are taken from the table's own partitions via
// CalcBoundaries.
// Returns false (with `errStr` filled by FillUniformPartitioning) only when
// uniform partitioning fails; true otherwise.
bool FillBoundaries(const TTableInfo& table, const ::NKikimrSchemeOp::TCreateCdcStream& op, TVector<TString>& boundaries, TString& errStr) {
    if (!op.HasTopicPartitions()) {
        CalcBoundaries(table, boundaries);
        return true;
    }

    const auto& keyColumnIds = table.KeyColumnIds;
    Y_ABORT_UNLESS(!keyColumnIds.empty());

    const auto& tableColumns = table.Columns;
    Y_ABORT_UNLESS(tableColumns.contains(keyColumnIds.at(0)));

    const auto leadingKeyType = tableColumns.at(keyColumnIds.at(0)).PType;

    return TSchemeShard::FillUniformPartitioning(
        boundaries, keyColumnIds.size(), leadingKeyType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr);
}

} // anonymous

std::variant<TStreamPaths, ISubOperation::TPtr> DoNewStreamPathChecks(
Expand Down Expand Up @@ -889,46 +971,76 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
<< "Initial scan is not supported yet")};
}

Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
auto table = context.SS->Tables.at(tablePath.Base()->PathId);

TVector<TString> boundaries;
if (op.HasTopicPartitions()) {
if (op.GetTopicPartitions() <= 0) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")};
}
}

const auto& keyColumns = table->KeyColumnIds;
const auto& columns = table->Columns;

Y_ABORT_UNLESS(!keyColumns.empty());
Y_ABORT_UNLESS(columns.contains(keyColumns.at(0)));
const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType;
std::vector<TString> candidates;

if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kAllIndexes && op.GetAllIndexes()) {
candidates.reserve(tablePath->GetChildren().size());
for (const auto& child : tablePath->GetChildren()) {
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
candidates.emplace_back(child.first);
}
} else {
const auto& partitions = table->GetPartitions();
boundaries.reserve(partitions.size() - 1);

for (ui32 i = 0; i < partitions.size(); ++i) {
const auto& partition = partitions.at(i);
if (i != partitions.size() - 1) {
boundaries.push_back(partition.EndOfRange);
}
} else if (op.GetIndexModeCase() == NKikimrSchemeOp::TCreateCdcStream::kIndexName) {
auto it = tablePath->GetChildren().find(op.GetIndexName());
if (it == tablePath->GetChildren().end()) {
return {CreateReject(opId, NKikimrScheme::StatusSchemeError,
"requested particular path hasn't been found")};
}
candidates.emplace_back(it->first);
}

TVector<ISubOperation::TPtr> result;

for (const auto& name : candidates) {
const TPath indexPath = tablePath.Child(name);
if (!indexPath.IsTableIndex() || indexPath.IsDeleted()) {
continue;
}

const TPath indexImplPath = indexPath.Child("indexImplTable");
if (!indexImplPath) {
return {CreateReject(opId, NKikimrScheme::StatusSchemeError,
"indexImplTable hasn't been found")};
}

Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
auto indexImplTable = context.SS->Tables.at(indexImplPath.Base()->PathId);

const TPath indexStreamPath = indexImplPath.Child(streamName);
if (auto reject = RejectOnCdcChecks(opId, indexStreamPath, acceptExisted)) {
return {reject};
}

if (initialScan) {
DoCreateLock(opId, indexPath, indexImplPath, true, result);
}

TVector<TString> boundaries;
if (!FillBoundaries(*indexImplTable, op, boundaries, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
}

DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, name, result);
DoCreatePqPart(opId, indexStreamPath, streamName, indexImplTable, op, boundaries, acceptExisted, result);
}

if (initialScan) {
DoCreateLock(opId, workingDirPath, tablePath, result);
DoCreateLock(opId, workingDirPath, tablePath, false, result);
}

DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, result);
DoCreatePqPart(opId, streamPath, streamName, table, op, boundaries, acceptExisted, result);
Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
auto table = context.SS->Tables.at(tablePath.Base()->PathId);
TVector<TString> boundaries;
if (!FillBoundaries(*table, op, boundaries, errStr)) {
return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
}

DoCreateStream(op, opId, workingDirPath, tablePath, acceptExisted, initialScan, {}, result);
DoCreatePqPart(opId, streamPath, streamName, table, op, boundaries, acceptExisted, result);
return result;
}

Expand Down
Loading
Loading