-
Notifications
You must be signed in to change notification settings - Fork 0
/
async.clj
86 lines (74 loc) · 2.12 KB
/
async.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
(ns serum.async
(:require
[clojure.core.async :as a]
[clojure.tools.logging :as log]))
(defmacro fire!
"execute 'body' on an asynchronous worker thread via a core.async channel.
(fire! and forget)"
[body]
`(a/go
(let [channel# (a/chan)]
(a/>!
channel#
(do
(try
(~@body)
(catch
Throwable
exception#
(do
(log/error "fire! caught exception on worker thread:")
(log/error exception#))))
(a/close! channel#)
true)))))
;; `throw-exceptions`, `<?` and `<??` adapted from:
;; http://martintrojer.github.io/clojure/2014/03/09/working-with-coreasync-exceptions-in-go-blocks
(defn throw-exceptions
[x]
(when (instance? Throwable x)
(throw x))
x)
(defmacro <?
[channel]
`(throw-exceptions (a/<! ~channel)))
(defmacro <??
[channel]
`(throw-exceptions (a/<!! ~channel)))
(defn chan->seq
"recursive function which derives a lazy sequence from a core.async channel, `channel`.
Intended for use on main thread."
[channel]
(lazy-seq
;; the reflection within `<??` shouldn't be too slow for many use cases
(when-let [took (<?? channel)]
(cons took (chan->seq channel)))))
(defn map-async
"an asynchronous implementation of `map`
returns a lazy sequence of the asynchronous evaluation of `f` applied to members of the collection `coll`
will block until all asynchronous evaluation completes.
Intended for use on main thread.
`f` single-argument function
`n` parallelization parameter
`coll` collection applied to the function `f`"
([f coll]
(map-async
f
(.. Runtime getRuntime availableProcessors)
coll))
([f n coll]
(let [input-channel (a/chan n)
output-channel (a/chan n)]
(a/onto-chan input-channel coll)
(a/pipeline-blocking
n
output-channel
(map
(fn [x]
(try
(f x)
(catch Throwable e
(do
(a/close! input-channel)
e)))))
input-channel)
(chan->seq output-channel))))