Skip to content

Commit

Permalink
docs: Add async code samples
Browse files Browse the repository at this point in the history
  • Loading branch information
jpmckinney committed Jul 2, 2023
1 parent 84f546b commit eed57c1
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 23 deletions.
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"sphinx.ext.autodoc",
"sphinx.ext.intersphinx",
"sphinx.ext.viewcode",
"sphinx_design",
]

# Add any paths that contain templates here, relative to this directory.
Expand Down
144 changes: 121 additions & 23 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,151 @@ Configure a RabbitMQ client

Import a client class, for example:

.. code-block:: python
.. tab-set::

.. tab-item:: Blocking
:sync: blocking

.. code-block:: python
from yapw.clients import Blocking
from yapw.clients import Blocking
.. tab-item:: Asynchronous
:sync: async

See :mod:`yapw.clients` for other clients based on Pika's ``SelectConnection``.
.. code-block:: python
from yapw.clients import AsyncConsumer
# Or, if the client will exclusively publish messages:
from yapw.clients import Async
Publish messages outside a consumer callback
--------------------------------------------

.. code-block:: python
.. tab-set::

.. tab-item:: Blocking
:sync: blocking

.. code-block:: python
publisher = Blocking(url="amqp://user:pass@127.0.0.1", exchange="myexchange")
publisher.publish({"message": "value"}, routing_key="messages")
publisher.close()
from yapw.clients import Blocking
publisher = Blocking(url="amqp://user:pass@127.0.0.1", exchange="myexchange")
publisher.publish({"message": "value"}, routing_key="messages")
publisher.close()
.. tab-item:: Asynchronous
:sync: async

.. code-block:: python
from yapw.clients import Async
publisher = Async(url="amqp://user:pass@127.0.0.1", exchange="myexchange")
publisher.publish({"message": "value"}, routing_key="messages")
publisher.close()
The routing key is namespaced by the exchange name, to make it "myexchange_messages".

Consume messages
----------------

.. code-block:: python
.. tab-set::

from yapw.decorators import discard
from yapw.methods import ack, nack, publish
.. tab-item:: Blocking
:sync: blocking

.. code-block:: python
def callback(state, channel, method, properties, body):
try:
key = body["key"]
# do work
publish(state, channel, {"message": "value"}, "myroutingkey")
except KeyError:
nack(state, channel, method.delivery_tag)
else:
ack(state, channel, method.delivery_tag)
from yapw.clients import Blocking
from yapw.decorators import discard
from yapw.methods import ack, nack, publish
consumer = Blocking(url="amqp://user:pass@127.0.0.1", exchange="myexchange", prefetch_count=5)
consumer.consume(callback, queue="messages", decorator=discard)
def callback(state, channel, method, properties, body):
try:
key = body["key"]
# do work
publish(state, channel, {"message": "value"}, "myroutingkey")
except KeyError:
nack(state, channel, method.delivery_tag)
else:
ack(state, channel, method.delivery_tag)
consumer = Blocking(
url="amqp://user:pass@127.0.0.1",
exchange="myexchange",
prefetch_count=5,
)
consumer.consume(callback, queue="messages", decorator=discard)
.. tab-item:: Asynchronous
:sync: async

.. code-block:: python
from yapw.clients import AsyncConsumer
from yapw.decorators import discard
from yapw.methods import ack, nack, publish
def callback(state, channel, method, properties, body):
try:
key = body["key"]
# do work
publish(state, channel, {"message": "value"}, "myroutingkey")
except KeyError:
nack(state, channel, method.delivery_tag)
else:
ack(state, channel, method.delivery_tag)
consumer = AsyncConsumer(
url="amqp://user:pass@127.0.0.1",
exchange="myexchange",
prefetch_count=5,
callback=callback,
queue="messages",
decorator=discard,
)
consumer.start()
yapw implements a pattern whereby the consumer declares and binds a queue. By default, the queue's name and binding key are the same, and are namespaced by the exchange name.

To manually set the binding keys:

.. code-block:: python
.. tab-set::

.. tab-item:: Blocking
:sync: blocking

.. code-block:: python
:emphasize-lines: 4
consumer.consume(
callback,
queue="messages",
routing_keys=["a", "b"],
decorator=discard,
)
.. tab-item:: Asynchronous
:sync: async

.. code-block:: python
:emphasize-lines: 5
consumer.consume(callback, queue="messages", routing_keys=["a", "b"], decorator=discard)
consumer = AsyncConsumer(
# ...
callback=callback,
queue="messages",
routing_keys=["a", "b"],
decorator=discard
)
yapw uses a thread pool to run the consumer callback in separate threads.

Expand Down
1 change: 1 addition & 0 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
furo
sphinx-design
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ docs =
furo
sphinx
sphinx-autobuild
sphinx-design

0 comments on commit eed57c1

Please sign in to comment.