unfinished refactoring to a state machine approach for nimbus

commit 7371926834a75cb1dacde371cd8f3aa0702dcd0a (parent 5c3bf86), authored by @nathanmarz
src/clj/backtype/storm/cluster.clj (5 changed lines)
@@ -315,11 +315,10 @@
)
(remove-storm! [this storm-id]
- (remove-storm-base! this storm-id)
- ;; rmr the task related info. must remove assignment last
(delete-node cluster-state (storm-task-root storm-id))
(delete-node cluster-state (assignment-path storm-id))
- )
+ ;; TODO: this might be a problem... originally this was the first thing
+ (remove-storm-base! this storm-id))
(report-task-error [this storm-id task-id error]
(let [path (taskerror-path storm-id task-id)
src/clj/backtype/storm/daemon/nimbus.clj (362 changed lines)
@@ -11,14 +11,6 @@
(bootstrap)
-;; active -> check reassign every X seconds (or do so when something fails)
-;; inactive -> don't do anything
-;; rebalance -> sleep for X seconds and then
-;; what if a topology is killed while rebalancing? need to clear out its scheduled tasks.
-;; have another thread that every X seconds schedules rebalance for all active topologies
-
-
-(defmulti setup-jar cluster-mode)
(defn file-cache-map [conf]
(TimeCacheMap.
@@ -32,23 +24,173 @@
(defn nimbus-data [conf]
{:conf conf
:submitted-count (atom 0)
- :active (atom true)
:storm-cluster-state (cluster/mk-storm-cluster-state conf)
:submit-lock (Object.)
:task-heartbeats-cache (atom {})
:downloaders (file-cache-map conf)
:uploaders (file-cache-map conf)
:uptime (uptime-computer)
- :event-manager (event/event-manager false)
- :cleanup-manager (event/event-manager false)
+ :timer (mk-timer :kill-fn (fn [t]
+ (log-error t "Error when processing event")
+ (halt-process! 20 "Error when processing an event")
+ ))
})
-(defn nimbus-managers [nimbus]
- ((juxt :event-manager :cleanup-manager) nimbus))
-
(defn inbox [nimbus]
(master-inbox (:conf nimbus)))
+(defn- read-storm-conf [conf storm-id]
+ (let [stormroot (master-stormdist-root conf storm-id)]
+ (merge conf
+ (Utils/deserialize
+ (FileUtils/readFileToByteArray
+ (File. (master-stormconf-path stormroot))
+ )))))
+
+(defn set-topology-status! [nimbus storm-id status]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)]
+ (.update-storm! storm-cluster-state
+ storm-id
+ {:status status})
+ (log-message "Updated " storm-name " with status " status)
+ ))
+
+(declare reassign-topology)
+(declare delay-event)
+
+(defn kill-transition [nimbus storm-id]
+ (fn [kill-time]
+ (let [delay (if kill-time
+ kill-time
+ (get (read-storm-conf (:conf nimbus) storm-id)
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
+ (delay-event nimbus
+ storm-id
+ delay
+ :remove)
+ {:type :killed
+ :kill-time-secs delay})
+ ))
+
+(defn rebalance-transition [nimbus storm-id status]
+ (fn [time]
+ (let [delay (if time
+ time
+ (get (read-storm-conf (:conf nimbus) storm-id)
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
+ (delay-event nimbus
+ storm-id
+ delay
+ :do-rebalance)
+ {:type :rebalancing
+ :delay-secs delay
+ :old-status status
+ })))
+
+(defn reassign-transition [nimbus storm-id]
+ (fn []
+ (reassign-topology nimbus storm-id)
+ nil
+ ))
+
+;; TODO:
+;; have another thread that every X seconds schedules rebalance for all active topologies
+;; how does cleanup work?
+;; -- schedule zookeeper cleanup to be task timeout seconds in future
+;; -- cleanup code if its not active (check every minute)
+(defn state-transitions [nimbus storm-id status]
+ {:active {:monitor (reassign-transition nimbus storm-id)
+ :inactivate :inactive
+ :activate nil
+ :rebalance (rebalance-transition nimbus storm-id status)
+ :kill (kill-transition nimbus storm-id)
+ }
+ :inactive {:monitor (reassign-transition nimbus storm-id)
+ :activate :active
+ :inactivate nil
+ :rebalance (rebalance-transition nimbus storm-id status)
+ :kill (kill-transition nimbus storm-id)
+ }
+ :killed {:monitor (fn [] (delay-event nimbus
+ storm-id
+ (:kill-time-secs status)
+ :remove))
+ :kill (kill-transition nimbus storm-id)
+ :remove (fn []
+ (log-message "Killing topology: " storm-id)
+ (.remove-storm! (:storm-cluster-state nimbus)
+ storm-id)
+ 'removed )
+ }
+ :rebalancing {:monitor (fn [] (delay-event nimbus
+ storm-id
+ (:delay-secs status)
+ :do-rebalance))
+ :kill (kill-transition nimbus storm-id)
+ :do-rebalance (fn []
+ ;; TODO: reassign from scratch
+ (:old-status status))
+ }})
+
+(defn topology-status [nimbus storm-id]
+ (:status (.storm-base (:storm-cluster-state nimbus) storm-id nil)))
+
+(defn transition!
+ ([nimbus storm-id event]
+ (transition! nimbus storm-id event false))
+ ([nimbus storm-id event error-on-no-transition?]
+ (locking (:submit-lock nimbus)
+ (let [[event & event-args] (if (keyword? event) [event] event)
+ status (topology-status nimbus storm-id)]
+ (if status ; handles the case where event was scheduled but has been removed
+ (let [get-event (fn [m e]
+ (if (contains? m e)
+ (m e)
+ (when error-on-no-transition?
+ (throw-runtime "No transition for event: " event
+ ", status: " status,
+ "storm-id: " storm-id)
+ )))
+ transition (-> (state-transitions nimbus storm-id status)
+ (:type status)
+ (get-event event))
+ transition (if (or (nil? transition)
+ (keyword? transition))
+ (fn [] transition)
+ transition)
+ new-status (apply transition event-args)
+ new-status (cond (nil? new-status) status
+ (keyword? new-status) {:type new-status}
+ true new-status)]
+ (when-not (= 'removed new-status)
+ (set-topology-status! nimbus storm-id new-status)))))
+ )))
+
+(defn transition-name! [nimbus storm-name event & args]
+ (let [storm-id (get-storm-id (:storm-cluster-state nimbus) storm-name)]
+ (when-not storm-id
+ (throw (NotAliveException. storm-name)))
+ (apply transition! nimbus storm-id event args)))
+
+(defn delay-event [nimbus storm-id delay-secs event]
+ (log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
+ (schedule (:timer nimbus)
+ delay-secs
+ #(transition! nimbus storm-id event false)
+ ))
+
+;; active -> reassign in X secs
+
+;; killed -> wait kill time then shutdown
+;; active -> reassign in X secs
+;; inactive -> nothing
+;; rebalance -> wait X seconds then rebalance
+;; swap... (need to handle kill during swap, etc.)
+;; event transitions are delayed by the timer... should anything else that comes through (e.g. a kill) override the pending transition, or should other transitions be disabled while one is in progress?
+
+
+(defmulti setup-jar cluster-mode)
+
;; status types
;; -- killed (:kill-time-secs)
;; -- active
@@ -173,14 +315,6 @@
))
-(defn- read-storm-conf [conf storm-id]
- (let [stormroot (master-stormdist-root conf storm-id)]
- (merge conf
- (Utils/deserialize
- (FileUtils/readFileToByteArray
- (File. (master-stormconf-path stormroot))
- )))))
-
(defn- read-storm-topology [conf storm-id]
(let [stormroot (master-stormdist-root conf storm-id)]
(Utils/deserialize
@@ -192,6 +326,7 @@
(defn task-dead? [conf storm-cluster-state storm-id task-id]
(let [info (.task-heartbeat storm-cluster-state storm-id task-id)]
(or (not info)
+ ;; TODO: this is not correct... times could be different
(> (time-delta (:time-secs info))
(conf NIMBUS-TASK-TIMEOUT-SECS)))
))
@@ -366,10 +501,24 @@
))
))
+(defn reassign-topology [nimbus storm-id]
+ (let [conf (:conf nimbus)
+ storm-cluster-state (:storm-cluster-state nimbus)]
+ (when (conf NIMBUS-REASSIGN)
+ (mk-assignments conf
+ storm-id
+ storm-cluster-state
+ (fn [& ignored] (transition! nimbus storm-id :monitor))
+ (:task-heartbeats-cache nimbus))
+ )))
+
(defn- start-storm [storm-name storm-cluster-state storm-id]
(log-message "Activating " storm-name ": " storm-id)
- (.activate-storm! storm-cluster-state storm-id (StormBase. storm-name (current-time-secs) {:type :active}))
- )
+ (.activate-storm! storm-cluster-state
+ storm-id
+ (StormBase. storm-name
+ (current-time-secs)
+ {:type :active})))
;; Master:
;; job submit:
@@ -390,21 +539,10 @@
(throw (AlreadyAliveException. (str storm-name " is already active"))))
))
-(defn topologies-to-kill [storm-cluster-state]
- (let [assigned-ids (set (.assignments storm-cluster-state nil))]
- (reduce (fn [ret id]
- (let [status (:status (.storm-base storm-cluster-state id nil))]
- (if (or (nil? status) (= :killed (:type status)))
- (assoc ret id (:kill-time-secs status))
- ret)
- ))
- {}
- assigned-ids)
- ))
-
(defn cleanup-storm-ids [conf storm-cluster-state]
(let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
error-ids (set (.task-error-storms storm-cluster-state))
+ ;; TODO: should probably use storm-base to determine this?
assigned-ids (set (.assignments storm-cluster-state nil))
storm-ids (set/difference (set/union heartbeat-ids error-ids) assigned-ids)]
(filter
@@ -453,91 +591,45 @@
(assoc storm-conf TOPOLOGY-KRYO-REGISTER sers)
))
-(defn set-topology-status! [nimbus storm-name status]
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- storm-id (get-storm-id storm-cluster-state storm-name)]
- (when-not storm-id
- (throw (NotAliveException. storm-name)))
- (locking (:submit-lock nimbus)
- (.update-storm! storm-cluster-state
- storm-id
- {:status status}))
- (log-message "Updated " storm-name " with status " status)
- ))
+;; TODO: need to put this somewhere else
+;; TODO: removing code locally should be done separately (since topology that doesn't start will still have code)
+;;
+;; technically a supervisor could still think there's an assignment and try to d/l
+;; this will cause supervisor to go down and come back up... eventually it should sync
+;; (rmr (master-stormdist-root conf id))
+;;
+;; (.remove-storm! storm-cluster-state id)
+
+
+;; TODO: need to rethink this whole thing
+(defn do-cleanup [nimbus]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ conf (:conf nimbus)
+ submit-lock (:submit-lock nimbus)]
+ (let [to-cleanup-ids (locking submit-lock
+ (cleanup-storm-ids conf storm-cluster-state))]
+ (when-not (empty? to-cleanup-ids)
+ (doseq [id to-cleanup-ids]
+ (.teardown-heartbeats! storm-cluster-state id)
+ (.teardown-task-errors! storm-cluster-state id)
+ (swap! (:task-heartbeats-cache nimbus) dissoc id)
+ )
+ (log-message "Cleaned up topology task heartbeats: " (pr-str to-cleanup-ids))
+ ))))
-(defn mk-cleanup-fn [nimbus]
- (fn []
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- conf (:conf nimbus)
- submit-lock (:submit-lock nimbus)
- id->kill-time (locking submit-lock
- (topologies-to-kill storm-cluster-state))]
- (when-not (empty? id->kill-time)
- (log-message "Entering kill loop for " (pr-str id->kill-time)))
- (loop [kill-order (sort-by second (seq id->kill-time))]
- (when-not (empty? kill-order)
- (let [[id kill-time] (first kill-order)
- now (current-time-secs)]
- (if (or (nil? kill-time) (>= now kill-time))
- (do
- ;; technically a supervisor could still think there's an assignment and try to d/l
- ;; this will cause supervisor to go down and come back up... eventually it should sync
- ;; TODO: removing code locally should be done separately (since topology that doesn't start will still have code)
- (log-message "Killing topology: " id)
- (rmr (master-stormdist-root conf id))
- (.remove-storm! storm-cluster-state id)
- (recur (rest kill-order))
- )
- (do
- (log-message "Waiting for " (- kill-time now) " seconds to kill topology " id)
- (sleep-secs (- kill-time now))
- (recur kill-order)
- ))
- )))
- (when-not (empty? id->kill-time)
- (log-message "Killed " (pr-str id->kill-time)))
- (let [to-cleanup-ids (locking submit-lock
- (cleanup-storm-ids conf storm-cluster-state))]
- (when-not (empty? to-cleanup-ids)
- (doseq [id to-cleanup-ids]
- (.teardown-heartbeats! storm-cluster-state id)
- (.teardown-task-errors! storm-cluster-state id)
- (swap! (:task-heartbeats-cache nimbus) dissoc id)
- )
- (log-message "Cleaned up topology task heartbeats: " (pr-str to-cleanup-ids))
- )))))
-
-(defn mk-reassign-fn [nimbus]
- (fn this []
- (let [conf (:conf nimbus)
- submit-lock (:submit-lock nimbus)
- storm-cluster-state (:storm-cluster-state nimbus)]
- (when (conf NIMBUS-REASSIGN)
- (locking submit-lock
- (let [callback (fn [& ignored] (.add (:event-manager nimbus) this))
- active-storm-ids (.active-storms storm-cluster-state)]
- (doseq [storm-id active-storm-ids]
- (let [base (.storm-base storm-cluster-state storm-id nil)]
- (if (= :active (-> base :status :type))
- (mk-assignments conf
- storm-id
- storm-cluster-state
- callback
- (:task-heartbeats-cache nimbus)))))
- ))))))
(defserverfn service-handler [conf]
(log-message "Starting Nimbus with conf " conf)
- (let [nimbus (nimbus-data conf)
- cleanup-fn (mk-cleanup-fn nimbus)
- reassign-fn (mk-reassign-fn nimbus)
- threads [(async-loop
- (fn []
- (.add (:event-manager nimbus) reassign-fn)
- (.add (:cleanup-manager nimbus) cleanup-fn)
- (when @(:active nimbus) (conf NIMBUS-MONITOR-FREQ-SECS))
- ))
- ]]
+ (let [nimbus (nimbus-data conf)]
+ (schedule-recurring (:timer nimbus)
+ 0
+ (conf NIMBUS-MONITOR-FREQ-SECS)
+ (fn []
+ (doseq [storm-id (.active-storms
+ (:storm-cluster-state nimbus))]
+ (transition! nimbus storm-id :monitor))
+ (do-cleanup nimbus)
+ ))
(reify Nimbus$Iface
(^void submitTopology
[this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
@@ -564,7 +656,7 @@
(.setup-heartbeats! storm-cluster-state storm-id)
(setup-storm-static conf storm-id storm-cluster-state)
(mk-assignments conf storm-id storm-cluster-state
- (fn [& ignored] (.add (:event-manager nimbus) reassign-fn))
+ (fn [& ignored] (transition! nimbus storm-id :monitor))
(:task-heartbeats-cache nimbus))
(start-storm storm-name storm-cluster-state storm-id))
))
@@ -574,30 +666,18 @@
(^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
(check-storm-active! nimbus storm-name true)
- (let [storm-cluster-state (:storm-cluster-state nimbus)
- storm-id (get-storm-id storm-cluster-state storm-name)
- wait-amt (if (.is_set_wait_secs options)
- (.get_wait_secs options)
- ((read-storm-conf conf storm-id) TOPOLOGY-MESSAGE-TIMEOUT-SECS)
+ (let [wait-amt (if (.is_set_wait_secs options)
+ (.get_wait_secs options)
)]
- (set-topology-status! nimbus
- storm-name
- {:type :killed
- :kill-time-secs (+ (current-time-secs) wait-amt)})
- (.add (:cleanup-manager nimbus) cleanup-fn)
+ (transition-name! nimbus storm-name [:kill wait-amt] true)
))
(activate [this storm-name]
- (set-topology-status! nimbus
- storm-name
- {:type :active})
+ (transition-name! nimbus storm-name :activate true)
)
(deactivate [this storm-name]
- (set-topology-status! nimbus
- storm-name
- {:type :inactive})
- )
+ (transition-name! nimbus storm-name :inactivate true))
(beginFileUpload [this]
(let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
@@ -731,21 +811,13 @@
Shutdownable
(shutdown [this]
(log-message "Shutting down master")
- (reset! (:active nimbus) false)
- (doseq [t threads]
- (.interrupt t)
- (.join t))
- (doseq [manager (nimbus-managers nimbus)]
- (.shutdown manager))
+ (cancel-timer (:timer nimbus))
(.disconnect (:storm-cluster-state nimbus))
(log-message "Shut down master")
)
DaemonCommon
(waiting? [this]
- (and
- (every? (memfn sleeping?) threads)
- (every? (memfn waiting?) (nimbus-managers nimbus))
- )))))
+ (timer-waiting? (:timer nimbus))))))
(defn launch-server! [conf]
(validate-distributed-mode! conf)
src/clj/backtype/storm/util.clj (83 changed lines)
@@ -6,6 +6,7 @@
(:import [backtype.storm.utils Time Container ClojureTimerTask])
(:import [java.util UUID])
(:import [java.util.concurrent.locks ReentrantReadWriteLock])
+ (:import [java.util.concurrent Semaphore])
(:import [java.io File RandomAccessFile StringWriter PrintWriter])
(:import [java.lang.management ManagementFactory])
(:import [org.apache.commons.exec DefaultExecutor CommandLine])
@@ -533,27 +534,63 @@
(defn container-get [^Container container]
(. container object))
-(defn mk-timer []
- {:timer (Timer.) :scheduled (atom #{})})
+(defn to-millis [secs]
+ (* 1000 (long secs)))
-(defn scheduled? [timer id]
- (contains? @(:scheduled timer) id))
-
-(defn schedule [timer id delay afn]
- (when (scheduled? timer id)
- (throw (IllegalArgumentException. "Cannot schedule an already scheduled id")))
- (let [wrapped (fn [] (afn) (swap! (:scheduled timer) disj id))]
- (.schedule ^Timer (:timer timer)
- delay
- (ClojureTimerTask. wrapped))
- (swap! (:scheduled timer) conj id)
- ))
-
-(defn schedule-if-free [timer id delay afn]
- (if-not (scheduled? timer id)
- (schedule timer id delay afn)
- ))
-
-;; need a map from "states" to the corresponding event transition (transition can include a delay)
-;; {:rebalance {:delay 10 :action (fn [] ... :active)}
-;; {(fn [] ...)}}
+(defn- run-on-timer-thread [^Timer timer afn]
+ (let [wait-sem (Semaphore. 0)]
+ (.schedule timer
+ (ClojureTimerTask.
+ (fn []
+ (afn)
+ (.release wait-sem)))
+ (long 0))
+ (.acquire wait-sem)))
+
+(defnk mk-timer [:kill-fn (fn [& _] )]
+ (let [timer (Timer. true)
+ timer-thread (atom nil)]
+ (run-on-timer-thread timer
+ #(reset! timer-thread (Thread/currentThread)))
+ {:timer timer
+ :kill-fn kill-fn
+ :timer-thread @timer-thread
+ :active-tasks (atom 0)}))
+
+(defn- wrap-timer-fn [timer afn task-decrease]
+ (fn []
+ (try
+ (when (Time/isSimulating)
+ ;; this is so we can see if the timer is waiting or not
+ (Time/sleep 1))
+ (afn)
+ (swap! (:active-tasks timer) - task-decrease)
+ (catch Throwable t
+ ((:kill-fn timer) t)
+ (throw t)
+ ))))
+
+(defn schedule [timer delay-secs afn]
+ (swap! (:active-tasks timer) inc)
+ (.schedule ^Timer (:timer timer)
+ (ClojureTimerTask. (wrap-timer-fn timer afn 1))
+ (to-millis delay-secs)
+ ))
+
+(defn schedule-recurring [timer delay-secs recur-secs afn]
+ (swap! (:active-tasks timer) inc)
+ (.schedule ^Timer (:timer timer)
+ (ClojureTimerTask. (wrap-timer-fn timer afn 0))
+ (to-millis delay-secs)
+ (to-millis recur-secs)
+ ))
+
+(defn cancel-timer [timer]
+ (run-on-timer-thread (:timer timer) #(.cancel (:timer timer))))
+
+(defn timer-waiting? [timer]
+ (or (= 0 @(:active-tasks timer))
+ (Time/isThreadWaiting (:timer-thread timer))))
+
+(defn throw-runtime [& strs]
+ (throw (RuntimeException. (apply str strs))))