-
Notifications
You must be signed in to change notification settings - Fork 24
/
Zip.cljs
67 lines (63 loc) · 2.25 KB
/
Zip.cljs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
(ns missionary.impl.Zip)
(declare cancel transfer)
(deftype Process
[combinator notifier flusher
iterators results
^number pending]
IFn
(-invoke [z] (cancel z))
IDeref
(-deref [z] (transfer z)))
(defn cancel [^Process z]
(let [its (.-iterators z)]
(dotimes [i (alength its)]
(when-some [it (aget its i)] (it)))))
(defn transfer [^Process z]
(let [its (.-iterators z)
res (.-results z)]
(try (set! (.-pending z) (dec (.-pending z)))
(dotimes [i (alength its)]
(set! (.-pending z) (inc (.-pending z)))
(aset res i @(aget its i)))
(.apply (.-combinator z) nil res)
(catch :default e
(set! (.-notifier z) (.-flusher z))
(throw e))
(finally
(set! (.-pending z) (inc (.-pending z)))
(when (zero? (.-pending z)) ((.-notifier z)))
(when (identical? (.-notifier z) (.-flusher z)) (cancel z))))))
(defn run [f fs n t]
(let [c (count fs)
i (iter fs)
z (->Process f n nil (object-array c) (object-array c) 0)]
(set! (.-flusher z)
#(let [its (.-iterators z)
cnt (alength its)]
(loop []
(let [flushed (loop [i 0
f 0]
(if (< i cnt)
(recur
(inc i)
(if-some [it (aget its i)]
(do (try @it (catch :default _))
(inc f)) f)) f))]
(if (zero? flushed)
(t) (when (zero? (set! (.-pending z) (+ (.-pending z) flushed)))
(recur)))))))
(loop [index 0]
(aset (.-iterators z) index
((.next i)
#(let [p (dec (.-pending z))]
(set! (.-pending z) p)
(when (zero? p) ((.-notifier z))))
#(do (aset (.-iterators z) index nil)
(set! (.-notifier z) (.-flusher z))
(let [p (set! (.-pending z) (dec (.-pending z)))]
(when-not (neg? p)
(cancel z)
(when (zero? p) ((.-notifier z))))))))
(when (.hasNext i) (recur (inc index))))
(when (zero? (set! (.-pending z) (+ (.-pending z) c)))
((.-notifier z))) z))