Browse files

isolate from scratch reassignment to one function

  • Loading branch information...
1 parent cf7c9d1 commit 2d0df523c245dac8e60dfd4e91ec8ff467685dac @nathanmarz committed Dec 18, 2011
Showing with 10 additions and 4 deletions.
  1. +10 −4 src/clj/backtype/storm/daemon/nimbus.clj
14 src/clj/backtype/storm/daemon/nimbus.clj
@@ -124,7 +124,7 @@
:kill (kill-transition nimbus storm-id)
:do-rebalance (fn []
- ;; TODO: reassign from scratch
+ (mk-assignments nimbus storm-id :scratch? true)
(:old-status status))
@@ -419,10 +419,15 @@
;; TODO: slots that have dead task should be reused as long as supervisor is active
;; public so it can be mocked out
-(defn compute-new-task->node+port [conf storm-id existing-assignment storm-cluster-state callback task-heartbeats-cache]
+(defn compute-new-task->node+port [conf storm-id existing-assignment storm-cluster-state callback task-heartbeats-cache scratch?]
+ ;; TODO: implement scratch?
(let [available-slots (available-slots conf storm-cluster-state callback)
existing-assigned (reverse-map (:task->node+port existing-assignment))
storm-conf (read-storm-conf conf storm-id)
+ ;; if scratch?, slots = existing slots + available-slots
+ ;; otherwise, do timeout stuff
all-task-ids (set (.task-ids storm-cluster-state storm-id))
alive-ids (set (alive-tasks conf storm-id storm-cluster-state
all-task-ids (:task->start-time-secs existing-assignment) task-heartbeats-cache))
@@ -462,7 +467,7 @@
;; figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many tasks should be in each slot (e.g., 4, 4, 4, 5)
;; only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
;; edge case for slots with no task timeout but with supervisor timeout... just treat these as valid slots that can be reassigned to. worst comes to worse the task will timeout and won't assign here next time around
-(defn- mk-assignments [nimbus storm-id]
+(defnk mk-assignments [nimbus storm-id :scratch? false]
(log-debug "Determining assignment for " storm-id)
(let [conf (:conf nimbus)
storm-cluster-state (:storm-cluster-state nimbus)
@@ -472,7 +477,8 @@
existing-assignment (.assignment-info storm-cluster-state storm-id nil)
task->node+port (compute-new-task->node+port conf storm-id existing-assignment
storm-cluster-state callback
- (:task-heartbeats-cache nimbus))
+ (:task-heartbeats-cache nimbus)
+ scratch?)
all-node->host (merge (:node->host existing-assignment) node->host)
reassign-ids (changed-ids (:task->node+port existing-assignment) task->node+port)

0 comments on commit 2d0df52

Please sign in to comment.