Skip to content

Commit

Permalink
Merge branch 'master' into 0.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Dec 20, 2012
2 parents f771774 + a0e52c7 commit 70242cd
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
14 changes: 11 additions & 3 deletions src/clj/backtype/storm/scheduler/IsolationScheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@
(LinkedList. <>)
))

(defn- host->used-slots [^Cluster cluster]
(->> cluster
.getUsedSlots
(group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)))
))

(defn- distribution->sorted-amts [distribution]
(->> distribution
(mapcat (fn [[val amt]] (repeat amt val)))
Expand Down Expand Up @@ -156,7 +162,8 @@
^Set worker-specs (get topology-worker-specs top-id)
num-workers (count host-assignments)
]
(if (and (every? #(= (second %) top-id) assignments)
(if (and (contains? iso-ids-set top-id)
(every? #(= (second %) top-id) assignments)
(contains? distribution num-workers)
(every? #(contains? worker-specs (nth % 2)) assignments))
(do (decrement-distribution! distribution num-workers)
Expand All @@ -168,15 +175,16 @@
))
)))

(let [^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
(let [host->used-slots (host->used-slots cluster)
^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
(doseq [[top-id worker-specs] topology-worker-specs
:let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
(doseq [amt amts
:let [[host host-slots] (.peek sorted-assignable-hosts)]]
(when (and host-slots (>= (count host-slots) amt))
(.poll sorted-assignable-hosts)
(.freeSlots cluster host-slots)
(.freeSlots cluster (get host->used-slots host))
(doseq [slot (take amt host-slots)
:let [executors-set (remove-elem-from-set! worker-specs)]]
(.assign cluster slot top-id executors-set))
Expand Down
6 changes: 4 additions & 2 deletions src/jvm/backtype/storm/scheduler/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,10 @@ public void freeSlot(WorkerSlot slot) {
* @param slots
*/
public void freeSlots(Collection<WorkerSlot> slots) {
for (WorkerSlot slot : slots) {
this.freeSlot(slot);
if(slots!=null) {
for (WorkerSlot slot : slots) {
this.freeSlot(slot);
}
}
}

Expand Down

0 comments on commit 70242cd

Please sign in to comment.