Skip to content
Browse files

Merge remote-tracking branch 'martin/exec-not-fork'

  • Loading branch information...
2 parents 360f11f + 0dcfade commit d19e8188812bc30a62a8a5e4609b3c78975ef05e @nathanmarz committed
Showing with 53 additions and 16 deletions.
  1. +53 −16 bin/storm
View
69 bin/storm
@@ -5,6 +5,7 @@ import sys
import random
import subprocess as sub
import getopt
+import re
def identity(x):
return x
@@ -61,6 +62,7 @@ def confvalue(name, extrapaths):
tokens = line.split(" ")
if tokens[0] == "VALUE:":
return " ".join(tokens[1:])
+ return ""
def print_localconfvalue(name):
"""Syntax: [storm localconfvalue conf-name]
@@ -82,12 +84,37 @@ def print_remoteconfvalue(name):
"""
print name + ": " + confvalue(name, [STORM_DIR + "/conf"])
-def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[]):
- nativepath = confvalue("java.library.path", extrajars)
- args_str = " ".join(map(lambda s: "\"" + s + "\"", args))
- command = "java " + jvmtype + " -Dstorm.home=" + STORM_DIR + " " + get_config_opts() + " -Djava.library.path=" + nativepath + " " + childopts + " -cp " + get_classpath(extrajars) + " " + klass + " " + args_str
- print "Running: " + command
- os.system(command)
+def parse_args(string):
+ r"""Takes a string of whitespace-separated tokens and parses it into a list.
+ Whitespace inside tokens may be quoted with single quotes, double quotes or
+ backslash (similar to command-line arguments in bash).
+
+ >>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 'j''j' k" "k l' l' mm n\\n''')
+ ['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l l', 'mm', r'n\n']
+ """
+ re_split = re.compile(r'''((?:
+ [^\s"'\\] |
+ "(?: [^"\\] | \\.)*" |
+ '(?: [^'\\] | \\.)*' |
+ \\.
+ )+)''', re.VERBOSE)
+ args = re_split.split(string)[1::2]
+ args = [re.compile(r'"((?:[^"\\]|\\.)*)"').sub('\\1', x) for x in args]
+ args = [re.compile(r"'((?:[^'\\]|\\.)*)'").sub('\\1', x) for x in args]
+ return [re.compile(r'\\(.)').sub('\\1', x) for x in args]
+
+def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
+ all_args = [
+ "java", jvmtype, get_config_opts(),
+ "-Dstorm.home=" + STORM_DIR,
+ "-Djava.library.path=" + confvalue("java.library.path", extrajars),
+ "-cp", get_classpath(extrajars),
+ ] + jvmopts + [klass] + list(args)
+ print "Running: " + " ".join(all_args)
+ if fork:
+ os.spawnvp(os.P_WAIT, "java", all_args)
+ else:
+ os.execvp("java", all_args) # replaces the current process and never returns
def jar(jarfile, klass, *args):
"""Syntax: [storm jar topology-jar-path class ...]
@@ -103,7 +130,7 @@ def jar(jarfile, klass, *args):
jvmtype="-client",
extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
args=args,
- childopts="-Dstorm.jar=" + jarfile)
+ jvmopts=["-Dstorm.jar=" + jarfile])
def kill(*args):
"""Syntax: [storm kill topology-name [-w wait-time-secs]]
@@ -190,7 +217,8 @@ def shell(resourcesdir, command, *args):
"backtype.storm.command.shell_submission",
args=runnerargs,
jvmtype="-client",
- extrajars=[CONF_DIR])
+ extrajars=[CONF_DIR],
+ fork=True)
os.system("rm " + tmpjarpath)
def repl():
@@ -212,12 +240,15 @@ def nimbus(klass="backtype.storm.daemon.nimbus"):
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
"""
cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
- childopts = confvalue("nimbus.childopts", cppaths) + " -Dlogfile.name=nimbus.log -Dlog4j.configuration=storm.log.properties"
+ jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
+ "-Dlogfile.name=nimbus.log",
+ "-Dlog4j.configuration=storm.log.properties",
+ ]
exec_storm_class(
klass,
jvmtype="-server",
extrajars=cppaths,
- childopts=childopts)
+ jvmopts=jvmopts)
def supervisor(klass="backtype.storm.daemon.supervisor"):
"""Syntax: [storm supervisor]
@@ -229,12 +260,15 @@ def supervisor(klass="backtype.storm.daemon.supervisor"):
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
"""
cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
- childopts = confvalue("supervisor.childopts", cppaths) + " -Dlogfile.name=supervisor.log -Dlog4j.configuration=storm.log.properties"
+ jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [
+ "-Dlogfile.name=supervisor.log",
+ "-Dlog4j.configuration=storm.log.properties",
+ ]
exec_storm_class(
klass,
jvmtype="-server",
extrajars=cppaths,
- childopts=childopts)
+ jvmopts=jvmopts)
def ui():
"""Syntax: [storm ui]
@@ -247,11 +281,14 @@ def ui():
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
"""
cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
- childopts = confvalue("ui.childopts", cppaths) + " -Dlogfile.name=ui.log -Dlog4j.configuration=storm.log.properties"
+ jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [
+ "-Dlogfile.name=ui.log",
+ "-Dlog4j.configuration=storm.log.properties",
+ ]
exec_storm_class(
"backtype.storm.ui.core",
jvmtype="-server",
- childopts=childopts,
+ jvmopts=jvmopts,
extrajars=[STORM_DIR + "/log4j", STORM_DIR, STORM_DIR + "/conf"])
def drpc():
@@ -263,11 +300,11 @@ def drpc():
See Distributed RPC for more information.
(https://github.com/nathanmarz/storm/wiki/Distributed-RPC)
"""
- childopts = "-Xmx768m -Dlogfile.name=drpc.log -Dlog4j.configuration=storm.log.properties"
+ jvmopts = ["-Xmx768m", "-Dlogfile.name=drpc.log", "-Dlog4j.configuration=storm.log.properties"]
exec_storm_class(
"backtype.storm.daemon.drpc",
jvmtype="-server",
- childopts=childopts,
+ jvmopts=jvmopts,
extrajars=[STORM_DIR + "/log4j", STORM_DIR + "/conf"])
def dev_zookeeper():

0 comments on commit d19e818

Please sign in to comment.
Something went wrong with that request. Please try again.