Skip to content

Commit

Permalink
Small refactoring of SerializerPropertiesSuite to enable test re-use:
Browse files Browse the repository at this point in the history
This lays some groundwork for re-using this test logic for serializers defined
in other subprojects (those projects can just declare a test-jar dependency
on Spark core).
  • Loading branch information
JoshRosen committed May 3, 2015
1 parent b8a09fe commit c2fca17
Showing 1 changed file with 25 additions and 16 deletions.
Expand Up @@ -21,39 +21,40 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import scala.util.Random

import org.scalatest.FunSuite
import org.scalatest.{Assertions, FunSuite}

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset

private case class MyCaseClass(foo: Int, bar: String)

class SerializerPropertiesSuite extends FunSuite {

import SerializerPropertiesSuite._

test("JavaSerializer does not support relocation") {
testSupportsRelocationOfSerializedObjects(new JavaSerializer(new SparkConf()))
val ser = new JavaSerializer(new SparkConf())
testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
}

test("KryoSerializer supports relocation when auto-reset is enabled") {
val ser = new KryoSerializer(new SparkConf)
assert(ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
testSupportsRelocationOfSerializedObjects(ser)
testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
}

test("KryoSerializer does not support relocation when auto-reset is disabled") {
val conf = new SparkConf().set("spark.kryo.registrator",
classOf[RegistratorWithoutAutoReset].getName)
val ser = new KryoSerializer(conf)
assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
testSupportsRelocationOfSerializedObjects(ser)
testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
}

def testSupportsRelocationOfSerializedObjects(serializer: Serializer): Unit = {
val NUM_TRIALS = 100
if (!serializer.supportsRelocationOfSerializedObjects) {
return
}
val rand = new Random(42)
}

object SerializerPropertiesSuite extends Assertions {

def generateRandomItem(rand: Random): Any = {
val randomFunctions: Seq[() => Any] = Seq(
() => rand.nextInt(),
() => rand.nextString(rand.nextInt(10)),
Expand All @@ -66,14 +67,21 @@ class SerializerPropertiesSuite extends FunSuite {
(x, x)
}
)
def generateRandomItem(): Any = {
randomFunctions(rand.nextInt(randomFunctions.size)).apply()
}
randomFunctions(rand.nextInt(randomFunctions.size)).apply()
}

def testSupportsRelocationOfSerializedObjects(
serializer: Serializer,
generateRandomItem: Random => Any): Unit = {
if (!serializer.supportsRelocationOfSerializedObjects) {
return
}
val NUM_TRIALS = 10
val rand = new Random(42)
for (_ <- 1 to NUM_TRIALS) {
val items = {
// Make sure that we have duplicate occurrences of the same object in the stream:
val randomItems = Seq.fill(10)(generateRandomItem())
val randomItems = Seq.fill(10)(generateRandomItem(rand))
randomItems ++ randomItems.take(5)
}
val baos = new ByteArrayOutputStream()
Expand All @@ -99,5 +107,6 @@ class SerializerPropertiesSuite extends FunSuite {
deserializedItemsStream.close()
}
}

}

private case class MyCaseClass(foo: Int, bar: String)

0 comments on commit c2fca17

Please sign in to comment.