diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4236c59..b9fd4ba 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,5 +18,5 @@ repos: rev: 6.3.0 hooks: - id: pydocstyle - additional_dependencies: [toml] + additional_dependencies: [tomli] files: ^(?!tests) diff --git a/docs/changelog.rst b/docs/changelog.rst index abb6d99..721efc4 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,14 @@ Changelog ========= +0.1.0 (2023-06-29) +------------------ + +Changed +~~~~~~~ + +- **BREAKING CHANGE:** Merge ``yapw.clients.Publisher`` into :class:`yapw.clients.Blocking`, because its methods are synchronous. Replace ``Durable`` and ``Transient`` with a ``durable`` keyword argument. + 0.0.13 (2022-01-28) ------------------- @@ -23,7 +31,7 @@ Fixed Added ~~~~~ -- :meth:`yapw.clients.Publisher.declare_queue` and :meth:`yapw.clients.Threaded.consume` accept an ``arguments`` keyword argument. +- ``yapw.clients.Publisher.declare_queue`` and :meth:`yapw.clients.Threaded.consume` accept an ``arguments`` keyword argument. 0.0.10 (2022-01-24) ------------------- @@ -63,7 +71,7 @@ Added Added ~~~~~ -- :meth:`yapw.clients.Publisher.declare_queue` and :meth:`yapw.clients.Consumer.consume`: Rename the ``routing_key`` argument to ``queue``, and add a ``routing_keys`` optional argument. +- ``yapw.clients.Publisher.declare_queue`` and :meth:`yapw.clients.Consumer.consume`: Rename the ``routing_key`` argument to ``queue``, and add a ``routing_keys`` optional argument. Changed ~~~~~~~ @@ -83,8 +91,8 @@ Changed ~~~~~~~ - Add ``decode`` as first argument to :mod:`yapw.decorators` functions. -- :class:`yapw.clients.Publisher`: Rename ``encoder`` keyword argument to ``encode``. -- :class:`yapw.clients.Publisher`'s ``encode`` keyword argument defaults to :func:`yapw.util.default_encode`. +- ``yapw.clients.Publisher``: Rename ``encoder`` keyword argument to ``encode``. +- ``yapw.clients.Publisher``'s ``encode`` keyword argument defaults to :func:`yapw.util.default_encode`. - :func:`yapw.util.default_encode` encodes ``str`` to ``bytes`` and pickles non-``str`` to ``bytes``. 0.0.4 (2021-11-19) @@ -93,7 +101,7 @@ Changed Added ~~~~~ -- :class:`yapw.clients.Publisher` (and children) accepts ``encoder`` and ``content_type`` keyword arguments. +- ``yapw.clients.Publisher`` (and children) accepts ``encoder`` and ``content_type`` keyword arguments. Changed ~~~~~~~ diff --git a/docs/index.rst b/docs/index.rst index 85cbb23..295d835 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -21,17 +21,16 @@ Create client classes, by layering in :doc:`mixins`: from yapw import clients - class Consumer(clients.Threaded, clients.Durable, clients.Blocking, clients.Base): + class Consumer(clients.Threaded, clients.Blocking, clients.Base): pass - class Publisher(clients.Durable, clients.Blocking, clients.Base): + class Publisher(clients.Blocking, clients.Base): pass Each mixing contributes features, such that a client will: -- :class:`~yapw.clients.Blocking`: Use `pika.BlockingConnection `__, while avoiding deadlocks by setting ``blocked_connection_timeout`` to a sensible default. -- :class:`~yapw.clients.Durable`: Declare a durable exchange, use persistent messages on :meth:`~yapw.clients.Durable.publish`, and create a durable queue on :meth:`~yapw.clients.Threaded.consume`. +- :class:`~yapw.clients.Blocking`: Use `pika.BlockingConnection `__, while avoiding deadlocks by setting ``blocked_connection_timeout`` to a sensible default. Declare an exchange. - :class:`~yapw.clients.Threaded`: Run the consumer callback in separate threads when consuming messages. Install handlers for the SIGTERM, SIGINT and user-defined signals to stop consuming messages, wait for threads to terminate, and close the connection. Publish messages outside a consumer callback @@ -94,7 +93,7 @@ The :func:`~yapw.methods.blocking.ack`, :func:`~yapw.methods.blocking.nack` and Encoding and decoding ~~~~~~~~~~~~~~~~~~~~~ -By default, when publishing messages, the :class:`~yapw.clients.Durable` and :class:`~yapw.clients.Transient` mixins use a content type of "application/json" and encode the message body with the :func:`~yapw.util.default_encode` function, which serializes to JSON-formatted bytes when the content type is "application/json". +By default, when publishing messages, the :class:`~yapw.clients.Blocking` mixin uses a content type of "application/json" and encodes the message body with the :func:`~yapw.util.default_encode` function, which serializes to JSON-formatted bytes when the content type is "application/json". Similarly, when consuming messages, the :class:`yapw.clients.Threaded` mixin uses the :func:`~yapw.decorators.default_decode` function, which deserializes from JSON-formatted bytes when the consumed message's content type is "application/json". diff --git a/tests/clients/test_blocking.py b/tests/clients/test_blocking.py index 757aedb..5cb3ae4 100644 --- a/tests/clients/test_blocking.py +++ b/tests/clients/test_blocking.py @@ -9,7 +9,7 @@ class Client(Blocking, Base): @patch("pika.BlockingConnection") def test_init_default(connection): - client = Client() + client = Client(durable=False) connection.assert_called_once() @@ -22,7 +22,10 @@ def test_init_default(connection): @patch("pika.BlockingConnection") def test_init_kwargs(connection): client = Client( - url="https://host:1234/%2Fv?blocked_connection_timeout=10", blocked_connection_timeout=300, prefetch_count=10 + url="https://host:1234/%2Fv?blocked_connection_timeout=10", + blocked_connection_timeout=300, + prefetch_count=10, + durable=False, ) connection.assert_called_once() diff --git a/tests/clients/test_publisher.py b/tests/clients/test_publisher.py index 0a66df3..f8eb706 100644 --- a/tests/clients/test_publisher.py +++ b/tests/clients/test_publisher.py @@ -4,15 +4,11 @@ import pika import pytest -from yapw.clients import Base, Blocking, Durable, Transient +from yapw.clients import Base, Blocking from yapw.util import default_encode -class DurableClient(Durable, Blocking, Base): - pass - - -class TransientClient(Transient, Blocking, Base): +class Client(Blocking, Base): pass @@ -20,10 +16,10 @@ def dumps(message, content_type): return b"overridden" -@pytest.mark.parametrize("client_class", [DurableClient, TransientClient]) +@pytest.mark.parametrize("durable", [True, False]) @patch("pika.BlockingConnection") -def test_init_default(connection, client_class): - client = client_class() +def test_init_default(connection, durable): + client = Client(durable=durable) client.channel.exchange_declare.assert_not_called() @@ -33,10 +29,11 @@ def test_init_default(connection, client_class): assert client.format_routing_key("test") == "_test" -@pytest.mark.parametrize("client_class,durable", [(DurableClient, True), (TransientClient, False)]) +@pytest.mark.parametrize("durable", [True, False]) @patch("pika.BlockingConnection") -def test_init_kwargs(connection, client_class, durable): - client = client_class( +def test_init_kwargs(connection, durable): + client = Client( + durable=durable, exchange="exch", exchange_type="fanout", encode=dumps, @@ -52,10 +49,10 @@ def test_init_kwargs(connection, client_class, durable): assert client.format_routing_key("test") == "test_exch" -@pytest.mark.parametrize("client_class,durable", [(DurableClient, True), (TransientClient, False)]) +@pytest.mark.parametrize("durable", [True, False]) @patch("pika.BlockingConnection") -def test_declare_queue(connection, client_class, durable): - client = client_class(exchange="exch") +def test_declare_queue(connection, durable): + client = Client(durable=durable, exchange="exch") client.declare_queue("q") @@ -68,10 +65,10 @@ def test_declare_queue(connection, client_class, durable): ) -@pytest.mark.parametrize("client_class,durable", [(DurableClient, True), (TransientClient, False)]) +@pytest.mark.parametrize("durable", [True, False]) @patch("pika.BlockingConnection") -def test_declare_queue_routing_keys(connection, client_class, durable): - client = client_class(exchange="exch") +def test_declare_queue_routing_keys(connection, durable): + client = Client(durable=durable, exchange="exch") client.declare_queue("q", ["r", "k"]) @@ -85,14 +82,14 @@ def test_declare_queue_routing_keys(connection, client_class, durable): ) -@pytest.mark.parametrize("client_class,delivery_mode", [(DurableClient, 2), (TransientClient, 1)]) +@pytest.mark.parametrize("durable,delivery_mode", [(True, 2), (False, 1)]) @patch("pika.BlockingConnection") -def test_publish(connection, client_class, delivery_mode, caplog): +def test_publish(connection, durable, delivery_mode, caplog): connection.return_value.channel.return_value.channel_number = 1 caplog.set_level(logging.DEBUG) - client = client_class(exchange="exch") + client = Client(durable=durable, exchange="exch") client.publish({"a": 1}, "q") diff --git a/tests/clients/test_threaded.py b/tests/clients/test_threaded.py index 41dccd9..3e79f44 100644 --- a/tests/clients/test_threaded.py +++ b/tests/clients/test_threaded.py @@ -7,7 +7,7 @@ import pytest -from yapw.clients import Base, Blocking, Threaded, Transient +from yapw.clients import Base, Blocking, Threaded from yapw.decorators import discard, requeue from yapw.methods.blocking import ack, nack, publish @@ -17,12 +17,12 @@ RABBIT_URL = os.getenv("TEST_RABBIT_URL", "amqp://127.0.0.1") -class Client(Threaded, Transient, Blocking, Base): +class Client(Threaded, Blocking, Base): pass def get_client(**kwargs): - return Client(url=RABBIT_URL, exchange="yapw_test", **kwargs) + return Client(durable=False, url=RABBIT_URL, exchange="yapw_test", **kwargs) def encode(message): diff --git a/tests/fixtures/raiser.py b/tests/fixtures/raiser.py index 4bce73d..ad05ab7 100644 --- a/tests/fixtures/raiser.py +++ b/tests/fixtures/raiser.py @@ -12,7 +12,7 @@ logging.basicConfig(level=logging.INFO) -class Client(clients.Threaded, clients.Transient, clients.Blocking, clients.Base): +class Client(clients.Threaded, clients.Blocking, clients.Base): pass @@ -21,7 +21,7 @@ def callback(state, channel, method, properties, body): def main(): - client = Client(exchange="yapw_development") + client = Client(durable=False, exchange="yapw_development") client.consume(callback, "raise", decorator=requeue) diff --git a/tests/fixtures/sleeper.py b/tests/fixtures/sleeper.py index a190333..23fcef0 100644 --- a/tests/fixtures/sleeper.py +++ b/tests/fixtures/sleeper.py @@ -13,7 +13,7 @@ logging.basicConfig(level=logging.INFO) -class Client(clients.Threaded, clients.Transient, clients.Blocking, clients.Base): +class Client(clients.Threaded, clients.Blocking, clients.Base): pass @@ -25,7 +25,7 @@ def callback(state, channel, method, properties, body): def main(): - client = Client(exchange="yapw_development") + client = Client(durable=False, exchange="yapw_development") client.consume(callback, "sleep") diff --git a/yapw/clients.py b/yapw/clients.py index d50e5b8..1413b6a 100644 --- a/yapw/clients.py +++ b/yapw/clients.py @@ -1,11 +1,13 @@ """ -Mixins that can be combined to create a RabbitMQ client. For example: +Mixins that can be combined to create a RabbitMQ client. + +For example: .. code-block:: python from yapw import clients - class Client(clients.Threaded, clients.Durable, clients.Blocking, clients.Base): + class Client(clients.Threaded, clients.Blocking, clients.Base): pass The layers are: @@ -22,13 +24,6 @@ class Client(clients.Threaded, clients.Durable, clients.Blocking, clients.Base): Available mixins: - :class:`~yapw.clients.Blocking` -Publisher - Declare an exchange, declare and bind queues, and publish messages. - - Available mixins: - - - :class:`~yapw.clients.Durable` - - :class:`~yapw.clients.Transient` Consumer Consume messages. @@ -119,13 +114,16 @@ def __getsafe__(self) -> Set[str]: class Blocking: """ - Uses a blocking connection while avoiding deadlocks due to + Uses a blocking connection adapter while avoiding deadlocks due to `blocked connections `__. """ + # Attributes that this mixin expects from base classes. + format_routing_key: Callable[[str], str] + # The connection isn't "safe to use" but it can be "used safely" like in: # https://github.com/pika/pika/blob/master/examples/basic_consumer_threaded.py - __safe__ = ["connection"] + __safe__ = ["connection", "exchange", "encode", "content_type", "delivery_mode"] def __init__( self, @@ -133,51 +131,7 @@ def __init__( url: str = "amqp://127.0.0.1", blocked_connection_timeout: float = 1800, prefetch_count: int = 1, - **kwargs: Any, - ): - """ - Connect to RabbitMQ and create a channel. - - :param url: the connection string (don't set a blocked_connection_timeout query string parameter) - :param blocked_connection_timeout: the timeout, in seconds, that the connection may remain blocked - :param prefetch_count: the maximum number of unacknowledged deliveries that are permitted on the channel - """ - super().__init__(**kwargs) - - parameters = pika.URLParameters(url) - parameters.blocked_connection_timeout = blocked_connection_timeout - - #: The connection. - self.connection: pika.BlockingConnection = pika.BlockingConnection(parameters) - - #: The channel. - self.channel: pika.adapters.blocking_connection.BlockingChannel = self.connection.channel() - self.channel.basic_qos(prefetch_count=prefetch_count) - - def close(self) -> None: - """ - Close the connection. - """ - self.connection.close() - - -class Publisher: - """ - An abstract parent class. Use :class:`~yapw.clients.Durable` or :class:`~yapw.clients.Transient` instead. - """ - - durable: bool - delivery_mode: int - - # Attributes that this mixin expects from base classes. - format_routing_key: Callable[[str], str] - channel: pika.channel.Channel - - __safe__ = ["exchange", "encode", "content_type", "delivery_mode"] - - def __init__( - self, - *, + durable: bool = True, exchange: str = "", exchange_type: ExchangeType = ExchangeType.direct, encode: Encode = default_encode, @@ -186,29 +140,47 @@ def __init__( **kwargs: Any, ): """ - Declare an exchange, unless using the default exchange. + Connect to RabbitMQ, create a channel and declare an exchange, unless using the default exchange. When publishing a message, by default, its body is encoded using :func:`yapw.util.default_encode`, and its content type is set to "application/json". Use the ``encode`` and ``content_type`` keyword arguments to change this. The ``encode`` must be a function that accepts ``(message, content_type)`` arguments and returns bytes. + :param url: the connection string (don't set a blocked_connection_timeout query string parameter) + :param blocked_connection_timeout: the timeout, in seconds, that the connection may remain blocked + :param prefetch_count: the maximum number of unacknowledged deliveries that are permitted on the channel + :param durable: whether to declare a durable exchange, declare durable queues, and publish persistent messages :param exchange: the exchange name :param exchange_type: the exchange type - :param encode: the message body's encoder - :param content_type: the message's content type + :param encode: the message bodies' encoder + :param content_type: the messages' content type :param routing_key_template: see :meth:`~yapw.clients.Base.__init__` """ super().__init__(routing_key_template=routing_key_template, **kwargs) # type: ignore # python/mypy#4335 + parameters = pika.URLParameters(url) + parameters.blocked_connection_timeout = blocked_connection_timeout + + #: Whether to declare a durable exchange, declare durable queues, and publish persistent messages. + self.durable = durable #: The exchange name. self.exchange: str = exchange - #: The message body's encoder. + #: The message bodies' encoder. self.encode: Encode = encode - #: The message's content type. + #: The messages' content type. self.content_type: str = content_type + #: #: The messages' delivery mode. + self.delivery_mode = 2 if durable else 1 + + #: The connection. + self.connection: pika.BlockingConnection = pika.BlockingConnection(parameters) - if self.exchange: - self.channel.exchange_declare(exchange=self.exchange, exchange_type=exchange_type, durable=self.durable) + #: The channel. + self.channel: pika.adapters.blocking_connection.BlockingChannel = self.connection.channel() + self.channel.basic_qos(prefetch_count=prefetch_count) + + if exchange: + self.channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=durable) def declare_queue( self, queue: str, routing_keys: Optional[List[str]] = None, arguments: Optional[Dict[str, str]] = None @@ -242,23 +214,11 @@ def publish(self, message: Any, routing_key: str) -> None: self.channel.basic_publish(**keywords) logger.debug(*basic_publish_debug_args(self.channel, message, keywords)) - -class Transient(Publisher): - """ - Declares a transient exchange, declares transient queues, and uses transient messages. - """ - - durable = False - delivery_mode = 1 - - -class Durable(Publisher): - """ - Declares a durable exchange, declares durable queues, and uses persistent messages. - """ - - durable = True - delivery_mode = 2 + def close(self) -> None: + """ + Close the connection. + """ + self.connection.close() # https://github.com/pika/pika/blob/master/examples/basic_consumer_threaded.py diff --git a/yapw/types.py b/yapw/types.py index 0bf297d..ea5760b 100644 --- a/yapw/types.py +++ b/yapw/types.py @@ -26,11 +26,11 @@ class State(NamedTuple): connection: pika.BlockingConnection #: The exchange name. exchange: str - #: The message body's encoder. + #: The message bodies' encoder. encode: Encode - #: The message's content type. + #: The messages' content type. content_type: str - #: The message's delivery mode. + #: The messages' delivery mode. delivery_mode: int diff --git a/yapw/util.py b/yapw/util.py index 9b47c3f..e86b4f0 100644 --- a/yapw/util.py +++ b/yapw/util.py @@ -14,7 +14,7 @@ from yapw.types import PublishKeywords, State if TYPE_CHECKING: - from yapw.clients import Publisher + from yapw.clients import Blocking def json_dumps(message: Any) -> bytes: @@ -53,7 +53,7 @@ def default_encode(message: Any, content_type: str) -> bytes: return pickle.dumps(message) -def basic_publish_kwargs(state: Union["Publisher", State], message: Any, routing_key: str) -> PublishKeywords: +def basic_publish_kwargs(state: Union["Blocking", State], message: Any, routing_key: str) -> PublishKeywords: """ Prepare keyword arguments for ``basic_publish``.