RabbitMQ

Alisha Mohanty edited this page May 22, 2024 · 9 revisions

NOTE: The minimum supported RabbitMQ version is 3.9.0, because the Delayed Message Plugin doesn't support earlier versions.

RabbitMQ Message Broker doesn't support Batch Jobs, Periodic Jobs & some Job management APIs of Goose.

RabbitMQ opts need :settings, :queue-type, :publisher-confirms, :return-listener & :shutdown-listener.

Connection config

Goose uses Langohr to connect to RabbitMQ. Define the connection config under the :settings key, using the options Langohr accepts. Refer to the Langohr connection wiki for more details.

Replication

To replicate job data across a RabbitMQ cluster, specify a quorum queue via the :queue-type key. Acceptable queue types are classic & quorum.

Publisher Confirms

To ensure messages are enqueued reliably, RabbitMQ provides Publisher Confirms on the client side. Confirms can be either synchronous or asynchronous, and are configured via the :publisher-confirms key.

We recommend the sync strategy with backoff and retries, as it handles roughly 90% of use-cases & scales.

The async strategy does provide a performance boost of ~30x. However, implementing it is considerably more involved. Refer to this tutorial for a basic implementation & choose accordingly.

Return Listener

Messages can become unroutable, for example when a queue has been deleted manually. Handle them via the :return-listener key; if a return listener isn't defined, these messages are dropped silently. Goose wraps Langohr's return listener, which takes 6 params, and passes them to the user's handler as a single map.
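As a rough illustration of a handler that receives that map, here is a minimal sketch. Only the :body key is confirmed by the usage example on this page; the other key names (:reply-code, :reply-text, :exchange, :routing-key) are an assumption that the map mirrors the names of Langohr's return-listener params. The function names are hypothetical.

```clojure
;; ASSUMPTION: apart from :body, the keys destructured here are guessed from
;; the names of Langohr's return-listener params; verify against Goose's source.
(defn summarize-returned-message
  "Builds a small report map from a returned (unroutable) message."
  [{:keys [reply-code reply-text exchange routing-key body]}]
  {:reason      (str reply-code " " reply-text)
   :exchange    exchange
   :routing-key routing-key
   :job         body})

(defn my-return-listener
  "Hypothetical handler: prints a report instead of letting the message
  be dropped silently. Swap println for a real error reporter."
  [returned-message]
  (println "unroutable message:"
           (pr-str (summarize-returned-message returned-message))))
```

Such a function would be supplied as the value of :return-listener in the RabbitMQ opts.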

Shutdown Listener

Abrupt RabbitMQ connection shutdowns that were not initiated by the application can be handled via the :shutdown-listener key.
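A custom listener could look roughly like the sketch below. It assumes the handler is called with a single argument, the RabbitMQ Java client's ShutdownSignalException, as is the convention for Langohr shutdown listeners; this page does not confirm the signature, and the function name is hypothetical.

```clojure
;; ASSUMPTION: `cause` is a com.rabbitmq.client.ShutdownSignalException,
;; which exposes isInitiatedByApplication() and getMessage().
(defn my-shutdown-listener
  "Hypothetical handler: ignores shutdowns the application asked for
  and reports everything else."
  [cause]
  (when-not (.isInitiatedByApplication cause)
    (println "RabbitMQ connection shut down unexpectedly:" (.getMessage cause))))
```

It would replace shutdown-listener/default as the value of :shutdown-listener in the RabbitMQ opts.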

Channels

Goose creates a pool of channels for producing messages to & consuming messages from RabbitMQ.

  • For a producer/client, the channel count must be provided.
  • For a consumer/worker, the channel count matches the thread count, so it need not be provided.

Usage

(ns rmq-broker
  (:require
    [goose.brokers.rmq.broker :as rmq]
    [goose.brokers.rmq.shutdown-listener :as shutdown-listener]
    [goose.client :as c]
    [goose.defaults :as d]
    [goose.worker :as w]))

(let [settings {:uri "amqp://username:password@my-rmq.host:5672"}
      queue-type {:type               d/rmq-quorum-queue
                  :replication-factor 3}
      publisher-confirms-strategy {:strategy       d/sync-confirms
                                   :timeout-ms     1000
                                   :max-retries    3
                                   :retry-delay-ms 100}
      return-listener (fn [{:keys [body]}] (println "report to sentry" body))
      rmq-opts {:settings           settings
                :queue-type         queue-type
                :publisher-confirms publisher-confirms-strategy
                :return-listener    return-listener
                :shutdown-listener  shutdown-listener/default}

      channels 10
      rmq-producer (rmq/new-producer rmq-opts channels)

      rmq-consumer (rmq/new-consumer rmq-opts)

      ;; Use client/worker opts for enqueuing/dequeuing jobs.
      client-opts (assoc c/default-opts :broker rmq-producer)
      worker-opts (assoc w/default-opts :broker rmq-consumer)])

Previous: Message Brokers        Next: Redis