Skip to content

Commit

Permalink
chore: mics. improvements to cluster id and shards setup (#2187)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Nov 21, 2023
1 parent 51f3609 commit 897f487
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 260 deletions.
69 changes: 51 additions & 18 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,21 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
quit(QuitFailure)
else: recordRes.get()

# Check the ENR sharding info for matching config cluster id
if conf.clusterId != 0:
let res = record.toTyped()
if res.isErr():
error "ENR setup failed", error = $res.get()
quit(QuitFailure)

let relayShard = res.get().relaySharding().valueOr:
error "no sharding info"
quit(QuitFailure)

if conf.clusterId != relayShard.clusterId:
error "cluster id mismatch"
quit(QuitFailure)

App(
version: git_version,
conf: conf,
Expand Down Expand Up @@ -234,7 +249,13 @@ proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
autoupdateRecord: app.conf.discv5EnrAutoUpdate,
)

WakuDiscoveryV5.new(app.rng, discv5Conf, some(app.record))
WakuDiscoveryV5.new(
app.rng,
discv5Conf,
some(app.record),
some(app.node.peerManager),
app.node.topicSubscriptionQueue,
)

## Init waku node instance

Expand Down Expand Up @@ -286,18 +307,17 @@ proc initNode(conf: WakuNodeConf,
ok(node)

proc setupWakuApp*(app: var App): AppResult[void] =

## Discv5
if app.conf.discv5Discovery:
app.wakuDiscV5 = some(app.setupDiscoveryV5())

## Waku node
let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes)
if initNodeRes.isErr():
return err("failed to init node: " & initNodeRes.error)

app.node = initNodeRes.get()

## Discv5
if app.conf.discv5Discovery:
app.wakuDiscV5 = some(app.setupDiscoveryV5())

ok()

proc getPorts(listenAddrs: seq[MultiAddress]):
Expand Down Expand Up @@ -341,7 +361,17 @@ proc updateNetConfig(app: var App): AppResult[void] =
proc updateEnr(app: var App): AppResult[void] =

let record = enrConfiguration(app.conf, app.netConf, app.key).valueOr:
return err(error)
return err("ENR setup failed: " & error)

if app.conf.clusterId != 0:
let tRecord = record.toTyped().valueOr:
return err("ENR setup failed: " & $error)

let relayShard = tRecord.relaySharding().valueOr:
return err("ENR setup failed: no sharding info")

if app.conf.clusterId != relayShard.clusterId:
return err("ENR setup failed: cluster id mismatch")

app.record = record
app.node.enr = record
Expand Down Expand Up @@ -377,6 +407,9 @@ proc setupProtocols(node: WakuNode,
## Optionally include persistent message storage.
## No protocols are started yet.

node.mountMetadata(conf.clusterId).isOkOr:
return err("failed to mount waku metadata protocol: " & error)

# Mount relay on all nodes
var peerExchangeHandler = none(RoutingRecordsHandler)
if conf.relayPeerExchange:
Expand Down Expand Up @@ -587,25 +620,25 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,

proc startApp*(app: var App): AppResult[void] =

try:
(waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes)).isOkOr:
return err(error)
except CatchableError:
return err("exception starting node: " & getCurrentExceptionMsg())
let nodeRes = catch: (waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes))
if nodeRes.isErr():
return err("exception starting node: " & nodeRes.error.msg)

nodeRes.get().isOkOr:
return err("exception starting node: " & error)

# Update app data that is set dynamically on node start
app.updateApp().isOkOr:
return err("Error in updateApp: " & $error)

if app.wakuDiscv5.isSome():
let wakuDiscv5 = app.wakuDiscv5.get()
let catchRes = catch: (waitFor wakuDiscv5.start())
let startRes = catchRes.valueOr:
return err("failed to start waku discovery v5: " & catchRes.error.msg)

let res = wakuDiscv5.start()
if res.isErr():
return err("failed to start waku discovery v5: " & $res.error)

asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager)
asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue)
startRes.isOkOr:
return err("failed to start waku discovery v5: " & error)

return ok()

Expand Down
33 changes: 17 additions & 16 deletions examples/publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -51,31 +51,32 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
var bootstrapNodeEnr: enr.Record
discard bootstrapNodeEnr.fromURI(bootstrapNode)

let discv5Conf = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: ip,
port: Port(discv5Port),
privateKey: keys.PrivateKey(nodeKey.skkey),
bootstrapRecords: @[bootstrapNodeEnr],
autoupdateRecord: true,
)

# assumes behind a firewall, so not care about being discoverable
let wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
bindIP = ip,
discv5UdpPort = Port(discv5Port),
bootstrapEnrs = @[bootstrapNodeEnr],
privateKey = keys.PrivateKey(nodeKey.skkey),
flags = flags,
rng = node.rng,
topics = @[],
)
node.rng,
discv5Conf,
some(node.enr),
some(node.peerManager),
node.topicSubscriptionQueue,
)

await node.start()
await node.mountRelay()
node.peerManager.start()

let discv5Res = wakuDiscv5.start()
if discv5Res.isErr():
error "failed to start discv5", error= discv5Res.error
(await wakuDiscv5.start()).isOkOr:
error "failed to start discv5", error = error
quit(1)

asyncSpawn wakuDiscv5.searchLoop(node.peerManager)

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
Expand Down
33 changes: 17 additions & 16 deletions examples/subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,32 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
var bootstrapNodeEnr: enr.Record
discard bootstrapNodeEnr.fromURI(bootstrapNode)

let discv5Conf = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: ip,
port: Port(discv5Port),
privateKey: keys.PrivateKey(nodeKey.skkey),
bootstrapRecords: @[bootstrapNodeEnr],
autoupdateRecord: true,
)

# assumes behind a firewall, so not care about being discoverable
let wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
bindIP = ip,
discv5UdpPort = Port(discv5Port),
bootstrapEnrs = @[bootstrapNodeEnr],
privateKey = keys.PrivateKey(nodeKey.skkey),
flags = flags,
rng = node.rng,
topics = @[],
)
node.rng,
discv5Conf,
some(node.enr),
some(node.peerManager),
node.topicSubscriptionQueue,
)

await node.start()
await node.mountRelay()
node.peerManager.start()

let discv5Res = wakuDiscv5.start()
if discv5Res.isErr():
error "failed to start discv5", error = discv5Res.error
(await wakuDiscv5.start()).isOkOr:
error "failed to start discv5", error = error
quit(1)

asyncSpawn wakuDiscv5.searchLoop(node.peerManager)

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
Expand Down
32 changes: 27 additions & 5 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,38 @@ procSuite "Peer Manager":
await allFutures([node1.stop(), node2.stop(), node3.stop()])

asyncTest "Peer manager drops conections to peers on different networks":
let clusterId1 = 1.uint32
let clusterId2 = 2.uint32
let clusterId3 = 3.uint32
let clusterId4 = 4.uint32

let
# different network
node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId1)
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId3,
topics = @["/waku/2/rs/3/0"],
)

# same network
node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
node3 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0), clusterId = clusterId2)
node2 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId4,
topics = @["/waku/2/rs/4/0"],
)
node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
Port(0),
clusterId = clusterId4,
topics = @["/waku/2/rs/4/0"],
)

discard node1.mountMetadata(clusterId3)
discard node2.mountMetadata(clusterId4)
discard node3.mountMetadata(clusterId4)

# Start nodes
await allFutures([node1.start(), node2.start(), node3.start()])
Expand Down
47 changes: 26 additions & 21 deletions tests/test_waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,26 @@ proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey,
builder.build().tryGet()


proc newTestDiscv5(privKey: libp2p_keys.PrivateKey,
bindIp: string, tcpPort: uint16, udpPort: uint16,
record: waku_enr.Record,
bootstrapRecords = newSeq[waku_enr.Record]()): WakuDiscoveryV5 =
proc newTestDiscv5(
privKey: libp2p_keys.PrivateKey,
bindIp: string, tcpPort: uint16, udpPort: uint16,
record: waku_enr.Record,
bootstrapRecords = newSeq[waku_enr.Record](),
queue = newAsyncEventQueue[SubscriptionEvent](30),
): WakuDiscoveryV5 =
let config = WakuDiscoveryV5Config(
privateKey: eth_keys.PrivateKey(privKey.skkey),
address: ValidIpAddress.init(bindIp),
port: Port(udpPort),
bootstrapRecords: bootstrapRecords,
)

let discv5 = WakuDiscoveryV5.new(rng(), config, some(record))
let discv5 = WakuDiscoveryV5.new(
rng = rng(),
conf = config,
record = some(record),
queue = queue,
)

return discv5

Expand Down Expand Up @@ -122,13 +130,13 @@ procSuite "Waku Discovery v5":
bootstrapRecords = @[record1, record2]
)

let res1 = node1.start()
let res1 = await node1.start()
assert res1.isOk(), res1.error

let res2 = node2.start()
let res2 = await node2.start()
assert res2.isOk(), res2.error

let res3 = node3.start()
let res3 = await node3.start()
assert res3.isOk(), res3.error

## When
Expand Down Expand Up @@ -240,16 +248,16 @@ procSuite "Waku Discovery v5":
)

# Start nodes' discoveryV5 protocols
let res1 = node1.start()
let res1 = await node1.start()
assert res1.isOk(), res1.error

let res2 = node2.start()
let res2 = await node2.start()
assert res2.isOk(), res2.error

let res3 = node3.start()
let res3 = await node3.start()
assert res3.isOk(), res3.error

let res4 = node4.start()
let res4 = await node4.start()
assert res4.isOk(), res4.error

## Given
Expand Down Expand Up @@ -401,22 +409,20 @@ procSuite "Waku Discovery v5":
udpPort = udpPort,
)

let queue = newAsyncEventQueue[SubscriptionEvent](30)

let node = newTestDiscv5(
privKey = privKey,
bindIp = bindIp,
tcpPort = tcpPort,
udpPort = udpPort,
record = record
record = record,
queue = queue,
)

let res = node.start()
let res = await node.start()
assert res.isOk(), res.error

let queue = newAsyncEventQueue[SubscriptionEvent](0)

## When
asyncSpawn node.subscriptionsListener(queue)

## Then
queue.emit((kind: PubsubSub, topic: shard1))
queue.emit((kind: PubsubSub, topic: shard2))
Expand All @@ -442,14 +448,13 @@ procSuite "Waku Discovery v5":

queue.emit((kind: PubsubUnsub, topic: shard1))
queue.emit((kind: PubsubUnsub, topic: shard2))
queue.emit((kind: PubsubUnsub, topic: shard3))

await sleepAsync(1.seconds)

check:
node.protocol.localNode.record.containsShard(shard1) == false
node.protocol.localNode.record.containsShard(shard2) == false
node.protocol.localNode.record.containsShard(shard3) == false
node.protocol.localNode.record.containsShard(shard3) == true

## Cleanup
await node.stop()
Expand Down

0 comments on commit 897f487

Please sign in to comment.