Skip to content

Commit

Permalink
Merge be0e331 into 54b5222
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Aug 13, 2024
2 parents 54b5222 + be0e331 commit 3d5c539
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 13 deletions.
7 changes: 7 additions & 0 deletions waku/factory/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,13 @@ type WakuNodeConf* = object
name: "peer-exchange-node"
.}: string

## Rendez vous
rendezvous* {.
desc: "Enable waku rendezvous discovery server",
defaultValue: false,
name: "rendezvous"
.}: bool

## websocket config
websocketSupport* {.
desc: "Enable websocket: true|false",
Expand Down
13 changes: 6 additions & 7 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,6 @@ proc setupProtocols(
protectedTopic = topicKey.topic, publicKey = topicKey.key
node.wakuRelay.addSignedTopicsValidator(subscribedProtectedTopics)

# Enable Rendezvous Discovery protocol when Relay is enabled
try:
await mountRendezvous(node)
except CatchableError:
return
err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg())

# Keepalive mounted on all nodes
try:
await mountLibp2pPing(node)
Expand Down Expand Up @@ -353,6 +346,12 @@ proc setupProtocols(
return
err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)

# Enable Rendezvous Discovery protocol
try:
await mountRendezvous(node, conf.rendezvous)
except CatchableError:
return err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg())

return ok()

## Start node
Expand Down
22 changes: 16 additions & 6 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/connectivity/autonat/client,
libp2p/protocols/connectivity/autonat/service,
libp2p/protocols/rendezvous,
libp2p/builders,
libp2p/transports/transport,
libp2p/transports/tcptransport,
Expand All @@ -39,6 +38,7 @@ import
../waku_filter_v2/client as filter_client,
../waku_filter_v2/subscriptions as filter_subscriptions,
../waku_metadata,
../waku_rendezvous/protocol,
../waku_lightpush/client as lightpush_client,
../waku_lightpush/common,
../waku_lightpush/protocol,
Expand Down Expand Up @@ -107,7 +107,7 @@ type
enr*: enr.Record
libp2pPing*: Ping
rng*: ref rand.HmacDrbgContext
rendezvous*: RendezVous
wakuRendezvous*: WakuRendezVous
announcedAddresses*: seq[MultiAddress]
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
Expand Down Expand Up @@ -1214,19 +1214,20 @@ proc startKeepalive*(node: WakuNode) =

asyncSpawn node.keepaliveLoop(defaultKeepalive)

proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
proc mountRendezvous*(node: WakuNode, enabled: bool) {.async: (raises: []).} =
info "mounting rendezvous discovery protocol"

node.rendezvous = RendezVous.new(node.switch)
node.wakuRendezvous =
WakuRendezVous.new(node.switch, node.peerManager, node.enr, enabled)

if node.started:
try:
await node.rendezvous.start()
await node.wakuRendezvous.start()
except CatchableError:
error "failed to start rendezvous", error = getCurrentExceptionMsg()

try:
node.switch.mount(node.rendezvous)
node.switch.mount(node.wakuRendezvous)
except LPError:
error "failed to mount rendezvous", error = getCurrentExceptionMsg()

Expand Down Expand Up @@ -1286,6 +1287,12 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.start()

if not node.wakuRendezvous.isNil():
try:
await node.wakuRendezvous.start()
except CatchableError:
error "failed to start rendezvous", error = getCurrentExceptionMsg()

## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper = proc(
Expand Down Expand Up @@ -1324,6 +1331,9 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.stopWait()

if not node.wakuRendezvous.isNil():
await node.wakuRendezvous.stop()

node.started = false

proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_rendezvous.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import ./waku_rendezvous/protocol

export protocol
28 changes: 28 additions & 0 deletions waku/waku_rendezvous/common.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{.push raises: [].}

import chronos

import ../waku_enr/capabilities

const DiscoverLimit* = 1000
const DefaultRegistrationInterval* = 1.minutes

proc computeNamespace*(clusterId: uint16, shard: uint16): string =
var namespace = "rs/"

namespace &= $clusterId
namespace &= '/'
namespace &= $shard

return namespace

proc computeNamespace*(clusterId: uint16, shard: uint16, cap: Capabilities): string =
var namespace = "rs/"

namespace &= $clusterId
namespace &= '/'
namespace &= $shard
namespace &= '/'
namespace &= $cap

return namespace
224 changes: 224 additions & 0 deletions waku/waku_rendezvous/protocol.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
{.push raises: [].}

import
std/[sugar, options],
results,
chronos,
chronicles,
metrics,
libp2p/protocols/rendezvous,
libp2p/switch,
libp2p/utility

import
../node/peer_manager,
../common/enr,
../waku_enr/capabilities,
../waku_enr/sharding,
../waku_core/peers as peers,
./common

logScope:
topics = "waku rendez vous"

declarePublicCounter peerFound, "number of peers found via rendezvous"

type WakuRendezVous* = ref object of RendezVous
peerManager: PeerManager
relayShard: RelayShards
capabilities: seq[Capabilities]

periodicRegistrationFut: Future[void]

enabled: bool

proc advertise(
self: WakuRendezVous, namespace: string, ttl: Duration = MinimumDuration
): Future[Result[void, string]] {.async.} =
## Register with all rendez vous peers under a namespace

let catchable = catch:
await procCall RendezVous(self).advertise(namespace, ttl)

if catchable.isErr():
return err(catchable.error.msg)

return ok()

proc request(
self: WakuRendezVous, namespace: string, count: int = DiscoverLimit
): Future[Result[seq[PeerRecord], string]] {.async.} =
## Request all records from all rendez vous peers with matching a namespace

let catchable = catch:
await RendezVous(self).request(namespace, count)

if catchable.isErr():
return err(catchable.error.msg)

return ok(catchable.get())

proc requestLocally(self: WakuRendezVous, namespace: string): seq[PeerRecord] =
RendezVous(self).requestLocally(namespace)

proc unsubscribeLocally(self: WakuRendezVous, namespace: string) =
RendezVous(self).unsubscribeLocally(namespace)

proc unsubscribe(
self: WakuRendezVous, namespace: string
): Future[Result[void, string]] {.async.} =
## Unsubscribe from all rendez vous peers including locally

let catchable = catch:
await RendezVous(self).unsubscribe(namespace)

if catchable.isErr():
return err(catchable.error.msg)

return ok()

proc getRelayShards(enr: enr.Record): Option[RelayShards] =
let typedRecord = enr.toTyped().valueOr:
return none(RelayShards)

return typedRecord.relaySharding()

proc new*(
T: type WakuRendezVous,
switch: Switch,
peerManager: PeerManager,
enr: Record,
enabled: bool,
): T =
let relayshard = getRelayShards(enr).valueOr:
warn "Using default cluster id 0"
RelayShards(clusterID: 0, shardIds: @[])

let capabilities = enr.getCapabilities()

let wrv = WakuRendezVous(
peerManager: peerManager,
relayshard: relayshard,
capabilities: capabilities,
enabled: enabled,
)

RendezVous(wrv).setup(switch)

debug "waku rendezvous initialized",
cluster = relayshard.clusterId,
shards = relayshard.shardIds,
capabilities = capabilities

return wrv

proc advertisementNamespaces(self: WakuRendezVous): seq[string] =
let namespaces = collect(newSeq):
for shard in self.relayShard.shardIds:
for cap in self.capabilities:
computeNamespace(self.relayShard.clusterId, shard, cap)

return namespaces

proc requestNamespaces(self: WakuRendezVous): seq[string] =
let namespaces = collect(newSeq):
for shard in self.relayShard.shardIds:
for cap in Capabilities:
computeNamespace(self.relayShard.clusterId, shard, cap)

return namespaces

proc shardOnlyNamespaces(self: WakuRendezVous): seq[string] =
let namespaces = collect(newSeq):
for shard in self.relayShard.shardIds:
computeNamespace(self.relayShard.clusterId, shard)

return namespaces

proc advertiseAll*(self: WakuRendezVous) {.async.} =
let namespaces = self.shardOnlyNamespaces()

let futs = collect(newSeq):
for namespace in namespaces:
self.advertise(namespace, 1.minutes)

let handles = await allFinished(futs)

for fut in handles:
let res = fut.read

if res.isErr():
warn "failed to advertise", error = res.error

debug "waku rendezvous advertisements finished", adverCount = handles.len

proc requestAll*(self: WakuRendezVous) {.async.} =
let namespaces = self.shardOnlyNamespaces()

let futs = collect(newSeq):
for namespace in namespaces:
self.request(namespace)

let handles = await allFinished(futs)

for fut in handles:
let res = fut.read

if res.isErr():
warn "failed to request", error = res.error
else:
for peer in res.get():
peerFound.inc()
self.peerManager.addPeer(peer)

debug "waku rendezvous requests finished", requestCount = handles.len

proc unsubcribeAll*(self: WakuRendezVous) {.async.} =
let namespaces = self.shardOnlyNamespaces()

let futs = collect(newSeq):
for namespace in namespaces:
self.unsubscribe(namespace)

let handles = await allFinished(futs)

for fut in handles:
let res = fut.read

if res.isErr():
warn "failed to unsubcribe", error = res.error

debug "waku rendezvous unsubscriptions finished", unsubCount = handles.len

return

proc periodicRegistration(self: WakuRendezVous) {.async.} =
debug "waku rendezvous periodic registration started",
interval = DefaultRegistrationInterval

# infinite loop
while true:
# default ttl of registration is the same as default interval
await self.advertiseAll()

await sleepAsync(DefaultRegistrationInterval)

proc start*(self: WakuRendezVous) {.async.} =
await self.requestAll()

if not self.enabled:
return

debug "starting waku rendezvous discovery"

# start registering forever
self.periodicRegistrationFut = self.periodicRegistration()

proc stopWait*(self: WakuRendezVous) {.async.} =
if not self.enabled:
return

debug "stopping waku rendezvous discovery"

if not self.periodicRegistrationFut.isNil():
await self.periodicRegistrationFut.cancelAndWait()

0 comments on commit 3d5c539

Please sign in to comment.