Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed May 2, 2015
1 parent 16fbcfd commit 26c5072
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 43 deletions.
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1769,8 +1769,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* serializable
*/
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
val cleanTransitively = conf.getBoolean("spark.closureCleaner.transitive", true)
ClosureCleaner.clean(f, checkSerializable, cleanTransitively)
ClosureCleaner.clean(f, checkSerializable)
f
}

Expand Down
21 changes: 13 additions & 8 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private[spark] object ClosureCleaner extends Logging {
// The outer pointer may be null if we have cleaned this closure before
if (outer != null) {
if (isClosure(f.getType)) {
return f.getType :: getOuterClasses(f.get(obj))
return f.getType :: getOuterClasses(outer)
} else {
return f.getType :: Nil // Stop at the first $outer that is not a closure
}
Expand All @@ -79,9 +79,9 @@ private[spark] object ClosureCleaner extends Logging {
// The outer pointer may be null if we have cleaned this closure before
if (outer != null) {
if (isClosure(f.getType)) {
return f.get(obj) :: getOuterObjects(f.get(obj))
return outer :: getOuterObjects(outer)
} else {
return f.get(obj) :: Nil // Stop at the first $outer that is not a closure
return outer :: Nil // Stop at the first $outer that is not a closure
}
}
}
Expand All @@ -91,7 +91,10 @@ private[spark] object ClosureCleaner extends Logging {
/**
* Return a list of classes that represent closures enclosed in the given closure object.
*/
private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
private def getInnerClosureClasses(obj: AnyRef): List[Class[_]] = {
if (!isClosure(obj.getClass)) {
throw new IllegalArgumentException(s"Expected a closure object; got ${obj.getClass.getName}")
}
val seen = Set[Class[_]](obj.getClass)
var stack = List[Class[_]](obj.getClass)
while (!stack.isEmpty) {
Expand All @@ -104,7 +107,7 @@ private[spark] object ClosureCleaner extends Logging {
stack = cls :: stack
}
}
return (seen - obj.getClass).toList
(seen - obj.getClass).toList
}

private def createNullValue(cls: Class[_]): AnyRef = {
Expand Down Expand Up @@ -153,12 +156,12 @@ private[spark] object ClosureCleaner extends Logging {
*
* class SomethingNotSerializable {
* def someValue = 1
* def scope(name: String)(body: => Unit) = body
* def someMethod(): Unit = scope("one") {
* def x = someValue
* def y = 2
* scope("two") { println(y + 1) }
* }
* def scope(name: String)(body: => Unit) = body
* }
*
* In this example, scope "two" is not serializable because it references scope "one", which
Expand Down Expand Up @@ -189,7 +192,7 @@ private[spark] object ClosureCleaner extends Logging {
logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}}) +++")

// A list of classes that represents closures enclosed in the given one
val innerClasses = getInnerClasses(func)
val innerClasses = getInnerClosureClasses(func)

// A list of enclosing objects and their respective classes, from innermost to outermost
// An outer object at a given index is of type outer class at the same index
Expand Down Expand Up @@ -280,7 +283,9 @@ private[spark] object ClosureCleaner extends Logging {
// the already populated accessed fields map of the starting closure
if (cleanTransitively && isClosure(clone.getClass)) {
logDebug(s" + cleaning cloned closure $clone recursively (${cls.getName})")
clean(clone, checkSerializable, cleanTransitively, accessedFields)
// No need to check serializable here for the outer closures because we're
// only interested in the serializability of the starting closure
clean(clone, checkSerializable = false, cleanTransitively, accessedFields)
}
parent = clone
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,23 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
/**
* Helper method for testing whether closure cleaning works as expected.
* This cleans the given closure twice, with and without transitive cleaning.
*
* @param closure closure to test cleaning with
* @param serializableBefore if true, verify that the closure is serializable
* before cleaning, otherwise assert that it is not
* @param serializableAfter if true, assert that the closure is serializable
* after cleaning otherwise assert that it is not
*/
private def testClean(
private def verifyCleaning(
closure: AnyRef,
serializableBefore: Boolean,
serializableAfter: Boolean): Unit = {
testClean(closure, serializableBefore, serializableAfter, transitive = true)
testClean(closure, serializableBefore, serializableAfter, transitive = false)
verifyCleaning(closure, serializableBefore, serializableAfter, transitive = true)
verifyCleaning(closure, serializableBefore, serializableAfter, transitive = false)
}

/** Helper method for testing whether closure cleaning works as expected. */
private def testClean(
private def verifyCleaning(
closure: AnyRef,
serializableBefore: Boolean,
serializableAfter: Boolean,
Expand Down Expand Up @@ -113,15 +119,15 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM

// Accessors for private methods
private val _isClosure = PrivateMethod[Boolean]('isClosure)
private val _getInnerClasses = PrivateMethod[List[Class[_]]]('getInnerClasses)
private val _getInnerClosureClasses = PrivateMethod[List[Class[_]]]('getInnerClosureClasses)
private val _getOuterClasses = PrivateMethod[List[Class[_]]]('getOuterClasses)
private val _getOuterObjects = PrivateMethod[List[AnyRef]]('getOuterObjects)

private def isClosure(obj: AnyRef): Boolean = {
ClosureCleaner invokePrivate _isClosure(obj)
}

private def getInnerClasses(closure: AnyRef): List[Class[_]] = {
private def getInnerClosureClasses(closure: AnyRef): List[Class[_]] = {
ClosureCleaner invokePrivate _getInnerClasses(closure)
}

Expand All @@ -133,7 +139,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
ClosureCleaner invokePrivate _getOuterObjects(closure)
}

test("get inner classes") {
test("get inner closure classes") {
val closure1 = () => 1
val closure2 = () => { () => 1 }
val closure3 = (i: Int) => {
Expand Down Expand Up @@ -312,7 +318,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
val fields3 = findAccessedFields(closure3, outerClasses3, findTransitively = false)
val fields4 = findAccessedFields(closure4, outerClasses4, findTransitively = false)
assert(fields1.isEmpty)
// "test1" < "FunSuite#test" < ClosureCleanerSuite2
// Note that the size here represents the number of outer classes, not the number of fields
// "test1" < parameter of "FunSuite#test" < ClosureCleanerSuite2
assert(fields2.size === 3)
// Since we do not find fields transitively here, we do not look into what `def a` references
assert(fields2(outerClasses2(0)).isEmpty) // This corresponds to the "test1" scope
Expand Down Expand Up @@ -376,11 +383,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
val closure4r = closure4()
val closure5r = closure5()

testClean(closure1, serializableBefore = true, serializableAfter = true)
testClean(closure2, serializableBefore = true, serializableAfter = true)
testClean(closure3, serializableBefore = true, serializableAfter = true)
testClean(closure4, serializableBefore = true, serializableAfter = true)
testClean(closure5, serializableBefore = true, serializableAfter = true)
verifyCleaning(closure1, serializableBefore = true, serializableAfter = true)
verifyCleaning(closure2, serializableBefore = true, serializableAfter = true)
verifyCleaning(closure3, serializableBefore = true, serializableAfter = true)
verifyCleaning(closure4, serializableBefore = true, serializableAfter = true)
verifyCleaning(closure5, serializableBefore = true, serializableAfter = true)

// Verify that closures can still be invoked and the result still the same
assert(closure1() === closure1r)
Expand All @@ -398,11 +405,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
val closure2 = () => someNonSerializableMethod()

// These are not cleanable because they ultimately reference the ClosureCleanerSuite2
testClean(closure1, serializableBefore = false, serializableAfter = false)
testClean(closure2, serializableBefore = false, serializableAfter = false)
testClean(closure3, serializableBefore = false, serializableAfter = false)
testClean(closure4, serializableBefore = false, serializableAfter = false)
testClean(closure5, serializableBefore = false, serializableAfter = false)
verifyCleaning(closure1, serializableBefore = false, serializableAfter = false)
verifyCleaning(closure2, serializableBefore = false, serializableAfter = false)
verifyCleaning(closure3, serializableBefore = false, serializableAfter = false)
verifyCleaning(closure4, serializableBefore = false, serializableAfter = false)
verifyCleaning(closure5, serializableBefore = false, serializableAfter = false)
}

test("clean basic nested serializable closures") {
Expand All @@ -424,9 +431,9 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
val closure2r = closure2(2)
val closure3r = closure3(3, 4, 5)

testClean(closure1, serializableBefore = true, serializableAfter = true)
testClean(closure2, serializableBefore = true, serializableAfter = true)
testClean(closure3, serializableBefore = true, serializableAfter = true)
verifyCleaning(closure1, serializableBefore = true, serializableAfter = true)
verifyCleaning(closure2, serializableBefore = true, serializableAfter = true)
verifyCleaning(closure3, serializableBefore = true, serializableAfter = true)

// Verify that closures can still be invoked and the result still the same
assert(closure1(1) === closure1r)
Expand Down Expand Up @@ -455,11 +462,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
}
}

testClean(closure1, serializableBefore = false, serializableAfter = false)
testClean(closure2, serializableBefore = false, serializableAfter = false)
testClean(closure3, serializableBefore = false, serializableAfter = false)
testClean(closure4, serializableBefore = false, serializableAfter = false)
testClean(closure5, serializableBefore = false, serializableAfter = false)
verifyCleaning(closure1, serializableBefore = false, serializableAfter = false)
verifyCleaning(closure2, serializableBefore = false, serializableAfter = false)
verifyCleaning(closure3, serializableBefore = false, serializableAfter = false)
verifyCleaning(closure4, serializableBefore = false, serializableAfter = false)
verifyCleaning(closure5, serializableBefore = false, serializableAfter = false)
}

test("clean complicated nested serializable closures") {
Expand Down Expand Up @@ -496,8 +503,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM

val closure1r = closure1(1)
val closure2r = closure2(2)
testClean(closure1, serializableBefore = true, serializableAfter = true)
testClean(closure2, serializableBefore = true, serializableAfter = true)
verifyCleaning(closure1, serializableBefore = true, serializableAfter = true)
verifyCleaning(closure2, serializableBefore = true, serializableAfter = true)
assert(closure1(1) == closure1r)
assert(closure2(2) == closure2r)
}
Expand All @@ -516,11 +523,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM

// This closure explicitly references a non-serializable field
// There is no way to clean it
testClean(inner1, serializableBefore = false, serializableAfter = false)
verifyCleaning(inner1, serializableBefore = false, serializableAfter = false)

// This closure is serializable to begin with since it does not need a pointer to
// the outer closure (it only references local variables)
testClean(inner2, serializableBefore = true, serializableAfter = true)
verifyCleaning(inner2, serializableBefore = true, serializableAfter = true)
}

// Same as above, but the `val a` becomes `def a`
Expand All @@ -532,17 +539,17 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
val inner2 = (x: Int) => x + a

// As before, this closure is neither serializable nor cleanable
testClean(inner1, serializableBefore = false, serializableAfter = false)
verifyCleaning(inner1, serializableBefore = false, serializableAfter = false)

// This closure is no longer serializable because it now has a pointer to the outer closure,
// which is itself not serializable because it has a pointer to the ClosureCleanerSuite2.
// If we do not clean transitively, we will not null out this indirect reference.
testClean(inner2, serializableBefore = false, serializableAfter = false, transitive = false)
verifyCleaning(inner2, serializableBefore = false, serializableAfter = false, transitive = false)

// If we clean transitively, we will find that method `a` does not actually reference the
// outer closure's parent (i.e. the ClosureCleanerSuite), so we can additionally null out
// the outer closure's parent pointer. This will make `inner2` serializable.
testClean(inner2, serializableBefore = false, serializableAfter = true, transitive = true)
verifyCleaning(inner2, serializableBefore = false, serializableAfter = true, transitive = true)
}

// Same as above, but with more levels of nesting
Expand Down

0 comments on commit 26c5072

Please sign in to comment.