This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 206
/
core_async.clj
40 lines (37 loc) · 1.76 KB
/
core_async.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
(ns onyx.tasks.core-async
(:require [clojure.core.async :refer [chan]]
[onyx.schema :as os]
[onyx.static.uuid :refer [random-uuid]]
[onyx.plugin.core-async]
[schema.core :as s]))
(def default-channel-size 1000)
(s/defn input
([task-name :- s/Keyword opts]
(input task-name opts default-channel-size))
([task-name :- s/Keyword opts chan-size]
{:task {:task-map (merge {:onyx/name task-name
:onyx/plugin :onyx.plugin.core-async/input
:onyx/type :input
:onyx/medium :core.async
:onyx/max-peers 1
:onyx/doc "Reads segments from a core.async channel"}
opts)
:lifecycles [{:lifecycle/task task-name
:core.async/id (random-uuid)
:core.async/size chan-size
:lifecycle/calls :onyx.plugin.core-async/in-calls}]}}))
(s/defn output
([task-name :- s/Keyword opts]
(output task-name opts default-channel-size))
([task-name :- s/Keyword opts chan-size]
{:task {:task-map (merge {:onyx/name task-name
:onyx/plugin :onyx.plugin.core-async/output
:onyx/type :output
:onyx/medium :core.async
:onyx/max-peers 1
:onyx/doc "Writes segments to a core.async channel"}
opts)
:lifecycles [{:lifecycle/task task-name
:core.async/id (random-uuid)
:core.async/size (inc chan-size)
:lifecycle/calls :onyx.plugin.core-async/out-calls}]}}))