Skip to content

Commit

Permalink
feat: limit relay connections below max conns (#1813)
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta committed Jul 4, 2023
1 parent 661638d commit 17b24cd
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 45 deletions.
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ proc initNode(conf: WakuNodeConf,
sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
agentString = some(conf.agentString)
)
builder.withPeerManagerConfig(maxRelayPeers = some(conf.maxRelayPeers.int))
builder.withPeerManagerConfig(maxRelayPeers = conf.maxRelayPeers)

node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)

Expand Down
3 changes: 1 addition & 2 deletions apps/wakunode2/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ type

maxRelayPeers* {.
desc: "Maximum allowed number of relay peers."
defaultValue: 50
name: "max-relay-peers" }: uint16
name: "max-relay-peers" }: Option[int]

peerStoreCapacity* {.
desc: "Maximum stored peers in the peerstore."
Expand Down
10 changes: 5 additions & 5 deletions tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ procSuite "Peer Manager":
.withMaxConnections(5)
.build(),
maxFailedAttempts = 1,
maxRelayPeers = 5,
maxRelayPeers = some(5),
storage = nil)

# Create 15 peers and add them to the peerstore
Expand Down Expand Up @@ -660,7 +660,7 @@ procSuite "Peer Manager":
initialBackoffInSec = 1, # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
backoffFactor = 2,
maxFailedAttempts = 10,
maxRelayPeers = 5,
maxRelayPeers = some(5),
storage = nil)
var p1: PeerId
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
Expand Down Expand Up @@ -709,7 +709,7 @@ procSuite "Peer Manager":
.withPeerStore(10)
.withMaxConnections(5)
.build(),
maxRelayPeers = 5,
maxRelayPeers = some(5),
maxFailedAttempts = 150,
storage = nil)

Expand All @@ -721,7 +721,7 @@ procSuite "Peer Manager":
.withMaxConnections(5)
.build(),
maxFailedAttempts = 10,
maxRelayPeers = 5,
maxRelayPeers = some(5),
storage = nil)

let pm = PeerManager.new(
Expand All @@ -730,7 +730,7 @@ procSuite "Peer Manager":
.withMaxConnections(5)
.build(),
maxFailedAttempts = 5,
maxRelayPeers = 5,
maxRelayPeers = some(5),
storage = nil)

asyncTest "colocationLimit is enforced by pruneConnsByIp()":
Expand Down
9 changes: 1 addition & 8 deletions waku/v2/node/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,6 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
if builder.record.isNone():
return err("node record is required")

# fallbck to max connections if not set
var maxRelayPeers: int
if builder.maxRelayPeers.isNone():
maxRelayPeers = builder.switchMaxConnections.get(builders.MaxConnections)
else:
maxRelayPeers = builder.maxRelayPeers.get()

var switch: Switch
try:
switch = newWakuSwitch(
Expand All @@ -176,7 +169,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
let peerManager = PeerManager.new(
switch = switch,
storage = builder.peerStorage.get(nil),
maxRelayPeers = maxRelayPeers,
maxRelayPeers = builder.maxRelayPeers,
)

var node: WakuNode
Expand Down
63 changes: 34 additions & 29 deletions waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const
PrunePeerStoreInterval = chronos.minutes(5)

# How often metrics and logs are shown/updated
LogAndMetricsInterval = chronos.seconds(60)
LogAndMetricsInterval = chronos.minutes(3)

# Max peers that we allow from the same IP
ColocationLimit = 5
Expand All @@ -71,7 +71,8 @@ type
storage: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
maxRelayPeers*: int
outPeersTarget*: int
outRelayPeersTarget: int
inRelayPeersTarget: int
ipTable*: Table[string, seq[PeerId]]
colocationLimit*: int
started: bool
Expand Down Expand Up @@ -343,7 +344,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =

proc new*(T: type PeerManager,
switch: Switch,
maxRelayPeers: int = 50,
maxRelayPeers: Option[int] = none(int),
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
Expand All @@ -358,16 +359,22 @@ proc new*(T: type PeerManager,
maxConnections = maxConnections
raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity")

if maxRelayPeers > maxConnections:
error "Max number of relay peers can't be greater the max amount of connections",
maxConnections = maxConnections,
maxRelayPeers = maxRelayPeers
raise newException(Defect, "Max number of relay peers can't be greater the max amount of connections")

if maxRelayPeers == maxConnections:
warn "Max number of relay peers is equal to max amount of connections, peer wont be contribute to service peers",
maxConnections = maxConnections,
maxRelayPeers = maxRelayPeers
var maxRelayPeersValue = 0
if maxRelayPeers.isSome():
if maxRelayPeers.get() > maxConnections:
error "Max number of relay peers can't be greater than the max amount of connections",
maxConnections = maxConnections,
maxRelayPeers = maxRelayPeers.get()
raise newException(Defect, "Max number of relay peers can't be greater than the max amount of connections")

if maxRelayPeers.get() == maxConnections:
warn "Max number of relay peers is equal to max amount of connections, peer won't be contributing to service peers",
maxConnections = maxConnections,
maxRelayPeers = maxRelayPeers.get()
maxRelayPeersValue = maxRelayPeers.get()
else:
# Leave by default 20% of connections for service peers
maxRelayPeersValue = maxConnections - (maxConnections div 5)

# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
Expand All @@ -376,15 +383,18 @@ proc new*(T: type PeerManager,
maxBackoff=backoff
raise newException(Defect, "Max backoff time can't be over 1 week")

let outRelayPeersTarget = max(maxRelayPeersValue div 3, 10)

let pm = PeerManager(switch: switch,
peerStore: switch.peerStore,
storage: storage,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
outPeersTarget: max(maxConnections div 2, 10),
outRelayPeersTarget: outRelayPeersTarget,
inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget,
maxRelayPeers: maxRelayPeersValue,
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit,
maxRelayPeers: maxRelayPeers)
colocationLimit: colocationLimit)

proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
Expand Down Expand Up @@ -571,16 +581,12 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let inPeersTarget = maxConnections - pm.outPeersTarget
let inPeersTarget = maxConnections - pm.outRelayPeersTarget

if inRelayPeers.len > inPeersTarget:
await pm.pruneInRelayConns(inRelayPeers.len-inPeersTarget)

if outRelayPeers.len >= pm.outPeersTarget:
return
if inRelayPeers.len > pm.inRelayPeersTarget:
await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)

# Leave some room for service peers
if totalRelayPeers >= (maxConnections - 5):
if outRelayPeers.len >= pm.outRelayPeersTarget:
return

let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
Expand Down Expand Up @@ -670,15 +676,14 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let inPeersTarget = maxConnections - pm.outPeersTarget
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
let totalConnections = pm.switch.connManager.getConnections().len

info "Relay peer connections",
inRelayConns = $inRelayPeers.len & "/" & $inPeersTarget,
outRelayConns = $outRelayPeers.len & "/" & $pm.outPeersTarget,
totalRelayConns = totalRelayPeers,
maxConnections = maxConnections,
inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget,
outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget,
totalConnections = $totalConnections & "/" & $maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len

Expand Down

0 comments on commit 17b24cd

Please sign in to comment.