[SPARK-32872][CORE] Prevent BytesToBytesMap at MAX_CAPACITY from exceeding growth threshold

### What changes were proposed in this pull request?

When BytesToBytesMap is at `MAX_CAPACITY` and reaches its growth threshold, `numKeys >= growthThreshold` is true but `longArray.size() / 2 < MAX_CAPACITY` is false. This correctly prevents the map from growing, but `canGrowArray` incorrectly remains true. Therefore the map keeps accepting new keys and exceeds its growth threshold. If we attempt to spill the map in this state, the UnsafeKVExternalSorter will not be able to reuse the long array for sorting. By this point the task has typically consumed all available memory, so the allocation of the new pointer array is likely to fail.
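For reference, here is the pre-fix branch of `append()`, condensed from the diff at the bottom of this commit. When the map is already at `MAX_CAPACITY`, the second conjunct is false, so the whole branch is skipped and `canGrowArray` is never set to false:

```java
// Pre-fix logic (condensed from the diff below). At MAX_CAPACITY the
// growAndRehash() path is unreachable, so canGrowArray stays true and the
// map silently keeps accepting keys past growthThreshold.
if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
  try {
    growAndRehash();
  } catch (SparkOutOfMemoryError oom) {
    canGrowArray = false;
  }
}
```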

This PR fixes the issue by setting `canGrowArray` to false in this case. This prevents the map from accepting new elements when it cannot grow to accommodate them.

### Why are the changes needed?

Without this change, hash aggregations will fail when the number of groups per task is greater than `MAX_CAPACITY / 2 = 2^28` (approximately 268 million), and when the grouping aggregation is the only memory-consuming operator in its stage.

For example, the final aggregation in `SELECT COUNT(DISTINCT id) FROM tbl` fails when `tbl` contains 1 billion distinct values and when `spark.sql.shuffle.partitions=1`.
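As a quick sanity check on that number, here is a minimal sketch of the arithmetic. It assumes `MAX_CAPACITY = 1 << 29` and the default 0.5 load factor, both taken from `BytesToBytesMap` at the time of this commit:

```java
// Minimal arithmetic check: the growth threshold is capacity * load factor.
public class GrowthThresholdMath {
  public static void main(String[] args) {
    long maxCapacity = 1L << 29;             // 536,870,912 slots
    long growthThreshold = maxCapacity / 2;  // 0.5 load factor
    System.out.println(growthThreshold);     // prints 268435456, ~268 million
    // A final aggregation over 1 billion distinct values in a single
    // partition therefore passes the threshold well before it finishes.
  }
}
```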

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Reproducing this issue requires building a very large BytesToBytesMap. Because this is infeasible in a unit test, the fix was verified manually by adding the following test to AbstractBytesToBytesMapSuite. Without this PR, the test fails after 8.5 minutes; with it, the test passes in 1.5 minutes.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.internal.config.package$;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TestMemoryManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.junit.Assert;
import org.junit.Test;

public abstract class AbstractBytesToBytesMapSuite {
  // ...
  @Test
  public void respectGrowthThresholdAtMaxCapacity() {
    TestMemoryManager memoryManager2 =
        new TestMemoryManager(
            new SparkConf()
            .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
            .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 25600 * 1024 * 1024L)
            .set(package$.MODULE$.SHUFFLE_SPILL_COMPRESS(), false)
            .set(package$.MODULE$.SHUFFLE_COMPRESS(), false));
    TaskMemoryManager taskMemoryManager2 = new TaskMemoryManager(memoryManager2, 0);
    final long pageSizeBytes = 8000000 + 8; // 8 bytes for end-of-page marker
    final BytesToBytesMap map = new BytesToBytesMap(taskMemoryManager2, 1024, pageSizeBytes);

    try {
      // Insert keys into the map until it stops accepting new keys.
      for (long i = 0; i < BytesToBytesMap.MAX_CAPACITY; i++) {
        if (i % (1024 * 1024) == 0) System.out.println("Inserting element " + i);
        final long[] value = new long[]{i};
        BytesToBytesMap.Location loc = map.lookup(value, Platform.LONG_ARRAY_OFFSET, 8);
        Assert.assertFalse(loc.isDefined());
        boolean success =
            loc.append(value, Platform.LONG_ARRAY_OFFSET, 8, value, Platform.LONG_ARRAY_OFFSET, 8);
        if (!success) break;
      }

      // The map should grow to its max capacity.
      long capacity = map.getArray().size() / 2;
      Assert.assertTrue(capacity == BytesToBytesMap.MAX_CAPACITY);

      // The map should stop accepting new keys once it has reached its growth
      // threshold, which is half the max capacity.
      Assert.assertTrue(map.numKeys() == BytesToBytesMap.MAX_CAPACITY / 2);

      map.free();
    } finally {
      map.free();
    }
  }
}
```
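Note that the test configures 25 GiB of off-heap memory (`25600 * 1024 * 1024L` bytes) and inserts up to `MAX_CAPACITY` (2^29) distinct 8-byte keys, which is why it cannot run as a regular unit test.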

Closes apache#29744 from ankurdave/SPARK-32872.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
ankurdave authored and dongjoon-hyun committed Sep 14, 2020 · 1 parent 0696f04 · commit 72550c3
Showing 1 changed file with 15 additions and 6 deletions.
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

```diff
@@ -808,12 +808,21 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       longArray.set(pos * 2 + 1, keyHashcode);
       isDefined = true;

-      // We use two array entries per key, so the array size is twice the capacity.
-      // We should compare the current capacity of the array, instead of its size.
-      if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
-        try {
-          growAndRehash();
-        } catch (SparkOutOfMemoryError oom) {
+      // If the map has reached its growth threshold, try to grow it.
+      if (numKeys >= growthThreshold) {
+        // We use two array entries per key, so the array size is twice the capacity.
+        // We should compare the current capacity of the array, instead of its size.
+        if (longArray.size() / 2 < MAX_CAPACITY) {
+          try {
+            growAndRehash();
+          } catch (SparkOutOfMemoryError oom) {
+            canGrowArray = false;
+          }
+        } else {
+          // The map is already at MAX_CAPACITY and cannot grow. Instead, we prevent it from
+          // accepting any more new elements to make sure we don't exceed the load factor. If we
+          // need to spill later, this allows UnsafeKVExternalSorter to reuse the array for
+          // sorting.
           canGrowArray = false;
         }
       }
```
