Permalink
Switch branches/tags
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
108 lines (79 sloc) 2.21 KB

onyx-core-async

Onyx plugin providing read and write facilities for Clojure core.async.

Installation

This plugin is included with Onyx. You do not need to add it as a separate dependency.

In your peer boot-up namespace:

(:require [onyx.plugin.core-async])

Functions

read-from-chan

Catalog entry:

{:onyx/name :in
 :onyx/plugin :onyx.plugin.core-async/input
 :onyx/type :input
 :onyx/medium :core.async
 :onyx/batch-size batch-size
 :onyx/max-peers 1
 :onyx/doc "Reads segments from a core.async channel"}

Lifecycle entries:

[{:lifecycle/task :your-task-name
  :lifecycle/calls :my.ns/in-calls}
 {:lifecycle/task :your-task-name
  :lifecycle/calls :onyx.plugin.core-async/reader-calls}]

There’s a little extra baggage with core.async because you need a reference to the channel. Make sure that my.ns/in-calls is a map that references a function to inject the channel in:

(def in-chan (chan capacity))
(def in-buffer (atom {}))

(defn inject-in-ch [event lifecycle]
  {:core.async/buffer in-buffer
   :core.async/chan in-chan})

(def in-calls
  {:lifecycle/before-task-start inject-in-ch})

write-to-chan

Catalog entry:

{:onyx/name :out
 :onyx/plugin :onyx.plugin.core-async/output
 :onyx/type :output
 :onyx/medium :core.async
 :onyx/batch-size batch-size
 :onyx/max-peers 1
 :onyx/doc "Writes segments to a core.async channel"}

Lifecycle entries:

[{:lifecycle/task :your-task-name
  :lifecycle/calls :my.ns/out-calls}
 {:lifecycle/task :your-task-name
  :lifecycle/calls :onyx.plugin.core-async/writer-calls}]

Again, as with read-from-chan, there’s a little extra to do since core.async has some exceptional behavior compared to other plugins:

(def out-chan (chan capacity))

(defn inject-out-ch [event lifecycle]
  {:core.async/chan out-chan})

(def out-calls
  {:lifecycle/before-task-start inject-out-ch})

Utility Functions

take-segments!

This additional function is provided as a utility for removing segments from a channel until nothing is read for a provided timeout (in milliseconds).