Skip to content

Commit

Permalink
added rebalance thrift definition
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Dec 18, 2011
1 parent cf7a37b commit 277d4e6
Show file tree
Hide file tree
Showing 8 changed files with 1,374 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/clj/backtype/storm/bootstrap.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
InvalidTopologyException ClusterSummary TopologyInfo InvalidTopologyException ClusterSummary TopologyInfo
TopologySummary TaskSummary TaskStats TaskSpecificStats TopologySummary TaskSummary TaskStats TaskSpecificStats
SpoutStats BoltStats ErrorInfo SupervisorSummary SpoutStats BoltStats ErrorInfo SupervisorSummary
KillOptions JavaObject JavaObjectArg])) KillOptions RebalanceOptions JavaObject JavaObjectArg]))
(import (quote [backtype.storm.daemon.common StormBase Assignment (import (quote [backtype.storm.daemon.common StormBase Assignment
TaskInfo SupervisorInfo WorkerHeartbeat TaskHeartbeat])) TaskInfo SupervisorInfo WorkerHeartbeat TaskHeartbeat]))
(import (quote [backtype.storm.grouping CustomStreamGrouping])) (import (quote [backtype.storm.grouping CustomStreamGrouping]))
Expand Down
12 changes: 11 additions & 1 deletion src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@
(locking (:submit-lock nimbus) (locking (:submit-lock nimbus)
(let [[event & event-args] (if (keyword? event) [event] event) (let [[event & event-args] (if (keyword? event) [event] event)
status (topology-status nimbus storm-id)] status (topology-status nimbus storm-id)]
(if status ; handles the case where event was scheduled but has been removed ;; handles the case where event was scheduled but topology has been removed
(if-not status
(log-message "Cannot apply event " event " to " storm-id " because topology no longer exists")
(let [get-event (fn [m e] (let [get-event (fn [m e]
(if (contains? m e) (if (contains? m e)
(m e) (m e)
Expand Down Expand Up @@ -647,6 +649,14 @@
(transition-name! nimbus storm-name [:kill wait-amt] true) (transition-name! nimbus storm-name [:kill wait-amt] true)
)) ))


(^void rebalance [this ^String storm-name ^RebalanceOptions options]
(check-storm-active! nimbus storm-name true)
(let [wait-amt (if (.is_set_wait_secs options)
(.get_wait_secs options)
)]
(transition-name! nimbus storm-name [:rebalance wait-amt] true)
))

(activate [this storm-name] (activate [this storm-name]
(transition-name! nimbus storm-name :activate true) (transition-name! nimbus storm-name :activate true)
) )
Expand Down
Loading

0 comments on commit 277d4e6

Please sign in to comment.