Skip to content

Commit

Permalink
Fix a bug where a command would hang due to closed connection (#26)
Browse files Browse the repository at this point in the history
* Fix a bug where a command would hang due to closed connection

* Try to fix flaky test

* Fix timeouts

* Fix test
  • Loading branch information
atugushev committed Sep 16, 2020
1 parent fd54721 commit cb4c722
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 5 deletions.
15 changes: 13 additions & 2 deletions ansq/tcp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from typing import Any, AsyncGenerator, Callable, Optional, Union

from ansq.tcp import consts
from ansq.tcp.exceptions import NSQUnauthorized, ProtocolError, get_exception
from ansq.tcp.exceptions import (
ConnectionClosedError,
NSQUnauthorized,
ProtocolError,
get_exception,
)
from ansq.tcp.types import (
ConnectionStatus,
NSQCommands,
Expand Down Expand Up @@ -122,6 +127,11 @@ async def _do_close(
finally:
pass

for future, callback in self._cmd_waiters:
if not future.cancelled():
future.set_exception(ConnectionClosedError("Connection is closed"))
callback is not None and callback(None)

if self._message_queue.qsize() > 0:
self._message_queue.get_nowait()

Expand Down Expand Up @@ -163,7 +173,8 @@ async def execute(
await self._reconnect_task

assert self._reader, "You should call `connect` method first"
assert self._status or command == NSQCommands.CLS, "Connection is closed"
if not self._status and not (command == NSQCommands.CLS):
raise ConnectionClosedError("Connection is closed")

future = self._loop.create_future()
if command in (
Expand Down
4 changes: 4 additions & 0 deletions ansq/tcp/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Union


class ConnectionClosedError(Exception):
pass


class NSQException(Exception):
"""XXX"""

Expand Down
35 changes: 32 additions & 3 deletions tests/test_send_commands.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from time import time
import asyncio
from time import sleep, time

import pytest

from ansq import open_connection
from ansq.tcp.connection import NSQConnection
from ansq.tcp.exceptions import ConnectionClosedError


@pytest.mark.asyncio
Expand Down Expand Up @@ -68,9 +70,10 @@ async def test_command_without_connection():
nsq = NSQConnection()
assert nsq.status.is_init

with pytest.raises(AssertionError) as e:
with pytest.raises(
AssertionError, match="^You should call `connect` method first$",
):
await nsq.pub("test_topic", "test_message")
assert str(e.value) == "You should call `connect` method first"

await nsq.close()
assert nsq.status.is_init
Expand All @@ -87,3 +90,29 @@ async def test_command_sub():

await nsq.close()
assert nsq.is_closed


@pytest.mark.asyncio
async def test_command_with_closed_connection():
nsq = await open_connection()
await nsq.close()

with pytest.raises(ConnectionClosedError, match="^Connection is closed$"):
await nsq.pub("test_topic", "test_message")


@pytest.mark.asyncio
async def test_command_with_concurrently_closed_connection():
nsq = await open_connection()

async def close():
await nsq.close()

async def blocking_wait_and_pub():
sleep(0.1)
await nsq.pub("test_topic", "test_message")

with pytest.raises(ConnectionClosedError, match="^Connection is closed$"):
await asyncio.wait_for(
asyncio.gather(close(), blocking_wait_and_pub()), timeout=1,
)

0 comments on commit cb4c722

Please sign in to comment.