forked from nathanmarz/storm
/
IsolationScheduler.clj
208 lines (187 loc) · 8.36 KB
/
IsolationScheduler.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
(ns backtype.storm.scheduler.IsolationScheduler
(:use [backtype.storm util config log])
(:require [backtype.storm.scheduler.DefaultScheduler :as DefaultScheduler])
(:import [java.util HashSet Set List LinkedList ArrayList Map HashMap])
(:import [backtype.storm.scheduler IScheduler Topologies
Cluster TopologyDetails WorkerSlot SchedulerAssignment
EvenScheduler ExecutorDetails])
(:gen-class
:init init
:constructors {[] []}
:state state
:implements [backtype.storm.scheduler.IScheduler]))
;; gen-class :init hook — returns [superclass-ctor-args initial-state].
;; The state is a mutable container cell, later filled by -prepare with the
;; scheduler conf map so -schedule can read it each round.
(defn -init []
  [[] (container)])
;; IScheduler/prepare — stash the scheduler configuration in this instance's
;; state container (created by -init) for use by subsequent -schedule calls.
(defn -prepare [this conf]
  (container-set! (.state this) conf))
;; NOTE(review): this definition is immediately shadowed by the
;; compute-worker-specs defined directly below it, so this version is dead
;; code and never called. Kept byte-identical here; it should probably be
;; deleted in a follow-up.
(defn- compute-worker-specs "Returns list of sets of executors"
  [^TopologyDetails details]
  (->> (.getExecutorToComponent details)
       reverse-map
       (map second)
       (apply interleave-all)
       (partition-fixed (.getNumWorkers details))
       (map set)))
(defn- compute-worker-specs
  "Returns a mutable HashSet of executor sets, one set per worker: the
   topology's executors are dealt round-robin across its configured number
   of workers."
  [^TopologyDetails details]
  (let [all-executors (->> (.getExecutorToComponent details)
                           reverse-map
                           (map second)
                           (apply concat))
        ;; pair each executor with a worker index, cycling 0..numWorkers-1
        indexed       (map vector
                           (repeat-seq (range (.getNumWorkers details)))
                           all-executors)]
    (HashSet. (->> (group-by first indexed)
                   (map-val #(map second %))
                   vals
                   (map set)))))
(defn isolated-topologies
  "Filters `topologies` down to those whose name has an entry in the conf's
   ISOLATION-SCHEDULER-MACHINES map."
  [conf topologies]
  (let [iso-names (set (keys (get conf ISOLATION-SCHEDULER-MACHINES)))
        isolated? (fn [^TopologyDetails t] (contains? iso-names (.getName t)))]
    (filter isolated? topologies)))
;; map from topology id -> set of sets of executors
(defn topology-worker-specs
  "Returns a map of topology id -> set of executor sets (the worker specs
   from compute-worker-specs) for the given isolated topologies."
  [iso-topologies]
  (apply merge
         (for [t iso-topologies]
           {(.getId t) (compute-worker-specs t)})))
(defn machine-distribution
  "Distributes this isolated topology's workers over its configured machine
   count. Returns a mutable HashMap of workers-per-machine -> number of
   machines receiving that many workers, with the zero-workers bucket
   removed."
  [conf ^TopologyDetails topology]
  (let [machines (get-in conf [ISOLATION-SCHEDULER-MACHINES (.getName topology)])
        workers  (.getNumWorkers topology)]
    (HashMap. (dissoc (integer-divided workers machines) 0))))
(defn topology-machine-distribution
  "Returns a map of topology id -> machine distribution (see
   machine-distribution) for the given isolated topologies."
  [conf iso-topologies]
  (apply merge
         (for [t iso-topologies]
           {(.getId t) (machine-distribution conf t)})))
(defn host-assignments
  "For every assignment in the cluster, returns host -> list of
   [slot topology-id executor-set] tuples describing the workers currently
   running on that host."
  [^Cluster cluster]
  (let [slot-specs (fn [^SchedulerAssignment ass]
                     (for [[slot executors] (reverse-map (.getExecutorToSlot ass))]
                       [slot (.getTopologyId ass) (set executors)]))
        all-specs  (mapcat slot-specs (vals (.getAssignments cluster)))]
    (group-by (fn [[^WorkerSlot slot & _]]
                (.getHost cluster (.getNodeId slot)))
              all-specs)))
(defn- decrement-distribution!
  "Mutates `distribution` (a java.util.Map of value -> count): decrements
   the count stored under `value`, removing the entry entirely once the
   count reaches zero. Assumes `value` is present in the map."
  [^Map distribution value]
  (let [remaining (dec (get distribution value))]
    (if-not (zero? remaining)
      (.put distribution value remaining)
      (.remove distribution value))))
;; returns list of list of slots, reverse sorted by number of slots
(defn- host-assignable-slots [^Cluster cluster]
  ;; Groups the cluster's assignable slots by host (dropping slots whose
  ;; host resolves to nil) and returns a LinkedList of [host slots] pairs so
  ;; the caller can destructively peek/poll from the front.
  ;; NOTE(review): the sort-by (descending slot count) is immediately
  ;; followed by shuffle, which randomizes the order again — this
  ;; contradicts the "reverse sorted by number of slots" comment above.
  ;; Confirm whether the sorted or the shuffled order is intended.
  (-<> cluster
       .getAssignableSlots
       (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
       (dissoc <> nil)
       (sort-by #(-> % second count -) <>)
       shuffle
       (LinkedList. <>)
       ))
(defn- host->used-slots
  "Returns a map of host -> list of the worker slots currently in use on
   that host."
  [^Cluster cluster]
  (group-by (fn [^WorkerSlot slot] (.getHost cluster (.getNodeId slot)))
            (.getUsedSlots cluster)))
(defn- distribution->sorted-amts
  "Flattens a distribution map of amount -> multiplicity into a sequence
   repeating each amount `multiplicity` times, largest amounts first."
  [distribution]
  (sort > (mapcat (fn [[amt n]] (repeat n amt)) distribution)))
(defn- allocated-topologies
  "Returns the set of topology ids whose worker-spec set has been fully
   drained, i.e. all of their workers have been placed."
  [topology-worker-specs]
  (set (for [[top-id worker-specs] topology-worker-specs
             :when (empty? worker-specs)]
         top-id)))
(defn- leftover-topologies
  "Returns a Topologies object containing only the topologies whose id is
   NOT in `filter-ids-set`."
  [^Topologies topologies filter-ids-set]
  (let [keep?        (fn [^TopologyDetails t]
                       (not (contains? filter-ids-set (.getId t))))
        id->topology (->> (.getTopologies topologies)
                          (filter keep?)
                          (map (fn [^TopologyDetails t] {(.getId t) t}))
                          (apply merge))]
    (Topologies. id->topology)))
;; for each isolated topology:
;; compute even distribution of executors -> workers on the number of workers specified for the topology
;; compute distribution of workers to machines
;; determine host -> list of [slot, topology id, executors]
;; iterate through hosts and: a machine is good if:
;; 1. only running workers from one isolated topology
;; 2. all workers running on it match one of the distributions of executors for that topology
;; 3. matches one of the # of workers
;; blacklist the good hosts and remove those workers from the list of need to be assigned workers
;; otherwise unassign all other workers for isolated topologies if assigned
(defn remove-elem-from-set!
  "Destructively removes and returns an arbitrary element of `aset`.
   Assumes the set is non-empty (the iterator's next throws
   NoSuchElementException otherwise)."
  [^Set aset]
  (let [it   (.iterator aset)
        elem (.next it)]
    (.remove it)
    elem))
;; get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
;; will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
;; match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
;; blacklist all machines who had production slots defined
;; log isolated topologies who weren't able to get enough slots / machines
;; run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines
;; set blacklist to what it was initially
;; IScheduler/schedule — give isolated topologies dedicated machines first,
;; then run the default scheduler for everything else on what remains.
;; BUGFIX: the failure log message previously read "Unstable to isolate
;; topologies"; corrected to "Unable to isolate topologies".
(defn -schedule [this ^Topologies topologies ^Cluster cluster]
  (let [conf (container-get (.state this))
        ;; remember the blacklist on entry so we can restore it at the end
        orig-blacklist (HashSet. (.getBlacklistedHosts cluster))
        iso-topologies (isolated-topologies conf (.getTopologies topologies))
        iso-ids-set (->> iso-topologies (map #(.getId ^TopologyDetails %)) set)
        topology-worker-specs (topology-worker-specs iso-topologies)
        topology-machine-distribution (topology-machine-distribution conf iso-topologies)
        host-assignments (host-assignments cluster)]
    ;; Pass 1: keep hosts whose current assignment already exactly satisfies
    ;; one isolated topology, blacklisting them so they are not reused below;
    ;; otherwise free any isolated-topology workers found on the host.
    (doseq [[host assignments] host-assignments]
      (let [top-id (-> assignments first second)
            distribution (get topology-machine-distribution top-id)
            ^Set worker-specs (get topology-worker-specs top-id)
            num-workers (count assignments)
            ]
        ;; a host is "good" iff: it runs workers of a single isolated
        ;; topology, its worker count matches one of the topology's machine
        ;; distribution entries, and every worker matches one of the
        ;; precomputed executor sets
        (if (and (contains? iso-ids-set top-id)
                 (every? #(= (second %) top-id) assignments)
                 (contains? distribution num-workers)
                 (every? #(contains? worker-specs (nth % 2)) assignments))
          (do (decrement-distribution! distribution num-workers)
              (doseq [[_ _ executors] assignments] (.remove worker-specs executors))
              (.blacklistHost cluster host))
          (doseq [[slot top-id _] assignments]
            (when (contains? iso-ids-set top-id)
              (.freeSlot cluster slot)
              ))
          )))
    ;; Pass 2: for each isolated topology still needing machines, claim whole
    ;; hosts with enough assignable slots per required amount: free whatever
    ;; was running there, assign the computed executor sets, and blacklist
    ;; the host.
    (let [host->used-slots (host->used-slots cluster)
          ^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
      ;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
      (doseq [[top-id worker-specs] topology-worker-specs
              :let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
        (doseq [amt amts
                :let [[host host-slots] (.peek sorted-assignable-hosts)]]
          (when (and host-slots (>= (count host-slots) amt))
            (.poll sorted-assignable-hosts)
            (.freeSlots cluster (get host->used-slots host))
            (doseq [slot (take amt host-slots)
                    :let [executors-set (remove-elem-from-set! worker-specs)]]
              (.assign cluster slot top-id executors-set))
            (.blacklistHost cluster host))
          )))
    ;; Pass 3: if every isolated topology got all its workers, let the default
    ;; scheduler place the remaining (non-isolated) topologies; otherwise hold
    ;; back all other resources and log which topologies could not be isolated.
    (let [failed-iso-topologies (->> topology-worker-specs
                                     (mapcat (fn [[top-id worker-specs]]
                                               (if-not (empty? worker-specs) [top-id])
                                               )))]
      (if (empty? failed-iso-topologies)
        ;; run default scheduler on non-isolated topologies
        (-<> topology-worker-specs
             allocated-topologies
             (leftover-topologies topologies <>)
             (DefaultScheduler/default-schedule <> cluster))
        (log-warn "Unable to isolate topologies " (pr-str failed-iso-topologies) ". Will wait for enough resources for isolated topologies before allocating any other resources.")
        ))
    ;; restore the blacklist we found on entry so temporary blacklisting does
    ;; not leak into subsequent scheduling rounds
    (.setBlacklistedHosts cluster orig-blacklist)
    ))