Skip to content

Commit

Permalink
work on pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
vanjakom committed Jan 28, 2019
1 parent 8cea947 commit 3bf0f94
Showing 1 changed file with 35 additions and 1 deletion.
36 changes: 35 additions & 1 deletion src/clj_common/pipeline.clj
Expand Up @@ -9,7 +9,26 @@
;;; stopping chain is not working, close of underneath chan would not stop reader
;;; since reader buffers elements ...
;;; meaning take is not functioning as expected
;;; 20190127, update, close-and-exaust should be used within all stopping go-s, note tested

;;; todo
;;; channel closing, should it be bottom -> down, or down -> up, or both?, pipeline stopping
;;; think more about this

;;; todo
;;; return of gos, currently in some it's :sucesss, should it be channel to stop go or
;;; some control thing ...

(defn close-and-exhaust
"To be used within pipeline to close and exhaust channel. Exhaust is important
for stopping of pipeline, read -> take example, once take obtained enough elements
It will close channel read is emitting to and read one more element read created"
[chan]
(async/close! chan)
(async/go
(loop [element (async/<! chan)]
(when element
(recur (async/<! chan))))))

(defn read-edn-go
"Reads contents of file to given channel. Channel is closed when file is read."
Expand Down Expand Up @@ -165,10 +184,25 @@
(when (async/>! out element)
(when (> left 1)
(recur (dec left) (async/<! in))))))
(async/close! in)
(close-and-exhaust in)
(async/close! out)
(context/set-state context "completion")))

(defn constantly-go
"Reads value from in channel once and emitts same value to out until out is not closed"
[context in out]
(async/go
(context/set-state context "init")
(let [value (async/<! in)]
(when value
(context/set-state context "step")
(context/counter context "in")
(loop [result (async/>! out value)]
(when result
(context/counter context "out")
(recur (async/>! out value))))
(context/set-state context "completion")))))

(defn filter-go
"To be replaced with single combining transducer once I learn how to setup it."
[context in filter-fn out]
Expand Down

0 comments on commit 3bf0f94

Please sign in to comment.