Skip to content

Commit

Permalink
chore: move discv5 out of node. (#1818)
Browse files Browse the repository at this point in the history
- Refactor discv5 start, stop & loop.
- Fix tests.
  • Loading branch information
SionoiS committed Jun 27, 2023
1 parent 52894a8 commit 62d3653
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 228 deletions.
93 changes: 51 additions & 42 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type
key: crypto.PrivateKey
record: Record

wakuDiscv5: Option[WakuDiscoveryV5]
peerStore: Option[WakuPeerStorage]
dynamicBootstrapNodes: seq[RemotePeerInfo]

Expand Down Expand Up @@ -196,6 +197,37 @@ proc setupDyamicBootstrapNodes*(app: var App): AppResult[void] =

ok()

## Setup DiscoveryV5

proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
let dynamicBootstrapEnrs = app.dynamicBootstrapNodes
.filterIt(it.hasUdpPort())
.mapIt(it.enr.get())

var discv5BootstrapEnrs: seq[enr.Record]

# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in app.conf.discv5BootstrapNodes:
addBootstrapNode(enrUri, discv5BootstrapEnrs)

discv5BootstrapEnrs.add(dynamicBootstrapEnrs)

let discv5Config = DiscoveryConfig.init(app.conf.discv5TableIpLimit,
app.conf.discv5BucketIpLimit,
app.conf.discv5BitsPerHop)

let discv5UdpPort = Port(uint16(app.conf.discv5UdpPort) + app.conf.portsShift)

let discv5Conf = WakuDiscoveryV5Config(
discv5Config: some(discv5Config),
address: app.netConf.bindIp,
port: discv5UdpPort,
privateKey: keys.PrivateKey(app.key.skkey),
bootstrapRecords: discv5BootstrapEnrs,
autoupdateRecord: app.conf.discv5EnrAutoUpdate,
)

WakuDiscoveryV5.new(app.rng, discv5Conf, some(app.record))

## Init waku node instance

Expand Down Expand Up @@ -225,39 +257,6 @@ proc initNode(conf: WakuNodeConf,
let pStorage = if peerStore.isNone(): nil
else: peerStore.get()

var wakuDiscv5 = none(WakuDiscoveryV5)

if conf.discv5Discovery:
let dynamicBootstrapEnrs = dynamicBootstrapNodes
.filterIt(it.hasUdpPort())
.mapIt(it.enr.get())
var discv5BootstrapEnrs: seq[enr.Record]
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in conf.discv5BootstrapNodes:
addBootstrapNode(enrUri, discv5BootstrapEnrs)
discv5BootstrapEnrs.add(dynamicBootstrapEnrs)
let discv5Config = DiscoveryConfig.init(conf.discv5TableIpLimit,
conf.discv5BucketIpLimit,
conf.discv5BitsPerHop)
try:
wakuDiscv5 = some(WakuDiscoveryV5.new(
extIp = netConfig.extIp,
extTcpPort = netConfig.extPort,
extUdpPort = netConfig.discv5UdpPort,
bindIp = netConfig.bindIp,
discv5UdpPort = netConfig.discv5UdpPort.get(),
bootstrapEnrs = discv5BootstrapEnrs,
enrAutoUpdate = conf.discv5EnrAutoUpdate,
privateKey = keys.PrivateKey(nodekey.skkey),
flags = netConfig.wakuFlags.get(),
multiaddrs = netConfig.enrMultiaddrs,
rng = rng,
conf.topics,
discv5Config = discv5Config
))
except CatchableError:
return err("failed to create waku discv5 instance: " & getCurrentExceptionMsg())

# Build waku node instance
var builder = WakuNodeBuilder.init()
builder.withRng(rng)
Expand All @@ -273,20 +272,25 @@ proc initNode(conf: WakuNodeConf,
sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
agentString = some(conf.agentString)
)
builder.withWakuDiscv5(wakuDiscv5.get(nil))
builder.withPeerManagerConfig(maxRelayPeers = some(conf.maxRelayPeers.int))

node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)

ok(node)

proc setupWakuNode*(app: var App): AppResult[void] =
proc setupWakuApp*(app: var App): AppResult[void] =

## Discv5
if app.conf.discv5Discovery:
app.wakuDiscV5 = some(app.setupDiscoveryV5())

## Waku node
let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes)
if initNodeRes.isErr():
return err("failed to init node: " & initNodeRes.error)

app.node = initNodeRes.get()

ok()


Expand Down Expand Up @@ -466,12 +470,6 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
except CatchableError:
return err("failed to start waku node: " & getCurrentExceptionMsg())

# Start discv5 based discovery service (discovery loop)
if conf.discv5Discovery:
let startDiscv5Res = await node.startDiscv5()
if startDiscv5Res.isErr():
return err("failed to start waku discovery v5: " & startDiscv5Res.error)

# Connect to configured static nodes
if conf.staticnodes.len > 0:
try:
Expand Down Expand Up @@ -501,7 +499,15 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,

return ok()

proc startNode*(app: App): Future[AppResult[void]] {.async.} =
proc startApp*(app: App): Future[AppResult[void]] {.async.} =
if app.wakuDiscv5.isSome():
let res = await app.wakuDiscv5.get().start()

if res.isErr():
return err("failed to start waku discovery v5: " & res.error)

asyncSpawn app.wakuDiscv5.get().searchLoop(app.node.peerManager, some(app.record))

return await startNode(
app.node,
app.conf,
Expand Down Expand Up @@ -624,5 +630,8 @@ proc stop*(app: App): Future[void] {.async.} =
if app.metricsServer.isSome():
await app.metricsServer.get().stop()

if app.wakuDiscv5.isSome():
await app.wakuDiscv5.get().stop()

if not app.node.isNil():
await app.node.stop()
4 changes: 2 additions & 2 deletions apps/wakunode2/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ when isMainModule:

debug "3/7 Initializing node"

let res4 = wakunode2.setupWakuNode()
let res4 = wakunode2.setupWakuApp()
if res4.isErr():
error "3/7 Initializing node failed", error=res4.error
quit(QuitFailure)
Expand All @@ -87,7 +87,7 @@ when isMainModule:

debug "5/7 Starting node and mounted protocols"

let res6 = waitFor wakunode2.startNode()
let res6 = waitFor wakunode2.startApp()
if res6.isErr():
error "5/7 Starting node and protocols failed", error=res6.error
quit(QuitFailure)
Expand Down
6 changes: 4 additions & 2 deletions examples/v2/publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
discard bootstrapNodeEnr.fromURI(bootstrapNode)

# assumes behind a firewall, so not care about being discoverable
node.wakuDiscv5 = WakuDiscoveryV5.new(
let wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
Expand All @@ -69,11 +69,13 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
await node.mountRelay()
node.peerManager.start()

let discv5Res = await node.startDiscv5()
let discv5Res = await wakuDiscv5.start()
if discv5Res.isErr():
error "failed to start discv5", error= discv5Res.error
quit(1)

asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr))

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
Expand Down
6 changes: 4 additions & 2 deletions examples/v2/subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
discard bootstrapNodeEnr.fromURI(bootstrapNode)

# assumes behind a firewall, so not care about being discoverable
node.wakuDiscv5 = WakuDiscoveryV5.new(
let wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
Expand All @@ -64,11 +64,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
await node.mountRelay()
node.peerManager.start()

let discv5Res = await node.startDiscv5()
let discv5Res = await wakuDiscv5.start()
if discv5Res.isErr():
error "failed to start discv5", error = discv5Res.error
quit(1)

asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr))

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
Expand Down
49 changes: 13 additions & 36 deletions tests/v2/test_waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import
libp2p/crypto/crypto as libp2p_keys,
eth/keys as eth_keys
import
../../waku/v2/waku_node,
../../waku/v2/waku_enr,
../../waku/v2/waku_discv5,
./testlib/common,
Expand All @@ -33,27 +32,20 @@ proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey,
builder.build().tryGet()


proc newTestDiscv5Node(privKey: libp2p_keys.PrivateKey,
proc newTestDiscv5(privKey: libp2p_keys.PrivateKey,
bindIp: string, tcpPort: uint16, udpPort: uint16,
record: waku_enr.Record,
bootstrapRecords = newSeq[waku_enr.Record]()): WakuNode =
bootstrapRecords = newSeq[waku_enr.Record]()): WakuDiscoveryV5 =
let config = WakuDiscoveryV5Config(
privateKey: eth_keys.PrivateKey(privKey.skkey),
address: ValidIpAddress.init(bindIp),
port: Port(udpPort),
bootstrapRecords: bootstrapRecords,
)

let protocol = WakuDiscoveryV5.new(rng(), config, some(record))
let node = newTestWakuNode(
nodeKey = privKey,
bindIp = ValidIpAddress.init(bindIp),
bindPort = Port(tcpPort),
wakuDiscv5 = some(protocol)
)

return node
let discv5 = WakuDiscoveryV5.new(rng(), config, some(record))

return discv5


procSuite "Waku Discovery v5":
Expand All @@ -73,7 +65,7 @@ procSuite "Waku Discovery v5":
tcpPort = tcpPort1,
udpPort = udpPort1,
)
let node1 = newTestDiscv5Node(
let node1 = newTestDiscv5(
privKey = privKey1,
bindIp = bindIp1,
tcpPort = tcpPort1,
Expand All @@ -96,7 +88,7 @@ procSuite "Waku Discovery v5":
udpPort = udpPort2,
)

let node2 = newTestDiscv5Node(
let node2 = newTestDiscv5(
privKey = privKey2,
bindIp = bindIp2,
tcpPort = tcpPort2,
Expand All @@ -119,7 +111,7 @@ procSuite "Waku Discovery v5":
udpPort = udpPort3,
)

let node3 = newTestDiscv5Node(
let node3 = newTestDiscv5(
privKey = privKey3,
bindIp = bindIp3,
tcpPort = tcpPort3,
Expand All @@ -131,11 +123,7 @@ procSuite "Waku Discovery v5":
await allFutures(node1.start(), node2.start(), node3.start())

## When
# Starting discv5 via `WakuNode.startDiscV5()` starts the discv5 background task.
await allFutures(node1.startDiscv5(), node2.startDiscv5(), node3.startDiscv5())

await sleepAsync(5.seconds) # Wait for discv5 discovery loop to run
let res = await node1.wakuDiscv5.findRandomPeers()
let res = await node3.findRandomPeers()

## Then
check:
Expand Down Expand Up @@ -209,15 +197,15 @@ procSuite "Waku Discovery v5":


# Nodes
let node1 = newTestDiscv5Node(
let node1 = newTestDiscv5(
privKey = privKey1,
bindIp = bindIp1,
tcpPort = tcpPort1,
udpPort = udpPort1,
record = record1,
bootstrapRecords = @[record2]
)
let node2 = newTestDiscv5Node(
let node2 = newTestDiscv5(
privKey = privKey2,
bindIp = bindIp2,
tcpPort = tcpPort2,
Expand All @@ -226,15 +214,15 @@ procSuite "Waku Discovery v5":
bootstrapRecords = @[record3, record4]
)

let node3 = newTestDiscv5Node(
let node3 = newTestDiscv5(
privKey = privKey3,
bindIp = bindIp3,
tcpPort = tcpPort3,
udpPort = udpPort3,
record = record3
)

let node4 = newTestDiscv5Node(
let node4 = newTestDiscv5(
privKey = privKey4,
bindIp = bindIp4,
tcpPort = tcpPort4,
Expand All @@ -243,11 +231,6 @@ procSuite "Waku Discovery v5":
)

# Start nodes' discoveryV5 protocols
require node1.wakuDiscV5.start().isOk()
require node2.wakuDiscV5.start().isOk()
require node3.wakuDiscV5.start().isOk()
require node4.wakuDiscV5.start().isOk()

await allFutures(node1.start(), node2.start(), node3.start(), node4.start())

## Given
Expand All @@ -264,13 +247,7 @@ procSuite "Waku Discovery v5":


## When
# # Do a random peer search with a predicate multiple times
# var peers = initHashSet[waku_enr.Record]()
# for i in 0..<10:
# for peer in await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate):
# peers.incl(peer)
await sleepAsync(5.seconds) # Wait for discv5 discvery loop to run
let peers = await node1.wakuDiscv5.findRandomPeers(some(recordPredicate))
let peers = await node1.findRandomPeers(some(recordPredicate))

## Then
check:
Expand Down

0 comments on commit 62d3653

Please sign in to comment.