This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 206
/
colocated_task_scheduler.clj
143 lines (129 loc) · 5.44 KB
/
colocated_task_scheduler.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
(ns onyx.scheduling.colocated-task-scheduler
(:require [onyx.scheduling.common-task-scheduler :as cts]
[onyx.scheduling.common-job-scheduler :as cjs]
[onyx.log.commands.common :as common]
[onyx.messaging.protocols.messenger :as m]
[onyx.extensions :as extensions])
(:import [org.btrplace.model.constraint Fence SplitAmong Ban]))
(defn get-peer-site
  "Returns the value stored for peer `p` under the replica's `:peer-sites`
  key, or nil when the replica holds no entry for that peer."
  [replica p]
  (-> replica
      (get :peer-sites)
      (get p)))
(defn site->peers
  "Groups the ids in the replica's `:peers` collection by the site that
  `get-peer-site` reports for each of them. Returns a map of
  site -> vector of peer ids, in the order the peers appear in `:peers`."
  [replica]
  (let [site-of (partial get-peer-site replica)]
    (group-by site-of (:peers replica))))
(defn large-enough-sites
  "Returns a map holding only those entries of `site->peers-mapping` whose
  peer collection contains at least `min-peers` peers. Sites with fewer
  peers are left out of the result."
  [site->peers-mapping min-peers]
  (reduce-kv
    (fn [kept site site-peers]
      ;; Skip the site when it cannot supply `min-peers` peers.
      (if (< (count site-peers) min-peers)
        kept
        (assoc kept site site-peers)))
    {}
    site->peers-mapping))
(defn global-saturation-lower-bound
  "Returns the smallest value found at [:task-saturation job-id task-id] in
  the replica across `task-ids`. Tasks without an entry (nil) are ignored.
  When none of the tasks has an entry, returns Double/POSITIVE_INFINITY."
  [replica job-id task-ids]
  (let [saturation-of (fn [task-id]
                        (get-in replica [:task-saturation job-id task-id]))]
    (if-let [known (seq (keep saturation-of task-ids))]
      (apply min known)
      Double/POSITIVE_INFINITY)))
;; Assigns the same peer count to every task of the job. The count is the
;; smaller of two limits:
;;   * the lowest saturation recorded for any of the job's tasks, and
;;   * how many times the full task set fits into the peers that live on
;;     sites with at least one peer per task.
;; The `n` argument is not consulted. A job with no tasks makes the integer
;; division below throw, because the divisor is the task count.
(defmethod cts/task-distribute-peer-count :onyx.task-scheduler/colocated
  [replica job-id n]
  (let [task-ids (get-in replica [:tasks job-id])
        n-tasks (count task-ids)
        usable-sites (large-enough-sites (site->peers replica) n-tasks)
        usable-peer-count (reduce + (map count (vals usable-sites)))
        saturation-cap (global-saturation-lower-bound replica job-id task-ids)
        ;; Truncating division: whole copies of the task set only.
        site-cap (int (/ usable-peer-count n-tasks))
        per-task (min saturation-cap site-cap)]
    (zipmap task-ids (repeat per-task))))
(defn ban-smaller-sites
  "Builds a lazy sequence of btrplace `Ban` constraints, one per excluded
  peer. Excluded peers are the replica's peers whose site is not a key of
  `large-sites`, followed by the peers in `rejected`. Each constraint bans
  the peer's VM (looked up through `peer->vm`) from the nodes (looked up
  through `task->node`) of every task belonging to a job in `jobs` whose
  task scheduler is :onyx.task-scheduler/colocated."
  [replica jobs peer->vm task->node large-sites rejected]
  (let [large-site-keys (keys large-sites)
        ;; Yields the peer's site when it is one of the large sites, else nil.
        on-large-site (fn [peer-id]
                        (some #{(get-peer-site replica peer-id)} large-site-keys))
        ;; group-by files every peer that is not on a large site under nil.
        small-site-peers (get (group-by on-large-site (:peers replica)) nil)
        banned-peers (into small-site-peers rejected)
        colocated? (fn [job-id]
                     (= (get-in replica [:task-schedulers job-id])
                        :onyx.task-scheduler/colocated))
        colocated-nodes (for [job-id jobs
                              :when (colocated? job-id)
                              task-id (get-in replica [:tasks job-id])]
                          (task->node [job-id task-id]))]
    (map (fn [peer-id]
           (Ban. (peer->vm peer-id) colocated-nodes))
         banned-peers)))
(defn non-colocated-tasks
  "Returns a vector of [job-id task-id] pairs covering every task of each job
  in `jobs` whose entry under the replica's `:task-schedulers` is anything
  other than :onyx.task-scheduler/colocated. Pairs follow the order of
  `jobs`, then the order of each job's task list."
  [replica jobs]
  (let [colocated? (fn [job-id]
                     (= (get-in replica [:task-schedulers job-id])
                        :onyx.task-scheduler/colocated))]
    (vec
      (for [job-id jobs
            :when (not (colocated? job-id))
            task-id (get-in replica [:tasks job-id])]
        [job-id task-id]))))
(defn select-peers
  "Walks the `site->peers` map and splits peers into
  {:selected [...] :rejected [...]}.

  `total` is the number of peers wanted and must be a multiple of `size`
  (asserted). While fewer than `total` peers have been selected, each site
  first has its trailing (count mod size) peers trimmed off and rejected;
  from what remains, at most the still-missing number of peers is selected.
  Once `total` peers are selected, every peer of each remaining site is
  rejected.

  Note: peers that survive the trim but are not needed to reach `total`
  are placed in neither list."
  [site->peers total size]
  (assert (zero? (mod total size)))
  (reduce-kv
    (fn [acc _site peer-ids]
      (let [still-needed (- total (count (:selected acc)))]
        (if (pos? still-needed)
          (let [leftover (mod (count peer-ids) size)
                whole-groups (drop-last leftover peer-ids)]
            (-> acc
                (update-in [:selected] into (take still-needed whole-groups))
                (update-in [:rejected] into (take-last leftover peer-ids))))
          ;; Quota already met: nothing from this site is used.
          (update-in acc [:rejected] into peer-ids))))
    {:selected []
     :rejected []}
    site->peers))
;; Builds the btrplace constraints for one colocated job.
;;
;; Steps:
;;   1. `capacity` is the number of tasks in the job; `boxes` is the second
;;      element of the first entry of `task-capacities` (presumably the
;;      per-task peer count computed for this job -- confirm against caller).
;;   2. Only sites with at least `capacity` peers are considered, and
;;      `select-peers` picks capacity * boxes peers from them.
;;   3. The selected peers are cut into groups of `capacity`; each group
;;      yields one `SplitAmong` constraint pairing the group's VMs (one
;;      single-VM collection per peer) with the job's task nodes (one
;;      single-node collection per task).
;;   4. `ban-smaller-sites` contributes `Ban` constraints for peers on
;;      undersized sites and for the peers `select-peers` rejected.
;;
;; Returns a vector: the SplitAmong constraints followed by the Ban
;; constraints. `no-op-node` is part of the multimethod signature but is not
;; used by this implementation.
(defmethod cts/task-constraints :onyx.task-scheduler/colocated
  [replica jobs task-capacities peer->vm task->node no-op-node job-id]
  (let [task-ids (get-in replica [:tasks job-id])
        capacity (count task-ids)
        boxes (second (first task-capacities))
        n-peers (* capacity boxes)
        site->peers-mapping (large-enough-sites (site->peers replica) capacity)
        {:keys [selected rejected]} (select-peers site->peers-mapping n-peers capacity)
        ;; Identical for every peer group, so it is built once.
        task-nodes (map #(vector (get task->node [job-id %])) task-ids)
        split-constraints (reduce
                            (fn [result peer-ids]
                              (conj result
                                    (SplitAmong.
                                      (map (comp vector peer->vm) peer-ids)
                                      task-nodes)))
                            []
                            (partition capacity selected))]
    (into
      split-constraints
      (ban-smaller-sites replica jobs peer->vm task->node site->peers-mapping rejected))))
;; The colocated scheduler always answers false here, whatever the replica
;; or job; neither argument is inspected.
(defmethod cts/assign-capacity-constraint? :onyx.task-scheduler/colocated
  [_replica _job-id]
  false)
(defn find-colocated-peers
  "Returns a lazy sequence of the peers in `other-peers` for which
  `m/get-peer-site` reports the same site as it does for `this-peer`."
  [replica this-peer other-peers]
  (let [site-of (fn [peer] (m/get-peer-site replica peer))
        home-site (site-of this-peer)]
    (filter (fn [peer] (= home-site (site-of peer)))
            other-peers)))
(defn choose-candidates
  "Returns the peers eligible as targets for `this-peer`. When `peer-config`
  has a truthy :onyx.task-scheduler.colocated/only-send-local? entry, only
  the `downstream-peers` colocated with `this-peer` are returned; otherwise
  `downstream-peers` is returned unchanged."
  [replica peer-config this-peer downstream-peers]
  (let [local-only? (:onyx.task-scheduler.colocated/only-send-local? peer-config)]
    (if-not local-only?
      downstream-peers
      (find-colocated-peers replica this-peer downstream-peers))))
;; Returns a function of one argument (the hash group, which is ignored)
;; that picks a random peer from the candidates produced by
;; `choose-candidates`.
;;
;; The candidates are copied into a vector once, up front. `choose-candidates`
;; may hand back a lazy `filter` sequence, and `rand-nth` on a sequence has to
;; walk it on every call; on a vector the pick is a direct index lookup.
;; A nil candidate list is left as nil so the returned function keeps
;; yielding nil for it, as `rand-nth` does for nil.
(defmethod cts/choose-downstream-peers :onyx.task-scheduler/colocated
  [replica job-id peer-config this-peer downstream-peers]
  (let [chosen (choose-candidates replica peer-config this-peer downstream-peers)
        candidates (when (some? chosen) (vec chosen))]
    (fn [hash-group]
      (rand-nth candidates))))
;; Returns a zero-argument function that picks a random acker from the
;; candidates produced by `choose-candidates`.
;;
;; Throws an ex-info (with an empty data map) right away when there is no
;; candidate at all.
;;
;; The candidates are copied into a vector once, up front. `choose-candidates`
;; may hand back a lazy `filter` sequence, and `rand-nth` on a sequence has to
;; walk it on every call; on a vector the pick is a direct index lookup.
(defmethod cts/choose-acker :onyx.task-scheduler/colocated
  [replica job-id peer-config this-peer ackers]
  (let [candidates (vec (choose-candidates replica peer-config this-peer ackers))]
    (when (empty? candidates)
      (throw
        (ex-info
          (format
            "Job %s does not have an acker per machine, which is needed for the colocated task scheduler. Raise the limit via the job parameter :acker/percentage." job-id)
          {})))
    (fn []
      (rand-nth candidates))))