Skip to content

Commit

Permalink
feat: Merge Publisher into Blocking, because its methods are synchron…
Browse files Browse the repository at this point in the history
…ous. Replace Durable and Transient with a durable keyword argument.
  • Loading branch information
jpmckinney committed Jun 29, 2023
1 parent 0d8f6e3 commit d7e41eb
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 126 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ repos:
rev: 6.3.0
hooks:
- id: pydocstyle
additional_dependencies: [toml]
additional_dependencies: [tomli]
files: ^(?!tests)
18 changes: 13 additions & 5 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

0.1.0 (2023-06-29)
------------------

Changed
~~~~~~~

- **BREAKING CHANGE:** Merge ``yapw.clients.Publisher`` into :class:`yapw.clients.Blocking`, because its methods are synchronous. Replace ``Durable`` and ``Transient`` with a ``durable`` keyword argument.

0.0.13 (2022-01-28)
-------------------

Expand All @@ -23,7 +31,7 @@ Fixed
Added
~~~~~

- :meth:`yapw.clients.Publisher.declare_queue` and :meth:`yapw.clients.Threaded.consume` accept an ``arguments`` keyword argument.
- ``yapw.clients.Publisher.declare_queue`` and :meth:`yapw.clients.Threaded.consume` accept an ``arguments`` keyword argument.

0.0.10 (2022-01-24)
-------------------
Expand Down Expand Up @@ -63,7 +71,7 @@ Added
Added
~~~~~

- :meth:`yapw.clients.Publisher.declare_queue` and :meth:`yapw.clients.Consumer.consume`: Rename the ``routing_key`` argument to ``queue``, and add a ``routing_keys`` optional argument.
- ``yapw.clients.Publisher.declare_queue`` and :meth:`yapw.clients.Consumer.consume`: Rename the ``routing_key`` argument to ``queue``, and add a ``routing_keys`` optional argument.

Changed
~~~~~~~
Expand All @@ -83,8 +91,8 @@ Changed
~~~~~~~

- Add ``decode`` as first argument to :mod:`yapw.decorators` functions.
- :class:`yapw.clients.Publisher`: Rename ``encoder`` keyword argument to ``encode``.
- :class:`yapw.clients.Publisher`'s ``encode`` keyword argument defaults to :func:`yapw.util.default_encode`.
- ``yapw.clients.Publisher``: Rename ``encoder`` keyword argument to ``encode``.
- ``yapw.clients.Publisher``'s ``encode`` keyword argument defaults to :func:`yapw.util.default_encode`.
- :func:`yapw.util.default_encode` encodes ``str`` to ``bytes`` and pickles non-``str`` to ``bytes``.

0.0.4 (2021-11-19)
Expand All @@ -93,7 +101,7 @@ Changed
Added
~~~~~

- :class:`yapw.clients.Publisher` (and children) accepts ``encoder`` and ``content_type`` keyword arguments.
- ``yapw.clients.Publisher`` (and children) accepts ``encoder`` and ``content_type`` keyword arguments.

Changed
~~~~~~~
Expand Down
9 changes: 4 additions & 5 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@ Create client classes, by layering in :doc:`mixins<api/clients>`:
from yapw import clients
class Consumer(clients.Threaded, clients.Durable, clients.Blocking, clients.Base):
class Consumer(clients.Threaded, clients.Blocking, clients.Base):
pass
class Publisher(clients.Durable, clients.Blocking, clients.Base):
class Publisher(clients.Blocking, clients.Base):
pass
Each mixing contributes features, such that a client will:

- :class:`~yapw.clients.Blocking`: Use `pika.BlockingConnection <https://pika.readthedocs.io/en/stable/modules/adapters/blocking.html>`__, while avoiding deadlocks by setting ``blocked_connection_timeout`` to a sensible default.
- :class:`~yapw.clients.Durable`: Declare a durable exchange, use persistent messages on :meth:`~yapw.clients.Durable.publish`, and create a durable queue on :meth:`~yapw.clients.Threaded.consume`.
- :class:`~yapw.clients.Blocking`: Use `pika.BlockingConnection <https://pika.readthedocs.io/en/stable/modules/adapters/blocking.html>`__, while avoiding deadlocks by setting ``blocked_connection_timeout`` to a sensible default. Declare an exchange.
- :class:`~yapw.clients.Threaded`: Run the consumer callback in separate threads when consuming messages. Install handlers for the SIGTERM, SIGINT and user-defined signals to stop consuming messages, wait for threads to terminate, and close the connection.

Publish messages outside a consumer callback
Expand Down Expand Up @@ -94,7 +93,7 @@ The :func:`~yapw.methods.blocking.ack`, :func:`~yapw.methods.blocking.nack` and
Encoding and decoding
~~~~~~~~~~~~~~~~~~~~~

By default, when publishing messages, the :class:`~yapw.clients.Durable` and :class:`~yapw.clients.Transient` mixins use a content type of "application/json" and encode the message body with the :func:`~yapw.util.default_encode` function, which serializes to JSON-formatted bytes when the content type is "application/json".
By default, when publishing messages, the :class:`~yapw.clients.Blocking` mixin uses a content type of "application/json" and encodes the message body with the :func:`~yapw.util.default_encode` function, which serializes to JSON-formatted bytes when the content type is "application/json".

Similarly, when consuming messages, the :class:`yapw.clients.Threaded` mixin uses the :func:`~yapw.decorators.default_decode` function, which deserializes from JSON-formatted bytes when the consumed message's content type is "application/json".

Expand Down
7 changes: 5 additions & 2 deletions tests/clients/test_blocking.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Client(Blocking, Base):

@patch("pika.BlockingConnection")
def test_init_default(connection):
client = Client()
client = Client(durable=False)

connection.assert_called_once()

Expand All @@ -22,7 +22,10 @@ def test_init_default(connection):
@patch("pika.BlockingConnection")
def test_init_kwargs(connection):
client = Client(
url="https://host:1234/%2Fv?blocked_connection_timeout=10", blocked_connection_timeout=300, prefetch_count=10
url="https://host:1234/%2Fv?blocked_connection_timeout=10",
blocked_connection_timeout=300,
prefetch_count=10,
durable=False,
)

connection.assert_called_once()
Expand Down
39 changes: 18 additions & 21 deletions tests/clients/test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,22 @@
import pika
import pytest

from yapw.clients import Base, Blocking, Durable, Transient
from yapw.clients import Base, Blocking
from yapw.util import default_encode


class DurableClient(Durable, Blocking, Base):
pass


class TransientClient(Transient, Blocking, Base):
class Client(Blocking, Base):
pass


def dumps(message, content_type):
return b"overridden"


@pytest.mark.parametrize("client_class", [DurableClient, TransientClient])
@pytest.mark.parametrize("durable", [True, False])
@patch("pika.BlockingConnection")
def test_init_default(connection, client_class):
client = client_class()
def test_init_default(connection, durable):
client = Client(durable=durable)

client.channel.exchange_declare.assert_not_called()

Expand All @@ -33,10 +29,11 @@ def test_init_default(connection, client_class):
assert client.format_routing_key("test") == "_test"


@pytest.mark.parametrize("client_class,durable", [(DurableClient, True), (TransientClient, False)])
@pytest.mark.parametrize("durable", [True, False])
@patch("pika.BlockingConnection")
def test_init_kwargs(connection, client_class, durable):
client = client_class(
def test_init_kwargs(connection, durable):
client = Client(
durable=durable,
exchange="exch",
exchange_type="fanout",
encode=dumps,
Expand All @@ -52,10 +49,10 @@ def test_init_kwargs(connection, client_class, durable):
assert client.format_routing_key("test") == "test_exch"


@pytest.mark.parametrize("client_class,durable", [(DurableClient, True), (TransientClient, False)])
@pytest.mark.parametrize("durable", [True, False])
@patch("pika.BlockingConnection")
def test_declare_queue(connection, client_class, durable):
client = client_class(exchange="exch")
def test_declare_queue(connection, durable):
client = Client(durable=durable, exchange="exch")

client.declare_queue("q")

Expand All @@ -68,10 +65,10 @@ def test_declare_queue(connection, client_class, durable):
)


@pytest.mark.parametrize("client_class,durable", [(DurableClient, True), (TransientClient, False)])
@pytest.mark.parametrize("durable", [True, False])
@patch("pika.BlockingConnection")
def test_declare_queue_routing_keys(connection, client_class, durable):
client = client_class(exchange="exch")
def test_declare_queue_routing_keys(connection, durable):
client = Client(durable=durable, exchange="exch")

client.declare_queue("q", ["r", "k"])

Expand All @@ -85,14 +82,14 @@ def test_declare_queue_routing_keys(connection, client_class, durable):
)


@pytest.mark.parametrize("client_class,delivery_mode", [(DurableClient, 2), (TransientClient, 1)])
@pytest.mark.parametrize("durable,delivery_mode", [(True, 2), (False, 1)])
@patch("pika.BlockingConnection")
def test_publish(connection, client_class, delivery_mode, caplog):
def test_publish(connection, durable, delivery_mode, caplog):
connection.return_value.channel.return_value.channel_number = 1

caplog.set_level(logging.DEBUG)

client = client_class(exchange="exch")
client = Client(durable=durable, exchange="exch")

client.publish({"a": 1}, "q")

Expand Down
6 changes: 3 additions & 3 deletions tests/clients/test_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import pytest

from yapw.clients import Base, Blocking, Threaded, Transient
from yapw.clients import Base, Blocking, Threaded
from yapw.decorators import discard, requeue
from yapw.methods.blocking import ack, nack, publish

Expand All @@ -17,12 +17,12 @@
RABBIT_URL = os.getenv("TEST_RABBIT_URL", "amqp://127.0.0.1")


class Client(Threaded, Transient, Blocking, Base):
class Client(Threaded, Blocking, Base):
pass


def get_client(**kwargs):
return Client(url=RABBIT_URL, exchange="yapw_test", **kwargs)
return Client(durable=False, url=RABBIT_URL, exchange="yapw_test", **kwargs)


def encode(message):
Expand Down
4 changes: 2 additions & 2 deletions tests/fixtures/raiser.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
logging.basicConfig(level=logging.INFO)


class Client(clients.Threaded, clients.Transient, clients.Blocking, clients.Base):
class Client(clients.Threaded, clients.Blocking, clients.Base):
pass


Expand All @@ -21,7 +21,7 @@ def callback(state, channel, method, properties, body):


def main():
client = Client(exchange="yapw_development")
client = Client(durable=False, exchange="yapw_development")
client.consume(callback, "raise", decorator=requeue)


Expand Down
4 changes: 2 additions & 2 deletions tests/fixtures/sleeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
logging.basicConfig(level=logging.INFO)


class Client(clients.Threaded, clients.Transient, clients.Blocking, clients.Base):
class Client(clients.Threaded, clients.Blocking, clients.Base):
pass


Expand All @@ -25,7 +25,7 @@ def callback(state, channel, method, properties, body):


def main():
client = Client(exchange="yapw_development")
client = Client(durable=False, exchange="yapw_development")
client.consume(callback, "sleep")


Expand Down

0 comments on commit d7e41eb

Please sign in to comment.