Skip to content

Commit

Permalink
Merge pull request #4 from ovotech/backoff
Browse files Browse the repository at this point in the history
Add backoff option to poll
  • Loading branch information
adryanovo committed Mar 3, 2020
2 parents f21517c + 899afd5 commit e573d84
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject ovotech/kafka-clj-test-utils "2.1.0-2"
(defproject ovotech/kafka-clj-test-utils "2.1.0-3"
:description "Companion test utility library for `ovotech/kafka-clj-utils`"
:url "https://github.com/ovotech/kafka-clj-test-utils"
:license {:name "Eclipse Public License"
Expand Down
10 changes: 6 additions & 4 deletions src/kafka_clj_test_utils/consumer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,17 @@

(defn poll*
[consumer &
{:keys [expected-msgs retries poll-timeout],
:or {expected-msgs 1, retries 200, poll-timeout 25}}]
{:keys [expected-msgs retries poll-timeout backoff?],
:or {expected-msgs 1, retries 200, poll-timeout 25, backoff? false}}]
(loop [received []
retries retries]
retries retries
poll-timeout poll-timeout]
(if (or (>= (count received) expected-msgs) (zero? retries))
received
(recur (concat received
(map ConsumerRecord->m (.poll consumer poll-timeout)))
(dec retries)))))
(dec retries)
(* poll-timeout (if backoff? 2 1))))))

(defn with-consumer
([kafka-config kafka-serde-config f]
Expand Down

0 comments on commit e573d84

Please sign in to comment.