-
Notifications
You must be signed in to change notification settings - Fork 1
/
test.cljc
77 lines (68 loc) · 1.85 KB
/
test.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
(ns prpr3.stream.test
(:require
[promesa.core :as pr]
[prpr3.promise :as prpr]
[prpr3.stream.protocols :as pt]
[prpr3.stream.transport :as stream.transport]
))
(defn stream-of
  "builds a fresh stream with stream.transport/stream, hands
   every value in vs to put-all-and-close! (the values go in
   individually, *not* chunked) and returns the stream. the
   result of the put is not awaited"
  [vs]
  (doto (stream.transport/stream)
    (stream.transport/put-all-and-close! vs)))
(defn consume
  "repeatedly take! from stream s (passing ::drained as the
   extra take! arg) and gather the taken values into a vector,
   which is the promised result once ::drained comes back.
   if a take! errors then [::error <error>] is appended as
   the last element and consumption stops"
  [s]
  (pr/loop [acc []]
    (prpr/handle-always
     (stream.transport/take! s ::drained)
     (fn [value err]
       (if (some? err)
         ;; failure: record it and finish without recurring
         (conj acc [::error err])
         (if (= ::drained value)
           ;; nothing more to take - hand back what we gathered
           acc
           (pr/recur (conj acc value))))))))
(defn safe-take!
  "apply transport/take! (the unwrapping take) to stream s and
   any extra args, tagging the outcome instead of failing:
   Promise<[::ok <val>]> when the take succeeds,
   Promise<[::error <err>]> when it errors"
  [s & args]
  (prpr/handle-always
   (apply stream.transport/take! s args)
   (fn [value err]
     (if (nil? err)
       [::ok value]
       [::error err]))))
(defn safe-consume
  "safe-take! from stream s over and over (with ::closed as
   the extra take arg), collecting each tagged outcome into a
   vector. the outcome whose value is ::closed is collected
   too, and ends the loop"
  [s]
  #_{:clj-kondo/ignore [:loop-without-recur]}
  (pr/loop [outcomes []]
    (pr/let [outcome (safe-take! s ::closed)]
      ;; every outcome is kept, including the terminating one
      (let [extended (conj outcomes outcome)]
        (if (not= ::closed (second outcome))
          (pr/recur extended)
          extended)))))
(defn safe-low-take!
  "apply the protocol-level pt/-take! to stream impl s and any
   extra args (no unwrapping of the result), tagging the outcome:
   Promise<[::ok <val>]> when the take succeeds,
   Promise<[::error <err>]> when it errors"
  [s & args]
  (prpr/handle-always
   (apply pt/-take! s args)
   (fn [taken failure]
     (if-not (some? failure)
       [::ok taken]
       [::error failure]))))
(defn safe-low-consume
  "safe-low-take! from stream s over and over (with ::closed
   as the extra take arg), collecting each tagged outcome into
   a vector. the outcome whose value is ::closed is collected
   too, and ends the loop"
  [s]
  #_{:clj-kondo/ignore [:loop-without-recur]}
  (pr/loop [collected []]
    (pr/let [tagged (safe-low-take! s ::closed)]
      ;; the tagged pair is appended whether or not we continue
      (let [collected+ (conj collected tagged)
            closed? (= ::closed (peek tagged))]
        (if closed?
          collected+
          (pr/recur collected+))))))