Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raise different exception on processing timed out message #28

Merged
merged 7 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ansq/tcp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,9 @@ async def messages(self) -> AsyncGenerator[NSQMessage, None]:
message = await self._message_queue.get()
if message is None:
return
if message.is_timed_out:
self.logger.error(f"Message id={message.id} is timed out")
continue
yield message

def get_message(self) -> Optional[NSQMessage]:
Expand Down
7 changes: 6 additions & 1 deletion ansq/tcp/types/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ class ConnectionFeatures:
deflate: bool = False
deflate_level: int = 6
feature_negotiation: bool = True
heartbeat_interval: int = 30000
heartbeat_interval: int = 30_000
sample_rate: int = 0
snappy: bool = False
tls_v1: bool = False
msg_timeout: int = 60_000


@attr.define(frozen=True, auto_attribs=True, kw_only=True)
Expand Down Expand Up @@ -209,6 +210,10 @@ def is_closed(self) -> bool:
"""True if connection is closed or closing."""
return self.status.is_closed or self._status.is_closing

@property
def options(self) -> ConnectionOptions:
return self._options

@abc.abstractmethod
async def connect(self) -> bool:
raise NotImplementedError()
Expand Down
57 changes: 31 additions & 26 deletions ansq/tcp/types/message.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, Union
from typing import TYPE_CHECKING, Any, Callable

from ansq.tcp.consts import DEFAULT_REQ_TIMEOUT

Expand All @@ -9,51 +9,50 @@

from . import NSQMessageSchema

__all__ = "NSQMessage"
__all__ = ["NSQMessage"]


def not_processed(func: Callable) -> Callable:
"""Decorator to verify that the message has not yet been processed.
def ensure_can_be_processed(func: Callable) -> Callable:
"""Decorator to verify that the message can be processed.

:raises RuntimeWarning: in case message was processed earlier.
"""

@wraps(func)
async def decorator(cls: "NSQMessage", *args: Any, **kwargs: Any) -> Any:
if cls.is_processed:
raise RuntimeWarning("Message has already been processed")
response = await func(cls, *args, **kwargs)
return response
async def wrapper(message: "NSQMessage", *args: Any, **kwargs: Any) -> Any:
if message.is_processed:
raise RuntimeWarning(f"Message id={message.id} has already been processed")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While RuntimeWarning is technically an exception, it should not be raised manually in code, because conceptually it belongs to the warnings mechanism. This must be fixed in a following-up PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracking issue - #72.

if message.is_timed_out:
raise RuntimeWarning(f"Message id={message.id} is timed out")
return await func(message, *args, **kwargs)

return decorator
return wrapper


class NSQMessage:
def __init__(
self,
message_schema: "NSQMessageSchema",
connection: "NSQConnection",
timeout_in: Union[timedelta, float, int] = timedelta(minutes=1),
is_processed: bool = False,
) -> None:
self.timestamp = message_schema.timestamp
self.attempts = message_schema.attempts
self.body = message_schema.body
self.id = message_schema.id
self._connection = connection
self._is_processed = is_processed

if isinstance(timeout_in, (float, int)):
timeout_in = timedelta(seconds=timeout_in)
self._timeout_in = timeout_in
self._connection = connection
self._timeout_in = timedelta(
milliseconds=connection.options.features.msg_timeout
)
self._is_processed = False
self._initialized_at = datetime.now(tz=timezone.utc)

def __repr__(self) -> str:
return (
'<NSQMessage id="{id}", body={body!r}, attempts={attempts}, '
"timestamp={timestamp}, timeout={timeout}, "
"initialized_at={initialized_at}, is_timed_out={is_timed_out}, "
"is_processed={is_processed}>".format(
"is_processed={is_processed}, can_be_processed={can_be_processed}>".format(
id=self.id,
body=self.body,
attempts=self.attempts,
Expand All @@ -62,6 +61,7 @@ def __repr__(self) -> str:
initialized_at=self._initialized_at,
is_timed_out=self.is_timed_out,
is_processed=self.is_processed,
can_be_processed=self.can_be_processed,
)
)

Expand All @@ -78,9 +78,8 @@ def is_processed(self) -> bool:
"""True if message has been processed:
* finished
* re-queued
* timed out
"""
return self.is_timed_out or self._is_processed
return self._is_processed

@property
def timeout(self) -> timedelta:
Expand All @@ -90,31 +89,37 @@ def timeout(self) -> timedelta:
def is_timed_out(self) -> bool:
return self._initialized_at + self.timeout < datetime.now(tz=timezone.utc)

@not_processed
@property
def can_be_processed(self) -> bool:
"""True if the message has not been processed and has not timed out yet"""
return not self.is_timed_out and not self.is_processed

@ensure_can_be_processed
async def fin(self) -> None:
"""Finish a message (indicate successful processing)

:raises RuntimeWarning: in case message was processed earlier.
:raises RuntimeWarning: in case message was processed earlier or timed out.
"""
await self._connection.fin(self.id)
self._is_processed = True

@not_processed
@ensure_can_be_processed
async def req(self, timeout: int = DEFAULT_REQ_TIMEOUT) -> None:
"""Re-queue a message (indicate failure to process)

:param timeout: An ``int`` in milliseconds where
N <= configured max timeout; 0 is a special case
that will not defer re-queueing.
:raises RuntimeWarning: in case message was processed earlier.
:raises RuntimeWarning: in case message was processed earlier or timed out.
"""
await self._connection.req(self.id, timeout)
self._is_processed = True

@not_processed
@ensure_can_be_processed
async def touch(self) -> None:
"""Reset the timeout for an in-flight message.

:raises RuntimeWarning: in case message was processed earlier.
:raises RuntimeWarning: in case message was processed earlier or timed out.
"""
await self._connection.touch(self.id)
self._initialized_at = datetime.now(tz=timezone.utc)
1 change: 1 addition & 0 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ async def test_invalid_feature(create_nsqd, wait_for, nsqd):
assert nsq.status.is_closed


@pytest.mark.asyncio
async def test_connection_options_as_kwargs(nsqd):
nsq = await open_connection(debug=True)
assert nsq._options.debug is True
Expand Down
Loading