diff --git a/CHANGELOG b/CHANGELOG index f72006f..8a8f134 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,4 +1,4 @@ -2012-02-18 Roman Gonzalez +2012-02-19 Roman Gonzalez * release v0.1.0 * remove the `run>` and `gen-producer>` macros, `gen-producer` function * complete change on filter implemenation now they are binded to an @@ -9,6 +9,7 @@ compose them, the final function gets called with a `produce-eof` * add `p*` to bind filters into producers * add `*c` to bind filters into consumers + * add the `concat-producer` function * major refactoring of the tests to support this features * update of the README * update of the examples code diff --git a/src/river/core.clj b/src/river/core.clj index 684e64d..d634ecd 100644 --- a/src/river/core.clj +++ b/src/river/core.clj @@ -139,7 +139,7 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; -;; Utility macros and functions to run consumers +;; function to run consumers ;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -148,11 +148,50 @@ [& more] (produce-eof (reduce #(%2 %1) (reverse more)))) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; +;; Composing consumers together +;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + (defmacro do-consumer [steps result] "Binds the river-m monadic implementation to the domonad macro, - check clojure.algo.monads/domonad for further info." + check clojure.algo.monads/domonad for further info. + + Example: + + > (def new-consumer + > (do-consumer [ + > _ (river.seq/drop-while #(not= 0)) + > n (river.seq/first) + > ] + > result)) + > + > (run (river.seq/produce-seq [20 3 4 0 5 6]) + > new-consumer) + > ; #river.core.ConsumerDone { :result 5 :remainder (6) }" `(monad/domonad river-m ~steps ~result)) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; +;; Concatanating producers together +;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defn concat-producer [& producers] + "Concatenates two ore more producers, creating new producer + that's going stream both producers. + + Example: + + > (def new-producer + > (concat-producer (river.seq/produce-seq (range 1 10)) + > (river.seq/produce-seq (range 11 20)))) + > (run new-producer river.seq/consume) + > ; river.core.ConsumerDone { :result (range 1 20) :remainder eof }" + (fn new-producer [consumer] + (reduce #(%2 %1) consumer producers))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; ;; Filter functions @@ -160,6 +199,8 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn concat-stream [s1 s2] + "Concatenates two streams together; whenever a stream gets concatenated + with `river.core/eof`, the latter is returned." (cond (or (= s1 eof) (= s2 eof)) eof @@ -215,9 +256,13 @@ inner-consumer)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; +;; Binding filters to producers & consumers +;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn *c - "Binds a filter to a consumer." + "Binds one or more filters to a consumer." ([a-filter consumer] (letfn [ (check [step] @@ -238,7 +283,7 @@ (reduce #(*c %2 %1) (*c a-filter consumer) more)))) (defn p* - "Binds a filter to a producer." + "Binds one or more filters to a producer." ([producer a-filter] (fn new-producer [consumer] (let [new-consumer (produce-eof (producer (a-filter consumer)))] diff --git a/test/river/test/core.clj b/test/river/test/core.clj index b607a9d..ab62f9a 100644 --- a/test/river/test/core.clj +++ b/test/river/test/core.clj @@ -20,3 +20,10 @@ (is (= [1 3 5 7 9 11 13 15 17 19] (:result result))) (is eof (:remainder result)))) +(deftest concat-producers-test + (let [producer (concat-producer (rs/produce-seq (range 1 20)) + (rs/produce-seq (range 20 37))) + result (run producer + rs/consume)] + (is (= (range 1 37) (:result result))))) +