Skip to content

Commit

Permalink
feat: discovery peer filtering for relay shard (#1804)
Browse files Browse the repository at this point in the history
Add discv6 predicate that filter peer by static shard.

Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
  • Loading branch information
SionoiS and jm-clius committed Jun 20, 2023
1 parent 5d4fa3c commit a4da87b
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 6 deletions.
79 changes: 77 additions & 2 deletions tests/v2/test_waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ procSuite "Waku Discovery v5":
await allFutures(node1.start(), node2.start(), node3.start(), node4.start())

## Given
let recordPredicate = proc(record: waku_enr.Record): bool =
let recordPredicate: WakuDiscv5Predicate = proc(record: waku_enr.Record): bool =
let typedRecord = record.toTyped()
if typedRecord.isErr():
return false
Expand All @@ -270,7 +270,7 @@ procSuite "Waku Discovery v5":
# for peer in await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate):
# peers.incl(peer)
await sleepAsync(5.seconds) # Wait for discv5 discvery loop to run
let peers = await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate)
let peers = await node1.wakuDiscv5.findRandomPeers(some(recordPredicate))

## Then
check:
Expand Down Expand Up @@ -309,3 +309,78 @@ procSuite "Waku Discovery v5":
assert emptyRes.isOk(), emptyRes.error
assert emptyRes.value.isNone(), $emptyRes.value

asyncTest "filter peer per static shard":
## Given
let recordCluster21 = block:
let
enrSeqNum = 1u64
enrPrivKey = generatesecp256k1key()

let
shardCluster: uint16 = 21
shardIndices: seq[uint16] = @[1u16, 2u16, 5u16, 7u16, 9u16, 11u16]

let shards = RelayShards.init(shardCluster, shardIndices)

var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
require builder.withWakuRelaySharding(shards).isOk()

let recordRes = builder.build()
require recordRes.isOk()
recordRes.tryGet()

let recordCluster22Indices1 = block:
let
enrSeqNum = 1u64
enrPrivKey = generatesecp256k1key()

let
shardCluster: uint16 = 22
shardIndices: seq[uint16] = @[2u16, 4u16, 5u16, 8u16, 10u16, 12u16]

let shards = RelayShards.init(shardCluster, shardIndices)

var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
require builder.withWakuRelaySharding(shards).isOk()

let recordRes = builder.build()
require recordRes.isOk()
recordRes.tryGet()

let recordCluster22Indices2 = block:
let
enrSeqNum = 1u64
enrPrivKey = generatesecp256k1key()

let
shardCluster: uint16 = 22
shardIndices: seq[uint16] = @[1u16, 3u16, 6u16, 7u16, 9u16, 11u16]

let shards = RelayShards.init(shardCluster, shardIndices)

var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
require builder.withWakuRelaySharding(shards).isOk()

let recordRes = builder.build()
require recordRes.isOk()
recordRes.tryGet()

## When
let predicateCluster21Op = shardingPredicate(recordCluster21)
require predicateCluster21Op.isSome()
let predicateCluster21 = predicateCluster21Op.get()

let predicateCluster22Op = shardingPredicate(recordCluster22Indices1)
require predicateCluster22Op.isSome()
let predicateCluster22 = predicateCluster22Op.get()

## Then
check:
predicateCluster21(recordCluster21) == true
predicateCluster21(recordCluster22Indices1) == false
predicateCluster21(recordCluster22Indices2) == false
predicateCluster22(recordCluster21) == false
predicateCluster22(recordCluster22Indices1) == true
predicateCluster22(recordCluster22Indices2) == false


4 changes: 3 additions & 1 deletion waku/v2/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -844,9 +844,11 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =

info "starting discv5 discovery loop"

let shardPredOp = shardingPredicate(node.enr)

while node.wakuDiscv5.listening:
trace "running discv5 discovery loop"
let discoveredRecords = await node.wakuDiscv5.findRandomPeers()
let discoveredRecords = await node.wakuDiscv5.findRandomPeers(shardPredOp)
let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value)

for peer in discoveredPeers:
Expand Down
30 changes: 27 additions & 3 deletions waku/v2/waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,39 @@ proc closeWait*(wd: WakuDiscoveryV5) {.async.} =
wd.listening = false
await wd.protocol.closeWait()

proc findRandomPeers*(wd: WakuDiscoveryV5, pred: WakuDiscv5Predicate = nil): Future[seq[waku_enr.Record]] {.async.} =
proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
## Filter peers based on relay sharding information

let typeRecordRes = record.toTyped()
let typedRecord =
if typeRecordRes.isErr():
debug "peer filtering failed", reason= $typeRecordRes.error
return none(WakuDiscv5Predicate)
else: typeRecordRes.get()

let nodeShardOp = typedRecord.relaySharding()
let nodeShard =
if nodeShardOp.isNone():
debug "no relay sharding information, peer filtering disabled"
return none(WakuDiscv5Predicate)
else: nodeShardOp.get()

debug "peer filtering enabled"

let predicate = proc(record: waku_enr.Record): bool =
nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it))

return some(predicate)

proc findRandomPeers*(wd: WakuDiscoveryV5, pred = none(WakuDiscv5Predicate)): Future[seq[waku_enr.Record]] {.async.} =
## Find random peers to connect to using Discovery v5
let discoveredNodes = await wd.protocol.queryRandom()

var discoveredRecords = discoveredNodes.mapIt(it.record)

# Filter out nodes that do not match the predicate
if not pred.isNil():
discoveredRecords = discoveredRecords.filter(pred)
if pred.isSome():
discoveredRecords = discoveredRecords.filter(pred.get())

return discoveredRecords

Expand Down

0 comments on commit a4da87b

Please sign in to comment.