Merge pull request #12 from apache/master
Apache spark latest pull
rekhajoshm committed Feb 9, 2018
2 parents a7fc787 + 4df84c3 commit 3a2d453
Showing 428 changed files with 10,246 additions and 3,236 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -62,6 +62,8 @@ project/plugins/src_managed/
project/plugins/target/
python/lib/pyspark.zip
python/deps
+python/test_coverage/coverage_data
+python/test_coverage/htmlcov
python/pyspark/python
reports/
scalastyle-on-compile.generated.xml
3 changes: 2 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -2090,7 +2090,8 @@ setMethod("selectExpr",
#'
#' @param x a SparkDataFrame.
#' @param colName a column name.
-#' @param col a Column expression, or an atomic vector in the length of 1 as literal value.
+#' @param col a Column expression (which must refer only to this SparkDataFrame), or an atomic
+#' vector of length 1 used as the literal value.
#' @return A SparkDataFrame with the new column added or the existing column replaced.
#' @family SparkDataFrame functions
#' @aliases withColumn,SparkDataFrame,character-method
4 changes: 3 additions & 1 deletion R/pkg/R/functions.R
@@ -1026,7 +1026,9 @@ setMethod("last_day",
})

#' @details
-#' \code{length}: Computes the length of a given string or binary column.
+#' \code{length}: Computes the character length of string data or the number of bytes
+#' of binary data. The length of string data includes trailing spaces; the length of
+#' binary data includes binary zeros.
#'
#' @rdname column_string_functions
#' @aliases length length,Column-method
15 changes: 14 additions & 1 deletion R/pkg/R/mllib_classification.R
@@ -279,11 +279,24 @@ function(object, path, overwrite = FALSE) {
#' savedModel <- read.ml(path)
#' summary(savedModel)
#'
-#' # multinomial logistic regression
+#' # binary logistic regression against two classes with
+#' # upperBoundsOnCoefficients and upperBoundsOnIntercepts
+#' ubc <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
+#' model <- spark.logit(training, Species ~ .,
+#'                      upperBoundsOnCoefficients = ubc,
+#'                      upperBoundsOnIntercepts = 1.0)
+#'
+#' # multinomial logistic regression
 #' model <- spark.logit(training, Class ~ ., regParam = 0.5)
 #' summary <- summary(model)
 #'
+#' # multinomial logistic regression with
+#' # lowerBoundsOnCoefficients and lowerBoundsOnIntercepts
+#' lbc <- matrix(c(0.0, -1.0, 0.0, -1.0, 0.0, -1.0, 0.0, -1.0), nrow = 2, ncol = 4)
+#' lbi <- as.array(c(0.0, 0.0))
+#' model <- spark.logit(training, Species ~ ., family = "multinomial",
+#'                      lowerBoundsOnCoefficients = lbc,
+#'                      lowerBoundsOnIntercepts = lbi)
#' }
#' @note spark.logit since 2.1.0
setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),
11 changes: 6 additions & 5 deletions R/pkg/R/serialize.R
@@ -30,14 +30,17 @@
# POSIXct,POSIXlt -> Time
#
# list[T] -> Array[T], where T is one of above mentioned types
+# Multi-element vector of any of the above (except raw) -> Array[T]
# environment -> Map[String, T], where T is a native type
# jobj -> Object, where jobj is an object created in the backend
# nolint end

 getSerdeType <- function(object) {
   type <- class(object)[[1]]
-  if (type != "list") {
-    type
+  if (is.atomic(object) && !is.raw(object) && length(object) > 1) {
+    "array"
+  } else if (type != "list") {
+    type
   } else {
     # Check if all elements are of same type
     elemType <- unique(sapply(object, function(elem) { getSerdeType(elem) }))
@@ -50,9 +53,7 @@ getSerdeType <- function(object) {
}

 writeObject <- function(con, object, writeType = TRUE) {
-  # NOTE: In R vectors have same type as objects. So we don't support
-  # passing in vectors as arrays and instead require arrays to be passed
-  # as lists.
+  # NOTE: In R, vectors have the same type as objects.
   type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
   # Checking types is needed here, since 'is.na' only handles atomic vectors,
   # lists and pairlists
47 changes: 47 additions & 0 deletions R/pkg/tests/fulltests/test_Serde.R
@@ -37,6 +37,53 @@ test_that("SerDe of primitive types", {
expect_equal(class(x), "character")
})

test_that("SerDe of multi-element primitive vectors inside R data.frame", {
# vector of integers embedded in R data.frame
indices <- 1L:3L
myDf <- data.frame(indices)
myDf$data <- list(rep(0L, 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep(0L, 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "integer")

# vector of numeric embedded in R data.frame
myDf <- data.frame(indices)
myDf$data <- list(rep(0, 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep(0, 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "numeric")

# vector of logical embedded in R data.frame
myDf <- data.frame(indices)
myDf$data <- list(rep(TRUE, 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep(TRUE, 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "logical")

# vector of character embedded in R data.frame
myDf <- data.frame(indices)
myDf$data <- list(rep("abc", 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep("abc", 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "character")
})

test_that("SerDe of list of primitive types", {
x <- list(1L, 2L, 3L)
y <- callJStatic("SparkRHandler", "echo", x)
10 changes: 5 additions & 5 deletions R/pkg/tests/fulltests/test_mllib_classification.R
@@ -124,7 +124,7 @@ test_that("spark.logit", {
# Petal.Width 0.42122607
# nolint end

-  # Test multinomial logistic regression againt three classes
+  # Test multinomial logistic regression against three classes
df <- suppressWarnings(createDataFrame(iris))
model <- spark.logit(df, Species ~ ., regParam = 0.5)
summary <- summary(model)
@@ -196,7 +196,7 @@ test_that("spark.logit", {
#
# nolint end

-  # Test multinomial logistic regression againt two classes
+  # Test multinomial logistic regression against two classes
df <- suppressWarnings(createDataFrame(iris))
training <- df[df$Species %in% c("versicolor", "virginica"), ]
model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial")
@@ -208,7 +208,7 @@ test_that("spark.logit", {
expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))

-  # Test binomial logistic regression againt two classes
+  # Test binomial logistic regression against two classes
model <- spark.logit(training, Species ~ ., regParam = 0.5)
summary <- summary(model)
coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04)
@@ -239,7 +239,7 @@ test_that("spark.logit", {
prediction2 <- collect(select(predict(model2, df2), "prediction"))
expect_equal(sort(prediction2$prediction), c("0.0", "0.0", "0.0", "0.0", "0.0"))

-  # Test binomial logistic regression againt two classes with upperBoundsOnCoefficients
+  # Test binomial logistic regression against two classes with upperBoundsOnCoefficients
# and upperBoundsOnIntercepts
u <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
model <- spark.logit(training, Species ~ ., upperBoundsOnCoefficients = u,
@@ -252,7 +252,7 @@ test_that("spark.logit", {
expect_error(spark.logit(training, Species ~ ., upperBoundsOnCoefficients = as.array(c(1, 2)),
upperBoundsOnIntercepts = 1.0))

-  # Test binomial logistic regression againt two classes with lowerBoundsOnCoefficients
+  # Test binomial logistic regression against two classes with lowerBoundsOnCoefficients
# and lowerBoundsOnIntercepts
l <- matrix(c(0.0, -1.0, 0.0, -1.0), nrow = 1, ncol = 4)
model <- spark.logit(training, Species ~ ., lowerBoundsOnCoefficients = l,
@@ -171,7 +171,9 @@ private class DownloadCallback implements StreamCallback {

@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
-    channel.write(buf);
+    while (buf.hasRemaining()) {
+      channel.write(buf);
+    }
}

@Override
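For context: WritableByteChannel.write makes no guarantee that a single call drains the buffer, which is the bug this hunk fixes. Below is a minimal standalone sketch of the write-fully idiom for blocking channels (class and method names are illustrative, not Spark's):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

final class WriteFully {
  // Loop until the buffer is drained: one write() may consume only part of
  // the buffer and, without the loop, the tail of the block is silently lost.
  static void writeFully(WritableByteChannel channel, ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      channel.write(buf);
    }
  }
}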
@@ -46,9 +46,12 @@ private boolean shouldPool(long size) {

@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
-    if (shouldPool(size)) {
+    int numWords = (int) ((size + 7) / 8);
+    long alignedSize = numWords * 8L;
+    assert (alignedSize >= size);
+    if (shouldPool(alignedSize)) {
       synchronized (this) {
-        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
+        final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
final WeakReference<long[]> arrayReference = pool.pop();
@@ -62,11 +65,11 @@ public MemoryBlock allocate(long size) throws OutOfMemoryError {
return memory;
}
}
-          bufferPoolsBySize.remove(size);
+          bufferPoolsBySize.remove(alignedSize);
}
}
}
-    long[] array = new long[(int) ((size + 7) / 8)];
+    long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
@@ -98,12 +101,13 @@ public void free(MemoryBlock memory) {
long[] array = (long[]) memory.obj;
memory.setObjAndOffset(null, 0);

-    if (shouldPool(size)) {
+    long alignedSize = ((size + 7) / 8) * 8;
+    if (shouldPool(alignedSize)) {
synchronized (this) {
-        LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
+        LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
-          bufferPoolsBySize.put(size, pool);
+          bufferPoolsBySize.put(alignedSize, pool);
}
pool.add(new WeakReference<>(array));
}
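The key idea in this hunk: pooled long[] buffers always span a whole number of 8-byte words, so the pool must be keyed on the word-aligned size rather than the requested size. A small self-contained sketch (not Spark code) of the rounding rule:

public class AlignmentSketch {
  // Round a byte size up to a whole number of 8-byte words, as the allocator
  // now does before consulting the buffer pool.
  static long alignToWords(long size) {
    int numWords = (int) ((size + 7) / 8);
    long alignedSize = numWords * 8L;
    assert alignedSize >= size;
    return alignedSize;
  }

  public static void main(String[] args) {
    // Both requests map to the same pool bucket (1048584 bytes), which is why
    // the heapMemoryReuse test below can expect the same backing array.
    System.out.println(alignToWords(1024 * 1024 + 1)); // 1048584
    System.out.println(alignToWords(1024 * 1024 + 7)); // 1048584
  }
}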
@@ -17,6 +17,7 @@

package org.apache.spark.unsafe;

+import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;

@@ -134,4 +135,25 @@ public void memoryDebugFillEnabledInTest() {
MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
MemoryAllocator.UNSAFE.free(offheap);
}

+  @Test
+  public void heapMemoryReuse() {
+    MemoryAllocator heapMem = new HeapMemoryAllocator();
+    // The size is less than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
+    // so new memory is allocated every time.
+    MemoryBlock onheap1 = heapMem.allocate(513);
+    Object obj1 = onheap1.getBaseObject();
+    heapMem.free(onheap1);
+    MemoryBlock onheap2 = heapMem.allocate(514);
+    Assert.assertNotEquals(obj1, onheap2.getBaseObject());
+
+    // The size is greater than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
+    // so the previously freed memory is reused.
+    MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
+    Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
+    Object obj3 = onheap3.getBaseObject();
+    heapMem.free(onheap3);
+    MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
+    Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
+    Assert.assertEquals(obj3, onheap4.getBaseObject());
+  }
}
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -118,6 +118,18 @@ public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
onEvent(executorBlacklisted);
}

+  @Override
+  public void onExecutorBlacklistedForStage(
+      SparkListenerExecutorBlacklistedForStage executorBlacklistedForStage) {
+    onEvent(executorBlacklistedForStage);
+  }
+
+  @Override
+  public void onNodeBlacklistedForStage(
+      SparkListenerNodeBlacklistedForStage nodeBlacklistedForStage) {
+    onEvent(nodeBlacklistedForStage);
+  }
+
@Override
public final void onExecutorUnblacklisted(
SparkListenerExecutorUnblacklisted executorUnblacklisted) {
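Because SparkFirehoseListener funnels every callback through its single onEvent method, existing subclasses pick up the two new stage-level blacklisting events with no further changes. A sketch, with an illustrative class name that is not part of this commit:

import org.apache.spark.SparkFirehoseListener;
import org.apache.spark.scheduler.SparkListenerEvent;

public class EventCounter extends SparkFirehoseListener {
  private long count = 0;

  @Override
  public void onEvent(SparkListenerEvent event) {
    // SparkListenerExecutorBlacklistedForStage and
    // SparkListenerNodeBlacklistedForStage now arrive here as well.
    count++;
  }
}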
@@ -172,10 +172,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
currentEntry = sortedConsumers.lastEntry();
}
List<MemoryConsumer> cList = currentEntry.getValue();
-        MemoryConsumer c = cList.remove(cList.size() - 1);
-        if (cList.isEmpty()) {
-          sortedConsumers.remove(currentEntry.getKey());
-        }
+        MemoryConsumer c = cList.get(cList.size() - 1);
try {
long released = c.spill(required - got, consumer);
if (released > 0) {
@@ -185,6 +182,11 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
if (got >= required) {
break;
}
+          } else {
+            cList.remove(cList.size() - 1);
+            if (cList.isEmpty()) {
+              sortedConsumers.remove(currentEntry.getKey());
+            }
}
} catch (ClosedByInterruptException e) {
      // This is called by the user to kill a task (e.g. a speculative task).
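The behavioral change here, restated: a consumer is no longer removed from the candidate list before it is asked to spill; it is dropped only once a spill releases nothing, so a consumer that keeps freeing memory can be asked again. A simplified sketch of that policy (not Spark's actual API; names are illustrative):

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SpillPolicySketch {
  interface Spillable { long spill(long needed); }

  static long reclaim(long required, TreeMap<Long, List<Spillable>> sortedConsumers) {
    long got = 0;
    while (got < required && !sortedConsumers.isEmpty()) {
      Map.Entry<Long, List<Spillable>> entry = sortedConsumers.lastEntry();
      List<Spillable> cList = entry.getValue();
      Spillable c = cList.get(cList.size() - 1); // peek; keep it as a candidate
      long released = c.spill(required - got);
      if (released > 0) {
        got += released;                         // productive: may be asked again
      } else {
        cList.remove(cList.size() - 1);          // unproductive: drop it for good
        if (cList.isEmpty()) {
          sortedConsumers.remove(entry.getKey());
        }
      }
    }
    return got;
  }
}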
@@ -32,6 +32,8 @@ public abstract class RecordComparator {
  public abstract int compare(
      Object leftBaseObject,
      long leftBaseOffset,
+      int leftBaseLength,
      Object rightBaseObject,
-      long rightBaseOffset);
+      long rightBaseOffset,
+      int rightBaseLength);
}
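With record lengths now threaded through compare, a comparator over variable-length records no longer has to re-read the stored length word itself. A hypothetical implementation against the new signature (byte-wise lexicographic order; a sketch, not part of this commit):

import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.collection.unsafe.sort.RecordComparator;

final class BytewiseRecordComparator extends RecordComparator {
  @Override
  public int compare(
      Object leftBaseObject, long leftBaseOffset, int leftBaseLength,
      Object rightBaseObject, long rightBaseOffset, int rightBaseLength) {
    int n = Math.min(leftBaseLength, rightBaseLength);
    for (int i = 0; i < n; i++) {
      int l = Platform.getByte(leftBaseObject, leftBaseOffset + i) & 0xff;
      int r = Platform.getByte(rightBaseObject, rightBaseOffset + i) & 0xff;
      if (l != r) {
        return l - r;
      }
    }
    return leftBaseLength - rightBaseLength; // shorter record sorts first on ties
  }
}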
@@ -62,12 +62,13 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
int uaoSize = UnsafeAlignedOffset.getUaoSize();
if (prefixComparisonResult == 0) {
      final Object baseObject1 = memoryManager.getPage(r1.recordPointer);
-      // skip length
      final long baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + uaoSize;
+      final int baseLength1 = UnsafeAlignedOffset.getSize(baseObject1, baseOffset1 - uaoSize);
      final Object baseObject2 = memoryManager.getPage(r2.recordPointer);
-      // skip length
      final long baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + uaoSize;
-      return recordComparator.compare(baseObject1, baseOffset1, baseObject2, baseOffset2);
+      final int baseLength2 = UnsafeAlignedOffset.getSize(baseObject2, baseOffset2 - uaoSize);
+      return recordComparator.compare(baseObject1, baseOffset1, baseLength1, baseObject2,
+        baseOffset2, baseLength2);
} else {
return prefixComparisonResult;
}
@@ -35,8 +35,8 @@ final class UnsafeSorterSpillMerger {
prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
if (prefixComparisonResult == 0) {
return recordComparator.compare(
-            left.getBaseObject(), left.getBaseOffset(),
-            right.getBaseObject(), right.getBaseOffset());
+            left.getBaseObject(), left.getBaseOffset(), left.getRecordLength(),
+            right.getBaseObject(), right.getBaseOffset(), right.getRecordLength());
} else {
return prefixComparisonResult;
}
@@ -76,8 +76,10 @@ public UnsafeSorterSpillReader(
SparkEnv.get() == null ? 0.5 :
SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5);

+    // SPARK-23310: Disable read-ahead input stream, because it is causing lock contention and perf
+    // regression for TPC-DS queries.
     final boolean readAheadEnabled = SparkEnv.get() != null &&
-      SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);
+      SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", false);

final InputStream bs =
new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
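The default flips to off here, but the read-ahead path still exists behind the flag. A hedged example of opting back in for experimentation, given the SPARK-23310 regression (the app name below is illustrative):

import org.apache.spark.SparkConf;

public class ReadAheadOptIn {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
      .setAppName("read-ahead-opt-in")
      .set("spark.unsafe.sorter.spill.read.ahead.enabled", "true");
    System.out.println(conf.get("spark.unsafe.sorter.spill.read.ahead.enabled"));
  }
}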
@@ -108,7 +108,12 @@ $(document).ready(function() {
requestedIncomplete = getParameterByName("showIncomplete", searchString);
requestedIncomplete = (requestedIncomplete == "true" ? true : false);

$.getJSON("api/v1/applications?limit=" + appLimit, function(response,status,jqXHR) {
appParams = {
limit: appLimit,
status: (requestedIncomplete ? "running" : "completed")
};

$.getJSON("api/v1/applications", appParams, function(response,status,jqXHR) {
var array = [];
var hasMultipleAttempts = false;
for (i in response) {
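jQuery's $.getJSON(url, data, callback) serializes appParams into the query string, so the history server now receives a status filter alongside the limit, e.g. api/v1/applications?limit=<appLimit>&status=completed. A sketch of hitting the same REST endpoint directly; the host and port are assumptions for illustration (18080 is the history server's usual port):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HistoryApiSketch {
  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(
        URI.create("http://localhost:18080/api/v1/applications?limit=20&status=completed"))
        .build();
    HttpResponse<String> response =
        client.send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());
  }
}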