Permalink
Browse files

Merge branch 'new-filters'

  • Loading branch information...
2 parents 11d70e4 + efce508 commit ac9a0f85bbe01a7faa9891588eb713162651743f @roman committed Feb 18, 2012
Showing with 581 additions and 402 deletions.
  1. +64 −42 README.md
  2. +1 −1 project.clj
  3. +99 −99 src/river/core.clj
  4. +14 −4 src/river/examples/monadic.clj
  5. +28 −17 src/river/examples/sum.clj
  6. +54 −50 src/river/io.clj
  7. +188 −115 src/river/seq.clj
  8. +15 −12 test/river/test/core.clj
  9. +1 −1 test/river/test/io.clj
  10. +117 −61 test/river/test/seq.clj
View
106 README.md
@@ -34,24 +34,24 @@ the Iteratee concepts in Clojure.
## Install
-[org.van-clj/river "0.0.1"]
+[org.van-clj/river "0.1.0"]
## Usage
-To execute a consumer you will use the `run>` macro:
+To execute a consumer you will use the `run` function:
```clojure
-(river.core/run> (river.seq/produce-seq (range 1 100))
- (river.seq/filter* #(= 0 (mod % 2)))
- (river.seq/take 5))
+(river.core/run (river.seq/produce-seq (range 1 100))
+ (river.core/*c (river.seq/filter* #(= 0 (mod % 2)))
+ (river.seq/take 5)))
; => #river.core/ConsumerDome {:result (2 4 6 8 10), :remainder (12 14)}
```
-The code above streams a seq from 1 to 99, then goes through a filter
-and finally takes 5 items from the feed. The result of this execution will be
-a `ConsumerDone` record that has the yielded _result_ and the _remainder_ of
-the given chunks.
+The code above streams a seq from 1 to 99, then calls `\*c` to bind a filter
+to a consumer that takes 5 items from the feed. The result of this execution
+will be a `ConsumerDone` record that has the yielded _result_ and the
+_remainder_ of the given chunks.
### Stream
@@ -105,62 +105,84 @@ some sort of _decorator_ in the OO world.
Say for example you want to be able to sum a list of numbers, but this
numbers may come from different resources, some from stdin, others from a
-list, etc. To implement this using the clj-stream library, you would do
+list, etc. To implement this using the river library, you would do
something like the following:
```clojure
-(ns river.core.examples.sum
+(ns river.examples.sum
(require [clojure.string :as string])
(use [river.core])
- (require [river.seq :as sseq]
- [river.io :as sio]))
-
-(def words* (partial sseq/mapcat* #(string/split % #"\s+")))
-
-(def numbers* (partial sseq/map* #(Integer/parseInt %)))
-
-(defn produce-numbers-from-file [file-path consumer]
- ; running producers and filters without the run> macro
- ; decorating each filter.
- (sio/produce-file-lines file-path
- (words*
- (numbers* consumer))))
-
-(println (run> ; producing input from a file
- (produce-numbers-from-file "input.in")
-
- ; producing input from a seq
- (sseq/produce-seq (range 1 10))
-
- ; consuming numbers from both input sources
- (sseq/reduce + 0)))
+ (require [river.seq :as rs]
+ [river.io :as rio]))
+
+(def words* (rs/mapcat* #(string/split % #"\s+")))
+; ^ a filter that applies a split to whatever it recieves
+; in the stream, this is assuming the stream is of strings
+
+(def numbers* (rs/map* #(Integer/parseInt %)))
+; ^ a fitler that transforms each received item into an
+; Integer using the parseInt function, this is assuming
+; that the stream is of strings
+
+(defn produce-numbers-from-file
+ ([] (produce-numbers-from-file "input.in"))
+ ([file-path]
+ (p*
+ ; ^ whe bind a producer with some filters
+ ; using the p* function.
+ (rio/produce-file-lines file-path)
+ ; ^ produces a stream of lines from a file path
+ words*
+ ; ^ applies the words filter
+ numbers*)))
+ ; ^ applies the number stream
+
+(defn -main []
+ (println (run
+ ; ^ executes a group of producers/consumers
+ (produce-numbers-from-file)
+ ; ^ produce a stream of numbers from a file
+ (rs/produce-seq (range 1 10))
+ ; ^ produce a stream of numbers from a seq
+ (rs/reduce + 0))))
+ ; ^ sums up the numbers ignoring where they come from
```
-### Building consumers using algo.monad ###
+### Building consumers using monadic interface ###
Sometimes we want to create a new consumer composing simple consumers together,
we can do that using the monadic API:
```clojure
-(ns river.core.examples.monadic
+(ns river.examples.monadic
(:use [river.core])
- (:require [river.seq :as sseq]
- [river.io :as sio]))
+ (:require [river.seq :as rs]
+ [river.io :as rio]))
(def drop-and-head
(do-consumer
- [_ (sseq/drop-while #(< % 5))
- b sseq/first]
+ ; ^ allows you to create a new consumer using
+ ; a monadic notation as in clojure.algo.monads
+ [_ (rs/drop-while #(< % 5))
+ ; ^ drops from the stream until the condition is met
+ b rs/first]
+ ; ^ gets the first element after the dropping is done
b))
-(println (run> (sseq/produce-seq (range -20 20))
- drop-and-head))
+(defn -main []
+ (println (run
+ ; ^ function to execute producers/consumers
+ (rs/produce-seq (range -20 20))
+ ; ^ produce a stream of numbers from a seq
+ drop-and-head)))
+ ; drops until < 5 and then gives the first element found
+ ; (in this case 6)
```
## License
-Copyright (C) 2011 Roman Gonzalez
+Copyright (C) 2012 Roman Gonzalez
Distributed under the Eclipse Public License, the same as Clojure.
View
@@ -1,4 +1,4 @@
-(defproject org.van-clj/river "0.0.2-SNAPSHOT"
+(defproject org.van-clj/river "0.1.0-SNAPSHOT"
:description "A monadic stream library in Clojure (port of Haskell's enumerator)"
:author "Roman Gonzalez"
:repositories { "sonatype" {:url "https://oss.sonatype.org/content/repositories/snapshots/"}}
View
@@ -137,116 +137,116 @@
(println stream)
(continue print-chunks))))
-(defn- gen-filter-fn [filter-consumer0 filter-consumer inner-consumer]
- (cond
- (yield? inner-consumer) inner-consumer
- :else
- (cond
- (yield? filter-consumer)
- (let [filter-result (:result filter-consumer)
- filter-remainder (:remainder filter-consumer)
- next-inner-consumer (inner-consumer [filter-result])]
-
- (if (no-remainder? filter-consumer)
- (recur filter-consumer0
- filter-consumer
- (ensure-done next-inner-consumer filter-remainder))
-
- (recur filter-consumer0
- (filter-consumer0 filter-remainder)
- next-inner-consumer)))
-
- :else
- (fn outer-consumer [stream]
- (gen-filter-fn filter-consumer0
- (filter-consumer stream)
- inner-consumer)))))
-
-(defn to-filter
- "Transforms a consumer into a filter by feeding the outer input elements
- into the provided consumer until it yields an inner input, passes that to
- the inner consumer and then loops."
- [filter-consumer0 inner-consumer]
- (gen-filter-fn filter-consumer0 filter-consumer0 inner-consumer))
-
-
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;
;; Utility macros and functions to run consumers
;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-(def run
+(defn run
"Allows to terminate ask for termination of producer, filter or consumer."
- produce-eof)
-
-(defn- ensure-in-list [producer-or-filter]
- (if (seq? producer-or-filter)
- producer-or-filter
- (list producer-or-filter)))
-
-(defn- nest-producer-filter-consumer
- ([consumers] consumers)
- ([producer-or-filter0 & more]
- (let [producer-or-filter (ensure-in-list producer-or-filter0)]
- ; when last item is a vector (multiple consumers),
- ; we just concat that to the producer/filter.
- ;
- ; This feature is being used currently zip* filter
- (if (and (nil? (next more))
- (vector? (first more)))
-
- (concat producer-or-filter
- (first more))
-
- (concat producer-or-filter
- `(~(apply nest-producer-filter-consumer more)))))))
-
-(defn gen-producer [& producer+filters-fn]
- "Composes a seq of partially applied producers/filters, and returns a
- new producer that receives either a producer, filter or consumer.
-
- Usage:
- > (def new-producer (gen-producer #(produce-seq (range 10 20) %)
- > #(produce-seq (range 1 10) %)
- > #(filter* even? %))"
- (fn composed-producer [consumer0]
- (reduce
- (fn [consumer producer+filter]
- (producer+filter consumer))
- consumer0
- (reverse producer+filters-fn))))
-
-
-(defmacro gen-producer> [& producers+filters]
- "Composes a seq of producers and filters, and returns a new
- producer that receives either a new producer, filter or consumer.
-
- Usage:
- > (def new-producer (gen-producer> (produce-seq (range 10 20))
- > (produce-seq (range 1 10))
- > (filter* even?)))"
- `(fn composed-producer [~'consumer]
- ~(apply nest-producer-filter-consumer
- (concat producers+filters `(~'consumer)))))
-
-(defmacro run>
- "Works the same way as river/run, its purpose is to ease the building
- of compositions between producers, filters and consumers by allowing
- to specify all of them without nesting one into the other.
-
- With river/run:
-
- > (river/run (producer2 (producer1 (filter1 (filter2 consumer)))))
-
- With river/run>:
-
- > (river/run> producer2 producer1 filter1 filter2 consumer)"
[& more]
- `(run ~(apply nest-producer-filter-consumer more)))
+ (produce-eof (reduce #(%2 %1) (reverse more))))
(defmacro do-consumer [steps result]
"Binds the river-m monadic implementation to the domonad macro,
check clojure.algo.monads/domonad for further info."
`(monad/domonad river-m ~steps ~result))
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;
+;; Filter functions
+;;
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+
+(defn concat-stream [s1 s2]
+ (cond
+ (or (= s1 eof)
+ (= s2 eof)) eof
+ :else (concat s1 s2)))
+
+(defn ensure-inner-done
+ ([f consumer] (ensure-inner-done [] f consumer))
+ ([extra f consumer]
+ (fn [stream]
+ (cond
+ (yield? consumer) (yield consumer (concat-stream extra stream))
+ :else (f consumer (concat-stream extra stream))))))
+
+(defn to-filter
+ "Transforms a consumer into a filter by feeding the outer input elements
+ into the provided consumer until it yields an inner input, passes that to
+ the inner consumer and then loops."
+ [consumer0*]
+ (letfn [
+ (loop-consumer* [acc consumer* stream]
+ ; ^ this function will feed all the stream possible
+ ; to the filter consumer (consumer*), once the whole
+ ; stream is empty, we return whatever the consumer* was
+ ; able to parse from it, and the current state of
+ ; consumer*
+ (let [new-stream (concat (:remainder consumer*) stream)]
+ (cond
+ (empty? new-stream) [acc consumer*]
+ (yield? consumer*)
+ (recur (conj acc (:result consumer*))
+ consumer0*
+ (concat (:remainder consumer*) stream))
+ (continue? consumer*)
+ (recur acc (consumer* stream) []))))
+
+ (outer-consumer [consumer* inner-consumer stream]
+ (cond
+ (eof? stream)
+ (let [final-result (produce-eof consumer*)]
+ (yield (inner-consumer [(:result final-result)])
+ stream))
+
+ (empty? stream)
+ (continue #(outer-consumer consumer* inner-consumer %))
+
+ :else
+ (let [[new-stream consumer1*] (loop-consumer* [] consumer* stream)]
+ (ensure-inner-done (partial outer-consumer consumer1*)
+ (inner-consumer new-stream)))))]
+
+ (fn to-outer-consumer [inner-consumer]
+ (ensure-inner-done (partial outer-consumer consumer0*)
+ inner-consumer))))
+
+(defn *c
+ "Binds a filter to a consumer."
+ ([a-filter consumer]
+ (letfn [
+ (check [step]
+ (cond
+ (continue? step) (recur (produce-eof step))
+ (yield? step) step
+ :else
+ (throw (Exception. "Something terrible happened!"))))]
+ (do-consumer [
+ :let [outer-consumer (a-filter consumer)]
+ inner-consumer outer-consumer
+ result (check inner-consumer)]
+ result)))
+
+ ([a-filter b-filter & filters]
+ (let [more (->> filters (cons b-filter) (cons a-filter))
+ [consumer a-filter & more] (reverse more)]
+ (reduce #(*c %2 %1) (*c a-filter consumer) more))))
+
+(defn p*
+ "Binds a filter to a producer."
+ ([producer a-filter]
+ (fn new-producer [consumer]
+ (let [new-consumer (produce-eof (producer (a-filter consumer)))]
+ (cond
+ (yield? new-consumer)
+ (:result new-consumer)
+ :else
+ (throw (Exception. "attach-filter: missbehaving consumer"))))))
+
+ ([producer a-filter & more]
+ (reduce p* (p* producer a-filter) more)))
+
+
Oops, something went wrong.

0 comments on commit ac9a0f8

Please sign in to comment.