
Added SparkSubmitTask and deprecated SparkJob, Spark1xJob and PySpark1xJob #812

Merged — 1 commit into spotify:master on Mar 10, 2015

Conversation

jthi3rry
Contributor

@jthi3rry jthi3rry commented Mar 4, 2015

@Tarrasch, @berkerpeksag, this is my follow up PR from comments in #806.

I deprecated both Spark1xJob and PySpark1xJob and created a single SparkSubmitTask that follows the spark-submit command usage and can handle Java, Scala and Python apps on Spark standalone, YARN or Mesos. I'm wondering whether SparkJob should be deprecated too.

Happy to discuss and make any changes.

Thanks
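Conceptually, a task like the SparkSubmitTask described above boils down to assembling a spark-submit argument list from the task's configuration. A minimal, self-contained sketch of that idea (the helper name and signature here are hypothetical, not the PR's actual API):

```python
def build_spark_submit_command(app, master=None, deploy_mode=None,
                               entry_class=None, app_options=()):
    """Assemble an invocation following the documented usage:
    spark-submit [options] <app jar | python file> [app options]"""
    cmd = ["spark-submit"]
    if master:
        cmd += ["--master", master]           # e.g. yarn, mesos://..., spark://...
    if deploy_mode:
        cmd += ["--deploy-mode", deploy_mode]  # client or cluster
    if entry_class:
        cmd += ["--class", entry_class]        # only meaningful for Java/Scala apps
    cmd.append(app)                            # the jar or .py file
    cmd += list(app_options)                   # arguments passed to the app itself
    return cmd
```

For example, `build_spark_submit_command("job.jar", master="yarn", entry_class="com.example.Main", app_options=["in", "out"])` yields `["spark-submit", "--master", "yarn", "--class", "com.example.Main", "job.jar", "in", "out"]`.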

@landscape-bot

Code Health
Repository health increased by 0.00% when pulling 0c8606b on jthi3rry:master into fed8dd5 on spotify:master.

@landscape-bot

Code Health
Repository health increased by 0.00% when pulling 203ffd4 on jthi3rry:master into fed8dd5 on spotify:master.


Strictly follows spark-submit usage::

Usage: spark-submit [options] <app jar | python file> [app options]
Contributor

It would be better to just say something like "See spark-submit -h for more information.", or add a link to the spark-submit script's documentation.

@erikbern
Contributor

erikbern commented Mar 4, 2015

Looks great, but on a more general note I think it would be awesome if we could support inline Python code like in luigi.hadoop.

@landscape-bot

Code Health
Repository health increased by 0.00% when pulling 1ed80a4 on jthi3rry:master into fed8dd5 on spotify:master.

@jthi3rry
Contributor Author

jthi3rry commented Mar 5, 2015

Thanks for the review. Updated PR contains:

  • Incorporation of @berkerpeksag's comments and more global configuration options for the class and in the docs.
  • Deprecation warnings using version 1.1.0. @Tarrasch, let me know if that changes and I'll update.
  • Deprecation of SparkJob too as it seemed very YARN specific. Does that sound alright?

@erikbern, that would be awesome as an additional feature. A simple support for spark submit is necessary though to be able to use existing python drivers and java or scala drivers.

That could be introduced as a subclass (PySparkTask?) with a main(sc) abstract method where sc is a given spark context (like when using the pyspark shell). I'll have a look at luigi.hadoop to see how it's done. That's a good one for another PR :-)

On an unrelated note: is there any kind of support in luigi for streaming tasks? It would be good to also be able to incorporate Spark Streaming in pipelines.

@jthi3rry jthi3rry force-pushed the master branch 3 times, most recently from 83884a0 to 058bfb0 on March 6, 2015 at 22:50
@jthi3rry jthi3rry changed the title from "Refactored (Py)Spark1xJob's into SparkSubmitTask" to "SparkSubmitTask & PySparkTask" on Mar 7, 2015
@jthi3rry
Contributor Author

jthi3rry commented Mar 7, 2015

@erikbern, added PySparkTask to run spark jobs inline.

Usage looks like this:

import luigi
from luigi.contrib.hdfs import HdfsTarget
from luigi.contrib.spark import PySparkTask

class MyJob(PySparkTask):
    param = luigi.IntParameter(default=2)

    def output(self):
        # return a target accessible from the spark cluster (HDFS, S3, ...)
        return HdfsTarget('/tmp/myjob-output')  # example path

    def main(self, sc, *args):
        exponent = self.param
        output = self.output().path
        sc.parallelize([1, 2, 3]).map(lambda x: x ** exponent).saveAsTextFile(output)

@erikbern
Contributor

erikbern commented Mar 7, 2015

That's really cool – can you map over an input source as well?

@erikbern
Contributor

erikbern commented Mar 7, 2015

If you want to, feel free to leave out the last commit for a separate PR. That way we can merge what we have so far

@jthi3rry jthi3rry changed the title from "SparkSubmitTask & PySparkTask" to "Added SparkSubmitTask and deprecated SparkJob, Spark1xJob and PySpark1xJob" on Mar 7, 2015
@jthi3rry jthi3rry force-pushed the master branch 2 times, most recently from e05320e to 1a9b820 on March 8, 2015 at 01:35
@jthi3rry
Contributor Author

jthi3rry commented Mar 8, 2015

I removed the last commit and squashed the existing ones together.

Yes, the input path can be used as a source by doing something like sc.textFile(self.input().path)
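To illustrate the main(sc) contract without a live cluster, here is a self-contained sketch that stubs the tiny slice of the SparkContext API used in the example above (the stub classes and names are mine; a real task receives a genuine pyspark context, and would saveAsTextFile to self.output().path rather than return a list):

```python
class FakeRDD:
    """Minimal stand-in for an RDD, just enough to mirror the example."""
    def __init__(self, data):
        self.data = list(data)

    def map(self, fn):
        return FakeRDD(fn(x) for x in self.data)

    def collect(self):
        return self.data


class FakeSparkContext:
    """Stub mimicking the slice of SparkContext used here."""
    def parallelize(self, data):
        return FakeRDD(data)

    def textFile(self, path):
        # A real context would read lines from HDFS/S3; stubbed here.
        return FakeRDD(["1", "2", "3"])


def main(sc, exponent=2):
    # Mirrors MyJob.main: read the input path, transform, gather the result.
    return sc.textFile("hdfs:///input").map(lambda x: int(x) ** exponent).collect()
```

Running `main(FakeSparkContext())` returns `[1, 4, 9]`.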

@landscape-bot

Code Health
Repository health decreased by 0.01% when pulling 021fcba on jthi3rry:master into 10064c8 on spotify:master.

@jthi3rry jthi3rry force-pushed the master branch 2 times, most recently from 30ef92c to b7b7449 on March 9, 2015 at 11:55
env = os.environ.copy()
temp_stderr = tempfile.TemporaryFile()
logger.info('Running: %s', repr(args))
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=temp_stderr, env=env, close_fds=True)

If stdout and stderr will always be text, you can open the process with universal_newlines=True; the process will then accept unicode on stdin and return unicode.

Also, is there any reason for env=env? According to my understanding of the docs, that's the default behavior.
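A small stdlib sketch of both points raised above, text mode via universal_newlines and the fact that a child process inherits the parent's environment when env is omitted (using subprocess.run for brevity, where the code under review uses Popen):

```python
import os
import subprocess
import sys

os.environ["GREETING"] = "hello"  # set in the parent process only

# env is omitted, yet the child still sees the parent's environment,
# so env=os.environ.copy() is redundant unless the copy is modified first.
proc = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['GREETING'])"],
    stdout=subprocess.PIPE,
    universal_newlines=True,  # stdout is decoded to str rather than bytes
)
assert isinstance(proc.stdout, str)
print(proc.stdout.strip())  # -> hello
```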

Contributor Author

Added universal_newlines=True in #837.

#837 adds a get_environment() that alters the env before passing it to Popen.

@erikbern
Contributor

erikbern commented Mar 9, 2015

Something seems weird with the delta now. Can you rebase on master?

@jthi3rry
Contributor Author

jthi3rry commented Mar 9, 2015

Done @erikbern, does that fix it?

@erikbern
Contributor

erikbern commented Mar 9, 2015

Yes, looks good. Is this ready to be merged? How backwards-incompatible is it? Happy to merge, I just want to make sure there's no risk.

@jthi3rry
Contributor Author

jthi3rry commented Mar 9, 2015

It should be fully backwards compatible: all existing unit tests pass, SparkJob is unchanged, and the (Py)Spark1xJob classes gain features by extending SparkSubmitTask while their behaviour is unchanged.

erikbern pushed a commit that referenced this pull request Mar 10, 2015
Added SparkSubmitTask and deprecated SparkJob, Spark1xJob and PySpark1xJob
@erikbern merged commit 021cb2b into spotify:master on Mar 10, 2015