
Thank you - SplittableGzip works out of the box with Apache Spark! #2

Closed
nchammas opened this issue Oct 5, 2019 · 9 comments


nchammas commented Oct 5, 2019

I came across this project via a comment on a Spark Jira ticket, where I had been thinking about a way to split gzip files similar to what this project does. I was delighted to learn that someone had already thought through and implemented such a solution, and from the looks of it done a much better job of it than I could have.

So I just wanted to report here for the record, since gzipped files are a common stumbling block for Apache Spark users, that your solution works with Apache Spark without modification.

Here is an example, which I've tested against Apache Spark 2.4.4 using the Python DataFrame API:

# splittable-gzip.py
from pyspark.sql import SparkSession


if __name__ == '__main__':
    spark = (
        SparkSession.builder
        # If you want to change the split size, you need to use this config
        # instead of mapreduce.input.fileinputformat.split.maxsize.
        # I don't think Spark DataFrames offer an equivalent setting for
        # mapreduce.input.fileinputformat.split.minsize.
        .config('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
        .getOrCreate()
    )

    print(
        spark.read
        # You can also specify this option against the SparkSession.
        .option('io.compression.codecs', 'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
        .csv(...)
        .count()
    )

Run this script as follows:

spark-submit --packages "nl.basjes.hadoop:splittablegzip:1.2" splittable-gzip.py
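
The comment in the script about specifying this option against the SparkSession refers to something like the following. This is an untested sketch on my part: it assumes that Spark's spark.hadoop.* config prefix forwards the property into the Hadoop Configuration used by the file readers, and the input path is a made-up placeholder.

# splittable-gzip-session.py (untested sketch)
from pyspark.sql import SparkSession


if __name__ == '__main__':
    spark = (
        SparkSession.builder
        # Assumption: properties prefixed with 'spark.hadoop.' are copied
        # into the Hadoop Configuration, so this should register the codec
        # once per session instead of per DataFrameReader.
        .config(
            'spark.hadoop.io.compression.codecs',
            'nl.basjes.hadoop.io.compress.SplittableGzipCodec',
        )
        .config('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
        .getOrCreate()
    )

    # Hypothetical input path, just for illustration.
    print(spark.read.csv('/data/big-file.csv.gz').count())

It would be run with the same spark-submit --packages command as above.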

Here's what I see in the Spark UI when I run this script against a 20 GB gzip file on my laptop:

[Screenshot: Spark UI task list]

You can see in the task list the behavior described in the README, with each task reading from the start of the file to its target split.

And here is the Executor UI, which shows every available core running concurrently against this single file:

[Screenshot: Spark Executor UI]
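
A quick way to sanity-check this outside the UI is to look at the partition count right after the read. This is just a sketch on my end: it assumes the spark session and options from the script above, and the file path is a made-up placeholder.

# Rough check, reusing the same session and codec option as the script above.
df = (
    spark.read
    .option('io.compression.codecs', 'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
    .csv('/data/big-file.csv.gz')  # hypothetical path
)

# With a ~20 GB file and ~1000 MB splits this should come out on the
# order of 20 partitions, i.e. 20 concurrent tasks over one gzip file.
print(df.rdd.getNumPartitions())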

I will experiment some more with this project -- and perhaps ask some questions on here, if you don't mind -- and then promote it on Stack Overflow and in the Boston area.

Thank you for open sourcing this work!

@nielsbasjes (Owner)

This is great to hear.
I'm putting your input into a readme so others can find it more easily.
Thanks!

@nielsbasjes (Owner)

I've added your examples to the documentation.
https://github.com/nielsbasjes/splittablegzip/blob/master/README-Spark.md
Thanks.

@nielsbasjes (Owner)

P.S. If I made a mistake in copying your documentation, or if you have improvements, then please put up a pull/merge request with the appropriate changes.


nchammas commented Oct 6, 2019

I have some follow-up questions about your project, to make sure I understand it well:

  1. In the simple case where you have 1 big file, would you agree that the optimal split size is roughly file size / number of cores? That way each core will decompress any given segment of the file no more than once, and every core will end up being assigned the same amount of data. (I've sketched the arithmetic I have in mind below, after question 4.)

  2. One gzip stumbling block I have seen people struggle with is needing a node in their cluster that is capable of holding the entire decompressed file. If you receive a single, 50 GB gzipped file that expands to several times the size, this can be a challenge depending on your infrastructure. Am I correct to say that this library will let you avoid ever needing to hold the entire decompressed file in memory or on disk? A task will only need to hold its target decompressed split.

  3. In the main README you write:

    This codec is only useful if there are less Gzipped input file(s) than available map task slots (i.e. some slots are idle during the input/map phase).

    Isn't this library still useful when you have more gzipped input files than cores, since it can help reduce or eliminate task skew? Say the input files are unevenly sized. Instead of tasks taking wildly varying amounts of time because their input files have very different sizes, this library lets Spark split the larger gzipped files and thereby reduce the downstream task runtime.

    Without this library you'd need to explicitly repartition/shuffle the data after the initial data load to even out the distribution of data across cores.

  4. In the main README you write:

    First of all this only works with Hadoop 0.21 and up because this depends on the presence of the SplittableCompressionCodec interface. So Hadoop 1.x is not yet supported (waiting for HADOOP-7823).

    Is this still relevant? It seems that HADOOP-7823 is fixed and that this library works with Hadoop 2.7 (which I believe Spark is built against).
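
Here is the arithmetic behind question 1, as a sketch with made-up numbers (the single 20 GB file and 16 cores are purely illustrative; spark.sql.files.maxPartitionBytes is the only relevant knob I know of on the DataFrame side):

# Sizing sketch for question 1 (hypothetical numbers).
import math

file_size_bytes = 20 * 1024 ** 3  # one 20 GB gzip file (illustrative)
total_cores = 16                  # total executor cores (illustrative)

# Aim for roughly one split per core, so each core decompresses any
# given stretch of the file at most once.
split_bytes = math.ceil(file_size_bytes / total_cores)  # ~1.25 GiB

# Then, on the builder, as in the script above:
# .config('spark.sql.files.maxPartitionBytes', split_bytes)
print(split_bytes)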

@nchammas (Author)

Hi @nielsbasjes! Any update on my questions above?

I'm putting together a brief talk about SplittableGzip at the Boston Spark Meetup and want to make sure I have the basic facts right.


nielsbasjes commented Oct 19, 2019

  1. Yes, roughly. Do note that the true optimum also depends on the rest of the flow (e.g. whether the task doing the decompression is also doing the first steps of parsing/processing).

  2. Yes, I think you are right in this expectation. I also expect that a node will only need to be able to handle the size of its own split.

  3. Yes, if you have highly skewed file sizes, then this library will split the few 'very large' files while leaving each 'very small' file in a single split. This can indeed reduce skew if such very large files (occasionally) exist.

  4. Yes, that part of the documentation was a bit outdated; I have fixed it.

Something that may be of use for your presentation: using this library also reduces the part of the job that needs to be redone in case of an external error (e.g. a node failure).

@nielsbasjes (Owner)

@nchammas was this what you needed for your presentation?
If possible, I would really like to see what your presentation looks like.

@nchammas (Author)

Yes indeed, thank you for your responses! I’m giving the talk next week. I will share the slides with you after the talk.

One interesting thing I found in my research for the talk is the GZinga project from eBay, which also aims to provide a splittable gzip codec for use with Hadoop. If I understood correctly, the key difference in their project is that they write additional metadata to the gzipped files to make them seekable. This is interesting, but it also means that files not created with their library do not support random access and we’re back to square one.

With your library we don’t get true random access, but we do get compatibility with gzipped files generated by any application or library, which is a better trade-off. If you control how the data is written, you’re probably better off writing it in something other than gzip rather than extending gzip with custom metadata.

@nielsbasjes (Owner)

This GZinga project seems to be similar to the ideas discussed in https://issues.apache.org/jira/browse/HADOOP-7909

Essentially it is a trade-off between:

  • a custom file format that is backwards compatible with gzip (like GZinga),
  • any gzipped file, but one-time preprocessing is needed (to create some kind of index), and
  • my approach, which works with any file but wastes some resources. As a consequence I didn't have to rewrite any of the gzip code; I only had to change the way the compressed and uncompressed bytes are handled.

Good luck with your presentation.
