Merge branch 'master' into SPARK-4705
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
Marcelo Vanzin committed Apr 24, 2015
2 parents 1aa309d + ebb77b2 commit 7c381ec
Showing 54 changed files with 2,817 additions and 1,357 deletions.
8 changes: 7 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -790,9 +790,12 @@ setMethod("$", signature(x = "DataFrame"),

setMethod("$<-", signature(x = "DataFrame"),
function(x, name, value) {
stopifnot(class(value) == "Column")
stopifnot(class(value) == "Column" || is.null(value))
cols <- columns(x)
if (name %in% cols) {
if (is.null(value)) {
cols <- Filter(function(c) { c != name }, cols)
}
cols <- lapply(cols, function(c) {
if (c == name) {
alias(value, name)
@@ -802,6 +805,9 @@ setMethod("$<-", signature(x = "DataFrame"),
})
nx <- select(x, cols)
} else {
if (is.null(value)) {
return(x)
}
nx <- withColumn(x, name, value)
}
x@sdf <- nx@sdf
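Note (not part of the diff): the "$<-" change above lets a column be dropped by assigning NULL, mirroring the new test in R/pkg/inst/tests/test_sparkSQL.R further down. A minimal usage sketch, assuming a SparkR session and a JSON-backed DataFrame with "name" and "age" columns; the file path and init calls are illustrative only:

# Sketch: dropping a DataFrame column by assigning NULL
sc <- sparkR.init()
sqlCtx <- sparkRSQL.init(sc)
df <- jsonFile(sqlCtx, "people.json")  # hypothetical path

df$age2 <- df$age * 2                  # add a derived column
columns(df)                            # "name" "age" "age2"

df$age2 <- NULL                        # assigning NULL removes the column
columns(df)                            # "name" "age"

df$age3 <- NULL                        # dropping a non-existent column is a no-op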
36 changes: 18 additions & 18 deletions R/pkg/R/RDD.R
@@ -91,8 +91,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
# prev_serializedMode is used during the delayed computation of JRDD in getJRDD
} else {
pipelinedFunc <- function(split, iterator) {
func(split, prev@func(split, iterator))
pipelinedFunc <- function(partIndex, part) {
func(partIndex, prev@func(partIndex, part))
}
.Object@func <- cleanClosure(pipelinedFunc)
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
@@ -306,7 +306,7 @@ setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
jrdd <- getJRDD(x)
partitions <- callJMethod(jrdd, "splits")
partitions <- callJMethod(jrdd, "partitions")
callJMethod(partitions, "size")
})

@@ -452,8 +452,8 @@ setMethod("countByValue",
setMethod("lapply",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
func <- function(split, iterator) {
lapply(iterator, FUN)
func <- function(partIndex, part) {
lapply(part, FUN)
}
lapplyPartitionsWithIndex(X, func)
})
@@ -538,8 +538,8 @@ setMethod("mapPartitions",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 5L)
#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
#' split * Reduce("+", part) })
#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
#' partIndex * Reduce("+", part) })
#' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
#'}
#' @rdname lapplyPartitionsWithIndex
@@ -813,7 +813,7 @@ setMethod("distinct",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
#' rdd <- parallelize(sc, 1:10)
#' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
#' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
#'}
@@ -825,14 +825,14 @@ setMethod("sampleRDD",
function(x, withReplacement, fraction, seed) {

# The sampler: takes a partition and returns its sampled version.
samplingFunc <- function(split, part) {
samplingFunc <- function(partIndex, part) {
set.seed(seed)
res <- vector("list", length(part))
len <- 0

# Discards some random values to ensure each partition has a
# different random seed.
runif(split)
runif(partIndex)

for (elem in part) {
if (withReplacement) {
@@ -989,8 +989,8 @@ setMethod("coalesce",
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
if (shuffle || numPartitions > SparkR::numPartitions(x)) {
func <- function(s, part) {
set.seed(s) # split as seed
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(sample(numPartitions, 1) - 1)
lapply(seq_along(part),
function(i) {
@@ -1035,7 +1035,7 @@ setMethod("saveAsObjectFile",
#' Save this RDD as a text file, using string representations of elements.
#'
#' @param x The RDD to save
#' @param path The directory where the splits of the text file are saved
#' @param path The directory where the partitions of the text file are saved
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
@@ -1335,10 +1335,10 @@ setMethod("zipWithUniqueId",
function(x) {
n <- numPartitions(x)

partitionFunc <- function(split, part) {
partitionFunc <- function(partIndex, part) {
mapply(
function(item, index) {
list(item, (index - 1) * n + split)
list(item, (index - 1) * n + partIndex)
},
part,
seq_along(part),
Expand Down Expand Up @@ -1382,11 +1382,11 @@ setMethod("zipWithIndex",
startIndices <- Reduce("+", nums, accumulate = TRUE)
}

partitionFunc <- function(split, part) {
if (split == 0) {
partitionFunc <- function(partIndex, part) {
if (partIndex == 0) {
startIndex <- 0
} else {
startIndex <- startIndices[[split]]
startIndex <- startIndices[[partIndex]]
}

mapply(
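Note (not part of the diff): the RDD.R edits above only rename the per-partition index argument from "split" to "partIndex" (and switch the Java-side call from "splits" to "partitions"); behavior is unchanged. A short sketch of the renamed signature, mirroring the tests further down and assuming a running SparkR context:

# Sketch: lapplyPartitionsWithIndex with the renamed partIndex argument
sc <- sparkR.init()
rdd <- parallelize(sc, 1:10, 2L)
sums <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
  list(partIndex, Reduce("+", part))   # partIndex was formerly named "split"
})
collect(sums, flatten = FALSE)         # list(list(0, 15), list(1, 40))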
20 changes: 10 additions & 10 deletions R/pkg/R/context.R
@@ -17,12 +17,12 @@

# context.R: SparkContext driven functions

getMinSplits <- function(sc, minSplits) {
if (is.null(minSplits)) {
getMinPartitions <- function(sc, minPartitions) {
if (is.null(minPartitions)) {
defaultParallelism <- callJMethod(sc, "defaultParallelism")
minSplits <- min(defaultParallelism, 2)
minPartitions <- min(defaultParallelism, 2)
}
as.integer(minSplits)
as.integer(minPartitions)
}

#' Create an RDD from a text file.
@@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) {
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param minSplits Minimum number of splits to be created. If NULL, the default
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism.
#' @return RDD where each item is of type \code{character}
#' @export
@@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) {
#' sc <- sparkR.init()
#' lines <- textFile(sc, "myfile.txt")
#'}
textFile <- function(sc, path, minSplits = NULL) {
textFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path))
#' Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")

jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
# jrdd is of type JavaRDD[String]
RDD(jrdd, "string")
}
@@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) {
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param minSplits Minimum number of splits to be created. If NULL, the default
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism.
#' @return RDD containing serialized R objects.
#' @seealso saveAsObjectFile
@@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) {
#' sc <- sparkR.init()
#' rdd <- objectFile(sc, "myfile")
#'}
objectFile <- function(sc, path, minSplits = NULL) {
objectFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path))
#' Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")

jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
# Assume the RDD contains serialized R objects.
RDD(jrdd, "byte")
}
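Note (not part of the diff): context.R renames the minSplits argument of textFile and objectFile to minPartitions; when NULL, a default derived from the available parallelism is used. A brief sketch, assuming a running SparkR context and illustrative input paths:

# Sketch: reading files with the renamed minPartitions argument
sc <- sparkR.init()
lines <- textFile(sc, "data.txt", minPartitions = 4L)    # hypothetical path
numPartitions(lines)                                     # typically at least 4
objs <- objectFile(sc, "objects", minPartitions = 2L)    # hypothetical path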
8 changes: 4 additions & 4 deletions R/pkg/R/pairRDD.R
@@ -206,8 +206,8 @@ setMethod("partitionBy",
get(name, .broadcastNames) })
jrdd <- getJRDD(x)

# We create a PairwiseRRDD that extends RDD[(Array[Byte],
# Array[Byte])], where the key is the hashed split, the value is
# We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],
# where the key is the target partition number, the value is
# the content (key-val pairs).
pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
callJMethod(jrdd, "rdd"),
@@ -866,8 +866,8 @@ setMethod("sampleByKey",
}

# The sampler: takes a partition and returns its sampled version.
samplingFunc <- function(split, part) {
set.seed(bitwXor(seed, split))
samplingFunc <- function(partIndex, part) {
set.seed(bitwXor(seed, partIndex))
res <- vector("list", length(part))
len <- 0

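Note (not part of the diff): the pairRDD.R comment now describes PairwiseRRDD keys as target partition numbers, and sampleByKey seeds each partition with bitwXor(seed, partIndex). A condensed sketch of partitionBy with a custom partition function, adapted from the test_rdd.R test further down and assuming a running SparkR context:

# Sketch: custom partitioning, inspected with lapplyPartitionsWithIndex
sc <- sparkR.init()
pairs <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
byParity <- function(key) { if (key %% 2 == 1) 0 else 1 }  # odd keys -> partition 0
parts <- lapplyPartitionsWithIndex(
  partitionBy(pairs, 2L, byParity),
  function(partIndex, part) { list(partIndex, part) })
collect(parts, flatten = FALSE)  # partition 0 holds odd keys, partition 1 even keys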
2 changes: 1 addition & 1 deletion R/pkg/R/utils.R
@@ -501,7 +501,7 @@ appendPartitionLengths <- function(x, other) {
# A result RDD.
mergePartitions <- function(rdd, zip) {
serializerMode <- getSerializedMode(rdd)
partitionFunc <- function(split, part) {
partitionFunc <- function(partIndex, part) {
len <- length(part)
if (len > 0) {
if (serializerMode == "byte") {
12 changes: 6 additions & 6 deletions R/pkg/inst/tests/test_rdd.R
@@ -105,8 +105,8 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
rdd2 <- rdd
for (i in 1:12)
rdd2 <- lapplyPartitionsWithIndex(
rdd2, function(split, part) {
part <- as.list(unlist(part) * split + i)
rdd2, function(partIndex, part) {
part <- as.list(unlist(part) * partIndex + i)
})
rdd2 <- lapply(rdd2, function(x) x + x)
actual <- collect(rdd2)
@@ -121,8 +121,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
# PipelinedRDD
rdd2 <- lapplyPartitionsWithIndex(
rdd2,
function(split, part) {
part <- as.list(unlist(part) * split)
function(partIndex, part) {
part <- as.list(unlist(part) * partIndex)
})

cache(rdd2)
@@ -174,13 +174,13 @@ test_that("lapply with dependency", {
})

test_that("lapplyPartitionsWithIndex on RDDs", {
func <- function(splitIndex, part) { list(splitIndex, Reduce("+", part)) }
func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
expect_equal(actual, list(list(0, 15), list(1, 40)))

pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
mkTup <- function(splitIndex, part) { list(splitIndex, part) }
mkTup <- function(partIndex, part) { list(partIndex, part) }
actual <- collect(lapplyPartitionsWithIndex(
partitionBy(pairsRDD, 2L, partitionByParity),
mkTup),
5 changes: 5 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -449,6 +449,11 @@ test_that("select operators", {
df$age2 <- df$age * 2
expect_equal(columns(df), c("name", "age", "age2"))
expect_equal(count(where(df, df$age2 == df$age * 2)), 2)

df$age2 <- NULL
expect_equal(columns(df), c("name", "age"))
df$age3 <- NULL
expect_equal(columns(df), c("name", "age"))
})

test_that("select with column", {
core/src/main/java/org/apache/spark/api/java/function/Function0.java (new file)
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

/**
* A zero-argument function that returns an R.
*/
public interface Function0<R> extends Serializable {
public R call() throws Exception;
}
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -77,12 +77,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
Option(propertiesFile).foreach { filename =>
Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
if (k.startsWith("spark.")) {
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
} else {
SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
}
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
}
}
defaultProperties
@@ -97,6 +93,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
// Populate `sparkProperties` map from properties file
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
ignoreNonSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
loadEnvironmentArguments()

@@ -117,6 +115,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}
}

/**
* Remove keys that don't start with "spark." from `sparkProperties`.
*/
private def ignoreNonSparkProperties(): Unit = {
sparkProperties.foreach { case (k, v) =>
if (!k.startsWith("spark.")) {
sparkProperties -= k
SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
}
}
}

/**
* Load arguments from environment variables, Spark properties etc.
*/