Skip to content

Commit

Permalink
feat: running validators in /relay/v1/messages/{pubsubTopic} (#2373)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer committed Feb 1, 2024
1 parent 3e65cc1 commit 59d8b62
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 55 deletions.
4 changes: 3 additions & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -438,13 +438,15 @@ proc setupProtocols(node: WakuNode,
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())

# Add validation keys to protected topics
var subscribedProtectedTopics : seq[ProtectedTopic]
for topicKey in conf.protectedTopics:
if topicKey.topic notin pubsubTopics:
warn "protected topic not in subscribed pubsub topics, skipping adding validator",
protectedTopic=topicKey.topic, subscribedTopics=pubsubTopics
continue
subscribedProtectedTopics.add(topicKey)
notice "routing only signed traffic", protectedTopic=topicKey.topic, publicKey=topicKey.key
node.wakuRelay.addSignedTopicValidator(Pubsubtopic(topicKey.topic), topicKey.key)
node.wakuRelay.addSignedTopicsValidator(subscribedProtectedTopics)

# Enable Rendezvous Discovery protocol when Relay is enabled
try:
Expand Down
34 changes: 21 additions & 13 deletions apps/wakunode2/wakunode2_validator_signed.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const MessageWindowInSec = 5*60 # +- 5 minutes

import
../../waku/waku_relay/protocol,
../../waku/waku_core
../../waku/waku_core,
./external_config

declarePublicCounter waku_msg_validator_signed_outcome, "number of messages for each validation outcome", ["result"]

Expand Down Expand Up @@ -49,21 +50,28 @@ proc withinTimeWindow*(msg: WakuMessage): bool =
return true
return false

proc addSignedTopicValidator*(w: WakuRelay, topic: PubsubTopic, publicTopicKey: SkPublicKey) =
debug "adding validator to signed topic", topic=topic, publicTopicKey=publicTopicKey
proc addSignedTopicsValidator*(w: WakuRelay, protectedTopics: seq[ProtectedTopic]) =
debug "adding validator to signed topics"

proc validator(topic: string, msg: WakuMessage): Future[errors.ValidationResult] {.async.} =
var outcome = errors.ValidationResult.Reject

for protectedTopic in protectedTopics:

if msg.timestamp != 0:
if msg.withinTimeWindow():
let msgHash = SkMessage(topic.msgHash(msg))
let recoveredSignature = SkSignature.fromRaw(msg.meta)
if recoveredSignature.isOk():
if recoveredSignature.get.verify(msgHash, publicTopicKey):
outcome = errors.ValidationResult.Accept
if(protectedTopic.topic == topic):
if msg.timestamp != 0:
if msg.withinTimeWindow():
let msgHash = SkMessage(topic.msgHash(msg))
let recoveredSignature = SkSignature.fromRaw(msg.meta)
if recoveredSignature.isOk():
if recoveredSignature.get.verify(msgHash, protectedTopic.key):
outcome = errors.ValidationResult.Accept

waku_msg_validator_signed_outcome.inc(labelValues = [$outcome])
return outcome
if outcome != errors.ValidationResult.Accept:
debug "signed topic validation failed", topic=topic, publicTopicKey=protectedTopic.key
waku_msg_validator_signed_outcome.inc(labelValues = [$outcome])
return outcome

w.addValidator(topic, validator)
return errors.ValidationResult.Accept

w.addValidator(validator, "signed topic validation failed")
4 changes: 2 additions & 2 deletions tests/waku_relay/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,14 @@ suite "Waku Relay":
proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
otherHandlerFuture.complete((topic, message))

otherNode.addValidator(pubsubTopic, len4Validator)
otherNode.addValidator(len4Validator)
discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
await sleepAsync(500.millis)
check:
otherNode.isSubscribed(pubsubTopic)

# Given a subscribed node with a validator
node.addValidator(pubsubTopic, len4Validator)
node.addValidator(len4Validator)
discard node.subscribe(pubsubTopic, simpleFutureHandler)
await sleepAsync(500.millis)
check:
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_relay/test_wakunode_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ suite "WakuNode - Relay":
completionFutValidatorAcc.complete(true)
return ValidationResult.Accept

node2.wakuRelay.addValidator(pubSubTopic, validator)
node2.wakuRelay.addValidator(validator)

var completionFut = newFuture[bool]()
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
Expand Down
13 changes: 10 additions & 3 deletions tests/wakunode2/test_validators.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import
libp2p/multihash,
secp256k1
import
../../apps/wakunode2/external_config,
../../apps/wakunode2/wakunode2_validator_signed,
../../waku/waku_core,
../../waku/node/peer_manager,
Expand Down Expand Up @@ -42,8 +43,10 @@ suite "WakuNode2 - Validators":

# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
var signedTopics : seq[ProtectedTopic]
for topic, publicKey in topicsPublicKeys:
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)
signedTopics.add(ProtectedTopic(topic: topic, key: publicKey))
node.wakuRelay.addSignedTopicsValidator(signedTopics)

# Connect the nodes in a full mesh
for i in 0..<5:
Expand Down Expand Up @@ -114,8 +117,10 @@ suite "WakuNode2 - Validators":

# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
var signedTopics : seq[ProtectedTopic]
for topic, publicKey in topicsPublicKeys:
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)
signedTopics.add(ProtectedTopic(topic: topic, key: publicKey))
node.wakuRelay.addSignedTopicsValidator(signedTopics)

# Connect the nodes in a full mesh
for i in 0..<5:
Expand Down Expand Up @@ -232,8 +237,10 @@ suite "WakuNode2 - Validators":

# Add signed message validator to all nodes. They will only route signed messages
for node in nodes:
var signedTopics : seq[ProtectedTopic]
for topic, publicKey in topicsPublicKeys:
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)
signedTopics.add(ProtectedTopic(topic: topic, key: publicKey))
node.wakuRelay.addSignedTopicsValidator(signedTopics)

# nodes[0] is connected only to nodes[1]
let connOk1 = await nodes[0].peerManager.connectRelay(nodes[1].switch.peerInfo.toRemotePeerInfo())
Expand Down
49 changes: 47 additions & 2 deletions tests/wakunode_rest/test_rest_relay.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[sequtils,tempfiles],
std/[sequtils, strformat, tempfiles],
stew/byteutils,
stew/shims/net,
testutils/unittests,
Expand All @@ -21,7 +21,8 @@ import
../../waku/waku_relay,
../../../waku/waku_rln_relay,
../testlib/wakucore,
../testlib/wakunode
../testlib/wakunode,
../resources/payloads

proc testWakuNode(): WakuNode =
let
Expand Down Expand Up @@ -480,6 +481,50 @@ suite "Waku v2 Rest API - Relay":
$response.contentType == $MIMETYPE_TEXT
response.data == "Failed to publish. Autosharding error: invalid format: topic must start with slash"

await restServer.stop()
await restServer.closeWait()
await node.stop()

asyncTest "Post a message larger than maximum size - POST /relay/v1/messages/{topic}":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay()
await node.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1")))

# RPC server setup
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = RestServerRef.init(restAddress, restPort).tryGet()

restPort = restServer.server.address.port # update with bound port for client use

let cache = MessageCache.init()

installRelayApiHandlers(restServer.router, node, cache)
restServer.start()

let client = newRestHttpClient(initTAddress(restAddress, restPort))

node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1

# When
let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage(
payload: base64.encode(getByteSequence(MaxWakuMessageSize)), # Message will be bigger than the max size
contentTopic: some(DefaultContentTopic),
timestamp: some(int64(2022))
))

# Then
check:
response.status == 400
$response.contentType == $MIMETYPE_TEXT
response.data == fmt"Failed to publish: Message size exceeded maximum of {DefaultMaxWakuMessageSizeStr}"

await restServer.stop()
await restServer.closeWait()
await node.stop()
2 changes: 1 addition & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ proc mountRlnRelay*(node: WakuNode,

# register rln validator as default validator
debug "Registering RLN validator"
node.wakuRelay.addDefaultValidator(validator)
node.wakuRelay.addValidator(validator, "RLN validation failed")

node.wakuRlnRelay = rlnRelay

Expand Down
12 changes: 2 additions & 10 deletions waku/waku_api/rest/relay/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,8 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes
if not success:
return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message")

# validate the message before sending it
let result = node.wakuRlnRelay.validateMessageAndUpdateLog(message)
if result == MessageValidationResult.Invalid:
return RestApiResponse.internalServerError("Failed to publish: invalid RLN proof")
elif result == MessageValidationResult.Spam:
return RestApiResponse.badRequest("Failed to publish: limit exceeded, try again later")
elif result == MessageValidationResult.Valid:
debug "RLN proof validated successfully", pubSubTopic=pubSubTopic
else:
return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result")
(await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr:
return RestApiResponse.badRequest("Failed to publish: " & error)

# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message", pubSubTopic=pubSubTopic, rln=not node.wakuRlnRelay.isNil()
Expand Down
56 changes: 34 additions & 22 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ else:
{.push raises: [].}

import
std/strformat,
stew/results,
sequtils,
chronos,
Expand Down Expand Up @@ -124,14 +125,11 @@ type
WakuRelayHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].}
WakuValidatorHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[ValidationResult] {.gcsafe, raises: [Defect].}
WakuRelay* = ref object of GossipSub
# a map of PubsubTopic's => seq[WakuValidatorHandler] that are
# called in order every time a message is received on a given pubsub topic
wakuValidators: Table[PubsubTopic, seq[WakuValidatorHandler]]
# a map that stores whether the ordered validator has been inserted
# for a given PubsubTopic
# seq of tuples: the first entry in the tuple contains the validators are called for every topic
# the second entry contains the error messages to be returned when the validator fails
wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]
# a map of validators to error messages to return when validation fails
validatorInserted: Table[PubsubTopic, bool]
# seq of validators that are called for every pubsub topic
wakuDefaultValidators: seq[WakuValidatorHandler]

proc initProtocolHandler(w: WakuRelay) =
proc handler(conn: Connection, proto: string) {.async.} =
Expand Down Expand Up @@ -180,14 +178,9 @@ proc new*(T: type WakuRelay,
return ok(w)

proc addValidator*(w: WakuRelay,
topic: varargs[string],
handler: WakuValidatorHandler) {.gcsafe.} =
for t in topic:
w.wakuValidators.mgetOrPut(t, @[]).add(handler)

proc addDefaultValidator*(w: WakuRelay,
handler: WakuValidatorHandler) {.gcsafe.} =
w.wakuDefaultValidators.add(handler)
handler: WakuValidatorHandler,
errorMessage: string = "") {.gcsafe.} =
w.wakuValidators.add((handler, errorMessage))

method start*(w: WakuRelay) {.async.} =
debug "start"
Expand Down Expand Up @@ -216,19 +209,38 @@ proc generateOrderedValidator*(w: WakuRelay): auto {.gcsafe.} =
let msg = msgRes.get()

# now sequentially validate the message
for validator in w.wakuDefaultValidators:
for (validator, _) in w.wakuValidators:
let validatorRes = await validator(pubsubTopic, msg)
if validatorRes != ValidationResult.Accept:
return validatorRes

if w.wakuValidators.hasKey(pubsubTopic):
for validator in w.wakuValidators[pubsubTopic]:
let validatorRes = await validator(pubsubTopic, msg)
if validatorRes != ValidationResult.Accept:
return validatorRes
return ValidationResult.Accept
return wrappedValidator

proc isValidSize(message: WakuMessage): Future[Result[void, string]] {.async.} =
let messageSizeBytes = uint64(message.encode().buffer.len)

if(messageSizeBytes > MaxWakuMessageSize):
let message = fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSizeStr}"
debug "Invalid Waku Message", error=message
return err(message)

return ok()

proc validateMessage*(w: WakuRelay, pubsubTopic: string, msg: WakuMessage):
Future[Result[void, string]] {.async.} =

(await msg.isValidSize()).isOkOr:
return err(error)

for (validator, message) in w.wakuValidators:
let validatorRes = await validator(pubsubTopic, msg)
if validatorRes != ValidationResult.Accept:
if message.len > 0:
return err(message)
else:
return err("Validator failed")
return ok()

proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler): TopicHandler =
debug "subscribe", pubsubTopic=pubsubTopic

Expand Down

0 comments on commit 59d8b62

Please sign in to comment.