Permalink
Browse files

fix pluggable scheduler to respect dynamic number of workers

  • Loading branch information...
1 parent 0d86392 commit 5df7182d52d00734f665f8e9e14a6d6fac4e2b6f @nathanmarz committed Jun 6, 2012
@@ -299,6 +299,7 @@
))))
(declare compute-executor->component)
+
(defn read-topology-details [nimbus storm-id]
(let [conf (:conf nimbus)
storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
@@ -310,6 +311,7 @@
(TopologyDetails. storm-id
topology-conf
topology
+ (:num-workers storm-base)
executor->component
)))
@@ -29,15 +29,14 @@
(let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
(doseq [^TopologyDetails topology needs-scheduling-topologies
:let [topology-id (.getId topology)
- topology-conf (.getConf topology)
available-slots (->> (.getAvailableSlots cluster)
(map #(vector (.getNodeId %) (.getPort %))))
all-executors (->> topology
.getExecutors
(map #(vector (.getStartTask %) (.getEndTask %)))
set)
alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
- total-slots-to-use (min (topology-conf TOPOLOGY-WORKERS)
+ total-slots-to-use (min (.getNumWorkers topology)
(+ (count available-slots) (count alive-assigned)))
bad-slots (bad-slots alive-assigned (count all-executors) total-slots-to-use)]]
(.freeSlots cluster bad-slots)
@@ -25,15 +25,14 @@
(defn- schedule-topology [^TopologyDetails topology ^Cluster cluster]
(let [topology-id (.getId topology)
- topology-conf (.getConf topology)
available-slots (->> (.getAvailableSlots cluster)
(map #(vector (.getNodeId %) (.getPort %))))
all-executors (->> topology
.getExecutors
(map #(vector (.getStartTask %) (.getEndTask %)))
set)
alive-assigned (get-alive-assigned-node+port->executors cluster topology-id)
- total-slots-to-use (min (topology-conf TOPOLOGY-WORKERS)
+ total-slots-to-use (min (.getNumWorkers topology)
(+ (count available-slots) (count alive-assigned)))
reassign-slots (take (- total-slots-to-use (count alive-assigned))
(sort-slots available-slots))
@@ -69,7 +69,7 @@ public Cluster(Map<String, SupervisorDetails> supervisors, Map<String, Scheduler
* </ul>
*/
public boolean needsScheduling(TopologyDetails topology) {
- int desiredNumWorkers = ((Number) topology.getConf().get(Config.TOPOLOGY_WORKERS)).intValue();
+ int desiredNumWorkers = topology.getNumWorkers();
int assignedNumWorkers = this.getAssignedNumWorkers(topology);
if (desiredNumWorkers > assignedNumWorkers) {
@@ -13,17 +13,17 @@
Map topologyConf;
StormTopology topology;
Map<ExecutorDetails, String> executorToComponent;
+ int numWorkers;
- public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology) {
+ public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers) {
this.topologyId = topologyId;
this.topologyConf = topologyConf;
this.topology = topology;
+ this.numWorkers = numWorkers;
}
- public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, Map<ExecutorDetails, String> executorToComponents) {
- this.topologyId = topologyId;
- this.topologyConf = topologyConf;
- this.topology = topology;
+ public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers, Map<ExecutorDetails, String> executorToComponents) {
+ this(topologyId, topologyConf, topology, numWorkers);
this.executorToComponent = new HashMap<ExecutorDetails, String>(0);
if (executorToComponents != null) {
this.executorToComponent.putAll(executorToComponents);
@@ -42,6 +42,10 @@ public Map getConf() {
return topologyConf;
}
+ public int getNumWorkers() {
+ return numWorkers;
+ }
+
public StormTopology getTopology() {
return topology;
}
@@ -52,15 +52,15 @@
(deftest test-topologies
(let [executor1 (ExecutorDetails. (int 1) (int 5))
executor2 (ExecutorDetails. (int 6) (int 10))
- topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.)
+ topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 1
{executor1 "spout1"
executor2 "bolt1"})
;; test topology.selectExecutorToComponent
executor->comp (.selectExecutorToComponent topology1 (list executor1))
_ (is (= (clojurify-executor->comp {executor1 "spout1"})
(clojurify-executor->comp executor->comp)))
;; test topologies.getById
- topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) {})
+ topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} (StormTopology.) 1 {})
topologies (Topologies. {"topology1" topology1 "topology2" topology2})
_ (is (= "topology1" (->> "topology1"
(.getById topologies)
@@ -84,19 +84,22 @@
executor21 (ExecutorDetails. (int 201) (int 205))
executor22 (ExecutorDetails. (int 206) (int 210))
;; topology1 needs scheduling: executor3 is NOT assigned a slot.
- topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1" TOPOLOGY-WORKERS 2}
+ topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"}
(StormTopology.)
+ 2
{executor1 "spout1"
executor2 "bolt1"
executor3 "bolt2"})
;; topology2 is fully scheduled
- topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2" TOPOLOGY-WORKERS 2}
+ topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"}
(StormTopology.)
+ 2
{executor11 "spout11"
executor12 "bolt12"})
;; topology3 needs scheduling, since the assignment is squeezed
- topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3" TOPOLOGY-WORKERS 2}
+ topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3"}
(StormTopology.)
+ 2
{executor21 "spout21"
executor22 "bolt22"})
topologies (Topologies. {"topology1" topology1 "topology2" topology2 "topology3" topology3})

0 comments on commit 5df7182

Please sign in to comment.