Skip to content

Commit

Permalink
Now fanout output yields items which are produced by each pipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Artemenko committed Jun 23, 2015
1 parent b3971cc commit 68111c4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
11 changes: 9 additions & 2 deletions src/processor/outputs/fanout.hy
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@
(setv pipelines (list (map ensure-list pipelines)))

(fn [input]
(setv results [])

(for [pipeline pipelines]
(setv msg input)

(for [output pipeline]
(setv msg (output msg))
(if-not msg
(break))))))
(lisp-if-not msg
(break)))

;; at the end of each pipeline
;; we yield result if any
(lisp-if msg
(yield msg)))))
17 changes: 17 additions & 0 deletions tests/outputs.hy
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,20 @@
(eq_ [1 3 5 7] odds)
(eq_ [2 4 6] evens))


(defn test_fanout_outputs_items []
(setv source [1 2])
(setv results [])

;; here we use `identity` function to make
;; pipeline longer and ensure that intermediate
;; items are not yielded from `fanout`
(run-pipeline source
[(outputs.fanout
[identity
(fn [item] (+ item 10))]
[identity
(fn [item] (* item item))])
results.append])

(eq_ [11 1 12 4] results))

0 comments on commit 68111c4

Please sign in to comment.