Skip to content

Commit

Permalink
Merged master
Browse files Browse the repository at this point in the history
  • Loading branch information
falaki committed Jul 31, 2015
2 parents ae1a4cf + 3fc0cb9 commit 30fbaf8
Show file tree
Hide file tree
Showing 17 changed files with 197 additions and 123 deletions.
6 changes: 6 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ exportMethods("arrange",
"count",
"crosstab",
"describe",
"dim",
"distinct",
"dropna",
"dtypes",
Expand All @@ -46,11 +47,15 @@ exportMethods("arrange",
"join",
"limit",
"merge",
"names",
"ncol",
"nrow",
"orderBy",
"mutate",
"names",
"persist",
"printSchema",
"rbind",
"registerTempTable",
"rename",
"repartition",
Expand All @@ -68,6 +73,7 @@ exportMethods("arrange",
"summary",
"take",
"unionAll",
"unique",
"unpersist",
"where",
"withColumn",
Expand Down
90 changes: 90 additions & 0 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,16 @@ setMethod("names",
columns(x)
})

#' @rdname columns
setMethod("names<-",
signature(x = "DataFrame"),
function(x, value) {
if (!is.null(value)) {
sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value)))
dataFrame(sdf)
}
})

#' Register Temporary Table
#'
#' Registers a DataFrame as a Temporary Table in the SQLContext
Expand Down Expand Up @@ -473,6 +483,18 @@ setMethod("distinct",
dataFrame(sdf)
})

#' @title Distinct rows in a DataFrame
#
#' @description Returns a new DataFrame containing distinct rows in this DataFrame
#'
#' @rdname unique
#' @aliases unique
setMethod("unique",
signature(x = "DataFrame"),
function(x) {
distinct(x)
})

#' Sample
#'
#' Return a sampled subset of this DataFrame using a random seed.
Expand Down Expand Up @@ -534,6 +556,58 @@ setMethod("count",
callJMethod(x@sdf, "count")
})

#' @title Number of rows for a DataFrame
#' @description Returns number of rows in a DataFrames
#'
#' @name nrow
#'
#' @rdname nrow
#' @aliases count
setMethod("nrow",
signature(x = "DataFrame"),
function(x) {
count(x)
})

#' Returns the number of columns in a DataFrame
#'
#' @param x a SparkSQL DataFrame
#'
#' @rdname ncol
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' ncol(df)
#' }
setMethod("ncol",
signature(x = "DataFrame"),
function(x) {
length(columns(x))
})

#' Returns the dimentions (number of rows and columns) of a DataFrame
#' @param x a SparkSQL DataFrame
#'
#' @rdname dim
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' dim(df)
#' }
setMethod("dim",
signature(x = "DataFrame"),
function(x) {
c(count(x), ncol(x))
})

#' Collects all the elements of a Spark DataFrame and coerces them into an R data.frame.
#'
#' @param x A SparkSQL DataFrame
Expand Down Expand Up @@ -1240,6 +1314,22 @@ setMethod("unionAll",
dataFrame(unioned)
})

#' @title Union two or more DataFrames
#
#' @description Returns a new DataFrame containing rows of all parameters.
#
#' @rdname rbind
#' @aliases unionAll
setMethod("rbind",
signature(... = "DataFrame"),
function(x, ..., deparse.level = 1) {
if (nargs() == 3) {
unionAll(x, ...)
} else {
unionAll(x, Recall(..., deparse.level = 1))
}
})

#' Intersect
#'
#' Return a new DataFrame containing rows only in both this DataFrame
Expand Down
6 changes: 3 additions & 3 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -1264,12 +1264,12 @@ setMethod("pipeRDD",
signature(x = "RDD", command = "character"),
function(x, command, env = list()) {
func <- function(part) {
trim.trailing.func <- function(x) {
trim_trailing_func <- function(x) {
sub("[\r\n]*$", "", toString(x))
}
input <- unlist(lapply(part, trim.trailing.func))
input <- unlist(lapply(part, trim_trailing_func))
res <- system2(command, stdout = TRUE, input = input, env = env)
lapply(res, trim.trailing.func)
lapply(res, trim_trailing_func)
}
lapplyPartition(x, func)
})
Expand Down
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -677,3 +677,7 @@ setGeneric("upper", function(x) { standardGeneric("upper") })
#' @rdname glm
#' @export
setGeneric("glm")

#' @rdname rbind
#' @export
setGeneric("rbind", signature = "...")
26 changes: 22 additions & 4 deletions R/pkg/inst/tests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ test_that("create DataFrame from RDD", {
df <- createDataFrame(sqlContext, rdd, list("a", "b"))
expect_is(df, "DataFrame")
expect_equal(count(df), 10)
expect_equal(nrow(df), 10)
expect_equal(ncol(df), 2)
expect_equal(dim(df), c(10, 2))
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

Expand Down Expand Up @@ -128,7 +131,9 @@ test_that("create DataFrame from RDD", {
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(collect(where(df2, df2$name == "Bob")), c("Bob", 16, 176.5))

localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18), height=c(164.10, 181.4, 173.7))
localDF <- data.frame(name=c("John", "Smith", "Sarah"),
age=c(19, 23, 18),
height=c(164.10, 181.4, 173.7))
df <- createDataFrame(sqlContext, localDF, schema)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
Expand Down Expand Up @@ -489,7 +494,7 @@ test_that("head() and first() return the correct data", {
expect_equal(nrow(testFirst), 1)
})

test_that("distinct() on DataFrames", {
test_that("distinct() and unique on DataFrames", {
lines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"Justin\", \"age\":19}",
Expand All @@ -501,6 +506,10 @@ test_that("distinct() on DataFrames", {
uniques <- distinct(df)
expect_is(uniques, "DataFrame")
expect_equal(count(uniques), 3)

uniques2 <- unique(df)
expect_is(uniques2, "DataFrame")
expect_equal(count(uniques2), 3)
})

test_that("sample on a DataFrame", {
Expand Down Expand Up @@ -819,7 +828,7 @@ test_that("isLocal()", {
expect_false(isLocal(df))
})

test_that("unionAll(), except(), and intersect() on a DataFrame", {
test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {
df <- jsonFile(sqlContext, jsonPath)

lines <- c("{\"name\":\"Bob\", \"age\":24}",
Expand All @@ -834,6 +843,11 @@ test_that("unionAll(), except(), and intersect() on a DataFrame", {
expect_equal(count(unioned), 6)
expect_equal(first(unioned)$name, "Michael")

unioned2 <- arrange(rbind(unioned, df, df2), df$age)
expect_is(unioned2, "DataFrame")
expect_equal(count(unioned2), 12)
expect_equal(first(unioned2)$name, "Michael")

excepted <- arrange(except(df, df2), desc(df$age))
expect_is(unioned, "DataFrame")
expect_equal(count(excepted), 2)
Expand All @@ -857,7 +871,7 @@ test_that("withColumn() and withColumnRenamed()", {
expect_equal(columns(newDF2)[1], "newerAge")
})

test_that("mutate() and rename()", {
test_that("mutate(), rename() and names()", {
df <- jsonFile(sqlContext, jsonPath)
newDF <- mutate(df, newAge = df$age + 2)
expect_equal(length(columns(newDF)), 3)
Expand All @@ -867,6 +881,10 @@ test_that("mutate() and rename()", {
newDF2 <- rename(df, newerAge = df$age)
expect_equal(length(columns(newDF2)), 2)
expect_equal(columns(newDF2)[1], "newerAge")

names(newDF2) <- c("newerName", "evenNewerAge")
expect_equal(length(names(newDF2)), 2)
expect_equal(names(newDF2)[1], "newerName")
})

test_that("write.df() on DataFrame and works with parquetFile", {
Expand Down
4 changes: 2 additions & 2 deletions dev/create-release/create-release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ if [[ ! "$@" =~ --skip-publish ]]; then

rm -rf $SPARK_REPO

build/mvn -DskipTests -Pyarn -Phive -Prelease\
build/mvn -DskipTests -Pyarn -Phive \
-Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
clean install

./dev/change-scala-version.sh 2.11

build/mvn -DskipTests -Pyarn -Phive -Prelease\
build/mvn -DskipTests -Pyarn -Phive \
-Dscala-2.11 -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
clean install

Expand Down
2 changes: 1 addition & 1 deletion docs/ml-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ test = sc.parallelize([(4L, "spark i j k"),
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
print row
print(row)

sc.stop()
{% endhighlight %}
Expand Down
Loading

0 comments on commit 30fbaf8

Please sign in to comment.