
merge master and fix conflicts

2 parents 6b5eb55 + 1ae5ce4 commit 8dba94a290b9c2913abf8e2788e95a4d0e007daf @nathanmarz committed Jan 22, 2013
@@ -1,5 +1,6 @@
-## Unreleased
+## 0.8.2
+ * Added backtype.storm.scheduler.IsolationScheduler. This lets you run topologies that are completely isolated at the machine level. Configure Nimbus to isolate certain topologies, and how many machines to give to each of those topologies, with the isolation.scheduler.machines config in Nimbus's storm.yaml. Topologies not listed there will share whatever machines remain on the cluster after machines are allocated to the listed topologies.
* Storm UI now uses nimbus.host to find Nimbus rather than always using localhost (thanks Frostman)
* Added report-error! to Clojure DSL
* Automatically throttle errors sent to Zookeeper/Storm UI when too many are reported in a time interval (all errors are still logged) Configured with TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL and TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
@@ -28,11 +29,18 @@
* Storm UI displays exception instead of blank page when there's an error rendering the page (thanks Frostman)
* Added MultiScheme interface (thanks sritchie)
* Added MockTridentTuple for testing (thanks emblem)
+ * Add whitelist methods to Cluster to allow only a subset of hosts to be revealed as available slots
* Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
+ * Number of DRPC server worker threads now customizable (thanks xiaokang)
+ * DRPC server now uses a bounded queue for requests to prevent being overloaded with requests (thanks xiaokang)
+ * Add __hash__ method to all generated Python Thrift objects so that Python code can read Nimbus stats which use Thrift objects as dict keys
* Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails.
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
+ * Bug fix: fixed NPE when emitting during emit method of Aggregator
+ * Bug fix: URLs with periods in them in Storm UI now route correctly
+ * Bug fix: Fix occasional cascading worker crashes when a worker dies, caused by not removing connections from the connection cache appropriately
## 0.8.1
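
For the isolation scheduler introduced in the first entry above, a minimal storm.yaml sketch for Nimbus might look like the following; the topology names and machine counts are hypothetical, while storm.scheduler and isolation.scheduler.machines are the keys this commit documents:

    storm.scheduler: "backtype.storm.scheduler.IsolationScheduler"
    isolation.scheduler.machines:
        "my-isolated-topology": 8    # hypothetical topology name and machine count
        "another-topology": 3        # hypothetical

Machines not claimed by the listed topologies stay available to everything else running on the cluster.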
@@ -36,6 +36,8 @@ ui.port: 8080
ui.childopts: "-Xmx768m"
drpc.port: 3772
+drpc.worker.threads: 64
+drpc.queue.size: 128
drpc.invocations.port: 3773
drpc.request.timeout.secs: 600
@@ -20,7 +20,7 @@
[com.netflix.curator/curator-framework "1.0.1"]
[backtype/jzmq "2.1.0"]
[com.googlecode.json-simple/json-simple "1.1"]
- [compojure "0.6.4"]
+ [compojure "1.1.3"]
[hiccup "0.3.6"]
[ring/ring-jetty-adapter "0.3.11"]
[org.clojure/tools.logging "0.2.3"]
@@ -6,7 +6,7 @@
(:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
DistributedRPCInvocations$Processor])
- (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue])
+ (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
(:import [backtype.storm.daemon Shutdownable])
(:import [java.net InetAddress])
(:use [backtype.storm bootstrap config log])
@@ -100,6 +100,8 @@
(defn launch-server!
([]
(let [conf (read-storm-config)
+ worker-threads (int (conf DRPC-WORKER-THREADS))
+ queue-size (int (conf DRPC-QUEUE-SIZE))
service-handler (service-handler)
;; requests and returns need to be on separate thread pools, since calls to
;; "execute" don't unblock until other thrift methods are called. So if
@@ -108,6 +110,8 @@
handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT)))
(THsHaServer$Args.)
(.workerThreads 64)
+ (.executorService (ThreadPoolExecutor. worker-threads worker-threads
+ 60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size)))
(.protocolFactory (TBinaryProtocol$Factory.))
(.processor (DistributedRPC$Processor. service-handler))
))
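
The hunk above backs the DRPC handler server with a fixed-size thread pool and a bounded request queue, sized by the new DRPC-WORKER-THREADS and DRPC-QUEUE-SIZE configs. A standalone Clojure sketch of that pattern, using the same java.util.concurrent classes (64 and 128 mirror the new defaults and are otherwise arbitrary):

    (import '[java.util.concurrent ThreadPoolExecutor ArrayBlockingQueue TimeUnit])

    ;; Fixed pool of worker threads; once every thread is busy and the queue holds
    ;; queue-size pending tasks, further submissions are rejected rather than
    ;; letting the backlog grow without bound.
    (defn bounded-executor [worker-threads queue-size]
      (ThreadPoolExecutor. (int worker-threads) (int worker-threads)
                           (long 60) TimeUnit/SECONDS
                           (ArrayBlockingQueue. (int queue-size))))

    (def drpc-pool (bounded-executor 64 128))

With the executor's default rejection policy, submissions beyond the queue capacity raise RejectedExecutionException, which is what keeps an overloaded DRPC server from accumulating unbounded pending requests.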
@@ -566,7 +566,7 @@
(apply merge-with set/union))
supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
- cluster (Cluster. supervisors topology->scheduler-assignment)
+ cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)
;; call scheduler.schedule to schedule all the topologies
;; the new assignments for all the topologies are in the cluster object.
@@ -252,7 +252,7 @@
(.close (get @(:cached-node+port->socket worker) endpoint)))
(apply swap!
(:cached-node+port->socket worker)
- #(HashMap. (dissoc (into {} %1) %&))
+ #(HashMap. (apply dissoc (into {} %1) %&))
remove-connections)
(let [missing-tasks (->> needed-tasks
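
The small change above matters because %& is a sequence of endpoints: without apply, dissoc treats that whole sequence as a single (nonexistent) key and removes nothing, which is how stale connections were left in the connection cache. A quick REPL illustration:

    (dissoc {:a 1 :b 2} [:a :b])        ;; => {:a 1, :b 2}  the seq itself is looked up as one key
    (apply dissoc {:a 1 :b 2} [:a :b])  ;; => {}            each element is removed as a key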
@@ -28,14 +28,15 @@
(->> slots
(filter
(fn [[node port]]
- (if-let [supervisor (.getSupervisorById cluster node)]
- (.contains (.getAllPorts supervisor) (int port))
- )))))
+ (if-not (.isBlackListed cluster node)
+ (if-let [supervisor (.getSupervisorById cluster node)]
+ (.contains (.getAllPorts supervisor) (int port))
+ ))))))
(defn -prepare [this conf]
)
-(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+(defn default-schedule [^Topologies topologies ^Cluster cluster]
(let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
(doseq [^TopologyDetails topology needs-scheduling-topologies
:let [topology-id (.getId topology)
@@ -54,3 +55,6 @@
[])]]
(.freeSlots cluster bad-slots)
(EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))
+
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+ (default-schedule topologies cluster))
@@ -0,0 +1,208 @@
+(ns backtype.storm.scheduler.IsolationScheduler
+ (:use [backtype.storm util config log])
+ (:require [backtype.storm.scheduler.DefaultScheduler :as DefaultScheduler])
+ (:import [java.util HashSet Set List LinkedList ArrayList Map HashMap])
+ (:import [backtype.storm.scheduler IScheduler Topologies
+ Cluster TopologyDetails WorkerSlot SchedulerAssignment
+ EvenScheduler ExecutorDetails])
+ (:gen-class
+ :init init
+ :constructors {[] []}
+ :state state
+ :implements [backtype.storm.scheduler.IScheduler]))
+
+(defn -init []
+ [[] (container)])
+
+(defn -prepare [this conf]
+ (container-set! (.state this) conf))
+
+
+(defn- compute-worker-specs "Returns list of sets of executors"
+ [^TopologyDetails details]
+ (->> (.getExecutorToComponent details)
+ reverse-map
+ (map second)
+ (apply interleave-all)
+ (partition-fixed (.getNumWorkers details))
+ (map set)))
+
+(defn- compute-worker-specs "Returns mutable set of sets of executors"
+ [^TopologyDetails details]
+ (->> (.getExecutorToComponent details)
+ reverse-map
+ (map second)
+ (apply concat)
+ (map vector (repeat-seq (range (.getNumWorkers details))))
+ (group-by first)
+ (map-val #(map second %))
+ vals
+ (map set)
+ (HashSet.)
+ ))
+
+(defn isolated-topologies [conf topologies]
+ (let [tset (-> conf (get ISOLATION-SCHEDULER-MACHINES) keys set)]
+ (filter (fn [^TopologyDetails t] (contains? tset (.getName t))) topologies)
+ ))
+
+;; map from topology id -> set of sets of executors
+(defn topology-worker-specs [iso-topologies]
+ (->> iso-topologies
+ (map (fn [t] {(.getId t) (compute-worker-specs t)}))
+ (apply merge)))
+
+(defn machine-distribution [conf ^TopologyDetails topology]
+ (let [name->machines (get conf ISOLATION-SCHEDULER-MACHINES)
+ machines (get name->machines (.getName topology))
+ workers (.getNumWorkers topology)]
+ (-> (integer-divided workers machines)
+ (dissoc 0)
+ (HashMap.)
+ )))
+
+(defn topology-machine-distribution [conf iso-topologies]
+ (->> iso-topologies
+ (map (fn [t] {(.getId t) (machine-distribution conf t)}))
+ (apply merge)))
+
+(defn host-assignments [^Cluster cluster]
+ (letfn [(to-slot-specs [^SchedulerAssignment ass]
+ (->> ass
+ .getExecutorToSlot
+ reverse-map
+ (map (fn [[slot executors]]
+ [slot (.getTopologyId ass) (set executors)]))))]
+ (->> cluster
+ .getAssignments
+ vals
+ (mapcat to-slot-specs)
+ (group-by (fn [[^WorkerSlot slot & _]] (.getHost cluster (.getNodeId slot))))
+ )))
+
+(defn- decrement-distribution! [^Map distribution value]
+ (let [v (-> distribution (get value) dec)]
+ (if (zero? v)
+ (.remove distribution value)
+ (.put distribution value v))))
+
+;; returns list of list of slots, reverse sorted by number of slots
+(defn- host-assignable-slots [^Cluster cluster]
+ (-<> cluster
+ .getAssignableSlots
+ (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
+ (dissoc <> nil)
+ (sort-by #(-> % second count -) <>)
+ shuffle
+ (LinkedList. <>)
+ ))
+
+(defn- host->used-slots [^Cluster cluster]
+ (->> cluster
+ .getUsedSlots
+ (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)))
+ ))
+
+(defn- distribution->sorted-amts [distribution]
+ (->> distribution
+ (mapcat (fn [[val amt]] (repeat amt val)))
+ (sort-by -)
+ ))
+
+(defn- allocated-topologies [topology-worker-specs]
+ (->> topology-worker-specs
+ (filter (fn [[_ worker-specs]] (empty? worker-specs)))
+ (map first)
+ set
+ ))
+
+(defn- leftover-topologies [^Topologies topologies filter-ids-set]
+ (->> topologies
+ .getTopologies
+ (filter (fn [^TopologyDetails t] (not (contains? filter-ids-set (.getId t)))))
+ (map (fn [^TopologyDetails t] {(.getId t) t}))
+ (apply merge)
+ (Topologies.)
+ ))
+
+;; for each isolated topology:
+;; compute even distribution of executors -> workers on the number of workers specified for the topology
+;; compute distribution of workers to machines
+;; determine host -> list of [slot, topology id, executors]
+;; iterate through hosts and: a machine is good if:
+;; 1. only running workers from one isolated topology
+;; 2. all workers running on it match one of the distributions of executors for that topology
+;; 3. matches one of the # of workers
+;; blacklist the good hosts and remove those workers from the list of need to be assigned workers
+;; otherwise unassign all other workers for isolated topologies if assigned
+
+(defn remove-elem-from-set! [^Set aset]
+ (let [elem (-> aset .iterator .next)]
+ (.remove aset elem)
+ elem
+ ))
+
+;; get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
+;; will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
+;; match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
+;; blacklist all machines who had production slots defined
+;; log isolated topologies who weren't able to get enough slots / machines
+;; run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines
+;; set blacklist to what it was initially
+(defn -schedule [this ^Topologies topologies ^Cluster cluster]
+ (let [conf (container-get (.state this))
+ orig-blacklist (HashSet. (.getBlacklistedHosts cluster))
+ iso-topologies (isolated-topologies conf (.getTopologies topologies))
+ iso-ids-set (->> iso-topologies (map #(.getId ^TopologyDetails %)) set)
+ topology-worker-specs (topology-worker-specs iso-topologies)
+ topology-machine-distribution (topology-machine-distribution conf iso-topologies)
+ host-assignments (host-assignments cluster)]
+ (doseq [[host assignments] host-assignments]
+ (let [top-id (-> assignments first second)
+ distribution (get topology-machine-distribution top-id)
+ ^Set worker-specs (get topology-worker-specs top-id)
+ num-workers (count assignments)
+ ]
+ (if (and (contains? iso-ids-set top-id)
+ (every? #(= (second %) top-id) assignments)
+ (contains? distribution num-workers)
+ (every? #(contains? worker-specs (nth % 2)) assignments))
+ (do (decrement-distribution! distribution num-workers)
+ (doseq [[_ _ executors] assignments] (.remove worker-specs executors))
+ (.blacklistHost cluster host))
+ (doseq [[slot top-id _] assignments]
+ (when (contains? iso-ids-set top-id)
+ (.freeSlot cluster slot)
+ ))
+ )))
+
+ (let [host->used-slots (host->used-slots cluster)
+ ^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
+ ;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
+ (doseq [[top-id worker-specs] topology-worker-specs
+ :let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
+ (doseq [amt amts
+ :let [[host host-slots] (.peek sorted-assignable-hosts)]]
+ (when (and host-slots (>= (count host-slots) amt))
+ (.poll sorted-assignable-hosts)
+ (.freeSlots cluster (get host->used-slots host))
+ (doseq [slot (take amt host-slots)
+ :let [executors-set (remove-elem-from-set! worker-specs)]]
+ (.assign cluster slot top-id executors-set))
+ (.blacklistHost cluster host))
+ )))
+
+ (let [failed-iso-topologies (->> topology-worker-specs
+ (mapcat (fn [[top-id worker-specs]]
+ (if-not (empty? worker-specs) [top-id])
+ )))]
+ (if (empty? failed-iso-topologies)
+ ;; run default scheduler on non-isolated topologies
+ (-<> topology-worker-specs
+ allocated-topologies
+ (leftover-topologies topologies <>)
+ (DefaultScheduler/default-schedule <> cluster))
+ (log-warn "Unable to isolate topologies " (pr-str failed-iso-topologies) ". Will wait for enough resources for isolated topologies before allocating any other resources.")
+ ))
+ (.setBlacklistedHosts cluster orig-blacklist)
+ ))
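
To make the distribution bookkeeping in IsolationScheduler concrete: machine-distribution turns a topology's worker count and its dedicated machine count into a map of workers-per-machine to number of machines, which -schedule then decrements as hosts are claimed. An editor's sketch of that even split (a stand-in for illustration, not the commit's integer-divided helper):

    ;; Hypothetical stand-in: split n workers across m dedicated machines as evenly
    ;; as possible, returning {workers-per-machine number-of-machines}.
    (defn even-split [n m]
      (let [base  (quot n m)
            extra (rem n m)]
        (into {} (remove (comp zero? val)
                         {base       (- m extra)
                          (inc base) extra}))))

    (even-split 9 4) ;; => {2 3, 3 1}  three hosts run 2 workers, one host runs 3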
@@ -96,7 +96,7 @@
;; local dir is always overridden in maps
;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {}]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil]
(let [zk-tmp (local-temp-path)
[zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)
nimbus-tmp (local-temp-path)
@@ -114,7 +114,7 @@
port-counter (mk-counter)
nimbus (nimbus/service-handler
(assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
- (nimbus/standalone-nimbus))
+ (if inimbus inimbus (nimbus/standalone-nimbus)))
context (mk-shared-context daemon-conf)
cluster-map {:nimbus nimbus
:port-counter port-counter
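
With the new :inimbus parameter above, tests can boot the in-process cluster against a custom INimbus instead of nimbus/standalone-nimbus. A hypothetical usage sketch; my-custom-inimbus is assumed, not part of this commit:

    ;; my-custom-inimbus is assumed to be an existing implementation of
    ;; backtype.storm.scheduler.INimbus supplied by the test.
    (def cluster-map
      (mk-local-storm-cluster :supervisors 2
                              :ports-per-supervisor 3
                              :inimbus my-custom-inimbus))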
@@ -824,3 +824,12 @@
(str left "/" right)
(str left right)))))
+(defmacro -<>
+ ([x] x)
+ ([x form] (if (seq? form)
+ (with-meta
+ (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+ (concat begin [x] end))
+ (meta form))
+ (list form x)))
+ ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
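
The new -<> macro threads a value into the position marked by <> in each form, or appends it when a form contains no <>; host-assignable-slots in the isolation scheduler uses it to mix positional arguments like (group-by ... <>) and (dissoc <> nil) in one pipeline. A small usage sketch:

    (-<> 4
         (- 10 <>)           ;; expands to (- 10 4)           => 6
         (vector :slots <>)) ;; expands to (vector :slots 6)  => [:slots 6]

    (-<> 5 inc)              ;; a bare symbol is simply applied => 6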
@@ -227,6 +227,16 @@
public static String DRPC_PORT = "drpc.port";
/**
+ * DRPC thrift server worker threads
+ */
+ public static String DRPC_WORKER_THREADS = "drpc.worker.threads";
+
+ /**
+ * DRPC thrift server queue size
+ */
+ public static String DRPC_QUEUE_SIZE = "drpc.queue.size";
+
+ /**
* This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
*/
public static String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
@@ -594,6 +604,12 @@
* it is not a production grade zookeeper setup.
*/
public static String DEV_ZOOKEEPER_PATH = "dev.zookeeper.path";
+
+ /**
+ * A map from topology name to the number of machines that should be dedicated for that topology. Set storm.scheduler
+ * to backtype.storm.scheduler.IsolationScheduler to make use of the isolation scheduler.
+ */
+ public static String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
public static void setDebug(Map conf, boolean isOn) {
conf.put(Config.TOPOLOGY_DEBUG, isOn);