This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 206
/
system.clj
177 lines (158 loc) · 6.65 KB
/
system.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
(ns onyx.system
(:require [clojure.core.async :refer [chan close!]]
[com.stuartsierra.component :as component]
[taoensso.timbre :refer [fatal info]]
[onyx.static.logging-configuration :as logging-config]
[onyx.peer.virtual-peer :refer [virtual-peer]]
[onyx.peer.task-lifecycle :refer [task-lifecycle new-task-information]]
[onyx.monitoring.metrics-monitoring :as metrics-monitoring]
[onyx.messaging.protocols.messenger :as m]
[onyx.messaging.aeron.messaging-group]
[onyx.messaging.aeron.messenger]
[onyx.peer.queryable-state-manager :as queryable-state]
[onyx.peer.peer-group-manager :as pgm]
[onyx.monitoring.no-op-monitoring]
[onyx.monitoring.custom-monitoring]
[onyx.peer.function]
[onyx.log.zookeeper :refer [zookeeper]]
[onyx.query :as qs]
[onyx.static.validation :as validator]
[onyx.state.lmdb]
[onyx.state.memory]
[onyx.log.commands.prepare-join-cluster]
[onyx.log.commands.accept-join-cluster]
[onyx.log.commands.abort-join-cluster]
[onyx.log.commands.notify-join-cluster]
[onyx.log.commands.set-replica]
[onyx.log.commands.group-leave-cluster]
[onyx.log.commands.leave-cluster]
[onyx.log.commands.submit-job]
[onyx.log.commands.kill-job]
[onyx.log.commands.gc]
[onyx.log.commands.add-virtual-peer]
[onyx.log.commands.complete-job]
[onyx.scheduling.greedy-job-scheduler]
[onyx.scheduling.balanced-job-scheduler]
[onyx.scheduling.percentage-job-scheduler]
[onyx.scheduling.balanced-task-scheduler]
[onyx.scheduling.percentage-task-scheduler]
[onyx.scheduling.colocated-task-scheduler]
[onyx.windowing.units]
[onyx.windowing.window-extensions]
[onyx.windowing.aggregation]
[onyx.triggers]
[onyx.refinements]
[onyx.compression.nippy]
[onyx.plugin.protocols]
[onyx.plugin.core-async]
[onyx.storage.s3]
[onyx.extensions :as extensions]
[onyx.checkpoint :as checkpoint]
[onyx.interop]))
(def development-components [:monitoring :logging-config :log])
(def peer-group-components [:logging-config :monitoring :query-server :messenger-group :peer-group-manager :state-store-group])
(def client-components [:monitoring :log])
(def task-components
[:task-lifecycle :task-information])
(def peer-components
[:virtual-peer])
(defn rethrow-component [f]
(try
(f)
(catch Throwable e
(fatal e)
(throw (.getCause e)))))
(defrecord OnyxDevelopmentEnv []
component/Lifecycle
(start [this]
(rethrow-component
#(component/start-system this development-components)))
(stop [this]
(rethrow-component
#(component/stop-system this development-components))))
(defn onyx-development-env
[peer-config]
(map->OnyxDevelopmentEnv
{:monitoring (extensions/monitoring-agent {:monitoring :no-op})
:logging-config (logging-config/logging-configuration peer-config)
:log (component/using (zookeeper peer-config) [:monitoring :logging-config])}))
(defrecord OnyxClient []
component/Lifecycle
(start [this]
(rethrow-component
#(component/start-system this client-components)))
(stop [this]
(rethrow-component
#(component/stop-system this client-components))))
(defrecord OnyxTask [peer-site peer-state task-state]
component/Lifecycle
(start [component]
(rethrow-component
#(component/start-system component task-components)))
(stop [component]
(rethrow-component
#(component/stop-system component task-components))))
(defrecord OnyxPeer []
component/Lifecycle
(start [this]
(rethrow-component
#(component/start-system this peer-components)))
(stop [this]
(rethrow-component
#(component/stop-system this peer-components))))
(defrecord OnyxPeerGroup []
component/Lifecycle
(start [this]
(rethrow-component
#(component/start-system this peer-group-components)))
(stop [this]
(rethrow-component
#(component/stop-system this peer-group-components))))
(defn onyx-client
[peer-client-config]
(validator/validate-peer-client-config peer-client-config)
(map->OnyxClient
{:peer-config peer-client-config
:monitoring (extensions/monitoring-agent {:monitoring :no-op})
:log (component/using (zookeeper peer-client-config) [:monitoring])}))
(defn onyx-task
[peer-state task-state]
(map->OnyxTask
{:logging-config (:logging-config peer-state)
:peer-state peer-state
:task-state task-state
:task-information (new-task-information peer-state task-state)
:task-lifecycle (component/using (task-lifecycle peer-state task-state) [:task-information])}))
(defn onyx-vpeer-system
[group-ch outbox-ch peer-config messenger-group state-store-group monitoring log group-id vpeer-id]
(map->OnyxPeer
{:group-id group-id
:messenger-group messenger-group
:state-store-group state-store-group
:logging-config (logging-config/logging-configuration peer-config)
:monitoring monitoring
:virtual-peer (component/using
(virtual-peer group-ch outbox-ch log peer-config onyx-task vpeer-id)
[:group-id :messenger-group :monitoring :logging-config :state-store-group])}))
(defn onyx-peer-group
[peer-config]
(map->OnyxPeerGroup
{:config peer-config
:logging-config (logging-config/logging-configuration peer-config)
:monitoring (component/using (metrics-monitoring/new-monitoring) [:logging-config])
:state-store-group (component/using (queryable-state/new-state-store-group peer-config) [:logging-config])
:messenger-group (component/using (m/build-messenger-group peer-config) [:logging-config])
:query-server (component/using (qs/query-server peer-config) [:logging-config :state-store-group :monitoring])
:peer-group-manager (component/using (pgm/peer-group-manager peer-config onyx-vpeer-system)
[:logging-config :monitoring
:state-store-group :messenger-group
:query-server])}))
(defmethod clojure.core/print-method OnyxPeer
[system ^java.io.Writer writer]
(.write writer "#<Onyx Peer>"))
(defmethod clojure.core/print-method OnyxPeerGroup
[system ^java.io.Writer writer]
(.write writer "#<Onyx Peer Group>"))
(defmethod clojure.core/print-method OnyxTask
[system ^java.io.Writer writer]
(.write writer "#<Onyx Task>"))