-
Notifications
You must be signed in to change notification settings - Fork 42
/
challenge_4_1.clj
84 lines (65 loc) · 2.44 KB
/
challenge_4_1.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
(ns workshop.challenge-4-1
(:require [workshop.workshop-utils :as u]))
;;; Workflows ;;;
(def workflow
  "Vector of two-element vectors of task names:
   [:read-segments :times-three] and [:times-three :write-segments].
   The names match the :onyx/name entries produced by build-catalog.
   NOTE(review): each pair is presumably a directed edge (from -> to) of the
   job graph -- confirm against the Onyx workflow documentation."
  ;; The pipeline is linear, so every pair is just two neighbouring tasks.
  (mapv vec
        (partition 2 1 [:read-segments :times-three :write-segments])))
;;; Catalogs ;;;
(defn build-catalog
  "Returns a vector of three catalog entry maps, in this order:
   :read-segments (type :input), :times-three (type :function) and
   :write-segments (type :output).

   batch-size and batch-timeout are copied verbatim into every entry as
   :onyx/batch-size and :onyx/batch-timeout. The zero-argument arity uses
   5 and 50 respectively."
  ([] (build-catalog 5 50))
  ([batch-size batch-timeout]
   (let [;; Settings that are identical across all three tasks.
         batching {:onyx/batch-size batch-size
                   :onyx/batch-timeout batch-timeout}
         reader (merge batching
                       {:onyx/name :read-segments
                        :onyx/plugin :onyx.plugin.core-async/input
                        :onyx/type :input
                        :onyx/medium :core.async
                        :onyx/max-peers 1
                        :onyx/doc "Reads segments from a core.async channel"})
         multiplier (merge batching
                           {:onyx/name :times-three
                            :onyx/fn :workshop.challenge-4-1/times-three
                            :onyx/type :function
                            :onyx/doc "Multiplies :n in the segment by 3"})
         writer (merge batching
                       {:onyx/name :write-segments
                        :onyx/plugin :onyx.plugin.core-async/output
                        :onyx/type :output
                        :onyx/medium :core.async
                        :onyx/max-peers 1
                        :onyx/doc "Writes segments to a core.async channel"})]
     [reader multiplier writer])))
;;; Functions ;;;
(defn times-three
  "Returns segment with the value under :n replaced by three times that
   value. All other keys are left as they are."
  [segment]
  (update segment :n (fn [n] (* 3 n))))
;;; Lifecycles ;;;
(def logger
  "Agent with an initial state of nil. log-segments sends its print actions
   here rather than printing directly."
  (agent nil))
(defn log-segments
  "For each element of (:onyx.core/batch event), in order, sends an action to
   the logger agent that prints that element with println. The lifecycle
   argument is not used. Always returns an empty map."
  [event lifecycle]
  (let [batch (:onyx.core/batch event)
        ;; Builds the agent action for one segment; the agent's previous
        ;; state is ignored and the new state is println's return value.
        print-action (fn [segment]
                       (fn [_] (println segment)))]
    (run! (fn [segment] (send logger (print-action segment)))
          batch))
  {})
(def logger-lifecycle
  "Lifecycle calls map that registers log-segments under the
   :lifecycle/after-batch key. Referenced by keyword from build-lifecycles."
  {:lifecycle/after-batch log-segments})
(defn inject-writer-ch
  "Reads :core.async/id from the lifecycle map, passes it to
   u/get-output-channel, and returns the result in a map under
   :core.async/chan. The event argument is not used.
   NOTE(review): presumably Onyx merges the returned map into the event so
   the core.async writer plugin can find the channel -- confirm."
  [event lifecycle]
  (let [{channel-id :core.async/id} lifecycle
        output-channel (u/get-output-channel channel-id)]
    {:core.async/chan output-channel}))
(def writer-lifecycle
  "Lifecycle calls map that registers inject-writer-ch under the
   :lifecycle/before-task-start key. Referenced by keyword from
   build-lifecycles."
  {:lifecycle/before-task-start inject-writer-ch})
(defn build-lifecycles
  "Returns a vector of five lifecycle entry maps, in this order:
     1. :times-three    -> logger-lifecycle
     2. :read-segments  -> workshop-utils in-calls (carries a :core.async/id)
     3. :read-segments  -> core.async plugin reader-calls
     4. :write-segments -> writer-lifecycle (carries a :core.async/id)
     5. :write-segments -> core.async plugin writer-calls
   Each :core.async/id is a freshly generated random UUID, so the two ids
   differ from each other and between calls."
  []
  (let [new-id (fn [] (java.util.UUID/randomUUID))
        ;; Plain entry: task, calls keyword and doc string.
        entry (fn [task calls doc]
                {:lifecycle/task task
                 :lifecycle/calls calls
                 :onyx/doc doc})
        ;; Entry that additionally carries its own channel id.
        entry-with-id (fn [task calls doc]
                        (assoc (entry task calls doc)
                               :core.async/id (new-id)))]
    [(entry :times-three
            :workshop.challenge-4-1/logger-lifecycle
            "Logs segments as they're processed")
     (entry-with-id :read-segments
                    :workshop.workshop-utils/in-calls
                    "Injects the core.async reader channel")
     (entry :read-segments
            :onyx.plugin.core-async/reader-calls
            "core.async plugin base lifecycle")
     (entry-with-id :write-segments
                    :workshop.challenge-4-1/writer-lifecycle
                    "Injects the core.async writer channel")
     (entry :write-segments
            :onyx.plugin.core-async/writer-calls
            "core.async plugin base lifecycle")]))