Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

more mk-assignments refactoring

  • Loading branch information...
commit cf7c9d1bd32ba8797eab1fef13c206c0d115c2d4 1 parent 940e136
Nathan Marz authored
Showing with 19 additions and 13 deletions.
  1. +19 −13 src/clj/backtype/storm/daemon/nimbus.clj
32 src/clj/backtype/storm/daemon/nimbus.clj
View
@@ -57,6 +57,7 @@
(declare reassign-topology)
(declare delay-event)
+(declare mk-assignments)
(defn kill-transition [nimbus storm-id]
(fn [kill-time]
@@ -246,6 +247,10 @@
supervisor-ids))
)))
+(defn get-node->host [storm-cluster-state callback]
+ (->> (all-supervisor-info storm-cluster-state callback)
+ (map-val :hostname)))
+
(defn- available-slots
[conf storm-cluster-state callback]
(let [supervisor-ids (.supervisors storm-cluster-state callback)
@@ -260,14 +265,13 @@
;; )
all-slots (map-val (comp set :worker-ports) supervisor-infos)
existing-slots (assigned-slots storm-cluster-state)
- ]
- [(map-val :hostname supervisor-infos)
- (mapcat
- (fn [[id slots]]
- (for [s (set/difference slots (existing-slots id))]
- [id s]))
- all-slots)
- ]))
+ ]
+ (mapcat
+ (fn [[id slots]]
+ (for [s (set/difference slots (existing-slots id))]
+ [id s]))
+ all-slots)
+ ))
(defn state-spout-parallelism [state-spout-spec]
(-> state-spout-spec .get_common thrift/parallelism-hint))
@@ -415,8 +419,9 @@
;; TODO: slots that have dead task should be reused as long as supervisor is active
;; public so it can be mocked out
-(defn compute-new-task->node+port [conf storm-id existing-assignment storm-cluster-state available-slots callback task-heartbeats-cache]
- (let [existing-assigned (reverse-map (:task->node+port existing-assignment))
+(defn compute-new-task->node+port [conf storm-id existing-assignment storm-cluster-state callback task-heartbeats-cache]
+ (let [available-slots (available-slots conf storm-cluster-state callback)
+ existing-assigned (reverse-map (:task->node+port existing-assignment))
storm-conf (read-storm-conf conf storm-id)
all-task-ids (set (.task-ids storm-cluster-state storm-id))
alive-ids (set (alive-tasks conf storm-id storm-cluster-state
@@ -462,12 +467,13 @@
(let [conf (:conf nimbus)
storm-cluster-state (:storm-cluster-state nimbus)
callback (fn [& ignored] (transition! nimbus storm-id :monitor))
+ node->host (get-node->host storm-cluster-state callback)
existing-assignment (.assignment-info storm-cluster-state storm-id nil)
- [node->host available-slots] (available-slots conf storm-cluster-state callback)
task->node+port (compute-new-task->node+port conf storm-id existing-assignment
- storm-cluster-state available-slots
- callback (:task-heartbeats-cache nimbus))
+ storm-cluster-state callback
+ (:task-heartbeats-cache nimbus))
+
all-node->host (merge (:node->host existing-assignment) node->host)
reassign-ids (changed-ids (:task->node+port existing-assignment) task->node+port)
now-secs (current-time-secs)
Please sign in to comment.
Something went wrong with that request. Please try again.