Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail to decompress a stream #89

Closed
davies opened this issue Oct 25, 2016 · 4 comments
Closed

Fail to decompress a stream #89

davies opened this issue Oct 25, 2016 · 4 comments

Comments

@davies
Copy link

davies commented Oct 25, 2016

When LZ4 is used to compresss the shuffle files in Spark, we saw these failure:

java.io.IOException: Stream is corrupted
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
    at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
    at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
    at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:78)
    at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortAggregateExec.scala:75)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 3212 of input buffer
    at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:206)
    ... 36 more

LZ4BlockInputStream is a fork of LZ4BlockInputStream here, in order to support concated stream: davies@cc1fa94

They have the same bug

    case COMPRESSION_METHOD_LZ4:
      if (compressedBuffer.length < originalLen) {
        compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
      }
      readFully(compressedBuffer, compressedLen);

I think compressedBuffer.length < originalLen should be
compressedBuffer.length < compressedLen

@davies
Copy link
Author

davies commented Oct 27, 2016

Since the LZ4BlockOutputStream could make sure that the originalLen is always larger than compressedLen, this is not bug actual.

@davies davies closed this as completed Oct 27, 2016
@JoshRosen
Copy link
Contributor

I'm continuing to investigate a stream corruption bug, so here are some notes based on me revisiting this PR (to hopefully save time for anyone else who comes across this):

The bug that @davies identified here looks like a duplicate of #70, where @mtopolnik described it as a performance issue (due to possible overallocation of more buffer space than is really needed).

The current code would be incorrect if the compressed data was longer than the original data, in which case we might write past the end of the buffer, but it turns out that LZ4BlockOutputStream has checks which switch from COMPRESSION_METHOD_LZ4 to COMPRESSION_METHOD_RAW in case the LZ4-compressed data is longer than the original data (see discussion at apache/spark#15632 (comment))..

I agree with other commenters on the linked Spark PR who pointed out that the existing code is somewhat dodgy / hard-to-understand because it's implicitly relying on guarantees from a distant part of the code. If the current behavior is intended then there should at least be a comment explaining the invariants that make it safe. Given the potential perf. impact mentioned in #70, though, I'd rather see this fixed completely via a code change to use the safer comparison.

@odaira
Copy link
Member

odaira commented May 23, 2017

Yes, I agree. I'll fix the code.

Do you still see the corrupted stream error in Spark? I think the problem is specific to your copied version of lz4-java, which supports concatenated stream, because the original version does not call LZ4BlockInputStream.refill() from LZ4BlockInputStream.available(). It looks weird to me that available() calls refill(), because usually available() should not call a blocking method like refill().

@odaira
Copy link
Member

odaira commented May 24, 2017

Fixed by c8f4371.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants