Skip to content

Commit c3da29f

Browse files
feat: shard-specific metrics tracking (#3520)
1 parent 5640232 commit c3da29f

File tree

8 files changed

+99
-9
lines changed

8 files changed

+99
-9
lines changed

waku/discovery/waku_discv5.nim

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ import ../node/peer_manager/peer_manager, ../waku_core, ../waku_enr
1414

1515
export protocol, waku_enr
1616

17-
declarePublicGauge waku_discv5_discovered, "number of nodes discovered"
17+
declarePublicGauge waku_discv5_discovered_per_shard,
18+
"number of nodes discovered by each shard", labels = ["shard"]
1819
declarePublicGauge waku_discv5_errors, "number of waku discv5 errors", ["type"]
1920

2021
logScope:
@@ -231,7 +232,20 @@ proc findRandomPeers*(
231232
elif wd.predicate.isSome():
232233
discoveredRecords = discoveredRecords.filter(wd.predicate.get())
233234

234-
waku_discv5_discovered.inc(discoveredRecords.len)
235+
# Increment metric for each discovered record's shards
236+
for record in discoveredRecords:
237+
let typedRecord = record.toTyped().valueOr:
238+
# If we can't parse the record, skip it
239+
waku_discv5_errors.inc(labelValues = ["ParseFailure"])
240+
continue
241+
242+
let relayShards = typedRecord.relaySharding().valueOr:
243+
# If no relay sharding info, skip it
244+
waku_discv5_errors.inc(labelValues = ["NoShardInfo"])
245+
continue
246+
247+
for shardId in relayShards.shardIds:
248+
waku_discv5_discovered_per_shard.inc(labelValues = [$shardId])
235249

236250
return discoveredRecords
237251

waku/node/peer_manager/peer_manager.nim

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
3434
declarePublicGauge waku_connected_peers,
3535
"Number of physical connections per direction and protocol",
3636
labels = ["direction", "protocol"]
37+
declarePublicGauge waku_connected_peers_per_shard,
38+
"Number of physical connections per shard", labels = ["shard"]
3739
declarePublicGauge waku_streams_peers,
3840
"Number of streams per direction and protocol", labels = ["direction", "protocol"]
3941
declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
@@ -778,6 +780,16 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
778780
protoStreamsOut.float64, labelValues = [$Direction.Out, proto]
779781
)
780782

783+
for shard in pm.wakuMetadata.shards.items:
784+
waku_connected_peers_per_shard.set(0.0, labelValues = [$shard])
785+
786+
for shard in pm.wakuMetadata.shards.items:
787+
let connectedPeers =
788+
peerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), uint16(shard))
789+
waku_connected_peers_per_shard.set(
790+
connectedPeers.len.float64, labelValues = [$shard]
791+
)
792+
781793
proc getOnlineStateObserver*(pm: PeerManager): OnOnlineStateChange =
782794
return proc(online: bool) {.gcsafe, raises: [].} =
783795
pm.online = online

waku/waku_archive/archive.nim

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ proc handleMessage*(
119119
let insertDuration = getTime().toUnixFloat() - insertStartTime
120120
waku_archive_insert_duration_seconds.observe(insertDuration)
121121

122+
let shard = RelayShard.parseStaticSharding(pubsubTopic).valueOr:
123+
DefaultRelayShard
124+
125+
waku_archive_messages_per_shard.inc(labelValues = [$shard.shardId])
126+
122127
trace "message archived",
123128
msg_hash = msgHashHex,
124129
pubsubTopic = pubsubTopic,

waku/waku_archive/archive_metrics.nim

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import metrics
44

55
declarePublicGauge waku_archive_messages, "number of historical messages", ["type"]
6+
declarePublicGauge waku_archive_messages_per_shard,
7+
"number of historical messages per shard ", ["shard"]
68
declarePublicGauge waku_archive_errors, "number of store protocol errors", ["type"]
79
declarePublicGauge waku_archive_queries, "number of store queries received"
810
declarePublicHistogram waku_archive_insert_duration_seconds,

waku/waku_relay/protocol.nim

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,31 @@ import
2323
from ../waku_core/codecs import WakuRelayCodec
2424
export WakuRelayCodec
2525

26+
type ShardMetrics = object
27+
count: float64
28+
sizeSum: float64
29+
avgSize: float64
30+
maxSize: float64
31+
2632
logScope:
2733
topics = "waku relay"
2834

35+
declareCounter waku_relay_network_bytes,
36+
"total traffic per topic, distinct gross/net and direction",
37+
labels = ["topic", "type", "direction"]
38+
39+
declarePublicGauge(
40+
waku_relay_max_msg_bytes_per_shard,
41+
"Maximum length of messages seen per shard",
42+
labels = ["shard"],
43+
)
44+
45+
declarePublicGauge(
46+
waku_relay_avg_msg_bytes_per_shard,
47+
"Average length of messages seen per shard",
48+
labels = ["shard"],
49+
)
50+
2951
# see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters
3052
const TopicParameters = TopicParams(
3153
topicWeight: 1,
@@ -58,10 +80,6 @@ const TopicParameters = TopicParams(
5880
invalidMessageDeliveriesDecay: 0.5,
5981
)
6082

61-
declareCounter waku_relay_network_bytes,
62-
"total traffic per topic, distinct gross/net and direction",
63-
labels = ["topic", "type", "direction"]
64-
6583
# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
6684
const GossipsubParameters = GossipSubParams.init(
6785
pruneBackoff = chronos.minutes(1),
@@ -137,6 +155,7 @@ type
137155
topicsHealth*: Table[string, TopicHealth]
138156
onTopicHealthChange*: TopicHealthChangeHandler
139157
topicHealthLoopHandle*: Future[void]
158+
msgMetricsPerShard*: Table[string, ShardMetrics]
140159

141160
# predefinition for more detailed results from publishing new message
142161
type PublishOutcome* {.pure.} = enum
@@ -176,6 +195,7 @@ proc logMessageInfo*(
176195
onRecv: bool,
177196
) =
178197
let msg_hash = computeMessageHash(topic, msg).to0xHex()
198+
let payloadSize = float64(msg.payload.len)
179199

180200
if onRecv:
181201
notice "received relay message",
@@ -185,7 +205,7 @@ proc logMessageInfo*(
185205
from_peer_id = remotePeerId,
186206
topic = topic,
187207
receivedTime = getNowInNanosecondTime(),
188-
payloadSizeBytes = msg.payload.len
208+
payloadSizeBytes = payloadSize
189209
else:
190210
notice "sent relay message",
191211
my_peer_id = w.switch.peerInfo.peerId,
@@ -194,7 +214,19 @@ proc logMessageInfo*(
194214
to_peer_id = remotePeerId,
195215
topic = topic,
196216
sentTime = getNowInNanosecondTime(),
197-
payloadSizeBytes = msg.payload.len
217+
payloadSizeBytes = payloadSize
218+
219+
var shardMetrics = w.msgMetricsPerShard.getOrDefault(topic, ShardMetrics())
220+
shardMetrics.count += 1
221+
shardMetrics.sizeSum += payloadSize
222+
if payloadSize > shardMetrics.maxSize:
223+
shardMetrics.maxSize = payloadSize
224+
shardMetrics.avgSize = shardMetrics.sizeSum / shardMetrics.count
225+
w.msgMetricsPerShard[topic] = shardMetrics
226+
227+
waku_relay_max_msg_bytes_per_shard.set(shardMetrics.maxSize, labelValues = [topic])
228+
229+
waku_relay_avg_msg_bytes_per_shard.set(shardMetrics.avgSize, labelValues = [topic])
198230

199231
proc initRelayObservers(w: WakuRelay) =
200232
proc decodeRpcMessageInfo(

waku/waku_rln_relay/protocol_metrics.nim

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ func generateBucketsForHistogram*(length: int): seq[float64] =
1717
return buckets
1818

1919
declarePublicCounter(
20-
waku_rln_messages_total, "number of messages published on the rln content topic"
20+
waku_rln_messages_total, "number of messages seen by the rln relay"
2121
)
22+
2223
declarePublicCounter(waku_rln_spam_messages_total, "number of spam messages detected")
2324
declarePublicCounter(
2425
waku_rln_invalid_messages_total, "number of invalid messages detected", ["type"]

waku/waku_store/client.nim

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const DefaultPageSize*: uint = 20
1313
type WakuStoreClient* = ref object
1414
peerManager: PeerManager
1515
rng: ref rand.HmacDrbgContext
16+
storeMsgMetricsPerShard*: Table[string, float64]
1617

1718
proc new*(
1819
T: type WakuStoreClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
@@ -46,6 +47,17 @@ proc sendStoreRequest(
4647
waku_store_errors.inc(labelValues = [NoSuccessStatusCode])
4748
return err(StoreError.new(res.statusCode, res.statusDesc))
4849

50+
if req.pubsubTopic.isSome():
51+
let topic = req.pubsubTopic.get()
52+
if not self.storeMsgMetricsPerShard.hasKey(topic):
53+
self.storeMsgMetricsPerShard[topic] = 0
54+
self.storeMsgMetricsPerShard[topic] += float64(req.encode().buffer.len)
55+
56+
waku_relay_fleet_store_msg_size_bytes.inc(
57+
self.storeMsgMetricsPerShard[topic], labelValues = [topic]
58+
)
59+
waku_relay_fleet_store_msg_count.inc(1.0, labelValues = [topic])
60+
4961
return ok(res)
5062

5163
proc query*(

waku/waku_store/protocol_metrics.nim

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,18 @@ declarePublicGauge waku_store_queries, "number of store queries received"
1010
declarePublicGauge waku_store_time_seconds,
1111
"Time in seconds spent by each store phase", labels = ["phase"]
1212

13+
declarePublicGauge(
14+
waku_relay_fleet_store_msg_size_bytes,
15+
"Total size of messages stored by fleet store nodes per shard",
16+
labels = ["shard"],
17+
)
18+
19+
declarePublicGauge(
20+
waku_relay_fleet_store_msg_count,
21+
"Number of messages stored by fleet store nodes per shard",
22+
labels = ["shard"],
23+
)
24+
1325
# Error types (metric label values)
1426
const
1527
DialFailure* = "dial_failure"

0 commit comments

Comments
 (0)