Permalink
Browse files

Merge branch 'master' into 0.6.1

  • Loading branch information...
nathanmarz committed Dec 25, 2011
2 parents 8452c4d + e455ceb commit 1546cd7540eb46f2ea03cd4e68f496c2e50fec8f
View
@@ -19,6 +19,8 @@ mkdir $DIR/lib
cp storm*jar $DIR/
cp lib/*.jar $DIR/lib
+echo $RELEASE > $DIR/RELEASE
+
cp -R log4j $DIR/
mkdir $DIR/logs
View
@@ -23,6 +23,13 @@ else:
CONF_DIR = os.path.expanduser("~/.storm")
STORM_DIR = "/".join(os.path.abspath( __file__ ).split("/")[:-2])
+if not os.path.exists(STORM_DIR + "/RELEASE"):
+ print "******************************************"
+ print "The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code."
+ print "\nYou can download a Storm release at https://github.com/nathanmarz/storm/downloads"
+ print "******************************************"
+ sys.exit(1)
+
def get_jars_full(adir):
files = os.listdir(adir)
ret = []
@@ -122,7 +129,8 @@ def print_commands():
global COMMANDS
cmds = COMMANDS.keys()
cmds.sort()
- print "Commands:\n\t", reduce(lambda x,y: x + ', ' + y, cmds[1:], cmds[0])
+ print "Commands:\n\t", reduce(lambda x,y: x + '\n\t' + y, cmds[1:], cmds[0])
+ print "\nDocumentation for the storm client can be found at https://github.com/nathanmarz/storm/wiki/Command-line-client\n"
def print_usage(msg=None):
if msg != None:
@@ -95,12 +95,6 @@
nil
))
-;; there's a minor problem where rebalancing is scheduled over and over until it finally happens
-;; can fix this by either:
-;; 1. detecting whether it's scheduled or not...
-;; 2. not using monitor event, but rather some sort of "on startup" event
-;; 3. generating a "rebalance id" and only rebalancing if current status has that id
-
(defn state-transitions [nimbus storm-id status]
{:active {:monitor (reassign-transition nimbus storm-id)
:inactivate :inactive
@@ -114,7 +108,7 @@
:rebalance (rebalance-transition nimbus storm-id status)
:kill (kill-transition nimbus storm-id)
}
- :killed {:monitor (fn [] (delay-event nimbus
+ :killed {:startup (fn [] (delay-event nimbus
storm-id
(:kill-time-secs status)
:remove))
@@ -125,7 +119,7 @@
storm-id)
nil)
}
- :rebalancing {:monitor (fn [] (delay-event nimbus
+ :rebalancing {:startup (fn [] (delay-event nimbus
storm-id
(:delay-secs status)
:do-rebalance))
@@ -136,14 +130,15 @@
}})
(defn topology-status [nimbus storm-id]
- (:status (.storm-base (:storm-cluster-state nimbus) storm-id nil)))
+ (-> nimbus :storm-cluster-state (.storm-base storm-id nil) :status))
(defn transition!
([nimbus storm-id event]
(transition! nimbus storm-id event false))
([nimbus storm-id event error-on-no-transition?]
(locking (:submit-lock nimbus)
- (let [[event & event-args] (if (keyword? event) [event] event)
+ (let [system-events #{:startup :monitor}
+ [event & event-args] (if (keyword? event) [event] event)
status (topology-status nimbus storm-id)]
;; handles the case where event was scheduled but topology has been removed
(if-not status
@@ -156,8 +151,10 @@
" storm-id: " storm-id)]
(if error-on-no-transition?
(throw-runtime msg)
- (do (log-message msg) nil)
- ))))
+ (do (when-not (contains? system-events event)
+ (log-message msg))
+ nil))
+ )))
transition (-> (state-transitions nimbus storm-id status)
(get (:type status))
(get-event event))
@@ -619,16 +616,27 @@
(swap! (:task-heartbeats-cache nimbus) dissoc id))
))))
+(defn cleanup-corrupt-topologies! [nimbus]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ code-ids (set (code-ids (:conf nimbus)))
+ active-topologies (set (.active-storms storm-cluster-state))
+ corrupt-topologies (set/difference active-topologies code-ids)]
+ (doseq [corrupt corrupt-topologies]
+ (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
+ (.remove-storm! storm-cluster-state corrupt)
+ )))
(defserverfn service-handler [conf]
(log-message "Starting Nimbus with conf " conf)
(let [nimbus (nimbus-data conf)]
+ (cleanup-corrupt-topologies! nimbus)
+ (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
+ (transition! nimbus storm-id :startup))
(schedule-recurring (:timer nimbus)
0
(conf NIMBUS-MONITOR-FREQ-SECS)
(fn []
- (doseq [storm-id (.active-storms
- (:storm-cluster-state nimbus))]
+ (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
(transition! nimbus storm-id :monitor))
(do-cleanup nimbus)
))
@@ -200,6 +200,7 @@
`(with-simulated-time
(with-local-cluster ~@args)))
+;; TODO: should take in a port symbol and find available port automatically
(defmacro with-inprocess-zookeeper [port & body]
`(with-local-tmp [tmp#]
(let [zks# (zk/mk-inprocess-zookeeper tmp# ~port)]
@@ -76,8 +76,8 @@
(defn cancel-timer [timer]
(check-active! timer)
- (reset! (:active timer) false)
(locking (:lock timer)
+ (reset! (:active timer) false)
(.interrupt (:timer-thread timer)))
(.acquire (:cancel-notifier timer)))
@@ -2,6 +2,7 @@
(:use [clojure test])
(:use [clojure.contrib.def :only [defnk]])
(:require [backtype.storm.daemon [nimbus :as nimbus]])
+
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
@@ -395,6 +396,32 @@
(check-num-nodes slot-tasks 3)
)))
+(deftest test-cleans-corrupt
+ (let [zk-port (available-port 2181)]
+ (with-inprocess-zookeeper zk-port
+ (with-local-tmp [nimbus-dir]
+ (letlocals
+ (bind conf (merge (read-storm-config)
+ {STORM-CLUSTER-MODE "local"
+ STORM-ZOOKEEPER-PORT zk-port
+ STORM-LOCAL-DIR nimbus-dir}))
+ (bind cluster-state (cluster/mk-storm-cluster-state conf))
+ (bind nimbus (nimbus/service-handler conf))
+ (bind topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
+ {}))
+ (submit-local-topology nimbus "t1" {} topology)
+ (submit-local-topology nimbus "t2" {} topology)
+ (bind storm-id1 (get-storm-id cluster-state "t1"))
+ (bind storm-id2 (get-storm-id cluster-state "t2"))
+ (.shutdown nimbus)
+ (rmr (master-stormdist-root conf storm-id1))
+ (bind nimbus (nimbus/service-handler conf))
+ (is ( = #{storm-id2} (set (.active-storms cluster-state))))
+ (.shutdown nimbus)
+ (.disconnect cluster-state)
+ )))))
+
(deftest test-no-overlapping-slots
;; test that same node+port never appears across 2 assignments
)

0 comments on commit 1546cd7

Please sign in to comment.