[SPARK-7482] [SPARKR] Rename some DataFrame API methods in SparkR to match their counterparts in Scala.

Author: Sun Rui <rui.sun@intel.com>

Closes apache#6007 from sun-rui/SPARK-7482 and squashes the following commits:

5c5cf5e [Sun Rui] Implement alias loadDF() as a new function.
3a30c10 [Sun Rui] Rename load()/save() to read.df()/write.df(). Also add loadDF()/saveDF() as aliases.
9f569d6 [Sun Rui] [SPARK-7482][SparkR] Rename some DataFrame API methods in SparkR to match their counterparts in Scala.
Sun Rui authored and shivaram committed May 13, 2015
1 parent 208b902 commit df9b94a
Showing 6 changed files with 71 additions and 49 deletions.
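The rename, at a glance: load()/save() become read.df()/write.df(), sampleDF() becomes sample(), and loadDF()/saveDF() stay behind as aliases. A minimal sketch of the new surface, assuming the same setup and illustrative paths used in the examples throughout this diff:

```r
sc <- sparkR.init()
sqlCtx <- sparkRSQL.init(sc)

# load()/loadDF() -> read.df(); loadDF() remains as an alias
df <- read.df(sqlCtx, "path/to/file.json", source = "json")

# sampleDF() -> sample(), matching Scala's DataFrame.sample()
half <- collect(sample(df, withReplacement = FALSE, fraction = 0.5))

# save()/saveDF() -> write.df(); saveDF() remains as an alias
write.df(df, "path/to/file.parquet", "parquet", mode = "overwrite")
```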
6 changes: 4 additions & 2 deletions R/pkg/NAMESPACE
@@ -37,7 +37,7 @@ exportMethods("arrange",
"registerTempTable",
"rename",
"repartition",
"sampleDF",
"sample",
"sample_frac",
"saveAsParquetFile",
"saveAsTable",
@@ -53,7 +53,8 @@ exportMethods("arrange",
"unpersist",
"where",
"withColumn",
"withColumnRenamed")
"withColumnRenamed",
"write.df")

exportClasses("Column")

@@ -101,6 +102,7 @@ export("cacheTable",
"jsonFile",
"loadDF",
"parquetFile",
"read.df",
"sql",
"table",
"tableNames",
35 changes: 22 additions & 13 deletions R/pkg/R/DataFrame.R
@@ -294,8 +294,8 @@ setMethod("registerTempTable",
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df <- loadDF(sqlCtx, path, "parquet")
#' df2 <- loadDF(sqlCtx, path2, "parquet")
#' df <- read.df(sqlCtx, path, "parquet")
#' df2 <- read.df(sqlCtx, path2, "parquet")
#' registerTempTable(df, "table1")
#' insertInto(df2, "table1", overwrite = TRUE)
#'}
@@ -473,14 +473,14 @@ setMethod("distinct",
dataFrame(sdf)
})

#' SampleDF
#' Sample
#'
#' Return a sampled subset of this DataFrame using a random seed.
#'
#' @param x A SparkSQL DataFrame
#' @param withReplacement Sampling with replacement or not
#' @param fraction The (rough) sample target fraction
#' @rdname sampleDF
#' @rdname sample
#' @aliases sample_frac
#' @export
#' @examples
@@ -489,10 +489,10 @@ setMethod("distinct",
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' collect(sampleDF(df, FALSE, 0.5))
#' collect(sampleDF(df, TRUE, 0.5))
#' collect(sample(df, FALSE, 0.5))
#' collect(sample(df, TRUE, 0.5))
#'}
setMethod("sampleDF",
setMethod("sample",
# TODO : Figure out how to send integer as java.lang.Long to JVM so
# we can send seed as an argument through callJMethod
signature(x = "DataFrame", withReplacement = "logical",
@@ -503,13 +503,13 @@ setMethod("sampleDF",
dataFrame(sdf)
})

#' @rdname sampleDF
#' @aliases sampleDF
#' @rdname sample
#' @aliases sample
setMethod("sample_frac",
signature(x = "DataFrame", withReplacement = "logical",
fraction = "numeric"),
function(x, withReplacement, fraction) {
sampleDF(x, withReplacement, fraction)
sample(x, withReplacement, fraction)
})

#' Count
@@ -1303,17 +1303,17 @@ setMethod("except",
#' @param source A name for external data source
#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
#'
#' @rdname saveAsTable
#' @rdname write.df
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' saveAsTable(df, "myfile")
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("saveDF",
setMethod("write.df",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
@@ -1334,6 +1334,15 @@ setMethod("saveDF",
callJMethod(df@sdf, "save", source, jmode, options)
})

#' @rdname write.df
#' @aliases saveDF
#' @export
setMethod("saveDF",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
write.df(df, path, source, mode, ...)
})

#' saveAsTable
#'
4 changes: 2 additions & 2 deletions R/pkg/R/RDD.R
@@ -927,7 +927,7 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
MAXINT)))))

# TODO(zongheng): investigate if this call is an in-place shuffle?
sample(samples)[1:total]
base::sample(samples)[1:total]
})

# Creates tuples of the elements in this RDD by applying a function.
@@ -996,7 +996,7 @@ setMethod("coalesce",
if (shuffle || numPartitions > SparkR:::numPartitions(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(sample(numPartitions, 1) - 1)
start <- as.integer(base::sample(numPartitions, 1) - 1)
lapply(seq_along(part),
function(i) {
pos <- (start + i) %% numPartitions
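The base:: qualifications above are the flip side of the rename: once a generic named sample is defined for DataFrames, an unqualified sample() call inside the package dispatches through that generic rather than base R's function. A minimal sketch of the masking effect, assuming a fresh session with the methods package attached:

```r
# Defining an S4 generic named "sample" masks base::sample for dispatch purposes
setGeneric("sample", function(x, withReplacement, fraction, seed) {
  standardGeneric("sample")
})

sample(1:10)        # error: no applicable method for x = "integer"
base::sample(1:10)  # unaffected: calls base R's shuffler directly
```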
13 changes: 10 additions & 3 deletions R/pkg/R/SQLContext.R
@@ -421,7 +421,7 @@ clearCache <- function(sqlCtx) {
#' \dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df <- loadDF(sqlCtx, path, "parquet")
#' df <- read.df(sqlCtx, path, "parquet")
#' registerTempTable(df, "table")
#' dropTempTable(sqlCtx, "table")
#' }
@@ -450,10 +450,10 @@ dropTempTable <- function(sqlCtx, tableName) {
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df <- load(sqlCtx, "path/to/file.json", source = "json")
#' df <- read.df(sqlCtx, "path/to/file.json", source = "json")
#' }

loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
read.df <- function(sqlCtx, path = NULL, source = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
@@ -462,6 +462,13 @@ loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
dataFrame(sdf)
}

#' @aliases loadDF
#' @export

loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
read.df(sqlCtx, path, source, ...)
}
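read.df() folds any extra named arguments into the data-source options (via varargsToEnv()) alongside path, and loadDF() simply forwards everything to it. A sketch; the samplingRatio option is illustrative, not something this diff asserts:

```r
df  <- read.df(sqlCtx, "path/to/file.json", source = "json",
               samplingRatio = "1.0")  # extra named args become source options
df2 <- loadDF(sqlCtx, "path/to/file.json", source = "json")  # alias for read.df()
```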

#' Create an external table
#'
#' Creates an external table based on the dataset in a data source,
22 changes: 13 additions & 9 deletions R/pkg/R/generics.R
@@ -456,19 +456,19 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
#' @export
setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })

#' @rdname sampleDF
#' @rdname sample
#' @export
setGeneric("sample_frac",
setGeneric("sample",
function(x, withReplacement, fraction, seed) {
standardGeneric("sample_frac")
})
standardGeneric("sample")
})

#' @rdname sampleDF
#' @rdname sample
#' @export
setGeneric("sampleDF",
setGeneric("sample_frac",
function(x, withReplacement, fraction, seed) {
standardGeneric("sampleDF")
})
standardGeneric("sample_frac")
})

#' @rdname saveAsParquetFile
#' @export
@@ -480,7 +480,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
standardGeneric("saveAsTable")
})

#' @rdname saveAsTable
#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") })

#' @rdname write.df
#' @export
setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })

40 changes: 20 additions & 20 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -209,18 +209,18 @@ test_that("registerTempTable() results in a queryable table and sql() results in
})

test_that("insertInto() on a registered table", {
df <- loadDF(sqlCtx, jsonPath, "json")
saveDF(df, parquetPath, "parquet", "overwrite")
dfParquet <- loadDF(sqlCtx, parquetPath, "parquet")
df <- read.df(sqlCtx, jsonPath, "json")
write.df(df, parquetPath, "parquet", "overwrite")
dfParquet <- read.df(sqlCtx, parquetPath, "parquet")

lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".tmp")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
writeLines(lines, jsonPath2)
df2 <- loadDF(sqlCtx, jsonPath2, "json")
saveDF(df2, parquetPath2, "parquet", "overwrite")
dfParquet2 <- loadDF(sqlCtx, parquetPath2, "parquet")
df2 <- read.df(sqlCtx, jsonPath2, "json")
write.df(df2, parquetPath2, "parquet", "overwrite")
dfParquet2 <- read.df(sqlCtx, parquetPath2, "parquet")

registerTempTable(dfParquet, "table1")
insertInto(dfParquet2, "table1")
@@ -421,12 +421,12 @@ test_that("distinct() on DataFrames", {
expect_true(count(uniques) == 3)
})

test_that("sampleDF on a DataFrame", {
test_that("sample on a DataFrame", {
df <- jsonFile(sqlCtx, jsonPath)
sampled <- sampleDF(df, FALSE, 1.0)
sampled <- sample(df, FALSE, 1.0)
expect_equal(nrow(collect(sampled)), count(df))
expect_true(inherits(sampled, "DataFrame"))
sampled2 <- sampleDF(df, FALSE, 0.1)
sampled2 <- sample(df, FALSE, 0.1)
expect_true(count(sampled2) < 3)

# Also test sample_frac
@@ -491,16 +491,16 @@ test_that("column calculation", {
expect_true(count(df2) == 3)
})

test_that("load() from json file", {
df <- loadDF(sqlCtx, jsonPath, "json")
test_that("read.df() from json file", {
df <- read.df(sqlCtx, jsonPath, "json")
expect_true(inherits(df, "DataFrame"))
expect_true(count(df) == 3)
})

test_that("save() as parquet file", {
df <- loadDF(sqlCtx, jsonPath, "json")
saveDF(df, parquetPath, "parquet", mode="overwrite")
df2 <- loadDF(sqlCtx, parquetPath, "parquet")
test_that("write.df() as parquet file", {
df <- read.df(sqlCtx, jsonPath, "json")
write.df(df, parquetPath, "parquet", mode="overwrite")
df2 <- read.df(sqlCtx, parquetPath, "parquet")
expect_true(inherits(df2, "DataFrame"))
expect_true(count(df2) == 3)
})
@@ -670,7 +670,7 @@ test_that("unionAll(), except(), and intersect() on a DataFrame", {
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPath2)
df2 <- loadDF(sqlCtx, jsonPath2, "json")
df2 <- read.df(sqlCtx, jsonPath2, "json")

unioned <- arrange(unionAll(df, df2), df$age)
expect_true(inherits(unioned, "DataFrame"))
@@ -712,19 +712,19 @@ test_that("mutate() and rename()", {
expect_true(columns(newDF2)[1] == "newerAge")
})

test_that("saveDF() on DataFrame and works with parquetFile", {
test_that("write.df() on DataFrame and works with parquetFile", {
df <- jsonFile(sqlCtx, jsonPath)
saveDF(df, parquetPath, "parquet", mode="overwrite")
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetDF <- parquetFile(sqlCtx, parquetPath)
expect_true(inherits(parquetDF, "DataFrame"))
expect_equal(count(df), count(parquetDF))
})

test_that("parquetFile works with multiple input paths", {
df <- jsonFile(sqlCtx, jsonPath)
saveDF(df, parquetPath, "parquet", mode="overwrite")
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
saveDF(df, parquetPath2, "parquet", mode="overwrite")
write.df(df, parquetPath2, "parquet", mode="overwrite")
parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2)
expect_true(inherits(parquetDF, "DataFrame"))
expect_true(count(parquetDF) == count(df)*2)
