Skip to content

Commit

Permalink
fix(bindings): base64 payload and key for content topic (#2435)
Browse files Browse the repository at this point in the history
* fix(bindings): base64 payload and key for content topic
* fix(bindings): store userData for event callback
* fix(bindings): json message serialization
* fix(bindings): add messageHash to the event callback
* fix(bindings): add meta field
* refactor(bindings): simplify error handling
* fix: handle undefined keys
  • Loading branch information
richard-ramos committed Feb 20, 2024
1 parent 652abf8 commit d01585e
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 108 deletions.
18 changes: 0 additions & 18 deletions library/events/json_error_event.nim

This file was deleted.

86 changes: 74 additions & 12 deletions library/events/json_message_event.nim
@@ -1,16 +1,63 @@

import
std/json
system,
std/[json,sequtils]
import
stew/[byteutils,results]
import
../../waku/common/base64,
../../waku/waku_core/message,
../../waku/waku_core/message/message,
./json_base_event

type JsonMessage = ref object
# https://rfc.vac.dev/spec/36/#jsonmessage-type
payload: string
contentTopic: string
version: uint
timestamp: int64
type
JsonMessage* = ref object
# https://rfc.vac.dev/spec/36/#jsonmessage-type
payload*: Base64String
contentTopic*: string
version*: uint
timestamp*: int64
ephemeral*: bool
meta*: Base64String
proof*: Base64String

func fromJsonNode*(T: type JsonMessage, jsonContent: JsonNode): JsonMessage =
# Visit https://rfc.vac.dev/spec/14/ for further details
JsonMessage(
payload: Base64String(jsonContent["payload"].getStr()),
contentTopic: jsonContent["contentTopic"].getStr(),
version: uint32(jsonContent{"version"}.getInt()),
timestamp: int64(jsonContent{"timestamp"}.getBiggestInt()),
ephemeral: jsonContent{"ephemeral"}.getBool(),
meta: Base64String(jsonContent{"meta"}.getStr()),
proof: Base64String(jsonContent{"proof"}.getStr())
)

proc toWakuMessage*(self: JsonMessage): Result[WakuMessage, string] =
let payload = base64.decode(self.payload).valueOr:
return err("invalid payload format: " & error)

let meta = base64.decode(self.meta).valueOr:
return err("invalid meta format: " & error)

let proof = base64.decode(self.proof).valueOr:
return err("invalid proof format: " & error)

ok(WakuMessage(
payload: payload,
meta: meta,
contentTopic: self.contentTopic,
version: uint32(self.version),
timestamp: self.timestamp,
ephemeral: self.ephemeral,
proof: proof,
))

proc `%`*(value: Base64String): JsonNode =
%(value.string)

proc `%`*(value: WakuMessageHash): JsonNode =
%(to0xHex(value))

type JsonMessageEvent* = ref object of JsonEvent
pubsubTopic*: string
Expand All @@ -23,18 +70,33 @@ proc new*(T: type JsonMessageEvent,
# Returns a WakuMessage event as indicated in
# https://rfc.vac.dev/spec/36/#jsonmessageevent-type

var payload = newString(len(msg.payload))
copyMem(addr payload[0], unsafeAddr msg.payload[0], len(msg.payload))
var payload = newSeq[byte](len(msg.payload))
if len(msg.payload) != 0:
copyMem(addr payload[0], unsafeAddr msg.payload[0], len(msg.payload))

var meta = newSeq[byte](len(msg.meta))
if len(msg.meta) != 0:
copyMem(addr meta[0], unsafeAddr msg.meta[0], len(msg.meta))

var proof = newSeq[byte](len(msg.proof))
if len(msg.proof) != 0:
copyMem(addr proof[0], unsafeAddr msg.proof[0], len(msg.proof))

let msgHash = computeMessageHash(pubSubTopic, msg)
let msgHashHex = to0xHex(msgHash)

return JsonMessageEvent(
eventType: "message",
pubSubTopic: pubSubTopic,
messageId: "TODO",
messageId: msgHashHex,
wakuMessage: JsonMessage(
payload: payload,
payload: base64.encode(payload),
contentTopic: msg.contentTopic,
version: msg.version,
timestamp: int64(msg.timestamp)
timestamp: int64(msg.timestamp),
ephemeral: msg.ephemeral,
meta: base64.encode(meta),
proof: base64.encode(proof),
)
)

Expand Down
57 changes: 27 additions & 30 deletions library/libwaku.nim
Expand Up @@ -9,10 +9,12 @@ import
chronicles,
chronos
import
../../waku/common/base64,
../../waku/waku_core/message/message,
../../waku/node/waku_node,
../../waku/waku_core/topics/pubsub_topic,
../../../waku/waku_relay/protocol,
./events/json_base_event,
./events/json_message_event,
./waku_thread/waku_thread,
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
Expand Down Expand Up @@ -43,16 +45,22 @@ const RET_MISSING_CALLBACK: cint = 2
proc relayEventCallback(ctx: ptr Context): WakuRelayHandler =
return proc (pubsubTopic: PubsubTopic, msg: WakuMessage): Future[system.void]{.async.} =
# Callback that hadles the Waku Relay events. i.e. messages or errors.
if not isNil(ctx[].eventCallback):
try:
let event = $JsonMessageEvent.new(pubsubTopic, msg)
cast[WakuCallBack](ctx[].eventCallback)(RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), nil)
except Exception,CatchableError:
let msg = "Exception when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), nil)
else:
if isNil(ctx[].eventCallback):
error "eventCallback is nil"
return

if isNil(ctx[].eventUserData):
error "eventUserData is nil"
return

try:
let event = $JsonMessageEvent.new(pubsubTopic, msg)
cast[WakuCallBack](ctx[].eventCallback)(RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData)
except Exception,CatchableError:
let msg = "Exception when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData)


### End of not-exported components
################################################################################
Expand Down Expand Up @@ -106,8 +114,10 @@ proc waku_version(ctx: ptr Context,
return RET_OK

proc waku_set_event_callback(ctx: ptr Context,
callback: WakuCallBack) {.dynlib, exportc.} =
callback: WakuCallBack,
userData: pointer) {.dynlib, exportc.} =
ctx[].eventCallback = cast[pointer](callback)
ctx[].eventUserData = userData

proc waku_content_topic(ctx: ptr Context,
appName: cstring,
Expand Down Expand Up @@ -186,33 +196,20 @@ proc waku_relay_publish(ctx: ptr Context,
return RET_MISSING_CALLBACK

let jwm = jsonWakuMessage.alloc()
var jsonContent:JsonNode
var jsonMessage:JsonMessage
try:
jsonContent = parseJson($jwm)
let jsonContent = parseJson($jwm)
jsonMessage = JsonMessage.fromJsonNode(jsonContent)
except JsonParsingError:
deallocShared(jwm)
let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
finally:
deallocShared(jwm)

deallocShared(jwm)

var wakuMessage: WakuMessage
try:
var version = 0'u32
if jsonContent.hasKey("version"):
version = (uint32) jsonContent["version"].getInt()

wakuMessage = WakuMessage(
# Visit https://rfc.vac.dev/spec/14/ for further details
payload: jsonContent["payload"].getStr().toSeq().mapIt(byte (it)),
contentTopic: $jsonContent["content_topic"].getStr(),
version: version,
timestamp: getTime().toUnix(),
ephemeral: false
)
except KeyError:
let msg = fmt"Problem building the WakuMessage: {getCurrentExceptionMsg()}"
let wakuMessage = jsonMessage.toWakuMessage().valueOr:
let msg = fmt"Problem building the WakuMessage: {error}"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

Expand Down

0 comments on commit d01585e

Please sign in to comment.