Clojure concurrent pipeline on core.async.
What is a pipeline?
A pipeline is a series of stages connected by channels, where each
stage is a group of go blocks running the same
function. In each stage, the go blocks
- receive values from upstream via inbound channels
- perform some function on that data, usually producing new values
- send values downstream via outbound channels
Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, respectively. The first stage is sometimes called the source or producer; the last stage, the sink or consumer.
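The description above can be sketched directly with core.async. This is only an illustration of the stage and channel model, not papaline's implementation: `start-stage` and the worker count are hypothetical choices made for the example, and it assumes core.async is on the classpath.

```clojure
(require '[clojure.core.async :as a])

(defn start-stage
  "Hypothetical helper: starts n go blocks that read from `in`, apply f
  (which must not return nil) and write to a new outbound channel.
  The outbound channel is closed once every go block has finished."
  [n f in]
  (let [out     (a/chan)
        workers (doall (repeatedly n #(a/go-loop []
                                        (when-some [v (a/<! in)]
                                          (a/>! out (f v))
                                          (recur)))))]
    ;; Close `out` only after all workers have drained `in`.
    (a/go
      (doseq [w workers] (a/<! w))
      (a/close! out))
    out))

;; Source: has only an outbound channel.
(def source
  (let [out (a/chan)]
    (a/go
      (doseq [x (range 1 6)] (a/>! out x))
      (a/close! out))
    out))

;; Two middle stages, then a sink that collects everything.
(def results
  (->> source
       (start-stage 2 inc)
       (start-stage 2 #(* % %))
       (a/into [])
       a/<!!))

(println (sort results)) ;; prints (4 9 16 25 36)
```

Because two go blocks share each inbound channel, the order of the results is not guaranteed, which is why the example sorts them before printing.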
clojure.core/comp, with concurrent magic.
(require '[papaline.core :refer :all])
A stage is a simple Clojure function wrapped with the stage function.
A pipeline is an ordered sequence of stages.
(defn save-msg [user-id msg]
  ;; write the msg to mysql and get the msg id
  [user-id msg-id])

(defn query-followers [id msg-id]
  ;; ask mysql for some ids
  [follower-ids msg])

(defn fan-out-to-redis [follower-ids msg]
  ;; spread msg to redis lists
  )

(def save-status
  (pipeline (map stage [save-msg query-followers fan-out-to-redis])))
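In the example, each stage returns a vector whose elements line up with the parameters of the next stage. Assuming that is how values are handed over (this is inferred from the signatures, not from papaline's source), the data flow can be sketched sequentially in plain Clojure. `run-stages`, `add-and-keep` and `multiply` are hypothetical names used only here.

```clojure
(defn run-stages
  "Threads args through fs in order: the vector returned by one function
  is applied as the argument list of the next one."
  [fs & args]
  (reduce (fn [args f] (apply f args)) args fs))

(defn add-and-keep [a b]
  ;; returns two values for the next stage
  [(+ a b) b])

(defn multiply [sum b]
  ;; returns a single value, still wrapped in a vector
  [(* sum b)])

(run-stages [add-and-keep multiply] 2 3) ;; => [15]
```

Unlike clojure.core/comp, which passes a single return value along, this sketch spreads the returned vector over the next function's parameters. It runs everything on the calling thread; the concurrency is what the pipeline adds on top.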
You can also give a stage a name with
named-stage. The name will be passed to the
error-handler so you can log which stage is having problems.
;; you can also make a macro for this
(named-stage "fan-out-to-redis" fan-out-to-redis)
Run the pipeline
(run-pipeline save-status "userid" "Putin orders to approve draft bill on integration of #Crimea into Russia http://on.rt.com/xhz2zu")
Run pipeline and wait for the results
(println (run-pipeline-wait save-status "userid" "Putin orders to approve draft bill on integration of #Crimea into Russia http://on.rt.com/xhz2zu"))
And wait with a timeout
(println (run-pipeline-timeout save-status
                               5000 ;; timeout in msecs
                               nil  ;; return value on timeout
                               ;; arguments
                               "userid"
                               "Putin orders to approve draft bill on integration of #Crimea into Russia http://on.rt.com/xhz2zu"))
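The meaning of the timeout and the on-timeout return value can be illustrated with a plain Clojure future and the three-argument form of deref. This is an analogy for the calling convention, not a description of how papaline implements it; `call-with-timeout` and `slow-inc` are hypothetical.

```clojure
(defn call-with-timeout
  "Runs (apply f args) on another thread and waits at most timeout-ms
  for the result, returning timeout-val if it is not ready in time."
  [f timeout-ms timeout-val & args]
  (deref (future (apply f args)) timeout-ms timeout-val))

(defn slow-inc [x]
  (Thread/sleep 500)
  (inc x))

(call-with-timeout inc 1000 nil 41)            ;; => 42
(call-with-timeout slow-inc 50 :timed-out 41)  ;; => :timed-out
```

Note that in this sketch the slow task keeps running in the background after the caller has given up on it.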
Stop a pipeline: the entire pipeline will be stopped and no more input will be processed.
Special return types in stage functions
You can abort task execution at any stage of the pipeline. Just return (abort) in your stage function.
(defn my-stage [...]
  (if ... ...
    (abort)))
You can also pass a value to (abort); it becomes the pipeline result.
;; returns 123 as pipeline result
(abort 123)
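The early-exit behaviour resembles short-circuiting a reduce with `reduced`. The sketch below uses that as a stand-in for (abort) in plain Clojure; it is an analogy, and `run-until-abort`, `check-stock` and `price` are hypothetical names.

```clojure
(defn run-until-abort
  "Applies each function in fs to the vector returned by the previous one.
  A function can stop the run early by returning (reduced value)."
  [fs & args]
  (reduce (fn [args f] (apply f args)) args fs))

(defn check-stock [item qty]
  (if (pos? qty)
    [item qty]
    (reduced :out-of-stock))) ; plays the role of (abort :out-of-stock)

(defn price [item qty]
  [(* qty 10)])

(run-until-abort [check-stock price] "tea" 3) ;; => [30]
(run-until-abort [check-stock price] "tea" 0) ;; => :out-of-stock
```

In the second call `price` is never invoked, and the value wrapped in `reduced` becomes the overall result.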
(fork) and (join)
New in 0.3: you can use
(fork) to split a task into sub-tasks that are executed in
parallel in the next stage.
Results wrapped in (join) wait for all forked tasks to finish and are
then passed as a whole to the next stage.
Note that if you do not join the results, the result of the first
sub-task in the forked stage will be the pipeline's return value.
(defn find-followers [...]
  ...
  (fork (map #(vector % msg-id) followers)))

(defn fanout-to-redis [user-id msg-id]
  ...
  (join true))
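The fork and join idea can be sketched with plain Clojure futures: start one sub-task per argument vector, then block until all of them have finished. This only illustrates the concept and says nothing about how papaline schedules forked tasks; `fork-join` and `deliver-msg` are hypothetical.

```clojure
(defn fork-join
  "Runs f on every argument vector in tasks in parallel and returns all
  results, in task order, once the last one has finished."
  [f tasks]
  (let [running (mapv #(future (apply f %)) tasks)] ; fork: start every sub-task
    (mapv deref running)))                          ; join: wait for all of them

(defn deliver-msg [follower-id msg-id]
  (str follower-id "<-" msg-id))

(fork-join deliver-msg (map #(vector % 42) [1 2 3]))
;; => ["1<-42" "2<-42" "3<-42"]
```

`mapv` is used for the fork step because it is eager, so every future is started before the first `deref` blocks.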
Dealing with Except!ons
It is possible to define an error handler for each pipeline. An error handler is a simple function that takes the Exception and current input as arguments.
(pipeline (map stage [save-msg query-followers fan-out-to-redis])
          :error-handler (fn [e]
                           ;; logging...
                           (logging/warn e "error" (ex-data e))))
If you are using
named-stage to define the stage, the name will be available to the error handler.
For synchronous calls like
(run-pipeline-timeout), the exception will be thrown to the caller.
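One way a stage name can reach an error handler is through `ex-info` and `ex-data`. The sketch below shows that mechanism in plain Clojure; the `:stage` and `:args` keys and the helper names are assumptions made for illustration and may not match what papaline actually attaches.

```clojure
(defn with-stage-name
  "Wraps f so that any exception is rethrown as an ex-info carrying the
  stage name and the input that caused it."
  [stage-name f]
  (fn [& args]
    (try
      (apply f args)
      (catch Exception e
        ;; :stage and :args are hypothetical keys chosen for this sketch
        (throw (ex-info "stage failed" {:stage stage-name :args args} e))))))

(defn run-with-handler
  "Calls f with args; if it throws, returns (error-handler e) instead."
  [f error-handler & args]
  (try
    (apply f args)
    (catch Exception e
      (error-handler e))))

(def parse (with-stage-name "parse" #(Long/parseLong %)))

(run-with-handler parse (fn [e] (:stage (ex-data e))) "12")           ;; => 12
(run-with-handler parse (fn [e] (:stage (ex-data e))) "not-a-number") ;; => "parse"
```

The original exception is kept as the cause of the `ex-info`, so a handler can still log the underlying stack trace.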
Copyright © 2014 Sun Ning email@example.com
Distributed under the Eclipse Public License either version 1.0 or any later version.