Permalink
Browse files

Replacing zip* into zip, now zip is a consumer

  • Loading branch information...
1 parent ba3b766 commit d233c4c46fbbf80075cf2c5b1d9eb69dbfe6464f @roman committed Feb 18, 2012
Showing with 33 additions and 24 deletions.
  1. +22 −12 src/river/seq.clj
  2. +11 −12 test/river/test/seq.clj
View
@@ -150,6 +150,28 @@
:else
(yield (core/first stream) stream))))
+(defn zip
+ "Multiplexes the stream into multiple consumers, each of the consumers
+ will be feed by the stream that this filter receives, this will return
+ a list of consumer results/continuations."
+ [& inner-consumers]
+ (letfn [
+ (consumer [inner-consumers stream]
+ (cond
+ (eof? stream)
+ (yield (map (comp :result produce-eof) inner-consumers)
+ stream)
+
+ (empty? stream)
+ (continue #(consumer inner-consumers %))
+
+ :else
+ (continue
+ #(consumer (for [c inner-consumers]
+ (ensure-done c stream))
+ %))))]
+ #(consumer inner-consumers %)))
+
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;
;; Producers
@@ -281,18 +303,6 @@
(fn to-outer-consumer [inner-consumer]
((mapcat* (comp #(core/filter pred %) vector)) inner-consumer)))
-(defn zip*
- "Multiplexes the stream into multiple consumers, each of the consumers
- will be feed by the stream that this filter receives, this will return
- a list of consumer results/continuations."
- [& inner-consumers]
- (fn outer-consumer [stream]
- (cond
- (eof? stream)
- (for [c inner-consumers] (produce-eof c))
- :else
- (apply zip* (for [c inner-consumers] (ensure-done c stream))))))
-
(defn drop-while*
"Works similarly to the drop-while consumer, it will drop elements from
the stream until pred holds false, at that point the given inner-consumer
View
@@ -75,7 +75,7 @@
(deftest drop-while-test
(let [result (run (rs/produce-seq 7 (range 1 20))
- (rs/drop-while #(<= % 10)))]
+ (rs/drop-while #(<= % 10)))]
(is (nil? (:result result)))
(is (= (range 11 15) (:remainder result)))))
@@ -97,6 +97,15 @@
(is (= 1 (:result result)))
(is (= (range 1 8) (:remainder result)))))
+(deftest zip-test
+ (let [new-consumer (*c rs/consume
+ (rs/mapcat* #(vector % %)))
+ result (run (rs/produce-seq 7 (range 1 4))
+ (rs/zip new-consumer
+ rs/consume))]
+ (is (= [[1 1 2 2 3 3] [1 2 3]] (:result result)))
+ (is (= eof (:remainder result)))))
+
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;
;; Filters
@@ -128,16 +137,6 @@
(is (= [0 2 4 6 8] (:result result)))
(is (= [9 10] (:remainder result)))))
-
-(deftest zip*-test
- (let [result (run (rs/produce-seq 7 (range 1 4))
- (rs/zip*)
- [(rs/mapcat* #(vector % %) rs/consume)
- rs/consume])]
- (is (= [[1 1 2 2 3 3] [1 2 3]] (map :result result)))
- (is (= [eof eof] (map :remainder result)))))
-
-
(deftest drop-while*-test
(let [new-producer (p* (rs/produce-seq 6 (range 1 20))
(rs/drop-while* not-fizzbuzz))
@@ -148,7 +147,7 @@
(deftest isolate*-test
- (let [new-consumer (*c rs/consume
+ (let [new-consumer (*c rs/consume
(rs/isolate* 5))
result (run (rs/produce-seq 7 (range 1 10000))
new-consumer)]

0 comments on commit d233c4c

Please sign in to comment.