Skip to content

Commit

Permalink
chore: remove deprecated legacy filter protocol (#2507)
Browse files Browse the repository at this point in the history
* chore: remove deprecated legacy filter protocol

* fix: do not use legacy import in test

* fix: remove legacy test references

* fix: more test fixes, starting filter client

* fix: sigh. more references to remove.

* fix: fix dereferencing error

* fix: fix merge mess up

* fix: sigh. merge tool used tabs.

* fix: more peer manager tests needed fixing

---------

Co-authored-by: Hanno Cornelius <hanno@status.im>
Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
  • Loading branch information
3 people committed Mar 25, 2024
1 parent b5e2edb commit e861317
Show file tree
Hide file tree
Showing 27 changed files with 76 additions and 1,649 deletions.
23 changes: 1 addition & 22 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import
../../waku/waku_core,
../../waku/waku_lightpush/common,
../../waku/waku_lightpush/rpc,
../../waku/waku_filter,
../../waku/waku_enr,
../../waku/waku_store,
../../waku/waku_dnsdisc,
Expand Down Expand Up @@ -283,17 +282,6 @@ proc writeAndPrint(c: Chat) {.async.} =
c.nick = await readNick(c.transp)
echo "You are now known as " & c.nick
elif line.startsWith("/exit"):
if not c.node.wakuFilterLegacy.isNil():
echo "unsubscribing from content filters..."

let peerOpt = c.node.peerManager.selectPeer(WakuLegacyFilterCodec)
if peerOpt.isSome():
await c.node.legacyFilterUnsubscribe(
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = c.contentTopic,
peer = peerOpt.get(),
)

echo "quitting..."

try:
Expand Down Expand Up @@ -514,24 +502,15 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
let peerInfo = parsePeerInfo(conf.filternode)
if peerInfo.isOk():
await node.mountFilter()
await node.mountLegacyFilter()
await node.mountFilterClient()
node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec)

proc filterHandler(
pubsubTopic: PubsubTopic, msg: WakuMessage
) {.async, gcsafe, closure.} =
trace "Hit filter handler", contentTopic = msg.contentTopic
chat.printReceivedMessage(msg)

await node.legacyFilterSubscribe(
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = chat.contentTopic,
filterHandler,
peerInfo.value,
)
# TODO: Here to support FilterV2 relevant subscription, but still
# Legacy Filter is concurrent to V2 untill legacy filter will be removed
# TODO: Here to support FilterV2 relevant subscription.
else:
error "Filter not mounted. Couldn't parse conf.filternode", error = peerInfo.error

Expand Down
3 changes: 0 additions & 3 deletions apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import
../../../waku/waku_core,
../../../waku/waku_node,
../../../waku/node/peer_manager,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/waku_store,
../../waku/factory/builder,
Expand Down Expand Up @@ -294,7 +293,6 @@ when isMainModule:

if conf.filter:
waitFor mountFilter(bridge.nodev2)
waitFor mountLegacyFilter(bridge.nodev2)

if conf.staticnodes.len > 0:
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
Expand All @@ -309,7 +307,6 @@ when isMainModule:
if conf.filternode != "":
let filterPeer = parsePeerInfo(conf.filternode)
if filterPeer.isOk():
bridge.nodev2.peerManager.addServicePeer(filterPeer.value, WakuLegacyFilterCodec)
bridge.nodev2.peerManager.addServicePeer(
filterPeer.value, WakuFilterSubscribeCodec
)
Expand Down
10 changes: 2 additions & 8 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/debug/handlers as rest_debug_api,
../../waku/waku_api/rest/relay/handlers as rest_relay_api,
../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/waku_api/rest/filter/handlers as rest_filter_api,
../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
../../waku/waku_api/rest/store/handlers as rest_store_api,
Expand All @@ -48,7 +47,6 @@ import
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush/common,
../../waku/waku_filter,
../../waku/waku_filter_v2,
../../waku/factory/node_factory,
../../waku/factory/internal_config,
Expand Down Expand Up @@ -368,12 +366,8 @@ proc startRestServer(
"/relay endpoints are not available. Please check your configuration: --relay"

## Filter REST API
if conf.filternode != "" and app.node.wakuFilterClient != nil and
app.node.wakuFilterClientLegacy != nil:
let legacyFilterCache = MessageCache.init()
rest_legacy_filter_api.installLegacyFilterRestApiHandlers(
server.router, app.node, legacyFilterCache
)
if conf.filternode != "" and
app.node.wakuFilterClient != nil:

let filterCache = MessageCache.init()

Expand Down
4 changes: 0 additions & 4 deletions tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ import
# Waku v2 tests
./test_wakunode,
./test_wakunode_lightpush,
# Waku Filter
./test_waku_filter_legacy,
./test_wakunode_filter_legacy,
./test_peer_store_extended,
./test_message_cache,
./test_peer_manager,
Expand All @@ -77,7 +74,6 @@ import
./wakunode_rest/test_rest_serdes,
./wakunode_rest/test_rest_store,
./wakunode_rest/test_rest_filter,
./wakunode_rest/test_rest_legacy_filter,
./wakunode_rest/test_rest_lightpush,
./wakunode_rest/test_rest_admin,
./wakunode_rest/test_rest_cors
Expand Down
1 change: 0 additions & 1 deletion tests/node/test_wakunode_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ suite "Waku Filter - End to End":
await allFutures(server.start(), client.start())

await server.mountFilter()
await server.mountLegacyFilter()
await client.mountFilterClient()

client.wakuFilterClient.registerPushHandler(messagePushHandler)
Expand Down
79 changes: 35 additions & 44 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import
../../waku/waku_core,
../../waku/waku_enr/capabilities,
../../waku/waku_relay/protocol,
../../waku/waku_filter_v2/common,
../../waku/waku_store/common,
../../waku/waku_filter/protocol,
../../waku/waku_lightpush/common,
../../waku/waku_peer_exchange,
../../waku/waku_metadata,
Expand Down Expand Up @@ -63,11 +63,10 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))
await allFutures(nodes.mapIt(it.mountLegacyFilter()))

# Dial node2 from node1
let conn = await nodes[0].peerManager.dialPeer(
nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec
nodes[1].peerInfo.toRemotePeerInfo(), WakuFilterSubscribeCodec
)
await sleepAsync(chronos.milliseconds(500))

Expand Down Expand Up @@ -107,13 +106,13 @@ procSuite "Peer Manager":

# Dial non-existent peer from node1
let conn1 =
await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuLegacyFilterCodec)
await nodes[0].peerManager.dialPeer(nonExistentPeer, WakuStoreCodec)
check:
conn1.isNone()

# Dial peer not supporting given protocol
let conn2 = await nodes[0].peerManager.dialPeer(
nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec
nodes[1].peerInfo.toRemotePeerInfo(), WakuStoreCodec
)
check:
conn2.isNone()
Expand All @@ -134,20 +133,17 @@ procSuite "Peer Manager":

await node.start()

await node.mountFilterClient()
node.mountStoreClient()

node.peerManager.addServicePeer(storePeer.toRemotePeerInfo(), WakuStoreCodec)
node.peerManager.addServicePeer(
filterPeer.toRemotePeerInfo(), WakuLegacyFilterCodec
filterPeer.toRemotePeerInfo(), WakuFilterSubscribeCodec
)

# Check peers were successfully added to peer manager
check:
node.peerManager.peerStore.peers().len == 2
node.peerManager.peerStore.peers(WakuLegacyFilterCodec).allIt(
node.peerManager.peerStore.peers(WakuFilterSubscribeCodec).allIt(
it.peerId == filterPeer.peerId and it.addrs.contains(filterLoc) and
it.protocols.contains(WakuLegacyFilterCodec)
it.protocols.contains(WakuFilterSubscribeCodec)
)
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(
it.peerId == storePeer.peerId and it.addrs.contains(storeLoc) and
Expand Down Expand Up @@ -762,39 +758,36 @@ procSuite "Peer Manager":
let
node =
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
peers = toSeq(1 .. 5)
peers = toSeq(1 .. 4)
.mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it))
.filterIt(it.isOk())
.mapIt(it.value)

require:
peers.len == 5
peers.len == 4

# service peers
node.peerManager.addServicePeer(peers[0], WakuStoreCodec)
node.peerManager.addServicePeer(peers[1], WakuLegacyFilterCodec)
node.peerManager.addServicePeer(peers[2], WakuLightPushCodec)
node.peerManager.addServicePeer(peers[3], WakuPeerExchangeCodec)
node.peerManager.addServicePeer(peers[1], WakuLightPushCodec)
node.peerManager.addServicePeer(peers[2], WakuPeerExchangeCodec)

# relay peers (should not be added)
node.peerManager.addServicePeer(peers[4], WakuRelayCodec)
node.peerManager.addServicePeer(peers[3], WakuRelayCodec)

# all peers are stored in the peerstore
check:
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[0].peerId)
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[1].peerId)
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[2].peerId)
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[3].peerId)

# but the relay peer is not
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[4].peerId) == false
node.peerManager.peerStore.peers().anyIt(it.peerId == peers[3].peerId) == false

# all service peers are added to its service slot
check:
node.peerManager.serviceSlots[WakuStoreCodec].peerId == peers[0].peerId
node.peerManager.serviceSlots[WakuLegacyFilterCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[2].peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[3].peerId
node.peerManager.serviceSlots[WakuLightPushCodec].peerId == peers[1].peerId
node.peerManager.serviceSlots[WakuPeerExchangeCodec].peerId == peers[2].peerId

# but the relay peer is not
node.peerManager.serviceSlots.hasKey(WakuRelayCodec) == false
Expand All @@ -809,51 +802,50 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))
await allFutures(nodes.mapIt(it.mountLegacyFilter()))

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

# create some connections/streams
require:
check:
# some relay connections
(await nodes[0].peerManager.connectRelay(pInfos[1])) == true
(await nodes[0].peerManager.connectRelay(pInfos[2])) == true
(await nodes[1].peerManager.connectRelay(pInfos[2])) == true

(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() ==
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
true
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuLegacyFilterCodec)).isSome() ==
(await nodes[0].peerManager.dialPeer(pInfos[2], WakuFilterSubscribeCodec)).isSome() ==
true

# isolated dial creates a relay conn under the hood (libp2p behaviour)
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuLegacyFilterCodec)).isSome() ==
(await nodes[2].peerManager.dialPeer(pInfos[3], WakuFilterSubscribeCodec)).isSome() ==
true

# assert physical connections
check:
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuRelayCodec)[1].len == 2

nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 2
nodes[0].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 0
nodes[0].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 2

nodes[1].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1

nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0
nodes[1].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1
nodes[1].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 0

nodes[2].peerManager.connectedPeers(WakuRelayCodec)[0].len == 2
nodes[2].peerManager.connectedPeers(WakuRelayCodec)[1].len == 1

nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1
nodes[2].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 1

nodes[3].peerManager.connectedPeers(WakuRelayCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuRelayCodec)[1].len == 0

nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuLegacyFilterCodec)[1].len == 0
nodes[3].peerManager.connectedPeers(WakuFilterSubscribeCodec)[0].len == 1
nodes[3].peerManager.connectedPeers(WakuFilterSubscribeCodec)[1].len == 0

asyncTest "getNumStreams() returns expected number of connections per protocol":
# Create 2 nodes
Expand All @@ -865,28 +857,27 @@ procSuite "Peer Manager":
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
await allFutures(nodes.mapIt(it.mountFilter()))
await allFutures(nodes.mapIt(it.mountLegacyFilter()))

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

require:
# multiple streams are multiplexed over a single connection.
# note that a relay connection is created under the hood when dialing a peer (libp2p behaviour)
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() ==
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() ==
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() ==
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
true
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuLegacyFilterCodec)).isSome() ==
(await nodes[0].peerManager.dialPeer(pInfos[1], WakuFilterSubscribeCodec)).isSome() ==
true

check:
nodes[0].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[0].peerManager.getNumStreams(WakuLegacyFilterCodec) == (0, 4)
nodes[0].peerManager.getNumStreams(WakuFilterSubscribeCodec) == (0, 4)

nodes[1].peerManager.getNumStreams(WakuRelayCodec) == (1, 1)
nodes[1].peerManager.getNumStreams(WakuLegacyFilterCodec) == (4, 0)
nodes[1].peerManager.getNumStreams(WakuFilterSubscribeCodec) == (4, 0)

test "selectPeer() returns the correct peer":
# Valid peer id missing the last digit
Expand All @@ -909,7 +900,7 @@ procSuite "Peer Manager":
# Add a peer[0] to the peerstore
pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs
pm.peerStore[ProtoBook][peers[0].peerId] =
@[WakuRelayCodec, WakuStoreCodec, WakuLegacyFilterCodec]
@[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec]

# When no service peers, we get one from the peerstore
let selectedPeer1 = pm.selectPeer(WakuStoreCodec)
Expand All @@ -918,7 +909,7 @@ procSuite "Peer Manager":
selectedPeer1.get().peerId == peers[0].peerId

# Same for other protocol
let selectedPeer2 = pm.selectPeer(WakuLegacyFilterCodec)
let selectedPeer2 = pm.selectPeer(WakuFilterSubscribeCodec)
check:
selectedPeer2.isSome() == true
selectedPeer2.get().peerId == peers[0].peerId
Expand Down

0 comments on commit e861317

Please sign in to comment.