-
Notifications
You must be signed in to change notification settings - Fork 11
/
async.clj
115 lines (100 loc) · 3.76 KB
/
async.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
(ns net.core.async
"Small extensions and improvements on top of core.async
Some bits shamelessly stolen from @mpenet's jet,
See https://github.com/mpenet/jet for original"
(:require [clojure.core.async :as a]
[clojure.spec.alpha :as s]))
(defn backpressure-handling-fn
[status backpressure! close!]
(fn [result]
(cond
(not result) (when (fn? close!) (close!))
(compare-and-set! status ::sending ::sent) nil
(compare-and-set! status ::paused ::sent) (backpressure! false))))
(defn drain
([chan f]
(loop []
(let [v (a/poll! chan)]
(when (some? v)
(when f
(f v))
(recur)))))
([chan]
(drain chan nil)))
(defn close-draining [chan f]
(a/close! chan)
(drain chan f))
(defn put!
"Takes a `ch`, a `msg`, a single arg function that when passed
`true` enables backpressure and when passed `false` disables it,
and a no-arg function which, when invoked, closes the upstream
source."
([ch msg backpressure! close!]
(let [status (atom ::sending)]
(a/put! ch msg (backpressure-handling-fn status backpressure! close!))
;; it's still sending, means it's parked, so suspend source
(when (compare-and-set! status ::sending ::paused)
(backpressure! true))))
([ch msg backpressure!]
(put! ch msg backpressure! nil)))
(defn validating-fn
"Yield correct predicate based on assertion preference."
[spec always-assert?]
(if always-assert?
(fn [x] (s/assert* spec x))
(fn [x] (s/assert spec x))))
(defn validating-promise-chan
"A promise chan which ensures that values produced
to it match a given spec. Failing to match the spec
will produce `error-value` on the channel.
When `error-value` is a function, call it with no
args to produce the error value, or produce
`error-value` itself.
When `always-assert?` is provided, force asserts,
regardless of the value of `*clojure.core/compile-asserts*`,
otherwise, and by default honor the value.
The 1-arity version produces nil on the chan in case of errors."
([spec error-value always-assert?]
(a/promise-chan (map (validating-fn spec always-assert?))
(fn [_] (if (fn? error-value)
(error-value)
error-value))))
([spec error-value]
(validating-promise-chan spec error-value false))
([spec]
(validating-promise-chan spec nil false)))
(defn validating-chan
"A chan which ensures that values produced
to it match a given spec. Failing to match the spec
will produce `error-value` on the channel.
When `error-value` is a function, call it with no
args to produce the error value, or produce
`error-value` itself.
When `always-assert?` is provided, force asserts,
regardless of the value of `*clojure.core/compile-asserts*`,
otherwise, and by default honor the value.
The 1-arity version produces nil on the chan in case of errors."
([spec buf-or-n error-value always-assert?]
(a/chan buf-or-n (map (validating-fn spec always-assert?))
(fn [_] (if (fn? error-value)
(error-value)
error-value))))
([spec buf-or-n error-value]
(validating-chan spec buf-or-n error-value false))
([spec buf-or-n]
(validating-chan spec buf-or-n nil false)))
(defn timeout-pipe
"A variation on `clojure.core.async/pipe` which will
close if no input is submitted within a given interval."
([max-wait from to close?]
(a/go-loop []
(let [tm (a/timeout max-wait)]
(if (a/alt! from ([v] (when (some? v) (a/>! to v)))
tm ([_] false))
(recur)
(when close?
(a/close! from)
(a/close! to)))))
to)
([tmval from to]
(timeout-pipe tmval from to true)))