Merge branch 'master' into SPARK-4705
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
Marcelo Vanzin committed Apr 28, 2015
2 parents bc885b7 + 8009810 commit f66dcc5
Showing 162 changed files with 6,566 additions and 1,229 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -71,6 +71,7 @@ exportMethods(
"unpersist",
"value",
"values",
"zipPartitions",
"zipRDD",
"zipWithIndex",
"zipWithUniqueId"
53 changes: 52 additions & 1 deletion R/pkg/R/RDD.R
@@ -66,6 +66,11 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
.Object
})

setMethod("show", "RDD",
function(.Object) {
cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
})
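The show method added here simply delegates to the underlying JVM RDD's toString, so auto-printing an RDD at the R prompt now yields a one-line description instead of the default S4 slot dump. A minimal usage sketch (assumes an initialized SparkR context; the exact class name and line number in the output depend on the backing JVM RDD):

# Sketch only: needs a running SparkR backend.
sc <- sparkR.init()
rdd <- parallelize(sc, 1:10, 2L)
rdd        # auto-printing dispatches to show(), e.g. "ParallelCollectionRDD[0] at parallelize at RRDD.scala:..."
show(rdd)  # the equivalent explicit call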

setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
.Object@env <- new.env()
.Object@env$isCached <- FALSE
@@ -967,7 +972,7 @@ setMethod("keyBy",
setMethod("repartition",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
-            coalesce(x, numToInt(numPartitions), TRUE)
+            coalesce(x, numPartitions, TRUE)
})

#' Return a new RDD that is reduced into numPartitions partitions.
@@ -1590,3 +1595,49 @@ setMethod("intersection",

keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
})

#' Zips an RDD's partitions with one (or more) RDD(s).
#' Same as zipPartitions in Spark.
#'
#' @param ... RDDs to be zipped.
#' @param func A function to transform zipped partitions.
#' @return A new RDD by applying a function to the zipped partitions.
#' Assumes that all the RDDs have the *same number of partitions*, but
#' does *not* require them to have the same number of elements in each partition.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
#' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
#' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
#' collect(zipPartitions(rdd1, rdd2, rdd3,
#' func = function(x, y, z) { list(list(x, y, z))} ))
#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
#'}
#' @rdname zipRDD
#' @aliases zipPartitions,RDD
setMethod("zipPartitions",
"RDD",
function(..., func) {
rrdds <- list(...)
if (length(rrdds) == 1) {
return(rrdds[[1]])
}
nPart <- sapply(rrdds, numPartitions)
if (length(unique(nPart)) != 1) {
stop("Can only zipPartitions RDDs which have the same number of partitions.")
}

rrdds <- lapply(rrdds, function(rdd) {
mapPartitionsWithIndex(rdd, function(partIndex, part) {
print(length(part))
list(list(partIndex, part))
})
})
union.rdd <- Reduce(unionRDD, rrdds)
zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
res <- mapPartitions(zipped.rdd, function(plist) {
do.call(func, plist[[1]])
})
res
})
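The implementation above works by tagging every partition of every input RDD with its partition index (mapPartitionsWithIndex), unioning the tagged RDDs, and grouping by that index with groupByKey so the i-th partitions of all inputs end up together before func is applied. Below is a minimal, Spark-free sketch of the same idea in plain R; zipPartitionsLocal and its inputs are illustrative stand-ins, not part of the SparkR API:

# Each "RDD" is modeled as a plain list of partitions.
zipPartitionsLocal <- function(..., func) {
  parts <- list(...)
  nPart <- unique(sapply(parts, length))
  stopifnot(length(nPart) == 1)            # all inputs must have the same number of partitions
  lapply(seq_len(nPart), function(i) {
    do.call(func, lapply(parts, `[[`, i))  # hand func the i-th partition of every input
  })
}

rdd1 <- list(1, 2)        # partitions: 1 | 2
rdd2 <- list(1:2, 3:4)    # partitions: 1:2 | 3:4
rdd3 <- list(1:3, 4:6)    # partitions: 1:3 | 4:6
zipPartitionsLocal(rdd1, rdd2, rdd3, func = function(x, y, z) list(x, y, z))
# list(list(1, 1:2, 1:3), list(2, 3:4, 4:6)) -- mirrors the roxygen example above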
17 changes: 11 additions & 6 deletions R/pkg/R/generics.R
@@ -60,7 +60,7 @@ setGeneric("countByValue", function(x) { standardGeneric("countByValue") })

#' @rdname distinct
#' @export
setGeneric("distinct", function(x, numPartitions = 1L) { standardGeneric("distinct") })
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })

#' @rdname filterRDD
#' @export
@@ -182,7 +182,7 @@ setGeneric("setName", function(x, name) { standardGeneric("setName") })
#' @rdname sortBy
#' @export
setGeneric("sortBy",
-           function(x, func, ascending = TRUE, numPartitions = 1L) {
+           function(x, func, ascending = TRUE, numPartitions = 1) {
standardGeneric("sortBy")
})

@@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
#' @export
setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })

+#' @rdname zipRDD
+#' @export
+setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
+           signature = "...")

#' @rdname zipWithIndex
#' @seealso zipWithUniqueId
#' @export
@@ -244,7 +249,7 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")

#' @rdname intersection
#' @export
setGeneric("intersection", function(x, other, numPartitions = 1L) {
setGeneric("intersection", function(x, other, numPartitions = 1) {
standardGeneric("intersection") })

#' @rdname keys
@@ -346,21 +351,21 @@ setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("ri
#' @rdname sortByKey
#' @export
setGeneric("sortByKey",
-           function(x, ascending = TRUE, numPartitions = 1L) {
+           function(x, ascending = TRUE, numPartitions = 1) {
standardGeneric("sortByKey")
})

#' @rdname subtract
#' @export
setGeneric("subtract",
-           function(x, other, numPartitions = 1L) {
+           function(x, other, numPartitions = 1) {
standardGeneric("subtract")
})

#' @rdname subtractByKey
#' @export
setGeneric("subtractByKey",
-           function(x, other, numPartitions = 1L) {
+           function(x, other, numPartitions = 1) {
standardGeneric("subtractByKey")
})

24 changes: 12 additions & 12 deletions R/pkg/R/pairRDD.R
@@ -190,7 +190,7 @@ setMethod("flatMapValues",
#' @rdname partitionBy
#' @aliases partitionBy,RDD,integer-method
setMethod("partitionBy",
signature(x = "RDD", numPartitions = "integer"),
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, partitionFunc = hashCode) {

#if (missing(partitionFunc)) {
@@ -211,7 +211,7 @@ setMethod("partitionBy",
# the content (key-val pairs).
pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
callJMethod(jrdd, "rdd"),
-                                             as.integer(numPartitions),
+                                             numToInt(numPartitions),
serializedHashFuncBytes,
getSerializedMode(x),
packageNamesArr,
@@ -221,7 +221,7 @@

# Create a corresponding partitioner.
rPartitioner <- newJObject("org.apache.spark.HashPartitioner",
-                                 as.integer(numPartitions))
+                                 numToInt(numPartitions))

# Call partitionBy on the obtained PairwiseRDD.
javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD")
@@ -256,7 +256,7 @@ setMethod("partitionBy",
#' @rdname groupByKey
#' @aliases groupByKey,RDD,integer-method
setMethod("groupByKey",
signature(x = "RDD", numPartitions = "integer"),
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
shuffled <- partitionBy(x, numPartitions)
groupVals <- function(part) {
@@ -315,7 +315,7 @@ setMethod("groupByKey",
#' @rdname reduceByKey
#' @aliases reduceByKey,RDD,integer-method
setMethod("reduceByKey",
signature(x = "RDD", combineFunc = "ANY", numPartitions = "integer"),
signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"),
function(x, combineFunc, numPartitions) {
reduceVals <- function(part) {
vals <- new.env()
@@ -422,7 +422,7 @@ setMethod("reduceByKeyLocally",
#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
setMethod("combineByKey",
signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY",
mergeCombiners = "ANY", numPartitions = "integer"),
mergeCombiners = "ANY", numPartitions = "numeric"),
function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
combineLocally <- function(part) {
combiners <- new.env()
@@ -483,7 +483,7 @@ setMethod("combineByKey",
#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
setMethod("aggregateByKey",
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY",
combOp = "ANY", numPartitions = "integer"),
combOp = "ANY", numPartitions = "numeric"),
function(x, zeroValue, seqOp, combOp, numPartitions) {
createCombiner <- function(v) {
do.call(seqOp, list(zeroValue, v))
@@ -514,7 +514,7 @@ setMethod("aggregateByKey",
#' @aliases foldByKey,RDD,ANY,ANY,integer-method
setMethod("foldByKey",
signature(x = "RDD", zeroValue = "ANY",
func = "ANY", numPartitions = "integer"),
func = "ANY", numPartitions = "numeric"),
function(x, zeroValue, func, numPartitions) {
aggregateByKey(x, zeroValue, func, func, numPartitions)
})
@@ -553,7 +553,7 @@ setMethod("join",
joinTaggedList(v, list(FALSE, FALSE))
}

-  joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)),
+  joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions),
doJoin)
})

@@ -582,7 +582,7 @@ setMethod("join",
#' @rdname join-methods
#' @aliases leftOuterJoin,RDD,RDD-method
setMethod("leftOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
@@ -619,7 +619,7 @@ setMethod("leftOuterJoin",
#' @rdname join-methods
#' @aliases rightOuterJoin,RDD,RDD-method
setMethod("rightOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
@@ -659,7 +659,7 @@ setMethod("rightOuterJoin",
#' @rdname join-methods
#' @aliases fullOuterJoin,RDD,RDD-method
setMethod("fullOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
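The signature changes above all follow one pattern: user-facing methods now accept plain numeric partition counts (an R literal like 10 is a double, not an integer), and the coercion to integer happens once, at the JVM call boundary, via numToInt instead of scattered as.integer calls. A hypothetical stand-in for that kind of helper is sketched below; the real numToInt lives in SparkR's internal utilities and may differ in detail:

# Hypothetical sketch of a numToInt-style helper (not the actual SparkR code).
numToIntSketch <- function(num) {
  if (as.integer(num) != num) {
    warning("coercing a non-integer numPartitions value to integer")
  }
  as.integer(num)
}

numToIntSketch(10)    # 10L -- typical call with a double literal
numToIntSketch(2.5)   # 2L, plus a warning about lost precision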
33 changes: 33 additions & 0 deletions R/pkg/inst/tests/test_binary_function.R
@@ -66,3 +66,36 @@ test_that("cogroup on two RDDs", {
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
})

test_that("zipPartitions() on RDDs", {
rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
func = function(x, y, z) { list(list(x, y, z))} ))
expect_equal(actual,
list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))

mockFile = c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

rdd <- textFile(sc, fileName, 1)
actual <- collect(zipPartitions(rdd, rdd,
func = function(x, y) { list(paste(x, y, sep = "\n")) }))
expected <- list(paste(mockFile, mockFile, sep = "\n"))
expect_equal(actual, expected)

rdd1 <- parallelize(sc, 0:1, 1)
actual <- collect(zipPartitions(rdd1, rdd,
func = function(x, y) { list(x + nchar(y)) }))
expected <- list(0:1 + nchar(mockFile))
expect_equal(actual, expected)

rdd <- map(rdd, function(x) { x })
actual <- collect(zipPartitions(rdd, rdd1,
func = function(x, y) { list(y + nchar(x)) }))
expect_equal(actual, expected)

unlink(fileName)
})
5 changes: 5 additions & 0 deletions R/pkg/inst/tests/test_rdd.R
@@ -759,6 +759,11 @@ test_that("collectAsMap() on a pairwise RDD", {
expect_equal(vals, list(`1` = "a", `2` = "b"))
})

test_that("show()", {
rdd <- parallelize(sc, list(1:10))
expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
})

test_that("sampleByKey() on pairwise RDDs", {
rdd <- parallelize(sc, 1:2000)
pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
11 changes: 0 additions & 11 deletions assembly/pom.xml
@@ -194,7 +194,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
-        <version>2.4</version>
<executions>
<execution>
<id>dist</id>
@@ -213,16 +212,6 @@
</plugins>
</build>
</profile>
-    <profile>
-      <id>kinesis-asl</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.httpcomponents</groupId>
-          <artifactId>httpclient</artifactId>
-          <version>${commons.httpclient.version}</version>
-        </dependency>
-      </dependencies>
-    </profile>

<!-- Profiles that disable inclusion of certain dependencies. -->
<profile>
5 changes: 4 additions & 1 deletion bin/spark-class2.cmd
@@ -61,7 +61,10 @@ if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java

rem The launcher library prints the command to be executed in a single line suitable for being
rem executed by the batch interpreter. So read all the output of the launcher into a variable.
for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %*"') do (
set LAUNCHER_OUTPUT=%temp%\spark-class-launcher-output-%RANDOM%.txt
"%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %* > %LAUNCHER_OUTPUT%
for /f "tokens=*" %%i in (%LAUNCHER_OUTPUT%) do (
set SPARK_CMD=%%i
)
+del %LAUNCHER_OUTPUT%
%SPARK_CMD%
3 changes: 2 additions & 1 deletion conf/spark-env.sh.template
@@ -3,7 +3,7 @@
# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

-# Options read when launching programs locally with 
+# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
@@ -39,6 +39,7 @@
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
+# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

7 changes: 5 additions & 2 deletions core/pom.xml
@@ -74,6 +74,10 @@
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
+      <exclusion>
+        <groupId>org.codehaus.jackson</groupId>
+        <artifactId>jackson-mapper-asl</artifactId>
+      </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -275,7 +279,7 @@
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
-      <version>0.5.0</version>
+      <version>0.6.4</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
@@ -474,7 +478,6 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
-        <version>1.3.2</version>
<executions>
<execution>
<id>sparkr-pkg</id>