Skip to content

Commit

Permalink
feat: integrate new filter protocol, other improvements (#1637)
Browse files Browse the repository at this point in the history
  • Loading branch information
jm-clius committed Apr 11, 2023
1 parent 1cfb251 commit 418efca
Show file tree
Hide file tree
Showing 15 changed files with 497 additions and 279 deletions.
4 changes: 4 additions & 0 deletions tests/all_tests_v2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import
./v2/waku_relay/test_waku_relay,
./v2/waku_relay/test_wakunode_relay

# Waku filter test suite
import
./v2/waku_filter_v2/test_waku_filter,
./v2/waku_filter_v2/test_waku_filter_protocol

import
# Waku v2 tests
Expand Down
8 changes: 4 additions & 4 deletions tests/v2/test_waku_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ import
./testlib/wakucore


proc newTestWakuFilterNode(switch: Switch, timeout: Duration = 2.hours): Future[WakuFilter] {.async.} =
proc newTestWakuFilterNode(switch: Switch, timeout: Duration = 2.hours): Future[WakuFilterLegacy] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilter.new(peerManager, rng, timeout)
proto = WakuFilterLegacy.new(peerManager, rng, timeout)

await proto.start()
switch.mount(proto)

return proto

proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClient] {.async.} =
proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClientLegacy] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilterClient.new(peerManager, rng)
proto = WakuFilterClientLegacy.new(peerManager, rng)

await proto.start()
switch.mount(proto)
Expand Down
6 changes: 6 additions & 0 deletions tests/v2/waku_filter_v2/test_waku_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ suite "Waku Filter - end to end":
check:
not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed

# Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())

asyncTest "subscribe to multiple content topics and unsubscribe all":
# Given
var
Expand Down Expand Up @@ -373,3 +376,6 @@ suite "Waku Filter - end to end":
check:
pushedMsgPubsubTopic3 == DefaultPubsubTopic
pushedMsg3 == msg3

# Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())
184 changes: 183 additions & 1 deletion tests/v2/waku_filter_v2/test_waku_filter_protocol.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[options,sets,strutils,tables],
std/[options,sequtils,sets,strutils,tables],
testutils/unittests,
chronos,
chronicles,
Expand All @@ -10,6 +10,7 @@ import
../../../waku/v2/node/peer_manager,
../../../waku/v2/protocol/waku_filter_v2,
../../../waku/v2/protocol/waku_filter_v2/rpc,
../../../waku/v2/protocol/waku_filter_v2/subscriptions,
../../../waku/v2/protocol/waku_message,
../testlib/common,
../testlib/wakucore
Expand Down Expand Up @@ -197,6 +198,187 @@ suite "Waku Filter - handling subscribe requests":
response4.statusCode == 200
response4.statusDesc.get() == "OK"

asyncTest "subscribe errors":
## Tests most common error paths while subscribing

# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()

## Incomplete filter criteria

# When
let
reqNoPubsubTopic = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = none(PubsubTopic),
contentTopics = @[DefaultContentTopic]
)
reqNoContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[]
)
response1 = wakuFilter.handleSubscribeRequest(peerId, reqNoPubsubTopic)
response2 = wakuFilter.handleSubscribeRequest(peerId, reqNoContentTopics)

# Then
check:
response1.requestId == reqNoPubsubTopic.requestId
response2.requestId == reqNoContentTopics.requestId
response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response1.statusDesc.get().contains("pubsubTopic and contentTopics must be specified")
response2.statusDesc.get().contains("pubsubTopic and contentTopics must be specified")

## Max content topics per request exceeded

# When
let
contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(ContentTopic("/waku/2/content-$#/proto" % [$it]))
reqTooManyContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = contentTopics
)
response3 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyContentTopics)

# Then
check:
response3.requestId == reqTooManyContentTopics.requestId
response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response3.statusDesc.get().contains("exceeds maximum content topics")

## Max filter criteria exceeded

# When
let
filterCriteria = toSeq(1 .. MaxCriteriaPerSubscription + 1).mapIt((DefaultPubsubTopic, ContentTopic("/waku/2/content-$#/proto" % [$it])))

wakuFilter.subscriptions[peerId] = filterCriteria.toHashSet()

let
reqTooManyFilterCriteria = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic]
)
response4 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyFilterCriteria)

# Then
check:
response4.requestId == reqTooManyFilterCriteria.requestId
response4.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
response4.statusDesc.get().contains("peer has reached maximum number of filter criteria")

## Max subscriptions exceeded

# When
wakuFilter.subscriptions.clear()
for _ in 1 .. MaxTotalSubscriptions:
wakuFilter.subscriptions[PeerId.random().get()] = @[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet()

let
reqTooManySubscriptions = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic]
)
response5 = wakuFilter.handleSubscribeRequest(peerId, reqTooManySubscriptions)

# Then
check:
response5.requestId == reqTooManySubscriptions.requestId
response5.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
response5.statusDesc.get().contains("node has reached maximum number of subscriptions")

asyncTest "unsubscribe errors":
## Tests most common error paths while unsubscribing

# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()

## Incomplete filter criteria

# When
let
reqNoPubsubTopic = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = none(PubsubTopic),
contentTopics = @[DefaultContentTopic]
)
reqNoContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[]
)
response1 = wakuFilter.handleSubscribeRequest(peerId, reqNoPubsubTopic)
response2 = wakuFilter.handleSubscribeRequest(peerId, reqNoContentTopics)

# Then
check:
response1.requestId == reqNoPubsubTopic.requestId
response2.requestId == reqNoContentTopics.requestId
response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response1.statusDesc.get().contains("pubsubTopic and contentTopics must be specified")
response2.statusDesc.get().contains("pubsubTopic and contentTopics must be specified")

## Max content topics per request exceeded

# When
let
contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(ContentTopic("/waku/2/content-$#/proto" % [$it]))
reqTooManyContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = contentTopics
)
response3 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyContentTopics)

# Then
check:
response3.requestId == reqTooManyContentTopics.requestId
response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response3.statusDesc.get().contains("exceeds maximum content topics")

## Subscription not found - unsubscribe

# When
let
reqSubscriptionNotFound = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic]
)
response4 = wakuFilter.handleSubscribeRequest(peerId, reqSubscriptionNotFound)

# Then
check:
response4.requestId == reqSubscriptionNotFound.requestId
response4.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
response4.statusDesc.get().contains("peer has no subscriptions")

## Subscription not found - unsubscribe all

# When
let
reqUnsubscribeAll = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL
)
response5 = wakuFilter.handleSubscribeRequest(peerId, reqUnsubscribeAll)

# Then
check:
response5.requestId == reqUnsubscribeAll.requestId
response5.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
response5.statusDesc.get().contains("peer has no subscriptions")

asyncTest "ping subscriber":
# Given
let
Expand Down
2 changes: 1 addition & 1 deletion tests/v2/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ procSuite "WakuNode - Store":
await sleepAsync(100.millis)

# Send filter push message to server from source node
await filterSource.wakuFilter.handleMessage(DefaultPubsubTopic, message)
await filterSource.wakuFilterLegacy.handleMessage(DefaultPubsubTopic, message)

# Wait for the server filter to receive the push message
require await filterFut.withTimeout(5.seconds)
Expand Down
6 changes: 3 additions & 3 deletions tests/v2/wakunode_jsonrpc/test_jsonrpc_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ procSuite "Waku v2 JSON-RPC API - Filter":

check:
# Light node has not yet subscribed to any filters
node2.wakuFilterClient.getSubscriptionsCount() == 0
node2.wakuFilterClientLegacy.getSubscriptionsCount() == 0

let contentFilters = @[
ContentFilter(contentTopic: DefaultContentTopic),
Expand All @@ -70,13 +70,13 @@ procSuite "Waku v2 JSON-RPC API - Filter":
check:
response == true
# Light node has successfully subscribed to 4 content topics
node2.wakuFilterClient.getSubscriptionsCount() == 4
node2.wakuFilterClientLegacy.getSubscriptionsCount() == 4

response = await client.delete_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))
check:
response == true
# Light node has successfully unsubscribed from all filters
node2.wakuFilterClient.getSubscriptionsCount() == 0
node2.wakuFilterClientLegacy.getSubscriptionsCount() == 0

## Cleanup
await server.stop()
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/jsonrpc/admin/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
connected: it.connectedness == Connectedness.Connected))
peers.add(relayPeers)

if not node.wakuFilter.isNil():
if not node.wakuFilterLegacy.isNil():
# Map WakuFilter peers to WakuPeers and add to return list
let filterPeers = node.peerManager.peerStore.peers(WakuFilterCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/waku_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ proc startMetricsLog*() =
let pxPeers = collectorAsF64(waku_px_peers)
let lightpushPeers = collectorAsF64(waku_lightpush_peers)
let filterPeers = collectorAsF64(waku_filter_peers)
let filterSubscribers = collectorAsF64(waku_filter_subscribers)
let filterSubscribers = collectorAsF64(waku_legacy_filter_subscribers)

info "Total connections initiated", count = $freshConnCount
info "Total messages", count = totalMessages
Expand Down

0 comments on commit 418efca

Please sign in to comment.