Skip to content

Commit

Permalink
chore: add validator for dos protec metrics and move to app (#1704)
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta committed May 2, 2023
1 parent 703c3ab commit 3e14686
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 258 deletions.
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import
../../waku/v2/waku_enr,
../../waku/v2/waku_discv5,
../../waku/v2/waku_peer_exchange,
../../waku/v2/waku_relay/validators,
../../waku/v2/waku_store,
../../waku/v2/waku_lightpush,
../../waku/v2/waku_filter,
./wakunode2_validator_signed,
./config
import
../../waku/v2/node/message_cache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ else:
import
chronicles,
chronos,
metrics,
stew/byteutils,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/messages,
Expand All @@ -14,8 +15,10 @@ import
secp256k1

import
./protocol,
../waku_core
../../waku/v2/waku_relay/protocol,
../../waku/v2/waku_core

declarePublicCounter waku_msg_validator_signed_outcome, "number of messages for each validation outcome", ["result"]

# Application level message hash
proc msgHash*(pubSubTopic: string, msg: WakuMessage): array[32, byte] =
Expand All @@ -27,25 +30,23 @@ proc msgHash*(pubSubTopic: string, msg: WakuMessage): array[32, byte] =
ctx.update(msg.payload)
ctx.update(msg.contentTopic.toBytes())

# TODO: Other fields?

return ctx.finish()

proc addSignedTopicValidator*(w: WakuRelay, topic: PubsubTopic, publicTopicKey: SkPublicKey) =
debug "adding validator to signed topic", topic=topic, publicTopicKey=publicTopicKey

proc validator(topic: string, message: messages.Message): Future[errors.ValidationResult] {.async.} =
let msg = WakuMessage.decode(message.data)
var outcome = errors.ValidationResult.Reject

if msg.isOk():
let msgHash = SkMessage(topic.msgHash(msg.get))
let recoveredSignature = SkSignature.fromRaw(msg.get.meta)
if recoveredSignature.isErr():
# TODO: add metrics for accept/reject
return errors.ValidationResult.Reject
if recoveredSignature.get.verify(msgHash, publicTopicKey):
return errors.ValidationResult.Accept
else:
return errors.ValidationResult.Reject
return errors.ValidationResult.Reject
if recoveredSignature.isOk():
if recoveredSignature.get.verify(msgHash, publicTopicKey):
outcome = errors.ValidationResult.Accept

waku_msg_validator_signed_outcome.inc(labelValues = [$outcome])
return outcome

w.addValidator(topic, validator)
4 changes: 3 additions & 1 deletion tests/all_tests_wakunode2.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
## Wakunode2

import
./wakunode2/test_app
./wakunode2/test_app,
./wakunode2/test_validators

244 changes: 0 additions & 244 deletions tests/v2/waku_relay/test_wakunode_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import
../../waku/v2/node/peer_manager,
../../waku/v2/waku_node,
../../waku/v2/waku_relay,
../../waku/v2/waku_relay/validators,
../testlib/testutils,
../testlib/common,
../testlib/wakucore,
Expand Down Expand Up @@ -248,249 +247,6 @@ suite "WakuNode - Relay":

await allFutures(nodes.mapIt(it.stop()))

# TODO: Test multiple protected topics

asyncTest "Spam protected topic accepts signed messages":
# Create 5 nodes
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Protected topic and key to sign
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
let secretKey = SkSecretKey.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6").expect("valid key")
let publicKey = secretKey.toPublicKey()
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable

# Start all the nodes and mount relay with protected topic
await allFutures(nodes.mapIt(it.start()))

# Mount relay for all nodes
await allFutures(nodes.mapIt(it.mountRelay()))

# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
for topic, publicKey in topicsPublicKeys:
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)

# Connect the nodes in a full mesh
for i in 0..<5:
for j in 0..<5:
if i == j:
continue
let connOk = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo())
require connOk

# Connection triggers different actions, wait for them
await sleepAsync(500.millis)

var msgReceived = 0
proc handler(pubsubTopic: PubsubTopic, data: WakuMessage) {.async, gcsafe.} =
msgReceived += 1

# Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis)

# Each node publishes 10 signed messages
for i in 0..<5:
for j in 0..<10:
var msg = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)

# Include signature
msg.meta = secretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]

await nodes[i].publish(spamProtectedTopic, msg)

# Wait for gossip
await sleepAsync(2.seconds)

# 50 messages were sent to 5 peers = 250 messages
check:
msgReceived == 250

# No invalid messages were received by any peer
for i in 0..<5:
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
check:
v.topicInfos[spamProtectedTopic].invalidMessageDeliveries == 0.0

# Stop all nodes
await allFutures(nodes.mapIt(it.stop()))

asyncTest "Spam protected topic rejects non-signed and wrongly-signed messages":
# Create 5 nodes
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Protected topic and key to sign
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
let secretKey = SkSecretKey.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6").expect("valid key")
let publicKey = secretKey.toPublicKey()
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable

# Non whitelisted secret key
let wrongSecretKey = SkSecretKey.fromHex("32ad0cc8edeb9f8a3e8635c5fe5bd200b9247a33da5e7171bd012691805151f3").expect("valid key")

# Start all the nodes and mount relay with protected topic
await allFutures(nodes.mapIt(it.start()))

# Mount relay with spam protected topics
await allFutures(nodes.mapIt(it.mountRelay()))

# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
for topic, publicKey in topicsPublicKeys:
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)

# Connect the nodes in a full mesh
for i in 0..<5:
for j in 0..<5:
if i == j:
continue
let connOk = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo())
require connOk

var msgReceived = 0
proc handler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
msgReceived += 1

# Connection triggers different actions, wait for them
await sleepAsync(500.millis)

# Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis)

# Each node sends 10 messages, signed but with a non-whitelisted key (total = 50)
for i in 0..<5:
for j in 0..<10:
var msg = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)

# Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]

await nodes[i].publish(spamProtectedTopic, msg)

# Each node sends 10 messages that are not signed (total = 50)
for i in 0..<5:
for j in 0..<10:
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
await nodes[i].publish(spamProtectedTopic, unsignedMessage)

# Wait for gossip
await sleepAsync(2.seconds)

# Since we have a full mesh with 5 nodes and each one publishes 50+50 msgs
# there are 500 messages being sent.
# 100 are received ok in the handler (first hop)
# 400 are are wrong so rejected (rejected not relayed)
check:
msgReceived == 100

var msgRejected = 0
for i in 0..<5:
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
msgRejected += v.topicInfos[spamProtectedTopic].invalidMessageDeliveries.int

check:
msgRejected == 400

await allFutures(nodes.mapIt(it.stop()))

asyncTest "Spam protected topic rejects a spammer node":
# Create 5 nodes
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

# Protected topic and key to sign
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
let secretKey = SkSecretKey.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6").expect("valid key")
let publicKey = secretKey.toPublicKey()
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable

# Non whitelisted secret key
let wrongSecretKey = SkSecretKey.fromHex("32ad0cc8edeb9f8a3e8635c5fe5bd200b9247a33da5e7171bd012691805151f3").expect("valid key")

# Start all the nodes and mount relay with protected topic
await allFutures(nodes.mapIt(it.start()))

# Mount relay for all nodes
await allFutures(nodes.mapIt(it.mountRelay()))

var msgReceived = 0
proc handler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
msgReceived += 1

# Subscribe all nodes to the same topic/handler
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
await sleepAsync(500.millis)

# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
for topic, publicKey in topicsPublicKeys:
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)

# nodes[0] is connected only to nodes[1]
let connOk1 = await nodes[0].peerManager.connectRelay(nodes[1].switch.peerInfo.toRemotePeerInfo())
require connOk1

# rest of nodes[1..4] are connected in a full mesh
for i in 1..<5:
for j in 1..<5:
if i == j:
continue
let connOk2 = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo())
require connOk2

# Connection triggers different actions, wait for them
await sleepAsync(500.millis)

# nodes[0] spams 50 non signed messages (nodes[0] just knows of nodes[1])
for j in 0..<50:
let unsignedMessage = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
await nodes[0].publish(spamProtectedTopic, unsignedMessage)

# nodes[0] spams 50 wrongly signed messages (nodes[0] just knows of nodes[1])
for j in 0..<50:
var msg = WakuMessage(
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
version: 2, timestamp: now(), ephemeral: true)
# Sign the message with a wrong key
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
await nodes[0].publish(spamProtectedTopic, msg)

# Wait for gossip
await sleepAsync(2.seconds)

# only 100 messages are received (50 + 50) which demonstrate
# nodes[1] doest gossip invalid messages.
check:
msgReceived == 100

# peer1 got invalid messages from peer0
let p0Id = nodes[0].peerInfo.peerId
check:
nodes[1].wakuRelay.peerStats[p0Id].topicInfos[spamProtectedTopic].invalidMessageDeliveries == 100.0

# peer1 did not gossip further, so no other node rx invalid messages
for i in 0..<5:
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
if k == p0Id and i == 1:
continue
check:
v.topicInfos[spamProtectedTopic].invalidMessageDeliveries == 0.0

# Stop all nodes
await allFutures(nodes.mapIt(it.stop()))

asyncTest "Messages are relayed between two websocket nodes":
let
nodeKey1 = generateSecp256k1Key()
Expand Down

0 comments on commit 3e14686

Please sign in to comment.