Permalink
Browse files

finished rebalance implementation

  • Loading branch information...
1 parent 2d0df52 commit 943cfa93a14cdc43f6f8bfecf6523e3c415a2b0f @nathanmarz committed Dec 18, 2011
Showing with 16 additions and 9 deletions.
  1. +16 −9 src/clj/backtype/storm/daemon/nimbus.clj
@@ -7,6 +7,7 @@
(:import [java.nio.channels Channels WritableByteChannel])
(:use [backtype.storm bootstrap])
(:use [backtype.storm.daemon common])
+ (:use [clojure.contrib.def :only [defnk]])
(:gen-class))
(bootstrap)
@@ -420,22 +421,28 @@
;; public so it can be mocked out
(defn compute-new-task->node+port [conf storm-id existing-assignment storm-cluster-state callback task-heartbeats-cache scratch?]
- ;; TODO: implement scratch?
- (let [available-slots (available-slots conf storm-cluster-state callback)
- existing-assigned (reverse-map (:task->node+port existing-assignment))
+ (let [available-slots (available-slots conf storm-cluster-state callback)
storm-conf (read-storm-conf conf storm-id)
+ all-task-ids (set (.task-ids storm-cluster-state storm-id))
- ;; if scratch?, slots = existing slots + available-slots
+
+ ;; if scratch?, slots = existing slots + available-slots, reassign-ids = all-ids
;; otherwise, do timeout stuff
+ existing-assigned (reverse-map (:task->node+port existing-assignment))
+ alive-ids (if scratch?
+ all-task-ids
+ (set (alive-tasks conf storm-id storm-cluster-state
+ all-task-ids (:task->start-time-secs existing-assignment)
+ task-heartbeats-cache)))
- all-task-ids (set (.task-ids storm-cluster-state storm-id))
- alive-ids (set (alive-tasks conf storm-id storm-cluster-state
- all-task-ids (:task->start-time-secs existing-assignment) task-heartbeats-cache))
alive-assigned (filter-val (partial every? alive-ids) existing-assigned)
- alive-node-ids (map first (keys alive-assigned))
+
total-slots-to-use (min (storm-conf TOPOLOGY-WORKERS)
(+ (count available-slots) (count alive-assigned)))
- keep-assigned (keeper-slots alive-assigned (count all-task-ids) total-slots-to-use)
+ keep-assigned (if scratch?
+ {}
+ (keeper-slots alive-assigned (count all-task-ids) total-slots-to-use))
+
freed-slots (keys (apply dissoc alive-assigned (keys keep-assigned)))
reassign-slots (take (- total-slots-to-use (count keep-assigned))
(sort-slots (concat available-slots freed-slots)))

0 comments on commit 943cfa9

Please sign in to comment.