Skip to content

Commit

Permalink
Merge pull request #102 from dm3/deferred-alt
Browse files Browse the repository at this point in the history
Add deferred/alt
  • Loading branch information
ztellman committed Jan 17, 2017
2 parents 23f3c7f + 92c742b commit 908458b
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
46 changes: 46 additions & 0 deletions src/manifold/deferred.clj
Expand Up @@ -1084,6 +1084,52 @@
(map #(or (->deferred % nil) %))
(apply zip')))

;; same technique as clojure.core.async/random-array
(defn- random-array [n]
(let [a (int-array n)]
(clojure.core/loop [i 1]
(if (= i n)
a
(let [j (rand-int (inc i))]
(aset a i (aget a j))
(aset a j i)
(recur (inc i)))))))

(defn alt'
"Like `alt`, but only unwraps Manifold deferreds."
[& vals]
(let [d (deferred)
cnt (count vals)
^ints idxs (random-array cnt)]
(clojure.core/loop [i 0]
(when (< i cnt)
(let [i' (aget idxs i)
x (nth vals i')]
(if (deferred? x)
(success-error-unrealized x
val (success! d val)
err (error! d err)
(do (on-realized (chain' x)
#(success! d %)
#(error! d %))
(recur (inc i))))
(success! d x)))))
d))

(defn alt
"Takes a list of values, some of which may be deferrable, and returns a
deferred that will yield the value which was realized first.
@(alt 1 2) => 1
@(alt (future (Thread/sleep 1) 1)
(future (Thread/sleep 1) 2)) => 1 or 2 depending on the thread scheduling
Values appearing earlier in the input are preferred."
[& vals]
(->> vals
(map #(or (->deferred % nil) %))
(apply alt')))

(defn timeout!
"Takes a deferred, and sets a timeout on it, such that it will be realized as `timeout-value`
(or a TimeoutException if none is specified) if it is not realized in `interval` ms. Returns
Expand Down
20 changes: 20 additions & 0 deletions test/manifold/deferred_test.clj
Expand Up @@ -243,6 +243,26 @@
(d/error! d (Exception.))
(is (= ::delivered (deref target-d 0 ::not-delivered)))))

(deftest test-alt
(is (#{1 2 3} @(d/alt 1 2 3)))
(is (= 2 @(d/alt (d/future (Thread/sleep 10) 1) 2)))

(is (= 2 @(d/alt (d/future (Thread/sleep 10) (throw (Exception. "boom"))) 2)))

(is (thrown-with-msg? Exception #"boom"
@(d/alt (d/future (throw (Exception. "boom"))) (d/future (Thread/sleep 10)))))

(testing "uniformly distributed"
(let [results (atom {})
;; within 10%
n 1e4, r 10, eps (* n 0.1)
f #(/ (% n eps) r)]
(dotimes [_ n]
@(d/chain (apply d/alt (range r))
#(swap! results update % (fnil inc 0))))
(doseq [[i times] @results]
(is (<= (f -) times (f +)))))))

;;;

(deftest ^:benchmark benchmark-chain
Expand Down

0 comments on commit 908458b

Please sign in to comment.