-
Notifications
You must be signed in to change notification settings - Fork 1
/
types.cljc
119 lines (97 loc) · 2.73 KB
/
types.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
(ns promisespromises.stream.types
(:require
#?(:cljs [cljs.core :refer [IEquiv]])
[promesa.core :as pr]
[promisespromises.stream.protocols :as pt]))
(extend-protocol pt/IStreamValue
#?(:clj Object :cljs default)
(-unwrap-value [this] this)
nil
(-unwrap-value [_this] nil))
;; core.async channels don't support nil values,
;; but we would like clj and cljs to be as similar
;; as possible, so we'll wrap nils when we are
;; using core.async
(deftype StreamNil []
pt/IStreamValue
(-unwrap-value [_] nil)
#?@(:clj
[Object
(equals [_a b]
(instance? StreamNil b))]
:cljs
[IEquiv
(-equiv [this other] (and
(= (type this) (type other))
(= (.-x this) (.-x other))))]))
(defn stream-nil
[]
(->StreamNil))
(defn stream-nil?
[v]
(instance? StreamNil v))
(deftype StreamPromise [p]
pt/IStreamValue
(-unwrap-value [_] p))
(defn stream-promise
[p]
(->StreamPromise p))
(defn stream-promise?
[v]
(instance? StreamPromise v))
;; neither core.async nor manifold have error-states on
;; streams/chans - so we'll model errors by putting a
;; wrapped value onto a stream and closing it immediately
;; thereafter. whenever an error value is taken from a
;; stream it will result in an errored promise or a
;; downstream stream also getting a wrapped error-value/closed
(deftype StreamError [err]
pt/IStreamError
(-unwrap-error [_] err)
pt/IStreamValue
(-unwrap-value [_]
;; (warn err "unwrapping StreamError" (ex-data err))
(throw err)))
(defn stream-error?
[v]
(instance? StreamError v))
(defn stream-error
[err]
(if (stream-error? err)
err
(->StreamError err)))
(extend-protocol pt/IStreamChunk
#?(:clj Object
:cljs default)
(-chunk-flatten [this] this)
nil
(-chunk-flatten [_] nil))
(declare ->StreamChunk)
(deftype StreamChunk [values]
pt/IStreamChunk
(-chunk-values [_] values)
(-chunk-flatten [_]
(pr/let [realized-values (pr/all values)]
(->StreamChunk realized-values)))
#?@(:clj [Object
(equals [a b]
(and (instance? StreamChunk b)
(= (.-values a) (.-values b))))]
:cljs [IEquiv
(-equiv [a b]
(and (instance? StreamChunk b)
(= (.-values a) (.-values b))))]))
#?(:clj
(defmethod print-method StreamChunk [x writer]
(.write writer "#Chunk<")
(print-method (pt/-chunk-values x) writer)
(.write writer ">")))
(defn stream-chunk?
[v]
(instance? StreamChunk v))
(defn stream-chunk
[values]
(let [values (vec values)]
(when (<= (count values) 0)
(throw (ex-info "empty chunk not allowed" {})))
(->StreamChunk values)))