Commit 5936e03

Added command-line option to specify SPARK_HOME

ksbg committed Jun 22, 2018
1 parent 7caaf77 commit 5936e03
Showing 2 changed files with 23 additions and 17 deletions.

setup.py (3 changes: 1 addition & 2 deletions)

@@ -1,9 +1,8 @@
-import os
 from setuptools import setup
 
 setup(
     name='sparklanes',
-    version='0.2.1',
+    version='0.2.2',
     url='https://github.com/ksbg/sparklanes',
     project_urls={
         'sparklanes documentation': 'https://sparklanes.readthedocs.io/',

sparklanes/_submit/submit.py (37 changes: 22 additions & 15 deletions)

@@ -9,7 +9,7 @@
 from subprocess import call, STDOUT
 
 SPARK_SUBMIT_FLAGS = ['verbose', 'supervised']
-
+MY_ENV = os.environ.copy()
 
 def submit_to_spark():
     """Console-script entry point"""

@@ -35,7 +35,10 @@ def _package_and_submit(args):
                       dist_dir=dist,
                       custom_main=args['main'],
                       extra_data=args['extra_data'])
-        __run_spark_submit(lane_yaml=args['yaml'], dist_dir=dist, spark_args=args['spark_args'],
+        __run_spark_submit(lane_yaml=args['yaml'],
+                           dist_dir=dist,
+                           spark_home=args['spark_home'],
+                           spark_args=args['spark_args'],
                            silent=args['silent'])
 
     except Exception as exc:

@@ -66,6 +69,10 @@ def _parse_and_validate_args(args):
                              'and sent to Spark.')
     parser.add_argument('-m', '--main', type=str, required=False,
                         help='Path to a custom main python file')
+    parser.add_argument('-d', '--spark-home', type=str, required=False,
+                        help='Custom path to the directory containing your Spark installation. If '
+                             'none is given, sparklanes will try to use the `spark-submit` command '
+                             'from your PATH')
     parser.add_argument('-s', '--spark-args', nargs='*', required=False,
                         help='Any additional arguments that should be sent to Spark via '
                              'spark-submit. '
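
The new -d/--spark-home option follows the pattern of the surrounding
parser.add_argument calls. A self-contained sketch of just this option,
showing why the rest of the code can read args['spark_home'] (argparse
converts the dash in the long flag to an underscore):

    from argparse import ArgumentParser

    parser = ArgumentParser()
    parser.add_argument('-d', '--spark-home', type=str, required=False,
                        help='Path to the directory containing your Spark installation')
    args = parser.parse_args(['--spark-home', '/opt/spark']).__dict__
    print(args['spark_home'])  # /opt/spark; None if the flag is omitted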

@@ -75,7 +82,8 @@
     args = parser.parse_args(args).__dict__
 
     # Check/fix files/dirs
-    args['package'] = __validate_and_fix_path(args['package'], check_dir=True)
+    for param in ('package', 'spark_home'):
+        args[param] = __validate_and_fix_path(args[param], check_dir=True)
     for param in ('yaml', 'requirements', 'main'):
         args[param] = __validate_and_fix_path(args[param], check_file=True)
     if args['extra_data']:

@@ -101,14 +109,13 @@ def __validate_and_fix_path(path, check_file=False, check_dir=False):
     if path is None:
         return path
     else:
-        abs_path = path
-        if not os.path.isabs(abs_path):
-            abs_path = os.path.abspath(os.path.join(os.path.abspath(os.curdir), abs_path))
-        if not (os.path.isfile(abs_path) if check_file else False) \
-                and not (os.path.isdir(abs_path) if check_dir else False):
+        if not (os.path.isfile(path) if check_file else False) \
+                and not (os.path.isdir(path) if check_dir else False):
             raise SystemExit('Path `%s` does not exist' % path)
+        if not os.path.isabs(path):
+            path = os.path.abspath(os.path.join(os.path.abspath(os.curdir), path))
 
-        return abs_path
+        return path
 
 
 def __validate_and_fix_spark_args(spark_args):
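
The rework of __validate_and_fix_path drops the abs_path temporary and
reorders the steps: the existence check now runs on the path exactly as
supplied (os.path.isfile and os.path.isdir resolve relative paths against
the current directory anyway), and only a path that passed the check is
made absolute. A condensed standalone sketch of the same control flow:

    import os

    def validate_and_fix_path(path, check_file=False, check_dir=False):
        if path is None:
            return None
        # Reject the path unless it exists as the requested kind
        if not (check_file and os.path.isfile(path)) \
                and not (check_dir and os.path.isdir(path)):
            raise SystemExit('Path `%s` does not exist' % path)
        # Normalize to an absolute path only after validation
        return path if os.path.isabs(path) else os.path.abspath(path)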

@@ -194,7 +201,7 @@ def __package_dependencies(dist_dir, additional_reqs, silent):
     devnull.close()
 
     # Package
-    shutil.make_archive(os.path.join(dist_dir, 'libs'), 'zip', libs_dir, './')
+    shutil.make_archive(libs_dir, 'zip', libs_dir, './')
 
 
 def __package_app(tasks_pkg, dist_dir, custom_main=None, extra_data=None):
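
On the shutil.make_archive change: the first argument is the archive's base
name without extension, so make_archive(libs_dir, 'zip', libs_dir, './')
writes libs.zip next to the directory it archives. Assuming libs_dir is
<dist_dir>/libs (its definition is outside this hunk), the old and new calls
produce the same file and the new one simply avoids rebuilding the path.
A standalone illustration using temporary paths:

    import os
    import shutil
    import tempfile

    dist_dir = tempfile.mkdtemp()
    libs_dir = os.path.join(dist_dir, 'libs')
    os.mkdir(libs_dir)
    open(os.path.join(libs_dir, 'dep.txt'), 'w').close()

    shutil.make_archive(libs_dir, 'zip', libs_dir, './')       # base name 'libs'
    print(os.path.exists(os.path.join(dist_dir, 'libs.zip')))  # True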

@@ -250,7 +257,7 @@ def __package_app(tasks_pkg, dist_dir, custom_main=None, extra_data=None):
             raise IOError('File `%s` not found at `%s`.' % (dat, real_path))
 
 
-def __run_spark_submit(lane_yaml, dist_dir, spark_args, silent):
+def __run_spark_submit(lane_yaml, dist_dir, spark_home, spark_args, silent):
     """
     Submits the packaged application to spark using a `spark-submit` subprocess

@@ -261,24 +268,24 @@ def __run_spark_submit(lane_yaml, dist_dir, spark_args, silent):
         spark_args (str): String of any additional spark config args to be passed when submitting
         silent (bool): Flag indicating whether job output should be printed to console
     """
-    cmd = ['spark-submit']
+    # spark-submit binary
+    cmd = ['spark-submit' if spark_home is None else os.path.join(spark_home, 'bin/spark-submit')]
 
     # Supplied spark arguments
     if spark_args:
         cmd += spark_args
 
     # Packaged App & lane
     cmd += ['--py-files', 'libs.zip,_framework.zip,tasks.zip', 'main.py']
-    if lane_yaml:
-        cmd += ['--lane', lane_yaml]
+    cmd += ['--lane', lane_yaml]
 
     logging.info('Submitting to Spark')
     logging.debug(str(cmd))
 
     # Submit
     devnull = open(os.devnull, 'w')
     outp = {'stderr': STDOUT, 'stdout': devnull} if silent else {}
-    call(cmd, cwd=dist_dir, **outp)
+    call(cmd, cwd=dist_dir, env=MY_ENV, **outp)
     devnull.close()
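
The first replacement above is the core of the feature: with --spark-home
given, the subprocess runs <spark_home>/bin/spark-submit; otherwise it falls
back to whatever spark-submit the PATH resolves. The call also now carries
the environment copy taken at import time, and --lane is passed
unconditionally rather than only when a lane YAML was supplied. A standalone
sketch of the resolution logic (the path is illustrative):

    import os

    for spark_home in (None, '/opt/spark'):
        cmd = ['spark-submit' if spark_home is None
               else os.path.join(spark_home, 'bin/spark-submit')]
        print(cmd)
    # ['spark-submit']
    # ['/opt/spark/bin/spark-submit']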

