
refactor(cbindings): Thread-safe communication between the main thread and the Waku Thread #1978

Merged: 7 commits into master, Sep 18, 2023

Conversation

@Ivansete-status (Collaborator) commented Aug 31, 2023

Description

This PR is part of a suite of PRs aimed at following Jacek's recommendations regarding thread-safe communication.

Concretely,

  • Start using ChannelSPSCSingle so that the main thread and the Waku Thread communicate safely.
  • The two threads communicate by sending each other a ptr whose memory is allocated in thread-shared space.
  • One thread allocates on transmission, and the other thread deallocates on reception.
  • A request-response communication takes place (see the sketch after this list).
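
A minimal sketch of this ownership rule is shown below. The SingleItemChannel type is only a stand-in for the ChannelSPSCSingle used in the PR, and the InterThreadRequest fields are simplified; what matters is that the sender allocates the request in the shared heap and the receiver deallocates it.

import std/atomics

type
  InterThreadRequest = object
    reqType: int                 # simplified payload; the real type carries more
  SingleItemChannel[T] = object  # stand-in for ChannelSPSCSingle[T]
    full: Atomic[bool]
    value: T

proc trySend[T](ch: var SingleItemChannel[T], v: T): bool =
  if ch.full.load(moAcquire): return false
  ch.value = v
  ch.full.store(true, moRelease)
  true

proc tryRecv[T](ch: var SingleItemChannel[T], v: var T): bool =
  if not ch.full.load(moAcquire): return false
  v = ch.value
  ch.full.store(false, moRelease)
  true

var reqChannel: SingleItemChannel[ptr InterThreadRequest]

# Sender (main thread): allocate in the shared heap, hand the pointer over.
let req = cast[ptr InterThreadRequest](allocShared0(sizeof(InterThreadRequest)))
req[].reqType = 1
doAssert reqChannel.trySend(req)

# Receiver (Waku Thread): take ownership and free the shared memory when done.
var incoming: ptr InterThreadRequest
if reqChannel.tryRecv(incoming):
  # ... process incoming[] ...
  deallocShared(incoming)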

This change is motivated by the following comment:
#1865 (comment)

Changes

Issue

Closes #1878

@github-actions bot commented Aug 31, 2023

You can find the image built from this PR at

quay.io/wakuorg/nwaku-pr:1978

Built from eb1cd3a


case request[].reqType
of LIFECYCLE:
  waitFor cast[ptr NodeLifecycleRequest](request[].reqContent).process(node)
Contributor

I'm thinking that mixing pointer and async could cause problems but IDK how async works in Nim so 🤷

I guess blocking is the right thing to do here. Is blocking on an async proc the same as a sync proc?

Collaborator Author

Well, afaik, the waitFor is "sync": it stays there, making the dispatcher progress until the process is done. The reqContent pointer will only be deallocated by the process proc when it completes.

Contributor

waitFor turns async code into sync code indeed - but a problem with this setup is that the async loop will only be running while waitFor is running - this means it relies on a steady flow of requests in order to process networking buffers and other waku activity (timers, etc).

To solve this, process itself must be async and this should be turned into await process - the "main" loop then needs to call poll / runForever
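
A hedged sketch of that structure (process here is a placeholder for the PR's request handling, and the channel polling is elided):

import chronos

proc process() {.async.} =
  # placeholder for InterThreadRequest.process
  await sleepAsync(0.milliseconds)

proc run() {.async.} =
  while true:
    # tryRecv an InterThreadRequest here (omitted) and, when one arrives:
    await process()                   # await, not waitFor
    await sleepAsync(1.milliseconds)  # keeps timers and networking progressing

waitFor run()  # the single outer waitFor; alternatively poll()/runForever()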

Collaborator Author

To solve this, process itself must be async and this should be turned into await process - the "main" loop then needs to call poll / runForever

Thanks for the comment @arnetheduck!
The waitFor is meant to make the dispatcher progress so that the request can be handled.
That process proc is called in

let resultResponse = InterThreadRequest.process(request, addr node)

On the other hand, for periods with no requests (no calls to any libwaku function), the dispatcher progresses thanks to a direct call to poll in:

Kindly let me know if that's fine from your point of view :)

Contributor

right - this runs the risk of getting stuck in poll because poll itself will block until there is activity, and if there is no activity it will simply block forever - in fact, I'm not sure how this works at all - it should get stuck there and never perform any loop iterations - I'm probably missing some other detail which wakes up poll but this looks like a significant risk with the setup.

There are two ways to solve this: introducing a timer / sleepAsync (waitFor sleepAsync(1.millis) instead of poll) or using a ThreadSignal. Introducing a timer is the easier way to make this code correct.

Using a signal has the advantage of using fewer resources, but is slightly more difficult to implement. I recommend leaving it for a separate PR - the way to use that is to create a signal for every channel (this is the way channels are normally implemented, with a "notification mechanism"), and every time an item is added to the channel, the signal is fired - here's a simple example: https://gist.github.com/arnetheduck/b6a7ac8f4b85490d26d464674e09d57d#file-threadsynctask-nim
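
For reference, a hedged sketch of that signal-per-channel idea, assuming chronos's ThreadSignalPtr from chronos/threadsync (channel calls elided; this is not the PR's code):

import chronos, chronos/threadsync, results

let reqSignal = ThreadSignalPtr.new().expect("signal created")

proc sender() =
  # ... reqChannel.trySend(req) ...
  discard reqSignal.fireSync()  # fire the signal each time an item is enqueued

proc receiver() {.async.} =
  await reqSignal.wait()        # park here instead of spinning on tryRecv
  # ... reqChannel.tryRecv(req) ...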

Contributor

"in general" I would look for ways / strive to avoid calling waitFor in "inner" proc's and rather structure the code in such a way that the waitFor happens only at the "outer" layer - the waitFor strategy is not incorrect, but it's unusual and runs the risk of accidentally ending up with nested waitFor calls (calling waitFor from an async function) which is not supported.

Collaborator Author

"in general" I would look for ways / strive to avoid calling waitFor in "inner" proc's...

Okay, perfect. I addressed that in my last commit.

Collaborator Author

right - this runs the risk of getting stuck in poll because poll itself will block...

Ok, thanks! We'll apply waitFor sleepAsync(1) for now and implement the signal enhancement in follow-up PRs. I think poll() doesn't get stuck because the Relay protocol is continuously dispatching network events.

var ret = cast[ptr PeerManagementRequest](
  allocShared0(sizeof(PeerManagementRequest)))
ret[].operation = op
ret[].peerMultiAddr = peerMultiAddr
Contributor

string is a garbage-collected type - here, a copy must be taken with the shared allocator (and later it must be deallocated) - no garbage-collected types are allowed in objects constructed with create: https://status-im.github.io/nim-style-guide/interop.html#garbage-collected-types

Contributor

(type needs changing to cstring)

Collaborator Author

Okay, this is the trickiest part.

I was assuming that thread-safe communication was ensured by just sending ptr types, given that ptr types are not tracked by the GC by default. I also wanted to avoid the overhead of serializing/parsing JSON objects.

My assumption was that the following was safe:

  1. Thread A creates a ptr of the request in the shared space.
  2. Thread A sends the address of the request object to Thread B.
  3. Thread B handles the request and deallocates the memory from the shared space.

The following is used for communication between the two threads:

    reqChannel: ChannelSPSCSingle[ptr InterThreadRequest]
    respChannel: ChannelSPSCSingle[ptr InterThreadResponse]

The RelayRequest type is the most complex example:

type
  RelayRequest* = object
    operation: RelayMsgType
    pubsubTopic: PubsubTopic
    relayEventCallback: WakuRelayHandler
    message: WakuMessage

... which is created by Thread A in

... and deallocated by Thread B in:

Isn't that a thread-safe approach to sending ptr types over?

Contributor

peerMultiAddr is a string, which is a garbage-collected type (together with seq, ref and closures). Both the "main object" and all its fields need to be thread-safe, i.e. non-ref.

This means that we manually allocate a copy of everything on createShared and release it - I suggest implementing a destroyShared function for every createShared, which deallocates all fields and finally the main object - in the future, these can be turned into proper destructors, but they are good enough for now.
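
A hedged sketch of that createShared/destroyShared pairing (the single field is illustrative and toShared is a hypothetical helper; the PR's real PeerManagementRequest carries more fields):

type
  PeerManagementRequest = object
    peerMultiAddr: cstring  # cstring instead of string: no GC'd fields

proc toShared(s: string): cstring =
  # hypothetical helper: copy a Nim string into the shared heap, NUL-terminated
  result = cast[cstring](allocShared0(s.len + 1))
  if s.len > 0:
    copyMem(cast[pointer](result), unsafeAddr s[0], s.len)

proc createShared(T: type PeerManagementRequest,
                  peerMultiAddr: string): ptr PeerManagementRequest =
  result = cast[ptr PeerManagementRequest](allocShared0(sizeof(T)))
  result[].peerMultiAddr = toShared(peerMultiAddr)

proc destroyShared(self: ptr PeerManagementRequest) =
  deallocShared(cast[pointer](self[].peerMultiAddr))  # fields first...
  deallocShared(self)                                 # ...then the object itself

let req = PeerManagementRequest.createShared("/ip4/127.0.0.1/tcp/60000")
# ... send req across the channel; the receiving side ends with:
destroyShared(req)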

@github-actions bot commented Sep 6, 2023

You can find the experimental image built from this PR at

quay.io/wakuorg/nwaku-pr:1978-experimental

## Waiting for the response
var response: ptr InterThreadResponse
var recvOk = ctx.respChannel.tryRecv(response)
while recvOk == false:
Contributor

instead of this loop, a signal can be used here too

Collaborator Author

Okay! We'll apply the signal enhancements in a future PR.

resp = ctx.respChannel.tryRecv()
os.sleep(1)
## Sending the request
let sentOk = ctx.reqChannel.trySend(req)
Contributor

can it happen that requests are answered out-of-order because of async?

Collaborator Author

can it happen that requests are answered out-of-order because of async?

Good point! I believe this couldn't happen, as requests are handled sequentially thanks to the waitFor in

waitFor InterThreadRequest.process(request, addr node)

On the other hand, once we apply the "signal" approach, we'll have better synchronization between the two threads.

@arnetheduck (Contributor)

LGTM! looking forward to the next round

@Ivansete-status merged commit 72f9066 into master on Sep 18, 2023
16 checks passed
@Ivansete-status deleted the thread-safe-comms-libwaku branch on September 18, 2023 at 07:21