Commit

Merge branch 'master' into sparkR-alias
falaki committed Jul 30, 2015
2 parents c1b88bd + 6d94bf6 commit f51cbef
Showing 163 changed files with 5,446 additions and 1,297 deletions.
14 changes: 7 additions & 7 deletions R/pkg/R/generics.R
@@ -254,8 +254,10 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")
 
 # @rdname intersection
 # @export
-setGeneric("intersection", function(x, other, numPartitions = 1) {
-  standardGeneric("intersection") })
+setGeneric("intersection",
+           function(x, other, numPartitions = 1) {
+             standardGeneric("intersection")
+           })
 
 # @rdname keys
 # @export
@@ -489,9 +491,7 @@ setGeneric("sample",
 #' @rdname sample
 #' @export
 setGeneric("sample_frac",
-           function(x, withReplacement, fraction, seed) {
-             standardGeneric("sample_frac")
-           })
+           function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") })
 
 #' @rdname saveAsParquetFile
 #' @export
@@ -553,8 +553,8 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn
 
 #' @rdname withColumnRenamed
 #' @export
-setGeneric("withColumnRenamed", function(x, existingCol, newCol) {
-  standardGeneric("withColumnRenamed") })
+setGeneric("withColumnRenamed",
+           function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })
 
 
 ###################### Column Methods ##########################
4 changes: 2 additions & 2 deletions R/pkg/R/pairRDD.R
@@ -202,8 +202,8 @@ setMethod("partitionBy",
 
             packageNamesArr <- serialize(.sparkREnv$.packages,
                                          connection = NULL)
-            broadcastArr <- lapply(ls(.broadcastNames), function(name) {
-              get(name, .broadcastNames) })
+            broadcastArr <- lapply(ls(.broadcastNames),
+                                   function(name) { get(name, .broadcastNames) })
             jrdd <- getJRDD(x)
 
             # We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],
9 changes: 6 additions & 3 deletions R/pkg/R/sparkR.R
@@ -22,7 +22,8 @@
 connExists <- function(env) {
   tryCatch({
     exists(".sparkRCon", envir = env) && isOpen(env[[".sparkRCon"]])
-  }, error = function(err) {
+  },
+  error = function(err) {
     return(FALSE)
   })
 }
@@ -153,7 +154,8 @@ sparkR.init <- function(
   .sparkREnv$backendPort <- backendPort
   tryCatch({
     connectBackend("localhost", backendPort)
-  }, error = function(err) {
+  },
+  error = function(err) {
     stop("Failed to connect JVM\n")
   })
 
@@ -264,7 +266,8 @@ sparkRHive.init <- function(jsc = NULL) {
   ssc <- callJMethod(sc, "sc")
   hiveCtx <- tryCatch({
     newJObject("org.apache.spark.sql.hive.HiveContext", ssc)
-  }, error = function(err) {
+  },
+  error = function(err) {
     stop("Spark SQL is not built with Hive support")
   })
 
6 changes: 4 additions & 2 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -115,7 +115,8 @@ test_that("create DataFrame from RDD", {
   df <- jsonFile(sqlContext, jsonPathNa)
   hiveCtx <- tryCatch({
     newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
-  }, error = function(err) {
+  },
+  error = function(err) {
     skip("Hive is not build with SparkSQL, skipped")
   })
   sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
@@ -609,7 +610,8 @@ test_that("write.df() as parquet file", {
 test_that("test HiveContext", {
   hiveCtx <- tryCatch({
     newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
-  }, error = function(err) {
+  },
+  error = function(err) {
     skip("Hive is not build with SparkSQL, skipped")
   })
   df <- createExternalTable(hiveCtx, "json", jsonPath, "json")
2 changes: 1 addition & 1 deletion R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
 LOGFILE=$FWDIR/unit-tests.out
 rm -f $LOGFILE
 
-SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
 FAILED=$((PIPESTATUS[0]||$FAILED))
 
 if [[ $FAILED != 0 ]]; then
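Note: the only functional change to run-tests.sh is the added --conf spark.buffer.pageSize=4m, presumably so the page-size-aware sorters changed later in this commit allocate small pages during the memory-constrained SparkR tests. For reference, a minimal sketch of setting the same property programmatically (the app name and printout are illustrative, not part of this commit):

import org.apache.spark.SparkConf;

public class PageSizeConfExample {
  public static void main(String[] args) {
    // Equivalent to passing --conf spark.buffer.pageSize=4m on the command line.
    SparkConf conf = new SparkConf()
        .setAppName("page-size-example") // hypothetical app name
        .set("spark.buffer.pageSize", "4m");
    // getSizeAsBytes parses the size suffix: "4m" -> 4 * 1024 * 1024 bytes.
    System.out.println(conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
  }
}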
@@ -59,14 +59,14 @@ final class UnsafeShuffleExternalSorter {
 
   private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
 
-  private static final int PAGE_SIZE = PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
   @VisibleForTesting
   static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
-  @VisibleForTesting
-  static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
 
   private final int initialSize;
   private final int numPartitions;
+  private final int pageSizeBytes;
+  @VisibleForTesting
+  final int maxRecordSizeBytes;
   private final TaskMemoryManager memoryManager;
   private final ShuffleMemoryManager shuffleMemoryManager;
   private final BlockManager blockManager;
@@ -109,7 +109,10 @@ public UnsafeShuffleExternalSorter(
     this.numPartitions = numPartitions;
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
-
+    this.pageSizeBytes = (int) Math.min(
+      PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
+      conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
+    this.maxRecordSizeBytes = pageSizeBytes - 4;
     this.writeMetrics = writeMetrics;
     initializeForWriting();
   }
@@ -272,7 +275,11 @@ void spill() throws IOException {
   }
 
   private long getMemoryUsage() {
-    return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
+    long totalPageSize = 0;
+    for (MemoryBlock page : allocatedPages) {
+      totalPageSize += page.size();
+    }
+    return sorter.getMemoryUsage() + totalPageSize;
   }
 
   private long freeMemory() {
@@ -346,23 +353,23 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
     // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
     // without using the free space at the end of the current page. We should also do this for
     // BytesToBytesMap.
-    if (requiredSpace > PAGE_SIZE) {
+    if (requiredSpace > pageSizeBytes) {
       throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
-        PAGE_SIZE + ")");
+        pageSizeBytes + ")");
     } else {
-      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
-      if (memoryAcquired < PAGE_SIZE) {
+      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+      if (memoryAcquired < pageSizeBytes) {
         shuffleMemoryManager.release(memoryAcquired);
         spill();
-        final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
-        if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
+        final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+        if (memoryAcquiredAfterSpilling != pageSizeBytes) {
           shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
-          throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
+          throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
         }
       }
-      currentPage = memoryManager.allocatePage(PAGE_SIZE);
+      currentPage = memoryManager.allocatePage(pageSizeBytes);
       currentPagePosition = currentPage.getBaseOffset();
-      freeSpaceInCurrentPage = PAGE_SIZE;
+      freeSpaceInCurrentPage = pageSizeBytes;
       allocatedPages.add(currentPage);
     }
   }
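The substantive change in UnsafeShuffleExternalSorter above is that the hard-coded PAGE_SIZE constant is replaced by a pageSizeBytes field read from spark.buffer.pageSize (default 64m), capped at PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, with maxRecordSizeBytes derived as pageSizeBytes - 4. A self-contained sketch of that resolution logic follows; the cap constant and the size-string parser are simplified stand-ins for the real PackedRecordPointer value and SparkConf.getSizeAsBytes:

public class PageSizeResolutionSketch {
  // Assumed stand-in for PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES.
  static final long MAX_PAGE_SIZE_BYTES = 1L << 27; // 128 MB, assumed for illustration

  // Simplified stand-in for SparkConf.getSizeAsBytes: parse "4m", "64m", "1g", ... into bytes.
  static long parseSizeAsBytes(String size) {
    String s = size.trim().toLowerCase();
    long unit = 1L;
    if (s.endsWith("k")) { unit = 1L << 10; s = s.substring(0, s.length() - 1); }
    else if (s.endsWith("m")) { unit = 1L << 20; s = s.substring(0, s.length() - 1); }
    else if (s.endsWith("g")) { unit = 1L << 30; s = s.substring(0, s.length() - 1); }
    return Long.parseLong(s) * unit;
  }

  public static void main(String[] args) {
    String configured = args.length > 0 ? args[0] : "64m"; // e.g. "4m", as set in R/run-tests.sh
    // Same shape as the constructor change above: cap the configured value at the maximum.
    int pageSizeBytes = (int) Math.min(MAX_PAGE_SIZE_BYTES, parseSizeAsBytes(configured));
    // 4 bytes per record are reserved for the length prefix written into the page.
    int maxRecordSizeBytes = pageSizeBytes - 4;
    System.out.println("pageSizeBytes=" + pageSizeBytes + ", maxRecordSizeBytes=" + maxRecordSizeBytes);
  }
}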
@@ -129,6 +129,11 @@ public UnsafeShuffleWriter(
     open();
   }
 
+  @VisibleForTesting
+  public int maxRecordSizeBytes() {
+    return sorter.maxRecordSizeBytes;
+  }
+
   /**
    * This convenience method should only be called in test code.
    */
@@ -17,9 +17,7 @@
 
 package org.apache.spark.util.collection.unsafe.sort;
 
-import com.google.common.base.Charsets;
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedBytes;
+import com.google.common.primitives.UnsignedLongs;
 
 import org.apache.spark.annotation.Private;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -37,32 +35,11 @@ private PrefixComparators() {}
   public static final class StringPrefixComparator extends PrefixComparator {
     @Override
     public int compare(long aPrefix, long bPrefix) {
-      // TODO: can done more efficiently
-      byte[] a = Longs.toByteArray(aPrefix);
-      byte[] b = Longs.toByteArray(bPrefix);
-      for (int i = 0; i < 8; i++) {
-        int c = UnsignedBytes.compare(a[i], b[i]);
-        if (c != 0) return c;
-      }
-      return 0;
-    }
-
-    public long computePrefix(byte[] bytes) {
-      if (bytes == null) {
-        return 0L;
-      } else {
-        byte[] padded = new byte[8];
-        System.arraycopy(bytes, 0, padded, 0, Math.min(bytes.length, 8));
-        return Longs.fromByteArray(padded);
-      }
-    }
-
-    public long computePrefix(String value) {
-      return value == null ? 0L : computePrefix(value.getBytes(Charsets.UTF_8));
+      return UnsignedLongs.compare(aPrefix, bPrefix);
     }
 
     public long computePrefix(UTF8String value) {
-      return value == null ? 0L : computePrefix(value.getBytes());
+      return value == null ? 0L : value.getPrefix();
     }
   }
 
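The rewritten StringPrefixComparator relies on one property: when the first eight bytes of a string are packed big-endian into a long, comparing those longs as unsigned values gives the same ordering as unsigned byte-wise comparison of the eight-byte prefixes. A minimal, self-contained illustration of that equivalence is below; prefix() is a hypothetical helper written for this sketch (not Spark's UTF8String.getPrefix()), and the JDK's Long.compareUnsigned stands in for Guava's UnsignedLongs.compare:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PrefixOrderSketch {
  // Hypothetical helper: pack the first 8 UTF-8 bytes of s into a long, big-endian, zero-padded.
  static long prefix(String s) {
    byte[] padded = Arrays.copyOf(s.getBytes(StandardCharsets.UTF_8), 8);
    long p = 0L;
    for (int i = 0; i < 8; i++) {
      p = (p << 8) | (padded[i] & 0xFFL); // big-endian: the first byte ends up most significant
    }
    return p;
  }

  public static void main(String[] args) {
    String a = "apple", b = "apricot";
    byte[] pa = Arrays.copyOf(a.getBytes(StandardCharsets.UTF_8), 8);
    byte[] pb = Arrays.copyOf(b.getBytes(StandardCharsets.UTF_8), 8);
    // Unsigned comparison of the packed longs matches unsigned lexicographic
    // comparison of the zero-padded 8-byte prefixes.
    int byLong = Long.compareUnsigned(prefix(a), prefix(b));
    int byBytes = Arrays.compareUnsigned(pa, pb); // JDK 9+
    System.out.println(Integer.signum(byLong) == Integer.signum(byBytes)); // prints true
  }
}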
@@ -20,6 +20,9 @@
 import java.io.IOException;
 import java.util.LinkedList;
 
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,10 +44,7 @@ public final class UnsafeExternalSorter {
 
   private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);
 
-  private static final int PAGE_SIZE = 1 << 27; // 128 megabytes
-  @VisibleForTesting
-  static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
-
+  private final long pageSizeBytes;
   private final PrefixComparator prefixComparator;
   private final RecordComparator recordComparator;
   private final int initialSize;
@@ -91,7 +91,19 @@ public UnsafeExternalSorter(
     this.initialSize = initialSize;
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
     initializeForWriting();
+
+    // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
+    // the end of the task. This is necessary to avoid memory leaks in when the downstream operator
+    // does not fully consume the sorter's output (e.g. sort followed by limit).
+    taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
+      @Override
+      public BoxedUnit apply() {
+        freeMemory();
+        return null;
+      }
+    });
   }
 
   // TODO: metrics tracking + integration with shuffle write metrics
@@ -147,7 +159,11 @@ public void spill() throws IOException {
   }
 
   private long getMemoryUsage() {
-    return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
+    long totalPageSize = 0;
+    for (MemoryBlock page : allocatedPages) {
+      totalPageSize += page.size();
+    }
+    return sorter.getMemoryUsage() + totalPageSize;
  }
 
   @VisibleForTesting
@@ -214,23 +230,23 @@ private void allocateSpaceForRecord(int requiredSpace) throws IOException {
     // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
     // without using the free space at the end of the current page. We should also do this for
     // BytesToBytesMap.
-    if (requiredSpace > PAGE_SIZE) {
+    if (requiredSpace > pageSizeBytes) {
       throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
-        PAGE_SIZE + ")");
+        pageSizeBytes + ")");
     } else {
-      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
-      if (memoryAcquired < PAGE_SIZE) {
+      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+      if (memoryAcquired < pageSizeBytes) {
        shuffleMemoryManager.release(memoryAcquired);
        spill();
-        final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
-        if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
+        final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+        if (memoryAcquiredAfterSpilling != pageSizeBytes) {
          shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
-          throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
+          throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
        }
      }
-      currentPage = memoryManager.allocatePage(PAGE_SIZE);
+      currentPage = memoryManager.allocatePage(pageSizeBytes);
       currentPagePosition = currentPage.getBaseOffset();
-      freeSpaceInCurrentPage = PAGE_SIZE;
+      freeSpaceInCurrentPage = pageSizeBytes;
       allocatedPages.add(currentPage);
     }
   }
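Besides the configurable page size, UnsafeExternalSorter also gains a task-completion callback so that its pages are released even when a downstream operator stops consuming the output early (the "sort followed by limit" case in the new comment). A rough, dependency-free sketch of that pattern under assumed names (TaskScope and its callback method are hypothetical stand-ins for Spark's TaskContext.addOnCompleteCallback):

import java.util.ArrayList;
import java.util.List;

public class CompletionCallbackSketch {
  // Hypothetical stand-in for the TaskContext callback API used in the diff above.
  interface TaskScope {
    void addOnCompleteCallback(Runnable callback);
  }

  static class SimpleTaskScope implements TaskScope {
    private final List<Runnable> callbacks = new ArrayList<>();
    public void addOnCompleteCallback(Runnable callback) { callbacks.add(callback); }
    // The task runner invokes this once when the task ends, whether or not the
    // sorter's output iterator was fully drained.
    void markTaskCompleted() { callbacks.forEach(Runnable::run); }
  }

  static class Sorter {
    private long allocatedBytes = 4L << 20; // pretend the sorter holds one 4 MB page
    Sorter(TaskScope scope) {
      // Registering up front guarantees the pages are released even if the
      // consumer stops early (e.g. sort followed by limit).
      scope.addOnCompleteCallback(this::freeMemory);
    }
    void freeMemory() {
      System.out.println("freed " + allocatedBytes + " bytes");
      allocatedBytes = 0;
    }
  }

  public static void main(String[] args) {
    SimpleTaskScope scope = new SimpleTaskScope();
    new Sorter(scope);
    // ...task body returns without consuming all sorted output...
    scope.markTaskCompleted(); // the callback still frees the memory
  }
}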
3 changes: 0 additions & 3 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -341,7 +341,4 @@ private[spark] object Accumulators extends Logging {
     }
   }
 
-  def stringifyPartialValue(partialValue: Any): String = "%s".format(partialValue)
-
-  def stringifyValue(value: Any): String = "%s".format(value)
 }
