Skip to content

Commit

Permalink
chore(networking): get relay number of connections from protocol conn…
Browse files Browse the repository at this point in the history
…s/streams (#1609)
  • Loading branch information
alrevuelta committed Apr 12, 2023
1 parent b2dcb07 commit 73cbafa
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 14 deletions.
77 changes: 77 additions & 0 deletions tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,83 @@ procSuite "Peer Manager":
# but the relay peer is not
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false

asyncTest "getNumConnections() returns expected number of connections per protocol":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

# create some connections/streams
require:
# some relay connections
(await nodes[0].peerManager.connectRelay(pInfos[1])) == true
(await nodes[0].peerManager.connectRelay(pInfos[2])) == true
(await nodes[1].peerManager.connectRelay(pInfos[2])) == true

(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterCodec)).isSome() == true

# isolated dial creates a relay conn under the hood (libp2p behaviour)
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterCodec)).isSome() == true


# assert physical connections
check:
nodes[0].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 0
nodes[0].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 2
nodes[0].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 0
nodes[0].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 2

nodes[1].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1
nodes[1].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1
nodes[1].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[1].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0

nodes[2].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 2
nodes[2].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 1
nodes[2].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[2].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 1

nodes[3].peerManager.getNumConnections(Direction.In, WakuRelayCodec) == 1
nodes[3].peerManager.getNumConnections(Direction.Out, WakuRelayCodec) == 0
nodes[3].peerManager.getNumConnections(Direction.In, WakuFilterCodec) == 1
nodes[3].peerManager.getNumConnections(Direction.Out, WakuFilterCodec) == 0

asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Start them with relay + filter
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

require:
# multiple streams are multiplexed over a single connection.
# note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterCodec)).isSome() == true

check:
nodes[0].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1
nodes[0].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1
nodes[0].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 0
nodes[0].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 4

nodes[1].peerManager.getNumStreams(Direction.In, WakuRelayCodec) == 1
nodes[1].peerManager.getNumStreams(Direction.Out, WakuRelayCodec) == 1
nodes[1].peerManager.getNumStreams(Direction.In, WakuFilterCodec) == 4
nodes[1].peerManager.getNumStreams(Direction.Out, WakuFilterCodec) == 0

test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
Expand Down
12 changes: 12 additions & 0 deletions waku/common/utils/sequence.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import std/sequtils

proc flatten*[T](a: seq[seq[T]]): seq[T] =
var aFlat = newSeq[T](0)
for subseq in a:
aFlat &= subseq
return aFlat
59 changes: 47 additions & 12 deletions waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import
chronos,
chronicles,
metrics,
libp2p/multistream
libp2p/multistream,
libp2p/muxers/muxer
import
../../protocol/waku_relay,
../../utils/peers,
../../utils/heartbeat,
./peer_store/peer_storage,
./waku_peer_store

Expand All @@ -22,7 +24,8 @@ declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
# TODO: Populate from PeerStore.Source when ready
declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
declarePublicGauge waku_connected_peers, "Number of connected peers per direction", ["direction"]
declarePublicGauge waku_connected_peers, "Number of physical connections per direction and protocol", labels = ["direction", "protocol"]
declarePublicGauge waku_streams_peers, "Number of streams per direction and protocol", labels = ["direction", "protocol"]
declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
declarePublicGauge waku_service_peers, "Service peer protocol and multiaddress ", labels = ["protocol", "peerId"]

Expand Down Expand Up @@ -51,6 +54,9 @@ const
# How often the peer store is pruned
PrunePeerStoreInterval = chronos.minutes(5)

# How often the peer store is updated with metrics
UpdateMetricsInterval = chronos.seconds(15)

type
PeerManager* = ref object of RootObj
switch*: Switch
Expand Down Expand Up @@ -252,7 +258,6 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
let direction = if event.initiator: Outbound else: Inbound
pm.peerStore[ConnectionBook][peerId] = Connected
pm.peerStore[DirectionBook][peerId] = direction
waku_connected_peers.inc(1, labelValues=[$direction])

if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
Expand All @@ -261,7 +266,6 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
elif event.kind == PeerEventKind.Left:
pm.peerStore[DirectionBook][peerId] = UnknownDirection
pm.peerStore[ConnectionBook][peerId] = CanConnect
waku_connected_peers.dec(1, labelValues=[$pm.peerStore[DirectionBook][peerId]])

if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
Expand Down Expand Up @@ -430,27 +434,48 @@ proc connectToNodes*(pm: PeerManager,
# later.
await sleepAsync(chronos.seconds(5))

# Returns the amount of physical connections for a given direction
# containing at least one stream with the given protocol.
proc getNumConnections*(pm: PeerManager, dir: Direction, protocol: string): int =
var numConns = 0
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
let streams = peerConn.getStreams()
if peerConn.connection.transportDir == dir:
if streams.anyIt(it.protocol == protocol):
numConns += 1
return numConns

proc getNumStreams*(pm: PeerManager, dir: Direction, protocol: string): int =
var numConns = 0
for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
for stream in peerConn.getStreams():
if stream.protocol == protocol and stream.dir == dir:
numConns += 1
return numConns

proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let maxConnections = pm.switch.connManager.inSema.size
let numInPeers = pm.switch.connectedPeers(lpstream.Direction.In).len
let numOutPeers = pm.switch.connectedPeers(lpstream.Direction.Out).len
let numConPeers = numInPeers + numOutPeers

# TODO: Enforce a given in/out peers ratio
let inRelayPeers = pm.getNumConnections(Direction.In, WakuRelayCodec)
let outRelayPeers = pm.getNumConnections(Direction.Out, WakuRelayCodec)
let totalRelayPeers = inRelayPeers + outRelayPeers

# Leave some room for service peers
if numConPeers >= (maxConnections - 5):
if totalRelayPeers >= (maxConnections - 5):
return

# TODO: Track only relay connections (nwaku/issues/1566)
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
pm.initialBackoffInSec,
pm.backoffFactor))
let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)
let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials)

info "Relay peer connections",
connectedPeers = numConPeers,
inRelayConns = inRelayPeers,
outRelayConns = outRelayPeers,
totalRelayConns = totalRelayPeers,
targetConnectedPeers = maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
Expand Down Expand Up @@ -532,8 +557,18 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
await pm.connectToRelayPeers()
await sleepAsync(ConnectivityLoopInterval)

proc updateMetrics(pm: PeerManager) {.async.} =
heartbeat "Scheduling updateMetrics run", UpdateMetricsInterval:
for dir in @[Direction.In, Direction.Out]:
for proto in pm.peerStore.getWakuProtos():
let protoDirConns = pm.getNumConnections(dir, proto)
let protoDirStreams = pm.getNumStreams(dir, proto)
waku_connected_peers.set(protoDirConns.float64, labelValues = [$dir, proto])
waku_streams_peers.set(protoDirStreams.float64, labelValues = [$dir, proto])

proc start*(pm: PeerManager) =
pm.started = true
asyncSpawn pm.updateMetrics()
asyncSpawn pm.relayConnectivityLoop()
asyncSpawn pm.prunePeerStoreLoop()

Expand Down
13 changes: 11 additions & 2 deletions waku/v2/node/peer_manager/waku_peer_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ else:
{.push raises: [].}

import
std/[tables, sequtils, sets, options, times, math],
std/[tables, sequtils, sets, options, times, math, strutils],
chronos,
eth/p2p/discoveryv5/enr,
libp2p/builders,
libp2p/peerstore

import
../../utils/peers
../../utils/peers,
../../../common/utils/sequence

export peerstore, builders

Expand Down Expand Up @@ -90,6 +91,14 @@ proc get*(peerStore: PeerStore,
numberFailedConn: peerStore[NumberFailedConnBook][peerId]
)

proc getWakuProtos*(peerStore: PeerStore): seq[string] =
## Get the waku protocols of all the stored peers.
let wakuProtocols = toSeq(peerStore[ProtoBook].book.values())
.flatten()
.deduplicate()
.filterIt(it.startsWith("/vac/waku"))
return wakuProtocols

# TODO: Rename peers() to getPeersByProtocol()
proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] =
## Get all the stored information of every peer.
Expand Down
29 changes: 29 additions & 0 deletions waku/v2/utils/heartbeat.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import sequtils
import chronos, chronicles

# Taken from: https://github.com/status-im/nim-libp2p/blob/master/libp2p/utils/heartbeat.nim

template heartbeat*(name: string, interval: Duration, body: untyped): untyped =
var nextHeartbeat = Moment.now()
while true:
body

nextHeartbeat += interval
let now = Moment.now()
if nextHeartbeat < now:
let
delay = now - nextHeartbeat
itv = interval
if delay > itv:
info "Missed multiple heartbeats", heartbeat = name,
delay = delay, hinterval = itv
else:
debug "Missed heartbeat", heartbeat = name,
delay = delay, hinterval = itv
nextHeartbeat = now + itv
await sleepAsync(nextHeartbeat - now)

0 comments on commit 73cbafa

Please sign in to comment.