Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Reimplementing the `to-filter` function

Also reimplementing the stream-while* filter.
  • Loading branch information...
commit 4bdfa91539263d6afceba42ae54ec713f7ebd148 1 parent 03a6a2d
@roman authored
Showing with 81 additions and 56 deletions.
  1. +54 −33 src/river/core.clj
  2. +18 −21 src/river/seq.clj
  3. +9 −2 test/river/test/seq.clj
View
87 src/river/core.clj
@@ -137,39 +137,6 @@
(println stream)
(continue print-chunks))))
-(defn- gen-filter-fn [filter-consumer0 filter-consumer inner-consumer]
- (cond
- (yield? inner-consumer) inner-consumer
- :else
- (cond
- (yield? filter-consumer)
- (let [filter-result (:result filter-consumer)
- filter-remainder (:remainder filter-consumer)
- next-inner-consumer (inner-consumer [filter-result])]
-
- (if (no-remainder? filter-consumer)
- (recur filter-consumer0
- filter-consumer
- (ensure-done next-inner-consumer filter-remainder))
-
- (recur filter-consumer0
- (filter-consumer0 filter-remainder)
- next-inner-consumer)))
-
- :else
- (fn outer-consumer [stream]
- (gen-filter-fn filter-consumer0
- (filter-consumer stream)
- inner-consumer)))))
-
-(defn to-filter
- "Transforms a consumer into a filter by feeding the outer input elements
- into the provided consumer until it yields an inner input, passes that to
- the inner consumer and then loops."
- [filter-consumer0 inner-consumer]
- (gen-filter-fn filter-consumer0 filter-consumer0 inner-consumer))
-
-
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;
;; Utility macros and functions to run consumers
@@ -256,6 +223,60 @@
;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+(defn concat-stream [s1 s2]
+ (cond
+ (or (= s1 eof)
+ (= s2 eof)) eof
+ :else (concat s1 s2)))
+
+(defn ensure-inner-done
+ ([f consumer] (ensure-inner-done [] f consumer))
+ ([extra f consumer]
+ (fn [stream]
+ (cond
+ (yield? consumer) (yield consumer (concat-stream extra stream))
+ :else (f consumer (concat-stream extra stream))))))
+
+(defn to-filter
+ "Transforms a consumer into a filter by feeding the outer input elements
+ into the provided consumer until it yields an inner input, passes that to
+ the inner consumer and then loops."
+ [consumer0*]
+ (letfn [
+ (loop-consumer* [acc consumer* stream]
+ ; ^ this function will feed all the stream possible
+ ; to the filter consumer (consumer*), once the whole
+ ; stream is empty, we return whatever the consumer* was
+ ; able to parse from it, and the current state of
+ ; consumer*
+ (let [new-stream (concat (:remainder consumer*) stream)]
+ (cond
+ (empty? new-stream) [acc consumer*]
+ (yield? consumer*)
+ (recur (conj acc (:result consumer*))
+ consumer0*
+ (concat (:remainder consumer*) stream))
+ (continue? consumer*)
+ (recur acc (consumer* stream) []))))
+
+ (outer-consumer [consumer* inner-consumer stream]
+ (cond
+ (eof? stream)
+ (let [final-result (produce-eof consumer*)]
+ (yield (inner-consumer [(:result final-result)])
+ stream))
+
+ (empty? stream)
+ (continue #(outer-consumer consumer* inner-consumer %))
+
+ :else
+ (let [[new-stream consumer1*] (loop-consumer* [] consumer* stream)]
+ (ensure-inner-done (partial outer-consumer consumer1*)
+ (inner-consumer new-stream)))))]
+
+ (fn to-outer-consumer [inner-consumer]
+ (ensure-inner-done (partial outer-consumer consumer0*)
+ inner-consumer))))
(defn attach-to-consumer
"..."
View
39 src/river/seq.clj
@@ -227,12 +227,6 @@
;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-(defn ensure-inner-done [f consumer]
- (fn [stream]
- (cond
- (yield? consumer) (yield consumer stream)
- :else (f consumer stream))))
-
(defn mapcat*
"Transform the stream by applying function f to each element in the stream.
f will be a function that receives an item and will return a seq, the
@@ -303,16 +297,18 @@
(letfn [
(outer-consumer [inner-consumer stream]
(cond
- (empty? stream)
- (continue #(outer-consumer inner-consumer %))
- (eof? stream)
- (yield inner-consumer eof)
- :else
- (let [new-stream (core/drop-while pred stream)]
- (if (not (empty? new-stream))
- (yield (inner-consumer new-stream) [])
- (continue (ensure-inner-done outer-consumer
- inner-consumer))))))]
+ (empty? stream)
+ (continue #(outer-consumer inner-consumer %))
+
+ (eof? stream)
+ (yield inner-consumer eof)
+
+ :else
+ (let [new-stream (core/drop-while pred stream)]
+ (if (not (empty? new-stream))
+ (yield (inner-consumer new-stream) [])
+ (continue (ensure-inner-done outer-consumer inner-consumer))))))]
+
(fn to-outer-consumer [inner-consumer]
(ensure-inner-done outer-consumer inner-consumer))))
@@ -355,15 +351,16 @@
(letfn [
(outer-consumer [total-count inner-consumer stream]
(cond
+
(eof? stream)
(if (> total-count 0)
(throw (Exception. "require*: minimum count wasn't satisifed"))
(yield inner-consumer eof))
(empty? stream)
- (continue
- (ensure-inner-done (partial outer-consumer total-count)
- inner-consumer))
+ (continue
+ (ensure-inner-done (partial outer-consumer total-count)
+ inner-consumer))
:else
(let [total-count1 (- total-count (count stream))]
(if (<= total-count 0)
@@ -408,8 +405,8 @@
first-chunks
(concat first-chunks last-chunk))))
-(defn split-when* [f inner-consumer]
+(defn split-when* [f]
"Splits on elements satisfiying the given f function, the inner-consumer
will receive chunks of collections from the stream."
- (to-filter (split-when-consumer f) inner-consumer))
+ (to-filter (split-when-consumer f)))
View
11 test/river/test/seq.clj
@@ -205,9 +205,16 @@
(deftest split-when*-test
- (let [result (run> (rs/produce-seq 10 (range 1 12))
- (rs/split-when* #(= 0 (mod % 3)))
+ (let [new-producer (attach-to-producer #(rs/produce-seq 10 (range 1 12) %)
+ (rs/split-when* #(= 0 (mod % 3))))
+ result (run> new-producer
rs/consume)]
(is (= [[1 2 3] [4 5 6] [7 8 9] [10 11]] (:result result)))
+ (is eof (:remainder result)))
+ (let [new-consumer (attach-to-consumer rs/consume
+ (rs/split-when* #(= 0 (mod % 3))))
+ result (run> (rs/produce-seq 10 (range 1 12))
+ new-consumer)]
+ (is (= [[1 2 3] [4 5 6] [7 8 9] [10 11]] (:result result)))
(is eof (:remainder result))))
Please sign in to comment.
Something went wrong with that request. Please try again.