Skip to content

mhaemmerle/clj-kafka

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

24 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

clj-kafka

Simple Clojure interface to Kafka.

It's currently a snapshot only until things flesh out a little more. API Documentation is also available.

Note: Kafka binaries are not currently published to any public repositories. Additionally, the 0.7 release was published as source. This library uses a build of the 0.7 incubator release published on Clojars.

Installing

Add the following to your Leiningen project.clj:

[clj-kafka "0.0.5-0.7-SNAPSHOT"]

Usage

clj-kafka currently only supports Kafka 0.7.

Producer

Allows batching of messages:

(use 'clj-kafka.producer)

(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (->> ["message payload 1" "message payload 2"]
                             (map #(.getBytes %))
                             (map message)))

Or sending a single message:

(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (message (.getBytes "payload")))

SimpleConsumer

(use 'clj-kafka.consumer.simple)

(def c (consumer "localhost" 9092))
(def f (fetch "test" 0 0 4096))

(messages c f)

({:message {:crc 1513777821, :payload #<byte[] [B@3088890d>, :size 1089}, :offset 1093} {:message {:crc 4119364266, :payload #<byte[] [B@3088890d>, :size 968}, :offset 2065} {:message {:crc 3827222527, :payload #<byte[] [B@3088890d>, :size 1137}, :offset 3206})

Zookeeper Consumer

The Zookeeper consumer uses broker information contained within Zookeeper to consume messages. This consumer also allows the client to automatically commit consumed offsets so they're not retrieved again.

(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)

(def config {"zk.connect" "localhost:2181" 
             "groupid"    "my-task-group"})

(with-resource [c (consumer config)]
  shutdown
  (take 5 (messages c "test")))

({:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22} {:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22} {:crc 960674935, :payload #<byte[] [B@698b41da>, :size 86} {:crc 3651343620, :payload #<byte[] [B@698b41da>, :size 20} {:crc 2012604996, :payload #<byte[] [B@698b41da>, :size 20})

It's also now possible to consume messages from multiple topics at the same time. These are aggregated and returned as a single sequence:

(take 5 (messages c "test1" "test2"))

License

Copyright © 2012 Paul Ingles

Distributed under the Eclipse Public License, the same as Clojure.

About

Wrapper to the Java API for interacting with Kafka

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published