Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add few auto partitioning fields to describe and SDK. Add important consumer to CLI #6118

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,41 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);

bool firstPartitionFound = false;
for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
if (partition.GetPartitionId() == 0) {
firstPartitionFound = true;
UNIT_ASSERT(!partition.GetActive());
UNIT_ASSERT_EQUAL(partition.GetChildPartitionIds().size(), 2);
auto childIds = partition.GetChildPartitionIds();
std::sort(childIds.begin(), childIds.end());
UNIT_ASSERT_EQUAL(childIds[0], 1);
UNIT_ASSERT_EQUAL(childIds[1], 2);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a break here

}
UNIT_ASSERT(firstPartitionFound);

TString secondPartitionTo = "";
TString thirdPartitionFrom = "";
for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
if (partition.GetPartitionId() == 1 || partition.GetPartitionId() == 2) {
UNIT_ASSERT(partition.GetActive());
if (partition.GetPartitionId() == 1) {
UNIT_ASSERT(partition.GetToBound().Defined() && !partition.GetToBound()->Empty());
secondPartitionTo = *partition.GetToBound();
}
if (partition.GetPartitionId() == 2) {
UNIT_ASSERT(partition.GetFromBound().Defined() && !partition.GetFromBound()->Empty());
thirdPartitionFrom = *partition.GetFromBound();
}
UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds().size(), 1);
UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds()[0], 0);
}
}

UNIT_ASSERT(!secondPartitionTo.Empty());
UNIT_ASSERT(!thirdPartitionFrom.Empty());

auto writeSession2 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false);
UNIT_ASSERT(writeSession2->Write(Msg(msg, 3)));
UNIT_ASSERT(writeSession2->Write(Msg(msg, 4)));
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ class TAlterPQ: public TSubOperation {
return nullptr;
}

if (!alterConfig.HasPartitionStrategy() && tabletConfig->HasPartitionStrategy()) {
alterConfig.MutablePartitionStrategy()->CopyFrom(tabletConfig->GetPartitionStrategy());
}

if (alterConfig.GetPartitionConfig().HasLifetimeSeconds()) {
const auto lifetimeSeconds = alterConfig.GetPartitionConfig().GetLifetimeSeconds();
if (lifetimeSeconds <= 0 || (ui32)lifetimeSeconds > TSchemeShard::MaxPQLifetimeSeconds) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/api/protos/ydb_topic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,11 @@ message DescribeTopicResult {

// Partition location, filled only when include_location in request is true.
PartitionLocation partition_location = 6;

// Inclusive left border. Emptiness means -inf.
optional bytes from_bound = 7;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А что там внутри bytes?

Copy link
Collaborator Author

@niksaveliev niksaveliev Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Строка. Но там могут быть нечитаемые символы, поэтому байты. Границы не человекочитаемы

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ну вот непонятно как этим пользоваться тогда?
Какой формат, если это бинарные данные?
Что тут будет SDK/Describe возвращать?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Как пользоваться - лексикографически сравнивать. Можно, например, определить, лежит ли твой ключ в границах партиции просто сравнением

// Exclusive right border. Emptiness means +inf.
optional bytes to_bound = 8;
}

message TopicStats {
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,10 @@ namespace {
config.Opts->AddLongOption("starting-message-timestamp", "Unix timestamp starting from '1970-01-01 00:00:00' from which read is allowed")
.Optional()
.StoreResult(&StartingMessageTimestamp_);
config.Opts->AddLongOption("important", "Is consumer important")
.Optional()
.DefaultValue(false)
.StoreResult(&IsImportant_);
config.Opts->SetFreeArgsNum(1);
SetFreeArgTitle(0, "<topic-path>", "Topic path");
AddAllowedCodecs(config, AllowedCodecs);
Expand Down Expand Up @@ -537,6 +541,7 @@ namespace {
codecs.push_back(NTopic::ECodec::RAW);
}
consumerSettings.SetSupportedCodecs(codecs);
consumerSettings.SetImportant(IsImportant_);

readRuleSettings.AppendAddConsumers(consumerSettings);

Expand Down
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ namespace NYdb::NConsoleClient {

private:
TString ConsumerName_;
bool IsImportant_;
TMaybe<ui64> StartingMessageTimestamp_;
};

Expand Down
25 changes: 25 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,13 +395,22 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI
for (const auto& partId : partitionInfo.parent_partition_ids()) {
ParentPartitionIds_.push_back(partId);
}

if (partitionInfo.has_partition_stats()) {
PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()};
}

if (partitionInfo.has_partition_location()) {
PartitionLocation_ = TPartitionLocation{partitionInfo.partition_location()};
}

if (partitionInfo.has_from_bound()) {
FromBound_ = TString(partitionInfo.from_bound());
}

if (partitionInfo.has_to_bound()) {
ToBound_ = TString(partitionInfo.to_bound());
}
}

TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo)
Expand Down Expand Up @@ -437,6 +446,14 @@ const TMaybe<TPartitionLocation>& TPartitionInfo::GetPartitionLocation() const {
return PartitionLocation_;
}

const TVector<ui64> TPartitionInfo::GetChildPartitionIds() const {
return ChildPartitionIds_;
}

const TVector<ui64> TPartitionInfo::GetParentPartitionIds() const {
return ParentPartitionIds_;
}

bool TPartitionInfo::GetActive() const {
return Active_;
}
Expand All @@ -445,6 +462,14 @@ ui64 TPartitionInfo::GetPartitionId() const {
return PartitionId_;
}

const TMaybe<TString>& TPartitionInfo::GetFromBound() const {
return FromBound_;
}

const TMaybe<TString>& TPartitionInfo::GetToBound() const {
return ToBound_;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TTopicClient

Expand Down
16 changes: 14 additions & 2 deletions ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,21 @@ class TPartitionInfo {
const TMaybe<TPartitionConsumerStats>& GetPartitionConsumerStats() const;
const TMaybe<TPartitionLocation>& GetPartitionLocation() const;

const TMaybe<TString>& GetFromBound() const;
const TMaybe<TString>& GetToBound() const;

private:
ui64 PartitionId_;
bool Active_;
TVector<ui64> ChildPartitionIds_;
TVector<ui64> ParentPartitionIds_;

TMaybe<TPartitionStats> PartitionStats_;
TMaybe<TPartitionConsumerStats> PartitionConsumerStats_;
TMaybe<TPartitionLocation> PartitionLocation_;

TMaybe<TString> FromBound_;
TMaybe<TString> ToBound_;
};

struct TAlterPartitioningSettings;
Expand Down Expand Up @@ -206,11 +213,11 @@ class TPartitioningSettings {
public:
TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), PartitionCountLimit_(0), AutoPartitioningSettings_(){}
TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings);
TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoscalingSettings = {})
TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoPartitioning = {})
: MinActivePartitions_(minActivePartitions)
, MaxActivePartitions_(maxActivePartitions)
, PartitionCountLimit_(0)
, AutoPartitioningSettings_(autoscalingSettings)
, AutoPartitioningSettings_(autoPartitioning)
{
}

Expand Down Expand Up @@ -459,6 +466,11 @@ struct TConsumerSettings {
return *this;
}

TConsumerSettings& SetImportant(bool isImportant) {
Important_ = isImportant;
return *this;
}

TSettings& EndAddConsumer() { return Parent_; };

private:
Expand Down
25 changes: 20 additions & 5 deletions ydb/services/persqueue_v1/actors/schema_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1070,10 +1070,26 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv

if (response.PQGroupInfo) {
const auto& pqDescr = response.PQGroupInfo->Description;
for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) {
auto part = Result.add_partitions();
part->set_partition_id(i);
part->set_active(true);
for (auto& sourcePart: pqDescr.GetPartitions()) {
auto destPart = Result.add_partitions();
destPart->set_partition_id(sourcePart.GetPartitionId());
destPart->set_active(sourcePart.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active);
if (sourcePart.HasKeyRange()) {
if (sourcePart.GetKeyRange().HasFromBound()) {
destPart->set_from_bound(sourcePart.GetKeyRange().GetFromBound());
}
if (sourcePart.GetKeyRange().HasToBound()) {
destPart->set_to_bound(sourcePart.GetKeyRange().GetToBound());
}
}

for (size_t i = 0; i < sourcePart.ChildPartitionIdsSize(); ++i) {
destPart->add_child_partition_ids(static_cast<int64_t>(sourcePart.GetChildPartitionIds(i)));
}

for (size_t i = 0; i < sourcePart.ParentPartitionIdsSize(); ++i) {
destPart->add_parent_partition_ids(static_cast<int64_t>(sourcePart.GetParentPartitionIds(i)));
}
}

const auto &config = pqDescr.GetPQTabletConfig();
Expand Down Expand Up @@ -1401,7 +1417,6 @@ void TDescribePartitionActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TE
for (auto partData : record.GetPartResult()) {
if ((ui32)partData.GetPartition() != Settings.Partitions[0])
continue;

Y_ABORT_UNLESS((ui32)(partData.GetPartition()) == Settings.Partitions[0]);
partResult->set_partition_id(partData.GetPartition());
partResult->set_active(true);
Expand Down
Loading