Skip to content

Commit

Permalink
fix(2491): Fix metadata protocol disconnecting light nodes (#2533)
Browse files Browse the repository at this point in the history
* Fix metadata protocol disconnecting light nodes.
* Implement test cases.
  • Loading branch information
AlejandroCabeza committed Mar 19, 2024
1 parent 7aea2d4 commit 33774fa
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 2 deletions.
127 changes: 127 additions & 0 deletions tests/node/peer_manager/test_peer_manager.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import
chronicles,
std/[options, tables, strutils],
stew/shims/net,
chronos,
testutils/unittests

import
../../../waku/[node/waku_node, waku_core],
../../waku_lightpush/[lightpush_utils],
../../testlib/[wakucore, wakunode, futures, testasync],
../../../../waku/node/peer_manager/peer_manager

suite "Peer Manager":
suite "onPeerMetadata":
var
listenPort {.threadvar.}: Port
listenAddress {.threadvar.}: IpAddress
serverKey {.threadvar.}: PrivateKey
clientKey {.threadvar.}: PrivateKey
clusterId {.threadvar.}: uint64
shardTopic0 {.threadvar.}: string
shardTopic1 {.threadvar.}: string

asyncSetup:
listenPort = Port(0)
listenAddress = ValidIpAddress.init("0.0.0.0")
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
clusterId = 1
shardTopic0 = "/waku/2/rs/" & $clusterId & "/0"
shardTopic1 = "/waku/2/rs/" & $clusterId & "/1"

asyncTest "light client is not disconnected":
# Given two nodes with the same shardId
let
server =
newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0])
client =
newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic1])

# And both mount metadata and filter
discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
await client.mountFilterClient()
await server.mountFilter()

# And both nodes are started
waitFor allFutures(server.start(), client.start())
await sleepAsync(FUTURE_TIMEOUT)

# And the nodes are connected
let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
await client.connectToNodes(@[serverRemotePeerInfo])
await sleepAsync(FUTURE_TIMEOUT)

# When making an operation that triggers onPeerMetadata
discard await client.filterSubscribe(
some("/waku/2/default-waku/proto"), "waku/lightpush/1", serverRemotePeerInfo
)
await sleepAsync(FUTURE_TIMEOUT)

check:
server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId)
client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId)

asyncTest "relay with same shardId is not disconnected":
# Given two nodes with the same shardId
let
server =
newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0])
client =
newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic0])

# And both mount metadata and relay
discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
await client.mountRelay()
await server.mountRelay()

# And both nodes are started
waitFor allFutures(server.start(), client.start())
await sleepAsync(FUTURE_TIMEOUT)

# And the nodes are connected
let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
await client.connectToNodes(@[serverRemotePeerInfo])
await sleepAsync(FUTURE_TIMEOUT)

# When making an operation that triggers onPeerMetadata
client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic"))
await sleepAsync(FUTURE_TIMEOUT)

check:
server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId)
client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId)

asyncTest "relay with different shardId is disconnected":
# Given two nodes with different shardIds
let
server =
newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0])
client =
newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic1])

# And both mount metadata and relay
discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
await client.mountRelay()
await server.mountRelay()

# And both nodes are started
waitFor allFutures(server.start(), client.start())
await sleepAsync(FUTURE_TIMEOUT)

# And the nodes are connected
let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
await client.connectToNodes(@[serverRemotePeerInfo])
await sleepAsync(FUTURE_TIMEOUT)

# When making an operation that triggers onPeerMetadata
client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic"))
await sleepAsync(FUTURE_TIMEOUT)

check:
not server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId)
not client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId)
9 changes: 7 additions & 2 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import
metrics,
libp2p/multistream,
libp2p/muxers/muxer,
libp2p/nameresolving/nameresolver
libp2p/nameresolving/nameresolver,
libp2p/peerstore

import
../../common/nimchronos,
../../common/enr,
Expand Down Expand Up @@ -369,7 +371,10 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
$clusterId
break guardClauses

if not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)):
if (
pm.peerStore.hasPeer(peerId, WakuRelayCodec) and
not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it))
):
reason = "no shards in common"
break guardClauses

Expand Down
2 changes: 2 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,8 @@ proc filterSubscribe*(
contentTopics = contentTopics,
peer = remotePeer.peerId

when (contentTopics is ContentTopic):
let contentTopics = @[contentTopics]
let subRes = await node.wakuFilterClient.subscribe(
remotePeer, pubsubTopic.get(), contentTopics
)
Expand Down

0 comments on commit 33774fa

Please sign in to comment.