1+ #include " read_balancer.h"
2+ #include " read_balancer__metrics.h"
3+
4+ #include < ydb/core/base/appdata.h>
5+ #include < ydb/core/persqueue/common/percentiles.h>
6+ #include < ydb/core/protos/counters_pq.pb.h>
7+ #include < ydb/core/protos/pqconfig.pb.h>
8+ #include < ydb/core/tablet/tablet_counters_protobuf.h>
9+ #include < ydb/library/actors/core/actor.h>
10+ #include < ydb/library/persqueue/topic_parser/topic_parser.h>
11+
12+ namespace NKikimr ::NPQ {
13+
14+
15+ namespace {
16+
17+ template <const NProtoBuf::EnumDescriptor* SimpleDesc ()>
18+ constexpr std::string GetLabels() {
19+ auto desc = NAux::GetLabeledCounterOpts<SimpleDesc>();
20+ auto groupNames = desc->GetGroupNames ();
21+
22+ std::string labels;
23+ for (size_t i = 0 ; i < desc->GetGroupNamesSize (); ++i) {
24+ if (i) {
25+ labels.push_back (' |' );
26+ }
27+ labels.append (groupNames[i]);
28+ }
29+
30+ return labels;
31+ }
32+
33+
34+ template <const NProtoBuf::EnumDescriptor* SimpleDesc ()>
35+ struct TMetricCollector {
36+ using TConfig = TProtobufTabletLabeledCounters<SimpleDesc>;
37+
38+ void Collect (const auto & values) {
39+ Collect (values.begin (), values.end ());
40+ }
41+
42+ void Collect (auto begin, auto end) {
43+ ssize_t in_size = std::distance (begin, end);
44+ AFL_ENSURE (in_size >= 0 )(" in_size" , in_size);
45+
46+ ssize_t count = std::min<ssize_t >(Counters.GetCounters ().Size (), in_size);
47+ for (ssize_t i = 0 ; i < count; ++i) {
48+ Counters.GetCounters ()[i] = *begin;
49+ ++begin;
50+ }
51+
52+ // here, the aggregation function configured in the protofile for each counter is used for each counter.
53+ Aggregator.AggregateWith (Counters);
54+ }
55+
56+ TConfig Counters = TConfig (GetLabels<SimpleDesc>(), 0 , " " );
57+ TTabletLabeledCountersBase Aggregator;
58+ };
59+
60+ struct TConsumerMetricCollector {
61+ TMetricCollector<EClientLabeledCounters_descriptor> ClientLabeledCounters;
62+ // TMetricCollector<EMLPConsumerLabeledCounters_descriptor> MLPConsumerLabeledCounters;
63+ // TMetricCollector MLPMessageLockAttemptsCounter;
64+ };
65+
66+ struct TTopicMetricCollector {
67+ TTopicMetricCollector (absl::flat_hash_map<ui32, TTopicMetrics::TPartitionMetrics> partitionMetrics)
68+ : ExistedPartitionMetrics(std::move(partitionMetrics))
69+ {
70+ }
71+
72+ absl::flat_hash_map<ui32, TTopicMetrics::TPartitionMetrics> ExistedPartitionMetrics;
73+
74+ TTopicMetrics TopicMetrics;
75+
76+ TMetricCollector<EPartitionLabeledCounters_descriptor> PartitionLabeledCounters;
77+ TMetricCollector<EPartitionExtendedLabeledCounters_descriptor> PartitionExtendedLabeledCounters;
78+ TMetricCollector<EPartitionKeyCompactionLabeledCounters_descriptor> PartitionKeyCompactionLabeledCounters;
79+
80+ std::unordered_map<TString, TConsumerMetricCollector> Consumers;
81+
82+ void Collect (const NKikimrPQ::TStatusResponse::TPartResult& partitionStatus) {
83+ TopicMetrics.TotalDataSize += partitionStatus.GetPartitionSize ();
84+ TopicMetrics.TotalUsedReserveSize += partitionStatus.GetUsedReserveSize ();
85+
86+ TopicMetrics.TotalAvgWriteSpeedPerSec += partitionStatus.GetAvgWriteSpeedPerSec ();
87+ TopicMetrics.MaxAvgWriteSpeedPerSec = Max<ui64>(TopicMetrics.MaxAvgWriteSpeedPerSec , partitionStatus.GetAvgWriteSpeedPerSec ());
88+ TopicMetrics.TotalAvgWriteSpeedPerMin += partitionStatus.GetAvgWriteSpeedPerMin ();
89+ TopicMetrics.MaxAvgWriteSpeedPerMin = Max<ui64>(TopicMetrics.MaxAvgWriteSpeedPerMin , partitionStatus.GetAvgWriteSpeedPerMin ());
90+ TopicMetrics.TotalAvgWriteSpeedPerHour += partitionStatus.GetAvgWriteSpeedPerHour ();
91+ TopicMetrics.MaxAvgWriteSpeedPerHour = Max<ui64>(TopicMetrics.MaxAvgWriteSpeedPerHour , partitionStatus.GetAvgWriteSpeedPerHour ());
92+ TopicMetrics.TotalAvgWriteSpeedPerDay += partitionStatus.GetAvgWriteSpeedPerDay ();
93+ TopicMetrics.MaxAvgWriteSpeedPerDay = Max<ui64>(TopicMetrics.MaxAvgWriteSpeedPerDay , partitionStatus.GetAvgWriteSpeedPerDay ());
94+
95+ auto & partitionMetrics = TopicMetrics.PartitionMetrics [partitionStatus.GetPartition ()];
96+ partitionMetrics.DataSize = partitionStatus.GetPartitionSize ();
97+ partitionMetrics.UsedReserveSize = partitionStatus.GetUsedReserveSize ();
98+
99+ Collect (partitionStatus.GetAggregatedCounters ());
100+
101+ ExistedPartitionMetrics.erase (partitionStatus.GetPartition ());
102+ }
103+
104+ void Collect (const NKikimrPQ::TAggregatedCounters& counters) {
105+ PartitionLabeledCounters.Collect (counters.GetValues ());
106+ PartitionExtendedLabeledCounters.Collect (counters.GetExtendedCounters ().GetValues ());
107+ PartitionKeyCompactionLabeledCounters.Collect (counters.GetCompactionCounters ().GetValues ());
108+
109+ for (const auto & consumer : counters.GetConsumerAggregatedCounters ()) {
110+ Consumers[consumer.GetConsumer ()].ClientLabeledCounters .Collect (consumer.GetValues ());
111+ }
112+
113+ // TODO MLP
114+ // for (const auto& consumer : counters.GetMLPConsumerCounters()) {
115+ // auto& collector = Consumers[consumer.GetConsumer()];
116+ // collector.MLPConsumerLabeledCounters.Collect(consumer.GetCountersValues());
117+ // collector.MLPMessageLockAttemptsCounter.Collect(consumer.GetMessageLocksValues());
118+ // }
119+ }
120+
121+ void Finish () {
122+ for (auto & [partitionId, partitionMetrics] : ExistedPartitionMetrics) {
123+ TopicMetrics.TotalDataSize += partitionMetrics.DataSize ;
124+ TopicMetrics.TotalUsedReserveSize += partitionMetrics.UsedReserveSize ;
125+
126+ TopicMetrics.PartitionMetrics [partitionId] = partitionMetrics;
127+ }
128+ }
129+ };
130+
131+ template <const NProtoBuf::EnumDescriptor* SimpleDesc ()>
132+ TCounters InitializeCounters(
133+ NMonitoring::TDynamicCounterPtr root,
134+ const TString& databasePath,
135+ const std::vector<std::pair<TString, TString>>& subgroups = {},
136+ bool skipPrefix = true
137+ ) {
138+ auto group = root;
139+
140+ for (const auto & subgroup : subgroups) {
141+ group = group->GetSubgroup (subgroup.first , subgroup.second );
142+ }
143+
144+ using TConfig = TProtobufTabletLabeledCounters<SimpleDesc>;
145+ auto config = std::make_unique<TConfig>(GetLabels<SimpleDesc>(), 0 , databasePath);
146+
147+ std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> result;
148+ for (size_t i = 0 ; i < config->GetCounters ().Size (); ++i) {
149+ TString name = config->GetNames ()[i];
150+ if (skipPrefix) {
151+ TStringBuf nameBuf = name;
152+ nameBuf.SkipPrefix (" PQ/" );
153+ name = nameBuf;
154+ }
155+ result.push_back (name.empty () ? nullptr : group->GetExpiringNamedCounter (" name" , name, false ));
156+ }
157+
158+ return {
159+ .Config = std::move (config),
160+ .Counters = std::move (result)
161+ };
162+ }
163+
164+ void SetCounters (TCounters& counters, const auto & metrics) {
165+ ui64 now = TAppData::TimeProvider->Now ().MilliSeconds ();
166+ auto & aggregatedCounters = metrics.Aggregator .GetCounters ();
167+
168+ for (size_t i = 0 ; i < counters.Counters .size (); ++i) {
169+ if (!counters.Counters [i]) {
170+ continue ;
171+ }
172+ if (aggregatedCounters.Size () == i) {
173+ break ;
174+ }
175+
176+ auto value = aggregatedCounters[i].Get ();
177+ const auto & type = counters.Config ->GetCounterType (i);
178+ if (type == TLabeledCounterOptions::CT_TIMELAG) {
179+ value = value < now ? now - value : 0 ;
180+ }
181+
182+ counters.Counters [i]->Set (value);
183+ }
184+ }
185+
186+ }
187+
188+ TTopicMetricsHandler::TTopicMetricsHandler () = default ;
189+ TTopicMetricsHandler::~TTopicMetricsHandler () = default ;
190+
191+ const TTopicMetrics& TTopicMetricsHandler::GetTopicMetrics () const {
192+ return TopicMetrics;
193+ }
194+
195+ void TTopicMetricsHandler::Initialize (const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) {
196+ if (DynamicCounters) {
197+ return ;
198+ }
199+
200+ TStringBuf name = TStringBuf (topicPath);
201+ name.SkipPrefix (database.DatabasePath );
202+ name.SkipPrefix (" /" );
203+
204+ bool isServerless = AppData (ctx)->FeatureFlags .GetEnableDbCounters (); // TODO: find out it via describe
205+ DynamicCounters = AppData (ctx)->Counters ->GetSubgroup (" counters" , isServerless ? " topics_serverless" : " topics" )
206+ ->GetSubgroup (" host" , " " )
207+ ->GetSubgroup (" database" , database.DatabasePath )
208+ ->GetSubgroup (" cloud_id" , database.CloudId )
209+ ->GetSubgroup (" folder_id" , database.FolderId )
210+ ->GetSubgroup (" database_id" , database.DatabaseId )
211+ ->GetSubgroup (" topic" , TString (name));
212+
213+ ActivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter (" name" , " topic.partition.active_count" , false );
214+ InactivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter (" name" , " topic.partition.inactive_count" , false );
215+
216+ PartitionLabeledCounters = InitializeCounters<EPartitionLabeledCounters_descriptor>(DynamicCounters, database.DatabasePath );
217+ PartitionExtendedLabeledCounters = InitializeCounters<EPartitionExtendedLabeledCounters_descriptor>(DynamicCounters, database.DatabasePath , {}, true );
218+ InitializeKeyCompactionCounters (database.DatabasePath , tabletConfig);
219+ InitializeConsumerCounters (database.DatabasePath , tabletConfig, ctx);
220+ }
221+
222+ void TTopicMetricsHandler::InitializeConsumerCounters (const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig, const NActors::TActorContext& ctx) {
223+ for (const auto & consumer : tabletConfig.GetConsumers ()) {
224+ auto metricsConsumerName = NPersQueue::ConvertOldConsumerName (consumer.GetName (), ctx);
225+
226+ auto & counters = ConsumerCounters[consumer.GetName ()];
227+ counters.ClientLabeledCounters = InitializeCounters<EClientLabeledCounters_descriptor>(DynamicCounters, databasePath, {{" consumer" , metricsConsumerName}});
228+
229+ if (consumer.GetType () == NKikimrPQ::TPQTabletConfig::CONSUMER_TYPE_MLP) {
230+ // metrics.MLPClientLabeledCounters = InitializeCounters<EMLPConsumerLabeledCounters_descriptor>(DynamicCounters, databasePath, "topic|consumer", {{"consumer", metricsConsumerName}});
231+ // metrics.MLPMessageLockAttemptsCounter = InitializeCounters<EMLPMessageLockAttemptsLabeledCounters_descriptor>(DynamicCounters, databasePath, {{"consumer", metricsConsumerName}});
232+ }
233+ }
234+
235+ std::unordered_set<std::string_view> existedConsumers;
236+ for (const auto & consumer : tabletConfig.GetConsumers ()) {
237+ existedConsumers.insert (consumer.GetName ());
238+ }
239+ for (auto it = ConsumerCounters.begin (); it != ConsumerCounters.end ();) {
240+ if (!existedConsumers.contains (it->first )) {
241+ it = ConsumerCounters.erase (it);
242+ } else {
243+ ++it;
244+ }
245+ }
246+ }
247+
248+ void TTopicMetricsHandler::InitializeKeyCompactionCounters (const TString& databasePath, const NKikimrPQ::TPQTabletConfig& tabletConfig) {
249+ if (tabletConfig.GetEnableCompactification ()) {
250+ PartitionKeyCompactionLabeledCounters = InitializeCounters<EPartitionKeyCompactionLabeledCounters_descriptor>(DynamicCounters, databasePath, {}, true );
251+ } else {
252+ PartitionKeyCompactionLabeledCounters.Counters .clear ();
253+ }
254+ }
255+
256+ void TTopicMetricsHandler::UpdateConfig (const NKikimrPQ::TPQTabletConfig& tabletConfig, const TDatabaseInfo& database, const TString& topicPath, const NActors::TActorContext& ctx) {
257+ Y_UNUSED (topicPath);
258+
259+ if (!DynamicCounters) {
260+ return ;
261+ }
262+
263+ InitializeKeyCompactionCounters (database.DatabasePath , tabletConfig);
264+ InitializeConsumerCounters (database.DatabasePath , tabletConfig, ctx);
265+
266+ size_t inactiveCount = std::count_if (tabletConfig.GetAllPartitions ().begin (), tabletConfig.GetAllPartitions ().end (), [](auto & p) {
267+ return p.GetStatus () == NKikimrPQ::ETopicPartitionStatus::Inactive;
268+ });
269+
270+ ActivePartitionCountCounter->Set (tabletConfig.GetAllPartitions ().size () - inactiveCount);
271+ InactivePartitionCountCounter->Set (inactiveCount);
272+ }
273+
274+ void TTopicMetricsHandler::InitializePartitions (ui32 partitionId, ui64 dataSize, ui64 usedReserveSize) {
275+ TopicMetrics.PartitionMetrics [partitionId] = {
276+ .DataSize = dataSize,
277+ .UsedReserveSize = usedReserveSize
278+ };
279+
280+ TopicMetrics.TotalDataSize += dataSize;
281+ TopicMetrics.TotalUsedReserveSize += usedReserveSize;
282+ }
283+
284+ void TTopicMetricsHandler::Handle (NKikimrPQ::TStatusResponse_TPartResult&& partitionStatus) {
285+ PartitionStatuses[partitionStatus.GetPartition ()] = std::move (partitionStatus);
286+ }
287+
288+ void TTopicMetricsHandler::UpdateMetrics () {
289+ if (!DynamicCounters || PartitionStatuses.empty ()) {
290+ return ;
291+ }
292+
293+ TTopicMetricCollector collector (TopicMetrics.PartitionMetrics );
294+ for (auto & [_, partitionStatus] : PartitionStatuses) {
295+ collector.Collect (partitionStatus);
296+ }
297+ collector.Finish ();
298+ PartitionStatuses.clear ();
299+
300+ TopicMetrics = collector.TopicMetrics ;
301+
302+ SetCounters (PartitionLabeledCounters, collector.PartitionLabeledCounters );
303+ SetCounters (PartitionExtendedLabeledCounters, collector.PartitionExtendedLabeledCounters );
304+ SetCounters (PartitionKeyCompactionLabeledCounters, collector.PartitionKeyCompactionLabeledCounters );
305+
306+ for (auto & [consumer, consumerCounters] : ConsumerCounters) {
307+ auto it = collector.Consumers .find (consumer);
308+ if (it == collector.Consumers .end ()) {
309+ continue ;
310+ }
311+
312+ auto & consumerMetrics = it->second ;
313+
314+ SetCounters (consumerCounters.ClientLabeledCounters , consumerMetrics.ClientLabeledCounters );
315+ // if (!consumerCounters.MLPClientLabeledCounters.Counters.empty()) {
316+ // SetCounters(consumerCounters.MLPClientLabeledCounters, consumerMetrics.MLPConsumerLabeledCounters);
317+ // TODO MLPMessageLockAttemptsCounter
318+ // }
319+ }
320+ }
321+
322+ }
0 commit comments