Permalink
Browse files

fix conflicts

  • Loading branch information...
nathanmarz committed Oct 24, 2012
2 parents 0c559cd + 4870bd9 commit 810e6769565827d57e5669025cd9ad1e890236f6
View
@@ -8,7 +8,6 @@
## Unreleased (0.8.2)
- * Added high water mark to ZeroMQ sockets (defaults to 10000) configurable with zmq.hwm
* Storm UI now uses nimbus.host to find Nimbus rather than always using localhost (thanks Frostman)
* Added report-error! to Clojure DSL
* Automatically throttle errors sent to Zookeeper/Storm UI when too many are reported in a time interval (all errors are still logged) Configured with TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL and TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
@@ -23,8 +22,10 @@
* Added ITupleCollection interface for TridentState's and TupleCollectionGet QueryFunction for getting the full contents of a state. MemoryMapState and LRUMemoryMapState implement this
* Can now submit a topology in inactive state. Storm will wait to call open/prepare on the spouts/bolts until it is first activated.
 * Can now activate, deactivate, rebalance, and kill topologies from the Storm UI (thanks Frostman)
+ * Can now use --config option to override which yaml file from ~/.storm to use for the config (thanks tjun)
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
-
+ * Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
+
## 0.8.1
* Exposed Storm's unit testing facilities via the backtype.storm.Testing class. Notable functions are Testing/withLocalCluster and Testing/completeTopology (thanks xumingming)
View
@@ -61,6 +61,7 @@ You must not remove this notice, or any other, from this software.
* Barry Hart ([@barrywhart](https://github.com/barrywhart))
* Sergey Lukjanov ([@Frostman](https://github.com/Frostman))
* Ross Feinstein ([@rnfein](https://github.com/rnfein))
+* Junichiro Takagi ([@tjun](https://github.com/tjun))
## Acknowledgements
View
@@ -25,6 +25,7 @@ else:
CONF_DIR = os.path.expanduser("~/.storm")
STORM_DIR = "/".join(os.path.realpath( __file__ ).split("/")[:-2])
CONFIG_OPTS = []
+CONFFILE = ""
def get_config_opts():
global CONFIG_OPTS
@@ -52,8 +53,9 @@ def get_classpath(extrajars):
return normclasspath(":".join(ret))
def confvalue(name, extrapaths):
+ global CONFFILE
command = [
- "java", "-client", get_config_opts(), "-cp", get_classpath(extrapaths), "backtype.storm.command.config_value", name
+ "java", "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE, "-cp", get_classpath(extrapaths), "backtype.storm.command.config_value", name
]
p = sub.Popen(command, stdout=sub.PIPE)
output, errors = p.communicate()
@@ -104,10 +106,12 @@ def parse_args(string):
return [re.compile(r'\\(.)').sub('\\1', x) for x in args]
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
+ global CONFFILE
all_args = [
"java", jvmtype, get_config_opts(),
"-Dstorm.home=" + STORM_DIR,
"-Djava.library.path=" + confvalue("java.library.path", extrajars),
+ "-Dstorm.conf.file=" + CONFFILE,
"-cp", get_classpath(extrajars),
] + jvmopts + [klass] + list(args)
print "Running: " + " ".join(all_args)
@@ -385,6 +389,9 @@ def parse_config_opts(args):
token = curr.pop()
if token == "-c":
config_list.append(curr.pop())
+ elif token == "--config":
+ global CONFFILE
+ CONFFILE = curr.pop()
else:
args_list.append(token)
@@ -399,7 +406,7 @@ def main():
parse_config(config_list)
COMMAND = args[0]
ARGS = args[1:]
- (COMMANDS.get(COMMAND, "help"))(*ARGS)
+ (COMMANDS.get(COMMAND, unknown_command))(*ARGS)
if __name__ == "__main__":
main()
View
@@ -50,7 +50,7 @@ supervisor.slots.ports:
- 6701
- 6702
- 6703
-supervisor.childopts: "-Xmx1024m"
+supervisor.childopts: "-Xmx256m"
#how long supervisor will wait to ensure that a worker process is started
supervisor.worker.start.timeout.secs: 120
#how long between heartbeats until supervisor considers that worker dead and tries to restart it
@@ -70,7 +70,7 @@ task.refresh.poll.secs: 10
zmq.threads: 1
zmq.linger.millis: 5000
-zmq.hwm: 10000
+zmq.hwm: 0
### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
@@ -33,7 +33,7 @@
;; component->executors is a map from spout/bolt id to number of executors for that component
(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors])
-(defrecord SupervisorInfo [time-secs hostname meta scheduler-meta uptime-secs])
+(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs])
(defprotocol DaemonCommon
(waiting? [this]))
@@ -256,30 +256,23 @@
supervisor-ids))
)))
-(defn- available-slots
- [nimbus topologies-missing-assignments topologies]
+(defn- all-scheduling-slots
+ [nimbus topologies missing-assignment-topologies]
(let [storm-cluster-state (:storm-cluster-state nimbus)
^INimbus inimbus (:inimbus nimbus)
- supervisor-ids (.supervisors storm-cluster-state nil)
supervisor-infos (all-supervisor-info storm-cluster-state nil)
- existing-slots (assigned-slots storm-cluster-state)
- supervisor-details (for [[id info] supervisor-infos]
+ supervisor-details (dofor [[id info] supervisor-infos]
(SupervisorDetails. id (:meta info)))
- worker-slots (mapcat (fn [[id ports]]
- (for [p ports]
- (WorkerSlot. id p)))
- existing-slots)
- ret (.availableSlots inimbus
+ ret (.allSlotsAvailableForScheduling inimbus
supervisor-details
- worker-slots
topologies
- topologies-missing-assignments
+ (set missing-assignment-topologies)
)
]
- (for [^WorkerSlot slot ret]
+ (dofor [^WorkerSlot slot ret]
[(.getNodeId slot) (.getPort slot)]
)))
@@ -471,19 +464,18 @@
{})))]]
{tid (SchedulerAssignmentImpl. tid executor->slot)})))
-(defn- read-all-supervisor-details [nimbus all-slots available-slots supervisor->dead-ports]
+(defn- read-all-supervisor-details [nimbus all-scheduling-slots supervisor->dead-ports]
"return a map: {topology-id SupervisorDetails}"
(let [storm-cluster-state (:storm-cluster-state nimbus)
supervisor-infos (all-supervisor-info storm-cluster-state)
- nonexistent-supervisor-slots (apply dissoc available-slots (keys supervisor-infos))
+ nonexistent-supervisor-slots (apply dissoc all-scheduling-slots (keys supervisor-infos))
all-supervisor-details (into {} (for [[sid supervisor-info] supervisor-infos
:let [hostname (:hostname supervisor-info)
scheduler-meta (:scheduler-meta supervisor-info)
dead-ports (supervisor->dead-ports sid)
;; hide the dead-ports from the all-ports
;; these dead-ports can be reused in next round of assignments
- all-ports (-> sid
- all-slots
+ all-ports (-> (get all-scheduling-slots sid)
(set/difference dead-ports)
((fn [ports] (map int ports))))
supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]]
@@ -526,6 +518,11 @@
;; (apply merge-with set/union)
;; ))
+(defn num-used-workers [^SchedulerAssignment scheduler-assignment]
+ (if scheduler-assignment
+ (count (.getSlots scheduler-assignment))
+ 0 ))
+
;; public so it can be mocked out
(defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]
(let [conf (:conf nimbus)
@@ -545,23 +542,26 @@
topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus
existing-assignments
topology->alive-executors)
-
+
missing-assignment-topologies (->> topologies
.getTopologies
(map (memfn getId))
(filter (fn [t]
(let [alle (get topology->executors t)
alivee (get topology->alive-executors t)]
- (or (empty? alle) (not= alle alivee))
+ (or (empty? alle)
+ (not= alle alivee)
+ (< (-> topology->scheduler-assignment
+ (get t)
+ num-used-workers )
+ (-> topologies (.getById t) .getNumWorkers)
+ ))
))))
- available-slots (->> topologies
- (available-slots nimbus missing-assignment-topologies)
- (map (fn [[node-id port]] {node-id #{port}}))
- (apply merge-with set/union))
- assigned-slots (assigned-slots storm-cluster-state)
- all-slots (merge-with set/union available-slots assigned-slots)
-
- supervisors (read-all-supervisor-details nimbus all-slots available-slots supervisor->dead-ports)
+ all-scheduling-slots (->> (all-scheduling-slots nimbus topologies missing-assignment-topologies)
+ (map (fn [[node-id port]] {node-id #{port}}))
+ (apply merge-with set/union))
+
+ supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
cluster (Cluster. supervisors topology->scheduler-assignment)
;; call scheduler.schedule to schedule all the topologies
@@ -611,6 +611,9 @@
[id (SupervisorDetails. id (:hostname info) (:scheduler-meta info) nil)]))
(into {}))))
+(defn- to-worker-slot [[node port]]
+ (WorkerSlot. node port))
+
;; get existing assignment (just the executor->node+port map) -> default to {}
;; filter out ones which have a executor timeout
;; figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors should be in each slot (e.g., 4, 4, 4, 5)
@@ -679,13 +682,14 @@
(log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
(.set-assignment! storm-cluster-state topology-id assignment)
)))
- (->> (dofor [[topology-id assignment] new-assignments
- :let [existing-assignment (get existing-assignments topology-id)]]
- (newly-added-slots existing-assignment assignment))
- (apply concat)
- (map (fn [[id port]] (WorkerSlot. id port)))
- (.assignSlots inimbus topologies)
- )))
+ (->> new-assignments
+ (map (fn [[topology-id assignment]]
+ (let [existing-assignment (get existing-assignments topology-id)]
+ [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
+ )))
+ (into {})
+ (.assignSlots inimbus topologies))
+ ))
(defn- start-storm [nimbus storm-name storm-id topology-initial-status]
{:pre [(#{:active :inactive} topology-initial-status)]}
@@ -1013,17 +1017,17 @@
(^ClusterSummary getClusterInfo [this]
(let [storm-cluster-state (:storm-cluster-state nimbus)
- assigned (assigned-slots storm-cluster-state)
supervisor-infos (all-supervisor-info storm-cluster-state)
;; TODO: need to get the port info about supervisors...
;; in standalone just look at metadata, otherwise just say N/A?
supervisor-summaries (dofor [[id info] supervisor-infos]
- (let [ports (set (:meta info))
+ (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
]
(SupervisorSummary. (:hostname info)
(:uptime-secs info)
(count ports)
- (count (assigned id)))
+ (count (:used-ports info))
+ id )
))
nimbus-uptime ((:uptime nimbus))
bases (topology-bases storm-cluster-state)
@@ -1136,15 +1140,13 @@
(reify INimbus
(prepare [this conf local-dir]
)
- (availableSlots [this supervisors used-slots topologies topologies-missing-assignments]
- (let [all-slots (->> supervisors
- (mapcat (fn [^SupervisorDetails s]
- (for [p (.getMeta s)]
- (WorkerSlot. (.getId s) p))))
- set)]
- (set/difference all-slots (set used-slots))
- ))
- (assignSlots [this topologies slots]
+ (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
+ (->> supervisors
+ (mapcat (fn [^SupervisorDetails s]
+ (for [p (.getMeta s)]
+ (WorkerSlot. (.getId s) p))))
+ set ))
+ (assignSlots [this topology slots]
)
(getForcedScheduler [this]
nil )
@@ -21,9 +21,9 @@
)
-(defn- read-my-executors [storm-cluster-state storm-id supervisor-id callback]
+(defn- read-my-executors [storm-cluster-state storm-id assignment-id callback]
(let [assignment (.assignment-info storm-cluster-state storm-id callback)
- my-executors (filter (fn [[_ [node _]]] (= node supervisor-id))
+ my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
(:executor->node+port assignment))
port-executors (apply merge-with
concat
@@ -39,13 +39,13 @@
(defn- read-assignments
"Returns map from port to struct containing :storm-id and :executors"
- [storm-cluster-state supervisor-id callback]
+ [storm-cluster-state assignment-id callback]
(let [storm-ids (.assignments storm-cluster-state callback)]
(apply merge-with
(fn [& ignored]
(throw (RuntimeException.
"Should not have multiple topologies assigned to one port")))
- (dofor [sid storm-ids] (read-my-executors storm-cluster-state sid supervisor-id callback))
+ (dofor [sid storm-ids] (read-my-executors storm-cluster-state sid assignment-id callback))
)))
(defn- read-storm-code-locations
@@ -174,10 +174,12 @@
:worker-thread-pids-atom (atom {})
:storm-cluster-state (cluster/mk-storm-cluster-state conf)
:local-state (supervisor-state conf)
- :supervisor-id (.getId isupervisor)
+ :supervisor-id (.getSupervisorId isupervisor)
+ :assignment-id (.getAssignmentId isupervisor)
:my-hostname (if (contains? conf STORM-LOCAL-HOSTNAME)
(conf STORM-LOCAL-HOSTNAME)
(local-hostname))
+ :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
:timer (mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")
@@ -267,7 +269,7 @@
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
all-assignment (read-assignments
storm-cluster-state
- (:supervisor-id supervisor)
+ (:assignment-id supervisor)
sync-callback)
new-assignment (->> all-assignment
(filter-key #(.confirmAssigned isupervisor %)))
@@ -306,6 +308,7 @@
(.put local-state
LS-LOCAL-ASSIGNMENTS
new-assignment)
+ (reset! (:curr-assignment supervisor) new-assignment)
;; remove any downloaded code that's no longer assigned or active
;; important that this happens after setting the local assignment so that
;; synchronize-supervisor doesn't try to launch workers for which the
@@ -334,6 +337,9 @@
(:supervisor-id supervisor)
(SupervisorInfo. (current-time-secs)
(:my-hostname supervisor)
+ (:assignment-id supervisor)
+ (keys @(:curr-assignment supervisor))
+ ;; used ports
(.getMetadata isupervisor)
(conf SUPERVISOR-SCHEDULER-META)
((:uptime supervisor)))))]
@@ -417,7 +423,7 @@
" -Dstorm.home=" (System/getProperty "storm.home")
" -Dlogback.configurationFile=logback/cluster.xml"
" -cp " classpath " backtype.storm.daemon.worker "
- (java.net.URLEncoder/encode storm-id) " " (:supervisor-id supervisor)
+ (java.net.URLEncoder/encode storm-id) " " (:assignment-id supervisor)
" " port " " worker-id)]
(log-message "Launching worker with command: " command)
(launch-process command :environment {"LD_LIBRARY_PATH" (conf JAVA-LIBRARY-PATH)})
@@ -458,7 +464,7 @@
worker (worker/mk-worker conf
(:shared-context supervisor)
storm-id
- (:supervisor-id supervisor)
+ (:assignment-id supervisor)
port
worker-id)]
(psim/register-process pid worker)
@@ -487,7 +493,9 @@
true)
(getMetadata [this]
(doall (map int (get @conf-atom SUPERVISOR-SLOTS-PORTS))))
- (getId [this]
+ (getSupervisorId [this]
+ @id-atom)
+ (getAssignmentId [this]
@id-atom)
(killedWorker [this port]
)
Oops, something went wrong.

0 comments on commit 810e676

Please sign in to comment.