Repeatable ConnectionResetError  #463

@alexdukemiller

Hello,

We are using a Python program as an Inmarsat multi-client server to handle inbound and outbound data.

Our issue is that while the code works initially, it consistently fails after a similar number of queries for inbound messages (between 385 and 440). We have tried polling periods ranging from 10 seconds to 2 minutes, and zeep versions 1.5.0, 1.6.0, and 2.0.0, all with the same result.

We have a simple test case which reliably reproduces the problem. This test case periodically queries the web service for inbound messages, and on the 441st iteration, the OS detects and raises a ConnectionResetError. Before the error is raised the querying thread appears to block for ~2.5 minutes on an invocation of GetReturnMessages(). The Skywave support team has assured us they are seeing nothing abnormal in our interaction with the web service. We therefore suspect that we are using the zeep interface incorrectly and wonder if you could provide some advice.
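To pin down exactly which call blocks and for how long, each web service call could be wrapped in a small timer. The following is only a sketch: `timed_call` and its parameters are hypothetical names that are not part of the test case or of zeep.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_call(label, warn_after=30.0, clock=time.monotonic, report=print):
    """Report how long the wrapped block took.

    The message is prefixed with "SLOW " when the block ran longer than
    warn_after seconds. clock and report are injectable so the helper
    can be exercised without actually waiting.
    """
    start = clock()
    try:
        yield
    finally:
        # Runs even when the wrapped call raises, so the duration of a
        # failing call is reported as well.
        elapsed = clock() - start
        flag = "SLOW " if elapsed > warn_after else ""
        report("{}{} took {:.1f} s".format(flag, label, elapsed))
```

In the test case this would be used as `with timed_call("GetReturnMessages"):` around the call in the polling loop. The traceback further down shows zeep passing `timeout=self.operation_timeout` to requests, so a finite operation timeout on the transport might also shorten the ~2.5 minute block, but we have not verified that.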

Also, through experimentation we discovered that resetting the zeep client before the error is detected allows the server to run continuously. So, presently, our code periodically (every 100 iterations) deletes the instance of zeep client we are using and creates a new one. In the attached test case, you can produce the error by setting RESET_COUNT to 10000. Unfortunately, it takes about 1.25 hours to produce the error because we cannot query the web service more frequently than once every 10 seconds.
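The periodic reset described above can be expressed independently of zeep as a small wrapper that rebuilds its client after a fixed number of uses. This is a minimal sketch of the workaround only, not an explanation of the underlying problem; `RecyclingClient`, `factory`, and `max_calls` are hypothetical names introduced for illustration.

```python
class RecyclingClient:
    """Hand out a client object, rebuilding it after max_calls uses."""

    def __init__(self, factory, max_calls=100):
        self._factory = factory      # zero-argument callable that builds a fresh client
        self._max_calls = max_calls  # number of uses allowed per client instance
        self._client = factory()
        self._calls = 0

    def get(self):
        """Return the current client, replacing it first if it is used up."""
        if self._calls >= self._max_calls:
            self.reset()
        self._calls += 1
        return self._client

    def reset(self):
        """Drop the current client and build a new one."""
        self._client = self._factory()
        self._calls = 0
```

With the names from the attached test case, this would be constructed as `RecyclingClient(ZeepClient, max_calls=RESET_COUNT)`, and `reset()` would also be called from the `except OSError` branch.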

We really appreciate any assistance you can provide for isolating the source of this problem.

Info:

  1. zeep version == 2.0.0
  2. WSDL == https://isatdatapro.skywave.com/GLGW/GWServices_v1/Messages.wsdl
  3. Example script with a SkyWave test account that can be freely used:

```python
# Configurable constants

INBOUND_RX_PERIOD = 10  # Period, in seconds, at which the modem gateway is polled for inbound messages (>10 sec)
STATISTICS_PERIOD = 30  # Period, in seconds, at which statistics are printed
RESET_COUNT = 100       # Number of iterations between resets of the zeep client

import socket
import threading
import time
import zeep


class ZeepClient:

    def __init__(self):

        # User account information for the http://isatdatapro.skywave.com gateway.
        # This data is taken from a SkyWave support email.
        self.account = "70000934"
        self.password = "password"
        self.modemid = "01097623SKY2C68"

        # Connect to the SkyWave server and initialize the SOAP interface.
        # Force HTTPS for the connection so username and password are not broadcast as plain text.
        print("SkyWave Server: https://isatdatapro.skywave.com/")
        self.client = zeep.Client('https://isatdatapro.swlab.ca:8143/GLGW/GWServices_v1/Messages.wsdl')

        # Get the SOAP type objects for everything we'll need.
        self.ForwardMessage_type = self.client.get_type('ns0:ForwardMessage')
        self.ArrayOfForwardMessage_type = self.client.get_type('ns0:ArrayOfForwardMessage')
        self.ReturnMessageFilter_type = self.client.get_type("ns0:ReturnMessageFilter")

    def GetReturnMessages(self, highwater):
        # Get inbound messages from gateway and forward them to each registered client.

        ReturnMessageFilter = self.ReturnMessageFilter_type(StartUTC=highwater,
                                                            MobileID=self.modemid,
                                                            StartMessageID=0,
                                                            IncludeRawPayload=True,
                                                            IncludeType=False)

        result = self.client.service.GetReturnMessages(self.account, self.password, ReturnMessageFilter)

        print("Highwater: {}".format(highwater))

        # If there are new inbound messages from the gateway, send each one to each registered client
        if (result.Messages is not None) and (result.ErrorID == 0):
            for rx_msg in result.Messages.ReturnMessage:
                self.handle_rx_msg(rx_msg)
        elif result.ErrorID != 0:
            print("Inbound message error:  {}".format(result.ErrorID))
        else:
            print("no rx messages")

        # Based on the server response, return the new "high watermark" for the next call.
        if result.NextStartUTC is None:
            return highwater
        else:
            return result.NextStartUTC

    def handle_rx_msg(self, msg):

        # Render the message payload as a hex string.
        raw_data = ''.join('{:02x} '.format(x) for x in msg.RawPayload)

        sin = msg.RawPayload[0]
        if (sin < 128):
            # Non-payload data that is not reported to mission control.  This is
            # typically a ping response, position response, or modem registration.
            # For now, use the Messenger application to decode the data.
            header = "<< RX  {:s}, {:s}, STATUS    :".format(msg.ReceiveUTC, msg.MobileID, sin)
            print(header, raw_data)
        else:
            # User payload data reported to mission control.  By our convention,
            # the SIN = 128 and MIN = 0 for all user payload data, and these
            # fields are not passed on to mission control.
            header = "<< RX  {:s}, {:s}, DATA[{:>4d}]:".format(msg.ReceiveUTC, msg.MobileID, len(msg.RawPayload))
            print(header, raw_data)

    @classmethod
    def reset_client(cls):
        # Discard the module-level client and build a fresh one.
        global zeep_client
        del zeep_client.client
        del zeep_client
        zeep_client = ZeepClient()


class InboundMessages(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.name = "InboundMessages"

    def run(self):
        # Listen for inbound messages from modem and send them to each registered client.

        global zeep_client

        # Get the start time for the high water mark.
        highwater = zeep_client.client.service.InfoUTC()
        print("SkyWave Server Version:", zeep_client.client.service.InfoVersion())
        print("SkyWave Server UTC Time:", highwater)

        iteration = 1
        reset_counter = 1

        while True:
            # Do not poll any more frequently than once every 10 seconds.
            time.sleep(INBOUND_RX_PERIOD)

            # Check for new messages and process them.
            try:
                highwater = zeep_client.GetReturnMessages(highwater)
            except OSError as err:
                print("Inbound: OSError: {}, resetting zeep client".format(err))
                ZeepClient.reset_client()

            print("Iteration: {}.".format(iteration))
            timestamp()

            iteration = iteration + 1

            if reset_counter == RESET_COUNT:
                print("Inbound:  Proactively resetting zeep client.")
                ZeepClient.reset_client()
                reset_counter = 1
            else:
                reset_counter = reset_counter + 1


def timestamp():
    ts = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(time.time()))
    print("Current UTC time: {}".format(ts))


# Create threads for the server.
inbound_thread = InboundMessages()

# Create zeep client to connect with the Skywave gateway
zeep_client = ZeepClient()

# Start all the threads.
inbound_thread.start()

# Wait until someone kills the server
while True:
    time.sleep(STATISTICS_PERIOD)
    print("Main thread is alive.")
    timestamp()
```


Here are the error messages we get in the terminal window upon failure, on the 441st iteration of the code provided above with RESET_COUNT set to 10000:

Highwater: 2017-06-09 17:25:36
no rx messages
Iteration: 439
Main thread is alive.
Highwater: 2017-06-09 17:25:36
no rx messages
Iteration: 440
Main thread is alive.
Main thread is alive.
Main thread is alive.
Main thread is alive.
Main thread is alive.
Exception in thread InboundMessages:
Traceback (most recent call last):
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 386, in _make_request
    six.raise_from(e, None)
  File "<string>", line 2, in raise_from
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 382, in _make_request
    httplib_response = conn.getresponse()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 1331, in getresponse
    response.begin()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 297, in begin
    version, status, reason = self._read_status()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 258, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\socket.py", line 586, in readinto
    return self._sock.recv_into(b)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 1002, in recv_into
    return self.read(nbytes, buffer)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 865, in read
    return self._sslobj.read(len, buffer)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 625, in read
    v = self._sslobj.read(len, buffer)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:


Traceback (most recent call last):
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\adapters.py", line 438, in send
    timeout=timeout
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 649, in urlopen
    _stacktrace=sys.exc_info()[2])
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\util\retry.py", line 357, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\packages\six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 386, in _make_request
    six.raise_from(e, None)
  File "<string>", line 2, in raise_from
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\packages\urllib3\connectionpool.py", line 382, in _make_request
    httplib_response = conn.getresponse()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 1331, in getresponse
    response.begin()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 297, in begin
    version, status, reason = self._read_status()
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\http\client.py", line 258, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\socket.py", line 586, in readinto
    return self._sock.recv_into(b)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 1002, in recv_into
    return self.read(nbytes, buffer)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 865, in read
    return self._sslobj.read(len, buffer)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\ssl.py", line 625, in read
    v = self._sslobj.read(len, buffer)
requests.packages.urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "OC-test.py", line 113, in run
    highwater = zeep_client.GetReturnMessages(highwater)
  File "OC-test.py", line 51, in GetReturnMessages
    result = self.client.service.GetReturnMessages(self.account, self.password, ReturnMessageFilter)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\zeep\client.py", line 41, in __call__
    self._op_name, args, kwargs)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\zeep\wsdl\bindings\soap.py", line 112, in send
    options['address'], envelope, http_headers)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\zeep\transports.py", line 96, in post_xml
    return self.post(address, message, headers)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\zeep\transports.py", line 68, in post
    timeout=self.operation_timeout)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\sessions.py", line 565, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\sessions.py", line 518, in request
    resp = self.send(prep, **send_kwargs)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\sessions.py", line 639, in send
    r = adapter.send(request, **kwargs)
  File "C:\Users\amiller\AppData\Local\Programs\Python\Python36\lib\site-packages\requests\adapters.py", line 488, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None))
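
As the traceback shows, the original ConnectionResetError ends up nested inside the arguments of the urllib3 and requests exceptions. When logging such failures, a helper along the following lines could recover the underlying error. This is a sketch under the assumption that the wrapping always happens through `__cause__`, `__context__`, or `args`; `find_in_chain` is a hypothetical name, not part of requests or zeep.

```python
def find_in_chain(exc, wanted):
    """Return an exception of type `wanted` reachable from exc, or None.

    Follows __cause__ and __context__ links, and also looks at exception
    instances stored in .args, which is where the lower-level error sits
    in the traceback above.
    """
    seen = set()
    stack = [exc]
    while stack:
        current = stack.pop()
        if current is None or id(current) in seen:
            continue
        seen.add(id(current))
        if isinstance(current, wanted):
            return current
        stack.append(current.__context__)
        stack.append(current.__cause__)
        # Wrapped errors passed as constructor arguments.
        stack.extend(a for a in current.args if isinstance(a, BaseException))
    return None
```

Inside the `except OSError as err:` branch of the test case, `find_in_chain(err, ConnectionResetError)` would return the reset error if one is present, which makes it possible to distinguish a connection reset from other I/O failures.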
