Hangs on closing connection if nsqd socket is closed during message sending #33

Closed
Ivashkaization opened this issue Apr 2, 2021 · 2 comments · Fixed by #34

Environment Versions

  1. Python version: Python 3.8.5
  2. ansq version: 0.0.17

Originally reproduced with kubectl, which closes the pipe on pod deletion.

Steps to reproduce

  1. Run nsqd container
    $ docker run -it -p 4150:4150 nsqio/nsq sh
  2. Start nsqd inside container:
    # nsqd
  3. Run the following script
import json
import time
import asyncio
from ansq import open_connection


async def main():
    nsq = await open_connection(debug=True)

    message = json.dumps([0] * 1000000)

    while True:
        await nsq.pub('sample_topic', message)
        await asyncio.sleep(10) # <-- stop nsqd here while message not fully sent


if __name__ == '__main__':
    asyncio.run(main())
  4. Stop nsqd in the container with ^C (to simulate a broken pipe error)
  5. The script hangs

Expected result

The script should raise an error and fail (since only one reconnection is possible, and the pipe is already closed)

Actual result
Traceback after stopping with ^c

2021-04-02 15:21:06,209 - INFO - ansq localhost:4150.0: Lost connection to NSQ
2021-04-02 15:21:07,211 - DEBUG - ansq localhost:4150.0: Reconnecting to tcp://localhost:4150...
2021-04-02 15:21:07,211 - DEBUG - ansq localhost:4150.0: Connection tcp://localhost:4150 is closing...
^CTraceback (most recent call last):
  File "reproduce.py", line 18, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
    self._run_once()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.8/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
2021-04-02 15:21:20,637 - ERROR - asyncio: Task exception was never retrieved
future: <Task finished name='Task-3' coro=<NSQConnection.reconnect() done, defined at /home/oleg/work/open-source/ansq/ansq/tcp/connection.py:44> exception=BrokenPipeError(32, 'Broken pipe')>
Traceback (most recent call last):
  File "/home/oleg/work/open-source/ansq/ansq/tcp/connection.py", line 62, in reconnect
    await self._do_close(change_status=False)
  File "/home/oleg/work/open-source/ansq/ansq/tcp/connection.py", line 126, in _do_close
    await self._writer.wait_closed()
  File "/usr/lib/python3.8/asyncio/streams.py", line 359, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/usr/lib/python3.8/asyncio/selector_events.py", line 933, in _write_ready
    n = self._sock.send(self._buffer)
BrokenPipeError: [Errno 32] Broken pipe

Ivashkaization commented Apr 2, 2021

I've looked into it a bit. The asyncio docs say this about the StreamWriter.write method:

The method attempts to write the data to the underlying socket immediately. If that fails, the data is queued in an internal write buffer until it can be sent.

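Also, after close() the transport still tries to flush the data left in the buffer, and wait_closed waits for that to finish, so a failed write is raised from wait_closed.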

However, the library code does not handle exceptions raised by wait_closed; the try/finally only lets them propagate:

ansq/ansq/tcp/connection.py

Lines 122 to 128 in cb4c722

assert self._writer is not None
try:
    self._writer.close()
    if sys.version_info >= (3, 7):
        await self._writer.wait_closed()
finally:
    pass

As a result, the code located below it, which is meant to unblock the pending command, is never reached:

ansq/ansq/tcp/connection.py

Lines 130 to 133 in cb4c722

for future, callback in self._cmd_waiters:
    if not future.cancelled():
        future.set_exception(ConnectionClosedError("Connection is closed"))
        callback is not None and callback(None)
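
To illustrate the difference, here is a minimal, self-contained sketch. It does not use ansq itself and is not necessarily the change made in #34: `FakeWriter`, `fail_waiters`, `close_unguarded` and `close_guarded` are hypothetical names, and `ConnectionClosedError` is a local stand-in for the library's exception. It assumes the failure shows up as an `OSError` subclass such as `BrokenPipeError`, as in the traceback above.

```python
import asyncio


class ConnectionClosedError(Exception):
    """Local stand-in for the library's exception of the same name."""


class FakeWriter:
    """Hypothetical writer whose wait_closed() fails like a broken pipe."""

    def close(self):
        pass

    async def wait_closed(self):
        raise BrokenPipeError(32, "Broken pipe")


def fail_waiters(waiters):
    # Simplified version of the quoted loop: wake every pending command.
    for future in waiters:
        if not future.done():
            future.set_exception(ConnectionClosedError("Connection is closed"))


async def close_unguarded(writer, waiters):
    try:
        writer.close()
        await writer.wait_closed()
    finally:
        pass  # a bare finally does not swallow the exception
    fail_waiters(waiters)  # skipped when wait_closed() raises


async def close_guarded(writer, waiters):
    try:
        writer.close()
        await writer.wait_closed()
    except OSError:
        pass  # assumption: a real fix would probably log this
    fail_waiters(waiters)  # always reached


async def demo():
    loop = asyncio.get_running_loop()

    # Unguarded close: the error propagates and the waiter stays pending.
    pending = loop.create_future()
    try:
        await close_unguarded(FakeWriter(), [pending])
    except BrokenPipeError:
        pass
    unguarded_left_pending = not pending.done()
    pending.cancel()

    # Guarded close: the waiter is failed with ConnectionClosedError.
    pending = loop.create_future()
    await close_guarded(FakeWriter(), [pending])
    guarded_failed = isinstance(pending.exception(), ConnectionClosedError)

    return unguarded_left_pending, guarded_failed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the unguarded variant the waiter is never resolved, which matches the hang described here; in the guarded variant anything awaiting the future gets an exception instead of blocking forever.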

Ivashkaization pushed a commit to Ivashkaization/ansq that referenced this issue Apr 2, 2021

Suggested fix at #34

shalakhin added a commit that referenced this issue Apr 21, 2021
…osing-connection-if-nsqd-socket-is-closed-during-message-sending

Catch `writer.wait_closed()` exception to avoid command hanging. #33