Skip to content

Commit

Permalink
fix pluggable scheduler to respect dynamic number of workers
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Jun 6, 2012
1 parent 0d86392 commit 5df7182
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 15 deletions.
2 changes: 2 additions & 0 deletions src/clj/backtype/storm/daemon/nimbus.clj
Expand Up @@ -299,6 +299,7 @@
))))

(declare compute-executor->component)

(defn read-topology-details [nimbus storm-id]
(let [conf (:conf nimbus)
storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
Expand All @@ -310,6 +311,7 @@
(TopologyDetails. storm-id
topology-conf
topology
(:num-workers storm-base)
executor->component
)))

Expand Down
3 changes: 1 addition & 2 deletions src/clj/backtype/storm/scheduler/DefaultScheduler.clj
Expand Up @@ -29,15 +29,14 @@
(let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
(doseq [^TopologyDetails topology needs-scheduling-topologies
:let [topology-id (.getId topology)
topology-conf (.getConf topology)
available-slots (->> (.getAvailableSlots cluster)
(map #(vector (.getNodeId %) (.getPort %))))
all-executors (->> topology
.getExecutors
(map #(vector (.getStartTask %) (.getEndTask %)))
set)
alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
total-slots-to-use (min (topology-conf TOPOLOGY-WORKERS)
total-slots-to-use (min (.getNumWorkers topology)
(+ (count available-slots) (count alive-assigned)))
bad-slots (bad-slots alive-assigned (count all-executors) total-slots-to-use)]]
(.freeSlots cluster bad-slots)
Expand Down
3 changes: 1 addition & 2 deletions src/clj/backtype/storm/scheduler/EvenScheduler.clj
Expand Up @@ -25,15 +25,14 @@

(defn- schedule-topology [^TopologyDetails topology ^Cluster cluster]
(let [topology-id (.getId topology)
topology-conf (.getConf topology)
available-slots (->> (.getAvailableSlots cluster)
(map #(vector (.getNodeId %) (.getPort %))))
all-executors (->> topology
.getExecutors
(map #(vector (.getStartTask %) (.getEndTask %)))
set)
alive-assigned (get-alive-assigned-node+port->executors cluster topology-id)
total-slots-to-use (min (topology-conf TOPOLOGY-WORKERS)
total-slots-to-use (min (.getNumWorkers topology)
(+ (count available-slots) (count alive-assigned)))
reassign-slots (take (- total-slots-to-use (count alive-assigned))
(sort-slots available-slots))
Expand Down
2 changes: 1 addition & 1 deletion src/jvm/backtype/storm/scheduler/Cluster.java
Expand Up @@ -69,7 +69,7 @@ public List<TopologyDetails> needsSchedulingTopologies(Topologies topologies) {
* </ul>
*/
public boolean needsScheduling(TopologyDetails topology) {
int desiredNumWorkers = ((Number) topology.getConf().get(Config.TOPOLOGY_WORKERS)).intValue();
int desiredNumWorkers = topology.getNumWorkers();
int assignedNumWorkers = this.getAssignedNumWorkers(topology);

if (desiredNumWorkers > assignedNumWorkers) {
Expand Down
14 changes: 9 additions & 5 deletions src/jvm/backtype/storm/scheduler/TopologyDetails.java
Expand Up @@ -13,17 +13,17 @@ public class TopologyDetails {
Map topologyConf;
StormTopology topology;
Map<ExecutorDetails, String> executorToComponent;
int numWorkers;

public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology) {
public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers) {
this.topologyId = topologyId;
this.topologyConf = topologyConf;
this.topology = topology;
this.numWorkers = numWorkers;
}

public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, Map<ExecutorDetails, String> executorToComponents) {
this.topologyId = topologyId;
this.topologyConf = topologyConf;
this.topology = topology;
public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers, Map<ExecutorDetails, String> executorToComponents) {
this(topologyId, topologyConf, topology, numWorkers);
this.executorToComponent = new HashMap<ExecutorDetails, String>(0);
if (executorToComponents != null) {
this.executorToComponent.putAll(executorToComponents);
Expand All @@ -42,6 +42,10 @@ public Map getConf() {
return topologyConf;
}

public int getNumWorkers() {
return numWorkers;
}

public StormTopology getTopology() {
return topology;
}
Expand Down
13 changes: 8 additions & 5 deletions test/clj/backtype/storm/scheduler_test.clj
Expand Up @@ -52,15 +52,15 @@
(deftest test-topologies
(let [executor1 (ExecutorDetails. (int 1) (int 5))
executor2 (ExecutorDetails. (int 6) (int 10))
topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.)
topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 1
{executor1 "spout1"
executor2 "bolt1"})
;; test topology.selectExecutorToComponent
executor->comp (.selectExecutorToComponent topology1 (list executor1))
_ (is (= (clojurify-executor->comp {executor1 "spout1"})
(clojurify-executor->comp executor->comp)))
;; test topologies.getById
topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) {})
topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 1 {})
topologies (Topologies. {"topology1" topology1 "topology2" topology2})
_ (is (= "topology1" (->> "topology1"
(.getById topologies)
Expand All @@ -84,19 +84,22 @@
executor21 (ExecutorDetails. (int 201) (int 205))
executor22 (ExecutorDetails. (int 206) (int 210))
;; topology1 needs scheduling: executor3 is NOT assigned a slot.
topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1" TOPOLOGY-WORKERS 2}
topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"}
(StormTopology.)
2
{executor1 "spout1"
executor2 "bolt1"
executor3 "bolt2"})
;; topology2 is fully scheduled
topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" TOPOLOGY-WORKERS 2}
topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"}
(StormTopology.)
2
{executor11 "spout11"
executor12 "bolt12"})
;; topology3 needs scheduling, since the assignment is squeezed
topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3" TOPOLOGY-WORKERS 2}
topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3"}
(StormTopology.)
2
{executor21 "spout21"
executor22 "bolt22"})
topologies (Topologies. {"topology1" topology1 "topology2" topology2 "topology3" topology3})
Expand Down

0 comments on commit 5df7182

Please sign in to comment.