
Prototype Pub/Sub Intra-Process Comms #59

Closed · 4 tasks done · wjwwood opened this issue Jun 15, 2015 · 16 comments
@wjwwood (Member) commented Jun 15, 2015

The current thinking is that this should be done using DDS topics that ferry pointers to the published message, which the subscriber can then access directly without serialization or deserialization. This mechanism is thought to be the best solution because it should make it easier to mimic the QoS of the interprocess DDS topic. However, it still needs more investigation to show that it is sufficiently efficient and that it actually helps us mimic the external DDS topic's QoS.

AC:

This issue is intentionally scoped to pub/sub to keep it manageable. I believe that if the approach works for pub/sub, it will work for services as well.

@wjwwood self-assigned this Jun 15, 2015
@wjwwood added the "ready" label (Work is about to start, Kanban column) Jun 15, 2015
@dirk-thomas (Member)

We should make sure that, in the end, this is introspectable from the outside and not hidden like, e.g., nodelets in ROS 1.

Update: to clarify what I mean by introspection here: it should be possible to tell from outside the process whether the optimized intra-process communication is being used.

@wjwwood (Member, Author) commented Jun 15, 2015

Well, that should be avoided by always keeping parity between the internal and external topics. I can't imagine a situation where it would make sense to subscribe to one of the proposed internal topics from another process; you might do that for some kind of diagnostic, but the contents of the message would not be useful to another process.

Either way, it's worth mentioning explicitly, I guess.

Edit: typo

@wjwwood (Member, Author) commented Jun 15, 2015

After talking with @dirk-thomas about it, he means that the graph API should indicate which node/topic endpoints (publishers and subscribers) are using the intra-process comms and which are using interprocess comms. I agree that this information should be accessible, but how we expose it depends on the as-yet-undefined graph API.

@wjwwood added the "in progress" label (Actively being worked on) and removed the "ready" label (Work is about to start) Jun 19, 2015
@wjwwood (Member, Author) commented Jul 14, 2015

OK, I've been doing some prototyping and have come up with a few approaches, each with pros and cons that I'm still exploring:

Reference Counting Only

Create a shared_ptr of a copy of the user's message and artificially increase the reference count by the number of subscribers. Send the address of this shared pointer to all subscribers with a publish, and drop the publisher's copy of the shared pointer, decreasing the reference count by one. Each time a subscriber gets the message with the address, it dereferences the shared_ptr and uses it, eventually dropping it and decrementing the reference count by one. Once all subscribers are done with it, the memory is automatically released. (A concrete sketch follows the concerns below.)

  • Concerns:
    • If the message carrying the pointer is dropped by the publisher or any of the subscribers, the memory is leaked because the reference count is never decreased.
    • What happens if a subscriber comes up between the publisher checking how many subscribers there are and sending the message with the pointer out?
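
To make the scheme concrete, here is a minimal C++ sketch of the reference-counting idea. All names here (`publish_intra_process`, `take_intra_process`, `send_address_over_dds`) are hypothetical, and heap-allocating one shared_ptr copy per subscriber is just one way to realize the "artificially increased" reference count; the real transport would be a DDS message carrying the pointer value:

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>

// Hypothetical transport stand-in: in the prototype this would be a publish
// on an internal DDS topic carrying the pointer value as an integer.
void send_address_over_dds(std::uintptr_t /*address*/) {}

template<typename MessageT>
void publish_intra_process(const MessageT & msg, std::size_t subscriber_count)
{
  // Copy the user's message into shared storage.
  auto shared = std::make_shared<MessageT>(msg);
  // "Artificially" charge the reference count: one heap-allocated shared_ptr
  // copy per subscriber, each to be released by the subscriber that takes it.
  for (std::size_t i = 0; i < subscriber_count; ++i) {
    auto * charged = new std::shared_ptr<MessageT>(shared);
    send_address_over_dds(reinterpret_cast<std::uintptr_t>(charged));
  }
  // The publisher's own reference is dropped when `shared` goes out of scope.
}

template<typename MessageT>
std::shared_ptr<MessageT> take_intra_process(std::uintptr_t address)
{
  auto * charged = reinterpret_cast<std::shared_ptr<MessageT> *>(address);
  std::shared_ptr<MessageT> msg = *charged;
  // Decrement the charged count. If DDS dropped the sample carrying
  // `address`, this line is never reached and the reference leaks, which is
  // exactly the first concern above.
  delete charged;
  return msg;  // storage is reclaimed once the last holder drops this
}
```

Note how the first concern falls directly out of the sketch: a lost sample means `delete charged` never runs for that subscriber.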

Ack Counting Only

Create a shared_ptr of a copy of the user's message and put it in a vector of pointers waiting for ACKs. Send the address to subscribers with a publish. Each subscriber copies the shared_ptr and sends an ACK back to the publisher through another topic. Once the publisher has received ACKs from all the subscribers, it drops its copy of the shared_ptr. Once all subscribers have also dropped their copies, the memory is reclaimed.

  • Concerns:
    • How big does the vector of un-ACKed shared_ptrs need to be?

A follow-on option is to have a limited-size ring buffer for storing messages waiting for ACKs, dropping the oldest one when the buffer is full (see the sketch after the concern below).

  • Concerns:
    • Is there a race condition between dropping shared_ptrs from the ring buffer to free space and subscribers accessing them?
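
As a rough illustration, here is a minimal sketch of such a ring buffer, using a plain mutex for the thread-safety question raised above. The class name and the ack-by-pointer lookup are hypothetical, not the prototype's actual design:

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical fixed-size ring buffer of in-flight messages awaiting ACKs.
// When full, storing a new message overwrites (drops) the oldest entry, so a
// lost ACK delays reclamation but never leaks the memory permanently.
template<typename MessageT>
class InFlightRing
{
public:
  explicit InFlightRing(std::size_t size)
  : ring_(size) {}

  // Publisher side: retain a reference until it is ACKed or overwritten.
  void store(std::shared_ptr<MessageT> msg)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    ring_[next_] = std::move(msg);  // the oldest entry (if any) drops here
    next_ = (next_ + 1) % ring_.size();
  }

  // ACK side: release the publisher's reference to this message.
  void ack(const MessageT * msg)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    for (auto & slot : ring_) {
      if (slot.get() == msg) {
        slot.reset();
        return;
      }
    }
    // Not found: the entry was already overwritten, which is the intended
    // overflow behavior rather than an error.
  }

private:
  std::mutex mutex_;
  std::vector<std::shared_ptr<MessageT>> ring_;
  std::size_t next_ = 0;
};
```

The mutex answers the race-condition concern at the cost of blocking; a lock-free variant would trade that blocking for implementation complexity.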

@tfoote (Contributor) commented Jul 15, 2015

Assuming the transport may be unreliable, passing raw pointer addresses seems a little unsafe. We should probably have an object which manages the pointers, and pass a UUID through DDS which can then be used to query the object holding the pointers. (If the reference has been dropped, it will give an error and the delivery is considered a failure.) If there's a hit on the UUID, the shared pointer is returned to the subscriber and discarded from the storage object.

Similarly, that object can be responsible for managing the queued shared pointers in a way that is consistent with the publisher's QoS settings. When a message times out or overflows the queue, the shared pointer associated with its UUID is dropped.
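
A minimal sketch of what such a storage object might look like, with a monotonically increasing id standing in for the UUID and single-shot delivery for simplicity (all names here are hypothetical):

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>

// Hypothetical storage object: messages stay in the publisher's process and
// are looked up by an id sent through DDS, so a stale id becomes a failed
// lookup instead of a dangling-pointer dereference.
template<typename MessageT>
class MessageStore
{
public:
  // Publisher side: store the message, get back the id to send over DDS.
  std::uint64_t store(std::shared_ptr<MessageT> msg)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    const std::uint64_t id = next_id_++;
    messages_[id] = std::move(msg);
    return id;
  }

  // Subscriber side: single-shot retrieval; the entry is discarded on a hit.
  // Throws if the entry was already dropped, marking the delivery a failure.
  std::shared_ptr<MessageT> take(std::uint64_t id)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = messages_.find(id);
    if (it == messages_.end()) {
      throw std::runtime_error("message expired or unknown id");
    }
    auto msg = std::move(it->second);
    messages_.erase(it);
    return msg;
  }

private:
  std::mutex mutex_;
  std::map<std::uint64_t, std::shared_ptr<MessageT>> messages_;
  std::uint64_t next_id_ = 1;
};
```

A QoS-aware version would additionally evict entries on timeout or queue overflow, as described above.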

@wjwwood (Member, Author) commented Jul 21, 2015

Here is a summary of our discussion of this issue in our last meeting:

One issue that came up about both the reference-counting approach and the ack-counting approach was: what do you do if DDS drops either the original message or, in the case of ack counting, the ack message? Ignoring it would result in a memory leak, but there appears to be no way of getting notified when a sample is lost.

A suggestion for fixing this was to change the QoS history setting to "keep all". This has the problem of using lots of memory (and eventually running out) when a subscriber is slower than the publisher, and that would be the case even if the user said their publisher should be best effort. It would prevent messages from being dropped until system resources were exhausted (or the DDS QoS resource limits were hit), but after that they would be dropped silently (assuming best effort). You could argue that if you're out of system resources then it doesn't matter, but to try to address this it was also suggested that we always use the reliable QoS setting. So in addition to the keep-all history side effects, the reliable setting would cause publish to block when the intra-process queue was full (i.e., when system or configuration limits were met).

For a user who sets up a best-effort publisher in rclcpp, this results in two strange behaviors:

  • The system will keep more than queue_size messages in the heap.
  • The publish call can block even when subscribers external to the process are ready to receive data and you have selected best effort.

This basically makes non-blocking, best-effort intra-process communication impossible.

So another option (at least for the bidirectional ack counting) was to have a custom ring buffer of shared_ptrs on the publisher side, with a size >= queue_size. It would keep a reference to the "in flight" messages until the buffer was full, and then it would start dropping the oldest ones. This way, even if a DDS message is lost for some reason, the memory isn't leaked permanently.

There are some thread-safety issues here, but it seems like they should be solvable with locks (inefficient / not real-time safe), with lock-free structures (really inefficient but real-time safe), or maybe with nothing if the shared_ptr implementation is naturally thread safe (this is a configuration option in Boost; I haven't looked into std::shared_ptr just yet).

Also, the "ack"ing doesn't have to come via DDS; it can be just a function call on a structure shared by the nodes (currently the rclcpp::Context object would serve this role). If that "ack" is not received due to DDS dropping the message, either in the datawriter or one of the datareaders, then the message will just hang around until it is overwritten in the ring buffer by a newer message.

P.S. I did look up the weak_ptr/shared_ptr thread safety; according to this page about weak_ptr::lock():

http://en.cppreference.com/w/cpp/memory/weak_ptr/lock

Effectively returns expired() ? shared_ptr() : shared_ptr(*this), executed atomically.

This seems to indicate that as long as we send subscribers pointers to weak_ptrs, which they dereference and then call lock() on, we are safe from the race condition of gaining ownership while the original owner (the publisher) could drop it at any point.
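
For illustration, the pattern being described would look something like this (`try_take` is a hypothetical name):

```cpp
#include <memory>

// The atomicity of weak_ptr::lock() means the subscriber either gains shared
// ownership or gets a null pointer -- never a half-destroyed message -- even
// if the publisher drops its last shared_ptr concurrently.
template<typename MessageT>
std::shared_ptr<MessageT> try_take(const std::weak_ptr<MessageT> & handle)
{
  if (auto msg = handle.lock()) {  // expired-check + acquire as one atomic step
    return msg;  // ownership gained; the publisher may now safely drop its copy
  }
  return nullptr;  // already gone; treat as a lost sample
}
```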

P.P.S. Passing weak_ptrs along doesn't help the memory issues, and therefore doesn't help the thread safety either: basically there's no good time to delete the actual weak_ptr (not even the data it refers to).

(This is just a dump of my personal notes on the subject, which is why it reads like a stream of consciousness; sorry about that. I've been working on some other ideas, more to come soon...)

@wjwwood (Member, Author) commented Aug 19, 2015

OK, I think this is ready for review. I've updated the OP with links to follow-on issues.

@wjwwood added the "in review" label (Waiting for review) and removed the "in progress" label (Actively being worked on) Aug 19, 2015
@dirk-thomas (Member)

+1 for all PRs (besides the micro comments)

@jacquelinekay (Contributor)

Looks like rclcpp does not compile on Linux. I was seeing this locally but it's confirmed on Jenkins. At first glance it looks like a problem with standard library includes...

@wjwwood (Member, Author) commented Aug 19, 2015

OK, I think I have a fix for that.

I'm going to make sure all outstanding comments are addressed, rebase, and run the style checker; then I'll start new CI jobs.

@wjwwood (Member, Author) commented Aug 20, 2015

I had an uncommitted fix that broke the other jobs; here are three new ones:

@wjwwood (Member, Author) commented Aug 21, 2015

@jacquelinekay (Contributor)
+1 for squash and merge on all branches

@wjwwood (Member, Author) commented Aug 21, 2015

OK, the CI jobs look good.

@wjwwood closed this as completed Aug 21, 2015
@wjwwood removed the "in review" label (Waiting for review) Aug 21, 2015