Skip to content

Commit

Permalink
Add dynamic *executor* var
Browse files Browse the repository at this point in the history
Permits flexible global or local setting of the executor to be used
for callbacks.
  • Loading branch information
Stuart Sierra committed Nov 23, 2012
1 parent c0f6a85 commit b571aaa
Showing 1 changed file with 19 additions and 27 deletions.
46 changes: 19 additions & 27 deletions src/cljque/promises.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
(:refer-clojure :exclude (promise deliver))
(:import (java.util.concurrent CountDownLatch Executor TimeUnit)))

(def ^:dynamic *executor*
"java.util.concurrent.Executor to use for callbacks created with the
2-argument form of 'attend'. Applies at the time the callback is
created, not when the promise is delivered. Defaults to the agent
send-off thread pool. If bound to nil, no executor will be used and
the callback will be invoked on the same thread that delivered the
promise."
clojure.lang.Agent/soloExecutor)

(defprotocol IDeliver
(deliver [promise val]
"Delivers the supplied value to the promise, releasing any pending
Expand All @@ -17,8 +26,7 @@
(attend [promise f] [promise f executor]
"Registers callback f on promise and returns the promise. When a
value is delivered to the promise, it will submit #(f promise) to
the executor. With no executor, (f promise) will be called on the
thread that delivered the value."))
the executor, or *executor* if none supplied."))

(comment
;; This doesn't work because "Mismatched return type: execute,
Expand All @@ -37,7 +45,6 @@

;; Do not create directly, use 'promise' function
(deftype Promise [latch
default-executor
^:unsynchronized-mutable v ; value
^:unsynchronized-mutable q ; queue
^:unsynchronized-mutable e] ; error
Expand All @@ -54,8 +61,8 @@
fs))]
(.countDown latch)
(doseq [[f executor] fs]
(if-let [exe (or executor default-executor)]
(.execute exe #(f this))
(if executor
(.execute executor #(f this))
(f this)))))
(fail [this ex]
(when-let [fs (locking this
Expand All @@ -65,15 +72,13 @@
fs))]
(.countDown latch)
(doseq [[f executor] fs]
(if-let [exe (or executor default-executor)]
(.execute exe #(f this))
(if executor
(.execute executor #(f this))
(f this)))))
INotify
(attend
[this f]
(when-not (locking this
(when q (set-q this (conj q [f]))))
(f this)))
(.attend this f *executor*))
(attend
[this f executor]
(when-not (locking this
Expand Down Expand Up @@ -102,21 +107,9 @@
Callback functions can be attached to the promise with 'attend'.
When the promise is delivered or failed, all callback functions will
be invoked, in undetermined order, with the promise as an argument.
Options are key-value pairs from:
:default-executor
The java.util.concurrent.Executor on which to run any callback
which did not specify an executor. Defaults to the send-off
agent thread pool. If nil, callbacks will execute on whatever
thread delivered to the promise."
[& options]
(let [{:keys [default-executor]
:or {default-executor clojure.lang.Agent/soloExecutor}}
options]
(Promise. (CountDownLatch. 1) default-executor nil [] nil)))
be invoked, in undetermined order, with the promise as an argument."
[]
(Promise. (CountDownLatch. 1) nil [] nil))

(defmacro on
"Bindings is a vector of [binding-form promise]. Creates a callback
Expand All @@ -134,8 +127,7 @@
promise."
[bindings & body]
(let [[binding-form base-promise] bindings]
`(let [return# (promise :default-executor
(.-default_executor ~base-promise))]
`(let [return# (promise)]
(attend ~base-promise
(fn [promise#]
(try (let [~binding-form (deref promise#)]
Expand Down

0 comments on commit b571aaa

Please sign in to comment.