From cbdb6457af07408dce72a05a13faa2eae9d2870a Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 10 Jun 2024 09:47:47 +0000 Subject: [PATCH] Fix scale by load ut --- ydb/core/persqueue/partition_write.cpp | 12 +++++++++++- ydb/core/persqueue/ut/autoscaling_ut.cpp | 9 +++++---- .../persqueue/ut/common/autoscaling_ut_common.cpp | 5 ++++- ydb/core/persqueue/ut/common/autoscaling_ut_common.h | 2 +- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 6c7bf5dcfe7a..52a7aeaf8129 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -519,6 +519,11 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { avg.Update(WriteNewSize, now); } + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "TPartition::HandleWriteResponse writeNewSize# " << WriteNewSize; + ); + if (SplitMergeEnabled(Config)) { SplitMergeAvgWriteBytes->Update(WriteNewSize, now); auto needScaling = CheckScaleStatus(ctx); @@ -551,7 +556,12 @@ NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) { auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed; LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "TPartition::CheckScaleStatus writeSpeedUsagePercent# " << writeSpeedUsagePercent << " Topic: \"" << TopicName() << "\"." << + "TPartition::CheckScaleStatus" + << " splitMergeAvgWriteBytes# " << SplitMergeAvgWriteBytes->GetValue() + << " writeSpeedUsagePercent# " << writeSpeedUsagePercent + << " scaleThresholdSeconds# " << Config.GetPartitionStrategy().GetScaleThresholdSeconds() + << " totalPartitionWriteSpeed# " << TotalPartitionWriteSpeed + << " Topic: \"" << TopicName() << "\"." << " Partition: " << Partition ); auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT diff --git a/ydb/core/persqueue/ut/autoscaling_ut.cpp b/ydb/core/persqueue/ut/autoscaling_ut.cpp index a5e8e494b727..b9b17ab91dfe 100644 --- a/ydb/core/persqueue/ut/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/autoscaling_ut.cpp @@ -622,16 +622,17 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { auto msg = TString(1_MB, 'a'); - auto writeSession = CreateWriteSession(client, "producer-1", 0); + auto writeSession = CreateWriteSession(client, "producer-1", 0, TEST_TOPIC, false); UNIT_ASSERT(writeSession->Write(Msg(msg, 1))); UNIT_ASSERT(writeSession->Write(Msg(msg, 2))); - Sleep(TDuration::Seconds(10)); + Sleep(TDuration::Seconds(5)); auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync(); UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3); - auto writeSession2 = CreateWriteSession(client, "producer-1", 1); + auto writeSession2 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false); UNIT_ASSERT(writeSession2->Write(Msg(msg, 3))); - Sleep(TDuration::Seconds(10)); + UNIT_ASSERT(writeSession2->Write(Msg(msg, 4))); + Sleep(TDuration::Seconds(5)); auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync(); UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5); } diff --git a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp index 2df0c684bf31..d1eb209f1707 100644 --- a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp @@ -105,10 +105,13 @@ TTopicSdkTestSetup CreateSetup() { return setup; } -std::shared_ptr CreateWriteSession(TTopicClient& client, const TString& producer, std::optional partition, TString topic) { +std::shared_ptr CreateWriteSession(TTopicClient& client, const TString& producer, std::optional partition, TString topic, bool useCodec) { auto writeSettings = TWriteSessionSettings() .Path(topic) .ProducerId(producer); + if (!useCodec) { + writeSettings.Codec(ECodec::RAW); + } if (partition) { writeSettings.PartitionId(*partition); } else { diff --git a/ydb/core/persqueue/ut/common/autoscaling_ut_common.h b/ydb/core/persqueue/ut/common/autoscaling_ut_common.h index dc97a1ddbd7c..832d2acb9ab3 100644 --- a/ydb/core/persqueue/ut/common/autoscaling_ut_common.h +++ b/ydb/core/persqueue/ut/common/autoscaling_ut_common.h @@ -27,7 +27,7 @@ TWriteMessage Msg(const TString& data, ui64 seqNo); TTopicSdkTestSetup CreateSetup(); -std::shared_ptr CreateWriteSession(TTopicClient& client, const TString& producer, std::optional partition = std::nullopt, TString topic = TEST_TOPIC); +std::shared_ptr CreateWriteSession(TTopicClient& client, const TString& producer, std::optional partition = std::nullopt, TString topic = TEST_TOPIC, bool useCodec = true); struct TTestReadSession { struct MsgInfo {