/
async_clj.clj
72 lines (60 loc) · 1.54 KB
/
async_clj.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
(ns com.wsscode.common.async-clj
"DEPRECATED: please use com.wsscode.async.async-clj instead"
(:require
[clojure.core.async :as async]
[clojure.core.async.impl.protocols :as async.prot]))
(defmacro if-cljs
[then else]
(if (:ns &env) then else))
(defn chan? [c]
(satisfies? async.prot/ReadPort c))
(defmacro go-catch [& body]
`(async/go
(try
~@body
(catch Throwable e# e#))))
(defn error? [err]
(instance? Throwable err))
(defn throw-err [x]
(if (error? x)
(throw x)
x))
(defmacro <? [ch]
`(throw-err (async/<! ~ch)))
(defmacro <?maybe [x]
`(let [res# ~x]
(if (chan? res#) (<? res#) res#)))
(defmacro <!maybe [x]
`(let [res# ~x]
(if (chan? res#) (async/<! res#) res#)))
(defmacro <!!maybe [x]
`(let [res# ~x]
(if (chan? res#) (async/<!! res#) res#)))
(defmacro let-chan
"Handles a possible channel on value."
[[name value] & body]
`(let [res# ~value]
(if (chan? res#)
(go-catch
(let [~name (<? res#)]
~@body))
(let [~name res#]
~@body))))
(defmacro let-chan*
"Like let-chan, but async errors will be returned instead of propagated"
[[name value] & body]
`(let [res# ~value]
(if (chan? res#)
(go-catch
(let [~name (async/<! res#)]
~@body))
(let [~name res#]
~@body))))
(defmacro go-promise [& body]
`(let [ch# (async/promise-chan)]
(async/go
(let [res# (try
~@body
(catch Throwable e# e#))]
(async/put! ch# res#)))
ch#))