Merge branch 'master' into SPARK-8103
squito committed Jul 17, 2015
2 parents 6bc23af + 031d7d4 commit e43ac25
Showing 226 changed files with 5,194 additions and 1,293 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -77,6 +77,7 @@ exportMethods("abs",
"atan",
"atan2",
"avg",
"between",
"cast",
"cbrt",
"ceiling",
2 changes: 1 addition & 1 deletion R/pkg/R/DataFrame.R
@@ -1328,7 +1328,7 @@ setMethod("write.df",
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] = path
options[['path']] <- path
}
callJMethod(df@sdf, "save", source, jmode, options)
})
4 changes: 2 additions & 2 deletions R/pkg/R/client.R
@@ -36,9 +36,9 @@ connectBackend <- function(hostname, port, timeout = 6000) {

determineSparkSubmitBin <- function() {
if (.Platform$OS.type == "unix") {
sparkSubmitBinName = "spark-submit"
sparkSubmitBinName <- "spark-submit"
} else {
sparkSubmitBinName = "spark-submit.cmd"
sparkSubmitBinName <- "spark-submit.cmd"
}
sparkSubmitBinName
}
17 changes: 17 additions & 0 deletions R/pkg/R/column.R
@@ -187,6 +187,23 @@ setMethod("substr", signature(x = "Column"),
column(jc)
})

#' between
#'
#' Test if the column is between the lower bound and upper bound, inclusive.
#'
#' @rdname column
#'
#' @param bounds lower and upper bounds
setMethod("between", signature(x = "Column"),
function(x, bounds) {
if (is.vector(bounds) && length(bounds) == 2) {
jc <- callJMethod(x@jc, "between", bounds[1], bounds[2])
column(jc)
} else {
stop("bounds should be a vector of lower and upper bounds")
}
})

#' Casts the column to a different data type.
#'
#' @rdname column
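
The new R between wrapper above forwards its two bounds to the JVM Column's between method via callJMethod. For comparison, a minimal Scala sketch of the equivalent DataFrame-side call; the SparkContext setup, data, and column names are illustrative, not part of this commit:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    // Minimal sketch of Column.between on the JVM side; data and column names are illustrative.
    val sc = new SparkContext("local[2]", "between-sketch")
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = Seq(("Alice", 25), ("Bob", 35)).toDF("name", "age")
    // between is inclusive on both bounds, matching the R wrapper's semantics.
    df.filter(df("age").between(20, 30)).show()   // keeps only Alice (25)
    sc.stop()
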
1 change: 1 addition & 0 deletions R/pkg/R/deserialize.R
@@ -23,6 +23,7 @@
# Int -> integer
# String -> character
# Boolean -> logical
# Float -> double
# Double -> double
# Long -> double
# Array[Byte] -> raw
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -567,6 +567,10 @@ setGeneric("asc", function(x) { standardGeneric("asc") })
#' @export
setGeneric("avg", function(x, ...) { standardGeneric("avg") })

#' @rdname column
#' @export
setGeneric("between", function(x, bounds) { standardGeneric("between") })

#' @rdname column
#' @export
setGeneric("cast", function(x, dataType) { standardGeneric("cast") })
4 changes: 2 additions & 2 deletions R/pkg/R/group.R
@@ -87,7 +87,7 @@ setMethod("count",
setMethod("agg",
signature(x = "GroupedData"),
function(x, ...) {
cols = list(...)
cols <- list(...)
stopifnot(length(cols) > 0)
if (is.character(cols[[1]])) {
cols <- varargsToEnv(...)
@@ -97,7 +97,7 @@ setMethod("agg",
if (!is.null(ns)) {
for (n in ns) {
if (n != "") {
cols[[n]] = alias(cols[[n]], n)
cols[[n]] <- alias(cols[[n]], n)
}
}
}
1 change: 1 addition & 0 deletions R/pkg/R/schema.R
@@ -123,6 +123,7 @@ structField.character <- function(x, type, nullable = TRUE) {
}
options <- c("byte",
"integer",
"float",
"double",
"numeric",
"character",
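
On the JVM side, the new "float" option corresponds to Spark SQL's FloatType. A minimal Scala sketch of the schema that an R call such as structField("height", "float") ultimately describes; the field names are illustrative:

    import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}

    // Scala counterpart of structType(structField("name", "string"),
    //   structField("age", "integer"), structField("height", "float"))
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true),
      StructField("height", FloatType, nullable = true)))
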
4 changes: 2 additions & 2 deletions R/pkg/R/utils.R
@@ -41,8 +41,8 @@ convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
if (isInstanceOf(obj, "scala.Tuple2")) {
# JavaPairRDD[Array[Byte], Array[Byte]].

keyBytes = callJMethod(obj, "_1")
valBytes = callJMethod(obj, "_2")
keyBytes <- callJMethod(obj, "_1")
valBytes <- callJMethod(obj, "_2")
res <- list(unserialize(keyBytes),
unserialize(valBytes))
} else {
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_binaryFile.R
@@ -20,7 +20,7 @@ context("functions on binary files")
# JavaSparkContext handle
sc <- sparkR.init()

mockFile = c("Spark is pretty.", "Spark is awesome.")
mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("saveAsObjectFile()/objectFile() following textFile() works", {
fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_binary_function.R
@@ -76,7 +76,7 @@ test_that("zipPartitions() on RDDs", {
expect_equal(actual,
list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))

mockFile = c("Spark is pretty.", "Spark is awesome.")
mockFile <- c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_rdd.R
@@ -447,7 +447,7 @@ test_that("zipRDD() on RDDs", {
expect_equal(actual,
list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))

mockFile = c("Spark is pretty.", "Spark is awesome.")
mockFile <- c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

@@ -483,7 +483,7 @@ test_that("cartesian() on RDDs", {
actual <- collect(cartesian(rdd, emptyRdd))
expect_equal(actual, list())

mockFile = c("Spark is pretty.", "Spark is awesome.")
mockFile <- c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

38 changes: 38 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -108,6 +108,32 @@ test_that("create DataFrame from RDD", {
expect_equal(count(df), 10)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

df <- jsonFile(sqlContext, jsonPathNa)
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
}, error = function(err) {
skip("Hive is not build with SparkSQL, skipped")
})
sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
insertInto(df, "people")
expect_equal(sql(hiveCtx, "SELECT age from people WHERE name = 'Bob'"), c(16))
expect_equal(sql(hiveCtx, "SELECT height from people WHERE name ='Bob'"), c(176.5))

schema <- structType(structField("name", "string"), structField("age", "integer"),
structField("height", "float"))
df2 <- createDataFrame(sqlContext, df.toRDD, schema)
expect_equal(columns(df2), c("name", "age", "height"))
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(collect(where(df2, df2$name == "Bob")), c("Bob", 16, 176.5))

localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18), height=c(164.10, 181.4, 173.7))
df <- createDataFrame(sqlContext, localDF, schema)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
expect_equal(columns(df), c("name", "age", "height"))
expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(collect(where(df, df$name == "John")), c("John", 19, 164.10))
})

test_that("convert NAs to null type in DataFrames", {
@@ -612,6 +638,18 @@ test_that("column functions", {
c7 <- floor(c) + log(c) + log10(c) + log1p(c) + rint(c)
c8 <- sign(c) + sin(c) + sinh(c) + tan(c) + tanh(c)
c9 <- toDegrees(c) + toRadians(c)

df <- jsonFile(sqlContext, jsonPath)
df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))
expect_equal(collect(df2)[[2, 1]], TRUE)
expect_equal(collect(df2)[[2, 2]], FALSE)
expect_equal(collect(df2)[[3, 1]], FALSE)
expect_equal(collect(df2)[[3, 2]], TRUE)

df3 <- select(df, between(df$name, c("Apache", "Spark")))
expect_equal(collect(df3)[[1, 1]], TRUE)
expect_equal(collect(df3)[[2, 1]], FALSE)
expect_equal(collect(df3)[[3, 1]], TRUE)
})

test_that("column binary mathfunctions", {
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_textFile.R
@@ -20,7 +20,7 @@ context("the textFile() function")
# JavaSparkContext handle
sc <- sparkR.init()

mockFile = c("Spark is pretty.", "Spark is awesome.")
mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("textFile() on a local file returns an RDD", {
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_utils.R
@@ -119,7 +119,7 @@ test_that("cleanClosure on R functions", {
# Test for overriding variables in base namespace (Issue: SparkR-196).
nums <- as.list(1:10)
rdd <- parallelize(sc, nums, 2L)
t = 4 # Override base::t in .GlobalEnv.
t <- 4 # Override base::t in .GlobalEnv.
f <- function(x) { x > t }
newF <- cleanClosure(f)
env <- environment(newF)
5 changes: 5 additions & 0 deletions core/pom.xml
@@ -372,6 +372,11 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.razorvine</groupId>
<artifactId>pyrolite</artifactId>
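
The curator-test artifact ships an embeddable ZooKeeper server (org.apache.curator.test.TestingServer) that test suites can start and stop in-process. A hypothetical Scala sketch of how a test might use it; the wiring to the code under test is illustrative:

    import org.apache.curator.test.TestingServer

    // Hypothetical test scaffolding: start an embedded ZooKeeper on a free port,
    // hand its connect string to the code under test, then shut it down.
    val zkServer = new TestingServer()
    try {
      val connectString = zkServer.getConnectString
      // ... point a ZooKeeper-backed component at connectString and exercise it ...
    } finally {
      zkServer.close()
    }
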
22 changes: 5 additions & 17 deletions core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -17,23 +17,7 @@

package org.apache.spark;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
import org.apache.spark.scheduler.*;

/**
* Java clients should extend this class instead of implementing
@@ -94,4 +78,8 @@ public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }

@Override
public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }

@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }

}

@@ -112,4 +112,10 @@ public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
onEvent(executorRemoved);
}

@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
}

}
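
Both listener adapters above gain a handler for the new block-update event. A minimal Scala sketch of a listener that consumes it; the logging body, the blockUpdatedInfo.blockId accessor, and the registration call are assumptions for illustration:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}

    // Minimal sketch: override only the new callback and log the updated block.
    class BlockUpdateLogger extends SparkListener {
      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
        println(s"Block updated: ${blockUpdated.blockUpdatedInfo.blockId}")
      }
    }

    // Registration, assuming an existing SparkContext `sc`:
    // sc.addSparkListener(new BlockUpdateLogger())
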

@@ -75,7 +75,7 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
private final Serializer serializer;

/** Array of file writers, one for each partition */
private BlockObjectWriter[] partitionWriters;
private DiskBlockObjectWriter[] partitionWriters;

public BypassMergeSortShuffleWriter(
SparkConf conf,
@@ -101,7 +101,7 @@ public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
partitionWriters = new BlockObjectWriter[numPartitions];
partitionWriters = new DiskBlockObjectWriter[numPartitions];
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
@@ -121,7 +121,7 @@ public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}

for (BlockObjectWriter writer : partitionWriters) {
for (DiskBlockObjectWriter writer : partitionWriters) {
writer.commitAndClose();
}
}
@@ -169,7 +169,7 @@ public void stop() throws IOException {
if (partitionWriters != null) {
try {
final DiskBlockManager diskBlockManager = blockManager.diskBlockManager();
for (BlockObjectWriter writer : partitionWriters) {
for (DiskBlockObjectWriter writer : partitionWriters) {
// This method explicitly does _not_ throw exceptions:
writer.revertPartialWritesAndClose();
if (!diskBlockManager.getFile(writer.blockId()).delete()) {

@@ -157,7 +157,7 @@ private void writeSortedFile(boolean isLastFile) throws IOException {

// Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
// after SPARK-5581 is fixed.
BlockObjectWriter writer;
DiskBlockObjectWriter writer;

// Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
// be an API to directly transfer bytes from managed memory to the disk writer, we buffer

@@ -26,7 +26,7 @@
import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockObjectWriter;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.TempLocalBlockId;
import org.apache.spark.unsafe.PlatformDependent;

@@ -47,7 +47,7 @@ final class UnsafeSorterSpillWriter {
private final File file;
private final BlockId blockId;
private final int numRecordsToWrite;
private BlockObjectWriter writer;
private DiskBlockObjectWriter writer;
private int numRecordsSpilled = 0;

public UnsafeSorterSpillWriter(

@@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.util.control.ControlThrowable

import com.codahale.metrics.{Gauge, MetricRegistry}

@@ -211,7 +212,16 @@ private[spark] class ExecutorAllocationManager(
listenerBus.addListener(listener)

val scheduleTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(schedule())
override def run(): Unit = {
try {
schedule()
} catch {
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
}
}
}
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
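
The new catch block above rethrows Scala's ControlThrowable (used internally by control-flow constructs such as breakable) so the scheduling loop never swallows it, while still logging genuine failures. scala.util.control.NonFatal encodes the same rule; a minimal Scala sketch of that pattern, with an illustrative runSafely wrapper:

    import scala.util.control.NonFatal

    // NonFatal deliberately does not match ControlThrowable (or fatal errors such as
    // VirtualMachineError), so control-flow exceptions keep propagating while
    // ordinary failures are logged, matching the hand-written catch above.
    def runSafely(doWork: () => Unit): Unit = {
      try {
        doWork()
      } catch {
        case NonFatal(t) =>
          println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
      }
    }
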
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Logging.scala
@@ -159,7 +159,7 @@ private object Logging {
try {
// We use reflection here to handle the case where users remove the
// slf4j-to-jul bridge order to route their logs to JUL.
val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
val bridgeClass = Utils.classForName("org.slf4j.bridge.SLF4JBridgeHandler")
bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
if (!installed) {
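
Utils.classForName differs from a bare Class.forName in that it resolves the class against Spark's preferred classloader rather than the caller's defining loader. A rough, hedged Scala sketch of that kind of lookup; details such as the initialization flag and fallback order are assumptions, not the exact Utils implementation:

    object ClassLoadingSketch {
      // Context-classloader-aware lookup; the real Utils.classForName may differ in details.
      def classForName(className: String): Class[_] = {
        val loader = Option(Thread.currentThread().getContextClassLoader)
          .getOrElse(getClass.getClassLoader)
        Class.forName(className, true, loader)
      }
    }
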
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -76,6 +76,8 @@ object Partitioner {
* produce an unexpected or incorrect result.
*/
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

def numPartitions: Int = partitions

def getPartition(key: Any): Int = key match {
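
With the new require, a negative partition count now fails fast at construction time with an IllegalArgumentException instead of surfacing later as a confusing partitioning result. A minimal usage sketch:

    import org.apache.spark.HashPartitioner

    val partitioner = new HashPartitioner(4)
    partitioner.getPartition("some-key")   // a partition index in [0, 4)
    partitioner.getPartition(null)         // null keys map to partition 0

    // new HashPartitioner(-1)             // now throws IllegalArgumentException from require(...)
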