In sc.runJob, actually clean the inner closure
If a closure is referenced inside another closure, the inner
closure becomes a field of the outer closure. In sc.runJob, we
used to clean only the outer closure, leaving the inner one
uncleaned.
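
A minimal, Spark-free sketch of this capture (the object and
method names are made up for illustration): `outer` references
`inner`, so the compiled closure for `outer` carries a reference
to `inner` (on typical Scala versions, as a synthetic field such
as `arg$1`). Cleaning only `outer` leaves whatever `inner`
captured untouched.

```scala
object CapturedClosureDemo {
  // Builds an outer closure that captures an inner closure.
  def build(): Int => Int = {
    val inner: Int => Int = x => x + 1
    // `outer` closes over `inner`; the compiled outer closure
    // holds `inner` as part of its captured state.
    val outer: Int => Int = x => inner(x) * 2
    outer
  }
}
```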

Simple reproduction:

Wrap RDD#take in a closure. For instance:

  // The body "..." here contains a return statement
  def take(num: Int): Array[T] = (1 to 1).foreach { _ => ... }

Now if you call `sc.parallelize(1 to 10).take(5)`, the closure
cleaner will not be able to find the return statement in the
`foreach` closure, because it never cleans the `foreach` closure
at all. Instead, the job will fail with a not-serializable
exception complaining that the internal java.lang.Object
$nonLocalReturnKey is not serializable.
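
The underlying mechanism is how scalac implements a return inside
a closure: it throws a NonLocalReturnControl exception keyed on a
fresh java.lang.Object that the closure captures, and
java.lang.Object is not serializable. A minimal sketch of such a
non-local return (`firstPositive` is a made-up example, not Spark
code):

```scala
object NonLocalReturnDemo {
  // Returning from inside the `foreach` closure is a non-local
  // return: scalac compiles it into a NonLocalReturnControl
  // exception keyed on a fresh java.lang.Object, and that key
  // object becomes captured state of the closure.
  def firstPositive(xs: Seq[Int]): Int = {
    xs.foreach { x => if (x > 0) return x }
    -1
  }
}
```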
Andrew Or committed Apr 29, 2015
1 parent 9187066 commit 3998168
Showing 1 changed file with 4 additions and 1 deletion.
core/src/main/scala/org/apache/spark/SparkContext.scala (4 additions, 1 deletion):

  @@ -1627,7 +1627,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         partitions: Seq[Int],
         allowLocal: Boolean
       ): Array[U] = {
  -    runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
  +    // We must clean `func` here before using it in another closure below
  +    // Otherwise, the closure cleaner will only clean the outer closure but not `func`
  +    val cleanedFunc = clean(func)
  +    runJob(rdd, (ctx: TaskContext, iter: Iterator[T]) => cleanedFunc(iter), partitions, allowLocal)
     }

     /**
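
The shape of the fix can be sketched outside Spark. Here `clean`
is a hypothetical stand-in that only verifies Java
serializability (the real SparkContext#clean delegates to
ClosureCleaner, which also nulls out unused captured state), and
`wrap` mirrors the fixed runJob:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object CleanBeforeWrapDemo {
  // Hypothetical stand-in for SparkContext#clean: just checks that
  // the closure can be Java-serialized, throwing otherwise.
  def clean[F <: AnyRef](f: F): F = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try oos.writeObject(f) finally oos.close()
    f
  }

  // Mirrors the fixed runJob: clean `func` first, then let the
  // wrapping closure capture the cleaned reference, so nothing
  // uncleaned is carried along as a field of the outer closure.
  def wrap[T, U](func: Iterator[T] => U): Iterator[T] => U = {
    val cleanedFunc = clean(func)
    iter => cleanedFunc(iter)
  }
}
```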
