Skip to content

Commit

Permalink
chore(networking): disconnect due to colocation ip in conn handler (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta committed Jun 28, 2023
1 parent add294a commit e12c979
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 62 deletions.
25 changes: 12 additions & 13 deletions tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -743,29 +743,28 @@ procSuite "Peer Manager":

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

# force max 1 conn per ip
nodes[0].peerManager.colocationLimit = 1

# 2 in connections
discard await nodes[1].peerManager.connectRelay(pInfos[0])
discard await nodes[2].peerManager.connectRelay(pInfos[0])

# but one is pruned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1

# 2 out connections
discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4])

# force max 1 conn per ip
nodes[0].peerManager.colocationLimit = 1
nodes[0].peerManager.updateIpTable()
# they are also prunned 
check nodes[0].peerManager.switch.connManager.getConnections().len == 1

# table is updated and we have 4 conns (2in 2out)
check:
nodes[0].peerManager.ipTable["127.0.0.1"].len == 4
nodes[0].peerManager.switch.connManager.getConnections().len == 4
nodes[0].peerManager.peerStore.peers().len == 4

await nodes[0].peerManager.pruneConnsByIp()

# peers are pruned, max 1 conn per ip
nodes[0].peerManager.updateIpTable()
# we should have 4 peers (2in/2out) but due to collocation limit
# they are pruned to max 1
check:
nodes[0].peerManager.ipTable["127.0.0.1"].len == 1
nodes[0].peerManager.switch.connManager.getConnections().len == 1
nodes[0].peerManager.peerStore.peers().len == 1

await allFutures(nodes.mapIt(it.stop()))
83 changes: 34 additions & 49 deletions waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ const
# How often metrics and logs are shown/updated
LogAndMetricsInterval = chronos.seconds(60)

# Prune by ip interval
PruneByIpInterval = chronos.seconds(30)

# Max peers that we allow from the same IP
ColocationLimit = 5

Expand Down Expand Up @@ -285,6 +282,18 @@ proc canBeConnected*(pm: PeerManager,
# Initialisation #
##################

proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
if pm.switch.connManager.getConnections().hasKey(peerId):
let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
if conns.len != 0:
let observedAddr = conns[0].connection.observedAddr
let ip = observedAddr.get.getHostname()
if observedAddr.isSome:
# TODO: think if circuit relay ips should be handled differently
let ip = observedAddr.get.getHostname()
return some(ip)
return none(string)

# called when a connection i) is created or ii) is closed
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
case event.kind
Expand All @@ -302,36 +311,36 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined:
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected

let ip = pm.getPeerIp(peerId)
if ip.isSome:
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)

let peersBehindIp = pm.ipTable[ip.get]
if peersBehindIp.len > pm.colocationLimit:
# in theory this should always be one, but just in case
for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]:
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)

elif event.kind == PeerEventKind.Left:
direction = UnknownDirection
connectedness = CanConnect

# note we cant access the peerId ip here as the connection was already closed
for ip, peerIds in pm.ipTable.pairs:
if peerIds.contains(peerId):
pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
if pm.ipTable[ip].len == 0:
pm.ipTable.del(ip)
break

pm.peerStore[ConnectionBook][peerId] = connectedness
pm.peerStore[DirectionBook][peerId] = direction
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), connectedness, getTime().toUnix)

proc updateIpTable*(pm: PeerManager) =
# clean table
pm.ipTable = initTable[string, seq[PeerId]]()

# populate ip->peerIds from existing out/in connections
for peerId, conn in pm.switch.connManager.getConnections():
if conn.len == 0:
continue

# we may want to enable it only in inbound peers
#if conn[0].connection.transportDir != In:
# continue

# assumes just one physical connection per peer
let observedAddr = conn[0].connection.observedAddr
if observedAddr.isSome:
# TODO: think if circuit relay ips should be handled differently
let ip = observedAddr.get.getHostname()
pm.ipTable.mgetOrPut(ip, newSeq[PeerId]()).add(peerId)


proc new*(T: type PeerManager,
switch: Switch,
maxRelayPeers: int = 50,
Expand Down Expand Up @@ -556,24 +565,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
let connsToPrune = min(amount, inRelayPeers.len)

for p in inRelayPeers[0..<connsToPrune]:
await pm.switch.disconnect(p)

proc pruneConnsByIp*(pm: PeerManager) {.async.} =
## prunes connections based on ip colocation, allowing no more
## than ColocationLimit inbound connections from same ip
##

# update the table tracking ip and the connected peers
pm.updateIpTable()

# trigger disconnections based on colocationLimit
for ip, peersInIp in pm.ipTable.pairs:
if peersInIp.len > pm.colocationLimit:
let connsToPrune = peersInIp.len - pm.colocationLimit
for peerId in peersInIp[0..<connsToPrune]:
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
await pm.switch.disconnect(peerId)
pm.peerStore.delete(peerId)
asyncSpawn(pm.switch.disconnect(p))

proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
Expand Down Expand Up @@ -658,12 +650,6 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)

proc pruneConnsByIpLoop(pm: PeerManager) {.async.} =
debug "Starting prune peer by ip loop"
while pm.started:
await pm.pruneConnsByIp()
await sleepAsync(PruneByIpInterval)

# Prunes peers from peerstore to remove old/stale ones
proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
debug "Starting prune peerstore loop"
Expand Down Expand Up @@ -709,7 +695,6 @@ proc start*(pm: PeerManager) =
pm.started = true
asyncSpawn pm.relayConnectivityLoop()
asyncSpawn pm.prunePeerStoreLoop()
asyncSpawn pm.pruneConnsByIpLoop()
asyncSpawn pm.logAndMetrics()

proc stop*(pm: PeerManager) =
Expand Down

0 comments on commit e12c979

Please sign in to comment.