
rospy: Connection / Memory Leak, adding conn. to closed _SubscriberImpl #544

Closed
rethink-rlinsalata opened this issue Jan 8, 2015 · 9 comments

@rethink-rlinsalata
Contributor

This is the third in a series of three memory/thread leak issues (#520, #533) I've been investigating. They mostly arise from race conditions during intense barrages of rapidly creating new subscriptions and unsubscribing in a rospy node, to topics coming from an already heavily loaded (robot) node.

A problem can occur where a subscriber / connection is "leaked": the objects that run the connection are orphaned and never cleaned up, and the socket connection is still held open by the leaked thread, which keeps running the TCP receive / callback loop even though the originating "Subscriber" is "closed" or gone. Furthermore, new Subscribers to the same Topic and Publisher will not re-use those leaked underlying connections like they normally would (since they are orphaned/leaked). This leads to ever increasing process memory in the subscribing rospy process _AND_ in the process of the Publisher node on the other end of the TCP socket connection (even a roscpp-based node) -- until one or both are killed by the kernel oom_killer.

The more technical description: The two objects in charge of the connection are the underlying, per-topic rospy.topics._SubscriberImpl and the TCP connection object rospy.impl.tcpros_base.TCPROSTransport. A major part of the problem seems to be that, somehow (race condition?), a new TCPROSTransport connection object is created and added to the <_SubImpl>.connections[] array of a _SubImpl that has already been closed (_SubImpl.closed==True).

For the most direct evidence, we can see the bad/bug state by adding two debug log checks on the _SubImpl.closed flag in the _SubImpl.add_connection() and _SubImpl.receive_callback() functions, as seen in the diffs below:
_SubscriberImpl.add_connection():

diff --cc clients/rospy/src/rospy/topics.py
index 43fab7a,43fab7a..803c23c
--- a/clients/rospy/src/rospy/topics.py
+++ b/clients/rospy/src/rospy/topics.py
@@@ -365,6 -365,6 +386,15 @@@
          """
          rospyinfo("topic[%s] adding connection to [%s], count %s"%(self.resolved_name, c.endpoint_id, len(self.connections)))
          with self.c_lock:
++            #LEAK-DBG
++            if self.closed:
++                rospyerr((
++                    "LEAK-ADDING topic[{}] adding connection to [{}],"
++                    " but _SubImpl ALREADY closed... maybe we shouldn't add a"
++                    " connection... with a current count {},").format(
++                    self.resolved_name, c.endpoint_id, len(self.connections)
++                ))
++
              # c_lock is to make add_connection thread-safe, but we
              # still make a copy of self.connections so that the rest of the
              # code can use self.connections in an unlocked manner

_SubscriberImpl.receive_callback():

diff --cc clients/rospy/src/rospy/topics.py
index 43fab7a,43fab7a..803c23c
--- a/clients/rospy/src/rospy/topics.py
+++ b/clients/rospy/src/rospy/topics.py
@@@ -698,6 -698,6 +728,15 @@@ class _SubscriberImpl(_TopicImpl)
          @param msgs: message data
          @type msgs: [L{Message}]
          """
++        #LEAK-DBG
++        if self.closed and len(self.connections) > 0:
++            rospywarn((
++                "LEAK-CB topic[{}] callback connection to [{}],"
++                " but _SubImpl ALREADY closed... "
++                " with a current count {},").format(
++                self.resolved_name, connection.endpoint_id, len(self.connections)
++            ))
++        
          # save reference to avoid lock
          callbacks = self.callbacks
          for msg in msgs:

Below is a more detailed walkthrough. One of the major remaining mysteries is why and when the _SubImpl is getting closed, and from where and how it happens in a way that allows this new connection to be added.


Details:

One of the relevant sections of code is the create_transport() function [A] in tcpros_pubsub.py:

        sub = rospy.impl.registration.get_topic_manager().get_subscriber_impl(resolved_name)

        #Create connection 
        protocol = TCPROSSub(resolved_name, sub.data_class, \
                             queue_size=sub.queue_size, buff_size=sub.buff_size,
                             tcp_nodelay=sub.tcp_nodelay)
        conn = TCPROSTransport(protocol, resolved_name)
        conn.set_endpoint_id(pub_uri);

        t = threading.Thread(name=resolved_name, target=robust_connect_subscriber, args=(conn, dest_addr, dest_port, pub_uri, sub.receive_callback,resolved_name))
        # don't enable this just yet, need to work on this logic
        #rospy.core._add_shutdown_thread(t)
        t.start()

        # Attach connection to _SubscriberImpl
        if sub.add_connection(conn): #pass tcp connection to handler
            return 1, "Connected topic[%s]. Transport impl[%s]"%(resolved_name, conn.__class__.__name__), dest_port
        else:
            conn.close()
            return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created"%(resolved_name), 0
  • You can see that the reference to the _SubImpl instance (I'll call <sub>) is captured near the top from the topic manager.
  • Then the new TCPROSTransport object is created (I'll call <conn>) and a separate thread is spun up to establish the TCP socket connection, after which that thread continues off on its own running the TCP message receive loop, <conn>.receive_loop() [B] (a simplified sketch of this thread's job follows this list). This connection thread was passed a reference to the <sub>.receive_callback() function, which is called by <conn>.receive_loop() and is 1 of 2 references to the _SubscriberImpl object held by the <conn> object.
  • Back in the original function that created the <conn> object, execution continues after spawning the thread and adds the <conn> object to the _SubImpl object's <sub>.connections[] array, with a call to <sub>.add_connection(<conn>) [C].
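
A simplified paraphrase of that connection thread's job (a sketch based on the description above, not the verbatim robust_connect_subscriber from tcpros_pubsub.py):

    # Simplified paraphrase of the connection thread (a sketch, not rospy source)
    def connection_thread_sketch(conn, dest_addr, dest_port, pub_uri, receive_cb):
        # establish the TCP socket connection to the publisher
        conn.connect(dest_addr, dest_port, pub_uri)
        # from here on the thread deserializes incoming messages and hands them
        # to <sub>.receive_callback(); it never re-checks the state of the
        # _SubscriberImpl, only the socket, ROS shutdown, and conn.done
        conn.receive_loop(receive_cb)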

Now, at some point while all this is happening, the _SubImpl object is closed (potentially anytime after that first reference to <sub> is obtained, but before <sub>.add_connection(<conn>) is executed). _I could not figure out why or how this happens_, but I'm fairly sure about the timing. This means the _SubImpl instance has no parent Subscriber (it was deleted), and the TopicManager no longer has a reference to it. The <sub>.connections[] array and <sub>.callbacks[] have both been cleared out (len 0) and the <sub>.closed = True flag has been set.

However, after that first reference to <sub> was obtained at the top, neither <conn> nor the rest of the connection code ever checks or cares about the state of the _SubImpl object again. There also appears to be nothing locking the connection creation+callback+addition path and the _SubImpl closing path against each other. So when <sub>.add_connection(<conn>) runs, the perfectly happy <conn> is added to a _SubImpl object that is supposedly closed = True but never checks that state, and you end up with a connections array of length 1: <sub>.connections[<conn>]. And when the TCP callback loop thread calls <sub>.receive_callback(), it simply returns since there are zero <sub>.callbacks[] to iterate over; neither the loop nor the callback checks the state of the _SubImpl; the loop only checks the state of the socket, ROS, and <conn>.done (which is False).

You can see a sample of the two objects from one of the leaked threads during a test run (obtained with dowser) in the screenshot attached to the original issue.


Identifying the Bug State

Note that rosnode info (and the like) will not show any information about the leaked topic connections, since they think the connections are closed. However, the leaked connections themselves can be seen:

  • using netstat, and identifying them by the PID of the rospy subscriber node, and either the machine name if you're using two machines (like a dev machine and a robot), or the port number of the topic endpoint.
    • (ex: $ netstat --numeric-ports -peev | grep <rospy subscriber node's PID>)
  • or even just ps on the command line, by looking at the rospy node's Light-Weight Processes (LWPs, i.e. threads) with timestamps that are "old" and don't go away (during Subscriber creation and unsubscriptions).
    • (ex: $ ps -L -p <rospy PID> -o lwp,lstart,stat,wchan)
@rethink-rlinsalata
Contributor Author

DISCLAIMER: The internal use case where this series of 3 leak issues arose involves RobotWebTools/rosbridge_suite as the rospy node doing the subscribing and generating the leaked connections (the websocket_server, specifically). However, this ticket is for the leakable state that rospy allows to happen, whether that state is caused or merely exposed by the rest of the test case. Like the other two issues that were fixed in rospy, it is probably a combination of factors. The original, most reliable internal use case unfortunately touches a number of complicated components and uses an internal piece of code. My attempts to isolate the triggering commands and create a simplified, cut-down test script have shrunk the relevant code/cases considerably, but I still cannot reliably reproduce the issue without a running, real robot as the publisher and/or the original internal interface classes.

This is why I don't have a good test script for the issue.

@rethink-rlinsalata
Contributor Author

FYI: ADDITIONAL debug tools
I'm putting more detailed information here about the debug tools I used during these 3 issues (really just for the sake of documenting them for the next time there's a Python leak...)

Note that rosnode info (and the like) will not show any information about the leaked topic connections, since they think the connections are gone. However, the leaked connections themselves can be seen:

  • using netstat: you can identify them by PID and port number or if using two computers (like a dev machine and a robot), by machine name too. (ex: $ netstat --numeric-ports -peev | grep <rospy subscriber node's PID>).
  • Or, you can see the leaked threads and objects that run them using dowser, by adding it to the top of your rospy sub program.
  • or gdb python, which is nice because it can be attached independently to an already-running Python process without interrupting it: sudo apt-get install python-dbg and then just sudo gdb python <PID>.
  • the psutil package was also useful early on while I was debugging these issues (a small sketch using it follows this list).
  • or even just ps on the command line, by looking at the rospy node's Light-Weight Processes (LWPs) with timestamps that are "old" and don't go away during Subscriber creation and unsubscriptions (ex: $ ps -L -p <rospy PID> -o lwp,lstart,stat,wchan).
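
For example, a small psutil-based watcher (a sketch, assuming a reasonably recent psutil; the PID is the rospy subscriber node's):

    # Watch a rospy node's thread count and established TCP connections over
    # time; leaked receive_loop threads show up as counts that never come back
    # down after subscribers are closed. (Sketch only; PID supplied by the user.)
    import time

    import psutil

    def watch(pid, interval=5.0):
        proc = psutil.Process(pid)
        while True:
            threads = proc.threads()
            conns = [c for c in proc.connections(kind='tcp')
                     if c.status == psutil.CONN_ESTABLISHED]
            print("threads: %d, established TCP connections: %d"
                  % (len(threads), len(conns)))
            time.sleep(interval)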

@rethink-rlinsalata
Contributor Author

Oh, this also did not (noticeably) happen before we upgraded everything from 32-bit Groovy to 64-bit Indigo, but that could be another red herring: we have seen many process loads increase after the conversion, so the race condition might simply be more likely to present itself now that the system is more heavily loaded.

@wjwwood
Member

wjwwood commented Jan 8, 2015

Thanks @rethink-rlinsalata for this detailed bug report, we'll see what we can do about it.

rethink-rlinsalata added a commit to rethink-rlinsalata/ros_comm that referenced this issue Jan 8, 2015
Protect against race condition where a new `TCPROSTransport`
connection is created and added to a `_SubscriberImpl` that is
already closed. Checks and returns False if closed so transport
creator can shutdown socket connection. Fixes issue ros#544.
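
A minimal sketch of the kind of guard that commit message describes (simplified; based only on the description above and the comments quoted earlier, not the exact merged diff):

    # Sketch of a guard in _SubscriberImpl.add_connection(): if the impl is
    # already closed, refuse the connection and return False so the caller in
    # create_transport() can close the transport instead of leaking it.
    def add_connection(self, c):
        with self.c_lock:
            if self.closed:
                # connection NOT accepted; caller is expected to call conn.close()
                return False
            # normal path: replace self.connections with a copy so other code
            # can iterate over it without holding c_lock (as the existing
            # comment in topics.py explains)
            new_connections = self.connections[:]
            new_connections.append(c)
            self.connections = new_connections
            return True
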
@dirk-thomas
Member

I have finally read through the code in more detail. I could imagine the following order of events:

  • the thread gets started: t.start()
  • the thread will run TCPROSTransport.receive_loop and from there could call close() on the transport anytime
  • and this happens before the connection is added via sub.add_connection(conn)

That would also explain why it only rarely happens. Usually the thread does not start immediately when start() is invoked, but only when Python next switches context. Under heavy load, the chances that Python actually runs the thread before the main thread continues on to the add_connection() call are significantly higher.
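
A toy illustration of that scheduling point (plain threading, not rospy code):

    import threading

    events = []

    def worker():
        events.append('thread ran')

    t = threading.Thread(target=worker)
    t.start()
    # usually 'main continued' is recorded first, but under load the worker
    # may run before the main thread gets here -- which is the point above
    events.append('main continued')
    t.join()
    print(events)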

If my guess is right, I think the best approach is to delay starting the thread until after the connection has been added (#603). Could you please try whether this makes it work in your complex use case?
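
A minimal sketch of what that reordering could look like in create_transport(), based on the code quoted above (the actual #603 patch may differ in detail):

    # Sketch of the #603 idea (simplified): register the connection with the
    # _SubscriberImpl first, and only then start the thread that connects and
    # runs the receive loop, so the thread cannot tear anything down before
    # add_connection() has run.
    t = threading.Thread(name=resolved_name, target=robust_connect_subscriber,
                         args=(conn, dest_addr, dest_port, pub_uri,
                               sub.receive_callback, resolved_name))

    if sub.add_connection(conn):  # attach to the _SubscriberImpl first
        t.start()                 # ...then let the thread connect and receive
        return 1, "Connected topic[%s]. Transport impl[%s]" % (resolved_name, conn.__class__.__name__), dest_port
    else:
        conn.close()
        return 0, "ERROR: Race condition failure: duplicate topic subscriber [%s] was created" % (resolved_name), 0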

dirk-thomas pushed a commit that referenced this issue Apr 16, 2015
Protect against race condition where a new `TCPROSTransport`
connection is created and added to a `_SubscriberImpl` that is
already closed. Checks and returns False if closed so transport
creator can shutdown socket connection. Fixes issue #544.
@dirk-thomas
Member

Should be addressed by #603.

I moved forward and merged the patch to include it in the upcoming patch release. Otherwise it would likely take a long time until the next patch release.

@rethink-rlinsalata Please comment if you had a chance to test the change.

@krixkrix

I am hitting this:

[rospy.internal][ERROR] 2018-03-16 13:10:58,758: ERROR: Race condition failure adding connection to closed subscriber
If you run into this please comment on https://github.com/ros/ros_comm/issues/544

Using ROS kinetic.
It is reproducible so let me know if you want details.

@dirk-thomas
Member

dirk-thomas commented Mar 17, 2018

It is reproducible so let me know if you want details.

@krixkrix Please post these details.

@krixkrix

Hmmm, it turns out to be harder to reproduce today than I imagined. The error appeared when having a publisher and a subscriber in two different threads in the same node, accessing the same topic. The publisher would disconnect while the subscriber was connected and still waiting for its first message.
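
A rough sketch of that scenario (hypothetical node/topic names and timings; not a confirmed reproduction):

    #!/usr/bin/env python
    # Rough sketch of the scenario described above: a publisher and a subscriber
    # on the same topic, in two threads of the same node; the publisher is torn
    # down while the subscriber is still waiting for its first message.
    # Node name, topic name, and sleep times are made up for illustration.
    import threading
    import time

    import rospy
    from std_msgs.msg import String

    def publisher_thread():
        pub = rospy.Publisher('/race_topic', String, queue_size=1)
        time.sleep(0.5)      # let the connection start forming...
        pub.unregister()     # ...then disconnect the publisher again

    def subscriber_thread():
        sub = rospy.Subscriber('/race_topic', String, lambda msg: None)
        time.sleep(2.0)      # still waiting for a first message
        sub.unregister()

    if __name__ == '__main__':
        rospy.init_node('race_condition_sketch')
        threads = [threading.Thread(target=publisher_thread),
                   threading.Thread(target=subscriber_thread)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()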
