Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory leak caused by publish_to_topic(...) operator #13

Open
bheijden opened this issue Aug 6, 2021 · 10 comments
Open

Memory leak caused by publish_to_topic(...) operator #13

bheijden opened this issue Aug 6, 2021 · 10 comments
Labels
bug Something isn't working

Comments

@bheijden
Copy link

bheijden commented Aug 6, 2021

Hi,

I'm observing that that the publish_to_topic(...) operator can cause a memory leak because the subscription inside the publish_to_topic(...) is not disposed when the subscription to a pipeline in which publish_to_topic(...) is used, is disposed.

To clarify what I mean:

s = Subject()
d = s.pipe(publish_to_topic(...)).subscribe(on_next=lambda x: print(x))  # d is the Disposable of the pipeline subscription

d.dispose()  # Disposing d does not dispose the subscription inside publish_to_topic(...)
@gavanderhoorn
Copy link
Member

Thanks for reporting.

@henrik7264: could you take a look?

@gavanderhoorn gavanderhoorn added the bug Something isn't working label Aug 13, 2021
@henrik7264
Copy link
Collaborator

henrik7264 commented Aug 16, 2021 via email

@henrik7264
Copy link
Collaborator

henrik7264 commented Aug 16, 2021 via email

@henrik7264
Copy link
Collaborator

@bheijden - first of all thanks a lot for taking interest in this project. It's much appreciated. I have taken a look at your issue, but I am not sure I understand you completely. Is it possible to describe a use case where you need to dispose the pipeline/stream, or maybe even better an example that demonstrates the need for disposing the pipeline/stream?

I have to admit that your issue will have to require a lot of research from my side to solve. But first, I am eager to hear your answer to the above question.

@bheijden
Copy link
Author

bheijden commented Aug 19, 2021

I am building an elaborate pipeline for a reinforcement learning simulation framework based on rxpy and ROS. After an episode terminates, I need to re-initialize parts of the pipeline to clear the states of stateful rx operators (e.g. combine latest, zip, ReplaySubjects, etc...). Perhaps a flushing routine could be created that flushes these states, but that seems a lot more complicated than simply disposing of the old pipeline and re-initializing a new one with "fresh" rx operators.

In addition to the memory leak, the current implementation will also create a new publisher every time you reinitialize a pipeline that contains the operator.

I've re-implemented the operator myself in ROS 1 (but can easily be adapted to ROS 2) to address the aforementioned problems. First, this implementation avoids a subscription inside the operator, thus relieving the need to dispose of any subscriptions hence resolving the memory leak. Also, the operators takes an already initialized publisher as an argument, so that we avoid creating a new publisher every time.

def publisher_to_topic(publisher):
    def _publisher_to_topic(source):
        def subscribe(observer, scheduler=None):
            def on_next(msg):
                publisher.publish(msg)
                observer.on_next(msg)

            return source.subscribe(
                on_next,
                observer.on_error,
                observer.on_completed,
                scheduler)

        return rx.create(subscribe)

    return _publisher_to_topic

@henrik7264
Copy link
Collaborator

@bheijden - tanks a lot for sharing this information with us and thanks a lot for the code example. I will try to explain my view of the issue in the following text. It is unfortunately not an easy issue to solve even though the problem appears to be a small local problem. If you look at the code below (taken from the https://github.com/rosin-project/rxros2/blob/master/rxros2_py_examples/rxros2_py_examples/talker_old_style.py, slightly modified).

def main(args=None):
    rclpy.init(args=args)
    talker = rxros2.create_node("talker")

    rxros2.interval(1.0).pipe(
        rxros2.map(lambda i: mk_msg("Hello world " + str(i))),
        rxros2.publish_to_topic(talker, String, "/chatter"))

    rclpy.spin(talker)
    talker.destroy_node()
    rclpy.shutdown()

This is a good example of a RxROS python program – you may even see it as a template for how we have envisioned a RxROS program looks like. It is something we have used considerable time on designing this way. It consists of some initialization and creation of a node, then handling of several observable streams, and finally it consists of a spinner and code to close down the note/program.
There are two things to observe here:

  1. The interface to ROS2 is minimal. It consists of a small number of RxPy operators like for example publish_to_topic. The operators have by design been created to hide as many of the ROS2 operations as possible - like creating subscriptions of topics etc. The idea is that the programmer should use time on implementing the functionality of the node rather than be bothered with ROS2 functionality. The other idea is that the code will be extremely compact this way. The downside of this design is that the programmer does not have many possibilities for variability and for writing specialized code.
  2. The other strange part is the last two lines of code. These lines of code are never executed as the spinner is a blocking operation. Even in cases where the stream may complete the program will not terminate and the node is never properly closed/destroyed.

There is a third observation to make – not really from the example though, and that that is

  1. Handling of errors is not easy in RxROS. We have unfortunately not used much time on this topic, neither in the code examples nor in the documentation. On top of that I have actually very little experience in how to handle errors in reactive programming.

So, back to your issue, and why it is so hard to solve. I could argue (see the above observations) that RxROS is simply not designed to handle the situation you describe. The situation you describe has never been considered during the design of RxROS and fixing the issue will require a design change, code and documentation change. This is a major undertaking.

I could also argue that you are a bit naughty 😊 to use the dispose command. It sounds like a crude way to solve your issue. But as mentioned, I do unfortunately one have the knowledge or expertise to guide you in the right direction here. Well, the good thing is that you have already found a work around for the problem.

My mind is a bit split. On one hand we have some design ideas of how to use RxROS and on the other hand we have a user that is not able to use the provided functionality without running in to memory leaks. I do like your code example very much. There is some sound reasoning in separating the ROS functionality from the RxRy operators. But the fix requires a new interface to the RxROS operators.

@bheijden – in case we choose to expand the RxROS library/language with a new interface will you allow us to use your example/code?

@henrik7264
Copy link
Collaborator

Anyone, please feel free to comment on this issue/discussion.

@bheijden
Copy link
Author

Thanks for your elaborate answer. And your are free to use the example/code I've posted here.

Regarding the discussion, perhaps it is a matter of what users you wish to target? If you wish to cater ROS users by extending ROS with simple but effective reactive programming routines, supporting more advanced functionality like disposing of old pipelines and concurrency might only make the package less accessible to them. On the other hand, if the idea was to create a bridge between ROS and the rxpy package, so that all the benefits of rxpy could be exploited, I do think it is important that properly disposing of old subscriptions when a pipeline is disposed should be supported (or at least its limitations be documented). Else, users accustomed to rxpy might run into operator behavior they would not have expected.

The disposable returned by the Subscribe extension methods is returned solely to allow you to manually unsubscribe from the observable before the observable naturally ends. I can see that using the dispose command seems crude, as using the on_complete() method would be more clean. However, this is complicated when dealing with infinite observables, and the completion of the observable depends on an event that cannot be anticipated on beforehand (e.g. by means of counting). Also, I am not sure whether the problem would be solved when calling on_complete() instead.

If it is a strict requirement that the interface of the operator must remain unchanged, you can keep the publisher creation inside the operator by modifying the code as shown below. This operator follows the same structure as most rxpy standard operators. This structure is also presented in the official documentation here.

def publish_to_topic(node: ROS2Node, topic_type: Any, topic_name: str, qos=10) -> Callable[[Observable], Observable]:
    """
    The to_topic operator will take each message from the stream and publish it to a specific ROS2 topic.
    :param node: An instance of a Node.
    :param topic_type: The type of the observable data elements.
    :param topic_name: The name of the ROS2 topic to publish the messages to.
    :param qos: The quality of service associated to ROS2 publisher.
    :return: The observable data stream it operates on, i.e. it is an identity operator.
    """
    def _publish_to_topic(source) -> Observable:
        publisher = node.create_publisher(topic_type, topic_name, qos)
             def on_next(msg):
                publisher.publish(msg)
                observer.on_next(msg)

            return source.subscribe(
                on_next,
                observer.on_error,
                observer.on_completed,
                scheduler)

        return rx.create(subscribe)
    return _publish_to_topic

Another issue with the current implementation has to do with cold observables (but perhaps it is documented somewhere that this operator should only be used with hot observables?). See the example below where I have defined both the current and alternative implementation of the operator:

import rx
from rx import operators as ops
import rospy
from std_msgs.msg import UInt64

rospy.init_node('rxeager', anonymous=True, log_level=rospy.INFO)

rospy.Subscriber('/publish_to_topic', UInt64, lambda msg: print(msg), queue_size=10000)

# Current implementation
def publish_to_topic(topic_type, topic_name):
    def _publish_to_topic(source):
        publisher = rospy.Publisher(topic_name, topic_type, queue_size=0)
        source.subscribe(on_next=lambda msg: publisher.publish(msg))
        return source
    return _publish_to_topic

# Alternative implementation
def publish_to_topic_v2(topic_type, topic_name):
    def _publish_to_topic(source):
        publisher = rospy.Publisher(topic_name, topic_type, queue_size=0)
        def subscribe(observer, scheduler=None):
            def on_next(msg):
                publisher.publish(msg)
                observer.on_next(msg)
            return source.subscribe(
                on_next,
                observer.on_error,
                observer.on_completed,
                scheduler)
        return rx.create(subscribe)
    return _publish_to_topic

stream = rx.of(1,2,3,4,5)
a = stream.pipe(ops.map(lambda i: UInt64(data=i)),
                          # publish_to_topic_v2(UInt64, '/publish_to_topic'),  
                          publish_to_topic(UInt64, '/publish_to_topic') # comment out this line, and un-comment the line above for the alternative implementation.
                          )

# require a sleep, so that ROS publisher can initialize properly.
rospy.sleep(1.0)

# We subscribe here. No message is received with "publish_to_topic", while we do receive with "publish_to_topic_v2"
d = a.subscribe(on_next=lambda i: None, on_completed=None)

for i in range(0,2):
    a.subscribe(on_next=lambda i: None, on_completed=None)

print("Finished!")

Using the current implementation of the operator, this leads to the following output:

Finished!

Process finished with exit code 0

while the alternative output gives:

Finished!
data: 1
data: 2
data: 3
data: 4
data: 5
data: 1
data: 2
data: 3
data: 4
data: 5
data: 1
data: 2
data: 3
data: 4
data: 5

Process finished with exit code 0

In the current implementation, the events of the cold observable are send immediately when the pipeline is created, while i would have expected the events of the source observable to only be send at the time of subscription (to an observer). However, now the events are send immediately which 1) does not leave enough time for the ROS publisher to properly initialize. Hence, the events of the observable stream are never send. 2) When actually subscribing later on, we actually do not publishing any messages as the subscription inside the operator has already completed. I can imagine that ROS users might not run into this issues, as they will probably use (infinite) hot observable sources, like in your example where rxros2.interval(1.0) is used. Nevertheless, it could be confusing for rxpy users.

@henrik7264
Copy link
Collaborator

@bheijden - thanks a lot for all the information - I am overwhelmed. It will take me some time to process all this information, but I will return.

@efrenbon
Copy link

@bheijden It looks like you've implemented a Python version of RXROS. Since the original author only provided a C++ version here, are you willing to open source your implementation? Thank you very much!

I am building an elaborate pipeline for a reinforcement learning simulation framework based on rxpy and ROS. After an episode terminates, I need to re-initialize parts of the pipeline to clear the states of stateful rx operators (e.g. combine latest, zip, ReplaySubjects, etc...). Perhaps a flushing routine could be created that flushes these states, but that seems a lot more complicated than simply disposing of the old pipeline and re-initializing a new one with "fresh" rx operators.

In addition to the memory leak, the current implementation will also create a new publisher every time you reinitialize a pipeline that contains the operator.

I've re-implemented the operator myself in ROS 1 (but can easily be adapted to ROS 2) to address the aforementioned problems. First, this implementation avoids a subscription inside the operator, thus relieving the need to dispose of any subscriptions hence resolving the memory leak. Also, the operators takes an already initialized publisher as an argument, so that we avoid creating a new publisher every time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

4 participants