Merge branch 'master' into 0.9.0

commit 1f1511e39f8d415d280653ca7f8ec37ba42cd1a9 (2 parents: cb5b413, b467750)
@nathanmarz authored
3  CHANGELOG.md
@@ -8,6 +8,7 @@
## Unreleased (0.8.2)
+ * Added backtype.storm.scheduler.IsolationScheduler. This lets you run topologies that are completely isolated at the machine level. Configure Nimbus to isolate certain topologies, and how many machines to give to each of them, with the isolation.scheduler.machines config in Nimbus's storm.yaml. Topologies that run on the cluster but are not listed there share whatever machines remain after machines are allocated to the listed topologies (an illustrative configuration sketch follows this file's diff).
* Storm UI now uses nimbus.host to find Nimbus rather than always using localhost (thanks Frostman)
* Added report-error! to Clojure DSL
* Automatically throttle errors sent to Zookeeper/Storm UI when too many are reported in a time interval (all errors are still logged). Configured with TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL and TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
@@ -36,12 +37,14 @@
* Storm UI displays exception instead of blank page when there's an error rendering the page (thanks Frostman)
* Added MultiScheme interface (thanks sritchie)
* Added MockTridentTuple for testing (thanks emblem)
+ * Add whitelist methods to Cluster to allow only a subset of hosts to be revealed as available slots
* Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
* Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that they can consume the incoming queue of acks/fails.
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
* Bug fix: fixed NPE when emitting during emit method of Aggregator
+ * Bug fix: URLs with periods in them in Storm UI now route correctly
## 0.8.1
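
For reference, here is a minimal sketch of the configuration the new isolation scheduler expects. It is written in the Clojure config-key form used by nimbus_test.clj later in this commit; the example namespace is hypothetical and the topology names and machine counts are the illustrative values from that test. In Nimbus's storm.yaml the corresponding keys are storm.scheduler and isolation.scheduler.machines.

    (ns example.isolation-conf
      (:use [backtype.storm config]))  ;; assumed to intern STORM-SCHEDULER and ISOLATION-SCHEDULER-MACHINES from Config.java

    ;; a minimal sketch of the Nimbus-side settings: use the isolation scheduler
    ;; and dedicate a fixed number of machines to each named topology
    (def isolation-daemon-conf
      {STORM-SCHEDULER              "backtype.storm.scheduler.IsolationScheduler"
       ISOLATION-SCHEDULER-MACHINES {"tester1" 3    ;; "tester1" gets three machines to itself
                                     "tester2" 2}}) ;; "tester2" gets two machines to itself

Topologies not named under isolation.scheduler.machines are scheduled by the default scheduler on whatever machines remain.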
2  project.clj
@@ -12,7 +12,7 @@
:exclusions [log4j/log4j]]
[backtype/jzmq "2.1.0"]
[com.googlecode.json-simple/json-simple "1.1"]
- [compojure "0.6.4"]
+ [compojure "1.1.3"]
[hiccup "0.3.6"]
[ring/ring-jetty-adapter "0.3.11"]
[org.clojure/tools.logging "0.2.3"]
2  src/clj/backtype/storm/daemon/nimbus.clj
@@ -569,7 +569,7 @@
(apply merge-with set/union))
supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
- cluster (Cluster. supervisors topology->scheduler-assignment)
+ cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)
;; call scheduler.schedule to schedule all the topologies
;; the new assignments for all the topologies are in the cluster object.
12 src/clj/backtype/storm/scheduler/DefaultScheduler.clj
@@ -28,14 +28,15 @@
(->> slots
(filter
(fn [[node port]]
- (if-let [supervisor (.getSupervisorById cluster node)]
- (.contains (.getAllPorts supervisor) (int port))
- )))))
+ (if-not (.isBlackListed cluster node)
+ (if-let [supervisor (.getSupervisorById cluster node)]
+ (.contains (.getAllPorts supervisor) (int port))
+ ))))))
(defn -prepare [this conf]
)
-(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+(defn default-schedule [^Topologies topologies ^Cluster cluster]
(let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
(doseq [^TopologyDetails topology needs-scheduling-topologies
:let [topology-id (.getId topology)
@@ -54,3 +55,6 @@
[])]]
(.freeSlots cluster bad-slots)
(EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))
+
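+;; the IScheduler entry point now just delegates to default-schedule, which other
+;; schedulers (e.g. IsolationScheduler in this commit) can also call directly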
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+ (default-schedule topologies cluster))
198 src/clj/backtype/storm/scheduler/IsolationScheduler.clj
@@ -0,0 +1,198 @@
+(ns backtype.storm.scheduler.IsolationScheduler
+ (:use [backtype.storm util config log])
+ (:require [backtype.storm.scheduler.DefaultScheduler :as DefaultScheduler])
+ (:import [java.util HashSet Set List LinkedList ArrayList Map HashMap])
+ (:import [backtype.storm.scheduler IScheduler Topologies
+ Cluster TopologyDetails WorkerSlot SchedulerAssignment
+ EvenScheduler ExecutorDetails])
+ (:gen-class
+ :init init
+ :constructors {[] []}
+ :state state
+ :implements [backtype.storm.scheduler.IScheduler]))
+
+(defn -init []
+ [[] (container)])
+
+(defn -prepare [this conf]
+ (container-set! (.state this) conf))
+
+
+(defn- compute-worker-specs "Returns list of sets of executors"
+ [^TopologyDetails details]
+ (->> (.getExecutorToComponent details)
+ reverse-map
+ (map second)
+ (apply interleave-all)
+ (partition-fixed (.getNumWorkers details))
+ (map set)))
+
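+;; NOTE: this second definition shadows the one above; the mutable HashSet it
+;; returns is what the scheduling loop below consumes and mutates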
+(defn- compute-worker-specs "Returns mutable set of sets of executors"
+ [^TopologyDetails details]
+ (->> (.getExecutorToComponent details)
+ reverse-map
+ (map second)
+ (apply concat)
+ (map vector (repeat-seq (range (.getNumWorkers details))))
+ (group-by first)
+ (map-val #(map second %))
+ vals
+ (map set)
+ (HashSet.)
+ ))
+
+(defn isolated-topologies [conf topologies]
+ (let [tset (-> conf (get ISOLATION-SCHEDULER-MACHINES) keys set)]
+ (filter (fn [^TopologyDetails t] (contains? tset (.getName t))) topologies)
+ ))
+
+;; map from topology id -> set of sets of executors
+(defn topology-worker-specs [iso-topologies]
+ (->> iso-topologies
+ (map (fn [t] {(.getId t) (compute-worker-specs t)}))
+ (apply merge)))
+
+(defn machine-distribution [conf ^TopologyDetails topology]
+ (let [name->machines (get conf ISOLATION-SCHEDULER-MACHINES)
+ machines (get name->machines (.getName topology))
+ workers (.getNumWorkers topology)]
+ (-> (integer-divided workers machines)
+ (dissoc 0)
+ (HashMap.)
+ )))
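+;; e.g. 6 workers spread over 3 machines -> {2 3}: three machines run two workers each
+;; (keys are workers-per-machine, values are machine counts; zero-worker entries are dropped)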
+
+(defn topology-machine-distribution [conf iso-topologies]
+ (->> iso-topologies
+ (map (fn [t] {(.getId t) (machine-distribution conf t)}))
+ (apply merge)))
+
+(defn host-assignments [^Cluster cluster]
+ (letfn [(to-slot-specs [^SchedulerAssignment ass]
+ (->> ass
+ .getExecutorToSlot
+ reverse-map
+ (map (fn [[slot executors]]
+ [slot (.getTopologyId ass) (set executors)]))))]
+ (->> cluster
+ .getAssignments
+ vals
+ (mapcat to-slot-specs)
+ (group-by (fn [[^WorkerSlot slot & _]] (.getHost cluster (.getNodeId slot))))
+ )))
+
+(defn- decrement-distribution! [^Map distribution value]
+ (let [v (-> distribution (get value) dec)]
+ (if (zero? v)
+ (.remove distribution value)
+ (.put distribution value v))))
+
+;; returns list of list of slots, reverse sorted by number of slots
+(defn- host-assignable-slots [^Cluster cluster]
+ (-<> cluster
+ .getAssignableSlots
+ (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
+ (dissoc <> nil)
+ (sort-by #(-> % second count -) <>)
+ (LinkedList. <>)
+ ))
+
+(defn- distribution->sorted-amts [distribution]
+ (->> distribution
+ (mapcat (fn [[val amt]] (repeat amt val)))
+ (sort-by -)
+ ))
+
+(defn- allocated-topologies [topology-worker-specs]
+ (->> topology-worker-specs
+ (filter (fn [[_ worker-specs]] (empty? worker-specs)))
+ (map first)
+ set
+ ))
+
+(defn- leftover-topologies [^Topologies topologies filter-ids-set]
+ (->> topologies
+ .getTopologies
+ (filter (fn [^TopologyDetails t] (not (contains? filter-ids-set (.getId t)))))
+ (map (fn [^TopologyDetails t] {(.getId t) t}))
+ (apply merge)
+ (Topologies.)
+ ))
+
+;; for each isolated topology:
+;; compute even distribution of executors -> workers on the number of workers specified for the topology
+;; compute distribution of workers to machines
+;; determine host -> list of [slot, topology id, executors]
+;; iterate through hosts and: a machine is good if:
+;; 1. only running workers from one isolated topology
+;; 2. all workers running on it match one of the distributions of executors for that topology
+;; 3. matches one of the # of workers
+;; blacklist the good hosts and remove those workers from the list of need to be assigned workers
+;; otherwise unassign all other workers for isolated topologies if assigned
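+;; e.g. with "tester1" isolated onto 3 machines and running 6 workers, a host currently running
+;; exactly two tester1 workers whose executor sets match two of the computed specs is "good":
+;; it gets blacklisted (its assignment is kept) and those two specs are marked as satisfied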
+
+(defn remove-elem-from-set! [^Set aset]
+ (let [elem (-> aset .iterator .next)]
+ (.remove aset elem)
+ elem
+ ))
+
+;; get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
+;; will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
+;; match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
+;; blacklist all machines who had production slots defined
+;; log isolated topologies who weren't able to get enough slots / machines
+;; run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines
+;; set blacklist to what it was initially
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+ (let [conf (container-get (.state this))
+ orig-blacklist (HashSet. (.getBlacklistedHosts cluster))
+ iso-topologies (isolated-topologies conf (.getTopologies topologies))
+ iso-ids-set (->> iso-topologies (map #(.getId ^TopologyDetails %)) set)
+ topology-worker-specs (topology-worker-specs iso-topologies)
+ topology-machine-distribution (topology-machine-distribution conf iso-topologies)
+ host-assignments (host-assignments cluster)]
+ (doseq [[host assignments] host-assignments]
+ (let [top-id (-> assignments first second)
+ distribution (get topology-machine-distribution top-id)
+ ^Set worker-specs (get topology-worker-specs top-id)
+ num-workers (count host-assignments)
+ ]
+ (if (and (every? #(= (second %) top-id) assignments)
+ (contains? distribution num-workers)
+ (every? #(contains? worker-specs (nth % 2)) assignments))
+ (do (decrement-distribution! distribution num-workers)
+ (doseq [[_ _ executors] assignments] (.remove worker-specs executors))
+ (.blacklistHost cluster host))
+ (doseq [[slot top-id _] assignments]
+ (when (contains? iso-ids-set top-id)
+ (.freeSlot cluster slot)
+ ))
+ )))
+
+ (let [^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
+ ;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
+ (doseq [[top-id worker-specs] topology-worker-specs
+ :let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
+ (doseq [amt amts
+ :let [[host host-slots] (.peek sorted-assignable-hosts)]]
+ (when (and host-slots (>= (count host-slots) amt))
+ (.poll sorted-assignable-hosts)
+ (.freeSlots cluster host-slots)
+ (doseq [slot (take amt host-slots)
+ :let [executors-set (remove-elem-from-set! worker-specs)]]
+ (.assign cluster slot top-id executors-set))
+ (.blacklistHost cluster host))
+ )))
+
+ (doseq [[top-id worker-specs] topology-worker-specs]
+ (if-not (empty? worker-specs)
+ (log-warn "Unable to isolate topology " top-id)
+ ))
+
+
+ ;; run default scheduler on iso topologies that didn't have enough slot + non-isolated topologies
+ (-<> topology-worker-specs
+ allocated-topologies
+ (leftover-topologies topologies <>)
+ (DefaultScheduler/default-schedule <> cluster))
+ (.setBlacklistedHosts cluster orig-blacklist)
+ ))
4 src/clj/backtype/storm/testing.clj
@@ -96,7 +96,7 @@
;; local dir is always overridden in maps
;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {}]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil]
(let [zk-tmp (local-temp-path)
[zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)
daemon-conf (merge (read-storm-config)
@@ -113,7 +113,7 @@
port-counter (mk-counter)
nimbus (nimbus/service-handler
(assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
- (nimbus/standalone-nimbus))
+ (if inimbus inimbus (nimbus/standalone-nimbus)))
context (mk-shared-context daemon-conf)
cluster-map {:nimbus nimbus
:port-counter port-counter
10 src/clj/backtype/storm/util.clj
@@ -813,3 +813,13 @@
(let [klass (if (string? klass) (Class/forName klass) klass)]
(.newInstance klass)
))
+
+(defmacro -<>
+ ([x] x)
+ ([x form] (if (seq? form)
+ (with-meta
+ (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+ (concat begin [x] end))
+ (meta form))
+ (list form x)))
+ ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
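
The -<> "diamond" threading macro added above threads a value into the position marked by <> in each form, and IsolationScheduler.clj in this commit relies on it. A minimal usage sketch with a hypothetical namespace and toy values; the results follow from the definition above:

    (ns example.diamond
      (:use [backtype.storm util]))    ;; provides the -<> macro

    ;; the threaded value replaces the <> marker inside the form
    (-<> 4 (list 1 2 <> 3))            ;; => (1 2 4 3)

    ;; a bare function name simply receives the value as its argument
    (-<> 4 inc)                        ;; => 5

    ;; forms chain left to right, each result feeding the next <>
    (-<> {:a 1 :b 2}
         (dissoc <> :a)
         (get <> :b))                  ;; => 2

This is what lets host-assignable-slots in IsolationScheduler.clj thread the cluster through group-by, dissoc, sort-by, and the LinkedList constructor without being tied to first- or last-argument position.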
6 src/jvm/backtype/storm/Config.java
@@ -602,6 +602,12 @@
* it is not a production grade zookeeper setup.
*/
public static String DEV_ZOOKEEPER_PATH = "dev.zookeeper.path";
+
+ /**
+ * A map from topology name to the number of machines that should be dedicated for that topology. Set storm.scheduler
+ * to backtype.storm.scheduler.IsolationScheduler to make use of the isolation scheduler.
+ */
+ public static String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
public static void setDebug(Map conf, boolean isOn) {
conf.put(Config.TOPOLOGY_DEBUG, isOn);
71 src/jvm/backtype/storm/scheduler/Cluster.java
@@ -23,8 +23,12 @@
* a map from hostname to supervisor id.
*/
private Map<String, List<String>> hostToId;
+
+ private Set<String> blackListedHosts = new HashSet<String>();
+ private INimbus inimbus;
- public Cluster(Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignmentImpl> assignments){
+ public Cluster(INimbus nimbus, Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignmentImpl> assignments){
+ this.inimbus = nimbus;
this.supervisors = new HashMap<String, SupervisorDetails>(supervisors.size());
this.supervisors.putAll(supervisors);
this.assignments = new HashMap<String, SchedulerAssignmentImpl>(assignments.size());
@@ -39,7 +43,31 @@ public Cluster(Map<String, SupervisorDetails> supervisors, Map<String, Scheduler
this.hostToId.get(host).add(nodeId);
}
}
-
+
+ public void setBlacklistedHosts(Set<String> hosts) {
+ blackListedHosts = hosts;
+ }
+
+ public Set<String> getBlacklistedHosts() {
+ return blackListedHosts;
+ }
+
+ public void blacklistHost(String host) {
+ // this is so it plays well with setting blackListedHosts to an immutable list
+ if(blackListedHosts==null) blackListedHosts = new HashSet<String>();
+ if(!(blackListedHosts instanceof HashSet))
+ blackListedHosts = new HashSet<String>(blackListedHosts);
+ blackListedHosts.add(host);
+ }
+
+ public boolean isBlackListed(String supervisorId) {
+ return blackListedHosts != null && blackListedHosts.contains(getHost(supervisorId));
+ }
+
+ public String getHost(String supervisorId) {
+ return inimbus.getHostName(supervisors, supervisorId);
+ }
+
/**
* Gets all the topologies which needs scheduling.
*
@@ -123,9 +151,9 @@ public boolean needsScheduling(TopologyDetails topology) {
* @param cluster
* @return
*/
- public List<Integer> getUsedPorts(SupervisorDetails supervisor) {
+ public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
Map<String, SchedulerAssignment> assignments = this.getAssignments();
- List<Integer> usedPorts = new ArrayList<Integer>();
+ Set<Integer> usedPorts = new HashSet<Integer>();
for (SchedulerAssignment assignment : assignments.values()) {
for (WorkerSlot slot : assignment.getExecutorToSlot().values()) {
@@ -144,15 +172,20 @@ public boolean needsScheduling(TopologyDetails topology) {
* @param cluster
* @return
*/
- public List<Integer> getAvailablePorts(SupervisorDetails supervisor) {
- List<Integer> usedPorts = this.getUsedPorts(supervisor);
+ public Set<Integer> getAvailablePorts(SupervisorDetails supervisor) {
+ Set<Integer> usedPorts = this.getUsedPorts(supervisor);
- List<Integer> ret = new ArrayList<Integer>();
- ret.addAll(supervisor.allPorts);
+ Set<Integer> ret = new HashSet();
+ ret.addAll(getAssignablePorts(supervisor));
ret.removeAll(usedPorts);
return ret;
}
+
+ public Set<Integer> getAssignablePorts(SupervisorDetails supervisor) {
+ if(isBlackListed(supervisor.id)) return new HashSet();
+ return supervisor.allPorts;
+ }
/**
* Return all the available slots on this supervisor.
@@ -161,7 +194,7 @@ public boolean needsScheduling(TopologyDetails topology) {
* @return
*/
public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
- List<Integer> ports = this.getAvailablePorts(supervisor);
+ Set<Integer> ports = this.getAvailablePorts(supervisor);
List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
for (Integer port : ports) {
@@ -171,6 +204,17 @@ public boolean needsScheduling(TopologyDetails topology) {
return slots;
}
+ public List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor) {
+ Set<Integer> ports = this.getAssignablePorts(supervisor);
+ List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
+
+ for (Integer port : ports) {
+ slots.add(new WorkerSlot(supervisor.getId(), port));
+ }
+
+ return slots;
+ }
+
/**
* get the unassigned executors of the topology.
*/
@@ -246,6 +290,15 @@ public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetail
return slots;
}
+
+ public List<WorkerSlot> getAssignableSlots() {
+ List<WorkerSlot> slots = new ArrayList<WorkerSlot>();
+ for (SupervisorDetails supervisor : this.supervisors.values()) {
+ slots.addAll(this.getAssignableSlots(supervisor));
+ }
+
+ return slots;
+ }
/**
* Free the specified slot.
1  src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java
@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.Set;
+//TODO: improve this by maintaining slot -> executors as well for more efficient operations
public class SchedulerAssignmentImpl implements SchedulerAssignment {
/**
* topology-id this assignment is for.
1  src/jvm/backtype/storm/scheduler/Topologies.java
@@ -9,6 +9,7 @@
Map<String, String> nameToId;
public Topologies(Map<String, TopologyDetails> topologies) {
+ if(topologies==null) topologies = new HashMap();
this.topologies = new HashMap<String, TopologyDetails>(topologies.size());
this.topologies.putAll(topologies);
this.nameToId = new HashMap<String, String>(topologies.size());
83 test/clj/backtype/storm/nimbus_test.clj
@@ -3,6 +3,7 @@
(:require [backtype.storm.daemon [nimbus :as nimbus]])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
+ (:import [backtype.storm.scheduler INimbus])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
)
@@ -37,6 +38,32 @@
(count (reverse-map (:executor->node+port assignment)))
))
+(defn topology-nodes [state storm-name]
+ (let [storm-id (get-storm-id state storm-name)
+ assignment (.assignment-info state storm-id nil)]
+ (->> assignment
+ :executor->node+port
+ vals
+ (map first)
+ set
+ )))
+
+(defn topology-node-distribution [state storm-name]
+ (let [storm-id (get-storm-id state storm-name)
+ assignment (.assignment-info state storm-id nil)]
+ (->> assignment
+ :executor->node+port
+ vals
+ set
+ (group-by first)
+ (map-val count)
+ (map (fn [[_ amt]] {amt 1}))
+ (apply merge-with +)
+ )))
+
+(defn topology-num-nodes [state storm-name]
+ (count (topology-nodes state storm-name)))
+
(defn executor-assignment [cluster storm-id executor-id]
(let [state (:storm-cluster-state cluster)
assignment (.assignment-info state storm-id nil)]
@@ -139,6 +166,62 @@
)
)))
+(defn isolation-nimbus []
+ (let [standalone (nimbus/standalone-nimbus)]
+ (reify INimbus
+ (prepare [this conf local-dir]
+ (.prepare standalone conf local-dir)
+ )
+ (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
+ (.allSlotsAvailableForScheduling standalone supervisors topologies topologies-missing-assignments))
+ (assignSlots [this topology slots]
+ (.assignSlots standalone topology slots)
+ )
+ (getForcedScheduler [this]
+ (.getForcedScheduler standalone))
+ (getHostName [this supervisors node-id]
+ node-id
+ ))))
+
+(deftest test-isolated-assignment
+ (with-local-cluster [cluster :supervisors 6
+ :ports-per-supervisor 3
+ :inimbus (isolation-nimbus)
+ :daemon-conf {SUPERVISOR-ENABLE false
+ TOPOLOGY-ACKER-EXECUTORS 0
+ STORM-SCHEDULER "backtype.storm.scheduler.IsolationScheduler"
+ ISOLATION-SCHEDULER-MACHINES {"tester1" 3 "tester2" 2}
+ }]
+ (letlocals
+ (bind state (:storm-cluster-state cluster))
+ (bind nimbus (:nimbus cluster))
+ (bind topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. false) :parallelism-hint 3)}
+ {"2" (thrift/mk-bolt-spec {"1" :none} (TestPlannerBolt.) :parallelism-hint 5)
+ "3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.))}))
+
+ (submit-local-topology nimbus "noniso" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 4} topology)
+ (is (= 4 (topology-num-nodes state "noniso")))
+ (is (= 4 (storm-num-workers state "noniso")))
+
+ (submit-local-topology nimbus "tester1" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 6} topology)
+ (submit-local-topology nimbus "tester2" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 6} topology)
+
+ (bind task-info-tester1 (storm-component->task-info cluster "tester1"))
+ (bind task-info-tester2 (storm-component->task-info cluster "tester2"))
+
+
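+      ;; 6 supervisors x 3 ports each: tester1 is isolated onto 3 machines (2 workers apiece),
+      ;; tester2 onto 2 machines (3 workers apiece), leaving one machine (3 slots) for noniso,
+      ;; which therefore shrinks from 4 workers on 4 nodes to 3 workers on 1 node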
+ (is (= 1 (topology-num-nodes state "noniso")))
+ (is (= 3 (storm-num-workers state "noniso")))
+
+ (is (= {2 3} (topology-node-distribution state "tester1")))
+ (is (= {3 2} (topology-node-distribution state "tester2")))
+
+ (check-consistency cluster "tester1")
+ (check-consistency cluster "tester2")
+ (check-consistency cluster "noniso")
+ )))
+
(deftest test-zero-executor-or-tasks
(with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
4 test/clj/backtype/storm/scheduler_test.clj
@@ -1,6 +1,7 @@
(ns backtype.storm.scheduler-test
(:use [clojure test])
(:use [backtype.storm bootstrap config testing])
+ (:require [backtype.storm.daemon [nimbus :as nimbus]])
(:import [backtype.storm.generated StormTopology])
(:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
SchedulerAssignmentImpl Topologies TopologyDetails]))
@@ -112,7 +113,8 @@
assignment1 (SchedulerAssignmentImpl. "topology1" executor->slot1)
assignment2 (SchedulerAssignmentImpl. "topology2" executor->slot2)
assignment3 (SchedulerAssignmentImpl. "topology3" executor->slot3)
- cluster (Cluster. {"supervisor1" supervisor1 "supervisor2" supervisor2}
+ cluster (Cluster. (nimbus/standalone-nimbus)
+ {"supervisor1" supervisor1 "supervisor2" supervisor2}
{"topology1" assignment1 "topology2" assignment2 "topology3" assignment3})]
;; test Cluster constructor
(is (= #{"supervisor1" "supervisor2"}