Skip to content

Commit

Permalink
Fixes walkthrough docs and number of peers required
Browse files Browse the repository at this point in the history
  • Loading branch information
anujsrc committed Jul 31, 2016
1 parent c5646ce commit 127070e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 21 deletions.
40 changes: 21 additions & 19 deletions WALKTHROUGH.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,29 +162,31 @@ At some point, we need to actually specify what the functions that we named *are
You'll notice above that I mentioned we're using core.async for both input and output. One of the things we need to do is get a *handle* to the core.async channels to put data into them and get data out of them. Onyx ships with a Task Lifecycle API, allowing you to start and stop stateful entities used in your program. Let's add those channels in:

```clojure
(def input-chan (chan capacity))
(def input-channel-capacity 10000)

(def loud-output-chan (chan capacity))
(def output-channel-capacity (inc input-channel-capacity))

(def question-output-chan (chan capacity))
(def get-input-channel
(memoize
(fn [id]
(chan input-channel-capacity))))

(defn inject-input-ch [event lifecycle]
{:core.async/chan input-chan})
(def get-output-channel
(memoize
(fn [id]
(chan (sliding-buffer output-channel-capacity)))))

(defn inject-loud-output-ch [event lifecycle]
{:core.async/chan loud-output-chan})
(defn inject-in-ch [event lifecycle]
{:core.async/chan (get-input-channel (:core.async/id lifecycle))})

(defn inject-question-output-ch [event lifecycle]
{:core.async/chan question-output-chan})
(defn inject-out-ch [event lifecycle]
{:core.async/chan (get-output-channel (:core.async/id lifecycle))})

(def input-calls
{:lifecycle/before-task-start inject-input-ch})
(def in-calls
{:lifecycle/before-task-start inject-in-ch})

(def loud-output-calls
{:lifecycle/before-task-start inject-loud-output-ch})

(def question-output-calls
{:lifecycle/before-task-start inject-question-output-ch})
(def out-calls
{:lifecycle/before-task-start inject-out-ch})

(defn build-lifecycles []
[{:lifecycle/task :in
Expand Down Expand Up @@ -233,8 +235,8 @@ We use some helper functions in the test to bind the inputs and collect the outp

```clojure
(deftest test-sample-dev-job
;; 8 peers for 8 distinct tasks in the workflow
(let [dev-env (component/start (onyx-dev-env 8))]
;; 7 peers for 7 distinct tasks in the workflow
(let [dev-env (component/start (onyx-dev-env 7))]
(try
(let [[loud-out question-out] (submit-sample/submit-job dev-env)]
(clojure.pprint/pprint loud-out)
Expand Down Expand Up @@ -279,7 +281,7 @@ And the output is:
A few notes:
- The last piece of input we sent through is not a segment, but `:done`! It's called the Sentinel. This is a specially recognized value in Onyx which *completes* the current running task. This value is used to switch transparently between batch and streaming modes.
- The `id` on the Peers in a distributed environment *must* match up for them to work together. This is how they find each other when there are multiple deployments.
- We start **8** virtual peers, which is a unit of local parallelism in Onyx. You need at least one virtual peer per task for your job to start.
- We start **7** virtual peers, which is a unit of local parallelism in Onyx. You need at least one virtual peer per task for your job to start.
- The sentinel is helpfully propagated downstream, so you know when you've got to the end of an output stream.

#### Conclusion
Expand Down
4 changes: 2 additions & 2 deletions test/onyx_starter/jobs/sample_job_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
[onyx.api]))

(deftest test-sample-dev-job
;; 8 peers for 8 distinct tasks in the workflow
(let [dev-env (component/start (onyx-dev-env 8))]
;; 7 peers for 7 distinct tasks in the workflow
(let [dev-env (component/start (onyx-dev-env 7))]
(try
(let [{:keys [loud-output question-output]} (submit-sample/submit-job dev-env)]
(clojure.pprint/pprint loud-output)
Expand Down

0 comments on commit 127070e

Please sign in to comment.