Skip to content

Commit

Permalink
refactor(cbindings): Thread-safe communication between the main threa…
Browse files Browse the repository at this point in the history
…d and the Waku Thread (#1978)

* Thread-safe comms between main thread & Waku Thread - ChannelSPSCSingle.
* Renaming procs from 'new' to 'createShared'. They use the shared allocator.
* peer_manager_request: no need to use ptr WakuNode.
* waku_thread: moving the 'waitFor' to upper layer.
* waku_thread: `poll()` -> `waitFor sleepAsync(1)` to avoid risk of blocking.
* libwaku: thread-safe "sub-objects" in an inter-thread requests.
  When two threads send data each other, that data cannot contain any
  GC'ed type (string, seq, ref, closures) at any level.
* Allocating the 'configJson' in main thread and deallocating in Waku Thread.
  • Loading branch information
Ivansete-status committed Sep 18, 2023
1 parent 2e515a0 commit 72f9066
Show file tree
Hide file tree
Showing 12 changed files with 378 additions and 166 deletions.
5 changes: 5 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,8 @@
url = https://github.com/nitely/nim-unicodedb.git
ignore = untracked
branch = master
[submodule "vendor/nim-taskpools"]
path = vendor/nim-taskpools
url = https://github.com/status-im/nim-taskpools.git
ignore = untracked
branch = stable
29 changes: 29 additions & 0 deletions library/alloc.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,36 @@
## Can be shared safely between threads
type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int]

proc alloc*(str: cstring): cstring =
# Byte allocation from the given address.
# There should be the corresponding manual deallocation with deallocShared !
let ret = cast[cstring](allocShared(len(str) + 1))
copyMem(ret, str, len(str) + 1)
return ret

proc alloc*(str: string): cstring =
## Byte allocation from the given address.
## There should be the corresponding manual deallocation with deallocShared !
var ret = cast[cstring](allocShared(str.len + 1))
let s = cast[seq[char]](str)
for i in 0..<str.len:
ret[i] = s[i]
ret[str.len] = '\0'
return ret

proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] =
let data = cast[ptr T](allocShared(s.len))
copyMem(data, unsafeAddr s, s.len)
return (cast[ptr UncheckedArray[T]](data), s.len)

proc deallocSharedSeq*[T](s: var SharedSeq[T]) =
deallocShared(s.data)
s.len = 0

proc toSeq*[T](s: SharedSeq[T]): seq[T] =
## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required
## as req[T] is a GC managed type.
var ret = newSeq[T]()
for i in 0..<s.len:
ret.add(s.data[i])
return ret
52 changes: 31 additions & 21 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import
../../../waku/waku_relay/protocol,
./events/json_message_event,
./waku_thread/waku_thread,
./waku_thread/inter_thread_communication/node_lifecycle_request,
./waku_thread/inter_thread_communication/peer_manager_request,
./waku_thread/inter_thread_communication/protocols/relay_request,
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
./waku_thread/inter_thread_communication/requests/peer_manager_request,
./waku_thread/inter_thread_communication/requests/protocols/relay_request,
./waku_thread/inter_thread_communication/waku_thread_request,
./alloc

################################################################################
Expand Down Expand Up @@ -79,9 +80,10 @@ proc waku_new(configJson: cstring,
return RET_ERR

let sendReqRes = waku_thread.sendRequestToWakuThread(
NodeLifecycleRequest.new(
NodeLifecycleMsgType.CREATE_NODE,
configJson))
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.CREATE_NODE,
configJson))
if sendReqRes.isErr():
let msg = $sendReqRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
Expand Down Expand Up @@ -199,10 +201,11 @@ proc waku_relay_publish(pubSubTopic: cstring,
$pst

let sendReqRes = waku_thread.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.PUBLISH,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback),
wakuMessage))
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.PUBLISH,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback),
wakuMessage))
deallocShared(pst)

if sendReqRes.isErr():
Expand All @@ -214,24 +217,28 @@ proc waku_relay_publish(pubSubTopic: cstring,

proc waku_start() {.dynlib, exportc.} =
discard waku_thread.sendRequestToWakuThread(
NodeLifecycleRequest.new(
NodeLifecycleMsgType.START_NODE))
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.START_NODE))

proc waku_stop() {.dynlib, exportc.} =
discard waku_thread.sendRequestToWakuThread(
NodeLifecycleRequest.new(
NodeLifecycleMsgType.STOP_NODE))
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.STOP_NODE))

proc waku_relay_subscribe(
pubSubTopic: cstring,
onErrCb: WakuCallBack): cint
{.dynlib, exportc.} =

let pst = pubSubTopic.alloc()

let sendReqRes = waku_thread.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
deallocShared(pst)

if sendReqRes.isErr():
Expand All @@ -247,10 +254,12 @@ proc waku_relay_unsubscribe(
{.dynlib, exportc.} =

let pst = pubSubTopic.alloc()

let sendReqRes = waku_thread.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.UNSUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
deallocShared(pst)

if sendReqRes.isErr():
Expand All @@ -266,7 +275,8 @@ proc waku_connect(peerMultiAddr: cstring,
{.dynlib, exportc.} =

let connRes = waku_thread.sendRequestToWakuThread(
PeerManagementRequest.new(
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
PeerManagementMsgType.CONNECT_TO,
$peerMultiAddr,
chronos.milliseconds(timeoutMs)))
Expand Down

This file was deleted.

21 changes: 0 additions & 21 deletions library/waku_thread/inter_thread_communication/request.nim

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@ import
stew/results,
stew/shims/net
import
../../../waku/common/enr/builder,
../../../waku/waku_enr/capabilities,
../../../waku/waku_enr/multiaddr,
../../../waku/waku_enr/sharding,
../../../waku/waku_core/message/message,
../../../waku/waku_core/topics/pubsub_topic,
../../../waku/node/peer_manager/peer_manager,
../../../waku/waku_core,
../../../waku/node/waku_node,
../../../waku/node/builder,
../../../waku/node/config,
../../../waku/waku_relay/protocol,
../../events/[json_error_event,json_message_event,json_base_event],
../config,
./request
../../../../waku/common/enr/builder,
../../../../waku/waku_enr/capabilities,
../../../../waku/waku_enr/multiaddr,
../../../../waku/waku_enr/sharding,
../../../../waku/waku_core/message/message,
../../../../waku/waku_core/topics/pubsub_topic,
../../../../waku/node/peer_manager/peer_manager,
../../../../waku/waku_core,
../../../../waku/node/waku_node,
../../../../waku/node/builder,
../../../../waku/node/config,
../../../../waku/waku_relay/protocol,
../../../events/[json_error_event,json_message_event,json_base_event],
../../../alloc,
../../config

type
NodeLifecycleMsgType* = enum
Expand All @@ -29,18 +29,25 @@ type
STOP_NODE

type
NodeLifecycleRequest* = ref object of InterThreadRequest
NodeLifecycleRequest* = object
operation: NodeLifecycleMsgType
configJson: cstring ## Only used in 'CREATE_NODE' operation

proc new*(T: type NodeLifecycleRequest,
op: NodeLifecycleMsgType,
configJson: cstring = ""): T =
proc createShared*(T: type NodeLifecycleRequest,
op: NodeLifecycleMsgType,
configJson: cstring = ""): ptr type T =

return NodeLifecycleRequest(operation: op, configJson: configJson)
var ret = createShared(T)
ret[].operation = op
ret[].configJson = configJson.alloc()
return ret

proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)

proc createNode(configJson: cstring):
Future[Result[WakuNode, string]] {.async.} =
Future[Result[WakuNode, string]] {.async.} =

var privateKey: PrivateKey
var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
Expand Down Expand Up @@ -108,8 +115,10 @@ proc createNode(configJson: cstring):

return ok(newNode)

method process*(self: NodeLifecycleRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
proc process*(self: ptr NodeLifecycleRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =

defer: destroyShared(self)

case self.operation:
of CREATE_NODE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,33 @@ import
stew/results,
stew/shims/net
import
../../../waku/node/waku_node,
./request
../../../../waku/node/waku_node,
../../../alloc

type
PeerManagementMsgType* = enum
CONNECT_TO

type
PeerManagementRequest* = ref object of InterThreadRequest
PeerManagementRequest* = object
operation: PeerManagementMsgType
peerMultiAddr: string
peerMultiAddr: cstring
dialTimeout: Duration

proc new*(T: type PeerManagementRequest,
op: PeerManagementMsgType,
peerMultiAddr: string,
dialTimeout: Duration): T =
proc createShared*(T: type PeerManagementRequest,
op: PeerManagementMsgType,
peerMultiAddr: string,
dialTimeout: Duration): ptr type T =

return PeerManagementRequest(operation: op,
peerMultiAddr: peerMultiAddr,
dialTimeout: dialTimeout)
var ret = createShared(T)
ret[].operation = op
ret[].peerMultiAddr = peerMultiAddr.alloc()
ret[].dialTimeout = dialTimeout
return ret

proc destroyShared(self: ptr PeerManagementRequest) =
deallocShared(self[].peerMultiAddr)
deallocShared(self)

proc connectTo(node: WakuNode,
peerMultiAddr: string,
Expand All @@ -46,13 +52,15 @@ proc connectTo(node: WakuNode,

return ok()

method process*(self: PeerManagementRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
proc process*(self: ptr PeerManagementRequest,
node: WakuNode): Future[Result[string, string]] {.async.} =

defer: destroyShared(self)

case self.operation:

of CONNECT_TO:
let ret = node[].connectTo(self.peerMultiAddr, self.dialTimeout)
let ret = node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
if ret.isErr():
return err(ret.error)

Expand Down

0 comments on commit 72f9066

Please sign in to comment.