Skip to content

Commit

Permalink
Updated examples. Fixed bug in stateful stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
markostanimirovic committed Jul 7, 2019
1 parent a3485c2 commit df1d3b2
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 20 deletions.
3 changes: 1 addition & 2 deletions examples/hello-world/src/hello_world/stateful_stream.clj
@@ -1,9 +1,8 @@
(ns hello-world.stateful-stream
(:require [re-streamer.stateful-stream :as stateful-stream]))

;; stateful-stream

;; create stateful stream with initial value
;; note: if you don't pass it, initial value will be nil
(def foo (stateful-stream/create 10))

;; subscribe to the stateful stream
Expand Down
2 changes: 0 additions & 2 deletions examples/hello-world/src/hello_world/stream.clj
@@ -1,8 +1,6 @@
(ns hello-world.stream
(:require [re-streamer.stream :as stream]))

;; stream

;; create stream
(def bar (stream/create))

Expand Down
2 changes: 1 addition & 1 deletion project.clj
@@ -1,4 +1,4 @@
(defproject org.clojars.stanimirovic/re-streamer "0.2.1"
(defproject org.clojars.stanimirovic/re-streamer "0.2.2"
:description "Simple Clojure library for reactive programming"
:url "https://github.com/stanimirovic/re-streamer"
:license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
Expand Down
36 changes: 21 additions & 15 deletions src/re_streamer/stateful_stream.cljc
@@ -1,6 +1,6 @@
(ns re-streamer.stateful-stream
#?(:cljs (:require [reagent.core :as reagent]))
(:refer-clojure :rename {map c-map distinct c-distinct}))
(:refer-clojure :rename {map c-map distinct c-distinct filter c-filter}))

(defn create
([] (create nil))
Expand Down Expand Up @@ -28,31 +28,37 @@

;; operators

(defn map [stream f]
(defn map [stream f watcher-key]
(let [state #?(:cljs (reagent/atom (f @(:state stream)))
:default (atom (f @(:state stream))))
subs (atom #{})]

(add-watch (:state stream) :watch #(reset! state (f %4)))
(add-watch (:state stream) watcher-key #(reset! state (f %4)))
(add-watch state :watch #(doseq [sub @subs] (sub %4)))

{:subscribe! (fn [sub]
(swap! subs conj sub)
(sub @state))
:state state}))
{:subscribe! (fn [sub]
(swap! subs conj sub)
(sub @state))
:unsubscribe! (fn [sub]
(swap! subs disj sub)
nil)
:state state}))

(defn pluck [stream keys]
(map stream #(select-keys % keys)))
(defn pluck [stream keys watcher-key]
(map stream #(select-keys % keys) watcher-key))

(defn distinct [stream f]
(defn distinct [stream f watcher-key]
(let [state #?(:cljs (reagent/atom @(:state stream))
:default (atom @(:state stream)))
subs (atom #{})]

(add-watch (:state stream) :watch #(if (not (f @state %4)) (reset! state %4)))
(add-watch (:state stream) watcher-key #(if (not (f @state %4)) (reset! state %4)))
(add-watch state :watch #(doseq [sub @subs] (sub %4)))

{:subscribe! (fn [sub]
(swap! subs conj sub)
(sub @state))
:state state}))
{:subscribe! (fn [sub]
(swap! subs conj sub)
(sub @state))
:unsubscribe! (fn [sub]
(swap! subs disj sub)
nil)
:state state}))

0 comments on commit df1d3b2

Please sign in to comment.