Kafka clojure client
Switch branches/tags
Nothing to show
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Failed to load latest commit information.



Copied and modifed from the legacy kafka-clj(https://github.com/kafka-dev/kafka/tree/master/clients/clojure)


This kafka-clj provides a producer and consumer that supports a basic fetch API as well as a managed sequence interface.

  • Better concurrency than Java API

  • Multifetch is not supported yet.

  • Consumer should be refactored

Quick Start

Download and start Kafka.

Pull dependencies with Leiningen:

$ lein deps
$ lein jar 


Sending messages

(with-open [p (producer {:broker.list "0:localhost:9092"})]
  (.produce p "topic1" "Message 1")
  (.produce p "topic1" ["Message 2" "Message 3"])
  (.produce p "topic1" "partition-key" ["Message 2" "Message 3"])

Sending Multi messages

(with-open [p (producer {:zk.connect "localhost:2082"})]
  (.produce p [ ["topic1" nil "Messages 1"] 
                ["topic2" "partition-key" "Message 2"]]))

Message Partitioning

(with-open [p (producer {:zk.connect "localhost:2082"}]
  (.produce p "test" "Partition-key1" "Message 1")
  (.produce p "test" "Partition-key2" "Message 2")

Following options are supported:

  • :broker.list string_ Comma seperated broker connection string. ::
  • :zk.connect string ZooKeeper Connection String
  • :zk.connectiontimeout.ms int ZooKeepr connection timeout Millis
  • :zk.sessiontimeout.ms int ZooKeepr session timeout Millis
  • :broker.type string [sync|async|batch] default : sync
  • :partitioner function Partitioner function whchi accepts two arguments (partition-key num-partition). Default is random partition.

Simple consumer (will be deprecated)

(with-open [c (consumer "localhost" 9092)]
  (let [offs (offsets c "test" 0 -1 10)]
    (.consume c "test" 0 (last offs) 1000000)))

Consumer sequence (will be deprecated)

(with-open [c (consumer {:broker.list "localhost:9092"})]
  (doseq [m (.consume-seq c "test" 0 {:blocking true})]
    (println m)))

Following options are supported:

  • :blocking boolean default false, sequence returns nil the first time fetch does not return new messages. If set to true, the sequence tries to fetch new messages :repeat-count times every :repeat-timeout milliseconds.
  • :repeat-count int number of attempts to fetch new messages before terminating, default 10.
  • :repeat-timeout int wait time in milliseconds between fetch attempts, default 1000.
  • :offset long initialized to highest offset if not provided.
  • :max-size int max result message size, default 1000000.

Java Interop

Properties props = new Properties(); props.put("zk.connect", "localhost:2181"); props.put("multithread", "true"); kafka.types.Producer producer = kafka.kafka.newProducer(props);

List messages = new java.util.ArrayList(); messages.add("hello") messages.add("world") producer.produce("topic", messages);