diff --git a/src/clj/backtype/storm/daemon/nimbus.clj b/src/clj/backtype/storm/daemon/nimbus.clj index 02bec262a..0675ced13 100644 --- a/src/clj/backtype/storm/daemon/nimbus.clj +++ b/src/clj/backtype/storm/daemon/nimbus.clj @@ -255,7 +255,7 @@ ))) (defn- available-slots - [nimbus topologies] + [nimbus topologies-missing-assignments topologies] (let [storm-cluster-state (:storm-cluster-state nimbus) ^INimbus inimbus (:inimbus nimbus) @@ -274,6 +274,7 @@ supervisor-details worker-slots topologies + topologies-missing-assignments ) ] (for [^WorkerSlot slot ret] @@ -542,8 +543,17 @@ topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus existing-assignments topology->alive-executors) + + missing-assignment-topologies (->> topologies + .getTopologies + (map (memfn getId)) + (filter (fn [t] + (let [alle (get topology->executors t) + alivee (get topology->alive-executors t)] + (or (empty? alle) (not= alle alivee)) + )))) available-slots (->> topologies - (available-slots nimbus) + (available-slots nimbus missing-assignment-topologies) (map (fn [[node-id port]] {node-id #{port}})) (apply merge-with set/union)) assigned-slots (assigned-slots storm-cluster-state) @@ -1101,7 +1111,7 @@ (reify INimbus (prepare [this conf local-dir] ) - (availableSlots [this supervisors used-slots topologies] + (availableSlots [this supervisors used-slots topologies topologies-missing-assignments] (let [all-slots (->> supervisors (mapcat (fn [^SupervisorDetails s] (for [p (.getMeta s)] diff --git a/src/jvm/backtype/storm/scheduler/INimbus.java b/src/jvm/backtype/storm/scheduler/INimbus.java index 0ea01e324..65066b3e4 100644 --- a/src/jvm/backtype/storm/scheduler/INimbus.java +++ b/src/jvm/backtype/storm/scheduler/INimbus.java @@ -10,7 +10,7 @@ public interface INimbus { // 1. if some slots are used, return as much as it currently has available // 2. otherwise return nothing until it has enough slots, or enough time has passed // sets the node id as {normalized hostname (invalid chars removed}-{topologyid} - Collection availableSlots(Collection existingSupervisors, Collection usedSlots, Topologies topologies); + Collection availableSlots(Collection existingSupervisors, Collection usedSlots, Topologies topologies, Collection topologiesWithMissingAssignments); // mesos should call launchTasks on an executor for this topology... // gives it the executor with: // - name: the node id