Skip to content

Commit

Permalink
[SPARK-4019] [SPARK-3740] Fix MapStatus compression bug that could le…
Browse files Browse the repository at this point in the history
…ad to empty results or Snappy errors

This commit fixes a bug in MapStatus that could cause jobs to wrongly return
empty results if those jobs contained stages with more than 2000 partitions
where most of those partitions were empty.

For jobs with > 2000 partitions, MapStatus uses HighlyCompressedMapStatus,
which only stores the average size of blocks.  If the average block size is
zero, then this will cause all blocks to be reported as empty, causing
BlockFetcherIterator to mistakenly skip them.

For example, this would return an empty result:

    sc.makeRDD(0 until 10, 1000).repartition(2001).collect()

This can also lead to deserialization errors (e.g. Snappy decoding errors)
for jobs with > 2000 partitions where the average block size is non-zero but
there is at least one empty block.  In this case, the BlockFetcher attempts to
fetch empty blocks and fails when trying to deserialize them.

The root problem here is that MapStatus has a (previously undocumented)
correctness property that was violated by HighlyCompressedMapStatus:

    If a block is non-empty, then getSizeForBlock must be non-zero.

I fixed this by modifying HighlyCompressedMapStatus to store the average size
of _non-empty_ blocks and to use a compressed bitmap to track which blocks are
empty.

I also removed a test which was broken as originally written: it attempted
to check that HighlyCompressedMapStatus's size estimation error was < 10%,
but this was broken because HighlyCompressedMapStatus is only used for map
statuses with > 2000 partitions, but the test only created 50.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#2866 from JoshRosen/spark-4019 and squashes the following commits:

fc8b490 [Josh Rosen] Roll back hashset change, which didn't improve performance.
5faa0a4 [Josh Rosen] Incorporate review feedback
c8b8cae [Josh Rosen] Two performance fixes:
3b892dd [Josh Rosen] Address Reynold's review comments
ba2e71c [Josh Rosen] Add missing newline
609407d [Josh Rosen] Use Roaring Bitmap to track non-empty blocks.
c23897a [Josh Rosen] Use sets when comparing collect() results
91276a3 [Josh Rosen] [SPARK-4019] Fix MapStatus compression bug that could lead to empty results.
  • Loading branch information
JoshRosen authored and pwendell committed Oct 23, 2014
1 parent 222fa47 commit 83b7a1c
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 34 deletions.
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@
<groupId>com.twitter</groupId>
<artifactId>chill-java</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
Expand Down
76 changes: 64 additions & 12 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.scheduler

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.roaringbitmap.RoaringBitmap

import org.apache.spark.storage.BlockManagerId

/**
Expand All @@ -29,7 +31,12 @@ private[spark] sealed trait MapStatus {
/** Location where this task was run. */
def location: BlockManagerId

/** Estimated size for the reduce block, in bytes. */
/**
* Estimated size for the reduce block, in bytes.
*
* If a block is non-empty, then this method MUST return a non-zero size. This invariant is
* necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
*/
def getSizeForBlock(reduceId: Int): Long
}

Expand All @@ -38,7 +45,7 @@ private[spark] object MapStatus {

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > 2000) {
new HighlyCompressedMapStatus(loc, uncompressedSizes)
HighlyCompressedMapStatus(loc, uncompressedSizes)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
}
Expand Down Expand Up @@ -112,35 +119,80 @@ private[spark] class CompressedMapStatus(
}
}


/**
* A [[MapStatus]] implementation that only stores the average size of the blocks.
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a bitmap for tracking which blocks are non-empty. During serialization, this bitmap
* is compressed.
*
* @param loc location where the task is being executed.
* @param avgSize average size of all the blocks
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
* @param emptyBlocks a bitmap tracking which blocks are empty
* @param avgSize average size of the non-empty blocks
*/
private[spark] class HighlyCompressedMapStatus(
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long)
extends MapStatus with Externalizable {

def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
this(loc, uncompressedSizes.sum / uncompressedSizes.length)
}
// loc could be null when the default constructor is called during deserialization
require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, 0L) // For deserialization only
protected def this() = this(null, -1, null, -1) // For deserialization only

override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = avgSize
override def getSizeForBlock(reduceId: Int): Long = {
if (emptyBlocks.contains(reduceId)) {
0
} else {
avgSize
}
}

override def writeExternal(out: ObjectOutput): Unit = {
loc.writeExternal(out)
emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
}

override def readExternal(in: ObjectInput): Unit = {
loc = BlockManagerId(in)
emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
avgSize = in.readLong()
}
}

private[spark] object HighlyCompressedMapStatus {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
// We must keep track of which blocks are empty so that we don't report a zero-sized
// block as being non-empty (or vice-versa) when using the average block size.
var i = 0
var numNonEmptyBlocks: Int = 0
var totalSize: Long = 0
// From a compression standpoint, it shouldn't matter whether we track empty or non-empty
// blocks. From a performance standpoint, we benefit from tracking empty blocks because
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
val emptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
while (i < totalNumBlocks) {
var size = uncompressedSizes(i)
if (size > 0) {
numNonEmptyBlocks += 1
totalSize += size
} else {
emptyBlocks.add(i)
}
i += 1
}
val avgSize = if (numNonEmptyBlocks > 0) {
totalSize / numNonEmptyBlocks
} else {
0
}
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
}
}
5 changes: 5 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
}

test("collect large number of empty partitions") {
// Regression test for SPARK-4019
assert(sc.makeRDD(0 until 10, 1000).repartition(2001).collect().toSet === (0 until 10).toSet)
}

test("take") {
var nums = sc.makeRDD(Range(1, 1000), 1)
assert(nums.take(0).size === 0)
Expand Down
53 changes: 31 additions & 22 deletions core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

import scala.util.Random

class MapStatusSuite extends FunSuite {

Expand All @@ -46,6 +47,26 @@ class MapStatusSuite extends FunSuite {
}
}

test("MapStatus should never report non-empty blocks' sizes as 0") {
import Math._
for (
numSizes <- Seq(1, 10, 100, 1000, 10000);
mean <- Seq(0L, 100L, 10000L, Int.MaxValue.toLong);
stddev <- Seq(0.0, 0.01, 0.5, 1.0)
) {
val sizes = Array.fill[Long](numSizes)(abs(round(Random.nextGaussian() * stddev)) + mean)
val status = MapStatus(BlockManagerId("a", "b", 10), sizes)
val status1 = compressAndDecompressMapStatus(status)
for (i <- 0 until numSizes) {
if (sizes(i) != 0) {
val failureMessage = s"Failed with $numSizes sizes with mean=$mean, stddev=$stddev"
assert(status.getSizeForBlock(i) !== 0, failureMessage)
assert(status1.getSizeForBlock(i) !== 0, failureMessage)
}
}
}
}

test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) {
val sizes = Array.fill[Long](2001)(150L)
val status = MapStatus(null, sizes)
Expand All @@ -56,37 +77,25 @@ class MapStatusSuite extends FunSuite {
assert(status.getSizeForBlock(2000) === 150L)
}

test(classOf[HighlyCompressedMapStatus].getName + ": estimated size is within 10%") {
val sizes = Array.tabulate[Long](50) { i => i.toLong }
test("HighlyCompressedMapStatus: estimated size should be the average non-empty block size") {
val sizes = Array.tabulate[Long](3000) { i => i.toLong }
val avg = sizes.sum / sizes.filter(_ != 0).length
val loc = BlockManagerId("a", "b", 10)
val status = MapStatus(loc, sizes)
val ser = new JavaSerializer(new SparkConf)
val buf = ser.newInstance().serialize(status)
val status1 = ser.newInstance().deserialize[MapStatus](buf)
val status1 = compressAndDecompressMapStatus(status)
assert(status1.isInstanceOf[HighlyCompressedMapStatus])
assert(status1.location == loc)
for (i <- 0 until sizes.length) {
// make sure the estimated size is within 10% of the input; note that we skip the very small
// sizes because the compression is very lossy there.
for (i <- 0 until 3000) {
val estimate = status1.getSizeForBlock(i)
if (estimate > 100) {
assert(math.abs(estimate - sizes(i)) * 10 <= sizes(i),
s"incorrect estimated size $estimate, original was ${sizes(i)}")
if (sizes(i) > 0) {
assert(estimate === avg)
}
}
}

test(classOf[HighlyCompressedMapStatus].getName + ": estimated size should be the average size") {
val sizes = Array.tabulate[Long](3000) { i => i.toLong }
val avg = sizes.sum / sizes.length
val loc = BlockManagerId("a", "b", 10)
val status = MapStatus(loc, sizes)
def compressAndDecompressMapStatus(status: MapStatus): MapStatus = {
val ser = new JavaSerializer(new SparkConf)
val buf = ser.newInstance().serialize(status)
val status1 = ser.newInstance().deserialize[MapStatus](buf)
assert(status1.location == loc)
for (i <- 0 until 3000) {
val estimate = status1.getSizeForBlock(i)
assert(estimate === avg)
}
ser.newInstance().deserialize[MapStatus](buf)
}
}
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.4.1</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
Expand Down

0 comments on commit 83b7a1c

Please sign in to comment.