Skip to content

Commit

Permalink
[SPARK-7033][SPARKR] Clean usage of split. Use partition instead wher…
Browse files Browse the repository at this point in the history
…e applicable.
  • Loading branch information
Sun Rui committed Apr 22, 2015
1 parent 3a3f710 commit d531c86
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 33 deletions.
36 changes: 18 additions & 18 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
# prev_serializedMode is used during the delayed computation of JRDD in getJRDD
} else {
pipelinedFunc <- function(split, iterator) {
func(split, prev@func(split, iterator))
pipelinedFunc <- function(partIndex, part) {
func(partIndex, prev@func(partIndex, part))
}
.Object@func <- cleanClosure(pipelinedFunc)
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
Expand Down Expand Up @@ -306,7 +306,7 @@ setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
jrdd <- getJRDD(x)
partitions <- callJMethod(jrdd, "splits")
partitions <- callJMethod(jrdd, "partitions")
callJMethod(partitions, "size")
})

Expand Down Expand Up @@ -452,8 +452,8 @@ setMethod("countByValue",
setMethod("lapply",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
func <- function(split, iterator) {
lapply(iterator, FUN)
func <- function(partIndex, part) {
lapply(part, FUN)
}
lapplyPartitionsWithIndex(X, func)
})
Expand Down Expand Up @@ -538,8 +538,8 @@ setMethod("mapPartitions",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 5L)
#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
#' split * Reduce("+", part) })
#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
#' partIndex * Reduce("+", part) })
#' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
#'}
#' @rdname lapplyPartitionsWithIndex
Expand Down Expand Up @@ -813,7 +813,7 @@ setMethod("distinct",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
#' rdd <- parallelize(sc, 1:10)
#' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
#' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
#'}
Expand All @@ -825,14 +825,14 @@ setMethod("sampleRDD",
function(x, withReplacement, fraction, seed) {

# The sampler: takes a partition and returns its sampled version.
samplingFunc <- function(split, part) {
samplingFunc <- function(partIndex, part) {
set.seed(seed)
res <- vector("list", length(part))
len <- 0

# Discards some random values to ensure each partition has a
# different random seed.
runif(split)
runif(partIndex)

for (elem in part) {
if (withReplacement) {
Expand Down Expand Up @@ -989,8 +989,8 @@ setMethod("coalesce",
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
if (shuffle || numPartitions > SparkR::numPartitions(x)) {
func <- function(s, part) {
set.seed(s) # split as seed
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(sample(numPartitions, 1) - 1)
lapply(seq_along(part),
function(i) {
Expand Down Expand Up @@ -1035,7 +1035,7 @@ setMethod("saveAsObjectFile",
#' Save this RDD as a text file, using string representations of elements.
#'
#' @param x The RDD to save
#' @param path The directory where the splits of the text file are saved
#' @param path The directory where the partitions of the text file are saved
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
Expand Down Expand Up @@ -1335,10 +1335,10 @@ setMethod("zipWithUniqueId",
function(x) {
n <- numPartitions(x)

partitionFunc <- function(split, part) {
partitionFunc <- function(partIndex, part) {
mapply(
function(item, index) {
list(item, (index - 1) * n + split)
list(item, (index - 1) * n + partIndex)
},
part,
seq_along(part),
Expand Down Expand Up @@ -1382,11 +1382,11 @@ setMethod("zipWithIndex",
startIndices <- Reduce("+", nums, accumulate = TRUE)
}

partitionFunc <- function(split, part) {
if (split == 0) {
partitionFunc <- function(partIndex, part) {
if (partIndex == 0) {
startIndex <- 0
} else {
startIndex <- startIndices[[split]]
startIndex <- startIndices[[partIndex]]
}

mapply(
Expand Down
20 changes: 10 additions & 10 deletions R/pkg/R/context.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

# context.R: SparkContext driven functions

getMinSplits <- function(sc, minSplits) {
if (is.null(minSplits)) {
getMinPartitions <- function(sc, minPartitions) {
if (is.null(minPartitions)) {
defaultParallelism <- callJMethod(sc, "defaultParallelism")
minSplits <- min(defaultParallelism, 2)
minPartitions <- min(defaultParallelism, 2)
}
as.integer(minSplits)
as.integer(minPartitions)
}

#' Create an RDD from a text file.
Expand All @@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) {
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param minSplits Minimum number of splits to be created. If NULL, the default
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism.
#' @return RDD where each item is of type \code{character}
#' @export
Expand All @@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) {
#' sc <- sparkR.init()
#' lines <- textFile(sc, "myfile.txt")
#'}
textFile <- function(sc, path, minSplits = NULL) {
textFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path))
#' Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")

jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
# jrdd is of type JavaRDD[String]
RDD(jrdd, "string")
}
Expand All @@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) {
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param minSplits Minimum number of splits to be created. If NULL, the default
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism.
#' @return RDD containing serialized R objects.
#' @seealso saveAsObjectFile
Expand All @@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) {
#' sc <- sparkR.init()
#' rdd <- objectFile(sc, "myfile")
#'}
objectFile <- function(sc, path, minSplits = NULL) {
objectFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path))
#' Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")

jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
# Assume the RDD contains serialized R objects.
RDD(jrdd, "byte")
}
Expand Down
8 changes: 4 additions & 4 deletions R/pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ setMethod("partitionBy",
get(name, .broadcastNames) })
jrdd <- getJRDD(x)

# We create a PairwiseRRDD that extends RDD[(Array[Byte],
# Array[Byte])], where the key is the hashed split, the value is
# We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],
# where the key is the target partition number, the value is
# the content (key-val pairs).
pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
callJMethod(jrdd, "rdd"),
Expand Down Expand Up @@ -866,8 +866,8 @@ setMethod("sampleByKey",
}

# The sampler: takes a partition and returns its sampled version.
samplingFunc <- function(split, part) {
set.seed(bitwXor(seed, split))
samplingFunc <- function(partIndex, part) {
set.seed(bitwXor(seed, partIndex))
res <- vector("list", length(part))
len <- 0

Expand Down
2 changes: 1 addition & 1 deletion R/pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ appendPartitionLengths <- function(x, other) {
# A result RDD.
mergePartitions <- function(rdd, zip) {
serializerMode <- getSerializedMode(rdd)
partitionFunc <- function(split, part) {
partitionFunc <- function(partIndex, part) {
len <- length(part)
if (len > 0) {
if (serializerMode == "byte") {
Expand Down

0 comments on commit d531c86

Please sign in to comment.