From a6314fbb6cec1292ef02ae18d01f2c373daa7d0b Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Sat, 1 Nov 2025 18:28:07 +0300 Subject: [PATCH 1/2] [+] counter PQ/LatencyTxCalcPredicate --- ydb/core/persqueue/pqtablet/partition/partition.cpp | 4 +++- ydb/core/persqueue/pqtablet/partition/partition.h | 8 +++++--- ydb/core/protos/counters_pq.proto | 2 ++ 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index e8f2c6495e76..518825cbdf56 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -1663,6 +1663,8 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr& tx, bool isPredicate) { if (isPredicate) { + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_TXCALCPREDICATE].IncrementFor((Now() - tx->CalcPredicateTimestamp).MilliSeconds()); + tx->CalcPredicateSpan.End(); tx->CalcPredicateSpan = {}; @@ -2195,7 +2197,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& void TPartition::PushBackDistrTx(TSimpleSharedPtr event) { - UserActionAndTransactionEvents.emplace_back(MakeSimpleShared(std::move(event))); + UserActionAndTransactionEvents.emplace_back(MakeSimpleShared(std::move(event), Now())); RequestWriteInfoIfRequired(); } diff --git a/ydb/core/persqueue/pqtablet/partition/partition.h b/ydb/core/persqueue/pqtablet/partition/partition.h index def8352fe48f..ac8e51446cab 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.h +++ b/ydb/core/persqueue/pqtablet/partition/partition.h @@ -57,13 +57,14 @@ class IAutopartitioningManager; class TPartitionCompaction; struct TTransaction { - - explicit TTransaction(TSimpleSharedPtr tx, - TMaybe predicate = Nothing()) + TTransaction(TSimpleSharedPtr tx, + TInstant calcPredicateTimestamp, + TMaybe predicate = Nothing()) : Tx(tx) , Predicate(predicate) , SupportivePartitionActor(tx->SupportivePartitionActor) , CalcPredicateSpan(std::move(tx->Span)) + , CalcPredicateTimestamp(calcPredicateTimestamp) { AFL_ENSURE(Tx); } @@ -123,6 +124,7 @@ struct TTransaction { NWilson::TSpan CommitSpan; TInstant WriteInfoResponseTimestamp; + TInstant CalcPredicateTimestamp; }; class TPartitionCompaction; diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto index 1b604e200161..5a78fdf94000 100644 --- a/ydb/core/protos/counters_pq.proto +++ b/ydb/core/protos/counters_pq.proto @@ -141,6 +141,8 @@ enum EPercentileCounters { COUNTER_LATENCY_PQ_SPLIT_MESSAGE_GROUP = 19 [(CounterOpts) = {Name: "LatencySplitMessageGroup"}]; COUNTER_LATENCY_PQ_PUBLISH_READ = 20 [(CounterOpts) = {Name: "LatencyPublishRead"}]; COUNTER_LATENCY_PQ_FORGET_READ = 21 [(CounterOpts) = {Name: "LatencyForgetRead"}]; + + COUNTER_LATENCY_PQ_TXCALCPREDICATE = 22 [(CounterOpts) = {Name: "LatencyTxCalcPredicate"}]; } From 5af363c204fa18a6cbd8ab64fd547c3e99f1eee6 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Thu, 6 Nov 2025 11:59:53 +0300 Subject: [PATCH 2/2] [/] unused code --- ydb/core/persqueue/ut/partition_ut.cpp | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index d5a0a3f64211..054497962c4f 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -282,11 +282,6 @@ class TPartitionFixture : public NUnitTest::TBaseFixture { void SendChangePartitionConfig(const TConfigParams& config = {}); void WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher = {}); - TTransaction MakeTransaction(ui64 step, ui64 txId, - TString consumer, - ui64 begin, ui64 end, - TMaybe predicate = Nothing()); - void SendSubDomainStatus(bool subDomainOutOfSpace = false); void SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest = false); void SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force = true); @@ -1106,17 +1101,6 @@ void TPartitionFixture::WaitPartitionConfigChanged(const TChangePartitionConfigM } } -TTransaction TPartitionFixture::MakeTransaction(ui64 step, ui64 txId, - TString consumer, - ui64 begin, ui64 end, - TMaybe predicate) -{ - auto event = MakeSimpleShared(step, txId); - event->AddOperation(std::move(consumer), begin, end); - - return TTransaction(event, predicate); -} - template void CompareVectors(const TVector& expected, const TIterable& actual) { auto i = 0u;