From 89585847a4c0340caf7f681ea24b3b00f029a295 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 3 May 2015 14:42:36 -0700 Subject: [PATCH] Fix bug in calculating free space in current page. This broke off-heap mode. --- .../spark/shuffle/unsafe/UnsafeShuffleWriter.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index 4d65016577872..9554298c0f3f8 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -71,7 +71,7 @@ public class UnsafeShuffleWriter implements ShuffleWriter { private MapStatus mapStatus = null; private MemoryBlock currentPage = null; - private long currentPagePosition = PAGE_SIZE; + private long currentPagePosition = -1; /** * Are we in the process of stopping? Because map tasks can call stop() with success = true @@ -110,11 +110,17 @@ public void write(scala.collection.Iterator> records) { } private void ensureSpaceInDataPage(long requiredSpace) throws Exception { + final long spaceInCurrentPage; + if (currentPage != null) { + spaceInCurrentPage = PAGE_SIZE - (currentPagePosition - currentPage.getBaseOffset()); + } else { + spaceInCurrentPage = 0; + } if (requiredSpace > PAGE_SIZE) { // TODO: throw a more specific exception? throw new Exception("Required space " + requiredSpace + " is greater than page size (" + PAGE_SIZE + ")"); - } else if (requiredSpace > (PAGE_SIZE - currentPagePosition)) { + } else if (requiredSpace > spaceInCurrentPage) { currentPage = memoryManager.allocatePage(PAGE_SIZE); currentPagePosition = currentPage.getBaseOffset(); allocatedPages.add(currentPage);