Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/persqueue/pqtablet/partition/mirrorer/mirrorer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ TMirrorer::TMirrorer(
, Config(config)
{
Counters.Populate(counters);
Counters.ResetCounters();
}

void TMirrorer::Bootstrap(const TActorContext& ctx) {
Expand Down Expand Up @@ -239,6 +240,8 @@ void TMirrorer::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext&
void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorContext& ctx) {
ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters);
ctx.Send(PartitionActor, new TEvPQ::TEvMirrorerCounters(Counters));
Counters.Cumulative().ResetCounters();
Counters.Percentile().ResetCounters();

if (ctx.Now() - LastStateLogTimestamp > LOG_STATE_INTERVAL) {
LastStateLogTimestamp = ctx.Now();
Expand Down
14 changes: 6 additions & 8 deletions ydb/core/persqueue/pqtablet/partition/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,11 @@ bool TPartition::LastOffsetHasBeenCommited(const TUserInfoBase& userInfo) const
}

struct TMirrorerInfo {
TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline)
TMirrorerInfo(const TActorId& actor)
: Actor(actor) {
Baseline.Populate(baseline);
}

TActorId Actor;
TTabletCountersBase Baseline;
};

const TString& TPartition::TopicName() const {
Expand Down Expand Up @@ -370,6 +368,7 @@ TPartition::TPartition(ui64 tabletId, const TPartitionId& partition, const TActo
, SamplingControl(samplingControl)
{
TabletCounters.Populate(*Counters);
TabletCounters.ResetCounters();
}

void TPartition::EmplaceResponse(TMessage&& message, const TActorContext& ctx) {
Expand Down Expand Up @@ -467,6 +466,8 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {

ctx.Schedule(WAKE_TIMEOUT, new TEvents::TEvWakeup());
ctx.Send(TabletActorId, new TEvPQ::TEvPartitionCounters(Partition, TabletCounters));
TabletCounters.Cumulative().ResetCounters();
TabletCounters.Percentile().ResetCounters();

ui64 usedStorage = GetUsedStorage(now);
if (usedStorage > 0) {
Expand Down Expand Up @@ -619,9 +620,7 @@ bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorCont

void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& /*ctx*/) {
if (Mirrorer) {
auto diff = ev->Get()->Counters.MakeDiffForAggr(Mirrorer->Baseline);
TabletCounters.Populate(*diff.Get());
ev->Get()->Counters.RememberCurrentStateAsBaseline(Mirrorer->Baseline);
TabletCounters.Populate(ev->Get()->Counters);
}
}

Expand Down Expand Up @@ -4232,8 +4231,7 @@ size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) {

void TPartition::CreateMirrorerActor() {
Mirrorer = MakeHolder<TMirrorerInfo>(
RegisterWithSameMailbox(CreateMirrorer(TabletId, TabletActorId, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, GetEndOffset(), Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters)),
TabletCounters
RegisterWithSameMailbox(CreateMirrorer(TabletId, TabletActorId, SelfId(), TopicConverter, Partition.InternalPartitionId, IsLocalDC, GetEndOffset(), Config.GetPartitionConfig().GetMirrorFrom(), TabletCounters))
);
}

Expand Down
27 changes: 12 additions & 15 deletions ydb/core/persqueue/pqtablet/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,7 @@ void TPersQueue::CreateOriginalPartition(const NKikimrPQ::TPQTabletConfig& confi
Partitions.emplace(std::piecewise_construct,
std::forward_as_tuple(partitionId),
std::forward_as_tuple(actorId,
GetPartitionKeyRange(config, partition),
*Counters));
GetPartitionKeyRange(config, partition)));
++OriginalPartitionsCount;
}

Expand Down Expand Up @@ -674,8 +673,7 @@ void TPersQueue::AddSupportivePartition(const TPartitionId& partitionId)
{
Partitions.emplace(partitionId,
TPartitionInfo(TActorId(),
{},
*Counters));
{}));
NewSupportivePartitions.insert(partitionId);
}

Expand Down Expand Up @@ -1077,11 +1075,12 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorConte
PQ_LOG_T("Handle TEvPQ::TEvPartitionCounters" <<
" PartitionId " << ev->Get()->Partition);

const auto& partitionId = ev->Get()->Partition;
auto& partitionId = ev->Get()->Partition;
auto& partition = GetPartitionInfo(partitionId);
auto diff = ev->Get()->Counters.MakeDiffForAggr(partition.Baseline);
ui64 cpuUsage = diff->Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE].Get();
ui64 networkBytesUsage = diff->Cumulative()[COUNTER_PQ_TABLET_NETWORK_BYTES_USAGE].Get();

auto& counters = ev->Get()->Counters;
ui64 cpuUsage = counters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE].Get();
ui64 networkBytesUsage = counters.Cumulative()[COUNTER_PQ_TABLET_NETWORK_BYTES_USAGE].Get();
if (ResourceMetrics) {
if (cpuUsage > 0) {
ResourceMetrics->CPU.Increment(cpuUsage);
Expand All @@ -1093,17 +1092,15 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionCounters::TPtr& ev, const TActorConte
ResourceMetrics->TryUpdate(ctx);
}
}
Counters->Percentile().Populate(counters.Percentile());
Counters->Cumulative().Populate(counters.Cumulative());

Counters->Populate(*diff.Get());
ev->Get()->Counters.RememberCurrentStateAsBaseline(partition.Baseline);
partition.ReservedBytes = counters.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Get();

// restore cache's simple counters cleaned by partition's counters
SetCacheCounters(CacheCounters);
ui64 reservedSize = 0;
for (auto& p : Partitions) {
if (p.second.Baseline.Simple().Size() > 0) //there could be no counters from this partition yet
reservedSize += p.second.Baseline.Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Get();
}
ui64 reservedSize = std::accumulate(Partitions.begin(), Partitions.end(), 0ul,
[](ui64 sum, const auto& p) { return sum + p.second.ReservedBytes; });
Counters->Simple()[COUNTER_PQ_TABLET_RESERVED_BYTES_SIZE].Set(reservedSize);

// Features of the implementation of SimpleCounters. It is necessary to restore the value of
Expand Down
7 changes: 2 additions & 5 deletions ydb/core/persqueue/pqtablet/pq_impl_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ namespace NKikimr::NPQ {

struct TPartitionInfo {
TPartitionInfo(const TActorId& actor,
TMaybe<TPartitionKeyRange>&& keyRange,
const TTabletCountersBase& baseline)
TMaybe<TPartitionKeyRange>&& keyRange)
: Actor(actor)
, KeyRange(std::move(keyRange))
, InitDone(false)
{
Baseline.Populate(baseline);
}

TPartitionInfo(const TPartitionInfo& info)
Expand All @@ -21,14 +19,13 @@ struct TPartitionInfo {
, InitDone(info.InitDone)
, PendingRequests(info.PendingRequests)
{
Baseline.Populate(info.Baseline);
}

TActorId Actor;
TMaybe<TPartitionKeyRange> KeyRange;
bool InitDone;
TTabletCountersBase Baseline;
THashMap<TString, TTabletLabeledCountersBase> LabeledCounters;
size_t ReservedBytes = 0;

struct TPendingRequest {
TPendingRequest(ui64 cookie,
Expand Down
55 changes: 45 additions & 10 deletions ydb/core/tablet/tablet_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,16 @@ class TTabletCumulativeCounter : public TTabletSimpleCounterBase{
void Initialize(const TTabletCumulativeCounter& rp) {
SetTo(rp);
}

void Set(ui64 value) {
Value = value;
}

void AdjustToBaseLine(const TTabletCumulativeCounter& baseLine) {
Y_DEBUG_ABORT_UNLESS(Value >= baseLine.Value);
Value -= baseLine.Value;
}

void SetTo(const TTabletCumulativeCounter& rp) {
Value = rp.Value;
}
Expand Down Expand Up @@ -375,6 +381,18 @@ class TCountersArray : TNonCopyable {
return CountersQnt;
}

void Populate(const TCountersArray<T>& rp) {
if (CountersQnt != rp.CountersQnt) {
Reset(rp);
} else {
for (ui32 i = 0, e = CountersQnt; i < e; ++i) {
Counters[i].Populate(rp.Counters[i]);
}
}
}

void ResetCounters();

private:
//
void Reset(const TCountersArray<T>& rp) {
Expand Down Expand Up @@ -408,22 +426,33 @@ class TCountersArray : TNonCopyable {
}
}

void Populate(const TCountersArray<T>& rp) {
if (CountersQnt != rp.CountersQnt) {
Reset(rp);
} else {
for (ui32 i = 0, e = CountersQnt; i < e; ++i) {
Counters[i].Populate(rp.Counters[i]);
}
}
}

//
ui32 CountersQnt;
TCountersHolder CountersHolder;
T* Counters;
};

template <>
inline void TCountersArray<TTabletSimpleCounter>::ResetCounters() {
for (ui32 i = 0; i < CountersQnt; ++i) {
Counters[i].Set(0);
}
}

template <>
inline void TCountersArray<TTabletCumulativeCounter>::ResetCounters() {
for (ui32 i = 0; i < CountersQnt; ++i) {
Counters[i].Set(0);
}
}

template <>
inline void TCountersArray<TTabletPercentileCounter>::ResetCounters() {
for (ui32 i = 0; i < CountersQnt; ++i) {
Counters[i].Clear();
}
}

////////////////////////////////////////////
/// The TTabletCountersBase class
////////////////////////////////////////////
Expand Down Expand Up @@ -539,6 +568,12 @@ class TTabletCountersBase {
}
}

void ResetCounters() {
SimpleCounters.ResetCounters();
CumulativeCounters.ResetCounters();
PercentileCounters.ResetCounters();
}

private:
//
TTabletCountersBase(const TTabletCountersBase&);
Expand Down
Loading