Skip to content

Commit

Permalink
feat(discv5): topic subscriptions update discv5 filter predicate (#1918)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Aug 23, 2023
1 parent c369b32 commit 4539dfc
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 43 deletions.
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ proc startApp*(app: App): Future[AppResult[void]] {.async.} =
if res.isErr():
return err("failed to start waku discovery v5: " & $res.error)

asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager, some(app.record))
asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager)
asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue)

return await startNode(
Expand Down
2 changes: 1 addition & 1 deletion examples/publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
error "failed to start discv5", error= discv5Res.error
quit(1)

asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr))
asyncSpawn wakuDiscv5.searchLoop(node.peerManager)

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
Expand Down
2 changes: 1 addition & 1 deletion examples/subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
error "failed to start discv5", error = discv5Res.error
quit(1)

asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr))
asyncSpawn wakuDiscv5.searchLoop(node.peerManager)

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_waku_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ procSuite "Waku Peer Exchange":
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
asyncSpawn disc1.searchLoop(node1.peerManager, none(enr.Record))
asyncSpawn disc2.searchLoop(node2.peerManager, none(enr.Record))
asyncSpawn disc1.searchLoop(node1.peerManager)
asyncSpawn disc2.searchLoop(node2.peerManager)

## When
var attempts = 10
Expand Down
86 changes: 48 additions & 38 deletions waku/waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,31 @@ type WakuDiscoveryV5* = ref object
conf: WakuDiscoveryV5Config
protocol*: protocol.Protocol
listening*: bool
predicate: Option[WakuDiscv5Predicate]

proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
## Filter peers based on relay sharding information

let typeRecordRes = record.toTyped()
let typedRecord =
if typeRecordRes.isErr():
debug "peer filtering failed", reason= $typeRecordRes.error
return none(WakuDiscv5Predicate)
else: typeRecordRes.get()

let nodeShardOp = typedRecord.relaySharding()
let nodeShard =
if nodeShardOp.isNone():
debug "no relay sharding information, peer filtering disabled"
return none(WakuDiscv5Predicate)
else: nodeShardOp.get()

debug "peer filtering updated"

let predicate = proc(record: waku_enr.Record): bool =
nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it))

return some(predicate)

proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscoveryV5Config, record: Option[waku_enr.Record]): T =
let protocol = newProtocol(
Expand All @@ -64,7 +89,13 @@ proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscovery
enrUdpPort = none(Port),
)

WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false)
let shardPredOp =
if record.isSome():
shardingPredicate(record.get())
else:
none(WakuDiscv5Predicate)

WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false, predicate: shardPredOp)

proc new*(T: type WakuDiscoveryV5,
extIp: Option[ValidIpAddress],
Expand Down Expand Up @@ -195,57 +226,29 @@ proc updateENRShards(wd: WakuDiscoveryV5,

return ok()

proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
## Filter peers based on relay sharding information

let typeRecordRes = record.toTyped()
let typedRecord =
if typeRecordRes.isErr():
debug "peer filtering failed", reason= $typeRecordRes.error
return none(WakuDiscv5Predicate)
else: typeRecordRes.get()

let nodeShardOp = typedRecord.relaySharding()
let nodeShard =
if nodeShardOp.isNone():
debug "no relay sharding information, peer filtering disabled"
return none(WakuDiscv5Predicate)
else: nodeShardOp.get()

debug "peer filtering enabled"

let predicate = proc(record: waku_enr.Record): bool =
nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it))

return some(predicate)

proc findRandomPeers*(wd: WakuDiscoveryV5, pred = none(WakuDiscv5Predicate)): Future[seq[waku_enr.Record]] {.async.} =
proc findRandomPeers*(wd: WakuDiscoveryV5, overridePred = none(WakuDiscv5Predicate)): Future[seq[waku_enr.Record]] {.async.} =
## Find random peers to connect to using Discovery v5
let discoveredNodes = await wd.protocol.queryRandom()

var discoveredRecords = discoveredNodes.mapIt(it.record)

# Filter out nodes that do not match the predicate
if pred.isSome():
discoveredRecords = discoveredRecords.filter(pred.get())
if overridePred.isSome():
discoveredRecords = discoveredRecords.filter(overridePred.get())
elif wd.predicate.isSome():
discoveredRecords = discoveredRecords.filter(wd.predicate.get())

return discoveredRecords

#TODO abstract away PeerManager
proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager, record: Option[enr.Record]) {.async.} =
proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager) {.async.} =
## Continuously add newly discovered nodes

info "Starting discovery v5 search"

let shardPredOp =
if record.isSome():
shardingPredicate(record.get())
else:
none(WakuDiscv5Predicate)

while wd.listening:
trace "running discv5 discovery loop"
let discoveredRecords = await wd.findRandomPeers(shardPredOp)
let discoveredRecords = await wd.findRandomPeers()
let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value)

for peer in discoveredPeers:
Expand Down Expand Up @@ -305,6 +308,9 @@ proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEv
let subs = events.filterIt(it.kind == SubscriptionKind.PubsubSub).mapIt(it.pubsubSub)
let unsubs = events.filterIt(it.kind == SubscriptionKind.PubsubUnsub).mapIt(it.pubsubUnsub)

if subs.len == 0 and unsubs.len == 0:
continue

let unsubRes = wd.updateENRShards(unsubs, false)
let subRes = wd.updateENRShards(subs, true)

Expand All @@ -314,8 +320,12 @@ proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEv
if unsubRes.isErr():
debug "ENR shard removal failed", reason= $unsubRes.error

if subRes.isOk() and unsubRes.isOk():
debug "ENR updated successfully"
if subRes.isErr() and unsubRes.isErr():
continue

debug "ENR updated successfully"

wd.predicate = shardingPredicate(wd.protocol.localNode.record)

topicSubscriptionQueue.unregister(key)

Expand Down

0 comments on commit 4539dfc

Please sign in to comment.