Skip to content

Commit

Permalink
gc now returns the actions it performed
Browse files Browse the repository at this point in the history
  • Loading branch information
oliyh committed Mar 20, 2020
1 parent ee6e70d commit 49c7a39
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 43 deletions.
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -179,6 +179,11 @@ The two possibilities are handled differently:
:distribution :random}
:dlq {:deliveries 5
:stream "dlq"}})

;; returns
[{:action :dlq, :id "0-1", :consumer "consumer/messages/0"}
{:action :rebalance, :id "0-2", :consumer "consumer/messages/0", :claimant "consumer/messages/1"}
{:action :noop, :id "0-3", :consumer "consumer/messages/1"}]
```

GC behaviour is as follows:
Expand Down
82 changes: 47 additions & 35 deletions src/carmine_streams/core.clj
Expand Up @@ -202,38 +202,50 @@
(remove #(and (pos? (:pending %))
(< (:idle rebalance) (:idle %)))
all-consumers)
all-consumers)]
(if (empty? active-consumers)
(log/warn logging-context "No active consumers found" all-consumers)

(doseq [consumer-name (map :name all-consumers)
:let [logging-context (assoc logging-context :consumer consumer-name)]]
(loop [last-id "-"]
(let [pending-messages (car/wcar
conn-opts
(car/xpending stream group last-id "+" 100 consumer-name))]
(when (seq pending-messages)
(car/wcar conn-opts
(doseq [[message-id _consumer idle deliveries :as message] pending-messages]
(cond
(and dlq (message-exceeds? dlq message))
(do (log/info logging-context "Sending message" message-id "to" (:stream dlq) message)
(car/xack stream group message-id)
(car/xadd (:stream dlq) "*" "stream" stream "group" group "consumer" consumer-name "id" message-id "idle" idle "deliveries" deliveries))

(and rebalance (message-exceeds? rebalance message))
(let [next-consumer (as-> active-consumers %
(condp = (:distribution rebalance)
:activity (sort-by :idle %)
:inactivity (sort-by :idle > %)
(shuffle %))
(map :name %)
(set %)
(disj % consumer-name)
(first %))]
(log/info logging-context "Claiming message" message-id "for" next-consumer message)
(car/xclaim stream group next-consumer idle message-id))

:else
:noop)))
(recur (next-id (first (last pending-messages)))))))))))
all-consumers)
actions (transient [])]
(doseq [consumer-name (map :name all-consumers)
:let [logging-context (assoc logging-context :consumer consumer-name)]]
(loop [last-id "-"]
(let [pending-messages (car/wcar
conn-opts
(car/xpending stream group last-id "+" 100 consumer-name))]
(when (seq pending-messages)
(car/wcar conn-opts
(doseq [[message-id _consumer idle deliveries :as message] pending-messages]
(cond
(and dlq (message-exceeds? dlq message))
(do (log/info logging-context "Sending message" message-id "to" (:stream dlq) message)
(car/xack stream group message-id)
(car/xadd (:stream dlq) "*" "stream" stream "group" group "consumer" consumer-name "id" message-id "idle" idle "deliveries" deliveries)
(conj! actions {:action :dlq
:id message-id
:consumer consumer-name}))

(and rebalance (message-exceeds? rebalance message))
(if-let [claimant (as-> active-consumers %
(condp = (:distribution rebalance)
:activity (sort-by :idle %)
:inactivity (sort-by :idle > %)
(shuffle %))
(map :name %)
(set %)
(disj % consumer-name)
(first %))]
(do (log/info logging-context "Claiming message" message-id "for" claimant message)
(car/xclaim stream group claimant idle message-id)
(conj! actions {:action :rebalance
:id message-id
:consumer consumer-name
:claimant claimant}))
(do (log/warn "No active consumers found")
(conj! actions {:action :failed-rebalance
:id message-id
:consumer consumer-name})))

:else
(conj! actions {:action :noop
:id message-id
:consumer consumer-name}))))
(recur (next-id (first (last pending-messages))))))))
(persistent! actions)))
24 changes: 16 additions & 8 deletions test/carmine_streams/core_test.clj
Expand Up @@ -251,8 +251,9 @@

(testing "a gc moves it to the dlq"
(let [dlq (cs/stream-name "dlq")]
(cs/gc-consumer-group! conn-opts stream group {:dlq {:deliveries 1
:stream dlq}})
(is (= [{:action :dlq, :id "0-1", :consumer consumer}]
(cs/gc-consumer-group! conn-opts stream group {:dlq {:deliveries 1
:stream dlq}})))

(is (= {:name group
:pending 0
Expand Down Expand Up @@ -312,9 +313,11 @@
consumers-pending))))

(testing "a gc is a no-op when the criteria aren't met"
(cs/gc-consumer-group! conn-opts stream group {:rebalance {:idle 99999999
:siblings :active
:distribution :random}})
(is (every?
#(= :noop (:action %))
(cs/gc-consumer-group! conn-opts stream group {:rebalance {:idle 99999999
:siblings :active
:distribution :random}})))

(let [consumers-pending (->> (cs/group-stats conn-opts stream group)
:consumers
Expand All @@ -327,9 +330,14 @@
consumers-pending))))

(testing "a gc moves it to another consumer"
(cs/gc-consumer-group! conn-opts stream group {:rebalance {:idle 0
:siblings :active
:distribution :random}})
(is (every?
(fn [{:keys [action consumer claimant]}]
(and (= :rebalance action)
(= dead-consumer consumer)
(= alive-consumer claimant)))
(cs/gc-consumer-group! conn-opts stream group {:rebalance {:idle 0
:siblings :active
:distribution :random}})))

(is (true? (deref succeeded? 500 ::timed-out)))

Expand Down

0 comments on commit 49c7a39

Please sign in to comment.