Skip to content

Commit

Permalink
Updating examples and README
Browse files Browse the repository at this point in the history
  • Loading branch information
roman committed Feb 18, 2012
1 parent 81cab04 commit 6bbbce3
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 61 deletions.
102 changes: 62 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,20 @@ the Iteratee concepts in Clojure.

## Usage

To execute a consumer you will use the `run>` macro:
To execute a consumer you will use the `run` function:

```clojure
(river.core/run> (river.seq/produce-seq (range 1 100))
(river.seq/filter* #(= 0 (mod % 2)))
(river.seq/take 5))
(river.core/run (river.seq/produce-seq (range 1 100))
(river.core/*c (river.seq/filter* #(= 0 (mod % 2)))
(river.seq/take 5)))

; => #river.core/ConsumerDome {:result (2 4 6 8 10), :remainder (12 14)}
```

The code above streams a seq from 1 to 99, then goes through a filter
and finally takes 5 items from the feed. The result of this execution will be
a `ConsumerDone` record that has the yielded _result_ and the _remainder_ of
the given chunks.
The code above streams a seq from 1 to 99, then calls `\*c` to bind a filter
to a consumer that takes 5 items from the feed. The result of this execution
will be a `ConsumerDone` record that has the yielded _result_ and the
_remainder_ of the given chunks.

### Stream

Expand Down Expand Up @@ -109,58 +109,80 @@ list, etc. To implement this using the clj-stream library, you would do
something like the following:

```clojure
(ns river.core.examples.sum
(ns river.examples.sum

(require [clojure.string :as string])

(use [river.core])
(require [river.seq :as sseq]
[river.io :as sio]))

(def words* (partial sseq/mapcat* #(string/split % #"\s+")))

(def numbers* (partial sseq/map* #(Integer/parseInt %)))

(defn produce-numbers-from-file [file-path consumer]
; running producers and filters without the run> macro
; decorating each filter.
(sio/produce-file-lines file-path
(words*
(numbers* consumer))))

(println (run> ; producing input from a file
(produce-numbers-from-file "input.in")

; producing input from a seq
(sseq/produce-seq (range 1 10))

; consuming numbers from both input sources
(sseq/reduce + 0)))
(require [river.seq :as rs]
[river.io :as rio]))

(def words* (rs/mapcat* #(string/split % #"\s+")))
; ^ a filter that applies a split to whatever it recieves
; in the stream, this is assuming the stream is of strings

(def numbers* (rs/map* #(Integer/parseInt %)))
; ^ a fitler that transforms each received item into an
; Integer using the parseInt function, this is assuming
; that the stream is of strings

(defn produce-numbers-from-file
([] (produce-numbers-from-file "input.in"))
([file-path]
(p*
; ^ whe bind a producer with some filters
; using the p* function.
(rio/produce-file-lines file-path)
; ^ produces a stream of lines from a file path
words*
; ^ applies the words filter
numbers*)))
; ^ applies the number stream

(defn -main []
(println (run
; ^ executes a group of producers/consumers
(produce-numbers-from-file)
; ^ produce a stream of numbers from a file
(rs/produce-seq (range 1 10))
; ^ produce a stream of numbers from a seq
(rs/reduce + 0))))
; ^ sums up the numbers ignoring where they come from
```

### Building consumers using algo.monad ###
### Building consumers using monadic interface ###

Sometimes we want to create a new consumer composing simple consumers together,
we can do that using the monadic API:

```clojure
(ns river.core.examples.monadic
(ns river.examples.monadic
(:use [river.core])
(:require [river.seq :as sseq]
[river.io :as sio]))
(:require [river.seq :as rs]
[river.io :as rio]))

(def drop-and-head
(do-consumer
[_ (sseq/drop-while #(< % 5))
b sseq/first]
; ^ allows you to create a new consumer using
; a monadic notation as in clojure.algo.monads
[_ (rs/drop-while #(< % 5))
; ^ drops from the stream until the condition is met
b rs/first]
; ^ gets the first element after the dropping is done
b))

(println (run> (sseq/produce-seq (range -20 20))
drop-and-head))
(defn -main []
(println (run
; ^ function to execute producers/consumers
(rs/produce-seq (range -20 20))
; ^ produce a stream of numbers from a seq
drop-and-head)))
; drops until < 5 and then gives the first element found
; (in this case 6)
```

## License

Copyright (C) 2011 Roman Gonzalez
Copyright (C) 2012 Roman Gonzalez

Distributed under the Eclipse Public License, the same as Clojure.
18 changes: 14 additions & 4 deletions src/river/examples/monadic.clj
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
(ns river.examples.monadic
(:use [river.core]
(:use [river.core])
(:require [river.seq :as rs]
[river.io :as rio]))
[river.io :as rio]))

(def drop-and-head
(do-consumer
; ^ allows you to create a new consumer using
; a monadic notation as in clojure.algo.monads
[_ (rs/drop-while #(< % 5))
; ^ drops from the stream until the condition is met
b rs/first]
; ^ gets the first element after the dropping is done
b))

(println (run> (rs/produce-seq (range -20 20))
drop-and-head))
(defn -main []
(println (run
; ^ function to execute producers/consumers
(rs/produce-seq (range -20 20))
; ^ produce a stream of numbers from a seq
drop-and-head)))
; drops until < 5 and then gives the first element found
; (in this case 6)
45 changes: 28 additions & 17 deletions src/river/examples/sum.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,37 @@
(require [clojure.string :as string])

(use [river.core])
(require [river.seq :as sseq]
[river.io :as sio]))
(require [river.seq :as rs]
[river.io :as rio]))

(def words* (partial sseq/mapcat* #(string/split % #"\s+")))
(def words* (rs/mapcat* #(string/split % #"\s+")))
; ^ a filter that applies a split to whatever it recieves
; in the stream, this is assuming the stream is of strings

(def numbers* (partial sseq/map* #(Integer/parseInt %)))
(def numbers* (rs/map* #(Integer/parseInt %)))
; ^ a fitler that transforms each received item into an
; Integer using the parseInt function, this is assuming
; that the stream is of strings

(defn produce-numbers-from-file
([consumer] (produce-numbers-from-file "input.in" consumer))
([file-path consumer]
; running producers and filters without the run* macro
; decorating each filter.
(sio/produce-file-lines file-path
(words*
(numbers* consumer)))))
([] (produce-numbers-from-file "input.in"))
([file-path]
(p*
; ^ whe bind a producer with some filters
; using the p* function.
(rio/produce-file-lines file-path)
; ^ produces a stream of lines from a file path
words*
; ^ applies the words filter
numbers*)))
; ^ applies the number stream

(defn -main []
(println (run> ; producing input from a file
produce-numbers-from-file
; producing input from a seq
(sseq/produce-seq (range 1 10))
; consuming numbers from both input sources
(sseq/reduce + 0))))
(println (run
; ^ executes a group of producers/consumers
(produce-numbers-from-file)
; ^ produce a stream of numbers from a file
(rs/produce-seq (range 1 10))
; ^ produce a stream of numbers from a seq
(rs/reduce + 0))))
; ^ sums up the numbers ignoring where they come from

0 comments on commit 6bbbce3

Please sign in to comment.