From 2710092790906d2a04952e4cf6230185eca3ca00 Mon Sep 17 00:00:00 2001 From: Dan Blanchard Date: Fri, 8 Apr 2016 16:12:11 -0400 Subject: [PATCH 1/5] Make sure we only update streamparse_run paths in submit.py --- streamparse/cli/submit.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streamparse/cli/submit.py b/streamparse/cli/submit.py index 9d727b9b..09777444 100644 --- a/streamparse/cli/submit.py +++ b/streamparse/cli/submit.py @@ -212,12 +212,14 @@ def submit_topology(name=None, env_name="prod", workers=2, ackers=2, for thrift_bolt in itervalues(topology_class.thrift_bolts): inner_shell = thrift_bolt.bolt_object.shell if isinstance(inner_shell, ShellComponent): - inner_shell.execution_command = streamparse_run_path + if 'streamparse_run' in inner_shell.execution_command: + inner_shell.execution_command = streamparse_run_path # Update python paths in spouts for thrift_spout in itervalues(topology_class.thrift_spouts): inner_shell = thrift_spout.spout_object.shell if isinstance(inner_shell, ShellComponent): - inner_shell.execution_command = streamparse_run_path + if 'streamparse_run' in inner_shell.execution_command: + inner_shell.execution_command = streamparse_run_path # Check topology for JVM stuff to see if we need to create uber-jar simple_jar = not any(isinstance(spec, JavaComponentSpec) From 752e8fb485f25033cc23ca358164cef5d4271cb0 Mon Sep 17 00:00:00 2001 From: Dan Blanchard Date: Fri, 8 Apr 2016 16:12:44 -0400 Subject: [PATCH 2/5] Add serializer setting to config.json --- doc/source/quickstart.rst | 2 +- streamparse/bootstrap/project/config.jinja2.json | 4 ++-- streamparse/cli/submit.py | 15 +++++++++++++++ streamparse/run.py | 9 ++++++++- 4 files changed, 26 insertions(+), 4 deletions(-) diff --git a/doc/source/quickstart.rst b/doc/source/quickstart.rst index b4e3130e..568f0a73 100644 --- a/doc/source/quickstart.rst +++ b/doc/source/quickstart.rst @@ -307,7 +307,7 @@ in our ``config.json`` file: .. 
code-block:: json { - "library": "", + "serializer": "json", "topology_specs": "topologies/", "virtualenv_specs": "virtualenvs/", "envs": { diff --git a/streamparse/bootstrap/project/config.jinja2.json b/streamparse/bootstrap/project/config.jinja2.json index fce5d40e..778cb511 100644 --- a/streamparse/bootstrap/project/config.jinja2.json +++ b/streamparse/bootstrap/project/config.jinja2.json @@ -1,11 +1,11 @@ { - "library": "", + "serializer": "json", "topology_specs": "topologies/", "virtualenv_specs": "virtualenvs/", "envs": { "prod": { "user": "", - "ssh_password": "", + "ssh_password": "", "nimbus": "", "workers": [], "log": { diff --git a/streamparse/cli/submit.py b/streamparse/cli/submit.py index 09777444..fdc90362 100644 --- a/streamparse/cli/submit.py +++ b/streamparse/cli/submit.py @@ -221,6 +221,21 @@ def submit_topology(name=None, env_name="prod", workers=2, ackers=2, if 'streamparse_run' in inner_shell.execution_command: inner_shell.execution_command = streamparse_run_path + serializer = env_config.get('serializer', 'json') + # Set serializer arg in bolts + for thrift_bolt in itervalues(topology_class.thrift_bolts): + inner_shell = thrift_bolt.bolt_object.shell + if isinstance(inner_shell, ShellComponent): + inner_shell.script = '-s {} {}'.format(serializer, + inner_shell.script) + # Set serializer arg in spouts + for thrift_spout in itervalues(topology_class.thrift_spouts): + inner_shell = thrift_spout.spout_object.shell + if isinstance(inner_shell, ShellComponent): + inner_shell.script = '-s {} {}'.format(serializer, + inner_shell.script) + + # Check topology for JVM stuff to see if we need to create uber-jar simple_jar = not any(isinstance(spec, JavaComponentSpec) for spec in topology_class.specs) diff --git a/streamparse/run.py b/streamparse/run.py index 1414de18..12115b65 100644 --- a/streamparse/run.py +++ b/streamparse/run.py @@ -7,6 +7,8 @@ import os import sys +from pystorm.component import _SERIALIZERS + def main(): """main entry point for 
Python bolts and spouts""" @@ -16,6 +18,11 @@ def main(): 'classes via ``python -m ' 'streamparse.run ``.') parser.add_argument('target_class', help='The bolt/spout class to start.') + parser.add_argument('-s', '--serializer', + help='The serialization protocol to use to talk to ' + 'Storm.', + choices=_SERIALIZERS.keys(), + default='json') args = parser.parse_args() # Add current directory to sys.path so imports will work sys.path.append(os.getcwd()) @@ -24,7 +31,7 @@ def main(): mod = importlib.import_module(mod_name) # Get class from module and run it cls = getattr(mod, cls_name) - cls().run() + cls(serializer=args.serializer).run() if __name__ == '__main__': From 8c96fc9571d37dd0dab5e4189bf70555436de47f Mon Sep 17 00:00:00 2001 From: Dan Blanchard Date: Mon, 11 Apr 2016 16:18:30 -0400 Subject: [PATCH 3/5] Add --uber_jar arg to submit to force Uber-JAR creation --- streamparse/cli/submit.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/streamparse/cli/submit.py b/streamparse/cli/submit.py index fdc90362..cc9fbcf8 100644 --- a/streamparse/cli/submit.py +++ b/streamparse/cli/submit.py @@ -184,7 +184,8 @@ def _upload_jar(nimbus_client, local_path): def submit_topology(name=None, env_name="prod", workers=2, ackers=2, - options=None, force=False, debug=False, wait=None): + options=None, force=False, debug=False, wait=None, + simple_jar=True): """Submit a topology to a remote Storm cluster.""" config = get_config() name, topology_file = get_topology_definition(name) @@ -237,8 +238,9 @@ def submit_topology(name=None, env_name="prod", workers=2, ackers=2, # Check topology for JVM stuff to see if we need to create uber-jar - simple_jar = not any(isinstance(spec, JavaComponentSpec) - for spec in topology_class.specs) + if simple_jar: + simple_jar = not any(isinstance(spec, JavaComponentSpec) + for spec in topology_class.specs) # Prepare a JAR that doesn't have Storm dependencies packaged topology_jar = 
jar_for_deploy(simple_jar=simple_jar) @@ -272,6 +274,11 @@ def subparser_hook(subparsers): add_name(subparser) add_options(subparser) add_par(subparser) + subparser.add_argument('-u', '--uber_jar', + help='Build an Uber-JAR even if you have no Java ' + 'components in your topology. Useful if you ' + 'are providing your own seriailzer class.', + dest='simple_jar', action='store_false') add_wait(subparser) add_workers(subparser) @@ -282,4 +289,4 @@ def main(args): submit_topology(name=args.name, env_name=args.environment, workers=args.workers, ackers=args.ackers, options=args.options, force=args.force, debug=args.debug, - wait=args.wait) + wait=args.wait, simple_jar=args.simple_jar) From d9c78b71782ae31c671127106878591bc3dbfcdf Mon Sep 17 00:00:00 2001 From: Dan Blanchard Date: Mon, 11 Apr 2016 16:27:02 -0400 Subject: [PATCH 4/5] Allow serializer setting to be specified for all environments or specific ones --- streamparse/cli/submit.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streamparse/cli/submit.py b/streamparse/cli/submit.py index cc9fbcf8..fec54805 100644 --- a/streamparse/cli/submit.py +++ b/streamparse/cli/submit.py @@ -222,7 +222,7 @@ def submit_topology(name=None, env_name="prod", workers=2, ackers=2, if 'streamparse_run' in inner_shell.execution_command: inner_shell.execution_command = streamparse_run_path - serializer = env_config.get('serializer', 'json') + serializer = env_config.get('serializer', config.get('serializer', 'json')) # Set serializer arg in bolts for thrift_bolt in itervalues(topology_class.thrift_bolts): inner_shell = thrift_bolt.bolt_object.shell From f9a01e9eb2e1b3222b451e4ff022c25b49ad60e8 Mon Sep 17 00:00:00 2001 From: Dan Blanchard Date: Mon, 11 Apr 2016 16:37:50 -0400 Subject: [PATCH 5/5] Work around Storm sending all arguments to streamparse_run as a single string --- streamparse/run.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/streamparse/run.py b/streamparse/run.py index 12115b65..2e45048e 100644 
--- a/streamparse/run.py +++ b/streamparse/run.py @@ -23,6 +23,9 @@ def main(): 'Storm.', choices=_SERIALIZERS.keys(), default='json') + # Storm sends everything as one string, which is not great + if len(sys.argv) == 2: + sys.argv = [sys.argv[0]] + sys.argv[1].split() args = parser.parse_args() # Add current directory to sys.path so imports will work sys.path.append(os.getcwd())