Skip to content

Commit

Permalink
More minor updates (wording, renaming etc.)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Apr 25, 2015
1 parent 8b71cdb commit e45e904
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 52 deletions.
23 changes: 14 additions & 9 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -363,24 +363,24 @@ private class ReturnStatementFinder extends ClassVisitor(ASM4) {
}

/** Helper class to identify a method. */
private case class MethodIdentifier(cls: Class[_], name: String, desc: String)
private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)

/**
* Find the fields accessed by a given class.
*
* The fields are stored in the mutable map passed in by the class that contains them.
* The resulting fields are stored in the mutable map passed in through the constructor.
* This map is assumed to have its keys already populated with the classes of interest.
*
* @param fields the mutable map that stores the fields to return
* @param findTransitively if true, find fields indirectly referenced in other classes
* @param specificMethod if not empty, visit only this method
* @param visitedMethods a list of visited methods to avoid cycles
* @param findTransitively if true, find fields indirectly referenced through method calls
* @param specificMethod if not empty, visit only this specific method
* @param visitedMethods a set of visited methods to avoid cycles
*/
private[util] class FieldAccessFinder(
fields: Map[Class[_], Set[String]],
findTransitively: Boolean,
specificMethod: Option[MethodIdentifier] = None,
visitedMethods: Set[MethodIdentifier] = Set.empty)
specificMethod: Option[MethodIdentifier[_]] = None,
visitedMethods: Set[MethodIdentifier[_]] = Set.empty)
extends ClassVisitor(ASM4) {

override def visitMethod(
Expand All @@ -390,7 +390,7 @@ private[util] class FieldAccessFinder(
sig: String,
exceptions: Array[String]): MethodVisitor = {

// Ignore this method unless we are told to visit it
// If we are told to visit only a certain method and this is not the one, ignore it
if (specificMethod.isDefined &&
(specificMethod.get.name != name || specificMethod.get.desc != desc)) {
return null
Expand All @@ -412,7 +412,7 @@ private[util] class FieldAccessFinder(
if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) {
fields(cl) += name
}
// Visit other methods to find fields that are transitively referenced
// Optionally visit other methods to find fields that are transitively referenced
if (findTransitively) {
val m = MethodIdentifier(cl, name, desc)
if (!visitedMethods.contains(m)) {
Expand All @@ -431,6 +431,11 @@ private[util] class FieldAccessFinder(
private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) {
var myName: String = null

// TODO: Recursively find inner closures that we indirectly reference, e.g.
// val closure1 = () = { () => 1 }
// val closure2 = () => { (1 to 5).map(closure1) }
// The second closure technically has two inner closures, but this finder only finds one

override def visit(version: Int, access: Int, name: String, sig: String,
superName: String, interfaces: Array[String]) {
myName = name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import org.apache.spark.serializer.SerializerInstance
*/
class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {

// Start a SparkContext so that SparkEnv.get.closureSerializer is accessible
// We do not actually use this explicitly except to stop it later
// Start a SparkContext so that the closure serializer is accessible
// We do not actually use this explicitly otherwise
private var sc: SparkContext = null
private var closureSerializer: SerializerInstance = null

Expand All @@ -48,7 +48,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
closureSerializer = null
}

// Some fields and methods that belong to this class, which is itself not serializable
// Some fields and methods to reference in inner closures later
private val someSerializableValue = 1
private val someNonSerializableValue = new NonSerializable
private def someSerializableMethod() = 1
Expand Down Expand Up @@ -86,19 +86,19 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
assertSerializable(closure, serializableBefore)
// If the resulting closure is not serializable even after
// cleaning, we expect ClosureCleaner to throw a SparkException
intercept[SparkException] {
if (serializableAfter) {
ClosureCleaner.clean(closure, checkSerializable = true, transitive)
// Otherwise, if we do expect the closure to be serializable after the
// clean, throw the SparkException ourselves so scalatest is happy
if (serializableAfter) { throw new SparkException("no-op") }
} else {
intercept[SparkException] {
ClosureCleaner.clean(closure, checkSerializable = true, transitive)
}
}
assertSerializable(closure, serializableAfter)
}

/**
* Return the fields accessed by the given closure by class.
* This also optionally finds the fields transitively referenced through methods
* that belong to other classes.
* This also optionally finds the fields transitively referenced through methods invocations.
*/
private def findAccessedFields(
closure: AnyRef,
Expand Down Expand Up @@ -211,7 +211,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
val outerObjects2 = getOuterObjects(closure2)
assert(outerClasses1.size === outerObjects1.size)
assert(outerClasses2.size === outerObjects2.size)
// These inner closures only reference local variables, and so do not have $outer pointer
// These inner closures only reference local variables, and so do not have $outer pointers
assert(outerClasses1.isEmpty)
assert(outerClasses2.isEmpty)
}
Expand All @@ -235,8 +235,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
// This closure references the "test2" scope because it needs to find the method `y`
// Scope hierarchy: "test2" < "FunSuite#test" < ClosureCleanerSuite2
assert(outerClasses2.size === 3)
// This closure references the "test2" scope because it needs to find the
// `localValue` defined outside of this scope
// This closure references the "test2" scope because it needs to find the `localValue`
// defined outside of this scope
assert(outerClasses3.size === 3)
assert(isClosure(outerClasses2(0)))
assert(isClosure(outerClasses3(0)))
Expand Down Expand Up @@ -270,10 +270,12 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
assert(fields2.isEmpty)
assert(fields3.size === 2)
// This corresponds to the "FunSuite#test" closure. This is empty because the
// field `closure3` references belongs to its parent (i.e. ClosureCleanerSuite2)
// `someSerializableValue` belongs to its parent (i.e. ClosureCleanerSuite2).
assert(fields3(outerClasses3(0)).isEmpty)
// This corresponds to the ClosureCleanerSuite2. This is also empty, however,
// because we did not find fields transitively (i.e. beyond 1 enclosing scope)
// because accessing a `ClosureCleanerSuite2#someSerializableValue` actually involves a
// method call. Since we do not find fields transitively, we will not recursively trace
// through the fields referenced by this method.
assert(fields3(outerClasses3(1)).isEmpty)

val fields1t = findAccessedFields(closure1, outerClasses1, findTransitively = true)
Expand All @@ -283,7 +285,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
assert(fields2t.isEmpty)
assert(fields3t.size === 2)
// Because we find fields transitively now, we are able to detect that we need the
// $outer pointer to get the field from the ClosureCleanerSuite2.
// $outer pointer to get the field from the ClosureCleanerSuite2
assert(fields3t(outerClasses3(0)).size === 1)
assert(fields3t(outerClasses3(0)).head === "$outer")
assert(fields3t(outerClasses3(1)).size === 1)
Expand All @@ -304,31 +306,32 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
val outerClasses3 = getOuterClasses(closure3)
val outerClasses4 = getOuterClasses(closure4)

// First, find only fields the closures directly access
// First, find only fields accessed directly, not transitively, by these closures
val fields1 = findAccessedFields(closure1, outerClasses1, findTransitively = false)
val fields2 = findAccessedFields(closure2, outerClasses2, findTransitively = false)
val fields3 = findAccessedFields(closure3, outerClasses3, findTransitively = false)
val fields4 = findAccessedFields(closure4, outerClasses4, findTransitively = false)
assert(fields1.isEmpty)
// "test1" < "FunSuite#test" < ClosureCleanerSuite2
assert(fields2.size === 3)
assert(fields2(outerClasses2(0)).isEmpty) // `def a` is not a field
assert(fields2(outerClasses2(1)).isEmpty)
assert(fields2(outerClasses2(2)).isEmpty)
// Since we do not find fields transitively here, we do not look into what `def a` references
assert(fields2(outerClasses2(0)).isEmpty) // This corresponds to the "test1" scope
assert(fields2(outerClasses2(1)).isEmpty) // This corresponds to the "FunSuite#test" scope
assert(fields2(outerClasses2(2)).isEmpty) // This corresponds to the ClosureCleanerSuite2
assert(fields3.size === 3)
// Note that `localValue` is a field of the "test1" closure because `def a` needs it
// Further note that it is NOT a field of the "FunSuite#test" closure but a local variable
// Note that `localValue` is a field of the "test1" scope because `def a` references it,
// but NOT a field of the "FunSuite#test" scope because it is only a local variable there
assert(fields3(outerClasses3(0)).size === 1)
assert(fields3(outerClasses3(0)).head.contains("localValue"))
assert(fields3(outerClasses3(1)).isEmpty)
assert(fields3(outerClasses3(2)).isEmpty)
assert(fields4.size === 3)
// Because `val someSerializableValue` is an instance variable, even an explicit reference
// here actually involves a method call to access the underlying value of the variable.
// Because we are not finding fields transitively here, we do not consider the fields
// accessed by this "method" (i.e. the val's accessor).
assert(fields4(outerClasses4(0)).isEmpty)
assert(fields4(outerClasses4(1)).isEmpty)
// Because `someSerializableValue` is a val, even an explicit reference here actually
// involves a method call to access the underlying value of the variable. Because we are
// not finding fields transitively here, we do not consider the fields accessed by this
// "method" (i.e. the val's accessor).
assert(fields4(outerClasses4(2)).isEmpty)

// Now do the same, but find fields that the closures transitively reference
Expand All @@ -338,8 +341,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
val fields4t = findAccessedFields(closure4, outerClasses4, findTransitively = true)
assert(fields1t.isEmpty)
assert(fields2t.size === 3)
// This closure transitively references `localValue` because `def a` uses it
assert(fields2t(outerClasses2(0)).size === 1)
assert(fields2t(outerClasses2(0)).size === 1) // `def a` references `localValue`
assert(fields2t(outerClasses2(0)).head.contains("localValue"))
assert(fields2t(outerClasses2(1)).isEmpty)
assert(fields2t(outerClasses2(2)).isEmpty)
Expand All @@ -362,11 +364,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
}

test("clean basic serializable closures") {
val localSerializableVal = someSerializableValue
val localValue = someSerializableValue
val closure1 = () => 1
val closure2 = () => Array[String]("a", "b", "c")
val closure3 = (s: String, arr: Array[Long]) => s + arr.mkString(", ")
val closure4 = () => localSerializableVal
val closure4 = () => localValue
val closure5 = () => new NonSerializable(5) // we're just serializing the class information
val closure1r = closure1()
val closure2r = closure2()
Expand Down Expand Up @@ -395,7 +397,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
val closure4 = () => someNonSerializableValue
val closure2 = () => someNonSerializableMethod()

// These are not cleanable because they ultimately reference the `this` pointer
// These are not cleanable because they ultimately reference the ClosureCleanerSuite2
testClean(closure1, serializableBefore = false, serializableAfter = false)
testClean(closure2, serializableBefore = false, serializableAfter = false)
testClean(closure3, serializableBefore = false, serializableAfter = false)
Expand All @@ -404,13 +406,13 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
}

test("clean basic nested serializable closures") {
val localSerializableValue = someSerializableValue
val localValue = someSerializableValue
val closure1 = (i: Int) => {
(1 to i).map { x => x + localSerializableValue } // 1 level of nesting
(1 to i).map { x => x + localValue } // 1 level of nesting
}
val closure2 = (j: Int) => {
(1 to j).flatMap { x =>
(1 to x).map { y => y + localSerializableValue } // 2 levels
(1 to x).map { y => y + localValue } // 2 levels
}
}
val closure3 = (k: Int, l: Int, m: Int) => {
Expand All @@ -426,6 +428,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
testClean(closure2, serializableBefore = true, serializableAfter = true)
testClean(closure3, serializableBefore = true, serializableAfter = true)

// Verify that closures can still be invoked and the result still the same
assert(closure1(1) === closure1r)
assert(closure2(2) === closure2r)
assert(closure3(3, 4, 5) === closure3r)
Expand All @@ -434,9 +437,12 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
test("clean basic nested non-serializable closures") {
def localSerializableMethod() = someSerializableValue
val localNonSerializableValue = someNonSerializableValue
// These closures ultimately reference the ClosureCleanerSuite2
// Note that even accessing `val` that is an instance variable involves a method call
val closure1 = (i: Int) => { (1 to i).map { x => x + someSerializableValue } }
val closure2 = (j: Int) => { (1 to j).map { x => x + someSerializableMethod() } }
val closure4 = (k: Int) => { (1 to k).map { x => x + localSerializableMethod() } }
// This closure references a local non-serializable value
val closure3 = (l: Int) => { (1 to l).map { x => localNonSerializableValue } }
// This is non-serializable no matter how many levels we nest it
val closure5 = (m: Int) => {
Expand All @@ -457,15 +463,18 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
}

test("clean complicated nested serializable closures") {
val localSerializableValue = someSerializableValue
val localValue = someSerializableValue

// Here we assume that if the outer closure is serializable,
// then all inner closures must also be serializable

// Reference local fields from all levels
val closure1 = (i: Int) => {
val a = 1
(1 to i).flatMap { x =>
val b = a + 1
(1 to x).map { y =>
y + a + b + localSerializableValue
y + a + b + localValue
}
}
}
Expand All @@ -479,8 +488,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
def b2 = a2 + 1
(1 to x).map { y =>
// If this references a method outside the outermost closure, then it will try to pull
// in the ClosureCleanerSuite2. This is why `localSerializableValue` here must be a val.
y + a1 + a2 + b1 + b2 + localSerializableValue
// in the ClosureCleanerSuite2. This is why `localValue` here must be a local `val`.
y + a1 + a2 + b1 + b2 + localValue
}
}
}
Expand All @@ -494,13 +503,13 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
}

test("clean complicated nested non-serializable closures") {
val localSerializableValue = someSerializableValue
val localValue = someSerializableValue

// Note that we are not interested in cleaning the outer closures here
// Note that we are not interested in cleaning the outer closures here (they are not cleanable)
// The only reason why they exist is to nest the inner closures

val test1 = () => {
val a = localSerializableValue
val a = localValue
val b = sc
val inner1 = (x: Int) => x + a + b.hashCode()
val inner2 = (x: Int) => x + a
Expand All @@ -509,15 +518,15 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM
// There is no way to clean it
testClean(inner1, serializableBefore = false, serializableAfter = false)

// This closure is serializable to begin with since
// it does not have a pointer to the outer closure
// This closure is serializable to begin with since it does not need a pointer to
// the outer closure (it only references local variables)
testClean(inner2, serializableBefore = true, serializableAfter = true)
}

// Same as above, but the `val a` becomes `def a`
// The difference here is that all inner closures now have pointers to the outer closure
val test2 = () => {
def a = localSerializableValue
def a = localValue
val b = sc
val inner1 = (x: Int) => x + a + b.hashCode()
val inner2 = (x: Int) => x + a
Expand Down

0 comments on commit e45e904

Please sign in to comment.