A Clojure Library for Reliable RabbitMQ Consumers

kithara

kithara is a (limited) RabbitMQ client for Clojure, based on Lyra. Its specific scope - and thus source of its limitations - is the simple creation of RabbitMQ-based consumers with appropriate recovery and retry facilities.

If you’re looking for a more complete RabbitMQ library you should check out langohr.

Usage

Leiningen (via Clojars)

Clojars Project

Note that you might want to additionally include an slf4j-compliant logging implementation, e.g. logback.

REPL

(require '[kithara.core :as k])

Quickstart

With kithara you have a handler:

(defn message-handler
  [{:keys [body]}]
  (println "I received a message:" body)
  {:status :ack})

Since the handler takes a message and returns a description of what to do with it, you can wrap it easily:

(defn message-handler-with-nack
  [{:keys [body] :as message}]
  (if (= body "error")
    {:status :nack, :requeue? false}
    (message-handler message)))
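The same wrapping can be expressed as a reusable higher-order function. The following is a minimal sketch; `wrap-nack-when` and `handler-with-nack` are hypothetical names for illustration and not part of kithara:

```clojure
(defn message-handler
  [{:keys [body]}]
  (println "I received a message:" body)
  {:status :ack})

;; Hypothetical helper (not part of kithara): returns a handler that NACKs
;; (without requeue) every message matching `reject?` and passes all other
;; messages on to `handler`.
(defn wrap-nack-when
  [handler reject?]
  (fn [message]
    (if (reject? message)
      {:status :nack, :requeue? false}
      (handler message))))

(def handler-with-nack
  (wrap-nack-when message-handler #(= (:body %) "error")))
```

Since both the input and the output are plain maps, wrappers like this compose without knowing anything about RabbitMQ.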

You turn the handler into a component, adding coercion and a name:

(defonce consumer
  (-> message-handler-with-nack
      (k/consumer {:as :string, :consumer-name "kithara-test"})
      ...))

Followed by a channel, a queue, some bindings and a connection:

(defonce consumer
  (-> message-handler-with-nack
      (k/consumer {:as :string, :consumer-name "kithara-test"})
      (k/with-channel {:prefetch-count 256})
      (k/with-queue "kithara-queue" {:exchange "kithara", :routing-keys ["#"]})
      (k/with-connection {:host "localhost", :vhost "/kithara"})))

And you watch it goooooo:

(alter-var-root #'consumer com.stuartsierra.component/start)
;; => ...
17:37:25.372 [nREPL-worker-0] INFO net.jodah.lyra.internal.ConnectionHandler - Creating connection cxn-2 to [docker:5672]
17:37:25.390 [nREPL-worker-0] INFO net.jodah.lyra.internal.ConnectionHandler - Created connection cxn-2 to amqp://192.168.59.103:5672/
17:37:25.395 [nREPL-worker-0] INFO net.jodah.lyra.internal.ConnectionHandler - Created channel-1 on cxn-2
17:37:25.410 [nREPL-worker-0] INFO net.jodah.lyra.internal.ConnectionHandler - Created channel-2 on cxn-2
17:37:25.421 [nREPL-worker-0] DEBUG kithara.components.base-consumer - [kithara-test] starting consumer on queue kithara-queue (desired tag: 'kithara-test:96CQWOfPfKf8tjQ328') ...
17:37:25.425 [nREPL-worker-0] INFO net.jodah.lyra.internal.ChannelHandler - Created consumer-kithara-test:96CQWOfPfKf8tjQ328 of kithara-queue via channel-2 on cxn-2
I received a message: Hello, World!
17:38:29.819 [rabbitmq-cxn-2-consumer] DEBUG kithara.middlewares.logging - [kithara-test] [kithara-test:96CQWOfPfKf8tjQ328] [ack] exchange="kithara", routing-key="test-message", size=13

(And then you probably disable some of Lyra’s logging.)

Building a Consumer

kithara is basically a toolbox with components handling RabbitMQ connection, channel and queue setup. Here you’ll see how to combine them.

Handler

A handler is a function. It takes a single map (the message) and produces another map (the confirmation) specifying how to react to the message, i.e. whether to ACK, NACK or REJECT it.

Message Format

This is an example message:

{:channel      #object[com.sun.proxy.$Proxy1 0x17cd517c "net.jodah.lyra.config.Config@190abc2d"],
 :body         #object["[B" 0x13d05618 "[B@13d05618"],
 :body-raw     #object["[B" 0x13d05618 "[B@13d05618"],
 :properties   {...},
 :routing-key  "the-routing-key",
 :exchange     "the-exchange",
 :redelivered? false,
 :delivery-tag 33,
 :consumer-tag "kithara:96CDgYbID9a1IlDwDS"}

Most of the semantics should be clear, so just a few notes:

  • :body-raw always contains a byte array with the raw payload.

  • :body contains a potentially preprocessed payload (see Message Coercion), otherwise it’s identical to :body-raw.

  • :properties represents the message’s BasicProperties.

Confirmation Format

The confirmation is a map describing how to react to the message that was processed, mainly using the :status key which can take any of the following values:

  • :ack (ACK the message)

  • :nack (NACK the message)

  • :reject (REJECT the message)

  • :error (an exception occurred)

  • :done (the message was explicitly confirmed within the handler)

:nack, :reject and :error also rely on the :requeue? key to decide whether or not to re-add a message to the back of the queue (defaults to true for NACK). Additionally, :message and :error can be given to augment the log messages printed by the consumer.

An example confirmation could thus be:

{:status   :nack
 :requeue? false
 :message  "this is unacceptable."}

The reliance on pure data for message handling should make your handlers a little more testable in the long run.
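As a rough illustration of that testability, a handler can be exercised with plain maps and no broker at all. The sketch below uses `clojure.test`; `validating-handler` is a hypothetical handler written for this example, and it assumes `:body` has already been coerced to a string:

```clojure
(require '[clojure.string :as string]
         '[clojure.test :refer [deftest is run-tests]])

;; Hypothetical handler: rejects blank payloads, ACKs everything else.
;; Assumes :body is a string (e.g. because the consumer uses {:as :string}).
(defn validating-handler
  [{:keys [body]}]
  (if (string/blank? body)
    {:status :reject, :requeue? false, :message "empty payload"}
    {:status :ack}))

(deftest validating-handler-test
  ;; Messages are just maps, so only the keys the handler reads are needed.
  (is (= {:status :ack}
         (validating-handler {:body "Hello, World!"})))
  (is (= :reject
         (:status (validating-handler {:body ""})))))

(run-tests)
```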

Base Consumer

Once your handler is ready, you can create the base consumer component. It’s not yet bound to any connection, queue or channel, it just encapsulates the handling logic.

(k/consumer handler)
(k/consumer handler {... options ...})

Options can be used to tweak its behaviour a bit.

Message Coercion

The :as option specifies a coercer for the incoming payload. It can be one of the following:

  • :bytes (default): just use the raw byte array,

  • :string: convert the byte array to a UTF-8 string,

  • a function: apply the function to the byte array,

  • any value implementing kithara.protocols/Coercer.

So, a consumer that prints every incoming message’s :body as a string would be constructed as:

(k/consumer
  (fn [{:keys [body]}]
    (println body)
    {:status :ack})
  {:as :string})
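A function coercer is simply a function from byte array to value. As a sketch, assuming the payloads are UTF-8 encoded EDN (an assumption made for this example), a coercer could look like this; `edn-coercer` is a hypothetical name:

```clojure
(require '[clojure.edn :as edn])

;; Hypothetical coercer: decodes a UTF-8 byte array containing EDN.
(defn edn-coercer
  [^bytes payload]
  (edn/read-string (String. payload "UTF-8")))

;; Trying it at the REPL without a broker:
(edn-coercer (.getBytes "{:id 1}" "UTF-8"))
;; => {:id 1}
```

It would then be passed as the `:as` option, i.e. `(k/consumer handler {:as edn-coercer})`.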

Default Confirmation Behaviour

If a handler does not return a map (or returns a map without the :status key), the message will be confirmed using ACK. In the same vein, if the handler throws an exception, the message will be confirmed with NACK and requeued.

This can be adjusted using the :default-confirmation and :error-confirmation keys, e.g.:

(k/consumer
  ...
  {:default-confirmation {:status :nack}
   :error-confirmation   {:status :reject}})
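To make the rules above concrete, here is a small model of the described behaviour in plain Clojure. It is only an illustration of the semantics as stated in this section, not kithara's actual implementation; `confirmation-for` is a hypothetical function:

```clojure
;; Illustration only: models the fallback rules described above.
;; Not kithara's implementation.
(defn confirmation-for
  [handler message {:keys [default-confirmation error-confirmation]
                    :or   {default-confirmation {:status :ack}
                           error-confirmation   {:status :nack, :requeue? true}}}]
  (try
    (let [result (handler message)]
      (if (and (map? result) (contains? result :status))
        result                     ; handler gave an explicit confirmation
        default-confirmation))     ; anything else falls back to the default
    (catch Exception _
      error-confirmation)))        ; handler threw

(confirmation-for (fn [_] nil) {} {})
;; => {:status :ack}
(confirmation-for (fn [_] :ok) {} {:default-confirmation {:status :nack}})
;; => {:status :nack}
```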

Consumer Name/Tag

It’s often useful to be able to identify a consumer, e.g. in the RabbitMQ management plugin, which displays only the consumer tag.

You can thus either set the consumer tag explicitly (using the :consumer-tag option) or you can give your consumer a name (:consumer-name) that will be included in a custom, unique consumer tag chosen by kithara.

Channels

Consumers have to be bound to a channel before they can be started, which is easily achieved:

(k/with-channel
  consumer
  {:prefetch-count 256})

See the documentation for available options. You should set at least :prefetch-count, though, to prevent your consumer from loading more messages into memory than it can stomach. A shorthand for this can be found in with-prefetch-channel.

Queues

Consumers need a queue to receive messages from and that queue is bound to exchange/routing-key pairs. It can be easily set up:

(k/with-queue
  consumer
  "queue-name"
  {:durable?     true
   :exclusive?   false
   :auto-delete? false
   :exchange     "exchange"
   :routing-keys ["#"]})

If no options are given, the queue will not be actively declared but expected to already exist. Note that there are shorthands for commonly used queue types like with-server-named-queue and with-durable-queue.

Connection

Without a connection to your RabbitMQ cluster there isn’t really a lot your consumer can accomplish. Set it up via:

(k/with-connection
  consumer
  {:host     "rabbitmq.host.com"
   :vhost    "/kithara"
   :username "..."
   :password "..."})

See kithara.config/connection for endpoint configuration and kithara.config/behaviour for recovery/retry semantics. By default, the connection will employ backing-off recovery (up to 60s) and immediate and unlimited retry.

Common Messaging Patterns and Scenarios

Kithara aims to provide easily usable implementations for common messaging patterns and scenarios.

Environment and External Resources

Usually, messages trigger some kind of change to the world, e.g. writing to a DB or starting some pre-defined task. The "world" these changes rely on needs to be available to the message handler. There are several ways this can be achieved.

Closures

The message handler function can close over its dependencies, no matter whether they are available as global variables (like in mount) or parameters to a constructor function:

(defn build-message-handler
  [database]
  (fn [message]
    (write! database message)
    {:status :ack}))

(defonce closure-rabbitmq-consumer
  (-> (build-message-handler my-database)
      (k/consumer ...)
      ...))
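A closure-based handler can be tried out at the REPL by substituting a trivial datastore. In this sketch the "database" is an atom holding a vector and `write!` is a hypothetical stand-in for whatever your real persistence function is:

```clojure
;; Hypothetical stand-in for a real datastore write.
(defn write!
  [database message]
  (swap! database conj (:body message)))

(defn build-message-handler
  [database]
  (fn [message]
    (write! database message)
    {:status :ack}))

(def test-database (atom []))
(def closure-handler (build-message-handler test-database))

(closure-handler {:body "first"})
(closure-handler {:body "second"})
@test-database
;; => ["first" "second"]
```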

Dependency Injection

The environment can be passed to the message handler directly within the message map, facilitated by kithara’s with-env function:

(defn message-handler
  [{:keys [env] :as message}]
  (write! (:database env) message)
  {:status :ack})

(defonce injection-rabbitmq-consumer
  (-> message-handler
      (k/consumer ...)
      ...
      (k/with-env {:database my-database})))
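Because the environment arrives as the `:env` key of the message map, such a handler can be called directly with a hand-built message. A minimal sketch, again using an atom and a hypothetical `write!` in place of a real database:

```clojure
;; Hypothetical stand-in for a real datastore write.
(defn write!
  [database message]
  (swap! database conj (:body message)))

(defn message-handler
  [{:keys [env] :as message}]
  (write! (:database env) message)
  {:status :ack})

;; Build the message by hand, including the :env map the handler expects.
(def env-database (atom []))
(message-handler {:body "payload", :env {:database env-database}})
@env-database
;; => ["payload"]
```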

The with-env wrapper plays nicely with the component library since it allows for arbitrary keys to be added to the environment using plain assoc, making it possible to employ consumers in component systems:

(defonce system
  (component/system-map
    :consumer (-> message-handler
                  (k/consumer ...)
                  ...
                  (k/with-env)
                  (component/using [:database]))
    :database (map->DB {...})))

Note that with-env has to be on the top-level for this to work.

Wrapper Component (recommended)

A combination of the two previous approaches uses an outer component to gather the environment, then builds the consumer internally on startup. peripheral's defcomponent will let you formulate this concisely (but note that it can be done with plain component just as well):

(defcomponent Consumer [database]
  :message-handler    (build-message-handler database)
  :component/consumer (-> message-handler
                          (k/consumer ...)
                          ...))

(defonce system
  (component/system-map
    :consumer (component/using (map->Consumer {}) [:database])
    :database (map->DB {...})))

Or using with-env:

(defcomponent Consumer [database]
  :component/consumer
  (-> message-handler
      (k/consumer ...)
      ...
      (k/with-env {:database database})))

This approach arguably offers the most flexibility since building the stack is explicit and thus completely under your control.

Multiple Consumers

Every with-* function takes either a single component or a seq of them, allowing your topology to "branch out" however you desire.

For example, you can parallelise processing by adding multiple identical consumers to a channel:

(-> (repeat 5 (k/consumer ...))
    (k/with-channel ...)
    ...)

Although, following the one-channel-per-thread model, it should probably look like this:

(-> (k/consumer ...)
    (k/with-channel ...)
    (->> (repeat 5))
    ...)
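The difference between the two shapes comes down to where `repeat` sits in the threading chain. The sketch below uses `fake-channel`, a hypothetical stand-in for the real wrapper, so the resulting structure can be inspected at the REPL:

```clojure
;; Hypothetical stand-in for k/with-channel: just records what it wraps.
(defn fake-channel
  [consumers]
  {:channel consumers})

;; One channel shared by three consumers:
(def shared-channel
  (-> (repeat 3 :consumer)
      (fake-channel)))
;; => {:channel (:consumer :consumer :consumer)}

;; Three channels with one consumer each. (->> (repeat 3)) threads the
;; wrapped value in as the LAST argument, i.e. (repeat 3 <channel>):
(def channel-per-consumer
  (-> :consumer
      (fake-channel)
      (->> (repeat 3))))
;; => ({:channel :consumer} {:channel :consumer} {:channel :consumer})
```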

It would be just as easy to have two completely independent consumers on the same connection:

(def consumer-1
  (-> (k/consumer ...)
      (k/with-channel ...)
      (k/with-queue "consumer-1")))

(def consumer-2
  (-> (k/consumer ...)
      (k/with-channel ...)
      (k/with-queue "consumer-2")))

(def consumer
  (k/with-connection
    [consumer-1 consumer-2]
    ...))

You have full control over who shares what, on each layer of the consumer stack.

Multi-Threaded Consumers

In some cases you might want to declare just a single consumer but dispatch message processing to a number of worker threads. This functionality is offered by with-threads which will setup and teardown a fixed-size thread pool for one or more consumers:

(require '[kithara.patterns.threads :refer [with-threads]])

(defonce rabbitmq-consumer-with-threads
  (-> (k/consumer ...)
      (with-threads 4)
      (k/with-queue ...)
      ...))

Message Batching

Sometimes, throughput can be gained by batchwise processing of messages (e.g. by bundling writes to a datastore), which can be achieved using kithara’s batching-consumer, a drop-in replacement for the default consumer:

(defn batch-handler
  [messages]
  ...
  {:status :ack})

(defonce rabbitmq-batching-consumer
  (-> batch-handler
      (k/batching-consumer
        {:as :string
         :consumer-name "badger"
         :batch-size 10})
      ...))

The batch handler can return either a single confirmation or a seq of them for more nuanced processing. See the docstring of batching-consumer for more details.
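As a sketch of the single-confirmation case, a batch handler might bundle all payloads into one write and then ACK the whole batch. This assumes each element of `messages` has the same shape as a regular message (in particular a `:body` key); `datastore` and `write-all!` are hypothetical stand-ins for a real datastore:

```clojure
;; Hypothetical datastore: an atom holding a vector of rows.
(def datastore (atom []))

;; Hypothetical bulk write: appends all rows in a single swap!.
(defn write-all!
  [store rows]
  (swap! store into rows))

(defn batch-handler
  [messages]
  ;; One write for the whole batch instead of one per message.
  (write-all! datastore (map :body messages))
  {:status :ack})

(batch-handler [{:body "a"} {:body "b"}])
;; => {:status :ack}
@datastore
;; => ["a" "b"]
```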

Backoff via Dead-Letter-Queues

The two wrappers with-dead-letter-backoff and with-durable-dead-letter-backoff provide delayed requeuing of messages by dispatching them to a secondary queue, the "dead letter queue", from which they’ll eventually be republished. Both wrappers have to be applied after with-queue.

The simplest version infers names of additional exchanges/queues using the original consumer queue:

(require '[kithara.patterns.dead-letter-backoff :as dlx])

(defonce rabbitmq-consumer-with-backoff
  (-> (k/consumer ...)
      (dlx/with-dead-letter-backoff)
      (k/with-queue ...)
      ...))

Lower-Level RabbitMQ API

Kithara wraps the official Java RabbitMQ client - but only as far as necessary to build consumers (and patterns). You can access those functions using the kithara.rabbitmq.* namespaces.

(Although, if you crave this level of control you should probably use something like langohr.)

Contributing

Contributions are always welcome!

  1. Create a new branch where you apply your changes (ideally also adding tests).

  2. Make sure existing tests are passing.

  3. Open a Pull Request on GitHub.

License

The MIT License (MIT)

Copyright (c) 2016 Yannick Scherer

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.