[SPARK-39771][CORE] Add a warning msg in Dependency when a too large number of shuffle blocks is to be created

### What changes were proposed in this pull request?
Add a warning message in `Dependency` when a large number of shuffle blocks is about to be created, which may crash the driver with an OOM error.

### Why are the changes needed?
Warn the user ahead of a potential failure and hint at a possible solution.
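
As a user-side illustration of the hinted fix (a sketch, not part of this patch; `rdd` stands in for any large RDD), lowering the partition count on either side of the shuffle shrinks the block count multiplicatively:

```scala
// Hypothetical mitigation sketch: the shuffle block count is roughly
// mapPartitions * reducePartitions, so shrinking either side helps.
val fixed = rdd
  .coalesce(10000)            // fewer map-side partitions
  .map(x => (x, x))
  .reduceByKey(_ + _, 10000)  // cap reduce-side partitions explicitly
```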

### Does this PR introduce _any_ user-facing change?

Users may see a warning message like the one below when the number of shuffle blocks exceeds the threshold. There is no change to the execution outcome.
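
For illustration, the warning would look roughly like this (message text taken from the new log statement in the diff below; the shuffle ID, RDD, and partition counts are made-up values):

```
WARN ShuffleDependency: The number of shuffle blocks (10000000000) for shuffleId 0 for
MapPartitionsRDD[1] with 100000 partitions is possibly too large, which could cause the
driver to crash with an out-of-memory error. Consider decreasing the number of
partitions in this shuffle stage.
```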

### How was this patch tested?

Added a unit test to ensure the warning message is logged.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45266 from y-wei/y-wei/SPARK-39771.

Lead-authored-by: y-wei <yifei.wei@databricks.com>
Co-authored-by: Yifei Wei <52417396+y-wei@users.noreply.github.com>
Signed-off-by: Yuanjian Li <yuanjian.li@databricks.com>
2 people authored and xuanyuanking committed Mar 6, 2024
1 parent fe2174d commit d705d86
Showing 2 changed files with 23 additions and 0 deletions.
core/src/main/scala/org/apache/spark/Dependency.scala (12 additions, 0 deletions)
@@ -206,6 +206,18 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
finalizeTask = Option(task)
}

    // Set the threshold to 1 billion, which leads to a 128MB bitmap; the actual size of
    // `HighlyCompressedMapStatus` can be much larger than 128MB. This may crash the
    // driver with an OOM error.
    if (numPartitions.toLong * partitioner.numPartitions.toLong > (1L << 30)) {
      logWarning(
        s"The number of shuffle blocks (${numPartitions.toLong * partitioner.numPartitions.toLong})" +
          s" for shuffleId ${shuffleId} for ${_rdd} with ${numPartitions} partitions" +
          " is possibly too large, which could cause the driver to crash with an out-of-memory" +
          " error. Consider decreasing the number of partitions in this shuffle stage."
      )
    }

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
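A standalone sanity check of the 128MB figure in the comment above (a sketch, not part of the patch; it assumes roughly one bit of bitmap per shuffle block, the worst case for the map-status bitmap):

```scala
// Sketch: why a 1L << 30 block threshold corresponds to ~128MB of bitmap,
// assuming one bit per shuffle block in the worst case.
object ThresholdMath extends App {
  val blocks = 1L << 30                // threshold: ~1.07 billion shuffle blocks
  val bitmapBytes = blocks / 8         // 8 bits per byte
  println(s"${bitmapBytes >> 20} MB")  // prints "128 MB"
}
```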
core/src/test/scala/org/apache/spark/ShuffleSuite.scala (11 additions, 0 deletions)
@@ -483,6 +483,17 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalRootDi
}
assert(e.getMessage.contains("corrupted due to DISK_ISSUE"))
}

test("SPARK-39771: warn when shuffle block number is too large") {
sc = new SparkContext("local", "test", conf)
val logAppender = new LogAppender("warn when shuffle block number is too large")
withLogAppender(logAppender) {
sc.parallelize(1 to 100000, 100000).map(x => (x, x)).reduceByKey(_ + _).toDebugString
}
assert(logAppender
.loggingEvents
.count(_.getMessage.getFormattedMessage.contains(s"The number of shuffle blocks")) == 1)
}
}
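
For context on why the test above fires the warning exactly once: 100000 map partitions times 100000 reduce partitions gives 10^10 shuffle blocks, comfortably above the 2^30 (~1.07 billion) threshold. A quick check of that arithmetic:

```scala
// Arithmetic behind the test: 100000 x 100000 partitions exceed the threshold.
val shuffleBlocks = 100000L * 100000L  // 10,000,000,000 blocks
assert(shuffleBlocks > (1L << 30))     // 10^10 > ~1.07e9, so the warning fires
```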

