Merge pull request #2 from apache/master
Pull latest from Apache Spark
rekhajoshm committed May 8, 2015
2 parents e3677c9 + 90527f5 commit 106fd8e
Showing 352 changed files with 47,129 additions and 19,957 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -65,6 +65,7 @@ scalastyle.txt
scalastyle-output.xml
R-unit-tests.log
R/unit-tests.out
python/lib/pyspark.zip

# For Hive
metastore_db/
3 changes: 1 addition & 2 deletions .rat-excludes
@@ -36,7 +36,6 @@ graphlib-dot.min.js
sorttable.js
vis.min.js
vis.min.css
vis.map
.*avsc
.*txt
.*json
@@ -80,6 +79,6 @@ local-1422981780767/*
local-1425081759269/*
local-1426533911241/*
local-1426633911242/*
local-1427397477963/*
local-1430917381534/*
DESCRIPTION
NAMESPACE
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -15,11 +15,11 @@ Suggests:
Description: R frontend for Spark
License: Apache License (== 2.0)
Collate:
'schema.R'
'generics.R'
'jobj.R'
'RDD.R'
'pairRDD.R'
'schema.R'
'column.R'
'group.R'
'DataFrame.R'
9 changes: 1 addition & 8 deletions R/pkg/NAMESPACE
@@ -13,6 +13,7 @@ exportMethods("cache",
"collect",
"columns",
"count",
"describe",
"distinct",
"dtypes",
"except",
@@ -25,7 +26,6 @@ exportMethods("cache",
"intersect",
"isLocal",
"join",
"length",
"limit",
"orderBy",
"names",
@@ -44,8 +44,6 @@ exportMethods("cache",
"showDF",
"sortDF",
"take",
"toJSON",
"toRDD",
"unionAll",
"unpersist",
"where",
@@ -94,19 +92,14 @@ export("cacheTable",
"createExternalTable",
"dropTempTable",
"jsonFile",
"jsonRDD",
"loadDF",
"parquetFile",
"sql",
"table",
"tableNames",
"tables",
"toDF",
"uncacheTable")

export("sparkRSQL.init",
"sparkRHive.init")

export("structField",
"structField.jobj",
"structField.character",
134 changes: 88 additions & 46 deletions R/pkg/R/DataFrame.R
@@ -45,6 +45,9 @@ setMethod("initialize", "DataFrame", function(.Object, sdf, isCached) {

#' @rdname DataFrame
#' @export
#'
#' @param sdf A Java object reference to the backing Scala DataFrame
#' @param isCached TRUE if the DataFrame is cached
dataFrame <- function(sdf, isCached = FALSE) {
new("DataFrame", sdf, isCached)
}
@@ -244,7 +247,7 @@ setMethod("columns",
})

#' @rdname columns
#' @export
#' @aliases names,DataFrame,function-method
setMethod("names",
signature(x = "DataFrame"),
function(x) {
@@ -272,7 +275,7 @@ setMethod("names",
setMethod("registerTempTable",
signature(x = "DataFrame", tableName = "character"),
function(x, tableName) {
callJMethod(x@sdf, "registerTempTable", tableName)
invisible(callJMethod(x@sdf, "registerTempTable", tableName))
})
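The hunk above wraps the Java call in invisible(), so calling registerTempTable() at the R prompt no longer auto-prints the value returned from the JVM side. A minimal sketch of invisible()'s behavior in plain R, independent of SparkR:

loud  <- function() "done"             # return value auto-prints at the prompt
quiet <- function() invisible("done")  # same return value, but no auto-print
loud()        # prints [1] "done"
quiet()       # prints nothing
x <- quiet()  # the value is still assignable: x == "done"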

#' insertInto
@@ -399,23 +402,23 @@ setMethod("repartition",
dataFrame(sdf)
})

#' toJSON
#'
#' Convert the rows of a DataFrame into JSON objects and return an RDD where
#' each element contains a JSON string.
#'
#' @param x A SparkSQL DataFrame
#' @return A StringRRDD of JSON objects
#' @rdname tojson
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' newRDD <- toJSON(df)
#'}
# toJSON
#
# Convert the rows of a DataFrame into JSON objects and return an RDD where
# each element contains a JSON string.
#
#@param x A SparkSQL DataFrame
# @return A StringRRDD of JSON objects
# @rdname tojson
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# sqlCtx <- sparkRSQL.init(sc)
# path <- "path/to/file.json"
# df <- jsonFile(sqlCtx, path)
# newRDD <- toJSON(df)
#}
setMethod("toJSON",
signature(x = "DataFrame"),
function(x) {
@@ -578,8 +581,8 @@ setMethod("limit",
dataFrame(res)
})

# Take the first NUM rows of a DataFrame and return the results as a data.frame

#' Take the first NUM rows of a DataFrame and return the results as a data.frame
#'
#' @rdname take
#' @export
#' @examples
@@ -644,22 +647,22 @@ setMethod("first",
take(x, 1)
})

#' toRDD()
#'
#' Converts a Spark DataFrame to an RDD while preserving column names.
#'
#' @param x A Spark DataFrame
#'
#' @rdname DataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' rdd <- toRDD(df)
#' }
# toRDD()
#
# Converts a Spark DataFrame to an RDD while preserving column names.
#
# @param x A Spark DataFrame
#
# @rdname DataFrame
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# sqlCtx <- sparkRSQL.init(sc)
# path <- "path/to/file.json"
# df <- jsonFile(sqlCtx, path)
# rdd <- toRDD(df)
# }
setMethod("toRDD",
signature(x = "DataFrame"),
function(x) {
@@ -706,6 +709,7 @@ setMethod("groupBy",
#'
#' Compute aggregates by specifying a list of columns
#'
#' @param x a DataFrame
#' @rdname DataFrame
#' @export
setMethod("agg",
@@ -721,53 +725,53 @@ setMethod("agg",
# the requested map function. #
###################################################################################

#' @rdname lapply
# @rdname lapply
setMethod("lapply",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
rdd <- toRDD(X)
lapply(rdd, FUN)
})

#' @rdname lapply
# @rdname lapply
setMethod("map",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
lapply(X, FUN)
})

#' @rdname flatMap
# @rdname flatMap
setMethod("flatMap",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
rdd <- toRDD(X)
flatMap(rdd, FUN)
})

#' @rdname lapplyPartition
# @rdname lapplyPartition
setMethod("lapplyPartition",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
rdd <- toRDD(X)
lapplyPartition(rdd, FUN)
})

#' @rdname lapplyPartition
# @rdname lapplyPartition
setMethod("mapPartitions",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
lapplyPartition(X, FUN)
})

#' @rdname foreach
# @rdname foreach
setMethod("foreach",
signature(x = "DataFrame", func = "function"),
function(x, func) {
rdd <- toRDD(x)
foreach(rdd, func)
})

#' @rdname foreach
# @rdname foreach
setMethod("foreachPartition",
signature(x = "DataFrame", func = "function"),
function(x, func) {
Expand All @@ -788,6 +792,7 @@ setMethod("$", signature(x = "DataFrame"),
getColumn(x, name)
})

#' @rdname select
setMethod("$<-", signature(x = "DataFrame"),
function(x, name, value) {
stopifnot(class(value) == "Column" || is.null(value))
@@ -1009,7 +1014,7 @@ setMethod("sortDF",
})

#' @rdname sortDF
#' @export
#' @aliases orderBy,DataFrame,function-method
setMethod("orderBy",
signature(x = "DataFrame", col = "characterOrColumn"),
function(x, col) {
@@ -1046,7 +1051,7 @@ setMethod("filter",
})

#' @rdname filter
#' @export
#' @aliases where,DataFrame,function-method
setMethod("where",
signature(x = "DataFrame", condition = "characterOrColumn"),
function(x, condition) {
@@ -1276,3 +1281,40 @@ setMethod("saveAsTable",
callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)
})

#' describe
#'
#' Computes statistics for numeric columns.
#' If no columns are given, this function computes statistics for all numeric columns.
#'
#' @param x A DataFrame whose columns are to be summarized
#' @param col A string giving the name of a column to summarize
#' @param ... Names of additional columns to summarize
#' @return A DataFrame
#' @rdname describe
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' describe(df)
#' describe(df, "col1")
#' describe(df, "col1", "col2")
#' }
setMethod("describe",
signature(x = "DataFrame", col = "character"),
function(x, col, ...) {
colList <- list(col, ...)
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
dataFrame(sdf)
})

#' @rdname describe
setMethod("describe",
signature(x = "DataFrame"),
function(x) {
colList <- as.list(c(columns(x)))
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
dataFrame(sdf)
})
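The two describe() methods above dispatch on whether column names are supplied: the first summarizes only the named columns, while the second falls back to passing every column name of the DataFrame through to the same Java call. A minimal usage sketch, assuming a running SparkR session and a hypothetical people.json with numeric fields:

sc <- sparkR.init()
sqlCtx <- sparkRSQL.init(sc)
df <- jsonFile(sqlCtx, "path/to/people.json")  # hypothetical path
stats <- describe(df, "age", "height")         # count, mean, stddev, min, max for two columns
collect(stats)                                 # bring the summary back as a local data.frame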