Permalink
Browse files

Reimplementing most of the filters

  • Loading branch information...
1 parent 16bc58b commit 73368094e1d9068b2ad9417c8d7f55d55a0e6ee3 @roman committed Feb 17, 2012
Showing with 181 additions and 120 deletions.
  1. +3 −23 src/river/core.clj
  2. +122 −73 src/river/seq.clj
  3. +56 −24 test/river/test/seq.clj
View
@@ -256,26 +256,6 @@
;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-(defn mapcat** [mapping-f]
- ".."
- (letfn [
- (perform-loop [inner-consumer0 [x & xs :as stream]]
- (cond
- (empty? stream) (continue #(step inner-consumer0 %))
- :else
- (let [inner-consumer (inner-consumer0 (mapping-f x))]
- (cond
- (continue? inner-consumer) (recur inner-consumer xs)
- (yield? inner-consumer) (yield inner-consumer xs)))))
- (step [inner-consumer stream]
- (cond
- (eof? stream)
- (yield (continue inner-consumer) stream)
- :else
- (perform-loop inner-consumer stream)))
- ]
- (fn to-outer-consumer [inner-consumer]
- #(step inner-consumer %))))
(defn attach-to-consumer
"..."
@@ -293,14 +273,14 @@
result (check inner-consumer)]
result)))
-(defn attach-to-producer
+(defn attach-to-producer
"..."
[producer some-filter]
(fn new-producer [consumer]
(let [new-consumer (produce-eof (producer (some-filter consumer)))]
(cond
(yield? new-consumer)
- (:result new-consumer)
- (continue? new-consumer)
+ (:result new-consumer)
+ :else
(throw (Exception. "attach-filter: missbehaving consumer"))))))
View
@@ -13,6 +13,15 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;
+;; Utility Functions
+;;
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+
+(defn- span [pred xs]
+ ((core/juxt #(core/take-while pred %) #(core/drop-while pred %)) xs))
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;
;; Consumers
;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@@ -27,7 +36,7 @@
(consumer [buffer n-elems stream]
(cond
(eof? stream) (yield buffer eof)
- (empty-chunk? stream) (continue #(take buffer n-elems %))
+ (empty-chunk? stream) (continue #((take buffer n-elems) %))
:else
(let [taken-elems (concat buffer (core/take n-elems stream))
new-size (- n-elems (count stream))]
@@ -224,37 +233,43 @@
f will be a function that receives an item and will return a seq, the
resulting seqs will be later concatenated and be feeded to the given
consumer."
- [f inner-consumer]
- (cond
- (yield? inner-consumer) inner-consumer
- :else
- (fn outer-consumer [stream]
- (cond
- (eof? stream) (inner-consumer eof)
- :else
- (mapcat* f (inner-consumer (mapcat f stream)))))))
+ [f]
+ (letfn [
+ (feed-loop [inner-consumer0 [x & xs :as stream]]
+ (cond
+ (empty? stream)
+ (continue #(feed-inner-consumer inner-consumer0 %))
+ :else
+ (let [inner-consumer (inner-consumer0 (f x))]
+ (cond
+ (continue? inner-consumer) (recur inner-consumer xs)
+ (yield? inner-consumer) (yield inner-consumer xs)))))
+ (feed-inner-consumer [inner-consumer stream]
+ (cond
+ (eof? stream)
+ (yield (continue inner-consumer) stream)
+ :else
+ (feed-loop inner-consumer stream)))
+ ]
+ (fn to-outer-consumer [inner-consumer]
+ #(feed-inner-consumer inner-consumer %))))
(defn map*
"Transform the stream by applying function f to each element in the stream.
f will be a function that receives an item and return another of (possibly)
a different type, this items will be feeded to the consumer."
- [f inner-consumer]
- (mapcat* (comp vector f) inner-consumer))
+ [f]
+ (fn to-outer-consumer [inner-consumer]
+ ((mapcat* (comp vector f)) inner-consumer)))
(defn filter*
"Removes elements from the stream by using the function pred. pred will
receive an element from the stream and will return a boolean indicating if
the element should be kept in the stream or not. The consumer will be
feed with the elements of the stream in which pred returns true."
- [pred inner-consumer]
- (cond
- (yield? inner-consumer) inner-consumer
- :else
- (fn outer-consumer [stream]
- (cond
- (eof? stream) (inner-consumer eof)
- :else
- (filter* pred (inner-consumer (core/filter pred stream)))))))
+ [pred]
+ (fn to-outer-consumer [inner-consumer]
+ ((mapcat* (comp #(core/filter pred %) vector)) inner-consumer)))
(defn zip*
"Multiplexes the stream into multiple consumers, each of the consumers
@@ -268,75 +283,109 @@
:else
(apply zip* (for [c inner-consumers] (ensure-done c stream))))))
+(defn ensure-inner-done [f consumer]
+ (fn [stream]
+ (cond
+ (yield? consumer) (yield consumer stream)
+ :else (f consumer stream))))
+
(defn drop-while*
"Works similarly to the drop-while consumer, it will drop elements from
the stream until pred holds false, at that point the given inner-consumer
will be feed with the receiving stream."
- [f inner-consumer]
- (cond
- (yield? inner-consumer) inner-consumer
- :else
- (fn outer-consumer [stream]
- (cond
- (eof? stream) (inner-consumer eof)
- :else
- (let [result (core/drop-while f stream)]
- (if (-> result empty? not)
- (inner-consumer result)
- (drop-while* f inner-consumer)))))))
+ [pred]
+ (letfn [
+ (feed-inner-consumer [inner-consumer stream]
+ (cond
+ (empty? stream)
+ (continue #(feed-inner-consumer inner-consumer %))
+ (eof? stream)
+ (yield inner-consumer eof)
+ :else
+ (let [new-stream (core/drop-while pred stream)]
+ (if (not (empty? new-stream))
+ (yield (inner-consumer new-stream) [])
+ (continue #(feed-inner-consumer inner-consumer %))))))]
+ (fn to-outer-consumer [consumer]
+ (ensure-inner-done feed-inner-consumer consumer))))
(defn isolate*
"Prevents the consumer from receiving more stream than the specified in
n, as soon as n elements had been feed, the filter will feed an EOF to
the inner-consumer."
- [n inner-consumer]
- (cond
- (yield? inner-consumer) inner-consumer
- :else
- (fn outer-consumer [stream]
- (cond
- (eof? stream) (inner-consumer eof)
- :else
- (let [stream-count (count stream)]
- (if (> stream-count n)
- (produce-eof (inner-consumer (core/take n stream)))
- (isolate* (- n stream-count) (inner-consumer stream))))))))
+ [n]
+ (letfn [
+ (feed-inner-consumer [total-count inner-consumer stream]
+ (cond
+ (eof? stream)
+ (yield inner-consumer eof)
+ (empty? stream)
+ (continue #(feed-inner-consumer total-count
+ inner-consumer
+ %))
+ :else
+ (let [stream-count (count stream)]
+ (if (> stream-count total-count)
+ (yield (inner-consumer
+ (core/take total-count stream))
+ (core/drop total-count stream))
+ (continue
+ (ensure-inner-done (partial feed-inner-consumer
+ (- total-count stream-count))
+ (inner-consumer stream)))))))]
+ (fn to-outer-consumer [consumer]
+ (ensure-inner-done (partial feed-inner-consumer n)
+ consumer))))
(defn require*
"Throws an exception if there is not at least n elements streamed to
the inner-consumer."
- [n inner-consumer]
- (cond
- (yield? inner-consumer) inner-consumer
- :else
- (fn outer-consumer [stream]
- (cond
- (and (eof? stream)
- (> n 0))
- (throw (Exception. "ERROR: require* wasn't satisfied"))
-
- (<= n 0)
- (inner-consumer stream)
-
- :else
- (require* (- n (count stream))
- (inner-consumer stream))))))
+ [n]
+ (letfn [
+ (feed-inner-consumer [total-count inner-consumer stream]
+ (cond
+ (eof? stream)
+ (if (> total-count 0)
+ (throw (Exception. "require*: minimum count wasn't satisifed"))
+ (yield inner-consumer eof))
+ (empty? stream)
+ (continue #(feed-inner-consumer total-count
+ inner-consumer
+ %))
+ :else
+ (if (<= total-count 0)
+ (yield (inner-consumer stream) [])
+ (continue
+ (ensure-inner-done (partial feed-inner-consumer
+ (- total-count (count stream)))
+ (inner-consumer stream))))))]
+ (fn to-outer-consumer [inner-consumer]
+ (ensure-inner-done (partial feed-inner-consumer n)
+ inner-consumer))))
(defn stream-while*
"Streams elements to the inner-consumer until the f function returns a falsy
value for a given item."
- [f inner-consumer]
- (cond
- (yield? inner-consumer) inner-consumer
- :else
- (fn outer-consumer [stream]
- (cond
- (eof? stream) (inner-consumer eof)
- :else
- (let [result (core/take-while f stream)]
- (if (= result stream)
- (stream-while* f (inner-consumer result))
- (produce-eof (inner-consumer result))))))))
+ [f]
+ (letfn [
+ (outer-consumer [inner-consumer stream]
+ (cond
+
+ (eof? stream)
+ (yield inner-consumer stream)
+
+ (empty? stream)
+ (continue #(outer-consumer inner-consumer %))
+
+ :else
+ (let [[to-feed to-drop] (span f stream)]
+ (if (empty? to-drop)
+ (continue (ensure-inner-done outer-consumer
+ (inner-consumer to-feed)))
+ (yield (inner-consumer to-feed) to-drop)))))]
+
+ (fn to-outer-consumer [inner-consumer]
+ (ensure-inner-done outer-consumer inner-consumer))))
(defn- split-when-consumer [f]
(do-consumer
Oops, something went wrong.

0 comments on commit 7336809

Please sign in to comment.