Skip to content

Commit

Permalink
[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug
Browse files Browse the repository at this point in the history
This patch wraps `SnappyOutputStream` to ensure that `close()` is idempotent and to guard against write-after-`close()` bugs. This is a workaround for xerial/snappy-java#107, a bug where a non-idempotent `close()` method can lead to stream corruption. We can remove this workaround if we upgrade to a snappy-java version that contains my fix for this bug, but in the meantime this patch offers a backportable Spark fix.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#6176 from JoshRosen/SPARK-7660-wrap-snappy and squashes the following commits:

8b77aae [Josh Rosen] Wrap SnappyOutputStream to fix SPARK-7660
  • Loading branch information
JoshRosen authored and jeanlyn committed May 28, 2015
1 parent 20de7dd commit fbc87af
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 10 deletions.
49 changes: 47 additions & 2 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.io

import java.io.{InputStream, OutputStream}
import java.io.{IOException, InputStream, OutputStream}

import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
Expand Down Expand Up @@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
new SnappyOutputStream(s, blockSize)
new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
}

override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
}

/**
* Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
* issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
* of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
*/
private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {

private[this] var closed: Boolean = false

override def write(b: Int): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b)
}

override def write(b: Array[Byte]): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b)
}

override def write(b: Array[Byte], off: Int, len: Int): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.write(b, off, len)
}

override def flush(): Unit = {
if (closed) {
throw new IOException("Stream is closed")
}
os.flush()
}

override def close(): Unit = {
if (!closed) {
closed = true
os.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.xerial.snappy.buffer.CachedBufferAllocator;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
Expand Down Expand Up @@ -97,13 +96,6 @@ public OutputStream apply(OutputStream stream) {
@After
public void tearDown() {
Utils.deleteRecursively(tempDir);
// This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
// suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
// preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
// needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
synchronized (CachedBufferAllocator.class) {
CachedBufferAllocator.queueTable.clear();
}
final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
if (leakedMemory != 0) {
fail("Test leaked " + leakedMemory + " bytes of managed memory");
Expand Down

0 comments on commit fbc87af

Please sign in to comment.