Reconnect publisher-based logic can cause non-terminating leaked threads on bad subscribers #533

Closed
rethink-rlinsalata opened this issue Nov 19, 2014 · 9 comments

@rethink-rlinsalata
Contributor

robust_connect_subscriber() can get into a state where the local subscriber side of the connection becomes invalid in some way while the publisher still exists, which leads to never-ending attempts to reconnect.

The reconnect logic in robust_connect_subscriber() in tcpros_pubsub.py overwrites the reconnect flag based solely on the state of the target publisher. This can hide "FATAL" exceptions raised as "Unknown errors" in the function's call to TCPROSTransport.connect() in tcpros_base.py, which closes the TCPROSTransport instance and, according to its exception handling, should prevent any reconnect ("# FATAL: no reconnection as error is unknown"). The result is a 'leaked' thread stuck in a never-ending loop: it always fails to connect, sleeps for an exponentially increasing amount of time, overrides the 'no reconnection' state, and tries to reconnect again. The thread is never cleaned up, even when the parent Subscriber that started it goes away.
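
In essence the pattern looks like this (a standalone toy sketch of the flawed loop, NOT the actual rospy code; the real chain of calls is detailed below):

    # Toy sketch of the failure pattern (NOT the actual rospy code): the
    # loop's termination flag is recomputed from the publisher state alone,
    # so a "fatal, do not reconnect" condition gets clobbered every iteration.
    import threading
    import time

    class FakeConn(object):
        def __init__(self):
            self.done = False
        def connect(self):
            self.done = True               # connect() marks this as fatal...
            raise IOError("unknown error, no reconnection")

    def robust_connect(conn, publisher_still_exists):
        interval = 0.5
        while not conn.done:
            try:
                conn.connect()
            except IOError:
                interval = interval * 2
                time.sleep(interval)
                # ...but this overwrite keeps the loop (and thread) alive
                conn.done = not publisher_still_exists()

    t = threading.Thread(target=robust_connect, args=(FakeConn(), lambda: True))
    t.daemon = True
    t.start()   # never terminates on its own, even if nothing references it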


The chain of events/calls goes like this:

    1. In tcpros_pubsub.py, TCPROSHandler.create_transport() starts a thread with robust_connect_subscriber(...) as its target, at ~L250.
    2. robust_connect_subscriber() enters a while loop that runs as long as the TCPROSTransport instance (conn) is not done, and wraps its call to connect() in a try/except, at ~L168:
    interval = 0.5
    while conn.socket is None and not conn.done and not rospy.is_shutdown():
        try:
            conn.connect(dest_addr, dest_port, pub_uri, timeout=60.)
        except rospy.exceptions.TransportInitError as e:
            rospyerr("unable to create subscriber transport: %s.  Will try again in %ss", e, interval)
            interval = interval * 2
            time.sleep(interval)

            # check to see if publisher state has changed
            conn.done = not check_if_still_publisher(resolved_topic_name, pub_uri)
    3. In tcpros_base.py, if connect() raises a generic Exception, it is caught in the following lines, logged as an "Unknown error", and then the TCPROSTransport instance is closed (which sets the done variable to True and various members to None), with a comment indicating there should be no reconnection, at ~L563-569:
    def connect(..):
        ...
        except Exception as e:
            #logerr("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, str(e)))
            rospywarn("Unknown error initiating TCP/IP socket to %s:%s (%s): %s"%(dest_addr, dest_port, endpoint_id, traceback.format_exc()))            
    
            # FATAL: no reconnection as error is unknown
            self.close()
            raise TransportInitError(str(e)) #re-raise i/o error
    4. However, this request NOT to reconnect is ignored: the Exception is re-raised as a TransportInitError, and in the handler for that TransportInitError the conn.done flag is overwritten based only on the state of the publisher, in this line of robust_connect_subscriber() in tcpros_pubsub.py (~L175):
            # check to see if publisher state has changed
            conn.done = not check_if_still_publisher(resolved_topic_name, pub_uri)

    https://github.com/ros/ros_comm/blob/indigo-devel/clients/rospy/src/rospy/impl/tcpros_pubsub.py#L175

    5. On every subsequent iteration, after sleeping (0.5 * 2^i seconds), the reconnect attempt fails in the read_header() step of connect() when it tries to access a member of the now-None protocol, self.protocol.buff_size (~tcpros_base.py#L618), printing the following error:
    [rospy.internal][WARNING] 2014-11-11 20:39:42,119: Unknown error initiating TCP/IP socket to pv1106:39758 (http://pv1106:34792/): Traceback (most recent call last):
      File "/data/users/rlinsalata/dev/desk/bugs/10848_mem-leak/rosbridge/catkin_ws/src/ros_comm/clients/rospy/src/rospy/impl/tcpros_base.py", line 557, in connect
        self.read_header()
      File "/data/users/rlinsalata/dev/desk/bugs/10848_mem-leak/rosbridge/catkin_ws/src/ros_comm/clients/rospy/src/rospy/impl/tcpros_base.py", line 618, in read_header
        self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
    AttributeError: 'NoneType' object has no attribute 'buff_size'
    
    [rospy.internal][INFO] 2014-11-11 20:39:42,119: topic[/robot/limb/left/endpoint_state] removing connection to http://pv1106:34792/
    [rospy.internal][ERROR] 2014-11-11 20:39:42,119: unable to create subscriber transport: 'NoneType' object has no attribute 'buff_size'.  Will try again in 64.0s
    

Multiple instances of these loose reconnect threads can exist in the same Python process for the same topic. Here are samples of the initial error messages when the loop first begins, from four unique threads (note that plenty of other subscribers worked fine; this is an intermittent problem):

    [rospy.internal][WARNING] 2014-11-11 20:03:14,955: Unknown error initiating TCP/IP socket to pv1106:39758 (http://pv1106:34792/): Traceback (most recent call last):
      File "/data/users/rlinsalata/dev/desk/bugs/10848_mem-leak/rosbridge/catkin_ws/src/ros_comm/clients/rospy/src/rospy/impl/tcpros_base.py", line 558, in connect
        self.local_endpoint = self.socket.getsockname()
    AttributeError: 'NoneType' object has no attribute 'getsockname'

    [rospy.internal][ERROR] 2014-11-11 20:03:14,955: unable to create subscriber transport: 'NoneType' object has no attribute 'getsockname'.  Will try again in 0.5s


    [rospy.internal][WARNING] 2014-11-11 20:37:34,963: Unknown error initiating TCP/IP socket to pv1106:39758 (http://pv1106:34792/): Traceback (most recent call last):
      File "/data/users/rlinsalata/dev/desk/bugs/10848_mem-leak/rosbridge/catkin_ws/src/ros_comm/clients/rospy/src/rospy/impl/tcpros_base.py", line 557, in connect
        self.read_header()
      File "/data/users/rlinsalata/dev/desk/bugs/10848_mem-leak/rosbridge/catkin_ws/src/ros_comm/clients/rospy/src/rospy/impl/tcpros_base.py", line 618, in read_header
        self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
      File "/data/users/rlinsalata/dev/desk/bugs/10848_mem-leak/rosbridge/catkin_ws/src/ros_comm/tools/rosgraph/src/rosgraph/network.py", line 360, in read_ros_handshake_header
        d = sock.recv(buff_size)
    error: [Errno 104] Connection reset by peer

    [rospy.internal][ERROR] 2014-11-11 20:37:34,964: unable to create subscriber transport: [Errno 104] Connection reset by peer.  Will try again in 0.5s


    [rospy.internal][WARNING] 2014-11-11 20:40:51,936: Unknown error initiating TCP/IP socket to pv1106:39758 (http://pv1106:34792/): Traceback (most recent call last):
      File "/data/users/rlinsalata/dev/desk/bugs/10848_mem-leak/rosbridge/catkin_ws/src/ros_comm/clients/rospy/src/rospy/impl/tcpros_base.py", line 557, in connect
        self.read_header()
      File "/data/users/rlinsalata/dev/desk/bugs/10848_mem-leak/rosbridge/catkin_ws/src/ros_comm/clients/rospy/src/rospy/impl/tcpros_base.py", line 618, in read_header
        self._validate_header(read_ros_handshake_header(sock, self.read_buff, self.protocol.buff_size))
      File "/data/users/rlinsalata/dev/desk/bugs/10848_mem-leak/rosbridge/catkin_ws/src/ros_comm/tools/rosgraph/src/rosgraph/network.py", line 362, in read_ros_handshake_header
        raise ROSHandshakeException("connection from sender terminated before handshake header received. %s bytes were received. Please check sender for additional details."%b.tell())
    ROSHandshakeException: connection from sender terminated before handshake header received. 0 bytes were received. Please check sender for additional details.

    [rospy.internal][ERROR] 2014-11-11 20:40:51,936: unable to create subscriber transport: connection from sender terminated before handshake header received. 0 bytes were received. Please check sender for additional details..  Will try again in 0.5s


    [rospy.internal][WARNING] 2014-11-11 20:42:13,706: Unknown error initiating TCP/IP socket to pv1106:39758 (http://pv1106:34792/): Traceback (most recent call last):
      File "/data/users/rlinsalata/dev/desk/bugs/10848_mem-leak/rosbridge/catkin_ws/src/ros_comm/clients/rospy/src/rospy/impl/tcpros_base.py", line 555, in connect
        self.socket.connect((dest_addr, dest_port))
      File "/usr/lib/python2.7/socket.py", line 224, in meth
        return getattr(self._sock,name)(*args)
    error: [Errno 104] Connection reset by peer

    [rospy.internal][ERROR] 2014-11-11 20:42:13,706: unable to create subscriber transport: [Errno 104] Connection reset by peer.  Will try again in 0.5s

robust_connect_subscriber() is not the only place I've seen this try / sleep*2 / retry pattern (e.g. _reconnect() in tcpros_base.py); however, it is the only place I've seen conn.done being overwritten and the upcoming sleep being logged, so I'm not sure whether any other loops are also sleeping forever.

@dirk-thomas
Member

Thanks for the detailed report. Is there a reproducible way to trigger this behavior? I would like to be able to at least verify that a potential patch improves the behavior.

@dirk-thomas
Member

If I follow your description based on just reading the code, I would propose the following approach:

In tcpros_pubsub.py, robust_connect_subscriber() needs to check, when catching the TransportInitError, whether conn has been closed due to an unknown error.

It could do that right after the exception has been caught, i.e. after this line:

    except rospy.exceptions.TransportInitError as e:

(see branch issue533: https://github.com/ros/ros_comm/tree/issue533):

if conn.protocol is None:
    conn.done = True
    break

Checking the protocol member for None is not great, but do you agree that this should fix the described problem?

Anyway, it would be nice to "confirm" that this actually fixes it...
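
In context, the loop would then read roughly like this (a sketch based on the snippets above; the exact placement in the issue533 branch may differ):

    interval = 0.5
    while conn.socket is None and not conn.done and not rospy.is_shutdown():
        try:
            conn.connect(dest_addr, dest_port, pub_uri, timeout=60.)
        except rospy.exceptions.TransportInitError as e:
            # conn was closed inside connect() due to an unknown error:
            # treat this as fatal and stop retrying
            if conn.protocol is None:
                conn.done = True
                break

            rospyerr("unable to create subscriber transport: %s.  Will try again in %ss", e, interval)
            interval = interval * 2
            time.sleep(interval)

            # check to see if publisher state has changed
            conn.done = not check_if_still_publisher(resolved_topic_name, pub_uri)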

@dirk-thomas
Member

@esteve @tfoote @wjwwood Please review the proposed patch. Since I don't have a reproducible test, I couldn't verify its effectiveness.

@esteve
Member

esteve commented Dec 15, 2014

+1

@wjwwood
Member

wjwwood commented Dec 15, 2014

+1, it's hard to know what the real consequences are with just the description of the issue and no tests, but it seems like a straightforward description of the issue and a matching change.

@tfoote
Member

tfoote commented Dec 15, 2014

+0 what william said

@dirk-thomas
Member

If nobody can come up with a reproducible example, I will go ahead and merge the proposed fix (#538), since it is expected to address the described problem and I don't see a way it can be harmful.

@rethink-rlinsalata
Contributor Author

Thanks for your fix and for looking at this. I've tested the fix (via latest indigo-devel) most of the morning and it appears to have resolved the bug. I will continue to test overnight, but I was able to see many of the same errors that previously triggered the "Unknown error initiating..." message, and none of them started the problematic infinite sleep loop, nor were they the direct cause of any leaked threads that I could see.

Checking in the exception handling there for a conn closed by an unknown error looks good - I wasn't sure what member/flag to check; the protocol member seems like a good candidate.

My only concern with the fix is the part that reaches down and sets the done flag directly. Since that variable is used by close() to decide whether it should do clean-up, and close() is called from multiple places in the Transport classes, perhaps the line setting conn.done = True should instead call conn.close() (see the sketch below); or it could simply assume that conn.done was already set during the call to close() in the line preceding where the exception is initially raised in tcpros_base.py (~L568 --

self.close()
raise TransportInitError(str(e)) #re-raise i/o error
)
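
i.e., something along these lines instead of setting the flag directly (just a sketch of the alternative I mean, assuming close() is safe to call on an already-closed transport):

    if conn.protocol is None:
        # let close() own the done flag and any remaining clean-up
        conn.close()
        break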

And yes, regarding the lack of a test script: unfortunately, while definitely reproducible, this bug is highly intermittent, involves connections between multiple entities, and seems to be best triggered by some internal code and scenarios that I've been doing my best to strip down to the bare basics... but alas, my closest efforts seem to raise more questions (and take significantly longer to trigger the bug) than they help toward simplifying the problem.

@gitunit

gitunit commented Dec 16, 2015

I'm having a similar issue. In my case it looks like a problem with rqt_reconfigure, though there was no user interaction when the error happened. Edit: it probably happened due to threading; I've added a lock around a part of my node and it looks OK now (see the sketch after the log below).

[rospy.client][INFO] 2015-12-16 17:56:16,148: init_node, name[/rqt_reconfigure], pid[1440]
[xmlrpc][INFO] 2015-12-16 17:56:16,148: XML-RPC server binding to 0.0.0.0:0
[xmlrpc][INFO] 2015-12-16 17:56:16,148: Started XML-RPC server [http://MYPCNAME:34679/]
[rospy.init][INFO] 2015-12-16 17:56:16,148: ROS Slave URI: [http://MYPCNAME:34679/]
[rospy.impl.masterslave][INFO] 2015-12-16 17:56:16,149: _ready: http://MYPCNAME:34679/
[rospy.registration][INFO] 2015-12-16 17:56:16,149: Registering with master node http://localhost:11311
[xmlrpc][INFO] 2015-12-16 17:56:16,149: xml rpc node: starting XML-RPC server
[rospy.init][INFO] 2015-12-16 17:56:16,249: registered with master
[rospy.rosout][INFO] 2015-12-16 17:56:16,249: initializing /rosout core topic
[rospy.rosout][INFO] 2015-12-16 17:56:16,250: connected to core topic /rosout
[rospy.simtime][INFO] 2015-12-16 17:56:16,251: /use_sim_time is not set, will not subscribe to simulated time [/clock] topic
[rospy.internal][INFO] 2015-12-16 17:56:16,527: topic[/rosout] adding connection to [/rosout], count 0
[rospy.internal][INFO] 2015-12-16 17:56:27,717: topic[/my_program/parameter_updates] adding connection to [http://MYPCNAME:52821/], count 0
[rospy.internal][INFO] 2015-12-16 17:56:27,718: topic[/my_program/parameter_descriptions] adding connection to [http://MYPCNAME:52821/], count 0
[rospy.internal][WARNING] 2015-12-16 18:41:19,314: Unknown error initiating TCP/IP socket to MYPCNAME:44943 (http://MYPCNAME:52821/): Traceback (most recent call last):
  File "/opt/ros/indigo/lib/python2.7/dist-packages/rospy/impl/tcpros_base.py", line 555, in connect
    self.socket.connect((dest_addr, dest_port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused

[rospy.internal][INFO] 2015-12-16 18:41:19,314: topic[/my_program/parameter_descriptions] removing connection to http://MYPCNAME:52821/
[rospy.internal][WARNING] 2015-12-16 18:41:19,315: Unknown error initiating TCP/IP socket to MYPCNAME:44943 (http://MYPCNAME:52821/): Traceback (most recent call last):
  File "/opt/ros/indigo/lib/python2.7/dist-packages/rospy/impl/tcpros_base.py", line 555, in connect
    self.socket.connect((dest_addr, dest_port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused

[rospy.internal][INFO] 2015-12-16 18:41:19,315: topic[/my_program/parameter_updates] removing connection to http://MYPCNAME:52821/
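
For reference, the lock I mentioned is just a standard threading.Lock around the state that my callbacks share; roughly like this (hypothetical names, not my actual node):

    # Rough sketch of the workaround (hypothetical names, not my actual node):
    # serialize access to state shared between the subscriber callback and
    # the dynamic_reconfigure callback, which run on different threads.
    import threading

    state_lock = threading.Lock()
    shared_state = {}

    def topic_callback(msg):                  # registered via rospy.Subscriber
        with state_lock:
            shared_state['last_msg'] = msg

    def reconfigure_callback(config, level):  # registered via dynamic_reconfigure Server
        with state_lock:
            shared_state['config'] = config
        return config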
