Permalink
Browse files

added rebalance to storm client script

  • Loading branch information...
1 parent 277d4e6 commit 861d9ab112c95bf1ed614e4ef6b88c7c00ffd6ca @nathanmarz committed Dec 18, 2011
Showing with 23 additions and 1 deletion.
  1. +4 −1 bin/storm
  2. +19 −0 src/clj/backtype/storm/command/rebalance.clj
View
@@ -77,6 +77,9 @@ def activate(*args):
def deactivate(*args):
exec_storm_class("backtype.storm.command.deactivate", args=args, jvmtype="-client", extrajars=[CONF_DIR, STORM_DIR + "/bin"])
+def rebalance(*args):
+ exec_storm_class("backtype.storm.command.rebalance", args=args, jvmtype="-client", extrajars=[CONF_DIR, STORM_DIR + "/bin"])
+
def shell(resourcesdir, command, *args):
tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
os.system("jar cf %s %s" % (tmpjarpath, resourcesdir))
@@ -113,7 +116,7 @@ def print_classpath():
COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui,
"drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
"remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
- "activate": activate, "deactivate": deactivate}
+ "activate": activate, "deactivate": deactivate, "rebalance": rebalance}
def print_commands():
global COMMANDS
@@ -0,0 +1,19 @@
+(ns backtype.storm.command.rebalance
+ (:use [clojure.contrib.command-line :only [with-command-line]])
+ (:use [backtype.storm thrift config log])
+ (:import [backtype.storm.generated RebalanceOptions])
+ (:gen-class))
+
+
+(defn -main [& args]
+ (with-command-line args
+ "Rebalance a topology"
+ [[wait w "Override the amount of time to wait after deactivating before rebalancing" nil]
+ posargs]
+ (let [name (first posargs)
+ opts (RebalanceOptions.)]
+ (if wait (.set_wait_secs opts (Integer/parseInt wait)))
+ (with-configured-nimbus-connection nimbus
+ (.rebalance nimbus name opts)
+ (log-message "Topology " name " is rebalancing")
+ ))))

0 comments on commit 861d9ab

Please sign in to comment.