Skip to content

Commit

Permalink
Adding multiple arity to p* and *c
Browse files Browse the repository at this point in the history
  • Loading branch information
roman committed Feb 18, 2012
1 parent d233c4c commit 85c9646
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 41 deletions.
58 changes: 33 additions & 25 deletions src/river/core.clj
Expand Up @@ -186,7 +186,7 @@
; ^ this function will feed all the stream possible
; to the filter consumer (consumer*), once the whole
; stream is empty, we return whatever the consumer* was
; able to parse from it, and the current state of
; able to parse from it, and the current state of
; consumer*
(let [new-stream (concat (:remainder consumer*) stream)]
(cond
Expand All @@ -202,7 +202,7 @@
(cond
(eof? stream)
(let [final-result (produce-eof consumer*)]
(yield (inner-consumer [(:result final-result)])
(yield (inner-consumer [(:result final-result)])
stream))

(empty? stream)
Expand All @@ -218,29 +218,37 @@
inner-consumer))))

(defn *c
"..."
[consumer some-filter]
(letfn [
(check [step]
(cond
(continue? step) (recur (produce-eof step))
(yield? step) step
:else
(throw (Exception. "Something terrible happened!"))))]
(do-consumer [
:let [outer-consumer (some-filter consumer)]
inner-consumer outer-consumer
result (check inner-consumer)]
result)))
"Binds a filter to a consumer."
([a-filter consumer]
(letfn [
(check [step]
(cond
(continue? step) (recur (produce-eof step))
(yield? step) step
:else
(throw (Exception. "Something terrible happened!"))))]
(do-consumer [
:let [outer-consumer (a-filter consumer)]
inner-consumer outer-consumer
result (check inner-consumer)]
result)))

([[ _ _ & _ :as filters]]
(let [[consumer a-filter & more] (reverse filters)]
(reduce #(*c %2 %1) (*c a-filter consumer) more))))

(defn p*
"..."
[producer some-filter]
(fn new-producer [consumer]
(let [new-consumer (produce-eof (producer (some-filter consumer)))]
(cond
(yield? new-consumer)
(:result new-consumer)
:else
(throw (Exception. "attach-filter: missbehaving consumer"))))))
"Binds a filter to a producer."
([producer a-filter]
(fn new-producer [consumer]
(let [new-consumer (produce-eof (producer (a-filter consumer)))]
(cond
(yield? new-consumer)
(:result new-consumer)
:else
(throw (Exception. "attach-filter: missbehaving consumer"))))))

([producer a-filter & more]
(reduce p* (p* producer a-filter) more)))


32 changes: 16 additions & 16 deletions test/river/test/seq.clj
Expand Up @@ -98,8 +98,8 @@
(is (= (range 1 8) (:remainder result)))))

(deftest zip-test
(let [new-consumer (*c rs/consume
(rs/mapcat* #(vector % %)))
(let [new-consumer (*c (rs/mapcat* #(vector % %))
rs/consume)
result (run (rs/produce-seq 7 (range 1 4))
(rs/zip new-consumer
rs/consume))]
Expand All @@ -113,8 +113,8 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(deftest mapcat*-test
(let [new-consumer (*c rs/consume
(rs/mapcat* #(vector % %)))
(let [new-consumer (*c (rs/mapcat* #(vector % %))
rs/consume)
result (run (rs/produce-seq 7 (range 1 4))
new-consumer)]
(is (= [1 1 2 2 3 3] (:result result)))
Expand All @@ -130,8 +130,8 @@


(deftest filter*-test
(let [new-consumer (*c (rs/take 5)
(rs/filter* #(= 0 (mod % 2))))
(let [new-consumer (*c (rs/filter* #(= 0 (mod % 2)))
(rs/take 5))
result (run (rs/produce-seq (range 0 11))
new-consumer)]
(is (= [0 2 4 6 8] (:result result)))
Expand All @@ -147,8 +147,8 @@


(deftest isolate*-test
(let [new-consumer (*c rs/consume
(rs/isolate* 5))
(let [new-consumer (*c (rs/isolate* 5)
rs/consume )
result (run (rs/produce-seq 7 (range 1 10000))
new-consumer)]
(is (= (range 1 6) (:result result)))
Expand All @@ -168,8 +168,8 @@
(is (thrown-with-msg? Exception #"require*"
(run new-producer
rs/consume))))
(let [new-consumer (*c rs/consume
(rs/require* 8))]
(let [new-consumer (*c (rs/require* 8)
rs/consume)]
(is (thrown-with-msg? Exception #"require*"
(run (rs/produce-seq 2 (range 1 8))
new-consumer)))))
Expand All @@ -181,8 +181,8 @@
rs/consume)]
(is (yield? result))
(is (= [1 2 3 4 5 6 7] (:result result))))
(let [new-consumer (*c rs/consume
(rs/require* 1))
(let [new-consumer (*c (rs/require* 1)
rs/consume)
result (run (rs/produce-seq (range 1 8))
new-consumer)]
(is (yield? result))
Expand All @@ -195,8 +195,8 @@
rs/consume)]
(is (= (range 1 15) (:result result)))
(is (range 15 20) (:remainder result)))
(let [new-consumer (*c rs/consume
(rs/stream-while* not-fizzbuzz))
(let [new-consumer (*c (rs/stream-while* not-fizzbuzz)
rs/consume)
result (run (rs/produce-seq 10 (range 1 20))
new-consumer)]
(is (= (range 1 15) (:result result)))
Expand All @@ -210,8 +210,8 @@
rs/consume)]
(is (= [[1 2 3] [4 5 6] [7 8 9] [10 11]] (:result result)))
(is eof (:remainder result)))
(let [new-consumer (*c rs/consume
(rs/split-when* #(= 0 (mod % 3))))
(let [new-consumer (*c (rs/split-when* #(= 0 (mod % 3)))
rs/consume)
result (run (rs/produce-seq 10 (range 1 12))
new-consumer)]
(is (= [[1 2 3] [4 5 6] [7 8 9] [10 11]] (:result result)))
Expand Down

0 comments on commit 85c9646

Please sign in to comment.