Skip to content

Commit

Permalink
Adding the concat-producer function
Browse files Browse the repository at this point in the history
  • Loading branch information
roman committed Feb 20, 2012
1 parent ad5690e commit 6ca9e52
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 5 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG
@@ -1,4 +1,4 @@
2012-02-18 Roman Gonzalez 2012-02-19 Roman Gonzalez
* release v0.1.0 * release v0.1.0
* remove the `run>` and `gen-producer>` macros, `gen-producer` function * remove the `run>` and `gen-producer>` macros, `gen-producer` function
* complete change on filter implemenation now they are binded to an * complete change on filter implemenation now they are binded to an
Expand All @@ -9,6 +9,7 @@
compose them, the final function gets called with a `produce-eof` compose them, the final function gets called with a `produce-eof`
* add `p*` to bind filters into producers * add `p*` to bind filters into producers
* add `*c` to bind filters into consumers * add `*c` to bind filters into consumers
* add the `concat-producer` function
* major refactoring of the tests to support this features * major refactoring of the tests to support this features
* update of the README * update of the README
* update of the examples code * update of the examples code
Expand Down
53 changes: 49 additions & 4 deletions src/river/core.clj
Expand Up @@ -139,7 +139,7 @@


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; ;;
;; Utility macros and functions to run consumers ;; function to run consumers
;; ;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;


Expand All @@ -148,18 +148,59 @@
[& more] [& more]
(produce-eof (reduce #(%2 %1) (reverse more)))) (produce-eof (reduce #(%2 %1) (reverse more))))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;
;; Composing consumers together
;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defmacro do-consumer [steps result] (defmacro do-consumer [steps result]
"Binds the river-m monadic implementation to the domonad macro, "Binds the river-m monadic implementation to the domonad macro,
check clojure.algo.monads/domonad for further info." check clojure.algo.monads/domonad for further info.
Example:
> (def new-consumer
> (do-consumer [
> _ (river.seq/drop-while #(not= 0))
> n (river.seq/first)
> ]
> result))
>
> (run (river.seq/produce-seq [20 3 4 0 5 6])
> new-consumer)
> ; #river.core.ConsumerDone { :result 5 :remainder (6) }"
`(monad/domonad river-m ~steps ~result)) `(monad/domonad river-m ~steps ~result))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;
;; Concatanating producers together
;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defn concat-producer [& producers]
"Concatenates two ore more producers, creating new producer
that's going stream both producers.
Example:
> (def new-producer
> (concat-producer (river.seq/produce-seq (range 1 10))
> (river.seq/produce-seq (range 11 20))))
> (run new-producer river.seq/consume)
> ; river.core.ConsumerDone { :result (range 1 20) :remainder eof }"
(fn new-producer [consumer]
(reduce #(%2 %1) consumer producers)))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; ;;
;; Filter functions ;; Filter functions
;; ;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;


(defn concat-stream [s1 s2] (defn concat-stream [s1 s2]
"Concatenates two streams together; whenever a stream gets concatenated
with `river.core/eof`, the latter is returned."
(cond (cond
(or (= s1 eof) (or (= s1 eof)
(= s2 eof)) eof (= s2 eof)) eof
Expand Down Expand Up @@ -215,9 +256,13 @@
inner-consumer)))) inner-consumer))))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;
;; Binding filters to producers & consumers
;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;


(defn *c (defn *c
"Binds a filter to a consumer." "Binds one or more filters to a consumer."
([a-filter consumer] ([a-filter consumer]
(letfn [ (letfn [
(check [step] (check [step]
Expand All @@ -238,7 +283,7 @@
(reduce #(*c %2 %1) (*c a-filter consumer) more)))) (reduce #(*c %2 %1) (*c a-filter consumer) more))))


(defn p* (defn p*
"Binds a filter to a producer." "Binds one or more filters to a producer."
([producer a-filter] ([producer a-filter]
(fn new-producer [consumer] (fn new-producer [consumer]
(let [new-consumer (produce-eof (producer (a-filter consumer)))] (let [new-consumer (produce-eof (producer (a-filter consumer)))]
Expand Down
7 changes: 7 additions & 0 deletions test/river/test/core.clj
Expand Up @@ -20,3 +20,10 @@
(is (= [1 3 5 7 9 11 13 15 17 19] (:result result))) (is (= [1 3 5 7 9 11 13 15 17 19] (:result result)))
(is eof (:remainder result)))) (is eof (:remainder result))))


(deftest concat-producers-test
(let [producer (concat-producer (rs/produce-seq (range 1 20))
(rs/produce-seq (range 20 37)))
result (run producer
rs/consume)]
(is (= (range 1 37) (:result result)))))

0 comments on commit 6ca9e52

Please sign in to comment.