diff --git a/examples/hello-world/src/hello_world/stateful_stream.clj b/examples/hello-world/src/hello_world/stateful_stream.clj index dea329a..ed5b7db 100644 --- a/examples/hello-world/src/hello_world/stateful_stream.clj +++ b/examples/hello-world/src/hello_world/stateful_stream.clj @@ -1,9 +1,8 @@ (ns hello-world.stateful-stream (:require [re-streamer.stateful-stream :as stateful-stream])) -;; stateful-stream - ;; create stateful stream with initial value +;; note: if you don't pass it, initial value will be nil (def foo (stateful-stream/create 10)) ;; subscribe to the stateful stream diff --git a/examples/hello-world/src/hello_world/stream.clj b/examples/hello-world/src/hello_world/stream.clj index e08ced8..dae1718 100644 --- a/examples/hello-world/src/hello_world/stream.clj +++ b/examples/hello-world/src/hello_world/stream.clj @@ -1,8 +1,6 @@ (ns hello-world.stream (:require [re-streamer.stream :as stream])) -;; stream - ;; create stream (def bar (stream/create)) diff --git a/project.clj b/project.clj index 99322db..e56a9e7 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject org.clojars.stanimirovic/re-streamer "0.2.1" +(defproject org.clojars.stanimirovic/re-streamer "0.2.2" :description "Simple Clojure library for reactive programming" :url "https://github.com/stanimirovic/re-streamer" :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0" diff --git a/src/re_streamer/stateful_stream.cljc b/src/re_streamer/stateful_stream.cljc index ad160d6..6dc3813 100644 --- a/src/re_streamer/stateful_stream.cljc +++ b/src/re_streamer/stateful_stream.cljc @@ -1,6 +1,6 @@ (ns re-streamer.stateful-stream #?(:cljs (:require [reagent.core :as reagent])) - (:refer-clojure :rename {map c-map distinct c-distinct})) + (:refer-clojure :rename {map c-map distinct c-distinct filter c-filter})) (defn create ([] (create nil)) @@ -28,31 +28,37 @@ ;; operators -(defn map [stream f] +(defn map [stream f watcher-key] (let [state #?(:cljs (reagent/atom (f @(:state stream))) :default (atom (f @(:state stream)))) subs (atom #{})] - (add-watch (:state stream) :watch #(reset! state (f %4))) + (add-watch (:state stream) watcher-key #(reset! state (f %4))) (add-watch state :watch #(doseq [sub @subs] (sub %4))) - {:subscribe! (fn [sub] - (swap! subs conj sub) - (sub @state)) - :state state})) + {:subscribe! (fn [sub] + (swap! subs conj sub) + (sub @state)) + :unsubscribe! (fn [sub] + (swap! subs disj sub) + nil) + :state state})) -(defn pluck [stream keys] - (map stream #(select-keys % keys))) +(defn pluck [stream keys watcher-key] + (map stream #(select-keys % keys) watcher-key)) -(defn distinct [stream f] +(defn distinct [stream f watcher-key] (let [state #?(:cljs (reagent/atom @(:state stream)) :default (atom @(:state stream))) subs (atom #{})] - (add-watch (:state stream) :watch #(if (not (f @state %4)) (reset! state %4))) + (add-watch (:state stream) watcher-key #(if (not (f @state %4)) (reset! state %4))) (add-watch state :watch #(doseq [sub @subs] (sub %4))) - {:subscribe! (fn [sub] - (swap! subs conj sub) - (sub @state)) - :state state})) + {:subscribe! (fn [sub] + (swap! subs conj sub) + (sub @state)) + :unsubscribe! (fn [sub] + (swap! subs disj sub) + nil) + :state state}))