services/rabbitmq: Add note about worker state. Mention Splitter pattern. Move consumer prefetch and database commits in logical order.
jpmckinney committed Apr 17, 2024
1 parent 7737c9c commit 9aa7c78
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions docs/services/rabbitmq.rst
@@ -58,24 +58,38 @@ Disabling heartbeats is `highly discouraged <https://www.rabbitmq.com/heartbeats

That said, from Datlab's experience, the RabbitMQ connection can be unreliable, regardless of the connection settings.

.. https://github.com/open-contracting/data-registry/issues/140
Consumer prefetch
~~~~~~~~~~~~~~~~~

In an early production environment, `prefetch count <https://www.rabbitmq.com/confirms.html#channel-qos-prefetch>`__ is set to 1, which is the `most conservative <https://www.rabbitmq.com/confirms.html#channel-qos-prefetch-throughput>`__ option. In a mature production environment, it is set to 20, in order to scale first by using more threads before using more processes, based on this `blog post <https://blog.rabbitmq.com/posts/2012/04/rabbitmq-performance-measurements-part-2>`__.
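
For example, with a blocking `pika <https://pika.readthedocs.io/>`__ consumer, a minimal sketch (the queue name and callback are hypothetical) sets the prefetch count on the channel before consuming:

.. code-block:: python

    import pika

    def callback(channel, method, properties, body):
        # Process the message, then acknowledge it.
        channel.basic_ack(delivery_tag=method.delivery_tag)

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # 1 in an early production environment, 20 in a mature production environment.
    channel.basic_qos(prefetch_count=20)
    channel.basic_consume(queue="my_queue", on_message_callback=callback)
    channel.start_consuming()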

Idempotence
~~~~~~~~~~~

Messages can be redelivered, and consumers must handle message redelivery gracefully. It is `recommended <https://www.rabbitmq.com/docs/reliability#consumer-side>`__ to design consumers to be idempotent, rather than to explicitly perform deduplication.

To limit cascading redelivery – that is, where a consumer publishes messages but fails before acknowledging the received message, then receives the redelivered message and publishes messages again – publish messages immediately before acknowledging the received message: that is, after any potential failure.

To be idempotent, make state changes as late as possible: for example, write to the database immediately before publishing any messages and acknowledging the message. The worker should be as **stateless** as possible. It should not make changes to its internal state that carry over between received messages, since messages can arrive in any order.
The simplest form of deduplication is to delete previously written rows before writing new rows to the database.
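
A minimal sketch of this ordering, assuming a pika blocking consumer, a `psycopg2 <https://www.psycopg.org/docs/>`__ connection, and a hypothetical ``item`` table keyed by the message's ``collection_id``:

.. code-block:: python

    import json

    import psycopg2

    connection = psycopg2.connect("dbname=example")  # hypothetical connection string

    def callback(channel, method, properties, body):
        message = json.loads(body)

        with connection.cursor() as cursor:
            # The simplest form of deduplication: delete any rows written by an
            # earlier delivery of this message, before writing new rows.
            cursor.execute("DELETE FROM item WHERE collection_id = %s", (message["collection_id"],))
            cursor.execute(
                "INSERT INTO item (collection_id, data) VALUES (%s, %s)",
                (message["collection_id"], json.dumps(message["data"])),
            )
        connection.commit()

        # Publish immediately before acknowledging, after any potential failure,
        # to limit cascading redelivery.
        channel.basic_publish(exchange="", routing_key="next_step", body=body)
        channel.basic_ack(delivery_tag=method.delivery_tag)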

Database commits
~~~~~~~~~~~~~~~~

If the consumer callback performs database operations, then all database operations before each message publication should be performed in a transaction. This ensures that, if the database operations fail and the incoming message is not acknowledged, then they have a chance to succeed when that message is redelivered, since no partial work has been committed. This guidance applies to *each* message publication, so that work is committed before the related message is published for further processing.

The message publication should not be within the transaction block if using a ``with`` statement with `psycopg2 <https://www.psycopg.org/docs/usage.html#with-statement>`__ or `Django <https://docs.djangoproject.com/en/4.2/topics/db/transactions/#django.db.transaction.atomic>`__. This ensures that the commit completes (e.g. without integrity errors) before a message is published for further processing.
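
A minimal sketch using psycopg2's ``with`` statement (the connection string, table and routing key are hypothetical):

.. code-block:: python

    import psycopg2

    connection = psycopg2.connect("dbname=example")  # hypothetical connection string

    def callback(channel, method, properties, body):
        # The connection's ``with`` block wraps the operations in a transaction:
        # it commits on success and rolls back if an exception is raised.
        with connection, connection.cursor() as cursor:
            cursor.execute("INSERT INTO item (data) VALUES (%s)", (body.decode(),))

        # The publication is outside the transaction block, so the message is
        # published only after the commit completes. If the commit fails, the
        # received message is not acknowledged and can be redelivered.
        channel.basic_publish(exchange="", routing_key="next_step", body=body)
        channel.basic_ack(delivery_tag=method.delivery_tag)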

Acknowledgements
~~~~~~~~~~~~~~~~

Usually, a message is ack'd once processing is complete. In some cases, a message is ack'd *before* its processing is complete:

- *When processing is long*: If a message is not ack'd on a channel within the `acknowledgement timeout <https://www.rabbitmq.com/consumers.html#acknowledgement-timeout>`__ (30 minutes by default), the broker closes the channel. This might cause unexpected errors the next time the consumer uses the channel.
- *When processing isn't atomic*: After some initial work, a consumer might perform work and publish messages in chunks, like when implementing the `Splitter pattern <https://www.enterpriseintegrationpatterns.com/patterns/messaging/Sequencer.html>`__. If it encounters an error in one chunk, the consumer cannot easily "retry" the original message, without encountering integrity errors and publishing duplicate messages. As such, the message is ack'd after the initial work ("point-of-no-return").
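
A minimal sketch of the non-atomic case (the message format and per-chunk work are hypothetical):

.. code-block:: python

    import json

    def process(chunk):
        """Hypothetical per-chunk work: for example, write the chunk to the database."""

    def callback(channel, method, properties, body):
        # Initial work: split the received message into chunks (Splitter pattern).
        chunks = json.loads(body)["chunks"]

        # Point-of-no-return: ack before the long, non-atomic work, so that a
        # failure in one chunk does not redeliver the original message and
        # repeat the work already done for earlier chunks.
        channel.basic_ack(delivery_tag=method.delivery_tag)

        for chunk in chunks:
            process(chunk)
            channel.basic_publish(exchange="", routing_key="next_step", body=json.dumps(chunk))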

If a consumer is interrupted or fails before a message is ack'd, the broker `automatically requeues <https://www.rabbitmq.com/confirms.html#automatic-requeueing>`__ the message, once either the acknowledgement timeout or the `heartbeat timeout <https://www.rabbitmq.com/heartbeats.html>`__ is reached, at which time the consumer is considered buggy, stuck or unavailable by the broker.

@@ -93,20 +107,6 @@ When an exception is raised:

*Message acknowledgment* under `Work Queues tutorial <https://www.rabbitmq.com/tutorials/tutorial-two-python.html>`__


Unused features
---------------

