Skip to content

Commit

Permalink
make error handling in streams more consistent, per @mithrandi
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Mar 31, 2015
1 parent bff9731 commit 4b98862
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 41 deletions.
58 changes: 28 additions & 30 deletions src/manifold/stream.clj
Expand Up @@ -425,24 +425,18 @@
(downstream [_]
(when downstream [downstream]))
IEventSink
(put [_ x _]
(put [this x _]
(try
(let [rsp (f x)]
(if (nil? constant-response)
rsp
constant-response))
(catch Throwable e
(log/error e "error in consume")
(d/error-deferred e))))
(put [_ x _ _ _]
(try
(let [rsp (f x)]
(if (nil? constant-response)
rsp
constant-response))
(catch Throwable e
(log/error e "error in consume")
(d/error-deferred e))))
(log/error e "error in stream handler")
(.close this)
(d/success-deferred false))))
(put [this x default-val _ _]
(.put this x default-val))
(isClosed [_]
(if downstream
(.isClosed downstream)
Expand All @@ -459,15 +453,16 @@

(defn connect-via
"Feeds all messages from `src` into `callback`, with the understanding that they will
eventually be propagated into `dst` in some form."
eventually be propagated into `dst` in some form. The return value of `f` should be
a deferred yielding either `true` or `false`."
([src callback dst]
(connect-via src callback dst nil))
([src callback dst options]
(let [dst (->sink dst)]
(connect
src
(Callback. callback dst nil)
(merge options {:dst' dst})))))
options))))

(defn- connect-via-proxy
([src proxy dst]
Expand Down Expand Up @@ -560,8 +555,12 @@
(let [s' (stream)]
(connect-via s
(fn [msg]
(d/chain msg
#(put! s' %)))
(-> msg
(d/chain #(put! s' %))
(d/catch (fn [e]
(log/error e "deferred realized as error, closing stream")
(close! s')
false))))
s'
{:description {:op "realize-each"}})
(source-only s')))
Expand Down Expand Up @@ -595,19 +594,18 @@

(source-only dst)))))

(let [response (d/success-deferred true)]
(defn filter
"Equivalent to Clojure's `filter`, but for streams instead of sequences."
[pred s]
(let [s' (stream)]
(connect-via s
(fn [msg]
(if (pred msg)
(put! s' msg)
response))
s'
{:description {:op "filter"}})
(source-only s'))))
(defn filter
"Equivalent to Clojure's `filter`, but for streams instead of sequences."
[pred s]
(let [s' (stream)]
(connect-via s
(fn [msg]
(if (pred msg)
(put! s' msg)
(d/success-deferred true)))
s'
{:description {:op "filter"}})
(source-only s')))

(defn reductions
"Equivalent to Clojure's `reductions`, but for streams instead of sequences."
Expand Down Expand Up @@ -803,7 +801,7 @@
@val
val)))
(put [_ x blocking? timeout timeout-val]
;; TODO: this doesn't really time out, because that would
;; TODO: this doesn't realy time out, because that would
;; require consume-side filtering of messages
(buf+ (metric x))
(.put ^IEventSink buf x blocking? timeout timeout-val)
Expand Down
17 changes: 11 additions & 6 deletions src/manifold/stream/default.clj
Expand Up @@ -57,15 +57,22 @@
(when-not permanent?
(utils/with-lock lock
(when-not (s/closed? this)
(add!)

(try
(add!)
(catch Throwable e
(log/error e "error in stream transformer")))

(loop []
(when-let [^Consumer c (.poll consumers)]
(try
(d/success! (.deferred c) (.default-val c))
(catch Throwable e
(log/error e "error in callback")))
(recur)))

(.markClosed this)

(when (s/drained? this)
(.markDrained this))))))

Expand All @@ -85,7 +92,8 @@
(add! this msg))
(catch Throwable e
(.close this)
(d/error-deferred e))))
(log/error e "error in stream transformer")
(d/success-deferred false))))

close?
(reduced? result)
Expand All @@ -109,10 +117,7 @@

(instance? Production result)
(let [^Production p result]
(try
(d/success! (.deferred p) (.message p) (.token p))
(catch Throwable e
(log/error e "error in callback")))
(d/success! (.deferred p) (.message p) (.token p))
(if blocking?
true
(d/success-deferred true executor)))
Expand Down
6 changes: 1 addition & 5 deletions src/manifold/stream/graph.clj
Expand Up @@ -25,7 +25,6 @@
^boolean upstream?
^boolean downstream?
^IEventSink sink
^IEventSink sink'
^String description])

(deftype AsyncPut
Expand All @@ -41,7 +40,7 @@
.iterator
iterator-seq
(map (fn [^Downstream d]
[(.description d) (or (.sink' d) (.sink d))]))))))
[(.description d) (.sink d)]))))))

(defn- async-send
[^Downstream d msg dsts]
Expand Down Expand Up @@ -87,7 +86,6 @@

(defn- handle-async-error [^AsyncPut x err source]
(some-> ^Downstream (.dst x) .sink s/close!)
(some-> ^Downstream (.dst x) .sink' s/close!)
(log/error err "error in message propagation")
(let [^CopyOnWriteArrayList l (.dsts x)]
(.remove l (.dst x))
Expand Down Expand Up @@ -274,7 +272,6 @@
^IEventSink dst
{:keys [upstream?
downstream?
dst'
timeout
description]
:or {timeout -1
Expand All @@ -287,7 +284,6 @@
(boolean (and upstream? (instance? IEventSink src)))
downstream?
dst
dst'
description)
k (.weakHandle ^IEventStream src ref-queue)]
(if-let [dsts (.get graph k)]
Expand Down
31 changes: 31 additions & 0 deletions test/manifold/stream_test.clj
@@ -1,5 +1,6 @@
(ns manifold.stream-test
(:require
[clojure.tools.logging :as log]
[clojure.core.async :as async]
[clojure.test :refer :all]
[manifold.test-utils :refer :all]
Expand Down Expand Up @@ -236,6 +237,36 @@
s/stream->seq)))
#_(is (= 1 @cnt))))

(deftest test-error-handling

(binding [log/*logger-factory* clojure.tools.logging.impl/disabled-logger-factory]

(let [s (s/stream)
s' (s/map #(/ 1 %) s)]
(is (not (s/closed? s)))
(is (not (s/drained? s')))
(is (= false @(s/put-all! s [0 1])))
(is (s/closed? s))
(is (s/drained? s')))

(let [s (s/stream)
s' (s/map #(d/future (/ 1 %)) s)]
(is (not (s/closed? s)))
(is (not (s/drained? s')))
(is (= true @(s/put! s 0)))
(is (not (s/closed? s)))
(is (not (s/drained? s'))))

(let [s (s/stream)
s' (->> s
(s/map #(d/future (/ 1 %)))
s/realize-each)]
(is (not (s/closed? s)))
(is (not (s/drained? s')))
(s/put-all! s (range 10))
(is (nil? @(s/take! s')))
(is (s/drained? s')))))

;;;

(defn blocking-queue-benchmark [^BlockingQueue q]
Expand Down

0 comments on commit 4b98862

Please sign in to comment.