Permalink
Browse files

have nimbus detect corrupt topologies (in zookeeper but not local dir…

…) and cleanup automatically
  • Loading branch information...
1 parent e1eb16b commit e455ceb45620b431ab6ea1580c6ac270a3f61c2c @nathanmarz committed Dec 25, 2011
@@ -616,10 +616,20 @@
(swap! (:task-heartbeats-cache nimbus) dissoc id))
))))
+(defn cleanup-corrupt-topologies! [nimbus]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ code-ids (set (code-ids (:conf nimbus)))
+ active-topologies (set (.active-storms storm-cluster-state))
+ corrupt-topologies (set/difference active-topologies code-ids)]
+ (doseq [corrupt corrupt-topologies]
+ (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
+ (.remove-storm! storm-cluster-state corrupt)
+ )))
(defserverfn service-handler [conf]
(log-message "Starting Nimbus with conf " conf)
(let [nimbus (nimbus-data conf)]
+ (cleanup-corrupt-topologies! nimbus)
(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
(transition! nimbus storm-id :startup))
(schedule-recurring (:timer nimbus)
@@ -200,6 +200,7 @@
`(with-simulated-time
(with-local-cluster ~@args)))
+;; TODO: should take in a port symbol and find available port automatically
(defmacro with-inprocess-zookeeper [port & body]
`(with-local-tmp [tmp#]
(let [zks# (zk/mk-inprocess-zookeeper tmp# ~port)]
@@ -2,6 +2,7 @@
(:use [clojure test])
(:use [clojure.contrib.def :only [defnk]])
(:require [backtype.storm.daemon [nimbus :as nimbus]])
+
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
@@ -395,6 +396,32 @@
(check-num-nodes slot-tasks 3)
)))
+(deftest test-cleans-corrupt
+ (let [zk-port (available-port 2181)]
+ (with-inprocess-zookeeper zk-port
+ (with-local-tmp [nimbus-dir]
+ (letlocals
+ (bind conf (merge (read-storm-config)
+ {STORM-CLUSTER-MODE "local"
+ STORM-ZOOKEEPER-PORT zk-port
+ STORM-LOCAL-DIR nimbus-dir}))
+ (bind cluster-state (cluster/mk-storm-cluster-state conf))
+ (bind nimbus (nimbus/service-handler conf))
+ (bind topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
+ {}))
+ (submit-local-topology nimbus "t1" {} topology)
+ (submit-local-topology nimbus "t2" {} topology)
+ (bind storm-id1 (get-storm-id cluster-state "t1"))
+ (bind storm-id2 (get-storm-id cluster-state "t2"))
+ (.shutdown nimbus)
+ (rmr (master-stormdist-root conf storm-id1))
+ (bind nimbus (nimbus/service-handler conf))
+ (is ( = #{storm-id2} (set (.active-storms cluster-state))))
+ (.shutdown nimbus)
+ (.disconnect cluster-state)
+ )))))
+
(deftest test-no-overlapping-slots
;; test that same node+port never appears across 2 assignments
)

0 comments on commit e455ceb

Please sign in to comment.