This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 206
/
job.clj
47 lines (42 loc) · 1.93 KB
/
job.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
(ns onyx.job
(:require [schema.core :as s]
[onyx.schema :as os]))
;; Baseline schemas keyed by task-bundle section. `add-task` passes this map
;; to `compose-schemas` as the base against which each task's own schema is
;; combined.
;;
;; NOTE(review): `compose-schemas` also looks up :job-metadata here, which is
;; not present, and hands :resume-point to `vschema-merge`, which takes
;; `first` of its base argument -- yet :resume-point is not wrapped in a
;; vector like the collection entries are. Confirm both are intended.
(def base-schemas
{:task-map os/TaskMap
:lifecycles [os/Lifecycle]
:triggers [os/Trigger]
:windows [os/Window]
:flow-conditions [os/FlowCondition]
:resume-point os/ResumePoint})
(defn vschema-merge
  "Combines the first element of `schema` with the first element of
  `base-schema` and returns the result wrapped in a one-element vector.

  When the first element of `base-schema` has a truthy :schema value, that
  value is merged instead of the element itself. The merged map is passed
  through `os/combine-restricted-ns` before being wrapped."
  [schema base-schema]
  (let [task-entry (first schema)
        base-entry (first base-schema)
        ;; Prefer the nested :schema when the base entry carries one.
        base-part  (or (:schema base-entry) base-entry)
        merged     (merge task-entry base-part)]
    [(os/combine-restricted-ns merged)]))
(defn compose-schemas
  "Builds the schema used to validate a task bundle's :task map.

  Arguments:
    bundle      - map with :task (the task data) and :schema (its schemas)
    base-schema - map of baseline schemas keyed by section

  The bundle's :task-map schema is passed through `os/UniqueTaskMap`; every
  other section is combined with its `base-schema` counterpart through
  `vschema-merge`. The result is then narrowed to the keys actually present
  in :task, so sections the task does not define are not validated."
  [{:keys [task schema]} base-schema]
  (let [;; Sections combined with the base schema, in application order.
        ;; NOTE(review): the `base-schemas` map in this file has no
        ;; :job-metadata entry (its base is nil when that map is passed) and
        ;; its :resume-point entry is not vector-wrapped even though
        ;; `vschema-merge` takes `first` of the base -- confirm intended.
        merged-sections [:lifecycles :triggers :windows
                         :flow-conditions :job-metadata :resume-point]
        merge-section   (fn [acc section]
                          (update acc section vschema-merge (get base-schema section)))]
    (-> (reduce merge-section
                (update schema :task-map os/UniqueTaskMap)
                merged-sections)
        ;; `update` creates every section key above; keep only those the
        ;; task itself provides.
        (select-keys (keys task)))))
(s/defn ^:always-validate add-task :- os/PartialJob
  "Adds a task's task-definition to a job.

  Arities:
    [job task-definition & behaviors]
      Threads `task-definition` through each function in `behaviors`
      (left to right) and adds the resulting bundle to `job`.
    [job task-bundle]
      Validates the bundle's :task against the schema composed from the
      bundle and `base-schemas` (throws via `s/validate` on mismatch), then
      appends the task's :task-map to the job's :catalog and its
      :lifecycles, :triggers, :windows and :flow-conditions to the job's
      collections of the same name. Sections absent from the task leave the
      job untouched.

  A job collection that is missing (nil) is treated as an empty vector, so
  added entries keep their original order."
  ([job task-definition & behaviors]
   (add-task job (reduce (fn [acc f] (f acc)) task-definition behaviors)))
  ([job task-bundle]
   (let [{:keys [task]} task-bundle
         {:keys [task-map lifecycles triggers windows flow-conditions]} task
         composed-schema (compose-schemas task-bundle base-schemas)
         ;; `conj`/`into` onto nil build a list (and `into` reverses the
         ;; added items); default a missing collection to a vector instead.
         conj-vec (fnil conj [])
         into-vec (fnil into [])]
     (s/validate composed-schema task)
     (cond-> job
       task-map        (update :catalog conj-vec task-map)
       lifecycles      (update :lifecycles into-vec lifecycles)
       triggers        (update :triggers into-vec triggers)
       windows         (update :windows into-vec windows)
       flow-conditions (update :flow-conditions into-vec flow-conditions)))))
(defmulti register-job
  "Multimethod taking `job-name` and `config`; dispatches on `job-name`
  alone, so `config` plays no part in method selection."
  (fn [job-name _config] job-name))