
Merge branch 'master' of github.com:apache/spark into viz
Andrew Or committed Apr 27, 2015
2 parents 205f838 + 998aac2 commit fe7816f
Showing 200 changed files with 5,851 additions and 1,994 deletions.
22 changes: 13 additions & 9 deletions CONTRIBUTING.md
@@ -1,12 +1,16 @@
 ## Contributing to Spark
 
-Contributions via GitHub pull requests are gladly accepted from their original
-author. Along with any pull requests, please state that the contribution is
-your original work and that you license the work to the project under the
-project's open source license. Whether or not you state this explicitly, by
-submitting any copyrighted material via pull request, email, or other means
-you agree to license the material under the project's open source license and
-warrant that you have the legal authority to do so.
+*Before opening a pull request*, review the
+[Contributing to Spark wiki](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark).
+It lists steps that are required before creating a PR. In particular, consider:
+
+- Is the change important and ready enough to ask the community to spend time reviewing?
+- Have you searched for existing, related JIRAs and pull requests?
+- Is this a new feature that can stand alone as a package on http://spark-packages.org ?
+- Is the change being proposed clearly explained and motivated?
 
-Please see the [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
-for more information.
+When you contribute code, you affirm that the contribution is your original work and that you
+license the work to the project under the project's open source license. Whether or not you
+state this explicitly, by submitting any copyrighted material via pull request, email, or
+other means you agree to license the material under the project's open source license and
+warrant that you have the legal authority to do so.
8 changes: 7 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -790,9 +790,12 @@ setMethod("$", signature(x = "DataFrame"),
 
 setMethod("$<-", signature(x = "DataFrame"),
           function(x, name, value) {
-            stopifnot(class(value) == "Column")
+            stopifnot(class(value) == "Column" || is.null(value))
             cols <- columns(x)
             if (name %in% cols) {
+              if (is.null(value)) {
+                cols <- Filter(function(c) { c != name }, cols)
+              }
               cols <- lapply(cols, function(c) {
                 if (c == name) {
                   alias(value, name)
@@ -802,6 +805,9 @@ setMethod("$<-", signature(x = "DataFrame"),
               })
               nx <- select(x, cols)
             } else {
+              if (is.null(value)) {
+                return(x)
+              }
               nx <- withColumn(x, name, value)
             }
             x@sdf <- nx@sdf
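Net effect of these hunks: assigning NULL through `$<-` now drops the column instead of failing the class check. A minimal usage sketch (session setup and data are illustrative; `people.json` is the stock SparkR example file):

sc <- sparkR.init()
sqlCtx <- sparkRSQL.init(sc)
df <- jsonFile(sqlCtx, "examples/src/main/resources/people.json")
df$age2 <- df$age * 2   # add a derived column (existing behavior)
df$age2 <- NULL         # new: removes the column instead of erroring
columns(df)             # "age" "name" -- age2 is gone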
38 changes: 19 additions & 19 deletions R/pkg/R/RDD.R
@@ -91,8 +91,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
       # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
       # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
     } else {
-      pipelinedFunc <- function(split, iterator) {
-        func(split, prev@func(split, iterator))
+      pipelinedFunc <- function(partIndex, part) {
+        func(partIndex, prev@func(partIndex, part))
       }
       .Object@func <- cleanClosure(pipelinedFunc)
       .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
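The renamed parameters make the pipelining contract explicit: each stage receives the partition index plus the previous stage's output for that partition. A standalone sketch of the composition pattern (plain R, no Spark required; f1/f2 are hypothetical stage functions):

f1 <- function(partIndex, part) lapply(part, function(x) x + 1)
f2 <- function(partIndex, part) lapply(part, function(x) x * 10)
# What PipelinedRDD builds: f2 over f1's output, same partIndex threaded through.
pipelinedFunc <- function(partIndex, part) f2(partIndex, f1(partIndex, part))
pipelinedFunc(0, list(1, 2, 3))  # list(20, 30, 40)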
@@ -306,7 +306,7 @@ setMethod("numPartitions",
           signature(x = "RDD"),
           function(x) {
             jrdd <- getJRDD(x)
-            partitions <- callJMethod(jrdd, "splits")
+            partitions <- callJMethod(jrdd, "partitions")
             callJMethod(partitions, "size")
           })
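The JVM call site moves from the older splits() accessor to partitions(); behavior is unchanged. A quick check (assuming an initialized SparkContext):

rdd <- parallelize(sc, 1:100, 4L)
numPartitions(rdd)  # 4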

@@ -452,8 +452,8 @@ setMethod("countByValue",
 setMethod("lapply",
           signature(X = "RDD", FUN = "function"),
           function(X, FUN) {
-            func <- function(split, iterator) {
-              lapply(iterator, FUN)
+            func <- function(partIndex, part) {
+              lapply(part, FUN)
             }
             lapplyPartitionsWithIndex(X, func)
           })
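So lapply remains a thin wrapper over lapplyPartitionsWithIndex that ignores the partition index. Example usage, in the style of the surrounding docs:

rdd <- parallelize(sc, 1:4)
collect(lapply(rdd, function(x) x * 2))  # list(2, 4, 6, 8)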
@@ -538,8 +538,8 @@ setMethod("mapPartitions",
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, 1:10, 5L)
-#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
-#'                                          split * Reduce("+", part) })
+#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
+#'                                          partIndex * Reduce("+", part) })
 #' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
 #'}
 #' @rdname lapplyPartitionsWithIndex
@@ -813,7 +813,7 @@ setMethod("distinct",
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
+#' rdd <- parallelize(sc, 1:10)
 #' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
 #' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
 #'}
@@ -825,14 +825,14 @@ setMethod("sampleRDD",
           function(x, withReplacement, fraction, seed) {
 
             # The sampler: takes a partition and returns its sampled version.
-            samplingFunc <- function(split, part) {
+            samplingFunc <- function(partIndex, part) {
               set.seed(seed)
               res <- vector("list", length(part))
               len <- 0
 
               # Discards some random values to ensure each partition has a
               # different random seed.
-              runif(split)
+              runif(partIndex)
 
               for (elem in part) {
                 if (withReplacement) {
@@ -967,7 +967,7 @@ setMethod("keyBy",
 setMethod("repartition",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions) {
-            coalesce(x, numToInt(numPartitions), TRUE)
+            coalesce(x, numPartitions, TRUE)
           })
 
 #' Return a new RDD that is reduced into numPartitions partitions.
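repartition can drop its own numToInt call because coalesce (next hunk) performs the coercion, so both entry points now accept plain numerics. A sketch:

rdd <- parallelize(sc, 1:100, 4L)
rdd2 <- repartition(rdd, 2)  # plain numeric; coerced via numToInt inside coalesce
numPartitions(rdd2)          # 2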
@@ -989,8 +989,8 @@ setMethod("coalesce",
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)
             if (shuffle || numPartitions > SparkR::numPartitions(x)) {
-              func <- function(s, part) {
-                set.seed(s)  # split as seed
+              func <- function(partIndex, part) {
+                set.seed(partIndex)  # partIndex as seed
                 start <- as.integer(sample(numPartitions, 1) - 1)
                 lapply(seq_along(part),
                        function(i) {
@@ -1035,7 +1035,7 @@ setMethod("saveAsObjectFile",
 #' Save this RDD as a text file, using string representations of elements.
 #'
 #' @param x The RDD to save
-#' @param path The directory where the splits of the text file are saved
+#' @param path The directory where the partitions of the text file are saved
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
@@ -1335,10 +1335,10 @@ setMethod("zipWithUniqueId",
           function(x) {
             n <- numPartitions(x)
 
-            partitionFunc <- function(split, part) {
+            partitionFunc <- function(partIndex, part) {
               mapply(
                 function(item, index) {
-                  list(item, (index - 1) * n + split)
+                  list(item, (index - 1) * n + partIndex)
                 },
                 part,
                 seq_along(part),
@@ -1382,11 +1382,11 @@ setMethod("zipWithIndex",
               startIndices <- Reduce("+", nums, accumulate = TRUE)
             }
 
-            partitionFunc <- function(split, part) {
-              if (split == 0) {
+            partitionFunc <- function(partIndex, part) {
+              if (partIndex == 0) {
                 startIndex <- 0
               } else {
-                startIndex <- startIndices[[split]]
+                startIndex <- startIndices[[partIndex]]
               }
 
               mapply(
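The renames leave the indexing arithmetic intact: zipWithUniqueId gives element k (1-based) of partition p the id (k - 1) * n + p, where n is the partition count, while zipWithIndex adds per-partition start offsets to produce consecutive global indices. A sketch adapted from the SparkR roxygen examples of this version:

rdd <- parallelize(sc, list(0, 1, 2, 3, 4), 3L)
collect(zipWithUniqueId(rdd))
# list(list(0, 0), list(1, 3), list(2, 1), list(3, 4), list(4, 2))
collect(zipWithIndex(rdd))
# list(list(0, 0), list(1, 1), list(2, 2), list(3, 3), list(4, 4))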
20 changes: 10 additions & 10 deletions R/pkg/R/context.R
@@ -17,12 +17,12 @@
 
 # context.R: SparkContext driven functions
 
-getMinSplits <- function(sc, minSplits) {
-  if (is.null(minSplits)) {
+getMinPartitions <- function(sc, minPartitions) {
+  if (is.null(minPartitions)) {
     defaultParallelism <- callJMethod(sc, "defaultParallelism")
-    minSplits <- min(defaultParallelism, 2)
+    minPartitions <- min(defaultParallelism, 2)
   }
-  as.integer(minSplits)
+  as.integer(minPartitions)
 }
 
 #' Create an RDD from a text file.
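The renamed helper keeps the same fallback: given NULL it returns min(defaultParallelism, 2), mirroring Scala's defaultMinPartitions. A standalone sketch of the rule (hypothetical function name, no JVM handle; the parallelism is passed in directly):

minPartitionsOrDefault <- function(defaultParallelism, minPartitions = NULL) {
  if (is.null(minPartitions)) {
    minPartitions <- min(defaultParallelism, 2)
  }
  as.integer(minPartitions)
}
minPartitionsOrDefault(8)      # 2L -- the default is capped at 2
minPartitionsOrDefault(8, 16)  # 16L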
@@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #'        value is chosen based on available parallelism.
 #' @return RDD where each item is of type \code{character}
 #' @export
@@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) {
 #' sc <- sparkR.init()
 #' lines <- textFile(sc, "myfile.txt")
 #'}
-textFile <- function(sc, path, minSplits = NULL) {
+textFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definiton of the text file path
   path <- suppressWarnings(normalizePath(path))
   #' Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")
 
-  jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
   # jrdd is of type JavaRDD[String]
   RDD(jrdd, "string")
 }
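Callers can now name the argument consistently with the rest of the API. A sketch with an illustrative path:

lines <- textFile(sc, "myfile.txt", minPartitions = 4)
numPartitions(lines)  # >= 4, depending on how the underlying file is split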
@@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #'        value is chosen based on available parallelism.
 #' @return RDD containing serialized R objects.
 #' @seealso saveAsObjectFile
@@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) {
 #' sc <- sparkR.init()
 #' rdd <- objectFile(sc, "myfile")
 #'}
-objectFile <- function(sc, path, minSplits = NULL) {
+objectFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definiton of the text file path
   path <- suppressWarnings(normalizePath(path))
   #' Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")
 
-  jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
   # Assume the RDD contains serialized R objects.
   RDD(jrdd, "byte")
 }
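Paired with saveAsObjectFile, this gives a round trip for serialized R objects. A sketch with an illustrative path:

rdd <- parallelize(sc, list(1, 2, 3))
saveAsObjectFile(rdd, "/tmp/rdd-objects")
restored <- objectFile(sc, "/tmp/rdd-objects", minPartitions = 2)
collect(restored)  # list(1, 2, 3); order may vary across partitions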
12 changes: 6 additions & 6 deletions R/pkg/R/generics.R
@@ -60,7 +60,7 @@ setGeneric("countByValue", function(x) { standardGeneric("countByValue") })
 
 #' @rdname distinct
 #' @export
-setGeneric("distinct", function(x, numPartitions = 1L) { standardGeneric("distinct") })
+setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
 
 #' @rdname filterRDD
 #' @export
@@ -182,7 +182,7 @@ setGeneric("setName", function(x, name) { standardGeneric("setName") })
 #' @rdname sortBy
 #' @export
 setGeneric("sortBy",
-           function(x, func, ascending = TRUE, numPartitions = 1L) {
+           function(x, func, ascending = TRUE, numPartitions = 1) {
              standardGeneric("sortBy")
            })
 
@@ -244,7 +244,7 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")
 
 #' @rdname intersection
 #' @export
-setGeneric("intersection", function(x, other, numPartitions = 1L) {
+setGeneric("intersection", function(x, other, numPartitions = 1) {
   standardGeneric("intersection") })
 
 #' @rdname keys
@@ -346,21 +346,21 @@ setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("ri
 #' @rdname sortByKey
 #' @export
 setGeneric("sortByKey",
-           function(x, ascending = TRUE, numPartitions = 1L) {
+           function(x, ascending = TRUE, numPartitions = 1) {
              standardGeneric("sortByKey")
            })
 
 #' @rdname subtract
 #' @export
 setGeneric("subtract",
-           function(x, other, numPartitions = 1L) {
+           function(x, other, numPartitions = 1) {
              standardGeneric("subtract")
            })
 
 #' @rdname subtractByKey
 #' @export
 setGeneric("subtractByKey",
-           function(x, other, numPartitions = 1L) {
+           function(x, other, numPartitions = 1) {
              standardGeneric("subtractByKey")
            })
 
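Changing the defaults from 1L to 1 goes with SparkR accepting plain numerics for numPartitions (implementations coerce with numToInt, and S4 dispatches integers to "numeric" signatures), so either spelling works. A sketch:

rdd <- parallelize(sc, c(3, 1, 2, 3))
collect(distinct(rdd, numPartitions = 2L))  # integer literal, as before
collect(distinct(rdd, numPartitions = 2))   # plain numeric now accepted too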