Skip to content

Commit

Permalink
Fix split by load ut (#5383)
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev committed Jun 13, 2024
1 parent 9795803 commit 9524970
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 7 deletions.
12 changes: 11 additions & 1 deletion ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,11 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
avg.Update(WriteNewSize, now);
}

LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::HandleWriteResponse writeNewSize# " << WriteNewSize;
);

if (SplitMergeEnabled(Config)) {
SplitMergeAvgWriteBytes->Update(WriteNewSize, now);
auto needScaling = CheckScaleStatus(ctx);
Expand Down Expand Up @@ -551,7 +556,12 @@ NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) {
auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"TPartition::CheckScaleStatus writeSpeedUsagePercent# " << writeSpeedUsagePercent << " Topic: \"" << TopicName() << "\"." <<
"TPartition::CheckScaleStatus"
<< " splitMergeAvgWriteBytes# " << SplitMergeAvgWriteBytes->GetValue()
<< " writeSpeedUsagePercent# " << writeSpeedUsagePercent
<< " scaleThresholdSeconds# " << Config.GetPartitionStrategy().GetScaleThresholdSeconds()
<< " totalPartitionWriteSpeed# " << TotalPartitionWriteSpeed
<< " Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition
);
auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/persqueue/ut/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -742,16 +742,17 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

auto msg = TString(1_MB, 'a');

auto writeSession = CreateWriteSession(client, "producer-1", 0);
auto writeSession = CreateWriteSession(client, "producer-1", 0, TEST_TOPIC, false);
UNIT_ASSERT(writeSession->Write(Msg(msg, 1)));
UNIT_ASSERT(writeSession->Write(Msg(msg, 2)));
Sleep(TDuration::Seconds(10));
Sleep(TDuration::Seconds(5));
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);

auto writeSession2 = CreateWriteSession(client, "producer-1", 1);
auto writeSession2 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false);
UNIT_ASSERT(writeSession2->Write(Msg(msg, 3)));
Sleep(TDuration::Seconds(10));
UNIT_ASSERT(writeSession2->Write(Msg(msg, 4)));
Sleep(TDuration::Seconds(5));
auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5);
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,13 @@ TTopicSdkTestSetup CreateSetup() {
return setup;
}

std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition, TString topic) {
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition, TString topic, bool useCodec) {
auto writeSettings = TWriteSessionSettings()
.Path(topic)
.ProducerId(producer);
if (!useCodec) {
writeSettings.Codec(ECodec::RAW);
}
if (partition) {
writeSettings.PartitionId(*partition);
} else {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/autoscaling_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ TWriteMessage Msg(const TString& data, ui64 seqNo);

TTopicSdkTestSetup CreateSetup();

std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt, TString topic = TEST_TOPIC);
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt, TString topic = TEST_TOPIC, bool useCodec = true);

struct TTestReadSession {
struct MsgInfo {
Expand Down

0 comments on commit 9524970

Please sign in to comment.