feat: shard aware relay peer management (#2332)
Note that this feature is behind a config flag: `--relay-shard-manager`. A sketch of what the flag toggles follows the change summary below.
SionoiS committed Jan 30, 2024
1 parent e04e35e commit edca1df
Showing 19 changed files with 493 additions and 198 deletions.
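The flag surfaces as `relayShardedPeerManagement` in `apps/wakunode2/external_config.nim` and is passed to the peer manager as `shardAware` in `apps/wakunode2/app.nim` (both hunks below). A minimal sketch of what the flag selects between, using the two maintenance procs the new tests exercise; the actual dispatch site inside the peer manager is not visible in the hunks shown here and is an assumption:

```nim
# Sketch only, not code from this commit. Assumes an async context where
# `node` is a running WakuNode and `conf` is the parsed WakuNodeConf.
# Both procs are exercised in tests/test_peer_manager.nim below.
if conf.relayShardedPeerManagement:
  await node.peerManager.manageRelayPeers()    # new, shard-aware maintenance
else:
  await node.peerManager.connectToRelayPeers() # previous, shard-agnostic behaviour
```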
6 changes: 2 additions & 4 deletions apps/chat2/chat2.nim
@@ -9,10 +9,9 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import std/[strformat, strutils, times, json, options, random]
import std/[strformat, strutils, times, options, random]
import confutils, chronicles, chronos, stew/shims/net as stewNet,
eth/keys, bearssl, stew/[byteutils, results],
nimcrypto/pbkdf2,
metrics,
metrics/chronos_httpserver
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
@@ -22,11 +21,10 @@ import libp2p/[switch, # manage transports, a single entry poi
peerinfo, # manage the information of a peer, such as peer ID and public / private key
peerid, # Implement how peers interact
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
nameresolving/dnsresolver]# define DNS resolution
import
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/rpc,
../../waku/waku_filter,
../../waku/waku_enr,
6 changes: 4 additions & 2 deletions apps/wakunode2/app.nim
@@ -54,7 +54,7 @@ import
../../waku/waku_peer_exchange,
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_filter,
../../waku/waku_filter_v2,
./wakunode2_validator_signed,
@@ -294,7 +294,9 @@ proc initNode(conf: WakuNodeConf,
agentString = some(conf.agentString)
)
builder.withColocationLimit(conf.colocationLimit)
builder.withPeerManagerConfig(maxRelayPeers = conf.maxRelayPeers)
builder.withPeerManagerConfig(
maxRelayPeers = conf.maxRelayPeers,
shardAware = conf.relayShardedPeerManagement,)

node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)

5 changes: 5 additions & 0 deletions apps/wakunode2/external_config.nim
@@ -199,6 +199,11 @@ type
defaultValue: false
name: "relay-peer-exchange" }: bool

relayShardedPeerManagement* {.
desc: "Enable experimental shard aware peer manager for relay protocol: true|false",
defaultValue: false
name: "relay-shard-manager" }: bool

rlnRelay* {.
desc: "Enable spam protection through rln-relay: true|false",
defaultValue: false
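For context, the declaration above is all confutils needs to expose the new option on the command line. A self-contained, hypothetical demo (not nwaku code; only the pragma fields are copied from the hunk above):

```nim
# Hypothetical standalone confutils demo, mirroring the pragmas added above.
# Run e.g. `./demo --relay-shard-manager=true`; the flag defaults to false.
import confutils

type DemoConf* = object
  relayShardedPeerManagement* {.
    desc: "Enable experimental shard aware peer manager for relay protocol: true|false",
    defaultValue: false
    name: "relay-shard-manager" }: bool

when isMainModule:
  let conf = DemoConf.load()
  echo "shard aware relay peer management: ", conf.relayShardedPeerManagement
```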
206 changes: 186 additions & 20 deletions tests/test_peer_manager.nim
@@ -1,7 +1,7 @@
{.used.}

import
std/[options, sequtils, times],
std/[options, sequtils, times, sugar],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
@@ -21,10 +21,12 @@ import
../../waku/node/peer_manager/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/waku_node,
../../waku/waku_relay,
../../waku/waku_store,
../../waku/waku_filter,
../../waku/waku_lightpush,
../../waku/waku_core,
../../waku/waku_enr/capabilities,
../../waku/waku_relay/protocol,
../../waku/waku_store/common,
../../waku/waku_filter/protocol,
../../waku/waku_lightpush/common,
../../waku/waku_peer_exchange,
../../waku/waku_metadata,
./testlib/common,
@@ -129,7 +131,6 @@ procSuite "Peer Manager":

await node.stop()


asyncTest "Peer manager keeps track of connections":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
@@ -226,18 +227,34 @@ procSuite "Peer Manager":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peerInfo2 = node2.switch.peerInfo
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage
)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023))

node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")

await node1.start()
await node2.start()

await node1.mountRelay()
await node2.mountRelay()

let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)

require:
(await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true
let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
assert is12Connected == true, "Node 1 and 2 not connected"

check:
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs

# wait for the peer store update
await sleepAsync(chronos.milliseconds(500))

check:
@@ -247,19 +264,101 @@ procSuite "Peer Manager":
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

# Simulate restart by initialising a new node using the same storage
let
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
let node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(56037),
peerStorage = storage
)

node3.mountMetadata(0).expect("Mounted Waku Metadata")

await node3.start()

check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

await node3.mountRelay()

await node3.peerManager.connectToRelayPeers()

await sleepAsync(chronos.milliseconds(500))

check:
# Reconnected to node2 after "restart"
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

await allFutures([node1.stop(), node2.stop(), node3.stop()])

asyncTest "Sharded peer manager can use persistent storage and survive restarts":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage
)
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023))

node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")

await node1.start()
await node2.start()

await node1.mountRelay()
await node2.mountRelay()

let peerInfo2 = node2.switch.peerInfo
var remotePeerInfo2 = peerInfo2.toRemotePeerInfo()
remotePeerInfo2.enr = some(node2.enr)

let is12Connected = await node1.peerManager.connectRelay(remotePeerInfo2)
assert is12Connected == true, "Node 1 and 2 not connected"

check:
node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs

# wait for the peer store update
await sleepAsync(chronos.milliseconds(500))

check:
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

# Simulate restart by initialising a new node using the same storage
let node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(56037),
peerStorage = storage
)

node3.mountMetadata(0).expect("Mounted Waku Metadata")

await node3.start()

check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

await node3.mountRelay()

await node3.peerManager.manageRelayPeers()

await sleepAsync(chronos.milliseconds(500))

check:
# Reconnected to node2 after "restart"
node3.peerManager.peerStore.peers().len == 1
@@ -298,9 +397,9 @@ procSuite "Peer Manager":
topics = @["/waku/2/rs/4/0"],
)

discard node1.mountMetadata(clusterId3)
discard node2.mountMetadata(clusterId4)
discard node3.mountMetadata(clusterId4)
node1.mountMetadata(clusterId3).expect("Mounted Waku Metadata")
node2.mountMetadata(clusterId4).expect("Mounted Waku Metadata")
node3.mountMetadata(clusterId4).expect("Mounted Waku Metadata")

# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])
@@ -319,7 +418,6 @@ procSuite "Peer Manager":
conn2.isNone
conn3.isSome


# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
@@ -378,14 +476,28 @@ procSuite "Peer Manager":

asyncTest "Peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
let nodes =
toSeq(0..<4)
.mapIt(
newTestWakuNode(
nodeKey = generateSecp256k1Key(),
bindIp = ValidIpAddress.init("0.0.0.0"),
bindPort = Port(0),
wakuFlags = some(CapabilitiesBitfield.init(@[Relay]))
)
)

# Start them
await allFutures(nodes.mapIt(it.start()))
discard nodes.mapIt(it.mountMetadata(0))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.start()))

# Get all peer infos
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
let peerInfos = collect:
for i in 0..nodes.high:
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(nodes[i].enr)
peerInfo

# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1])
Expand Down Expand Up @@ -416,6 +528,60 @@ procSuite "Peer Manager":

await allFutures(nodes.mapIt(it.stop()))

asyncTest "Sharded peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
let nodes =
toSeq(0..<4)
.mapIt(
newTestWakuNode(
nodeKey = generateSecp256k1Key(),
bindIp = ValidIpAddress.init("0.0.0.0"),
bindPort = Port(0),
wakuFlags = some(CapabilitiesBitfield.init(@[Relay]))
)
)

# Start them
discard nodes.mapIt(it.mountMetadata(0))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.start()))

# Get all peer infos
let peerInfos = collect:
for i in 0..nodes.high:
let peerInfo = nodes[i].switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(nodes[i].enr)
peerInfo

# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1])
nodes[0].peerManager.addPeer(peerInfos[2])
nodes[0].peerManager.addPeer(peerInfos[3])

# Connect to relay peers
await nodes[0].peerManager.manageRelayPeers()

check:
# Peerstore track all three peers
nodes[0].peerManager.peerStore.peers().len == 3

# All peer ids are correct
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].switch.peerInfo.peerId)
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[2].switch.peerInfo.peerId)
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[3].switch.peerInfo.peerId)

# All peers support the relay protocol
nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(WakuRelayCodec)
nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(WakuRelayCodec)
nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(WakuRelayCodec)

# All peers are connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == Connected

await allFutures(nodes.mapIt(it.stop()))

asyncTest "Peer store keeps track of incoming connections":
# Create 4 nodes
let nodes = toSeq(0..<4).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
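The two new "Sharded peer manager" tests above drive `manageRelayPeers()` and attach each node's ENR to its `RemotePeerInfo` (`remotePeerInfo2.enr = some(node2.enr)`), which is presumably how the manager learns which shards a peer serves. The underlying idea is to budget relay connections per subscribed shard instead of globally; the following is a rough, hypothetical illustration of that idea, not the nwaku implementation:

```nim
# Illustrative sketch only, not nwaku code. Names, types and the per-shard
# target heuristic are assumptions; nwaku's manageRelayPeers() works on its
# own peer store and ENR-derived shard data.
import std/[tables, sequtils]

type
  ShardId = string   # e.g. a pubsub topic such as "/waku/2/rs/1/0"
  PeerId = string    # placeholder for libp2p's PeerId

proc pickPeersPerShard(
    subscribed: seq[ShardId],
    knownByShard: Table[ShardId, seq[PeerId]],
    connectedByShard: Table[ShardId, seq[PeerId]],
    targetPerShard: int): seq[PeerId] =
  ## For every shard we subscribe to, select enough known-but-unconnected
  ## peers to reach the per-shard connection target.
  for shard in subscribed:
    let connected = connectedByShard.getOrDefault(shard)
    let missing = max(0, targetPerShard - connected.len)
    if missing == 0:
      continue
    let candidates = knownByShard.getOrDefault(shard).filterIt(it notin connected)
    result.add candidates[0 ..< min(missing, candidates.len)]

when isMainModule:
  let known = {"/waku/2/rs/1/0": @["peerA", "peerB", "peerC"]}.toTable
  let connected = {"/waku/2/rs/1/0": @["peerA"]}.toTable
  echo pickPeersPerShard(@["/waku/2/rs/1/0"], known, connected, 2)  # @["peerB"]
```

In the commit itself this behaviour sits behind the `shardAware` builder flag and is invoked through `peerManager.manageRelayPeers()`, as the tests above show.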
1 change: 1 addition & 0 deletions tests/test_waku_lightpush.nim
@@ -11,6 +11,7 @@ import
../../waku/node/peer_manager,
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/client,
../../waku/waku_lightpush/protocol_metrics,
../../waku/waku_lightpush/rpc,
8 changes: 2 additions & 6 deletions tests/test_wakunode_lightpush.nim
@@ -4,16 +4,12 @@ import
std/options,
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/switch
chronos
import
../../waku/waku_core,
../../waku/waku_lightpush,
../../waku/waku_lightpush/common,
../../waku/node/peer_manager,
../../waku/waku_node,
./testlib/common,
./testlib/wakucore,
./testlib/wakunode
