Skip to content

Commit

Permalink
Add test demonstrating which compression codecs support concatenation.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 11, 2015
1 parent 4a01c45 commit f780fb1
Showing 1 changed file with 44 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.io

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.google.common.io.ByteStreams
import org.scalatest.FunSuite

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -62,6 +63,14 @@ class CompressionCodecSuite extends FunSuite {
testCodec(codec)
}

// Runs the concatenation helper against the LZ4 codec and expects it to fail:
// any Exception raised inside the intercept block makes this test pass.
test("lz4 does not support concatenation of serialized streams") {
  val lz4ClassName = classOf[LZ4CompressionCodec].getName
  val lz4Codec = CompressionCodec.createCodec(conf, lz4ClassName)
  // Make sure the factory really produced the codec that was asked for.
  assert(lz4Codec.getClass === classOf[LZ4CompressionCodec])
  intercept[Exception] { testConcatenationOfSerializedStreams(lz4Codec) }
}

test("lzf compression codec") {
val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
assert(codec.getClass === classOf[LZFCompressionCodec])
Expand All @@ -74,6 +83,12 @@ class CompressionCodecSuite extends FunSuite {
testCodec(codec)
}

// Runs the concatenation helper against the LZF codec; the test passes only if
// the helper completes without throwing.
test("lzf supports concatenation of serialized streams") {
  val lzfClassName = classOf[LZFCompressionCodec].getName
  val lzfCodec = CompressionCodec.createCodec(conf, lzfClassName)
  // Make sure the factory really produced the codec that was asked for.
  assert(lzfCodec.getClass === classOf[LZFCompressionCodec])
  testConcatenationOfSerializedStreams(lzfCodec)
}

test("snappy compression codec") {
val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName)
assert(codec.getClass === classOf[SnappyCompressionCodec])
Expand All @@ -86,9 +101,38 @@ class CompressionCodecSuite extends FunSuite {
testCodec(codec)
}

// Runs the concatenation helper against the Snappy codec and expects it to fail:
// any Exception raised inside the intercept block makes this test pass.
test("snappy does not support concatenation of serialized streams") {
  val snappyClassName = classOf[SnappyCompressionCodec].getName
  val snappyCodec = CompressionCodec.createCodec(conf, snappyClassName)
  // Make sure the factory really produced the codec that was asked for.
  assert(snappyCodec.getClass === classOf[SnappyCompressionCodec])
  intercept[Exception] { testConcatenationOfSerializedStreams(snappyCodec) }
}

// Asking the factory for a codec name that is not a known codec must be
// rejected with an IllegalArgumentException.
test("bad compression codec") {
  val unknownCodecName = "foobar"
  intercept[IllegalArgumentException] {
    CompressionCodec.createCodec(conf, unknownCodecName)
  }
}

/**
 * Checks whether two independently compressed streams can be concatenated and then
 * read back through a single decompression stream.
 *
 * The byte values 0..64 and 65..127 are compressed separately with `codec`, the two
 * compressed byte arrays are joined, and one `compressedInputStream` over the result
 * must yield exactly the values 0..127 in order. A codec that cannot do this surfaces
 * either as an exception while reading or as a failed assertion.
 *
 * @param codec the compression codec under test
 */
private def testConcatenationOfSerializedStreams(codec: CompressionCodec): Unit = {
  // Writes every value in `values` as one byte through a fresh compressed stream and
  // returns the compressed bytes. The stream is closed even if a write fails.
  def compress(values: Range): Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(baos)
    try {
      values.foreach(b => out.write(b))
    } finally {
      out.close()
    }
    baos.toByteArray
  }

  val bytes1: Array[Byte] = compress(0 to 64)
  val bytes2: Array[Byte] = compress(65 to 127)

  val concatenatedBytes = codec.compressedInputStream(new ByteArrayInputStream(bytes1 ++ bytes2))
  try {
    val decompressed: Array[Byte] = new Array[Byte](128)
    // readFully throws if the stream ends before all 128 bytes have been produced.
    ByteStreams.readFully(concatenatedBytes, decompressed)
    assert(decompressed.toSeq === (0 to 127))
  } finally {
    // Close the decompression stream on both the success and the failure path;
    // the original never closed it.
    concatenatedBytes.close()
  }
}
}

0 comments on commit f780fb1

Please sign in to comment.