Merge remote-tracking branch 'origin/master' into unsafe-sort
JoshRosen committed May 8, 2015
2 parents b95e642 + 008a60d commit 9883e30
Showing 254 changed files with 14,810 additions and 3,833 deletions.
7 changes: 7 additions & 0 deletions .rat-excludes
@@ -74,5 +74,12 @@ logs
.*scalastyle-output.xml
.*dependency-reduced-pom.xml
known_translations
json_expectation
local-1422981759269/*
local-1422981780767/*
local-1425081759269/*
local-1426533911241/*
local-1426633911242/*
local-1430917381534/*
DESCRIPTION
NAMESPACE
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -15,11 +15,11 @@ Suggests:
Description: R frontend for Spark
License: Apache License (== 2.0)
Collate:
'schema.R'
'generics.R'
'jobj.R'
'RDD.R'
'pairRDD.R'
'schema.R'
'column.R'
'group.R'
'DataFrame.R'
8 changes: 0 additions & 8 deletions R/pkg/NAMESPACE
@@ -26,7 +26,6 @@ exportMethods("cache",
"intersect",
"isLocal",
"join",
"length",
"limit",
"orderBy",
"names",
@@ -45,8 +44,6 @@ exportMethods("cache",
"showDF",
"sortDF",
"take",
"toJSON",
"toRDD",
"unionAll",
"unpersist",
"where",
@@ -95,19 +92,14 @@ export("cacheTable",
"createExternalTable",
"dropTempTable",
"jsonFile",
"jsonRDD",
"loadDF",
"parquetFile",
"sql",
"table",
"tableNames",
"tables",
"toDF",
"uncacheTable")

export("sparkRSQL.init",
"sparkRHive.init")

export("structField",
"structField.jobj",
"structField.character",
97 changes: 51 additions & 46 deletions R/pkg/R/DataFrame.R
@@ -45,6 +45,9 @@ setMethod("initialize", "DataFrame", function(.Object, sdf, isCached) {

#' @rdname DataFrame
#' @export
#'
#' @param sdf A Java object reference to the backing Scala DataFrame
#' @param isCached TRUE if the dataFrame is cached
dataFrame <- function(sdf, isCached = FALSE) {
new("DataFrame", sdf, isCached)
}
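
dataFrame() is an internal constructor; users normally obtain a DataFrame through a reader such as jsonFile(), which wraps the returned Java reference with this function. A minimal sketch of that round trip, reusing the sqlCtx and example path from the documentation in this file:

sc <- sparkR.init()
sqlCtx <- sparkRSQL.init(sc)
df <- jsonFile(sqlCtx, "path/to/file.json")  # internally calls dataFrame(sdf)
class(df)                                    # "DataFrame"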
@@ -244,7 +247,7 @@ setMethod("columns",
})

#' @rdname columns
#' @export
#' @aliases names,DataFrame,function-method
setMethod("names",
signature(x = "DataFrame"),
function(x) {
@@ -272,7 +275,7 @@ setMethod("names",
setMethod("registerTempTable",
signature(x = "DataFrame", tableName = "character"),
function(x, tableName) {
callJMethod(x@sdf, "registerTempTable", tableName)
invisible(callJMethod(x@sdf, "registerTempTable", tableName))
})
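
With the invisible() wrapper above, registering a table no longer echoes a value at the interactive console. A short usage sketch, assuming the df and sqlCtx objects from the examples in this file:

registerTempTable(df, "people")  # now returns invisibly
teenagers <- sql(sqlCtx, "SELECT name FROM people WHERE age >= 13 AND age <= 19")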

#' insertInto
@@ -399,23 +402,23 @@ setMethod("repartition",
dataFrame(sdf)
})

#' toJSON
#'
#' Convert the rows of a DataFrame into JSON objects and return an RDD where
#' each element contains a JSON string.
#'
#' @param x A SparkSQL DataFrame
#' @return A StringRRDD of JSON objects
#' @rdname tojson
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' newRDD <- toJSON(df)
#'}
# toJSON
#
# Convert the rows of a DataFrame into JSON objects and return an RDD where
# each element contains a JSON string.
#
# @param x A SparkSQL DataFrame
# @return A StringRRDD of JSON objects
# @rdname tojson
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# sqlCtx <- sparkRSQL.init(sc)
# path <- "path/to/file.json"
# df <- jsonFile(sqlCtx, path)
# newRDD <- toJSON(df)
#}
setMethod("toJSON",
signature(x = "DataFrame"),
function(x) {
@@ -578,8 +581,8 @@ setMethod("limit",
dataFrame(res)
})

# Take the first NUM rows of a DataFrame and return the results as a data.frame

#' Take the first NUM rows of a DataFrame and return the results as a data.frame
#'
#' @rdname take
#' @export
#' @examples
@@ -644,22 +647,22 @@ setMethod("first",
take(x, 1)
})
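
A quick sketch of the relationship documented above: take() collects the first NUM rows as a local data.frame, and first() is shorthand for take(x, 1). Assumes the df from the examples in this file:

firstTwo <- take(df, 2)  # local data.frame with at most 2 rows
firstRow <- first(df)    # equivalent to take(df, 1)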

#' toRDD()
#'
#' Converts a Spark DataFrame to an RDD while preserving column names.
#'
#' @param x A Spark DataFrame
#'
#' @rdname DataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' rdd <- toRDD(df)
#' }
# toRDD()
#
# Converts a Spark DataFrame to an RDD while preserving column names.
#
# @param x A Spark DataFrame
#
# @rdname DataFrame
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# sqlCtx <- sparkRSQL.init(sc)
# path <- "path/to/file.json"
# df <- jsonFile(sqlCtx, path)
# rdd <- toRDD(df)
# }
setMethod("toRDD",
signature(x = "DataFrame"),
function(x) {
@@ -706,6 +709,7 @@ setMethod("groupBy",
#'
#' Compute aggregates by specifying a list of columns
#'
#' @param x a DataFrame
#' @rdname DataFrame
#' @export
setMethod("agg",
@@ -721,53 +725,53 @@ setMethod("agg",
# the requested map function. #
###################################################################################

#' @rdname lapply
# @rdname lapply
setMethod("lapply",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
rdd <- toRDD(X)
lapply(rdd, FUN)
})
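
These methods bridge the DataFrame API to the RDD API: the DataFrame is first converted with toRDD(), so FUN sees one row at a time as a named list (the demotion of the roxygen tags here to plain comments suggests they are being treated as internal). A hedged sketch, assuming df has a numeric age column as in the JSON examples elsewhere in this file:

rowsPlusOne <- lapply(df, function(row) { row$age + 1 })
# collect(rowsPlusOne) would bring the mapped values back as a local list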

#' @rdname lapply
# @rdname lapply
setMethod("map",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
lapply(X, FUN)
})

#' @rdname flatMap
# @rdname flatMap
setMethod("flatMap",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
rdd <- toRDD(X)
flatMap(rdd, FUN)
})

#' @rdname lapplyPartition
# @rdname lapplyPartition
setMethod("lapplyPartition",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
rdd <- toRDD(X)
lapplyPartition(rdd, FUN)
})

#' @rdname lapplyPartition
# @rdname lapplyPartition
setMethod("mapPartitions",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
lapplyPartition(X, FUN)
})

#' @rdname foreach
# @rdname foreach
setMethod("foreach",
signature(x = "DataFrame", func = "function"),
function(x, func) {
rdd <- toRDD(x)
foreach(rdd, func)
})
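
foreach() and foreachPartition() are for side effects only; nothing is returned to the driver, and any output is produced on the executors. A minimal sketch under the same assumptions as above:

foreach(df, function(row) {
  cat(row$name, "\n")  # appears in executor logs, not the driver console
})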

#' @rdname foreach
# @rdname foreach
setMethod("foreachPartition",
signature(x = "DataFrame", func = "function"),
function(x, func) {
@@ -788,6 +792,7 @@ setMethod("$", signature(x = "DataFrame"),
getColumn(x, name)
})
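
$ extracts a Column object usable in expressions, while $<- (below) adds or replaces a column and, per its stopifnot() check, drops a column when given NULL. A sketch, assuming df has an age column and that Column arithmetic is available:

ageCol <- df$age       # a Column reference, not local data
df$age2 <- df$age * 2  # add a derived column (assumes Column arithmetic)
df$age2 <- NULL        # NULL drops the column again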

#' @rdname select
setMethod("$<-", signature(x = "DataFrame"),
function(x, name, value) {
stopifnot(class(value) == "Column" || is.null(value))
@@ -1009,7 +1014,7 @@ setMethod("sortDF",
})
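
sortDF() and its alias orderBy() (below) sort a DataFrame by a column given either as a Column object or by name. A minimal sketch with the usual df:

byAge <- sortDF(df, df$age)    # sort by a Column
byName <- orderBy(df, "name")  # same operation via the alias, by column name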

#' @rdname sortDF
#' @export
#' @aliases orderBy,DataFrame,function-method
setMethod("orderBy",
signature(x = "DataFrame", col = "characterOrColumn"),
function(x, col) {
@@ -1046,7 +1051,7 @@ setMethod("filter",
})
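
filter() and its alias where() (below) accept either a Column expression or a SQL condition string, per the characterOrColumn signature. A sketch under the same assumptions:

adults  <- filter(df, df$age > 21)  # Column expression form
adults2 <- where(df, "age > 21")    # equivalent SQL string form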

#' @rdname filter
#' @export
#' @aliases where,DataFrame,function-method
setMethod("where",
signature(x = "DataFrame", condition = "characterOrColumn"),
function(x, condition) {