Skip to content

Commit

Permalink
chore: Refactor of FilterV2 subscription management with Time-to-live…
Browse files Browse the repository at this point in the history
… maintenance (#2341)

* Refactor of FilterV2 subscription handling and maintenance with addition subscription time-to-live support.
Fixed all tests and reworked where subscription handling changes needed it.
Adapted REST API /admin filter subscription retrieve to new filter subscription structure.

* Fix tests and PR comments

* Added filter v2 subscription timeout tests and fixed

* Fix review comments and suggestions. No functional change.

* Remove leftover echoes from test_rest_admin

* Fix failed legacy filter tests due to separation of mounting the filters.

* Small fixes, fix naming typo, removed duplicated checks in test
  • Loading branch information
NagyZoltanPeter committed Jan 16, 2024
1 parent 3d816c0 commit c335840
Show file tree
Hide file tree
Showing 23 changed files with 1,123 additions and 545 deletions.
3 changes: 2 additions & 1 deletion apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
let peerInfo = parsePeerInfo(conf.filternode)
if peerInfo.isOk():
await node.mountFilter()
await node.mountLegacyFilter()
await node.mountFilterClient()
node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec)

Expand Down Expand Up @@ -507,7 +508,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
echo "A spam message is found and discarded"
chat.prompt = false
showChatPrompt(chat)

echo "rln-relay preparation is in progress..."

let rlnConf = WakuRlnConfig(
Expand Down
1 change: 1 addition & 0 deletions apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ when isMainModule:

if conf.filter:
waitFor mountFilter(bridge.nodev2)
waitFor mountLegacyFilter(bridge.nodev2)

if conf.staticnodes.len > 0:
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
Expand Down
20 changes: 14 additions & 6 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
)

WakuDiscoveryV5.new(
app.rng,
app.rng,
discv5Conf,
some(app.record),
some(app.node.peerManager),
Expand Down Expand Up @@ -326,7 +326,7 @@ proc setupWakuApp*(app: var App): AppResult[void] =
ok()

proc getPorts(listenAddrs: seq[MultiAddress]):
AppResult[tuple[tcpPort, websocketPort: Option[Port]]] =
AppResult[tuple[tcpPort, websocketPort: Option[Port]]] =

var tcpPort, websocketPort = none(Port)

Expand Down Expand Up @@ -548,7 +548,15 @@ proc setupProtocols(node: WakuNode,
# Filter setup. NOTE Must be mounted after relay
if conf.filter:
try:
await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
await mountLegacyFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
except CatchableError:
return err("failed to mount waku legacy filter protocol: " & getCurrentExceptionMsg())

try:
await mountFilter(node,
subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout),
maxFilterPeers = conf.filterMaxPeersToServe,
maxFilterCriteriaPerPeer = conf.filterMaxCriteria)
except CatchableError:
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())

Expand Down Expand Up @@ -724,7 +732,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon

let filterCache = MessageCache.init()

let filterDiscoHandler =
let filterDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter))
else: none(DiscoveryHandler)
Expand All @@ -739,7 +747,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon
notInstalledTab["filter"] = "/filter endpoints are not available. Please check your configuration: --filternode"

## Store REST API
let storeDiscoHandler =
let storeDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store))
else: none(DiscoveryHandler)
Expand All @@ -749,7 +757,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon
## Light push API
if conf.lightpushnode != "" and
app.node.wakuLightpushClient != nil:
let lightDiscoHandler =
let lightDiscoHandler =
if app.wakuDiscv5.isSome():
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush))
else: none(DiscoveryHandler)
Expand Down
20 changes: 18 additions & 2 deletions apps/wakunode2/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type
defaultValue: false,
name: "execute" .}: bool


of noCommand:
## Application-level configuration
protectedTopics* {.
Expand Down Expand Up @@ -221,7 +222,7 @@ type
desc: "Rln relay identity commitment key as a Hex string",
defaultValue: ""
name: "rln-relay-id-commitment-key" }: string

rlnRelayTreePath* {.
desc: "Path to the RLN merkle tree sled db (https://github.com/spacejam/sled)",
defaultValue: ""
Expand Down Expand Up @@ -304,10 +305,25 @@ type
name: "filternode" }: string

filterTimeout* {.
desc: "Timeout for filter node in seconds.",
desc: "Filter clients will be wiped out if not able to receive push messages within this timeout. In seconds.",
defaultValue: 14400 # 4 hours
name: "filter-timeout" }: int64

filterSubscriptionTimeout* {.
desc: "Timeout for filter subscription without ping or refresh it, in seconds. Only for v2 filter protocol.",
defaultValue: 300 # 5 minutes
name: "filter-subscription-timeout" }: int64

filterMaxPeersToServe* {.
desc: "Maximum number of peers to serve at a time. Only for v2 filter protocol.",
defaultValue: 1000
name: "filter-max-peers-to-serve" }: uint32

filterMaxCriteria* {.
desc: "Maximum number of pubsub- and content topic combination per peers at a time. Only for v2 filter protocol.",
defaultValue: 1000
name: "filter-max-criteria" }: uint32

## Lightpush config

lightpush* {.
Expand Down
32 changes: 16 additions & 16 deletions tests/node/test_wakunode_filter.nim
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{.used.}
{.used.}

import
std/[
options,
tables,
options,
tables,
sequtils
],
stew/shims/net as stewNet,
Expand All @@ -12,7 +12,7 @@ import
chronicles,
os,
libp2p/[
peerstore,
peerstore,
crypto/crypto
]

Expand Down Expand Up @@ -44,7 +44,7 @@ suite "Waku Filter - End to End":
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)]
var messagePushHandler {.threadvar.}: FilterPushHandler

asyncSetup:
pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler = proc(
Expand Down Expand Up @@ -84,8 +84,8 @@ suite "Waku Filter - End to End":
# Then the subscription is successful
check:
subscribeResponse.isOk()
server.wakuFilter.subscriptions.len == 1
server.wakuFilter.subscriptions.hasKey(clientPeerId)
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
server.wakuFilter.subscriptions.isSubscribed(clientPeerId)

# When sending a message to the subscribed content topic
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
Expand All @@ -106,7 +106,7 @@ suite "Waku Filter - End to End":
# Then the unsubscription is successful
check:
unsubscribeResponse.isOk()
server.wakuFilter.subscriptions.len == 0
server.wakuFilter.subscriptions.subscribedPeerCount() == 0

# When sending a message to the previously subscribed content topic
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
Expand All @@ -116,7 +116,7 @@ suite "Waku Filter - End to End":
# Then the message is not pushed to the client
check:
not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)

asyncTest "Client Node can't receive Push from Server Node, via Relay":
# Given the server node has Relay enabled
await server.mountRelay()
Expand All @@ -127,7 +127,7 @@ suite "Waku Filter - End to End":
)
require:
subscribeResponse.isOk()
server.wakuFilter.subscriptions.len == 1
server.wakuFilter.subscriptions.subscribedPeerCount() == 1

# When a server node gets a Relay message
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
Expand All @@ -141,7 +141,7 @@ suite "Waku Filter - End to End":
let
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))

waitFor server.start()
waitFor server.mountRelay()

Expand All @@ -162,8 +162,8 @@ suite "Waku Filter - End to End":
)
require:
subscribeResponse.isOk()
server.wakuFilter.subscriptions.len == 1
server.wakuFilter.subscriptions.subscribedPeerCount() == 1

# And the client node reboots
waitFor client.stop()
waitFor client.start()
Expand All @@ -189,8 +189,8 @@ suite "Waku Filter - End to End":
)
require:
subscribeResponse.isOk()
server.wakuFilter.subscriptions.len == 1
server.wakuFilter.subscriptions.subscribedPeerCount() == 1

# And the client node reboots
waitFor client.stop()
waitFor client.start()
Expand All @@ -209,7 +209,7 @@ suite "Waku Filter - End to End":
)
check:
subscribeResponse2.isOk()
server.wakuFilter.subscriptions.len == 1
server.wakuFilter.subscriptions.subscribedPeerCount() == 1

# When a message is sent to the subscribed content topic, via Relay
pushHandlerFuture = newPushHandlerFuture()
Expand Down
3 changes: 3 additions & 0 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))
await allFutures(nodes.mapIt(it.mountLegacyFilter()))

# Dial node2 from node1
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
Expand Down Expand Up @@ -528,6 +529,7 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))
await allFutures(nodes.mapIt(it.mountLegacyFilter()))

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

Expand Down Expand Up @@ -579,6 +581,7 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))
await allFutures(nodes.mapIt(it.mountLegacyFilter()))

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

Expand Down
1 change: 1 addition & 0 deletions tests/test_wakunode_filter_legacy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ suite "WakuNode - Filter":
waitFor allFutures(server.start(), client.start())

waitFor server.mountFilter()
waitFor server.mountLegacyFilter()
waitFor client.mountFilterClient()

## Given
Expand Down

0 comments on commit c335840

Please sign in to comment.