/
stateful_stream.cljc
125 lines (109 loc) · 3.91 KB
/
stateful_stream.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
(ns re-streamer.stateful-stream
#?(:cljs (:require [reagent.core :as reagent]))
(:refer-clojure :rename {map c-map distinct c-distinct filter c-filter}))
(defn create
([] (create nil))
([val]
(let [subs (atom #{})
state #?(:cljs (reagent/atom val)
:default (atom val))]
(add-watch state :watch #(doseq [sub @subs] (sub %4)))
{:subscribe! (fn [sub]
(swap! subs conj sub)
(sub @state)
sub)
:unsubscribe! (fn [sub]
(swap! subs disj sub)
nil)
:emit! (fn [val]
(reset! state val)
nil)
:flush! (fn []
(reset! subs #{})
nil)
:destroy! (fn []
(remove-watch state :watch)
nil)
:state state})))
;; watcher keys generator
(defonce ^:private watcher-key
(let [counter (atom 0)]
#(swap! counter inc)))
;; operators
(defn map
([stream f]
(map stream f (watcher-key)))
([stream f watcher-key]
(let [state #?(:cljs (reagent/atom (f @(:state stream)))
:default (atom (f @(:state stream))))
subs (atom #{})]
(add-watch (:state stream) watcher-key #(reset! state (f %4)))
(add-watch state :watch #(doseq [sub @subs] (sub %4)))
{:subscribe! (fn [sub]
(swap! subs conj sub)
(sub @state)
sub)
:unsubscribe! (fn [sub]
(swap! subs disj sub)
nil)
:flush! (fn []
(reset! subs #{})
nil)
:destroy! (fn []
(remove-watch (:state stream) watcher-key)
(remove-watch state :watch)
nil)
:state state})))
(defn pluck
([stream keys]
(pluck stream keys (watcher-key)))
([stream keys watcher-key]
(map stream #(select-keys % keys) watcher-key)))
(defn distinct
([stream f]
(distinct stream f (watcher-key)))
([stream f watcher-key]
(let [state #?(:cljs (reagent/atom @(:state stream))
:default (atom @(:state stream)))
subs (atom #{})]
(add-watch (:state stream) watcher-key #(if (not (f @state %4)) (reset! state %4)))
(add-watch state :watch #(doseq [sub @subs] (sub %4)))
{:subscribe! (fn [sub]
(swap! subs conj sub)
(sub @state)
sub)
:unsubscribe! (fn [sub]
(swap! subs disj sub)
nil)
:flush! (fn []
(reset! subs #{})
nil)
:destroy! (fn []
(remove-watch (:state stream) watcher-key)
(remove-watch state :watch)
nil)
:state state})))
(defn filter
([stream f]
(filter stream f (watcher-key)))
([stream f watcher-key]
(let [state #?(:cljs (reagent/atom (if (f @(:state stream)) @(:state stream) nil))
:default (atom (if (f @(:state stream)) @(:state stream) nil)))
subs (atom #{})]
(add-watch (:state stream) watcher-key #(if (f %4) (reset! state %4)))
(add-watch state :watch #(doseq [sub @subs] (sub %4)))
{:subscribe! (fn [sub]
(swap! subs conj sub)
(if (f @(:state stream)) (sub @state))
sub)
:unsubscribe! (fn [sub]
(swap! subs disj sub)
nil)
:flush! (fn []
(reset! subs #{})
nil)
:destroy! (fn []
(remove-watch (:state stream) watcher-key)
(remove-watch state :watch)
nil)
:state state})))