Merge branch '0.9.0' of github.com:nathanmarz/storm into 0.9.0

2 parents 33b25c0 + fd3d4c9 · commit 81d46181ac200e7a03edbcac852e61a763837e4a · Jason Jackson committed on Dec 20, 2012
@@ -8,6 +8,7 @@
## Unreleased (0.8.2)
+ * Added backtype.storm.scheduler.IsolationScheduler. This lets you run topologies that are completely isolated at the machine level. Configure Nimbus to isolate certain topologies, and how many machines to give to each of them, with the isolation.scheduler.machines config in Nimbus's storm.yaml (see the example below). Topologies on the cluster that are not listed there share whatever machines remain after the listed topologies have been allocated theirs.
* Storm UI now uses nimbus.host to find Nimbus rather than always using localhost (thanks Frostman)
* Added report-error! to Clojure DSL
 * Automatically throttle errors sent to Zookeeper/Storm UI when too many are reported in a time interval (all errors are still logged). Configured with TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL and TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
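
The IsolationScheduler entry above boils down to a single map in the Nimbus configuration. A minimal sketch follows; the topology names and machine counts are invented for illustration, and ISOLATION-SCHEDULER-MACHINES is the constant (from backtype.storm.config) that the scheduler code later in this commit reads:

```clojure
;; Hypothetical storm.yaml fragment -- names and counts are illustrative only:
;;   storm.scheduler: "backtype.storm.scheduler.IsolationScheduler"
;;   isolation.scheduler.machines:
;;     "important-topology": 6
;;     "other-topology": 2
;;
;; The scheduler sees the same data as an ordinary entry in the Nimbus conf map:
(let [conf {ISOLATION-SCHEDULER-MACHINES {"important-topology" 6
                                          "other-topology" 2}}]
  (get conf ISOLATION-SCHEDULER-MACHINES))
;; => {"important-topology" 6, "other-topology" 2}
```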
@@ -36,12 +37,14 @@
* Storm UI displays exception instead of blank page when there's an error rendering the page (thanks Frostman)
* Added MultiScheme interface (thanks sritchie)
* Added MockTridentTuple for testing (thanks emblem)
+ * Added whitelist methods to Cluster to allow only a subset of hosts to be revealed as available slots
* Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
* Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails.
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
* Bug fix: fixed NPE when emitting during emit method of Aggregator
+ * Bug fix: URLs with periods in them in Storm UI now route correctly
## 0.8.1
@@ -12,7 +12,7 @@
:exclusions [log4j/log4j]]
[backtype/jzmq "2.1.0"]
[com.googlecode.json-simple/json-simple "1.1"]
- [compojure "0.6.4"]
+ [compojure "1.1.3"]
[hiccup "0.3.6"]
[ring/ring-jetty-adapter "0.3.11"]
[org.clojure/tools.logging "0.2.3"]
@@ -569,7 +569,7 @@
(apply merge-with set/union))
supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
- cluster (Cluster. supervisors topology->scheduler-assignment)
+ cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)
;; call scheduler.schedule to schedule all the topologies
;; the new assignments for all the topologies are in the cluster object.
@@ -28,14 +28,15 @@
(->> slots
(filter
(fn [[node port]]
- (if-let [supervisor (.getSupervisorById cluster node)]
- (.contains (.getAllPorts supervisor) (int port))
- )))))
+ (if-not (.isBlackListed cluster node)
+ (if-let [supervisor (.getSupervisorById cluster node)]
+ (.contains (.getAllPorts supervisor) (int port))
+ ))))))
(defn -prepare [this conf]
)
-(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+(defn default-schedule [^Topologies topologies ^Cluster cluster]
(let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
(doseq [^TopologyDetails topology needs-scheduling-topologies
:let [topology-id (.getId topology)
@@ -54,3 +55,6 @@
[])]]
(.freeSlots cluster bad-slots)
(EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))
+
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+ (default-schedule topologies cluster))
@@ -0,0 +1,198 @@
+(ns backtype.storm.scheduler.IsolationScheduler
+ (:use [backtype.storm util config log])
+ (:require [backtype.storm.scheduler.DefaultScheduler :as DefaultScheduler])
+ (:import [java.util HashSet Set List LinkedList ArrayList Map HashMap])
+ (:import [backtype.storm.scheduler IScheduler Topologies
+ Cluster TopologyDetails WorkerSlot SchedulerAssignment
+ EvenScheduler ExecutorDetails])
+ (:gen-class
+ :init init
+ :constructors {[] []}
+ :state state
+ :implements [backtype.storm.scheduler.IScheduler]))
+
+(defn -init []
+ [[] (container)])
+
+(defn -prepare [this conf]
+ (container-set! (.state this) conf))
+
+
+(defn- compute-worker-specs "Returns list of sets of executors"
+ [^TopologyDetails details]
+ (->> (.getExecutorToComponent details)
+ reverse-map
+ (map second)
+ (apply interleave-all)
+ (partition-fixed (.getNumWorkers details))
+ (map set)))
+
+(defn- compute-worker-specs "Returns mutable set of sets of executors"
+ [^TopologyDetails details]
+ (->> (.getExecutorToComponent details)
+ reverse-map
+ (map second)
+ (apply concat)
+ (map vector (repeat-seq (range (.getNumWorkers details))))
+ (group-by first)
+ (map-val #(map second %))
+ vals
+ (map set)
+ (HashSet.)
+ ))
+
+(defn isolated-topologies [conf topologies]
+ (let [tset (-> conf (get ISOLATION-SCHEDULER-MACHINES) keys set)]
+ (filter (fn [^TopologyDetails t] (contains? tset (.getName t))) topologies)
+ ))
+
+;; map from topology id -> set of sets of executors
+(defn topology-worker-specs [iso-topologies]
+ (->> iso-topologies
+ (map (fn [t] {(.getId t) (compute-worker-specs t)}))
+ (apply merge)))
+
+(defn machine-distribution [conf ^TopologyDetails topology]
+ (let [name->machines (get conf ISOLATION-SCHEDULER-MACHINES)
+ machines (get name->machines (.getName topology))
+ workers (.getNumWorkers topology)]
+ (-> (integer-divided workers machines)
+ (dissoc 0)
+ (HashMap.)
+ )))
+
+(defn topology-machine-distribution [conf iso-topologies]
+ (->> iso-topologies
+ (map (fn [t] {(.getId t) (machine-distribution conf t)}))
+ (apply merge)))
+
+(defn host-assignments [^Cluster cluster]
+ (letfn [(to-slot-specs [^SchedulerAssignment ass]
+ (->> ass
+ .getExecutorToSlot
+ reverse-map
+ (map (fn [[slot executors]]
+ [slot (.getTopologyId ass) (set executors)]))))]
+ (->> cluster
+ .getAssignments
+ vals
+ (mapcat to-slot-specs)
+ (group-by (fn [[^WorkerSlot slot & _]] (.getHost cluster (.getNodeId slot))))
+ )))
+
+(defn- decrement-distribution! [^Map distribution value]
+ (let [v (-> distribution (get value) dec)]
+ (if (zero? v)
+ (.remove distribution value)
+ (.put distribution value v))))
+
+;; returns list of list of slots, reverse sorted by number of slots
+(defn- host-assignable-slots [^Cluster cluster]
+ (-<> cluster
+ .getAssignableSlots
+ (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
+ (dissoc <> nil)
+ (sort-by #(-> % second count -) <>)
+ (LinkedList. <>)
+ ))
+
+(defn- distribution->sorted-amts [distribution]
+ (->> distribution
+ (mapcat (fn [[val amt]] (repeat amt val)))
+ (sort-by -)
+ ))
+
+(defn- allocated-topologies [topology-worker-specs]
+ (->> topology-worker-specs
+ (filter (fn [[_ worker-specs]] (empty? worker-specs)))
+ (map first)
+ set
+ ))
+
+(defn- leftover-topologies [^Topologies topologies filter-ids-set]
+ (->> topologies
+ .getTopologies
+ (filter (fn [^TopologyDetails t] (not (contains? filter-ids-set (.getId t)))))
+ (map (fn [^TopologyDetails t] {(.getId t) t}))
+ (apply merge)
+ (Topologies.)
+ ))
+
+;; for each isolated topology:
+;; compute even distribution of executors -> workers on the number of workers specified for the topology
+;; compute distribution of workers to machines
+;; determine host -> list of [slot, topology id, executors]
+;; iterate through hosts and: a machine is good if:
+;; 1. only running workers from one isolated topology
+;; 2. all workers running on it match one of the distributions of executors for that topology
+;; 3. matches one of the # of workers
+;; blacklist the good hosts and remove those workers from the list of need to be assigned workers
+;; otherwise unassign all other workers for isolated topologies if assigned
+
+(defn remove-elem-from-set! [^Set aset]
+ (let [elem (-> aset .iterator .next)]
+ (.remove aset elem)
+ elem
+ ))
+
+;; get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
+;; will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
+;; match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
+;; blacklist all machines who had production slots defined
+;; log isolated topologies who weren't able to get enough slots / machines
+;; run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines
+;; set blacklist to what it was initially
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+ (let [conf (container-get (.state this))
+ orig-blacklist (HashSet. (.getBlacklistedHosts cluster))
+ iso-topologies (isolated-topologies conf (.getTopologies topologies))
+ iso-ids-set (->> iso-topologies (map #(.getId ^TopologyDetails %)) set)
+ topology-worker-specs (topology-worker-specs iso-topologies)
+ topology-machine-distribution (topology-machine-distribution conf iso-topologies)
+ host-assignments (host-assignments cluster)]
+ (doseq [[host assignments] host-assignments]
+ (let [top-id (-> assignments first second)
+ distribution (get topology-machine-distribution top-id)
+ ^Set worker-specs (get topology-worker-specs top-id)
+            num-workers (count assignments)
+ ]
+ (if (and (every? #(= (second %) top-id) assignments)
+ (contains? distribution num-workers)
+ (every? #(contains? worker-specs (nth % 2)) assignments))
+ (do (decrement-distribution! distribution num-workers)
+ (doseq [[_ _ executors] assignments] (.remove worker-specs executors))
+ (.blacklistHost cluster host))
+ (doseq [[slot top-id _] assignments]
+ (when (contains? iso-ids-set top-id)
+ (.freeSlot cluster slot)
+ ))
+ )))
+
+ (let [^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
+ ;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
+ (doseq [[top-id worker-specs] topology-worker-specs
+ :let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
+ (doseq [amt amts
+ :let [[host host-slots] (.peek sorted-assignable-hosts)]]
+ (when (and host-slots (>= (count host-slots) amt))
+ (.poll sorted-assignable-hosts)
+ (.freeSlots cluster host-slots)
+ (doseq [slot (take amt host-slots)
+ :let [executors-set (remove-elem-from-set! worker-specs)]]
+ (.assign cluster slot top-id executors-set))
+ (.blacklistHost cluster host))
+ )))
+
+ (doseq [[top-id worker-specs] topology-worker-specs]
+ (if-not (empty? worker-specs)
+ (log-warn "Unable to isolate topology " top-id)
+ ))
+
+
+ ;; run default scheduler on iso topologies that didn't have enough slot + non-isolated topologies
+ (-<> topology-worker-specs
+ allocated-topologies
+ (leftover-topologies topologies <>)
+ (DefaultScheduler/default-schedule <> cluster))
+ (.setBlacklistedHosts cluster orig-blacklist)
+ ))
@@ -96,7 +96,7 @@
;; local dir is always overridden in maps
;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {}]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil]
(let [zk-tmp (local-temp-path)
[zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)
daemon-conf (merge (read-storm-config)
@@ -113,7 +113,7 @@
port-counter (mk-counter)
nimbus (nimbus/service-handler
(assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
- (nimbus/standalone-nimbus))
+ (if inimbus inimbus (nimbus/standalone-nimbus)))
context (mk-shared-context daemon-conf)
cluster-map {:nimbus nimbus
:port-counter port-counter
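
For reference, a minimal usage sketch of the new :inimbus option on mk-local-storm-cluster. The value shown merely spells out the default that the code above already falls back to when :inimbus is nil; in practice a test would pass its own INimbus implementation instead:

```clojure
;; Sketch only: explicitly supplying the INimbus used by the local cluster.
;; (nimbus/standalone-nimbus) is the same default used when :inimbus is nil.
(mk-local-storm-cluster :supervisors 2
                        :ports-per-supervisor 3
                        :inimbus (nimbus/standalone-nimbus))
```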
@@ -813,3 +813,13 @@
(let [klass (if (string? klass) (Class/forName klass) klass)]
(.newInstance klass)
))
+
+(defmacro -<>
+ ([x] x)
+ ([x form] (if (seq? form)
+ (with-meta
+ (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+ (concat begin [x] end))
+ (meta form))
+ (list form x)))
+ ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
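
A brief usage sketch of the -<> threading macro defined above (the examples are invented here, not taken from the commit): each threaded form can mark the insertion point with <>, and a form without <> receives the value as its last argument. This is what lets host-assignable-slots in the isolation scheduler thread the cluster value into arbitrary argument positions of the group-by, dissoc, and sort-by calls.

```clojure
(-<> 3
     (list 1 2 <> 4))   ;; => (1 2 3 4)

(-<> {:a 1 :b 2}
     keys               ;; no <>: the value becomes the last (only) argument
     (map name <>)      ;; => ("a" "b")
     sort)              ;; => ("a" "b")
```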
@@ -602,6 +602,12 @@
* it is not a production grade zookeeper setup.
*/
public static String DEV_ZOOKEEPER_PATH = "dev.zookeeper.path";
+
+ /**
+ * A map from topology name to the number of machines that should be dedicated for that topology. Set storm.scheduler
+ * to backtype.storm.scheduler.IsolationScheduler to make use of the isolation scheduler.
+ */
+ public static String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
public static void setDebug(Map conf, boolean isOn) {
conf.put(Config.TOPOLOGY_DEBUG, isOn);