



kafka consumer and producer support for riemann


In riemann.config


(kafka/kafka-consumer {:topic "events"
                       :zookeeper.connect "localhost:2181"
                       :group.id "riemann.consumer"
                       :auto.offset.reset "smallest"
                       :auto.commit.enable "false"})

(let [producer (kafka/kafka-producer
                 {:topic "expired"
                  :metadata.broker.list "localhost:9092"})]
  (streams
    (expired producer)))

Customized Encoder and Decoder

The riemann-kafka producer supports a custom encoder for serializing events.
An encoder is a function that takes a sequence of events and returns a byte array.
If you don't specify an :encoder, riemann.common/encode is used, which serializes events as a protobuf message.

(defn my-json-encoder
  "Encode events into JSON, then convert the string to bytes"
  [events]
  (.getBytes (cheshire.core/encode events)))

(let [producer (kafka/kafka-producer
                 {:topic "expired"
                  :metadata.broker.list "localhost:9092"
                  :encoder my-json-encoder})]
  (streams
    (expired producer)))

The riemann-kafka consumer supports a custom decoder for deserializing messages.
A decoder receives a single kafka message as a byte array and returns a sequence of events.
If you don't specify a :decoder, the default decoder expects each incoming message to be a riemann protobuf message.

(defn my-json-decoder
  "Decode a kafka message into a seq of riemann events"
  [input]
  ; input is a single kafka message as a byte array
  ; If the payload is a string, recover it with `(String. input)`
  ; The return value SHOULD be a seq of riemann events
  (let [decoded-msg (cheshire.core/parse-string (String. input) true)]
    (map riemann.common/event decoded-msg)))

(kafka/kafka-consumer {:topic "events"
                       :zookeeper.connect "localhost:2181"
                       :group.id "riemann.consumer"
                       :auto.offset.reset "smallest"
                       :auto.commit.enable "false"
                       :decoder my-json-decoder})
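
To make the encoder and decoder contracts concrete, here is a minimal round-trip sketch that needs no extra dependencies. It is not part of riemann-kafka: the names edn-encoder and edn-decoder are hypothetical, EDN is chosen only because it ships with Clojure, and events are assumed to be plain maps. Inside riemann you would likely still pass each decoded map through riemann.common/event, as the JSON decoder above does.

```clojure
(require '[clojure.edn :as edn])

(defn edn-encoder
  "Hypothetical encoder: takes a seq of events, returns a UTF-8 byte array."
  [events]
  ; Keep only a few well-known event fields so the printed form stays plain EDN.
  (let [plain (mapv #(select-keys % [:host :service :state :metric :time]) events)]
    (.getBytes ^String (pr-str plain) "UTF-8")))

(defn edn-decoder
  "Hypothetical decoder: takes one message as a byte array, returns a seq of event maps."
  [input]
  (edn/read-string (String. ^bytes input "UTF-8")))

; Round trip: what the producer side encodes, the consumer side can decode.
(def sample-events [{:host "web1" :service "cpu" :state "ok" :metric 0.5}])
(def decoded (edn-decoder (edn-encoder sample-events)))
```

The point of the pairing is that whatever :encoder a producer uses, consumers of the same topic need a matching :decoder.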


For now, you need to build this module yourself and put it on riemann's classpath. This requires a working JDK, JRE and leiningen.

First build the project:

lein uberjar

The resulting artifact will be in target/riemann-kafka-standalone-0.1.0.jar. Copy that jar to the machine(s) where riemann runs, for instance to /usr/lib/riemann/riemann-kafka.jar.

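If you have installed riemann from a stock package, you only need to edit /etc/default/riemann and change the EXTRA_CLASSPATH line to point at the jar. Assuming the example location above, it would read:

EXTRA_CLASSPATH=/usr/lib/riemann/riemann-kafka.jar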

You can then use the exposed functions, provided you have loaded the plugin in your configuration.
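
As a sketch of what loading the plugin can look like, riemann's configuration DSL provides a load-plugins function that discovers plugins on the classpath. The fragment below assumes the jar is already on the classpath and that the plugin registers itself under the kafka alias used in the examples of this README.

```clojure
; riemann.config (sketch)
; Assumption: riemann-kafka.jar is already on riemann's classpath.
(load-plugins)

; After this call, kafka/kafka-consumer and kafka/kafka-producer
; should be available to the rest of the configuration.
```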