From 0096092d195cc7e0b678ae778ee4a7cd13230f81 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Mon, 24 Nov 2025 14:35:27 +0500 Subject: [PATCH] Fixed an error in calculating metrics in the PQ tablet (#29321) --- .../pqtablet/partition/mirrorer/mirrorer.cpp | 3 + .../pqtablet/partition/partition.cpp | 14 ++--- ydb/core/persqueue/pqtablet/pq_impl.cpp | 27 ++++----- ydb/core/persqueue/pqtablet/pq_impl_types.h | 7 +-- ydb/core/tablet/tablet_counters.h | 55 +++++++++++++++---- 5 files changed, 68 insertions(+), 38 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp b/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp index a09e659da859..0d642bc26b50 100644 --- a/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp @@ -41,6 +41,7 @@ TMirrorer::TMirrorer( , Config(config) { Counters.Populate(counters); + Counters.ResetCounters(); } void TMirrorer::Bootstrap(const TActorContext& ctx) { @@ -239,6 +240,8 @@ void TMirrorer::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorContext& ctx) { ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters); ctx.Send(PartitionActor, new TEvPQ::TEvMirrorerCounters(Counters)); + Counters.Cumulative().ResetCounters(); + Counters.Percentile().ResetCounters(); if (ctx.Now() - LastStateLogTimestamp > LOG_STATE_INTERVAL) { LastStateLogTimestamp = ctx.Now(); diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index 51d964a2397a..de2af18c042b 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -174,13 +174,11 @@ bool TPartition::LastOffsetHasBeenCommited(const TUserInfoBase& userInfo) const } struct TMirrorerInfo { - TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline) + TMirrorerInfo(const TActorId& actor) : Actor(actor) { - Baseline.Populate(baseline); } TActorId Actor; - TTabletCountersBase Baseline; }; const TString& TPartition::TopicName() const { @@ -359,6 +357,7 @@ TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActo , SamplingControl(samplingControl) { TabletCounters.Populate(*Counters); + TabletCounters.ResetCounters(); } void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) { @@ -447,6 +446,8 @@ void TPartition::HandleWakeup(const TActorContext& ctx) { ctx.Schedule(WAKE_TIMEOUT, new TEvents::TEvWakeup()); ctx.Send(TabletActorId, new TEvPQ::TEvPartitionCounters(Partition, TabletCounters)); + TabletCounters.Cumulative().ResetCounters(); + TabletCounters.Percentile().ResetCounters(); ui64 usedStorage = GetUsedStorage(now); if (usedStorage > 0) { @@ -606,9 +607,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& /*ctx*/) { if (Mirrorer) { - auto diff = ev->Get()->Counters.MakeDiffForAggr(Mirrorer->Baseline); - TabletCounters.Populate(*diff.Get()); - ev->Get()->Counters.RememberCurrentStateAsBaseline(Mirrorer->Baseline); + TabletCounters.Populate(ev->Get()->Counters); } } @@ -4217,8 +4216,7 @@ size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) { void TPartition::CreateMirrorerActor() { Mirrorer = MakeHolder( - RegisterWithSameMailbox(CreateMirrorer(TabletId, TabletActorId, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, GetEndOffset(), Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)), - TabletCounters + RegisterWithSameMailbox(CreateMirrorer(TabletId, TabletActorId, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, GetEndOffset(), Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)) ); } diff --git a/ydb/core/persqueue/pqtablet/pq_impl.cpp b/ydb/core/persqueue/pqtablet/pq_impl.cpp index 4e84852b0150..8c3ef7b0be92 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.cpp +++ b/ydb/core/persqueue/pqtablet/pq_impl.cpp @@ -636,8 +636,7 @@ void TPersQueue::CreateOriginalPartition(const NKikimrPQ::TPQTabletConfig& confi Partitions.emplace(std::piecewise_construct, std::forward_as_tuple(partitionId), std::forward_as_tuple(actorId, - GetPartitionKeyRange(config, partition), - *Counters)); + GetPartitionKeyRange(config, partition))); ++OriginalPartitionsCount; } @@ -674,8 +673,7 @@ void TPersQueue::AddSupportivePartition(const TPartitionId& partitionId) { Partitions.emplace(partitionId, TPartitionInfo(TActorId(), - {}, - *Counters)); + {})); NewSupportivePartitions.insert(partitionId); } @@ -1077,11 +1075,12 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorConte PQ_LOG_T("Handle TEvPQ::TEvPartitionCounters" << " PartitionId " << ev->Get()->Partition); - const auto& partitionId = ev->Get()->Partition; + auto& partitionId = ev->Get()->Partition; auto& partition = GetPartitionInfo(partitionId); - auto diff = ev->Get()->Counters.MakeDiffForAggr(partition.Baseline); - ui64 cpuUsage = diff->Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE].Get(); - ui64 networkBytesUsage = diff->Cumulative()[COUNTER_PQ_TABLET_NETWORK_BYTES_USAGE].Get(); + + auto& counters = ev->Get()->Counters; + ui64 cpuUsage = counters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE].Get(); + ui64 networkBytesUsage = counters.Cumulative()[COUNTER_PQ_TABLET_NETWORK_BYTES_USAGE].Get(); if (ResourceMetrics) { if (cpuUsage > 0) { ResourceMetrics->CPU.Increment(cpuUsage); @@ -1093,17 +1092,15 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorConte ResourceMetrics->TryUpdate(ctx); } } + Counters->Percentile().Populate(counters.Percentile()); + Counters->Cumulative().Populate(counters.Cumulative()); - Counters->Populate(*diff.Get()); - ev->Get()->Counters.RememberCurrentStateAsBaseline(partition.Baseline); + partition.ReservedBytes = counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Get(); // restore cache's simple counters cleaned by partition's counters SetCacheCounters(CacheCounters); - ui64 reservedSize = 0; - for (auto& p : Partitions) { - if (p.second.Baseline.Simple().Size() > 0) //there could be no counters from this partition yet - reservedSize += p.second.Baseline.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Get(); - } + ui64 reservedSize = std::accumulate(Partitions.begin(), Partitions.end(), 0ul, + [](ui64 sum, const auto& p) { return sum + p.second.ReservedBytes; }); Counters->Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(reservedSize); // Features of the implementation of SimpleCounters. It is necessary to restore the value of diff --git a/ydb/core/persqueue/pqtablet/pq_impl_types.h b/ydb/core/persqueue/pqtablet/pq_impl_types.h index 6b7cad3bbceb..731d68d4a383 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl_types.h +++ b/ydb/core/persqueue/pqtablet/pq_impl_types.h @@ -6,13 +6,11 @@ namespace NKikimr::NPQ { struct TPartitionInfo { TPartitionInfo(const TActorId& actor, - TMaybe&& keyRange, - const TTabletCountersBase& baseline) + TMaybe&& keyRange) : Actor(actor) , KeyRange(std::move(keyRange)) , InitDone(false) { - Baseline.Populate(baseline); } TPartitionInfo(const TPartitionInfo& info) @@ -21,14 +19,13 @@ struct TPartitionInfo { , InitDone(info.InitDone) , PendingRequests(info.PendingRequests) { - Baseline.Populate(info.Baseline); } TActorId Actor; TMaybe KeyRange; bool InitDone; - TTabletCountersBase Baseline; THashMap LabeledCounters; + size_t ReservedBytes = 0; struct TPendingRequest { TPendingRequest(ui64 cookie, diff --git a/ydb/core/tablet/tablet_counters.h b/ydb/core/tablet/tablet_counters.h index 6c58c7211ef8..61a82d62e860 100644 --- a/ydb/core/tablet/tablet_counters.h +++ b/ydb/core/tablet/tablet_counters.h @@ -129,10 +129,16 @@ class TTabletCumulativeCounter : public TTabletSimpleCounterBase{ void Initialize(const TTabletCumulativeCounter& rp) { SetTo(rp); } + + void Set(ui64 value) { + Value = value; + } + void AdjustToBaseLine(const TTabletCumulativeCounter& baseLine) { Y_DEBUG_ABORT_UNLESS(Value >= baseLine.Value); Value -= baseLine.Value; } + void SetTo(const TTabletCumulativeCounter& rp) { Value = rp.Value; } @@ -375,6 +381,18 @@ class TCountersArray : TNonCopyable { return CountersQnt; } + void Populate(const TCountersArray& rp) { + if (CountersQnt != rp.CountersQnt) { + Reset(rp); + } else { + for (ui32 i = 0, e = CountersQnt; i < e; ++i) { + Counters[i].Populate(rp.Counters[i]); + } + } + } + + void ResetCounters(); + private: // void Reset(const TCountersArray& rp) { @@ -408,22 +426,33 @@ class TCountersArray : TNonCopyable { } } - void Populate(const TCountersArray& rp) { - if (CountersQnt != rp.CountersQnt) { - Reset(rp); - } else { - for (ui32 i = 0, e = CountersQnt; i < e; ++i) { - Counters[i].Populate(rp.Counters[i]); - } - } - } - // ui32 CountersQnt; TCountersHolder CountersHolder; T* Counters; }; +template <> +inline void TCountersArray::ResetCounters() { + for (ui32 i = 0; i < CountersQnt; ++i) { + Counters[i].Set(0); + } +} + +template <> +inline void TCountersArray::ResetCounters() { + for (ui32 i = 0; i < CountersQnt; ++i) { + Counters[i].Set(0); + } +} + +template <> +inline void TCountersArray::ResetCounters() { + for (ui32 i = 0; i < CountersQnt; ++i) { + Counters[i].Clear(); + } +} + //////////////////////////////////////////// /// The TTabletCountersBase class //////////////////////////////////////////// @@ -539,6 +568,12 @@ class TTabletCountersBase { } } + void ResetCounters() { + SimpleCounters.ResetCounters(); + CumulativeCounters.ResetCounters(); + PercentileCounters.ResetCounters(); + } + private: // TTabletCountersBase(const TTabletCountersBase&);