From f780fb1c19498246c1de3a86e8e7816359bf4069 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 10 May 2015 17:04:41 -0700 Subject: [PATCH] Add test demonstrating which compression codecs support concatenation. --- .../spark/io/CompressionCodecSuite.scala | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala index 8c6035fb367fe..cf6a143537889 100644 --- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.io import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import com.google.common.io.ByteStreams import org.scalatest.FunSuite import org.apache.spark.SparkConf @@ -62,6 +63,14 @@ class CompressionCodecSuite extends FunSuite { testCodec(codec) } + test("lz4 does not support concatenation of serialized streams") { + val codec = CompressionCodec.createCodec(conf, classOf[LZ4CompressionCodec].getName) + assert(codec.getClass === classOf[LZ4CompressionCodec]) + intercept[Exception] { + testConcatenationOfSerializedStreams(codec) + } + } + test("lzf compression codec") { val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName) assert(codec.getClass === classOf[LZFCompressionCodec]) @@ -74,6 +83,12 @@ class CompressionCodecSuite extends FunSuite { testCodec(codec) } + test("lzf supports concatenation of serialized streams") { + val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName) + assert(codec.getClass === classOf[LZFCompressionCodec]) + testConcatenationOfSerializedStreams(codec) + } + test("snappy compression codec") { val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName) assert(codec.getClass === classOf[SnappyCompressionCodec]) @@ -86,9 +101,38 @@ class CompressionCodecSuite extends FunSuite { testCodec(codec) } + test("snappy does not support concatenation of serialized streams") { + val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName) + assert(codec.getClass === classOf[SnappyCompressionCodec]) + intercept[Exception] { + testConcatenationOfSerializedStreams(codec) + } + } + test("bad compression codec") { intercept[IllegalArgumentException] { CompressionCodec.createCodec(conf, "foobar") } } + + private def testConcatenationOfSerializedStreams(codec: CompressionCodec): Unit = { + val bytes1: Array[Byte] = { + val baos = new ByteArrayOutputStream() + val out = codec.compressedOutputStream(baos) + (0 to 64).foreach(out.write) + out.close() + baos.toByteArray + } + val bytes2: Array[Byte] = { + val baos = new ByteArrayOutputStream() + val out = codec.compressedOutputStream(baos) + (65 to 127).foreach(out.write) + out.close() + baos.toByteArray + } + val concatenatedBytes = codec.compressedInputStream(new ByteArrayInputStream(bytes1 ++ bytes2)) + val decompressed: Array[Byte] = new Array[Byte](128) + ByteStreams.readFully(concatenatedBytes, decompressed) + assert(decompressed.toSeq === (0 to 127)) + } }