Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix getting mid of range for autoscaling #4884

Closed
11 changes: 5 additions & 6 deletions ydb/core/persqueue/partition_scale_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ namespace NPQ {


TPartitionScaleManager::TPartitionScaleManager(
const TString& topicName,
const TString& databasePath,
const TString& topicName,
const TString& databasePath,
NKikimrPQ::TUpdateBalancerConfig& balancerConfig
)
: TopicName(topicName)
Expand Down Expand Up @@ -34,7 +34,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
if (splitMergePair.first.empty() && splitMergePair.second.empty()) {
return;
}

RequestInflight = true;
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
TopicName,
Expand All @@ -55,7 +55,7 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
std::vector<TPartitionSplit> splitsToApply;
std::vector<TPartitionMerge> mergesToApply;

size_t allowedSplitsCount = BalancerConfig.PartitionCountLimit > BalancerConfig.CurPartitions ? BalancerConfig.PartitionCountLimit - BalancerConfig.CurPartitions : 0;
size_t allowedSplitsCount = BalancerConfig.MaxActivePartitions > BalancerConfig.CurPartitions ? BalancerConfig.MaxActivePartitions - BalancerConfig.CurPartitions : 0;
auto itSplit = PartitionsToSplit.begin();
while (allowedSplitsCount > 0 && itSplit != PartitionsToSplit.end()) {
const auto partitionId = itSplit->first;
Expand Down Expand Up @@ -111,7 +111,7 @@ void TPartitionScaleManager::UpdateDatabasePath(const TString& dbPath) {
}

TString TPartitionScaleManager::GetRangeMid(const TString& from, const TString& to) {
if (from > to) {
if (from > to && to.size() != 0) {
return "";
}

Expand All @@ -133,7 +133,6 @@ TString TPartitionScaleManager::GetRangeMid(const TString& from, const TString&
if (result == from) {
result += static_cast<unsigned char>(127);
}

return result;
}

Expand Down
10 changes: 5 additions & 5 deletions ydb/core/persqueue/partition_scale_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ class TPartitionScaleManager {
NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
};

private:
private:
struct TBalancerConfig {
TBalancerConfig(
NKikimrPQ::TUpdateBalancerConfig& config
)
: PathId(config.GetPathId())
, PathVersion(config.GetVersion())
, PartitionGraph(MakePartitionGraph(config))
, PartitionCountLimit(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
, MaxActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
, MinActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMinPartitionCount())
, CurPartitions(config.PartitionsSize()) {
}

ui64 PathId;
int PathVersion;
TPartitionGraph PartitionGraph;
ui64 PartitionCountLimit;
ui64 MaxActivePartitions;
ui64 MinActivePartitions;
ui64 CurPartitions;
};
Expand All @@ -59,7 +59,7 @@ class TPartitionScaleManager {
void UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config);
void UpdateDatabasePath(const TString& dbPath);
void Die(const TActorContext& ctx);

static TString GetRangeMid(const TString& from, const TString& to);

private:
Expand All @@ -74,7 +74,7 @@ class TPartitionScaleManager {
private:
static const ui32 MIN_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 10;
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;

const TString TopicName;
TString DatabasePath = "";
TActorId CurrentScaleRequest;
Expand Down
30 changes: 17 additions & 13 deletions ydb/core/persqueue/partition_scale_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ namespace NKikimr {
namespace NPQ {

TPartitionScaleRequest::TPartitionScaleRequest(
TString topicName,
TString databasePath,
ui64 pathId,
ui64 pathVersion,
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
TString topicName,
TString databasePath,
ui64 pathId,
ui64 pathVersion,
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
NActors::TActorId parentActorId
)
: Topic(topicName)
Expand All @@ -19,7 +19,7 @@ TPartitionScaleRequest::TPartitionScaleRequest(
, Splits(splits)
, Merges(merges)
, ParentActorId(parentActorId) {

}

void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
Expand All @@ -41,8 +41,8 @@ void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransa

auto applyIf = modifyScheme.AddApplyIf();
applyIf->SetPathId(PathId);
applyIf->SetPathVersion(PathVersion);
//applyIf->SetCheckGeneralVersion(false);
applyIf->SetPathVersion(PathVersion == 0 ? 1 : PathVersion);
applyIf->SetCheckEntityVersion(true);

NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
groupDescription.SetName(topicName);
Expand Down Expand Up @@ -70,14 +70,14 @@ void TPartitionScaleRequest::PassAway() {

void TPartitionScaleRequest::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) {
if (ev->Get()->Status != NKikimrProto::OK) {
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);//savnik: проверить, какой статус тут приходит
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);
Send(ParentActorId, scaleRequestResult.release());
Die(ctx);
}
}

void TPartitionScaleRequest::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&, const TActorContext &ctx) {
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);//savnik: проверить, какой статус тут приходит
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);
Send(ParentActorId, scaleRequestResult.release());
Die(ctx);
}
Expand All @@ -90,11 +90,15 @@ void TPartitionScaleRequest::Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCom

void TPartitionScaleRequest::Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const NActors::TActorContext& ctx) {
auto msg = ev->Get();
//Cerr << "SAVDBG" << msg->Record.GetIssues()[0].Getmessage(); //savnik: log err

auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus());
if (status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress) {
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(status);//savnik: проверить, какой статус тут приходит
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(status);
TStringBuilder issues;
for (auto& issue : ev->Get()->Record.GetIssues()) {
issues << issue.ShortDebugString() + ", ";
}
Cerr << "\n SAVDGB " << issues << "\n";
Send(ParentActorId, scaleRequestResult.release());
Die(ctx);
} else {
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,9 +541,14 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& /*ctx*/) {
auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;

if (writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT
|| Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;

auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;

if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
return NKikimrPQ::EScaleStatus::NEED_SPLIT;
} else if (writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
} else if (mergeEnabled && writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
return NKikimrPQ::EScaleStatus::NEED_MERGE;
}
return NKikimrPQ::EScaleStatus::NORMAL;
Expand Down
148 changes: 148 additions & 0 deletions ydb/core/persqueue/ut/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h>

#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/persqueue/partition_scale_manager.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/ut_helpers/test_env.h>

Expand Down Expand Up @@ -498,6 +499,153 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::BAD_REQUEST, status.GetStatus(), "The consumer cannot commit an offset for inactive, read-to-the-end partitions.");
}

Y_UNIT_TEST(ControlPlane_CreateAlterDescribe) {
auto autoscalingTestTopic = "autoscalit-topic";
TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();

auto minParts = 5;
auto maxParts = 10;
auto scaleUpPercent = 80;
auto scaleDownPercent = 20;
auto threshold = 500;
auto strategy = EAutoscalingStrategy::ScaleUp;

TCreateTopicSettings createSettings;
createSettings
.BeginConfigurePartitioningSettings()
.MinActivePartitions(minParts)
.MaxActivePartitions(maxParts)
.BeginConfigureAutoscalingSettings()
.ScaleUpThresholdPercent(scaleUpPercent)
.ScaleDownThresholdPercent(scaleDownPercent)
.ThresholdTime(TDuration::Seconds(threshold))
.Strategy(strategy)
.EndConfigureAutoscalingSettings()
.EndConfigurePartitioningSettings();
client.CreateTopic(autoscalingTestTopic, createSettings).Wait();

TDescribeTopicSettings descSettings;

auto describe = client.DescribeTopic(autoscalingTestTopic, descSettings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(describe.GetStatus(), NYdb::EStatus::SUCCESS, describe.GetIssues().ToString());


UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), minParts);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), maxParts);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), strategy);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), scaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), scaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), threshold);

auto alterMinParts = 10;
auto alterMaxParts = 20;
auto alterScaleUpPercent = 90;
auto alterScaleDownPercent = 10;
auto alterThreshold = 700;
auto alterStrategy = EAutoscalingStrategy::ScaleUpAndDown;

TAlterTopicSettings alterSettings;
alterSettings
.BeginAlterPartitioningSettings()
.MinActivePartitions(alterMinParts)
.MaxActivePartitions(alterMaxParts)
.BeginAlterAutoscalingSettings()
.ScaleDownThresholdPercent(alterScaleDownPercent)
.ScaleUpThresholdPercent(alterScaleUpPercent)
.ThresholdTime(TDuration::Seconds(alterThreshold))
.Strategy(alterStrategy)
.EndAlterAutoscalingSettings()
.EndAlterTopicPartitioningSettings();

client.AlterTopic(autoscalingTestTopic, alterSettings).Wait();

auto describeAfterAlter = client.DescribeTopic(autoscalingTestTopic).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), alterMinParts);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), alterMaxParts);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), alterStrategy);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), alterScaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), alterScaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
}

Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) {
TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();

TCreateTopicSettings createSettings;
createSettings
.BeginConfigurePartitioningSettings()
.MinActivePartitions(1)
.MaxActivePartitions(100)
.BeginConfigureAutoscalingSettings()
.ScaleUpThresholdPercent(2)
.ScaleDownThresholdPercent(1)
.ThresholdTime(TDuration::Seconds(1))
.Strategy(EAutoscalingStrategy::ScaleUp)
.EndConfigureAutoscalingSettings()
.EndConfigurePartitioningSettings();
client.CreateTopic(TEST_TOPIC, createSettings).Wait();

auto msg = TString("a", 1_MB);

auto writeSession = CreateWriteSession(client, "producer-1", 0);
UNIT_ASSERT(writeSession->Write(Msg(msg, 1)));
UNIT_ASSERT(writeSession->Write(Msg(msg, 2)));
Sleep(TDuration::Seconds(10));
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);

auto writeSession2 = CreateWriteSession(client, "producer-1", 1);
UNIT_ASSERT(writeSession2->Write(Msg(msg, 3)));
Sleep(TDuration::Seconds(10));
auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5);
}

Y_UNIT_TEST(MidOfRange) {
TString a = "a";
TString b = "c";
auto res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);

b = "b";
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
UNIT_ASSERT(a < res);
UNIT_ASSERT(b > res);

a = {};
b = "b";
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
UNIT_ASSERT(a < res);
UNIT_ASSERT(b > res);

a = "a";
b = {};
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
Cerr << "\n SAVDBG " << res << "\n";
UNIT_ASSERT(a < res);
UNIT_ASSERT(b != res);

a = "aa";
b = {};
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
UNIT_ASSERT(a < res);
UNIT_ASSERT(b != res);

a = "aaa";
b = "b";
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
UNIT_ASSERT(a < res);
UNIT_ASSERT(b > res);

a = "aaa";
b = "aab";
res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b);
UNIT_ASSERT(a < res);
UNIT_ASSERT(b > res);
}
}

} // namespace NKikimr
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ TTopicSdkTestSetup CreateSetup() {
return setup;
}

std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition) {
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition, TString topic) {
auto writeSettings = TWriteSessionSettings()
.Path(TEST_TOPIC)
.Path(topic)
.ProducerId(producer);
if (partition) {
writeSettings.PartitionId(*partition);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/autoscaling_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ TWriteMessage Msg(const TString& data, ui64 seqNo);

TTopicSdkTestSetup CreateSetup();

std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt);
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt, TString topic = TEST_TOPIC);

struct TTestReadSession {
struct MsgInfo {
Expand Down
Loading
Loading