Reader doesn't reconnect if a loop has been blocked for a while #62

Closed
atugushev opened this issue Aug 10, 2022 · 6 comments · Fixed by #97
Labels
bug Something isn't working

Comments

atugushev (Collaborator) commented Aug 10, 2022

Environment Versions

  1. Python version: Python 3.8.10
  2. ansq version: 0.1.0

Steps to reproduce

  1. Run the following script
import time
import ansq
import asyncio
import uuid


async def main():
    topic = "example_topic" + str(uuid.uuid1())[:4]

    reader = await ansq.create_reader(
        topic=topic,
        channel="example_channel",
        connection_options=ansq.ConnectionOptions(debug=True)
    )

    writer = await ansq.create_writer()
    for i in range(20):
        await writer.pub(
            topic=topic,
            message=f"Hello, world ({i})!",
        )
    await writer.close()

    async for message in reader.messages():
        print(f">>>>> Message: {message.body}")
        await message.fin()
        time.sleep(300)  # <------------- block the loop

    await reader.close()


if __name__ == '__main__':
    asyncio.run(main())
  2. Wait about 10 minutes until the following traceback appears:
ERROR - ansq localhost:4150.0: [Errno 32] Broken pipe
Traceback (most recent call last):
  File "/private/var/folders/l0/lnq1ghps5yqc4vkgszlcz92m0000gp/T/tmp.HaRNonRx/.venv/lib/python3.8/site-packages/ansq/tcp/connection.py", line 180, in _do_close
    await self._writer.wait_closed()
  File "/Users/albert/.pyenv/versions/3.8.10/lib/python3.8/asyncio/streams.py", line 359, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/private/var/folders/l0/lnq1ghps5yqc4vkgszlcz92m0000gp/T/tmp.HaRNonRx/.venv/lib/python3.8/site-packages/ansq/tcp/connection.py", line 357, in _read_data_task
    data = await self._reader.read(consts.MAX_CHUNK_SIZE)
  File "/Users/albert/.pyenv/versions/3.8.10/lib/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/Users/albert/.pyenv/versions/3.8.10/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
  File "/Users/albert/.pyenv/versions/3.8.10/lib/python3.8/asyncio/selector_events.py", line 910, in write
    n = self._sock.send(data)
BrokenPipeError: [Errno 32] Broken pipe
2022-08-10 18:06:32,053 - DEBUG - ansq localhost:4150.0: Connection tcp://localhost:4150 is closed
  3. The script hangs.
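Step 1 works as a reproducer because `time.sleep(300)` runs on the event loop thread: while it sleeps, no other coroutine can run, presumably including whatever task reads from the nsqd socket and answers heartbeats. The stdlib-only sketch below illustrates that starvation. `heartbeat()` and `run()` are hypothetical names standing in for such a background task; this is not ansq code. Offloading the blocking call with `loop.run_in_executor()` (available on Python 3.8) keeps the loop responsive, although that only avoids the trigger and does not fix the missing reconnect.

```python
import asyncio
import time


async def heartbeat(ticks: list, interval: float) -> None:
    # Hypothetical stand-in for a background task that must keep servicing the socket.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def run(blocking: bool, work: float = 0.5, interval: float = 0.05) -> int:
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks, interval))
    await asyncio.sleep(0)  # let the heartbeat task take its first step
    if blocking:
        time.sleep(work)  # blocks the event loop: the heartbeat task cannot run
    else:
        loop = asyncio.get_running_loop()
        # Runs the blocking call in the default thread pool; the loop stays free.
        await loop.run_in_executor(None, time.sleep, work)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(ticks)  # how many times the heartbeat task got to run


if __name__ == "__main__":
    print("blocking:", asyncio.run(run(blocking=True)))
    print("offloaded:", asyncio.run(run(blocking=False)))
```

With these defaults the blocking run records only the initial tick, while the offloaded run records roughly one tick every 50 ms for the half second of work.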
script logs
2022-08-10 17:56:31,972 - DEBUG - ansq localhost:4150.0: Connect to tcp://localhost:4150 established
2022-08-10 17:56:31,972 - DEBUG - ansq localhost:4150.0: NSQ: Executing command b'IDENTIFY\n\x00\x00\x00\x94{"deflate": false, "deflate_level": 6, "feature_negotiation": true, "heartbeat_interval": 30000, "sample_rate": 0, "snappy": false, "tls_v1": false}'
2022-08-10 17:56:31,973 - DEBUG - ansq localhost:4150.0: NSQ: Got data: <NSQResponseSchema frame_type:FrameType.RESPONSE, body:b'{"max_rdy_count":2500,"version":"1.2.1","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":6,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_requir...
2022-08-10 17:56:31,973 - DEBUG - ansq localhost:4150.0: NSQ: Executing command b'SUB example_topic00d1 example_channel\n'
2022-08-10 17:56:31,974 - DEBUG - ansq localhost:4150.0: NSQ: Got data: <NSQResponseSchema frame_type:FrameType.RESPONSE, body:b'OK', is_ok:True>
2022-08-10 17:56:31,974 - DEBUG - ansq localhost:4150.0: NSQ: Executing command b'RDY 1\n'
2022-08-10 17:56:31,995 - DEBUG - ansq localhost:4150.0: NSQ: Got data: <NSQMessageSchema frame_type:FrameType.MESSAGE, body:b'Hello, world (0)!', timestamp:1660146991995145000, attempts:1, id:1122f8523f125000>
>>>>> Message: b'Hello, world (0)!'
2022-08-10 17:56:32,018 - DEBUG - ansq localhost:4150.0: NSQ: Executing command b'FIN 1122f8523f125000\n'

2022-08-10 18:01:32,028 - DEBUG - ansq localhost:4150.0: NSQ: Got data: <NSQMessageSchema frame_type:FrameType.MESSAGE, body:b'Hello, world (1)!', timestamp:1660146992014222000, attempts:1, id:1122f85243d25000>
>>>>> Message: b'Hello, world (1)!'
2022-08-10 18:01:32,029 - DEBUG - ansq localhost:4150.0: NSQ: Executing command b'FIN 1122f85243d25000\n'

2022-08-10 18:06:32,039 - ERROR - ansq localhost:4150.0: Connection tcp://localhost:4150 is closing due an error: [Errno 32] Broken pipe
2022-08-10 18:06:32,039 - ERROR - ansq localhost:4150.0: [Errno 32] Broken pipe
Traceback (most recent call last):
  File "/private/var/folders/l0/lnq1ghps5yqc4vkgszlcz92m0000gp/T/tmp.HaRNonRx/.venv/lib/python3.8/site-packages/ansq/tcp/connection.py", line 180, in _do_close
    await self._writer.wait_closed()
  File "/Users/albert/.pyenv/versions/3.8.10/lib/python3.8/asyncio/streams.py", line 359, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/private/var/folders/l0/lnq1ghps5yqc4vkgszlcz92m0000gp/T/tmp.HaRNonRx/.venv/lib/python3.8/site-packages/ansq/tcp/connection.py", line 357, in _read_data_task
    data = await self._reader.read(consts.MAX_CHUNK_SIZE)
  File "/Users/albert/.pyenv/versions/3.8.10/lib/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/Users/albert/.pyenv/versions/3.8.10/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
  File "/Users/albert/.pyenv/versions/3.8.10/lib/python3.8/asyncio/selector_events.py", line 910, in write
    n = self._sock.send(data)
BrokenPipeError: [Errno 32] Broken pipe
2022-08-10 18:06:32,053 - DEBUG - ansq localhost:4150.0: Connection tcp://localhost:4150 is closed
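The bottom frame of the traceback is asyncio's selector transport calling `self._sock.send(data)` on a socket whose peer has already gone away. A minimal stdlib sketch of that failure mode, assuming a Unix-like OS and using `socket.socketpair()` as a stand-in for the client-to-nsqd TCP connection (this is not the ansq code path, and `send_after_peer_closed()` is a hypothetical helper):

```python
import socket


def send_after_peer_closed() -> str:
    # A connected pair of sockets stands in for the reader <-> nsqd link.
    client, server = socket.socketpair()
    server.close()  # the peer (nsqd in the issue) has closed its end
    try:
        client.send(b"FIN 1122f85243d25000\n")
    except BrokenPipeError as exc:
        # Python ignores SIGPIPE by default, so EPIPE surfaces as an exception.
        return f"{type(exc).__name__}: errno {exc.errno}"
    finally:
        client.close()
    return "send succeeded"


if __name__ == "__main__":
    print(send_after_peer_closed())
```

Over a real TCP connection the first write after the peer has closed can still appear to succeed, so the error may only surface on a later write.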
nsqd logs
[nsqd] 2022/08/10 17:56:31.971925 INFO: TCP: new client([::1]:52407)
[nsqd] 2022/08/10 17:56:31.972375 INFO: CLIENT([::1]:52407): desired protocol magic '  V2'
[nsqd] 2022/08/10 17:56:31.973156 INFO: [[::1]:52407] IDENTIFY: {ClientID: Hostname: HeartbeatInterval:30000 OutputBufferSize:0 OutputBufferTimeout:0 FeatureNegotiation:true TLSv1:false Deflate:false DeflateLevel:6 Snappy:false SampleRate:0 UserAgent: MsgTimeout:0}
[nsqd] 2022/08/10 17:56:31.974118 INFO: TOPIC(example_topic00d1): created
[nsqd] 2022/08/10 17:56:31.974181 INFO: TOPIC(example_topic00d1): new channel(example_channel)
[nsqd] 2022/08/10 17:56:31.974227 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2022/08/10 17:56:31.976464 INFO: TCP: new client([::1]:52408)
[nsqd] 2022/08/10 17:56:31.976825 INFO: CLIENT([::1]:52408): desired protocol magic '  V2'
[nsqd] 2022/08/10 17:56:31.976945 INFO: [[::1]:52408] IDENTIFY: {ClientID: Hostname: HeartbeatInterval:30000 OutputBufferSize:0 OutputBufferTimeout:0 FeatureNegotiation:true TLSv1:false Deflate:false DeflateLevel:6 Snappy:false SampleRate:0 UserAgent: MsgTimeout:0}
[nsqd] 2022/08/10 17:56:31.995201 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2022/08/10 17:56:32.018776 INFO: PROTOCOL(V2): [[::1]:52408] exiting ioloop
[nsqd] 2022/08/10 17:56:32.018807 INFO: PROTOCOL(V2): [[::1]:52408] exiting messagePump

[nsqd] 2022/08/10 17:57:32.021987 INFO: PROTOCOL(V2): [[::1]:52407] exiting ioloop
[nsqd] 2022/08/10 17:57:32.022040 ERROR: client([::1]:52407) - failed to read command - read tcp [::1]:4150->[::1]:52407: i/o timeout
[nsqd] 2022/08/10 17:57:32.022113 INFO: PROTOCOL(V2): [[::1]:52407] exiting messagePump
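The two logs line up with nsqd's heartbeat handling. The reader negotiated `heartbeat_interval: 30000` (milliseconds) in IDENTIFY, and nsqd reports an `i/o timeout` on the reader connection `[::1]:52407` about 60 s after the reader's last command (the FIN at 17:56:32), i.e. long before the client notices at 18:06:32. Assuming nsqd drops a client once nothing has been read from it for twice the heartbeat interval, the arithmetic checks out. The timestamps are copied from the logs above; both processes ran against localhost, so the clocks are assumed comparable.

```python
from datetime import datetime

# Negotiated in the reader's IDENTIFY command (milliseconds).
HEARTBEAT_INTERVAL_MS = 30000

# Last command the reader sent before the loop blocked (client log, FIN for message 0).
last_command = datetime.strptime("2022-08-10 17:56:32,018", "%Y-%m-%d %H:%M:%S,%f")
# nsqd's "failed to read command ... i/o timeout" for the reader connection.
nsqd_timeout = datetime.strptime("2022/08/10 17:57:32.022040", "%Y/%m/%d %H:%M:%S.%f")

elapsed = (nsqd_timeout - last_command).total_seconds()
# Assumed nsqd read deadline: twice the heartbeat interval.
expected = 2 * HEARTBEAT_INTERVAL_MS / 1000

print(f"elapsed = {elapsed:.3f} s, expected deadline = {expected:.0f} s")
```

This prints `elapsed = 60.004 s, expected deadline = 60 s`.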

Additional context

#33

atugushev added the bug (Something isn't working) label Aug 10, 2022
pndiku commented Apr 20, 2023

Any fix for this? Is the project dead?

shalakhin (Contributor) commented Apr 20, 2023 via email

atugushev (Collaborator, Author)

We also use this library actively. In fact, these particular steps to reproduce no longer trigger the bug.

pndiku commented Apr 20, 2023

The bug still occurs, which is why I asked. I use the library extensively, and my workers were losing NSQ connectivity and not reconnecting automatically.

I hadn't seen much activity on the project, but I accept that my question came across as negative.

atugushev (Collaborator, Author)

@pndiku could you send a PR with a failing test? I'll help with the fix.

pndiku commented Apr 20, 2023

The code sample in this issue breaks for me if I leave it running for 3-4 iterations of the loop.

2023-04-20 12:30:47,470 - DEBUG - ansq localhost:4150.0: NSQ: Got data: <NSQMessageSchema frame_type:FrameType.MESSAGE, body:b'Hello, world (1)!', timestamp:1681982747370192841, attempts:1, id:125946562940d000>
>>>>> Message: b'Hello, world (1)!'
2023-04-20 12:30:47,470 - DEBUG - ansq localhost:4150.0: NSQ: Executing command b'FIN 125946562940d000\n'
2023-04-20 12:35:47,570 - INFO - ansq localhost:4150.0: Lost connection to NSQ tcp://localhost:4150
2023-04-20 12:35:48,571 - DEBUG - ansq localhost:4150.0: Reconnecting to tcp://localhost:4150...
2023-04-20 12:35:48,572 - DEBUG - ansq localhost:4150.0: Connect to tcp://localhost:4150 established
2023-04-20 12:35:48,572 - DEBUG - ansq localhost:4150.0: NSQ: Executing command b'IDENTIFY\n\x00\x00\x00\xf1{"client_id": "niobe", "hostname": "niobe", "user_agent": "ansq/0.2.0", "deflate": false, "deflate_level": 6, "feature_negotiation": true, "heartbeat_interval": 30000, "sample_rate": 0, "snappy": false, "tls_v1": false, "msg_timeout": 60000}'
2023-04-20 12:35:48,572 - DEBUG - ansq localhost:4150.0: NSQ: Got data: <NSQResponseSchema frame_type:FrameType.RESPONSE, body:b'{"max_rdy_count":2500,"version":"1.2.0","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":6,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeo...', is_ok:False>
2023-04-20 12:35:48,572 - DEBUG - ansq localhost:4150.0: NSQ: Executing command b'SUB example_topic5540 example_channel\n'
2023-04-20 12:35:48,573 - DEBUG - ansq localhost:4150.0: NSQ: Got data: <NSQResponseSchema frame_type:FrameType.RESPONSE, body:b'OK', is_ok:True>
2023-04-20 12:35:48,573 - DEBUG - ansq localhost:4150.0: NSQ: Executing command b'RDY 1\n'
2023-04-20 12:35:48,573 - DEBUG - ansq localhost:4150.0: Reconnected to tcp://localhost:4150
2023-04-20 12:35:48,573 - DEBUG - ansq localhost:4150.0: NSQ: Got data: <NSQMessageSchema frame_type:FrameType.MESSAGE, body:b'Hello, world (2)!', timestamp:1681982747370423498, attempts:1, id:125946562940d001>
>>>>> Message: b'Hello, world (2)!'
2023-04-20 12:35:48,573 - DEBUG - ansq localhost:4150.0: NSQ: Executing command b'FIN 125946562940d001\n'
2023-04-20 12:40:48,670 - DEBUG - ansq localhost:4150.0: NSQ: Got data: <NSQMessageSchema frame_type:FrameType.MESSAGE, body:b'Hello, world (3)!', timestamp:1681982747370659630, attempts:1, id:125946562940d002>
>>>>> Message: b'Hello, world (3)!'
2023-04-20 12:40:48,670 - DEBUG - ansq localhost:4150.0: NSQ: Executing command b'FIN 125946562940d002\n'
2023-04-20 12:45:48,770 - ERROR - ansq localhost:4150.0: Connection tcp://localhost:4150 is closing due an error: [Errno 32] Broken pipe
2023-04-20 12:45:48,770 - ERROR - ansq localhost:4150.0: [Errno 32] Broken pipe
Traceback (most recent call last):
  File "/home/pndiku/tmp/ansq/px/lib/python3.10/site-packages/ansq/tcp/connection.py", line 178, in _do_close
    await self._writer.wait_closed()
  File "/usr/lib64/python3.10/asyncio/streams.py", line 343, in wait_closed
    await self._protocol._get_close_waiter(self)
  File "/usr/lib64/python3.10/asyncio/selector_events.py", line 924, in write
    n = self._sock.send(data)
BrokenPipeError: [Errno 32] Broken pipe
2023-04-20 12:45:48,771 - DEBUG - ansq localhost:4150.0: Connection tcp://localhost:4150 is closed
