diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 0adfb3423214d..9ea0da15f7947 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -363,24 +363,24 @@ private class ReturnStatementFinder extends ClassVisitor(ASM4) { } /** Helper class to identify a method. */ -private case class MethodIdentifier(cls: Class[_], name: String, desc: String) +private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String) /** * Find the fields accessed by a given class. * - * The fields are stored in the mutable map passed in by the class that contains them. + * The resulting fields are stored in the mutable map passed in through the constructor. * This map is assumed to have its keys already populated with the classes of interest. * * @param fields the mutable map that stores the fields to return - * @param findTransitively if true, find fields indirectly referenced in other classes - * @param specificMethod if not empty, visit only this method - * @param visitedMethods a list of visited methods to avoid cycles + * @param findTransitively if true, find fields indirectly referenced through method calls + * @param specificMethod if not empty, visit only this specific method + * @param visitedMethods a set of visited methods to avoid cycles */ private[util] class FieldAccessFinder( fields: Map[Class[_], Set[String]], findTransitively: Boolean, - specificMethod: Option[MethodIdentifier] = None, - visitedMethods: Set[MethodIdentifier] = Set.empty) + specificMethod: Option[MethodIdentifier[_]] = None, + visitedMethods: Set[MethodIdentifier[_]] = Set.empty) extends ClassVisitor(ASM4) { override def visitMethod( @@ -390,7 +390,7 @@ private[util] class FieldAccessFinder( sig: String, exceptions: Array[String]): MethodVisitor = { - // Ignore this method unless we are told to visit it + // 
If we are told to visit only a certain method and this is not the one, ignore it if (specificMethod.isDefined && (specificMethod.get.name != name || specificMethod.get.desc != desc)) { return null @@ -412,7 +412,7 @@ private[util] class FieldAccessFinder( if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) { fields(cl) += name } - // Visit other methods to find fields that are transitively referenced + // Optionally visit other methods to find fields that are transitively referenced if (findTransitively) { val m = MethodIdentifier(cl, name, desc) if (!visitedMethods.contains(m)) { @@ -431,6 +431,11 @@ private[util] class FieldAccessFinder( private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) { var myName: String = null + // TODO: Recursively find inner closures that we indirectly reference, e.g. + // val closure1 = () => { () => 1 } + // val closure2 = () => { (1 to 5).map(closure1) } + // The second closure technically has two inner closures, but this finder only finds one + override def visit(version: Int, access: Int, name: String, sig: String, superName: String, interfaces: Array[String]) { myName = name diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index 3ef1946d7bb5b..c5575471d18c2 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -32,8 +32,8 @@ import org.apache.spark.serializer.SerializerInstance */ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateMethodTester { - // Start a SparkContext so that SparkEnv.get.closureSerializer is accessible - // We do not actually use this explicitly except to stop it later + // Start a SparkContext so that the closure serializer is accessible + // We do not actually use this explicitly otherwise private var sc: SparkContext = 
null private var closureSerializer: SerializerInstance = null @@ -48,7 +48,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM closureSerializer = null } - // Some fields and methods that belong to this class, which is itself not serializable + // Some fields and methods to reference in inner closures later private val someSerializableValue = 1 private val someNonSerializableValue = new NonSerializable private def someSerializableMethod() = 1 @@ -86,19 +86,19 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM assertSerializable(closure, serializableBefore) // If the resulting closure is not serializable even after // cleaning, we expect ClosureCleaner to throw a SparkException - intercept[SparkException] { + if (serializableAfter) { ClosureCleaner.clean(closure, checkSerializable = true, transitive) - // Otherwise, if we do expect the closure to be serializable after the - // clean, throw the SparkException ourselves so scalatest is happy - if (serializableAfter) { throw new SparkException("no-op") } + } else { + intercept[SparkException] { + ClosureCleaner.clean(closure, checkSerializable = true, transitive) + } } assertSerializable(closure, serializableAfter) } /** * Return the fields accessed by the given closure by class. - * This also optionally finds the fields transitively referenced through methods - * that belong to other classes. + * This also optionally finds the fields transitively referenced through method invocations. 
*/ private def findAccessedFields( closure: AnyRef, @@ -211,7 +211,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val outerObjects2 = getOuterObjects(closure2) assert(outerClasses1.size === outerObjects1.size) assert(outerClasses2.size === outerObjects2.size) - // These inner closures only reference local variables, and so do not have $outer pointer + // These inner closures only reference local variables, and so do not have $outer pointers assert(outerClasses1.isEmpty) assert(outerClasses2.isEmpty) } @@ -235,8 +235,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM // This closure references the "test2" scope because it needs to find the method `y` // Scope hierarchy: "test2" < "FunSuite#test" < ClosureCleanerSuite2 assert(outerClasses2.size === 3) - // This closure references the "test2" scope because it needs to find the - // `localValue` defined outside of this scope + // This closure references the "test2" scope because it needs to find the `localValue` + // defined outside of this scope assert(outerClasses3.size === 3) assert(isClosure(outerClasses2(0))) assert(isClosure(outerClasses3(0))) @@ -270,10 +270,12 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM assert(fields2.isEmpty) assert(fields3.size === 2) // This corresponds to the "FunSuite#test" closure. This is empty because the - // field `closure3` references belongs to its parent (i.e. ClosureCleanerSuite2) + // `someSerializableValue` belongs to its parent (i.e. ClosureCleanerSuite2). assert(fields3(outerClasses3(0)).isEmpty) // This corresponds to the ClosureCleanerSuite2. This is also empty, however, - // because we did not find fields transitively (i.e. beyond 1 enclosing scope) + // because accessing a `ClosureCleanerSuite2#someSerializableValue` actually involves a + // method call. 
Since we do not find fields transitively, we will not recursively trace + // through the fields referenced by this method. assert(fields3(outerClasses3(1)).isEmpty) val fields1t = findAccessedFields(closure1, outerClasses1, findTransitively = true) @@ -283,7 +285,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM assert(fields2t.isEmpty) assert(fields3t.size === 2) // Because we find fields transitively now, we are able to detect that we need the - // $outer pointer to get the field from the ClosureCleanerSuite2. + // $outer pointer to get the field from the ClosureCleanerSuite2 assert(fields3t(outerClasses3(0)).size === 1) assert(fields3t(outerClasses3(0)).head === "$outer") assert(fields3t(outerClasses3(1)).size === 1) @@ -304,7 +306,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val outerClasses3 = getOuterClasses(closure3) val outerClasses4 = getOuterClasses(closure4) - // First, find only fields the closures directly access + // First, find only fields accessed directly, not transitively, by these closures val fields1 = findAccessedFields(closure1, outerClasses1, findTransitively = false) val fields2 = findAccessedFields(closure2, outerClasses2, findTransitively = false) val fields3 = findAccessedFields(closure3, outerClasses3, findTransitively = false) @@ -312,23 +314,24 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM assert(fields1.isEmpty) // "test1" < "FunSuite#test" < ClosureCleanerSuite2 assert(fields2.size === 3) - assert(fields2(outerClasses2(0)).isEmpty) // `def a` is not a field - assert(fields2(outerClasses2(1)).isEmpty) - assert(fields2(outerClasses2(2)).isEmpty) + // Since we do not find fields transitively here, we do not look into what `def a` references + assert(fields2(outerClasses2(0)).isEmpty) // This corresponds to the "test1" scope + assert(fields2(outerClasses2(1)).isEmpty) // This corresponds to the "FunSuite#test" scope + 
assert(fields2(outerClasses2(2)).isEmpty) // This corresponds to the ClosureCleanerSuite2 assert(fields3.size === 3) - // Note that `localValue` is a field of the "test1" closure because `def a` needs it - // Further note that it is NOT a field of the "FunSuite#test" closure but a local variable + // Note that `localValue` is a field of the "test1" scope because `def a` references it, + // but NOT a field of the "FunSuite#test" scope because it is only a local variable there assert(fields3(outerClasses3(0)).size === 1) assert(fields3(outerClasses3(0)).head.contains("localValue")) assert(fields3(outerClasses3(1)).isEmpty) assert(fields3(outerClasses3(2)).isEmpty) assert(fields4.size === 3) + // Because `val someSerializableValue` is an instance variable, even an explicit reference + // here actually involves a method call to access the underlying value of the variable. + // Because we are not finding fields transitively here, we do not consider the fields + // accessed by this "method" (i.e. the val's accessor). assert(fields4(outerClasses4(0)).isEmpty) assert(fields4(outerClasses4(1)).isEmpty) - // Because `someSerializableValue` is a val, even an explicit reference here actually - // involves a method call to access the underlying value of the variable. Because we are - // not finding fields transitively here, we do not consider the fields accessed by this - // "method" (i.e. the val's accessor). 
assert(fields4(outerClasses4(2)).isEmpty) // Now do the same, but find fields that the closures transitively reference @@ -338,8 +341,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val fields4t = findAccessedFields(closure4, outerClasses4, findTransitively = true) assert(fields1t.isEmpty) assert(fields2t.size === 3) - // This closure transitively references `localValue` because `def a` uses it - assert(fields2t(outerClasses2(0)).size === 1) + assert(fields2t(outerClasses2(0)).size === 1) // `def a` references `localValue` assert(fields2t(outerClasses2(0)).head.contains("localValue")) assert(fields2t(outerClasses2(1)).isEmpty) assert(fields2t(outerClasses2(2)).isEmpty) @@ -362,11 +364,11 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM } test("clean basic serializable closures") { - val localSerializableVal = someSerializableValue + val localValue = someSerializableValue val closure1 = () => 1 val closure2 = () => Array[String]("a", "b", "c") val closure3 = (s: String, arr: Array[Long]) => s + arr.mkString(", ") - val closure4 = () => localSerializableVal + val closure4 = () => localValue val closure5 = () => new NonSerializable(5) // we're just serializing the class information val closure1r = closure1() val closure2r = closure2() @@ -395,7 +397,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM val closure4 = () => someNonSerializableValue val closure2 = () => someNonSerializableMethod() - // These are not cleanable because they ultimately reference the `this` pointer + // These are not cleanable because they ultimately reference the ClosureCleanerSuite2 testClean(closure1, serializableBefore = false, serializableAfter = false) testClean(closure2, serializableBefore = false, serializableAfter = false) testClean(closure3, serializableBefore = false, serializableAfter = false) @@ -404,13 +406,13 @@ class ClosureCleanerSuite2 extends FunSuite with 
BeforeAndAfterAll with PrivateM } test("clean basic nested serializable closures") { - val localSerializableValue = someSerializableValue + val localValue = someSerializableValue val closure1 = (i: Int) => { - (1 to i).map { x => x + localSerializableValue } // 1 level of nesting + (1 to i).map { x => x + localValue } // 1 level of nesting } val closure2 = (j: Int) => { (1 to j).flatMap { x => - (1 to x).map { y => y + localSerializableValue } // 2 levels + (1 to x).map { y => y + localValue } // 2 levels } } val closure3 = (k: Int, l: Int, m: Int) => { @@ -426,6 +428,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM testClean(closure2, serializableBefore = true, serializableAfter = true) testClean(closure3, serializableBefore = true, serializableAfter = true) + // Verify that closures can still be invoked and that the results are still the same assert(closure1(1) === closure1r) assert(closure2(2) === closure2r) assert(closure3(3, 4, 5) === closure3r) @@ -434,9 +437,12 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM test("clean basic nested non-serializable closures") { def localSerializableMethod() = someSerializableValue val localNonSerializableValue = someNonSerializableValue + // These closures ultimately reference the ClosureCleanerSuite2 + // Note that even accessing a `val` that is an instance variable involves a method call val closure1 = (i: Int) => { (1 to i).map { x => x + someSerializableValue } } val closure2 = (j: Int) => { (1 to j).map { x => x + someSerializableMethod() } } val closure4 = (k: Int) => { (1 to k).map { x => x + localSerializableMethod() } } + // This closure references a local non-serializable value val closure3 = (l: Int) => { (1 to l).map { x => localNonSerializableValue } } // This is non-serializable no matter how many levels we nest it val closure5 = (m: Int) => { @@ -457,7 +463,10 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM } 
test("clean complicated nested serializable closures") { - val localSerializableValue = someSerializableValue + val localValue = someSerializableValue + + // Here we assume that if the outer closure is serializable, + // then all inner closures must also be serializable // Reference local fields from all levels val closure1 = (i: Int) => { @@ -465,7 +474,7 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM (1 to i).flatMap { x => val b = a + 1 (1 to x).map { y => - y + a + b + localSerializableValue + y + a + b + localValue } } } @@ -479,8 +488,8 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM def b2 = a2 + 1 (1 to x).map { y => // If this references a method outside the outermost closure, then it will try to pull - // in the ClosureCleanerSuite2. This is why `localSerializableValue` here must be a val. - y + a1 + a2 + b1 + b2 + localSerializableValue + // in the ClosureCleanerSuite2. This is why `localValue` here must be a local `val`. 
+ y + a1 + a2 + b1 + b2 + localValue } } } @@ -494,13 +503,13 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM } test("clean complicated nested non-serializable closures") { - val localSerializableValue = someSerializableValue + val localValue = someSerializableValue - // Note that we are not interested in cleaning the outer closures here + // Note that we are not interested in cleaning the outer closures here (they are not cleanable) // The only reason why they exist is to nest the inner closures val test1 = () => { - val a = localSerializableValue + val a = localValue val b = sc val inner1 = (x: Int) => x + a + b.hashCode() val inner2 = (x: Int) => x + a @@ -509,15 +518,15 @@ class ClosureCleanerSuite2 extends FunSuite with BeforeAndAfterAll with PrivateM // There is no way to clean it testClean(inner1, serializableBefore = false, serializableAfter = false) - // This closure is serializable to begin with since - // it does not have a pointer to the outer closure + // This closure is serializable to begin with since it does not need a pointer to + // the outer closure (it only references local variables) testClean(inner2, serializableBefore = true, serializableAfter = true) } // Same as above, but the `val a` becomes `def a` // The difference here is that all inner closures now have pointers to the outer closure val test2 = () => { - def a = localSerializableValue + def a = localValue val b = sc val inner1 = (x: Int) => x + a + b.hashCode() val inner2 = (x: Int) => x + a