[#207 #219] Listeners: major revamp, add several new features
New features

  - [#219] Try ping on socket timeouts (@ylgrgyq)
  - [#207] Publish errors to handlers (@aravindbaskaran)
  - [#207] Support optional ping-ms keep-alive (@aravindbaskaran)
  - Add `parse-listener-msg` util
  - Allow pubsub to take direct handler-fn
  - Undocumented: allow ^:swap, ^:parse handler-fns
ptaoussanis committed Sep 17, 2020
1 parent 3f5ded2 commit c4d120d
Showing 2 changed files with 325 additions and 78 deletions.
331 changes: 255 additions & 76 deletions src/taoensso/carmine.clj
Original file line number Diff line number Diff line change
Expand Up @@ -414,112 +414,291 @@

(declare close-listener)

(defrecord Listener [connection handler state future]
(defrecord Listener [connection handler state future id status_]
(close [this] (close-listener this)))

(defn -with-new-listener
(defn close-listener [listener]
(when (compare-and-set! (:status_ listener) :running :closed)
(conns/close-conn (:connection listener))
(future-cancel (:future listener))))

(defmacro with-open-listener
"Evaluates body within the context of given listener's pre-existing persistent
[listener & body]
`(protocol/with-context (:connection ~listener) ~@body
(protocol/execute-requests (not :get-replies) nil)))

(defn- get-ping-fn
"Returns (fn ping-fn [action]) with actions:
:reset! ; Records activity
:reset!? ; Records activity and returns true iff no activity recorded in
; last `msecs`"
;; Much simpler to implement only for listeners than as a general conns feature
;; (where hooking in to recording activity is non-trivial).
(let [msecs (long msecs)
last-activity_ (atom (System/currentTimeMillis))]

(fn ping-fn [action]
(case action
:reset! (enc/reset-in! last-activity_ (System/currentTimeMillis))
(let [now (System/currentTimeMillis)]
(enc/swap-in! last-activity_
(fn [^long last-activity]
(if (> (- now last-activity) msecs)
(enc/swapped now true)
(enc/swapped last-activity false)))))))))

(comment (def pfn (get-ping-fn 2000)) (pfn :reset!?))

(defn parse-listener-msg
"Parses given listener message of form:
- [\"pong\" \"\"]
- [\"message\" <channel> <payload>]
- [\"pmessage\" <pattern> <channel> <payload>], etc.
and returns {:kind _ :pattern _ :channel _ :payload _ :raw _}."
(let [v (enc/have vector? listener-msg)]
(case (count v)
2 (let [[x1 x2 ] v] {:kind x1 :payload x2 :raw v})
3 (let [[x1 x2 x3 ] v] {:kind x1 :channel x2 :payload x3 :raw v})
4 (let [[x1 x2 x3 x4] v] {:kind x1 :pattern x2 :channel x3 :payload x4 :raw v})
(do { :raw v}))))

(parse-listener-msg ["ping" ""])
(parse-listener-msg ["message" "chan1" "payload"]))

(defn -call-with-new-listener
"Implementation detail. Returns new Listener."
[{:keys [conn-spec init-state handler-fn body-fn]}]
(let [state_ (atom init-state)
[{:keys [conn-spec init-state handler-fn body-fn
;; Incompatible with current unextensible macro API:
;; ping-ms error-fn swapping-handler? ; TODO Future release

(let [status_ (atom :running) ; e/o #{:running :closed :broken}
state_ (atom init-state)
handler-fn_ (atom handler-fn)
future_ (atom nil)
listener_ (atom nil)
done?_ (atom false)

{:keys [in] :as conn}
(assoc (conns/conn-spec conn-spec)
:listener? true))

(future-call ; Thread to long-poll for messages
{:keys [ping-ms]} conn-spec
?ping-fn (when-let [ms ping-ms] (get-ping-fn ms))

(fn [msg]
(when-let [hf @handler-fn_]
(let [{:keys [swap parse]} (meta hf) ; Undocumented
msg (if parse (parse-listener-msg msg) msg)]

(if swap
(swap! state_ (fn [state] (hf msg state)))
(do (hf msg @state_))))))

(fn [error throwable]
["carmine" "carmine:listener:error"
{:error error
:throwable throwable
:listener @listener_}])

(catch Throwable t
(timbre/error t "Listener (error) handler exception")

(fn [throwable]
(when (compare-and-set! done?_ false true)
(if (compare-and-set! status_ :running :broken)
(do ; Breaking
(when-let [f @future_] (future-cancel f))
(handle-error :conn-broken throwable)
(if-let [t throwable]
(timbre/error t "Listener connection broken")
(timbre/error "Listener connection broken"))))

(or ; Closing
(handle-error :conn-closed nil)
(timbre/error "Listener connection closed"))))

nil ; Never handle as msg

(bound-fn []
(while true ; Closes when conn closes
(let [reply (protocol/get-unparsed-reply in {})]
(@handler-fn_ reply @state_)
(catch Throwable t
(timbre/error t
"Listener handler exception")))))))]
(loop []
(when-not @done?_

(protocol/with-context conn (body-fn)
(protocol/execute-requests (not :get-replies) nil))
(when-let [pfn ?ping-fn] (pfn :reset!)) ; Record activity on conn
(when-let [msg
(protocol/get-unparsed-reply in {})

(catch _
(when-let [ex (conns/-conn-error conn)]
(done! ex)))

(catch Exception ex
(done! ex)))]

(handle msg)
(catch Throwable t
(handle-error :handler-ex t)
(timbre/error t "Listener handler exception")))))


(Listener. conn handler-fn_ state_ msg-polling-future
(enc/uuid-str) status_)]

(Listener. conn handler-fn_ state_ f)))
(reset! listener_ listener)
(reset! future_ msg-polling-future)

(protocol/with-context conn (body-fn)
(protocol/execute-requests (not :get-replies) nil))

(when-let [pfn ?ping-fn]
(let [sleep-msecs (+ (long ping-ms) 100)
(bound-fn []
(loop []
(when-not @done?_
(Thread/sleep sleep-msecs)
(when (pfn :reset!?) ; Should ping now?
(protocol/with-context conn (ping)
(protocol/execute-requests (not :get-replies) nil))
(catch Exception ex
(done! ex))))

(doto (Thread. ^Runnable f)
(.setDaemon true)


(defn -call-with-new-pubsub-listener
"Implementation detail."
[{:keys [conn-spec handler body-fn]}]
(let [?msg-handler-fns (when (map? handler) handler)]
{:conn-spec (assoc conn-spec :pubsub-listener? true)
:init-state (when-let [m ?msg-handler-fns] m)
:body-fn body-fn
(if-let [msg-handler-fns ?msg-handler-fns] ; {<chan-or-pattern> (fn [msg])}
(fn [msg _state]
(let [{:keys [channel pattern]} (parse-listener-msg msg)]
:if-let [hf (clojure.core/get msg-handler-fns channel)] (hf msg)
:if-let [hf (clojure.core/get msg-handler-fns pattern)] (hf msg)

;; Useful for "carmine"-kind messages
:if-let [hf (clojure.core/get msg-handler-fns "*")] (hf msg))))


(defmacro with-new-listener
"Creates a persistent[1] connection to Redis server and a thread to listen for
"Creates a persistent[1] connection to Redis server and a future to listen for
server messages on that connection.
Incoming messages will be dispatched (with current listener state) to
(fn handler [msg state]).
(fn handler [msg current-state]) will be called on each incoming message [2].
Evaluates body within the context of the connection and returns a
general-purpose Listener containing:
1. The underlying persistent connection to facilitate `close-listener` and
2. An atom containing the function given to handle incoming server messages.
3. An atom containing any other optional listener state.
1. The connection for use with `with-open-listener`, `close-listener`.
2. An atom containing the handler fn.
3. An atom containing optional listener state.
Useful for Pub/Sub, monitoring, etc.
[1] You probably do *NOT* want a :timeout for your `conn-spec` here."
[conn-spec handler initial-state & body]
Errors will be published to \"carmine:listener:error\" channel with Clojure
payload {:keys [error throwable listener]},
:error e/o #{:conn-closed :conn-broken :handler-ex}.
[1] You probably do *NOT* want a :timeout for your `conn-spec` here.
`conn-spec` can include `:ping-ms`, which'll test conn every given msecs.
[2] See also `parse-listener-msg`."
;; [{:keys []} & body] ; TODO Future release
[conn-spec handler-fn init-state & body]
{:conn-spec ~conn-spec
:init-state ~initial-state
:handler ~handler
:init-state ~init-state
:handler-fn ~handler-fn
:body-fn (fn [] ~@body)}))

(defmacro with-open-listener
"Evaluates body within the context of given listener's pre-existing persistent
[listener & body]
`(protocol/with-context (:connection ~listener) ~@body
(protocol/execute-requests (not :get-replies) nil)))
(defmacro with-new-pubsub-listener
"Like `with-new-listener` but `handler` is:
{<channel-or-pattern> (fn handler [msg])}.
(defn close-listener [listener]
(conns/close-conn (:connection listener))
(future-cancel (:future listener)))
(defn -with-new-pubsub-listener
"Implementation detail."
[{:keys [conn-spec msg-handler-fns body-fn]}]
{:conn-spec (assoc conn-spec :pubsub-listener? true)
:init-state msg-handler-fns ; {<chan-or-pattern> (fn [msg])}
:body-fn body-fn
(fn [msg state]
(let [[_msg-type chan-or-pattern _msg-content] msg]
(when-let [hf (clojure.core/get msg-handler-fns chan-or-pattern)]
(hf msg))))}))
{} ; Connection spec, as per `wcar` docstring [1]
(defmacro with-new-pubsub-listener
"A wrapper for `with-new-listener`.
Creates a persistent[1] connection to Redis server and a thread to
handle messages published to channels that you subscribe to with
`subscribe`/`psubscribe` calls in body.
Handlers will receive messages of form:
[<msg-type> <channel/pattern> <message-content>].
{} ; Connection spec, as per `wcar` docstring [1]
{\"channel1\" (fn [[type match content :as msg]] (prn \"Channel match: \" msg))
\"user*\" (fn [[type match content :as msg]] (prn \"Pattern match: \" msg))}
(subscribe \"foobar\") ; Subscribe thread conn to \"foobar\" channel
(psubscribe \"foo*\") ; Subscribe thread conn to \"foo*\" channel pattern
Returns the Listener to allow manual closing and adjustments to
[1] You probably do *NOT* want a :timeout for your `conn-spec` here."
[conn-spec message-handlers & subscription-commands]
{:conn-spec ~conn-spec
:msg-handler-fns ~message-handlers
:body-fn (fn [] ~@subscription-commands)}))
{\"channel1\" (fn [msg] (println \"Channel match: \" msg))
\"user*\" (fn [msg] (println \"Pattern match: \" msg))}
(subscribe \"foobar\") ; Subscribe thread conn to \"foobar\" channel
(psubscribe \"foo*\") ; Subscribe thread conn to \"foo*\" channel pattern
See `with-new-listener` for more info."
;; [{:keys []} & body] ; TODO Future release
[conn-spec handler & subscription-commands]
{:conn-spec ~conn-spec
:handler ~handler
:body-fn (fn [] ~@subscription-commands)}))

(wcar {:cache-buster 1} :as-pipeline
(return (conns/-conn-error (:conn protocol/*context*)))
(subscribe "foo")
(return (conns/-conn-error (:conn protocol/*context*)))

(def my-listener
(with-new-pubsub-listener {:ping-ms 3000 :cache-buster 2}
(fn [msg state]
(let [{:keys [kind channel pattern payload raw]} msg]
(println [:debug/global msg])
(when (= payload "throw")
(throw (Exception. "Whoops!")))))
{"chan1" (fn [msg] (println [:debug/chan1 msg]))
"*" (fn [msg] (println [:debug/* msg]))}

(subscribe "chan1")
(psubscribe "pchan1" "*")))

(wcar {} (publish "chan1" "msg"))
(wcar {} (publish "chan1" "throw"))
(close-listener my-listener))

;;;; Atomic macro
;; The design here's a little on the heavy side; I'd suggest instead reaching
Expand Down

0 comments on commit c4d120d

