Skip to content

Commit

Permalink
Merge c84607a into 0ca1ac0
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev committed May 28, 2024
2 parents 0ca1ac0 + c84607a commit 7df7f9a
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 11 deletions.
141 changes: 134 additions & 7 deletions ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ namespace NYdb::NConsoleClient {
std::pair<NTopic::EMeteringMode, TString>(NTopic::EMeteringMode::RequestUnits, "Read/write operations valued in request units, storage usage on hourly basis."),
};

THashMap<TString, NTopic::EAutoscalingStrategy> AutoscaleStrategies = {
std::pair<TString, NTopic::EAutoscalingStrategy>("disabled", NTopic::EAutoscalingStrategy::Disabled),
std::pair<TString, NTopic::EAutoscalingStrategy>("up", NTopic::EAutoscalingStrategy::ScaleUp),
std::pair<TString, NTopic::EAutoscalingStrategy>("up-and-down", NTopic::EAutoscalingStrategy::ScaleUpAndDown),
};

THashMap<NTopic::EAutoscalingStrategy, TString> AutoscaleStrategiesDescriptions = {
std::pair<NTopic::EAutoscalingStrategy, TString>(NTopic::EAutoscalingStrategy::Disabled, "Automatic scaling of the number of partitions is disabled"),
std::pair<NTopic::EAutoscalingStrategy, TString>(NTopic::EAutoscalingStrategy::ScaleUp, "The number of partitions can increase under high load, but cannot decrease"),
std::pair<NTopic::EAutoscalingStrategy, TString>(NTopic::EAutoscalingStrategy::ScaleUpAndDown, "The number of partitions can increase under high load and decrease under low load"),
};

THashMap<ETopicMetadataField, TString> TopicMetadataFieldsDescriptions = {
{ETopicMetadataField::Body, "Message data"},
{ETopicMetadataField::WriteTime, "Message write time, a UNIX timestamp the message was written to server."},
Expand Down Expand Up @@ -172,6 +184,81 @@ namespace {
return MeteringMode_;
}

void TCommandWithAutoscaling::AddAutoscaling(TClientCommand::TConfig& config, bool isAlter) {
TStringStream description;
description << "A strategy to automatically change the number of partitions depending on the load. Available strategies: ";
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
for (const auto& strategy: AutoscaleStrategies) {
auto findResult = AutoscaleStrategiesDescriptions.find(strategy.second);
Y_ABORT_UNLESS(findResult != AutoscaleStrategiesDescriptions.end(),
"Couldn't find description for %s autoscale strategy", (TStringBuilder() << strategy.second).c_str());
description << "\n " << colors.BoldColor() << strategy.first << colors.OldColor()
<< "\n " << findResult->second;
}

if (isAlter) {
config.Opts->AddLongOption("autoscale-strategy", description.Str())
.Optional()
.StoreResult(&AutoscaleStrategy_);
config.Opts->AddLongOption("autoscale-threshold-time", "Duration in seconds of high or low load before automatically scale the number of partitions")
.Optional()
.StoreResult(&ScaleThresholdTime_);
config.Opts->AddLongOption("autoscale-scale-up-threshold-percent", "The load percentage at which the number of partitions will increase")
.Optional()
.StoreResult(&ScaleUpThresholdPercent_);
config.Opts->AddLongOption("autoscale-scale-down-threshold-percent", "The load percentage at which the number of partitions will decrease")
.Optional()
.StoreResult(&ScaleDownThresholdPercent_);
} else {
config.Opts->AddLongOption("autoscale-strategy", description.Str())
.Optional()
.DefaultValue("disabled")
.StoreResult(&AutoscaleStrategy_);
config.Opts->AddLongOption("autoscale-threshold-time", "Duration in seconds of high or low load before automatically scale the number of partitions")
.Optional()
.DefaultValue(300)
.StoreResult(&ScaleThresholdTime_);
config.Opts->AddLongOption("autoscale-scale-up-threshold-percent", "The load percentage at which the number of partitions will increase")
.Optional()
.DefaultValue(90)
.StoreResult(&ScaleUpThresholdPercent_);
config.Opts->AddLongOption("autoscale-scale-down-threshold-percent", "The load percentage at which the number of partitions will decrease")
.Optional()
.DefaultValue(30)
.StoreResult(&ScaleDownThresholdPercent_);
}
}

void TCommandWithAutoscaling::ParseAutoscalingStrategy() {
if (AutoscalingStrategyStr_.empty()) {
return;
}

TString toLowerStrategy = to_lower(AutoscalingStrategyStr_);
auto strategyIt = AutoscaleStrategies.find(toLowerStrategy);
if (strategyIt.IsEnd()) {
throw TMisuseException() << "Autoscaling strategy " << AutoscalingStrategyStr_ << " is not available for this command";
} else {
AutoscaleStrategy_ = strategyIt->second;
}
}

TMaybe<NTopic::EAutoscalingStrategy> TCommandWithAutoscaling::GetAutoscalingStrategy() const {
return AutoscaleStrategy_;
}

TMaybe<ui32> TCommandWithAutoscaling::GetScaleThresholdTime() const {
return ScaleThresholdTime_;
}

TMaybe<ui32> TCommandWithAutoscaling::GetScaleUpThresholdPercent() const {
return ScaleUpThresholdPercent_;
}

TMaybe<ui32> TCommandWithAutoscaling::GetScaleDownThresholdPercent() const {
return ScaleDownThresholdPercent_;
}

TCommandTopic::TCommandTopic()
: TClientCommandTree("topic", {}, "TopicService operations") {
AddCommand(std::make_unique<TCommandTopicCreate>());
Expand All @@ -188,9 +275,13 @@ namespace {

void TCommandTopicCreate::Config(TConfig& config) {
TYdbCommand::Config(config);
config.Opts->AddLongOption("partitions-count", "Total partitions count for topic")
config.Opts->AddLongOption("partitions-count", "Initial number of partitions for topic")
.DefaultValue(1)
.StoreResult(&PartitionsCount_);
.StoreResult(&MinActivePartitions_);
config.Opts->AddLongOption("max-partitions-count", "Maximum number of partitions for topic")
.DefaultValue(1)
.Optional()
.StoreResult(&MaxActivePartitions_);
config.Opts->AddLongOption("retention-period-hours", "Duration in hours for which data in topic is stored")
.DefaultValue(24)
.Optional()
Expand All @@ -207,21 +298,30 @@ namespace {
SetFreeArgTitle(0, "<topic-path>", "Topic path");
AddAllowedCodecs(config, AllowedCodecs);
AddAllowedMeteringModes(config);
AddAutoscaling(config, false);
}

void TCommandTopicCreate::Parse(TConfig& config) {
TYdbCommand::Parse(config);
ParseTopicName(config, 0);
ParseCodecs();
ParseMeteringMode();
ParseAutoscalingStrategy();
}

int TCommandTopicCreate::Run(TConfig& config) {
TDriver driver = CreateDriver(config);
NYdb::NTopic::TTopicClient topicClient(driver);

auto settings = NYdb::NTopic::TCreateTopicSettings();
settings.PartitioningSettings(PartitionsCount_, PartitionsCount_);

auto autoscaleSettings = NTopic::TAutoscalingSettings(
GetAutoscalingStrategy() ? *GetAutoscalingStrategy() : NTopic::EAutoscalingStrategy::Disabled,
GetScaleThresholdTime() ? TDuration::Seconds(*GetScaleThresholdTime()) : TDuration::Seconds(0),
GetScaleUpThresholdPercent() ? *GetScaleUpThresholdPercent() : 0,
GetScaleDownThresholdPercent() ? *GetScaleDownThresholdPercent() : 0);

settings.PartitioningSettings(MinActivePartitions_, MaxActivePartitions_, autoscaleSettings);
settings.PartitionWriteBurstBytes(PartitionWriteSpeedKbps_ * 1_KB);
settings.PartitionWriteSpeedBytesPerSecond(PartitionWriteSpeedKbps_ * 1_KB);

Expand Down Expand Up @@ -249,8 +349,11 @@ namespace {

void TCommandTopicAlter::Config(TConfig& config) {
TYdbCommand::Config(config);
config.Opts->AddLongOption("partitions-count", "Total partitions count for topic")
.StoreResult(&PartitionsCount_);
config.Opts->AddLongOption("partitions-count", "Initial number of partitions for topic")
.StoreResult(&MinActivePartitions_);
config.Opts->AddLongOption("max-partitions-count", "Maximum number of partitions for topic")
.Optional()
.StoreResult(&MaxActivePartitions_);
config.Opts->AddLongOption("retention-period-hours", "Duration for which data in topic is stored")
.Optional()
.StoreResult(&RetentionPeriodHours_);
Expand All @@ -264,6 +367,7 @@ namespace {
SetFreeArgTitle(0, "<topic-path>", "Topic path");
AddAllowedCodecs(config, AllowedCodecs);
AddAllowedMeteringModes(config);
AddAutoscaling(config, true);
}

void TCommandTopicAlter::Parse(TConfig& config) {
Expand All @@ -276,9 +380,32 @@ namespace {
NYdb::NTopic::TAlterTopicSettings TCommandTopicAlter::PrepareAlterSettings(
NYdb::NTopic::TDescribeTopicResult& describeResult) {
auto settings = NYdb::NTopic::TAlterTopicSettings();
auto partitioningSettings = settings.BeginAlterPartitioningSettings();

if (MinActivePartitions_.Defined() && (*MinActivePartitions_ != describeResult.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions())) {
partitioningSettings.MinActivePartitions(*MinActivePartitions_);
}

if (MaxActivePartitions_.Defined() && (*MaxActivePartitions_ != describeResult.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions())) {
partitioningSettings.MaxActivePartitions(*MaxActivePartitions_);
}

auto autoscalingSettings = partitioningSettings.BeginAlterAutoscalingSettings();

if (GetScaleThresholdTime().Defined() && *GetScaleThresholdTime() != describeResult.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds()) {
autoscalingSettings.ThresholdTime(TDuration::Seconds(*GetScaleThresholdTime()));
}

if (GetAutoscalingStrategy().Defined() && *GetAutoscalingStrategy() != describeResult.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy()) {
autoscalingSettings.Strategy(*GetAutoscalingStrategy());
}

if (GetScaleDownThresholdPercent().Defined() && *GetScaleDownThresholdPercent() != describeResult.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent()) {
autoscalingSettings.ScaleDownThresholdPercent(*GetScaleDownThresholdPercent());
}

if (PartitionsCount_.Defined() && (*PartitionsCount_ != describeResult.GetTopicDescription().GetTotalPartitionsCount())) {
settings.AlterPartitioningSettings(*PartitionsCount_, *PartitionsCount_);
if (GetScaleUpThresholdPercent().Defined() && *GetScaleUpThresholdPercent() != describeResult.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent()) {
autoscalingSettings.ScaleUpThresholdPercent(*GetScaleUpThresholdPercent());
}

auto codecs = GetCodecs();
Expand Down
30 changes: 26 additions & 4 deletions ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,30 @@ namespace NYdb::NConsoleClient {
NTopic::EMeteringMode MeteringMode_ = NTopic::EMeteringMode::Unspecified;
};

class TCommandWithAutoscaling {
protected:
void AddAutoscaling(TClientCommand::TConfig& config, bool withDefault);
void ParseAutoscalingStrategy();
TMaybe<NTopic::EAutoscalingStrategy> GetAutoscalingStrategy() const;
TMaybe<ui32> GetScaleThresholdTime() const;
TMaybe<ui32> GetScaleUpThresholdPercent() const;
TMaybe<ui32> GetScaleDownThresholdPercent() const;

private:
TMaybe<ui32> ScaleThresholdTime_;
TMaybe<ui32> ScaleUpThresholdPercent_;
TMaybe<ui32> ScaleDownThresholdPercent_;

TString AutoscalingStrategyStr_;
TMaybe<NTopic::EAutoscalingStrategy> AutoscaleStrategy_;
};

class TCommandTopic: public TClientCommandTree {
public:
TCommandTopic();
};

class TCommandTopicCreate: public TYdbCommand, public TCommandWithTopicName, public TCommandWithSupportedCodecs, public TCommandWithMeteringMode {
class TCommandTopicCreate: public TYdbCommand, public TCommandWithTopicName, public TCommandWithSupportedCodecs, public TCommandWithMeteringMode, public TCommandWithAutoscaling {
public:
TCommandTopicCreate();
void Config(TConfig& config) override;
Expand All @@ -52,11 +70,13 @@ namespace NYdb::NConsoleClient {
private:
ui64 RetentionPeriodHours_;
ui64 RetentionStorageMb_;
ui32 PartitionsCount_;
ui32 MinActivePartitions_;
ui32 MaxActivePartitions_;

ui32 PartitionWriteSpeedKbps_;
};

class TCommandTopicAlter: public TYdbCommand, public TCommandWithTopicName, public TCommandWithSupportedCodecs, public TCommandWithMeteringMode {
class TCommandTopicAlter: public TYdbCommand, public TCommandWithTopicName, public TCommandWithSupportedCodecs, public TCommandWithMeteringMode, public TCommandWithAutoscaling {
public:
TCommandTopicAlter();
void Config(TConfig& config) override;
Expand All @@ -66,7 +86,9 @@ namespace NYdb::NConsoleClient {
private:
TMaybe<ui64> RetentionPeriodHours_;
TMaybe<ui64> RetentionStorageMb_;
TMaybe<ui32> PartitionsCount_;
TMaybe<ui32> MinActivePartitions_;
TMaybe<ui32> MaxActivePartitions_;

TMaybe<ui32> PartitionWriteSpeedKbps_;

NYdb::NTopic::TAlterTopicSettings PrepareAlterSettings(NYdb::NTopic::TDescribeTopicResult& describeResult);
Expand Down

0 comments on commit 7df7f9a

Please sign in to comment.