From 1652cc8387b1d515e5cd3da2034b7dcecda3bc74 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 21 Nov 2025 12:55:44 +0000 Subject: [PATCH 1/4] WIP --- .../pqtablet/partition/mirrorer/mirrorer.cpp | 2 ++ .../pqtablet/partition/partition.cpp | 2 ++ ydb/core/persqueue/pqtablet/pq_impl.cpp | 22 ++++++------- ydb/core/persqueue/pqtablet/pq_impl_types.h | 7 ++--- ydb/core/tablet/tablet_counters.h | 31 +++++++++++++++++++ 5 files changed, 47 insertions(+), 17 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp b/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp index 7874d3811781..d993ef3c42cf 100644 --- a/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp @@ -41,6 +41,7 @@ TMirrorer::TMirrorer( , Config(config) { Counters.Populate(counters); + Counters.ResetToZero(); } void TMirrorer::Bootstrap(const TActorContext& ctx) { @@ -239,6 +240,7 @@ void TMirrorer::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorContext& ctx) { ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters); ctx.Send(PartitionActor, new TEvPQ::TEvMirrorerCounters(Counters)); + Counters.ResetToZero(); if (ctx.Now() - LastStateLogTimestamp > LOG_STATE_INTERVAL) { LastStateLogTimestamp = ctx.Now(); diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index 81a94c95eb07..0281ff05c65a 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -370,6 +370,7 @@ TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActo , SamplingControl(samplingControl) { TabletCounters.Populate(*Counters); + TabletCounters.ResetToZero(); } void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) { @@ -467,6 +468,7 @@ void 
TPartition::HandleWakeup(const TActorContext& ctx) { ctx.Schedule(WAKE_TIMEOUT, new TEvents::TEvWakeup()); ctx.Send(TabletActorId, new TEvPQ::TEvPartitionCounters(Partition, TabletCounters)); + TabletCounters.ResetToZero(); ui64 usedStorage = GetUsedStorage(now); if (usedStorage > 0) { diff --git a/ydb/core/persqueue/pqtablet/pq_impl.cpp b/ydb/core/persqueue/pqtablet/pq_impl.cpp index 7f7d627d3c03..432a59668f5c 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.cpp +++ b/ydb/core/persqueue/pqtablet/pq_impl.cpp @@ -636,8 +636,7 @@ void TPersQueue::CreateOriginalPartition(const NKikimrPQ::TPQTabletConfig& confi Partitions.emplace(std::piecewise_construct, std::forward_as_tuple(partitionId), std::forward_as_tuple(actorId, - GetPartitionKeyRange(config, partition), - *Counters)); + GetPartitionKeyRange(config, partition))); ++OriginalPartitionsCount; } @@ -674,8 +673,7 @@ void TPersQueue::AddSupportivePartition(const TPartitionId& partitionId) { Partitions.emplace(partitionId, TPartitionInfo(TActorId(), - {}, - *Counters)); + {})); NewSupportivePartitions.insert(partitionId); } @@ -1077,11 +1075,12 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorConte PQ_LOG_T("Handle TEvPQ::TEvPartitionCounters" << " PartitionId " << ev->Get()->Partition); - const auto& partitionId = ev->Get()->Partition; + auto& partitionId = ev->Get()->Partition; auto& partition = GetPartitionInfo(partitionId); - auto diff = ev->Get()->Counters.MakeDiffForAggr(partition.Baseline); - ui64 cpuUsage = diff->Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE].Get(); - ui64 networkBytesUsage = diff->Cumulative()[COUNTER_PQ_TABLET_NETWORK_BYTES_USAGE].Get(); + + auto& counters = ev->Get()->Counters; + ui64 cpuUsage = counters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE].Get(); + ui64 networkBytesUsage = counters.Cumulative()[COUNTER_PQ_TABLET_NETWORK_BYTES_USAGE].Get(); if (ResourceMetrics) { if (cpuUsage > 0) { ResourceMetrics->CPU.Increment(cpuUsage); @@ -1093,16 +1092,15 @@ void 
TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorConte ResourceMetrics->TryUpdate(ctx); } } + Counters->Populate(counters); - Counters->Populate(*diff.Get()); - ev->Get()->Counters.RememberCurrentStateAsBaseline(partition.Baseline); + partition.ReservedBytes = counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Get(); // restore cache's simple counters cleaned by partition's counters SetCacheCounters(CacheCounters); ui64 reservedSize = 0; for (auto& p : Partitions) { - if (p.second.Baseline.Simple().Size() > 0) //there could be no counters from this partition yet - reservedSize += p.second.Baseline.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Get(); + reservedSize += p.second.ReservedBytes; } Counters->Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(reservedSize); diff --git a/ydb/core/persqueue/pqtablet/pq_impl_types.h b/ydb/core/persqueue/pqtablet/pq_impl_types.h index 6b7cad3bbceb..731d68d4a383 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl_types.h +++ b/ydb/core/persqueue/pqtablet/pq_impl_types.h @@ -6,13 +6,11 @@ namespace NKikimr::NPQ { struct TPartitionInfo { TPartitionInfo(const TActorId& actor, - TMaybe&& keyRange, - const TTabletCountersBase& baseline) + TMaybe&& keyRange) : Actor(actor) , KeyRange(std::move(keyRange)) , InitDone(false) { - Baseline.Populate(baseline); } TPartitionInfo(const TPartitionInfo& info) @@ -21,14 +19,13 @@ struct TPartitionInfo { , InitDone(info.InitDone) , PendingRequests(info.PendingRequests) { - Baseline.Populate(info.Baseline); } TActorId Actor; TMaybe KeyRange; bool InitDone; - TTabletCountersBase Baseline; THashMap LabeledCounters; + ui64 ReservedBytes = 0; /* last COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE value reported by this partition; ui64 to match the ui64 sum it feeds. NOTE(review): the hand-written copy constructor gets no initializer for this field in this patch, so a copy restarts at 0 - confirm that is intended */ struct TPendingRequest { TPendingRequest(ui64 cookie, diff --git a/ydb/core/tablet/tablet_counters.h b/ydb/core/tablet/tablet_counters.h index 6c58c7211ef8..74e097540ab8 100644 --- a/ydb/core/tablet/tablet_counters.h +++ b/ydb/core/tablet/tablet_counters.h @@ -129,10 +129,16 @@ class TTabletCumulativeCounter : public 
TTabletSimpleCounterBase{ void Initialize(const TTabletCumulativeCounter& rp) { SetTo(rp); } + + void Set(ui64 value) { + Value = value; + } + void AdjustToBaseLine(const TTabletCumulativeCounter& baseLine) { Y_DEBUG_ABORT_UNLESS(Value >= baseLine.Value); Value -= baseLine.Value; } + void SetTo(const TTabletCumulativeCounter& rp) { Value = rp.Value; } @@ -393,6 +399,8 @@ class TCountersArray : TNonCopyable { } } + void ResetToZero(); + // void AdjustToBaseLine(const TCountersArray& baseLine) { Y_DEBUG_ABORT_UNLESS(baseLine.CountersQnt == CountersQnt); @@ -424,6 +432,24 @@ class TCountersArray : TNonCopyable { T* Counters; }; +template <> +inline void TCountersArray::ResetToZero() { +} + +template <> +inline void TCountersArray::ResetToZero() { + for (ui32 i = 0; i < CountersQnt; ++i) { + Counters[i].Set(0); + } +} + +template <> +inline void TCountersArray::ResetToZero() { + for (ui32 i = 0; i < CountersQnt; ++i) { + Counters[i].Clear(); + } +} + //////////////////////////////////////////// /// The TTabletCountersBase class //////////////////////////////////////////// @@ -539,6 +565,11 @@ class TTabletCountersBase { } } + void ResetToZero() { + CumulativeCounters.ResetToZero(); + PercentileCounters.ResetToZero(); + } + private: // TTabletCountersBase(const TTabletCountersBase&); From f7891ce4a65685411b63884b3c47d91c61b76a3c Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 21 Nov 2025 13:25:43 +0000 Subject: [PATCH 2/4] fix --- .../pqtablet/partition/mirrorer/mirrorer.cpp | 5 ++-- .../pqtablet/partition/partition.cpp | 16 ++++------ ydb/core/tablet/tablet_counters.h | 30 ++++++++++++++----- 3 files changed, 32 insertions(+), 19 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp b/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp index d993ef3c42cf..7a067043d1e4 100644 --- a/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp @@ -41,7 
+41,7 @@ TMirrorer::TMirrorer( , Config(config) { Counters.Populate(counters); - Counters.ResetToZero(); + Counters.ResetCounters(); } void TMirrorer::Bootstrap(const TActorContext& ctx) { @@ -240,7 +240,8 @@ void TMirrorer::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorContext& ctx) { ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters); ctx.Send(PartitionActor, new TEvPQ::TEvMirrorerCounters(Counters)); - Counters.ResetToZero(); + Counters.ResetCumulativeCounters(); + Counters.ResetPercentileCounters(); if (ctx.Now() - LastStateLogTimestamp > LOG_STATE_INTERVAL) { LastStateLogTimestamp = ctx.Now(); diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index 0281ff05c65a..5c58e9463989 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -174,13 +174,11 @@ bool TPartition::LastOffsetHasBeenCommited(const TUserInfoBase& userInfo) const } struct TMirrorerInfo { - TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline) + TMirrorerInfo(const TActorId& actor) : Actor(actor) { - Baseline.Populate(baseline); } TActorId Actor; - TTabletCountersBase Baseline; }; const TString& TPartition::TopicName() const { @@ -370,7 +368,7 @@ TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActo , SamplingControl(samplingControl) { TabletCounters.Populate(*Counters); - TabletCounters.ResetToZero(); + TabletCounters.ResetCounters(); } void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) { @@ -468,7 +466,8 @@ void TPartition::HandleWakeup(const TActorContext& ctx) { ctx.Schedule(WAKE_TIMEOUT, new TEvents::TEvWakeup()); ctx.Send(TabletActorId, new TEvPQ::TEvPartitionCounters(Partition, TabletCounters)); - TabletCounters.ResetToZero(); + TabletCounters.ResetCumulativeCounters(); 
+ TabletCounters.ResetPercentileCounters(); ui64 usedStorage = GetUsedStorage(now); if (usedStorage > 0) { @@ -621,9 +620,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& /*ctx*/) { if (Mirrorer) { - auto diff = ev->Get()->Counters.MakeDiffForAggr(Mirrorer->Baseline); - TabletCounters.Populate(*diff.Get()); - ev->Get()->Counters.RememberCurrentStateAsBaseline(Mirrorer->Baseline); + TabletCounters.Populate(ev->Get()->Counters); } } @@ -4234,8 +4231,7 @@ size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) { void TPartition::CreateMirrorerActor() { Mirrorer = MakeHolder( - RegisterWithSameMailbox(CreateMirrorer(TabletId, TabletActorId, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, GetEndOffset(), Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)), - TabletCounters + RegisterWithSameMailbox(CreateMirrorer(TabletId, TabletActorId, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, GetEndOffset(), Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)) ); } diff --git a/ydb/core/tablet/tablet_counters.h b/ydb/core/tablet/tablet_counters.h index 74e097540ab8..b21144672d9d 100644 --- a/ydb/core/tablet/tablet_counters.h +++ b/ydb/core/tablet/tablet_counters.h @@ -399,7 +399,7 @@ class TCountersArray : TNonCopyable { } } - void ResetToZero(); + void ResetCounter(); // void AdjustToBaseLine(const TCountersArray& baseLine) { @@ -433,18 +433,21 @@ class TCountersArray : TNonCopyable { }; template <> -inline void TCountersArray::ResetToZero() { +inline void TCountersArray::ResetCounter() { + for (ui32 i = 0; i < CountersQnt; ++i) { + Counters[i].Set(0); + } } template <> -inline void TCountersArray::ResetToZero() { +inline void TCountersArray::ResetCounter() { for (ui32 i = 0; i < CountersQnt; ++i) { Counters[i].Set(0); } } template <> -inline void TCountersArray::ResetToZero() { 
+inline void TCountersArray::ResetCounter() { for (ui32 i = 0; i < CountersQnt; ++i) { Counters[i].Clear(); } @@ -565,9 +568,22 @@ class TTabletCountersBase { } } - void ResetToZero() { - CumulativeCounters.ResetToZero(); - PercentileCounters.ResetToZero(); + void ResetCounters() { + SimpleCounters.ResetCounter(); + CumulativeCounters.ResetCounter(); + PercentileCounters.ResetCounter(); + } + + void ResetSimpleCounters() { + SimpleCounters.ResetCounter(); + } + + void ResetCumulativeCounters() { + CumulativeCounters.ResetCounter(); + } + + void ResetPercentileCounters() { + PercentileCounters.ResetCounter(); } private: From d4ccc74a59018a0303f163f891a2b56c42345ab0 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Mon, 24 Nov 2025 06:16:17 +0000 Subject: [PATCH 3/4] fix --- .../pqtablet/partition/mirrorer/mirrorer.cpp | 4 +- .../pqtablet/partition/partition.cpp | 4 +- ydb/core/persqueue/pqtablet/pq_impl.cpp | 9 ++-- ydb/core/tablet/tablet_counters.h | 48 +++++++------------ 4 files changed, 26 insertions(+), 39 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp b/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp index 7a067043d1e4..36d14e45313b 100644 --- a/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp @@ -240,8 +240,8 @@ void TMirrorer::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorContext& ctx) { ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters); ctx.Send(PartitionActor, new TEvPQ::TEvMirrorerCounters(Counters)); - Counters.ResetCumulativeCounters(); - Counters.ResetPercentileCounters(); + TabletCounters.Cumulative().ResetCounters(); + TabletCounters.Percentile().ResetCounters(); if (ctx.Now() - LastStateLogTimestamp > LOG_STATE_INTERVAL) { LastStateLogTimestamp = ctx.Now(); diff --git 
a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index c99e9955fb64..2f36deb48a98 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -466,8 +466,8 @@ void TPartition::HandleWakeup(const TActorContext& ctx) { ctx.Schedule(WAKE_TIMEOUT, new TEvents::TEvWakeup()); ctx.Send(TabletActorId, new TEvPQ::TEvPartitionCounters(Partition, TabletCounters)); - TabletCounters.ResetCumulativeCounters(); - TabletCounters.ResetPercentileCounters(); + TabletCounters.Cumulative().ResetCounters(); + TabletCounters.Percentile().ResetCounters(); ui64 usedStorage = GetUsedStorage(now); if (usedStorage > 0) { diff --git a/ydb/core/persqueue/pqtablet/pq_impl.cpp b/ydb/core/persqueue/pqtablet/pq_impl.cpp index 64f87646adf6..85effbb592c5 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.cpp +++ b/ydb/core/persqueue/pqtablet/pq_impl.cpp @@ -1092,16 +1092,15 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorConte ResourceMetrics->TryUpdate(ctx); } } - Counters->Populate(counters); + Counters->Percentile().Populate(counters.Percentile()); + Counters->Cumulative().Populate(counters.Cumulative()); partition.ReservedBytes = counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Get(); // restore cache's simple counters cleaned by partition's counters SetCacheCounters(CacheCounters); - ui64 reservedSize = 0; - for (auto& p : Partitions) { - reservedSize += p.second.ReservedBytes; - } + ui64 reservedSize = std::accumulate(Partitions.begin(), Partitions.end(), ui64{0} /* ui64 seed: std::accumulate sums in the seed's type, and 0ul is only 32 bits wide on LLP64 targets */, + [](ui64 sum, const auto& p) { return sum + p.second.ReservedBytes; }); Counters->Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(reservedSize); // Features of the implementation of SimpleCounters. 
It is necessary to restore the value of diff --git a/ydb/core/tablet/tablet_counters.h b/ydb/core/tablet/tablet_counters.h index b21144672d9d..61a82d62e860 100644 --- a/ydb/core/tablet/tablet_counters.h +++ b/ydb/core/tablet/tablet_counters.h @@ -381,6 +381,18 @@ class TCountersArray : TNonCopyable { return CountersQnt; } + void Populate(const TCountersArray& rp) { + if (CountersQnt != rp.CountersQnt) { + Reset(rp); + } else { + for (ui32 i = 0, e = CountersQnt; i < e; ++i) { + Counters[i].Populate(rp.Counters[i]); + } + } + } + + void ResetCounters(); + private: // void Reset(const TCountersArray& rp) { @@ -399,8 +411,6 @@ class TCountersArray : TNonCopyable { } } - void ResetCounter(); - // void AdjustToBaseLine(const TCountersArray& baseLine) { Y_DEBUG_ABORT_UNLESS(baseLine.CountersQnt == CountersQnt); @@ -416,16 +426,6 @@ class TCountersArray : TNonCopyable { } } - void Populate(const TCountersArray& rp) { - if (CountersQnt != rp.CountersQnt) { - Reset(rp); - } else { - for (ui32 i = 0, e = CountersQnt; i < e; ++i) { - Counters[i].Populate(rp.Counters[i]); - } - } - } - // ui32 CountersQnt; TCountersHolder CountersHolder; @@ -433,21 +433,21 @@ class TCountersArray : TNonCopyable { }; template <> -inline void TCountersArray::ResetCounter() { +inline void TCountersArray::ResetCounters() { for (ui32 i = 0; i < CountersQnt; ++i) { Counters[i].Set(0); } } template <> -inline void TCountersArray::ResetCounter() { +inline void TCountersArray::ResetCounters() { for (ui32 i = 0; i < CountersQnt; ++i) { Counters[i].Set(0); } } template <> -inline void TCountersArray::ResetCounter() { +inline void TCountersArray::ResetCounters() { for (ui32 i = 0; i < CountersQnt; ++i) { Counters[i].Clear(); } @@ -569,21 +569,9 @@ class TTabletCountersBase { } void ResetCounters() { - SimpleCounters.ResetCounter(); - CumulativeCounters.ResetCounter(); - PercentileCounters.ResetCounter(); - } - - void ResetSimpleCounters() { - SimpleCounters.ResetCounter(); - } - - void 
ResetCumulativeCounters() { - CumulativeCounters.ResetCounter(); - } - - void ResetPercentileCounters() { - PercentileCounters.ResetCounter(); + SimpleCounters.ResetCounters(); + CumulativeCounters.ResetCounters(); + PercentileCounters.ResetCounters(); } private: From 86cea6310921247ef20788e2e5818a09286d8a1e Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Mon, 24 Nov 2025 06:39:02 +0000 Subject: [PATCH 4/4] fix --- ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp b/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp index 36d14e45313b..efd426df672f 100644 --- a/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp @@ -240,8 +240,8 @@ void TMirrorer::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorContext& ctx) { ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters); ctx.Send(PartitionActor, new TEvPQ::TEvMirrorerCounters(Counters)); - TabletCounters.Cumulative().ResetCounters(); - TabletCounters.Percentile().ResetCounters(); + Counters.Cumulative().ResetCounters(); + Counters.Percentile().ResetCounters(); if (ctx.Now() - LastStateLogTimestamp > LOG_STATE_INTERVAL) { LastStateLogTimestamp = ctx.Now();