Skip to content

Commit

Permalink
Allow to specify minimum bytes and maximum wait for simple consumer API
Browse files Browse the repository at this point in the history
When requesting new messages with the simple consumer API and message is
available, by default, the call to `messages` will return immediatly. If
called in a loop to wait for new messages, a user will have to sleep
between each call to avoid a tight loop, increasing the latency. The
Java API exposes the ability to wait for a minimum of bytes and a
maximum of time before returning no record. We expose those knobs in
`fetch-request` and `messages`.
  • Loading branch information
vincentbernat authored and pingles committed Nov 24, 2015
1 parent de3d48e commit dcc3e6c
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions src/clj_kafka/consumer/simple.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
client-id))

(defn fetch-request
[client-id topic ^Long partition offset fetch-size]
[client-id topic ^Long partition offset fetch-size & {:keys [max-wait min-bytes]}]
(.build (doto (FetchRequestBuilder. )
(.clientId client-id)
(.addFetch topic (Integer/valueOf partition) offset fetch-size))))
(.addFetch topic (Integer/valueOf partition) offset fetch-size)
(#(when max-wait (.maxWait % max-wait)))
(#(when min-bytes (.minBytes % min-bytes))))))

(defn messages
[^SimpleConsumer consumer client-id topic partition offset fetch-size]
(let [fetch (fetch-request client-id topic partition offset fetch-size)]
[^SimpleConsumer consumer client-id topic partition offset fetch-size & more]
(let [fetch (apply fetch-request client-id topic partition offset fetch-size more)]
(map to-clojure (iterator-seq (.iterator (.messageSet ^FetchResponse (.fetch consumer ^FetchRequest fetch)
topic
partition))))))
Expand Down

0 comments on commit dcc3e6c

Please sign in to comment.