This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 206
/
planning.cljc
68 lines (58 loc) · 2.37 KB
/
planning.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
(ns onyx.static.planning
(:require [com.stuartsierra.dependency :as dep]
[taoensso.timbre :as timbre :refer [debug info]]))
(defn only [coll]
(when (next coll)
(throw (ex-info "More than one element in collection, expected count of 1" {:coll coll})))
(if-let [result (first coll)]
result
(throw (ex-info "Zero elements in collection, expected exactly one" {:coll coll}))))
(defn find-task [catalog task-name]
(let [matches (filter #(= task-name (:onyx/name %)) catalog)]
(only matches)))
(defn grouping-task? [task-map]
(boolean (or (:onyx/group-by-fn task-map)
(:onyx/group-by-key task-map))))
(defn create-task [task-ids catalog task-name parents children-names]
(let [element (find-task catalog task-name)
children (mapv (partial find-task catalog) children-names)
element-name (:onyx/name element)
task-id (task-ids element-name)]
{:id task-id
:name element-name
:ingress-tasks (set (map :name parents))
:egress-tasks-batch-sizes (into {} (mapv (juxt :onyx/name :onyx/batch-size) children))
:egress-tasks (set (map :onyx/name children))}))
(defn to-dependency-graph [workflow]
(reduce (fn [g edge]
(apply dep/depend g (reverse edge)))
(dep/graph)
workflow))
(defn max-depth [g node]
(if-let [ds (seq (dep/immediate-dependents g node))]
(inc (apply max (map #(max-depth g %) ds)))
0))
(defn workflow-depth [workflow]
(let [edges (set (reduce into [] workflow))
g (to-dependency-graph workflow)]
(inc (apply max (map #(max-depth g %) edges)))))
(defn remove-dupes [coll]
(map last (vals (group-by :name coll))))
(defn gen-task-ids [nodes]
(into {} (map (juxt identity identity) nodes)))
(defn discover-tasks
([catalog workflow]
(discover-tasks catalog workflow gen-task-ids))
([catalog workflow gen-task-fn]
(let [dag (to-dependency-graph workflow)
sorted-dag (dep/topo-sort dag)
task-ids (gen-task-fn sorted-dag)]
(remove-dupes
(reduce
(fn [tasks element]
(let [parents (dep/immediate-dependencies dag element)
children (dep/immediate-dependents dag element)
parent-entries (filter #(some #{(:name %)} parents) tasks)]
(conj tasks (create-task task-ids catalog element parent-entries children))))
[]
sorted-dag)))))