/
async_cljs.cljs
82 lines (72 loc) · 2.18 KB
/
async_cljs.cljs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
(ns com.wsscode.async.async-cljs
(:require-macros [com.wsscode.async.async-cljs :refer [go go-promise <?maybe]])
(:require [cljs.core.async :as async]
[cljs.core.async.impl.protocols :as async.prot]
[goog.object :as gobj]))
(defn chan?
  "Returns true when `c` satisfies the core.async `ReadPort` protocol,
  false otherwise."
  [c]
  (satisfies? async.prot/ReadPort c))
(defn promise?
  "Duck-typed promise check: true when the `then` property of `x` is a
  function. Any error raised while reading the property is swallowed and
  reported as false."
  [x]
  (try
    (-> x
        (gobj/get "then")
        fn?)
    (catch :default _
      false)))
(defn promise->chan
  "Bridges the promise `p` into a core.async promise-chan, which is returned
  immediately.

  When `p` fulfills, the map `{:success value}` is put on the channel; when it
  rejects, `{:error reason}` is put instead. Use `consumer-pair` to unwrap the
  map taken from the channel."
  [p]
  (let [out (async/promise-chan)]
    (.then p
           (fn [value] (async/put! out {:success value}))
           (fn [reason] (async/put! out {:error reason})))
    out))
(defn consumer-pair
  "Unwraps a result pair map: returns the value under `:success`, unless the
  map has an `:error` key, in which case the value under `:error` is thrown."
  [resp]
  (if-not (contains? resp :error)
    (:success resp)
    (throw (:error resp))))
(defn error?
  "True when `err` is an instance of the JavaScript `Error` constructor."
  [err]
  (instance? js/Error err))
(defn throw-err
  "Pass-through guard: throws `x` when `error?` reports it as an error object,
  otherwise returns `x` unchanged."
  [x]
  (when (error? x)
    (throw x))
  x)
(defn timeout-chan
  "Returns a channel that will respond with the value taken from `c`, or with
  a \"Timeout\" error if nothing arrives from `c` within `timeout-ms`
  milliseconds.

  `c` is given priority over the timer when both are ready. On timeout an
  `ex-info` carrying `{:timeout-ms timeout-ms}` is thrown inside the
  `go-promise` block (presumably delivered as the channel's value - confirm
  against the `go-promise` macro)."
  [timeout-ms c]
  (go-promise
    (let [deadline        (async/timeout timeout-ms)
          [value winner]  (async/alts! [c deadline] :priority true)]
      (if (not= winner deadline)
        value
        (throw (ex-info "Timeout" {:timeout-ms timeout-ms}))))))
(defn pulling-retry*
  "Repeatedly calls `f` until its result satisfies the `done?` predicate.

  `options` is either a map using the namespaced keys `::done?`, `::timeout`
  (default 2000) and `::retry-ms` (default 10), or any non-map value, which is
  then used as the `::done?` predicate.

  Each attempt takes the value of `(<?maybe (f))`. When `done?` accepts it the
  polling block finishes with that value; otherwise it waits `retry-ms`
  milliseconds and tries again. After `timeout` milliseconds a stop flag is
  set, so the loop ends with nil the next time it checks the flag.

  Returns the channel produced by `timeout-chan` around the polling block,
  using the same `timeout`."
  [options f]
  (let [{::keys [done? timeout retry-ms]
         :or    {timeout  2000
                 retry-ms 10}} (if-not (map? options)
                                 {::done? options}
                                 options)
        *stop?  (atom false)
        ;; The polling block is started before `timeout-chan` is invoked.
        polling (go-promise
                  (loop []
                    (when-not @*stop?
                      (let [attempt (<?maybe (f))]
                        (if-not (done? attempt)
                          (do
                            (async/<! (async/timeout retry-ms))
                            (recur))
                          attempt)))))
        res     (timeout-chan timeout polling)]
    ;; Separate timer that flips the stop flag once `timeout` ms have elapsed,
    ;; so the polling loop does not keep retrying afterwards.
    (go
      (async/<! (async/timeout timeout))
      (reset! *stop? true))
    res))