Permalink
Browse files

refactor mk-assignments

  • Loading branch information...
1 parent 861d9ab commit 940e136dbff621d40f6155578e07b963fcc0266f @nathanmarz committed Dec 18, 2011
Showing with 11 additions and 12 deletions.
  1. +11 −12 src/clj/backtype/storm/daemon/nimbus.clj
@@ -457,12 +457,17 @@
;; figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many tasks should be in each slot (e.g., 4, 4, 4, 5)
;; only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
;; edge case for slots with no task timeout but with supervisor timeout... just treat these as valid slots that can be reassigned to. worst comes to worse the task will timeout and won't assign here next time around
-(defn- mk-assignments [conf storm-id storm-cluster-state callback task-heartbeats-cache]
+(defn- mk-assignments [nimbus storm-id]
(log-debug "Determining assignment for " storm-id)
- (let [existing-assignment (.assignment-info storm-cluster-state storm-id nil)
+ (let [conf (:conf nimbus)
+ storm-cluster-state (:storm-cluster-state nimbus)
+ callback (fn [& ignored] (transition! nimbus storm-id :monitor))
+
+ existing-assignment (.assignment-info storm-cluster-state storm-id nil)
[node->host available-slots] (available-slots conf storm-cluster-state callback)
task->node+port (compute-new-task->node+port conf storm-id existing-assignment
- storm-cluster-state available-slots callback task-heartbeats-cache)
+ storm-cluster-state available-slots
+ callback (:task-heartbeats-cache nimbus))
all-node->host (merge (:node->host existing-assignment) node->host)
reassign-ids (changed-ids (:task->node+port existing-assignment) task->node+port)
now-secs (current-time-secs)
@@ -493,12 +498,8 @@
(let [conf (:conf nimbus)
storm-cluster-state (:storm-cluster-state nimbus)]
(when (conf NIMBUS-REASSIGN)
- (mk-assignments conf
- storm-id
- storm-cluster-state
- (fn [& ignored] (transition! nimbus storm-id :monitor))
- (:task-heartbeats-cache nimbus))
- )))
+ (mk-assignments nimbus
+ storm-id))))
(defn- start-storm [storm-name storm-cluster-state storm-id]
(log-message "Activating " storm-name ": " storm-id)
@@ -632,9 +633,7 @@
(setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
(.setup-heartbeats! storm-cluster-state storm-id)
(setup-storm-static conf storm-id storm-cluster-state)
- (mk-assignments conf storm-id storm-cluster-state
- (fn [& ignored] (transition! nimbus storm-id :monitor))
- (:task-heartbeats-cache nimbus))
+ (mk-assignments nimbus storm-id)
(start-storm storm-name storm-cluster-state storm-id))
))

0 comments on commit 940e136

Please sign in to comment.