Skip to content

Commit

Permalink
raises different exception when trying to process timed out message
Browse files Browse the repository at this point in the history
  • Loading branch information
GinTR1k committed Oct 4, 2020
1 parent cb4c722 commit 915ada4
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 38 deletions.
6 changes: 6 additions & 0 deletions ansq/tcp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,12 @@ async def messages(self) -> AsyncGenerator[NSQMessage, None]:
message = await self._message_queue.get()
if message is None:
return
if message.is_timed_out:
self.logger.error(f"Message with id={message.id} is timed out")
continue
if message.is_processed:
self.logger.error(f"Message with id={message.id} is processed already")
continue
yield message

def get_message(self) -> Optional[NSQMessage]:
Expand Down
6 changes: 4 additions & 2 deletions ansq/tcp/types/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ def not_processed(func: Callable) -> Callable:
async def decorator(cls: "NSQMessage", *args: Any, **kwargs: Any) -> Any:
if cls.is_processed:
raise RuntimeWarning("Message has already been processed")
if cls.is_timed_out:
raise RuntimeWarning("Message is timed out")
response = await func(cls, *args, **kwargs)
return response

Expand Down Expand Up @@ -78,9 +80,8 @@ def is_processed(self) -> bool:
"""True if message has been processed:
* finished
* re-queued
* timed out
"""
return self.is_timed_out or self._is_processed
return self._is_processed

@property
def timeout(self) -> timedelta:
Expand Down Expand Up @@ -118,3 +119,4 @@ async def touch(self) -> None:
:raises RuntimeWarning: in case message was processed earlier.
"""
await self._connection.touch(self.id)
self._initialized_at = datetime.now(tz=timezone.utc)
118 changes: 82 additions & 36 deletions tests/test_read_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def test_read_message():

await nsq.rdy(1)
message = await nsq.message_queue.get()
assert not message.is_processed
assert not message.is_processed and not message.is_timed_out

await message.fin()
assert message.is_processed
Expand All @@ -46,7 +46,7 @@ async def test_read_message_and_req():

await nsq.rdy(1)
message = await nsq.message_queue.get()
assert not message.is_processed
assert not message.is_processed and not message.is_timed_out

await message.req()
assert message.is_processed
Expand All @@ -55,35 +55,6 @@ async def test_read_message_and_req():
assert nsq.is_closed


@pytest.mark.asyncio
async def test_read_message_and_touch():
nsq = await open_connection()
assert nsq.status.is_connected

timestamp = time()

response = await nsq.pub(
"test_read_message_and_touch", f"hello sent at {timestamp}",
)
assert response.is_ok

response = await nsq.sub("test_read_message_and_touch", "channel1")
assert response.is_ok

await nsq.rdy(1)
message = await nsq.message_queue.get()
assert not message.is_processed

await message.touch()
assert not message.is_processed

await message.fin()
assert message.is_processed

await nsq.close()
assert nsq.is_closed


@pytest.mark.asyncio
async def test_read_message_and_fin_twice():
nsq = await open_connection()
Expand All @@ -101,7 +72,7 @@ async def test_read_message_and_fin_twice():

await nsq.rdy(1)
message = await nsq.message_queue.get()
assert not message.is_processed
assert not message.is_processed and not message.is_timed_out
await message.fin()

with pytest.raises(RuntimeWarning) as warning:
Expand All @@ -126,7 +97,7 @@ async def test_read_messages_via_generator():
processed_messages = 0

async for message in nsq.messages():
assert not message.is_processed
assert not message.is_processed and not message.is_timed_out
await message.fin()
processed_messages += 1

Expand Down Expand Up @@ -160,7 +131,7 @@ async def test_read_single_message_via_get_message():
message = nsq.get_message()

assert isinstance(message, NSQMessage)
assert not message.is_processed
assert not message.is_processed and not message.is_timed_out
await message.fin()

await nsq.close()
Expand All @@ -180,14 +151,89 @@ async def test_read_bytes_message():

await nsq.rdy(1)
message = await nsq.message_queue.get()
assert not message.is_processed
assert not message.is_processed and not message.is_timed_out

assert message.body == b"\xa1"
with pytest.raises(UnicodeDecodeError):
str(message)

await message.fin()
assert message.is_processed
assert message.is_processed and not message.is_timed_out

await nsq.close()
assert nsq.is_closed


@pytest.mark.asyncio
async def test_timeout_messages():
nsq = await open_connection()
assert nsq.status.is_connected

first_message = "first test message at " + str(time())
first_message_response = await nsq.pub(
"test_read_timed_out_messages", first_message,
)
assert first_message_response.is_ok

second_message = "second test message at " + str(time())
second_message_response = await nsq.pub(
"test_read_timed_out_messages", second_message,
)
assert second_message_response.is_ok

await nsq.subscribe("test_read_timed_out_messages", "channel1", 1)
assert nsq.is_subscribed

# We need to wait while the message will be timed out.
# The default message timeout is 60 secs
await asyncio.sleep(61)

async for message in nsq.messages():
assert not message.is_processed and not message.is_timed_out
assert str(message) == second_message
assert message.attempts == 1
await message.fin()
break

message = await asyncio.wait_for(nsq.message_queue.get(), timeout=1)
assert not message.is_processed and not message.is_timed_out
assert str(message) == first_message
assert message.attempts == 2
await message.fin()

await nsq.close()
assert nsq.is_closed


@pytest.mark.asyncio
async def test_read_message_and_touch():
nsq = await open_connection()
assert nsq.status.is_connected

timestamp = time()

response = await nsq.pub(
"test_read_message_and_touch", f"hello sent at {timestamp}",
)
assert response.is_ok

response = await nsq.sub("test_read_message_and_touch", "channel1")
assert response.is_ok

await nsq.rdy(1)

message = await nsq.message_queue.get()
assert not message.is_processed and not message.is_timed_out
await asyncio.sleep(31)

assert not message.is_processed and not message.is_timed_out
await message.touch()
assert not message.is_processed and not message.is_timed_out
await asyncio.sleep(31)

assert not message.is_processed and not message.is_timed_out
await message.fin()
assert message.is_processed and not message.is_timed_out

await nsq.close()
assert nsq.is_closed

0 comments on commit 915ada4

Please sign in to comment.