Permalink
Browse files

Fixed several bugs on the p* function and removing ensure-inner-done

ensure-inner-done was removed given that the stacktraces wouldn't tell
what filter we were using. We remove this clever function in favor of
more clear stacktraces.
  • Loading branch information...
1 parent 9931ee9 commit c4acfe3beaacfbcf82f9f54d51b25bb1e28ca389 @roman committed Mar 6, 2012
Showing with 174 additions and 97 deletions.
  1. +5 −1 CHANGELOG
  2. +53 −29 src/river/core.clj
  3. +114 −65 src/river/seq.clj
  4. +2 −2 test/river/test/core.clj
View
6 CHANGELOG
@@ -2,7 +2,11 @@ Unknown Date Roman Gonzalez
* release v0.1.2
* fix typing hint error on write-bytes-to-output-stream consumer
* fix produce-seq bug, was not working properly on initial consumers
- that were yield.
+ that were yield
+ * fix _super nasty bug_ on the p* function, now the composition
+ of multiple filters on producers is guaranteed
+ * removed the ensure-inner-done utility function, this was causing serious
+ conceals of bugs on the stacktraces
2012-02-29 Roman Gonzalez
* release v0.1.1
View
82 src/river/core.clj
@@ -227,54 +227,78 @@
(= s2 eof)) eof
:else (concat s1 s2)))
-(defn ensure-inner-done
- ([f consumer] (ensure-inner-done [] f consumer))
- ([extra f consumer]
- (fn [stream]
- (cond
- (yield? consumer) (yield consumer (concat-stream extra stream))
- :else (f consumer (concat-stream extra stream))))))
+;(defn ensure-inner-done
+; ([f inner-consumer] (ensure-inner-done [] f inner-consumer))
+; ([extra f inner-consumer]
+; (fn ensure-inner-done-consumer [stream]
+; (cond
+;
+; (yield? inner-consumer)
+; (yield inner-consumer (concat-stream extra stream))
+;
+; (continue? inner-consumer)
+; (f inner-consumer (concat-stream extra stream))
+;
+; :else
+; (throw
+; (Exception.
+; "ensure-inner-done: invalid consumer (not yield nor continue)"))))))
(defn to-filter
"Transforms a consumer into a filter by feeding the outer input elements
into the provided consumer until it yields an inner input, passes that to
the inner consumer and then loops."
[consumer0*]
(letfn [
- (loop-consumer* [acc consumer* stream]
+ (loop-consumer* [acc consumer* outer-stream]
; ^ this function will feed all the stream possible
; to the filter consumer (consumer*), once the whole
; stream is empty, we return whatever the consumer* was
; able to parse from it, and the current state of
; consumer*
- (let [new-stream (concat (:remainder consumer*) stream)]
- (cond
- (empty? new-stream) [acc consumer*]
- (yield? consumer*)
+ (cond
+
+ (and (empty-chunk? outer-stream)
+ (continue? consumer*)) [acc consumer*]
+
+ (yield? consumer*)
(recur (conj acc (:result consumer*))
consumer0*
- (concat (:remainder consumer*) stream))
- (continue? consumer*)
- (recur acc (consumer* stream) []))))
+ (concat (:remainder consumer*) outer-stream))
+
+ (continue? consumer*)
+ (recur acc (consumer* outer-stream) [])))
- (outer-consumer [consumer* inner-consumer stream]
+ (outer-consumer [consumer* inner-consumer outer-stream]
(cond
- (eof? stream)
+
+ (yield? inner-consumer)
+ (yield inner-consumer outer-stream)
+
+ (continue? inner-consumer)
+ (cond
+
+ (eof? outer-stream)
(let [final-result (produce-eof consumer*)]
(yield (inner-consumer [(:result final-result)])
- stream))
+ outer-stream))
- (empty? stream)
+ (empty-chunk? outer-stream)
(continue #(outer-consumer consumer* inner-consumer %))
:else
- (let [[new-stream consumer1*] (loop-consumer* [] consumer* stream)]
- (ensure-inner-done (partial outer-consumer consumer1*)
- (inner-consumer new-stream)))))]
+ (let [[inner-stream consumer1*] (loop-consumer* []
+ consumer*
+ outer-stream)]
+
+ (recur consumer1* (inner-consumer inner-stream) []))
+ :else
+ (throw
+ (Exception.
+ "to-filter: invalid inner consumer (not yield nor continue)")))))]
(fn to-outer-consumer [inner-consumer]
- (ensure-inner-done (partial outer-consumer consumer0*)
- inner-consumer))))
+ #(outer-consumer consumer0* inner-consumer %))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;
@@ -288,10 +312,10 @@
(letfn [
(check [step]
(cond
- (continue? step) (recur (produce-eof step))
- (yield? step) step
- :else
- (throw (Exception. "*c: Invalid step (not continue nor yield)"))))]
+ (continue? step)
+ (do
+ (produce-eof step))
+ (yield? step) step))]
(do-consumer [
:let [outer-consumer (a-filter consumer)]
inner-consumer outer-consumer
@@ -312,7 +336,7 @@
(yield? new-consumer)
(:result new-consumer)
:else
- (throw (Exception. "attach-filter: missbehaving consumer"))))))
+ (throw (Exception. "p*: missbehaving consumer"))))))
([producer a-filter & more]
(reduce p* (p* producer a-filter) more)))
View
179 src/river/seq.clj
@@ -270,21 +270,31 @@
(outer-consumer [inner-consumer stream]
(cond
- (eof? stream)
- (yield (inner-consumer stream) stream)
- (empty-chunk? stream)
- (continue #(outer-consumer inner-consumer %))
+ (yield? inner-consumer)
+ (yield inner-consumer stream)
+ (continue? inner-consumer)
+ (cond
+
+ (eof? stream)
+ ;(yield (inner-consumer stream) stream)
+ (yield inner-consumer stream)
+
+ (empty-chunk? stream)
+ (continue #(outer-consumer inner-consumer %))
+
+ :else
+ (let [[inner-consumer remainder] (feed-inner-loop inner-consumer
+ stream)]
+ (recur inner-consumer remainder)))
:else
- (let [[inner-consumer remainder] (feed-inner-loop inner-consumer
- stream)]
- (continue
- ((ensure-inner-done outer-consumer
- inner-consumer) remainder)))))]
+ (throw
+ (Exception.
+ "mapcat*: invalid consumer (not yield nor continue)"))))]
(fn to-outer-consumer [inner-consumer]
- (ensure-inner-done outer-consumer inner-consumer))))
+ (continue #(outer-consumer inner-consumer %)))))
(defn map*
"Transform the stream by applying function `f` to each element in the stream.
@@ -311,20 +321,30 @@
(letfn [
(outer-consumer [inner-consumer stream]
(cond
- (eof? stream)
- (yield (inner-consumer stream) stream)
- (empty-chunk? stream)
- (continue #(outer-consumer inner-consumer %))
+ (yield? inner-consumer)
+ (yield inner-consumer stream)
- :else
- (let [new-stream (core/drop-while pred stream)]
- (if (not (empty-chunk? new-stream))
- (yield (inner-consumer new-stream) [])
- (continue (ensure-inner-done outer-consumer inner-consumer))))))]
+ (continue? inner-consumer)
+ (cond
+ (eof? stream)
+ (yield (inner-consumer stream) stream)
+
+ (empty-chunk? stream)
+ (continue #(outer-consumer inner-consumer %))
+
+ :else
+ (let [new-stream (core/drop-while pred stream)]
+ (if (not (empty-chunk? new-stream))
+ (yield (inner-consumer new-stream) [])
+ (continue #(outer-consumer inner-consumer %)))))
+ :else
+ (throw
+ (Exception.
+ "drop-while*: invalid consumer (not yield nor continue)"))))]
(fn to-outer-consumer [inner-consumer]
- (ensure-inner-done outer-consumer inner-consumer))))
+ (continue #(outer-consumer inner-consumer %)))))
(defn isolate*
"Prevents the consumer from receiving more stream than the specified in
@@ -334,28 +354,38 @@
(letfn [
(outer-consumer [total-count inner-consumer stream]
(cond
- (eof? stream)
- (yield (inner-consumer stream) eof)
- (empty-chunk? stream)
- (continue #(outer-consumer total-count
- inner-consumer
- %))
- :else
- (let [stream-count (count stream)
- total-count1 (- total-count stream-count)]
+ (yield? inner-consumer)
+ (yield inner-consumer stream)
+
+ (continue? inner-consumer)
+ (cond
- (if (> stream-count total-count)
- (yield (inner-consumer (core/take total-count stream))
- (core/drop total-count stream))
+ (eof? stream)
+ (yield inner-consumer eof)
+
+ (empty-chunk? stream)
+ (continue #(outer-consumer total-count
+ inner-consumer
+ %))
+ :else
+ (let [stream-count (count stream)
+ total-count1 (- total-count stream-count)]
+
+ (if (> stream-count total-count)
+ (yield (inner-consumer (core/take total-count stream))
+ (core/drop total-count stream))
+
+ (continue #(outer-consumer total-count1
+ (inner-consumer stream)
+ %)))))
+ :else
+ (throw
+ (Exception.
+ "isolate*: invalid consumer (not yield nor continue)"))))]
- (continue
- (ensure-inner-done (partial outer-consumer
- total-count1)
- (inner-consumer stream)))))))]
(fn to-outer-consumer [consumer]
- (ensure-inner-done (partial outer-consumer n)
- consumer))))
+ (continue #(outer-consumer n consumer %)))))
(defn require*
"Throws an exception if there is not at least `n` elements streamed to
@@ -364,26 +394,37 @@
(letfn [
(outer-consumer [total-count inner-consumer stream]
(cond
- (eof? stream)
- (if (> total-count 0)
- (throw (Exception. "require*: minimum count wasn't satisifed"))
- (yield (inner-consumer stream) stream))
-
- (empty-chunk? stream)
- (continue
- (ensure-inner-done (partial outer-consumer total-count)
- inner-consumer))
+
+ (yield? inner-consumer)
+ (yield inner-consumer stream)
+
+ (continue? inner-consumer)
+ (cond
+
+ (eof? stream)
+ (if (> total-count 0)
+ (throw (Exception. "require*: minimum count wasn't satisifed"))
+ (yield inner-consumer stream))
+
+ (empty-chunk? stream)
+ (continue #(outer-consumer total-count
+ inner-consumer
+ %))
+
+ :else
+ (let [total-count1 (- total-count (count stream))]
+ (if (<= total-count 0)
+ (yield (inner-consumer stream) [])
+ (continue #(outer-consumer total-count1
+ (inner-consumer stream)
+ %)))))
:else
- (let [total-count1 (- total-count (count stream))]
- (if (<= total-count 0)
- (yield (inner-consumer stream) [])
+ (throw
+ (Exception.
+ "require*: invalid consumer (not yield nor continue)"))))]
- (continue
- (ensure-inner-done (partial outer-consumer total-count1)
- (inner-consumer stream)))))))]
(fn to-outer-consumer [inner-consumer]
- (ensure-inner-done (partial outer-consumer n)
- inner-consumer))))
+ (continue #(outer-consumer n inner-consumer %)))))
(defn stream-while*
"Streams elements to the consumer until the `f` function returns a falsy
@@ -392,21 +433,29 @@
(letfn [
(outer-consumer [inner-consumer stream]
(cond
- (eof? stream)
- (yield (inner-consumer stream) stream)
+ (yield? inner-consumer)
+ (yield inner-consumer stream)
+
+ (continue? inner-consumer)
+ (cond
+ (eof? stream)
+ (yield (inner-consumer stream) stream)
- (empty-chunk? stream)
- (continue #(outer-consumer inner-consumer %))
+ (empty-chunk? stream)
+ (continue #(outer-consumer inner-consumer %))
+ :else
+ (let [[to-feed to-drop] (span f stream)]
+ (if (empty? to-drop)
+ (continue #(outer-consumer (inner-consumer to-feed) %))
+ (yield (inner-consumer to-feed) to-drop))))
:else
- (let [[to-feed to-drop] (span f stream)]
- (if (empty? to-drop)
- (continue (ensure-inner-done outer-consumer
- (inner-consumer to-feed)))
- (yield (inner-consumer to-feed) to-drop)))))]
+ (throw
+ (Exception.
+ "stream-while*: invalid consumer (not yield nor continue)"))))]
(fn to-outer-consumer [inner-consumer]
- (ensure-inner-done outer-consumer inner-consumer))))
+ (continue #(outer-consumer inner-consumer %)))))
(defn- split-when-consumer [f]
(do-consumer
View
4 test/river/test/core.clj
@@ -14,15 +14,15 @@
(rs/stream-while* (constantly true))
(rs/split-when* (constantly false))]
- consumers [rs/consume
+ consumers [(rs/consume)
(rs/take 2)
(rs/take-while (constantly true))
(rs/drop 2)
(rs/drop-while (constantly true))
rs/first
rs/peek]]
(doseq [f filters*
- c consumers]
+ c consumers]
(is (yield? (run (*c f c)))
(str "expected " f " and " c " to yield a result")))))

0 comments on commit c4acfe3

Please sign in to comment.