Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added :startup transition event and replaced kill and rebalance :moni…

…tor transitions with it
  • Loading branch information...
commit 340dedca54f91c6766db74ed3908cdd42fe3f187 1 parent 95d0771
@nathanmarz authored
Showing with 11 additions and 13 deletions.
  1. +11 −13 src/clj/backtype/storm/daemon/nimbus.clj
View
24 src/clj/backtype/storm/daemon/nimbus.clj
@@ -95,12 +95,6 @@
nil
))
-;; there's a minor problem where rebalancing is scheduled over and over until it finally happens
-;; can fix this by either:
-;; 1. detecting whether it's scheduled or not...
-;; 2. not using monitor event, but rather some sort of "on startup" event
-;; 3. generating a "rebalance id" and only rebalancing if current status has that id
-
(defn state-transitions [nimbus storm-id status]
{:active {:monitor (reassign-transition nimbus storm-id)
:inactivate :inactive
@@ -114,7 +108,7 @@
:rebalance (rebalance-transition nimbus storm-id status)
:kill (kill-transition nimbus storm-id)
}
- :killed {:monitor (fn [] (delay-event nimbus
+ :killed {:startup (fn [] (delay-event nimbus
storm-id
(:kill-time-secs status)
:remove))
@@ -125,7 +119,7 @@
storm-id)
nil)
}
- :rebalancing {:monitor (fn [] (delay-event nimbus
+ :rebalancing {:startup (fn [] (delay-event nimbus
storm-id
(:delay-secs status)
:do-rebalance))
@@ -143,7 +137,8 @@
(transition! nimbus storm-id event false))
([nimbus storm-id event error-on-no-transition?]
(locking (:submit-lock nimbus)
- (let [[event & event-args] (if (keyword? event) [event] event)
+ (let [system-events #{:startup :monitor}
+ [event & event-args] (if (keyword? event) [event] event)
status (topology-status nimbus storm-id)]
;; handles the case where event was scheduled but topology has been removed
(if-not status
@@ -156,8 +151,10 @@
" storm-id: " storm-id)]
(if error-on-no-transition?
(throw-runtime msg)
- (do (log-message msg) nil)
- ))))
+ (do (when-not (contains? system-events event)
+ (log-message msg))
+ nil))
+ )))
transition (-> (state-transitions nimbus storm-id status)
(get (:type status))
(get-event event))
@@ -623,12 +620,13 @@
(defserverfn service-handler [conf]
(log-message "Starting Nimbus with conf " conf)
(let [nimbus (nimbus-data conf)]
+ (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
+ (transition! nimbus storm-id :startup))
(schedule-recurring (:timer nimbus)
0
(conf NIMBUS-MONITOR-FREQ-SECS)
(fn []
- (doseq [storm-id (.active-storms
- (:storm-cluster-state nimbus))]
+ (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
(transition! nimbus storm-id :monitor))
(do-cleanup nimbus)
))
Please sign in to comment.
Something went wrong with that request. Please try again.