Skip to content

Commit

Permalink
raises different exception when trying to process timed out message
Browse files Browse the repository at this point in the history
  • Loading branch information
GinTR1k committed Oct 4, 2020
1 parent cb4c722 commit 917de51
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 39 deletions.
6 changes: 6 additions & 0 deletions ansq/tcp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,12 @@ async def messages(self) -> AsyncGenerator[NSQMessage, None]:
message = await self._message_queue.get()
if message is None:
return
if message.is_timed_out:
self.logger.error(f"Message with id={message.id} is timed out")
continue
if message.is_processed:
self.logger.error(f"Message with id={message.id} is processed already")
continue
yield message

def get_message(self) -> Optional[NSQMessage]:
Expand Down
13 changes: 10 additions & 3 deletions ansq/tcp/types/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ def not_processed(func: Callable) -> Callable:
async def decorator(cls: "NSQMessage", *args: Any, **kwargs: Any) -> Any:
if cls.is_processed:
raise RuntimeWarning("Message has already been processed")
if cls.is_timed_out:
raise RuntimeWarning("Message is timed out")
response = await func(cls, *args, **kwargs)
return response

Expand Down Expand Up @@ -53,7 +55,7 @@ def __repr__(self) -> str:
'<NSQMessage id="{id}", body={body!r}, attempts={attempts}, '
"timestamp={timestamp}, timeout={timeout}, "
"initialized_at={initialized_at}, is_timed_out={is_timed_out}, "
"is_processed={is_processed}>".format(
"is_processed={is_processed}, can_be_processed={can_be_processed}>".format(
id=self.id,
body=self.body,
attempts=self.attempts,
Expand All @@ -62,6 +64,7 @@ def __repr__(self) -> str:
initialized_at=self._initialized_at,
is_timed_out=self.is_timed_out,
is_processed=self.is_processed,
can_be_processed=self.can_be_processed
)
)

Expand All @@ -78,9 +81,8 @@ def is_processed(self) -> bool:
"""True if message has been processed:
* finished
* re-queued
* timed out
"""
return self.is_timed_out or self._is_processed
return self._is_processed

@property
def timeout(self) -> timedelta:
Expand All @@ -90,6 +92,10 @@ def timeout(self) -> timedelta:
def is_timed_out(self) -> bool:
return self._initialized_at + self.timeout < datetime.now(tz=timezone.utc)

@property
def can_be_processed(self) -> bool:
return not self.is_timed_out and not self.is_processed

@not_processed
async def fin(self) -> None:
"""Finish a message (indicate successful processing)
Expand Down Expand Up @@ -118,3 +124,4 @@ async def touch(self) -> None:
:raises RuntimeWarning: in case message was processed earlier.
"""
await self._connection.touch(self.id)
self._initialized_at = datetime.now(tz=timezone.utc)
118 changes: 82 additions & 36 deletions tests/test_read_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def test_read_message():

await nsq.rdy(1)
message = await nsq.message_queue.get()
assert not message.is_processed
assert message.can_be_processed

await message.fin()
assert message.is_processed
Expand All @@ -46,7 +46,7 @@ async def test_read_message_and_req():

await nsq.rdy(1)
message = await nsq.message_queue.get()
assert not message.is_processed
assert message.can_be_processed

await message.req()
assert message.is_processed
Expand All @@ -55,35 +55,6 @@ async def test_read_message_and_req():
assert nsq.is_closed


@pytest.mark.asyncio
async def test_read_message_and_touch():
nsq = await open_connection()
assert nsq.status.is_connected

timestamp = time()

response = await nsq.pub(
"test_read_message_and_touch", f"hello sent at {timestamp}",
)
assert response.is_ok

response = await nsq.sub("test_read_message_and_touch", "channel1")
assert response.is_ok

await nsq.rdy(1)
message = await nsq.message_queue.get()
assert not message.is_processed

await message.touch()
assert not message.is_processed

await message.fin()
assert message.is_processed

await nsq.close()
assert nsq.is_closed


@pytest.mark.asyncio
async def test_read_message_and_fin_twice():
nsq = await open_connection()
Expand All @@ -101,7 +72,7 @@ async def test_read_message_and_fin_twice():

await nsq.rdy(1)
message = await nsq.message_queue.get()
assert not message.is_processed
assert message.can_be_processed
await message.fin()

with pytest.raises(RuntimeWarning) as warning:
Expand All @@ -126,7 +97,7 @@ async def test_read_messages_via_generator():
processed_messages = 0

async for message in nsq.messages():
assert not message.is_processed
assert message.can_be_processed
await message.fin()
processed_messages += 1

Expand Down Expand Up @@ -160,7 +131,7 @@ async def test_read_single_message_via_get_message():
message = nsq.get_message()

assert isinstance(message, NSQMessage)
assert not message.is_processed
assert message.can_be_processed
await message.fin()

await nsq.close()
Expand All @@ -180,14 +151,89 @@ async def test_read_bytes_message():

await nsq.rdy(1)
message = await nsq.message_queue.get()
assert not message.is_processed
assert message.can_be_processed

assert message.body == b"\xa1"
with pytest.raises(UnicodeDecodeError):
str(message)

await message.fin()
assert message.is_processed
assert not message.can_be_processed

await nsq.close()
assert nsq.is_closed


@pytest.mark.asyncio
async def test_timeout_messages():
nsq = await open_connection()
assert nsq.status.is_connected

first_message = "first test message at " + str(time())
first_message_response = await nsq.pub(
"test_read_timed_out_messages", first_message,
)
assert first_message_response.is_ok

second_message = "second test message at " + str(time())
second_message_response = await nsq.pub(
"test_read_timed_out_messages", second_message,
)
assert second_message_response.is_ok

await nsq.subscribe("test_read_timed_out_messages", "channel1", 1)
assert nsq.is_subscribed

# We need to wait while the message will be timed out.
# The default message timeout is 60 secs
await asyncio.sleep(61)

async for message in nsq.messages():
assert message.can_be_processed
assert str(message) == second_message
assert message.attempts == 1
await message.fin()
break

message = await asyncio.wait_for(nsq.message_queue.get(), timeout=1)
assert message.can_be_processed
assert str(message) == first_message
assert message.attempts == 2
await message.fin()

await nsq.close()
assert nsq.is_closed


@pytest.mark.asyncio
async def test_read_message_and_touch():
nsq = await open_connection()
assert nsq.status.is_connected

timestamp = time()

response = await nsq.pub(
"test_read_message_and_touch", f"hello sent at {timestamp}",
)
assert response.is_ok

response = await nsq.sub("test_read_message_and_touch", "channel1")
assert response.is_ok

await nsq.rdy(1)

message = await nsq.message_queue.get()
assert message.can_be_processed
await asyncio.sleep(31)

assert message.can_be_processed
await message.touch()
assert message.can_be_processed
await asyncio.sleep(31)

assert message.can_be_processed
await message.fin()
assert not message.can_be_processed

await nsq.close()
assert nsq.is_closed

0 comments on commit 917de51

Please sign in to comment.