Skip to content

Commit

Permalink
Improved fix for ConcurrentModificationException (SPARK-1097, HADOOP-10456)
Browse files Browse the repository at this point in the history
  • Loading branch information
nishkamravi2 committed Jun 9, 2014
1 parent 6b840f0 commit df2aeb1
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
Expand Up @@ -126,7 +126,7 @@ class HadoopRDD[K, V](
private val createTime = new Date()

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = synchronized {
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
if (conf.isInstanceOf[JobConf]) {
// A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
Expand All @@ -139,10 +139,13 @@ class HadoopRDD[K, V](
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
val newJobConf = new JobConf(broadcastedConf.value.value)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
newJobConf
// Synchronize on broadcastedConf while the new JobConf is constructed from it, to prevent a
// ConcurrentModificationException (SPARK-1097, HADOOP-10456).
broadcastedConf.synchronized {
val newJobConf = new JobConf(broadcastedConf.value.value)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
newJobConf
}
}
}

Expand Down

0 comments on commit df2aeb1

Please sign in to comment.