diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index b5c7d5969b083..b2d92bdf4840e 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -29,6 +29,7 @@ exportMethods("arrange",
               "count",
               "crosstab",
               "describe",
+              "dim",
               "distinct",
               "dropna",
               "dtypes",
@@ -46,11 +47,15 @@ exportMethods("arrange",
               "join",
               "limit",
               "merge",
+              "names<-",
+              "ncol",
+              "nrow",
               "orderBy",
               "mutate",
               "names",
               "persist",
               "printSchema",
+              "rbind",
               "registerTempTable",
               "rename",
               "repartition",
@@ -68,6 +73,7 @@ exportMethods("arrange",
               "summary",
               "take",
               "unionAll",
+              "unique",
               "unpersist",
               "where",
               "withColumn",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index a9992f2ea4dc9..005f66f3f1660 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -255,6 +255,18 @@ setMethod("names",
             columns(x)
           })
 
+#' @rdname columns
+setMethod("names<-",
+          signature(x = "DataFrame"),
+          function(x, value) {
+            if (!is.null(value)) {
+              sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value)))
+              dataFrame(sdf)
+            } else {
+              x
+            }
+          })
+
 #' Register Temporary Table
 #'
 #' Registers a DataFrame as a Temporary Table in the SQLContext
@@ -473,6 +485,18 @@ setMethod("distinct",
             dataFrame(sdf)
           })
 
+#' @title Distinct rows in a DataFrame
+#'
+#' @description Returns a new DataFrame containing distinct rows in this DataFrame
+#'
+#' @rdname unique
+#' @aliases unique
+setMethod("unique",
+          signature(x = "DataFrame"),
+          function(x) {
+            distinct(x)
+          })
+
 #' Sample
 #'
 #' Return a sampled subset of this DataFrame using a random seed.
@@ -534,6 +558,58 @@ setMethod("count",
             callJMethod(x@sdf, "count")
           })
 
+#' @title Number of rows for a DataFrame
+#' @description Returns the number of rows in a DataFrame
+#'
+#' @name nrow
+#'
+#' @rdname nrow
+#' @aliases count
+setMethod("nrow",
+          signature(x = "DataFrame"),
+          function(x) {
+            count(x)
+          })
+
+#' Returns the number of columns in a DataFrame
+#'
+#' @param x a SparkSQL DataFrame
+#'
+#' @rdname ncol
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlContext, path)
+#' ncol(df)
+#' }
+setMethod("ncol",
+          signature(x = "DataFrame"),
+          function(x) {
+            length(columns(x))
+          })
+
+#' Returns the dimensions (number of rows and columns) of a DataFrame
+#' @param x a SparkSQL DataFrame
+#'
+#' @rdname dim
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlContext, path)
+#' dim(df)
+#' }
+setMethod("dim",
+          signature(x = "DataFrame"),
+          function(x) {
+            c(count(x), ncol(x))
+          })
+
 #' Collects all the elements of a Spark DataFrame and coerces them into an R data.frame.
 #'
 #' @param x A SparkSQL DataFrame
@@ -1240,6 +1316,22 @@ setMethod("unionAll",
             dataFrame(unioned)
           })
 
+#' @title Union two or more DataFrames
+#'
+#' @description Returns a new DataFrame containing rows of all parameters.
+#'
+#' @rdname rbind
+#' @aliases unionAll
+setMethod("rbind",
+          signature(... = "DataFrame"),
+          function(x, ..., deparse.level = 1) {
+            if (nargs() == 3) {
+              unionAll(x, ...)
+            } else {
+              unionAll(x, Recall(..., deparse.level = 1))
+            }
+          })
+
 #' Intersect
 #'
 #' Return a new DataFrame containing rows only in both this DataFrame
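A quick usage sketch of the new DataFrame methods above (not part of the patch; assumes a running SparkR session and a two-column DataFrame built as in the roxygen examples — the JSON path is a placeholder):

    sc <- sparkR.init()
    sqlContext <- sparkRSQL.init(sc)
    df <- jsonFile(sqlContext, "path/to/file.json")  # assumed to have two columns

    dim(df)      # c(count(df), length(columns(df)))
    nrow(df)     # alias for count(df)
    ncol(df)     # alias for length(columns(df))
    unique(df)   # alias for distinct(df)
    names(df) <- c("col1", "col2")  # renames columns via toDF on the underlying JVM DataFrame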
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 2a013b3dbb968..051e441d4e063 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1264,12 +1264,12 @@ setMethod("pipeRDD",
           signature(x = "RDD", command = "character"),
           function(x, command, env = list()) {
             func <- function(part) {
-              trim.trailing.func <- function(x) {
+              trim_trailing_func <- function(x) {
                 sub("[\r\n]*$", "", toString(x))
               }
-              input <- unlist(lapply(part, trim.trailing.func))
+              input <- unlist(lapply(part, trim_trailing_func))
               res <- system2(command, stdout = TRUE, input = input, env = env)
-              lapply(res, trim.trailing.func)
+              lapply(res, trim_trailing_func)
             }
             lapplyPartition(x, func)
           })
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index ed382528bf486..5f127d74fa1f9 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -677,3 +677,7 @@ setGeneric("upper", function(x) { standardGeneric("upper") })
 #' @rdname glm
 #' @export
 setGeneric("glm")
+
+#' @rdname rbind
+#' @export
+setGeneric("rbind", signature = "...")
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index ddb9b6edd9904..64e3a8bb824a3 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -88,6 +88,9 @@ test_that("create DataFrame from RDD", {
   df <- createDataFrame(sqlContext, rdd, list("a", "b"))
   expect_is(df, "DataFrame")
   expect_equal(count(df), 10)
+  expect_equal(nrow(df), 10)
+  expect_equal(ncol(df), 2)
+  expect_equal(dim(df), c(10, 2))
   expect_equal(columns(df), c("a", "b"))
   expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
 
@@ -128,7 +131,9 @@ test_that("create DataFrame from RDD", {
   expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
   expect_equal(collect(where(df2, df2$name == "Bob")), c("Bob", 16, 176.5))
 
-  localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18), height=c(164.10, 181.4, 173.7))
+  localDF <- data.frame(name=c("John", "Smith", "Sarah"),
+                        age=c(19, 23, 18),
+                        height=c(164.10, 181.4, 173.7))
   df <- createDataFrame(sqlContext, localDF, schema)
   expect_is(df, "DataFrame")
   expect_equal(count(df), 3)
@@ -489,7 +494,7 @@ test_that("head() and first() return the correct data", {
   expect_equal(nrow(testFirst), 1)
 })
 
-test_that("distinct() on DataFrames", {
+test_that("distinct() and unique() on DataFrames", {
   lines <- c("{\"name\":\"Michael\"}",
              "{\"name\":\"Andy\", \"age\":30}",
             "{\"name\":\"Justin\", \"age\":19}",
@@ -501,6 +506,10 @@
   uniques <- distinct(df)
   expect_is(uniques, "DataFrame")
   expect_equal(count(uniques), 3)
+
+  uniques2 <- unique(df)
+  expect_is(uniques2, "DataFrame")
+  expect_equal(count(uniques2), 3)
 })
 
 test_that("sample on a DataFrame", {
@@ -819,7 +828,7 @@ test_that("isLocal()", {
   expect_false(isLocal(df))
 })
 
-test_that("unionAll(), except(), and intersect() on a DataFrame", {
+test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {
   df <- jsonFile(sqlContext, jsonPath)
 
   lines <- c("{\"name\":\"Bob\", \"age\":24}",
@@ -834,6 +843,11 @@
   expect_equal(count(unioned), 6)
   expect_equal(first(unioned)$name, "Michael")
 
+  unioned2 <- arrange(rbind(unioned, df, df2), df$age)
+  expect_is(unioned2, "DataFrame")
+  expect_equal(count(unioned2), 12)
+  expect_equal(first(unioned2)$name, "Michael")
+
   excepted <- arrange(except(df, df2), desc(df$age))
   expect_is(unioned, "DataFrame")
   expect_equal(count(excepted), 2)
@@ -857,7 +871,7 @@ test_that("withColumn() and withColumnRenamed()", {
   expect_equal(columns(newDF2)[1], "newerAge")
 })
 
-test_that("mutate() and rename()", {
+test_that("mutate(), rename() and names()", {
   df <- jsonFile(sqlContext, jsonPath)
   newDF <- mutate(df, newAge = df$age + 2)
   expect_equal(length(columns(newDF)), 3)
@@ -867,6 +881,10 @@
   newDF2 <- rename(df, newerAge = df$age)
   expect_equal(length(columns(newDF2)), 2)
   expect_equal(columns(newDF2)[1], "newerAge")
+
+  names(newDF2) <- c("newerName", "evenNewerAge")
+  expect_equal(length(names(newDF2)), 2)
+  expect_equal(names(newDF2)[1], "newerName")
 })
 
 test_that("write.df() on DataFrame and works with parquetFile", {
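The rbind() method above peels off one argument per call and recurses with Recall(), so rbind(df1, df2, df3) right-folds into unionAll(df1, unionAll(df2, df3)). A sketch of the same fold written directly with Reduce (unionMany is a hypothetical helper, not part of the patch):

    # rbind(df1, df2, df3) computes unionAll(df1, unionAll(df2, df3))
    unionMany <- function(...) {
      Reduce(unionAll, list(...), right = TRUE)
    }

    # mirrors the rbind() test above: 6-row + 3-row + 3-row -> 12 rows
    unioned2 <- unionMany(unioned, df, df2)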
diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh
index 86a7a4068c40e..4311c8c9e4ca6 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -118,13 +118,13 @@ if [[ ! "$@" =~ --skip-publish ]]; then
 
   rm -rf $SPARK_REPO
 
-  build/mvn -DskipTests -Pyarn -Phive -Prelease\
+  build/mvn -DskipTests -Pyarn -Phive \
     -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
     clean install
 
   ./dev/change-scala-version.sh 2.11
 
-  build/mvn -DskipTests -Pyarn -Phive -Prelease\
+  build/mvn -DskipTests -Pyarn -Phive \
     -Dscala-2.11 -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
     clean install
 
diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index 8c46adf256a9a..b6ca50e98db02 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -561,7 +561,7 @@ test = sc.parallelize([(4L, "spark i j k"),
 prediction = model.transform(test)
 selected = prediction.select("id", "text", "prediction")
 for row in selected.collect():
-    print row
+    print(row)
 
 sc.stop()
 {% endhighlight %}
diff --git a/docs/mllib-evaluation-metrics.md b/docs/mllib-evaluation-metrics.md
index 4ca0bb06b26a6..7066d5c97418c 100644
--- a/docs/mllib-evaluation-metrics.md
+++ b/docs/mllib-evaluation-metrics.md
@@ -302,10 +302,10 @@ predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp.label))
 metrics = BinaryClassificationMetrics(predictionAndLabels)
 
 # Area under precision-recall curve
-print "Area under PR = %s" % metrics.areaUnderPR
+print("Area under PR = %s" % metrics.areaUnderPR)
 
 # Area under ROC curve
-print "Area under ROC = %s" % metrics.areaUnderROC
+print("Area under ROC = %s" % metrics.areaUnderROC)
 {% endhighlight %}
@@ -606,24 +606,24 @@ metrics = MulticlassMetrics(predictionAndLabels)
 precision = metrics.precision()
 recall = metrics.recall()
 f1Score = metrics.fMeasure()
-print "Summary Stats"
-print "Precision = %s" % precision
-print "Recall = %s" % recall
-print "F1 Score = %s" % f1Score
+print("Summary Stats")
+print("Precision = %s" % precision)
+print("Recall = %s" % recall)
+print("F1 Score = %s" % f1Score)
 
 # Statistics by class
 labels = data.map(lambda lp: lp.label).distinct().collect()
 for label in sorted(labels):
-    print "Class %s precision = %s" % (label, metrics.precision(label))
-    print "Class %s recall = %s" % (label, metrics.recall(label))
-    print "Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0))
+    print("Class %s precision = %s" % (label, metrics.precision(label)))
+    print("Class %s recall = %s" % (label, metrics.recall(label)))
+    print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))
 
 # Weighted stats
-print "Weighted recall = %s" % metrics.weightedRecall
-print "Weighted precision = %s" % metrics.weightedPrecision
-print "Weighted F(1) Score = %s" % metrics.weightedFMeasure()
-print "Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5)
-print "Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate
+print("Weighted recall = %s" % metrics.weightedRecall)
+print("Weighted precision = %s" % metrics.weightedPrecision)
+print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
+print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
+print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
 {% endhighlight %}
@@ -881,28 +881,28 @@ scoreAndLabels = sc.parallelize([
 metrics = MultilabelMetrics(scoreAndLabels)
 
 # Summary stats
-print "Recall = %s" % metrics.recall()
-print "Precision = %s" % metrics.precision()
-print "F1 measure = %s" % metrics.f1Measure()
-print "Accuracy = %s" % metrics.accuracy
+print("Recall = %s" % metrics.recall())
+print("Precision = %s" % metrics.precision())
+print("F1 measure = %s" % metrics.f1Measure())
+print("Accuracy = %s" % metrics.accuracy)
 
 # Individual label stats
 labels = scoreAndLabels.flatMap(lambda x: x[1]).distinct().collect()
 for label in labels:
-    print "Class %s precision = %s" % (label, metrics.precision(label))
-    print "Class %s recall = %s" % (label, metrics.recall(label))
-    print "Class %s F1 Measure = %s" % (label, metrics.f1Measure(label))
+    print("Class %s precision = %s" % (label, metrics.precision(label)))
+    print("Class %s recall = %s" % (label, metrics.recall(label)))
+    print("Class %s F1 Measure = %s" % (label, metrics.f1Measure(label)))
 
 # Micro stats
-print "Micro precision = %s" % metrics.microPrecision
-print "Micro recall = %s" % metrics.microRecall
-print "Micro F1 measure = %s" % metrics.microF1Measure
+print("Micro precision = %s" % metrics.microPrecision)
+print("Micro recall = %s" % metrics.microRecall)
+print("Micro F1 measure = %s" % metrics.microF1Measure)
 
 # Hamming loss
-print "Hamming loss = %s" % metrics.hammingLoss
+print("Hamming loss = %s" % metrics.hammingLoss)
 
 # Subset accuracy
-print "Subset accuracy = %s" % metrics.subsetAccuracy
+print("Subset accuracy = %s" % metrics.subsetAccuracy)
 {% endhighlight %}
@@ -1283,10 +1283,10 @@ scoreAndLabels = predictions.join(ratingsTuple).map(lambda tup: tup[1])
 metrics = RegressionMetrics(scoreAndLabels)
 
 # Root mean squared error
-print "RMSE = %s" % metrics.rootMeanSquaredError
+print("RMSE = %s" % metrics.rootMeanSquaredError)
 
 # R-squared
-print "R-squared = %s" % metrics.r2
+print("R-squared = %s" % metrics.r2)
 {% endhighlight %}
@@ -1479,17 +1479,17 @@ valuesAndPreds = parsedData.map(lambda p: (float(model.predict(p.features)), p.label))
 metrics = RegressionMetrics(valuesAndPreds)
 
 # Squared Error
-print "MSE = %s" % metrics.meanSquaredError
-print "RMSE = %s" % metrics.rootMeanSquaredError
+print("MSE = %s" % metrics.meanSquaredError)
+print("RMSE = %s" % metrics.rootMeanSquaredError)
 
 # R-squared
-print "R-squared = %s" % metrics.r2
+print("R-squared = %s" % metrics.r2)
 
 # Mean absolute error
-print "MAE = %s" % metrics.meanAbsoluteError
+print("MAE = %s" % metrics.meanAbsoluteError)
 
 # Explained variance
-print "Explained variance = %s" % metrics.explainedVariance
+print("Explained variance = %s" % metrics.explainedVariance)
 {% endhighlight %}
diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md
index a69e41e2a1936..de86aba2ae627 100644
--- a/docs/mllib-feature-extraction.md
+++ b/docs/mllib-feature-extraction.md
@@ -221,7 +221,7 @@ model = word2vec.fit(inp)
 
 synonyms = model.findSynonyms('china', 40)
 
 for word, cosine_distance in synonyms:
-    print "{}: {}".format(word, cosine_distance)
+    print("{}: {}".format(word, cosine_distance))
 {% endhighlight %}
diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index de5d6485f9b5f..be04d0b4b53a8 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -95,9 +95,9 @@ mat = ... # an RDD of Vectors
 
 # Compute column summary statistics.
 summary = Statistics.colStats(mat)
-print summary.mean()
-print summary.variance()
-print summary.numNonzeros()
+print(summary.mean())
+print(summary.variance())
+print(summary.numNonzeros())
 {% endhighlight %}
@@ -183,12 +183,12 @@ seriesY = ... # must have the same number of partitions and cardinality as seriesX
 
 # Compute the correlation using Pearson's method. Enter "spearman" for Spearman's method. If a
 # method is not specified, Pearson's method will be used by default.
-print Statistics.corr(seriesX, seriesY, method="pearson")
+print(Statistics.corr(seriesX, seriesY, method="pearson"))
 
 data = ... # an RDD of Vectors
 
 # calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
 # If a method is not specified, Pearson's method will be used by default.
-print Statistics.corr(data, method="pearson")
+print(Statistics.corr(data, method="pearson"))
 {% endhighlight %}
@@ -398,14 +398,14 @@ vec = Vectors.dense(...)  # a vector composed of the frequencies of events
 
 # compute the goodness of fit. If a second vector to test against is not supplied as a parameter,
 # the test runs against a uniform distribution.
 goodnessOfFitTestResult = Statistics.chiSqTest(vec)
-print goodnessOfFitTestResult  # summary of the test including the p-value, degrees of freedom,
-                               # test statistic, the method used, and the null hypothesis.
+print(goodnessOfFitTestResult)  # summary of the test including the p-value, degrees of freedom,
+                                # test statistic, the method used, and the null hypothesis.
 
 mat = Matrices.dense(...)  # a contingency matrix
 
 # conduct Pearson's independence test on the input contingency matrix
 independenceTestResult = Statistics.chiSqTest(mat)
-print independenceTestResult  # summary of the test including the p-value, degrees of freedom...
+print(independenceTestResult)  # summary of the test including the p-value, degrees of freedom...
 
 obs = sc.parallelize(...)  # LabeledPoint(feature, label) .
@@ -415,8 +415,8 @@ obs = sc.parallelize(...)  # LabeledPoint(feature, label) .
 featureTestResults = Statistics.chiSqTest(obs)
 
 for i, result in enumerate(featureTestResults):
-    print "Column $d:" % (i + 1)
-    print result
+    print("Column %d:" % (i + 1))
+    print(result)
 {% endhighlight %}
diff --git a/docs/quick-start.md b/docs/quick-start.md
index bb39e4111f244..ce2cc9d2169cd 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -406,7 +406,7 @@ logData = sc.textFile(logFile).cache()
 
 numAs = logData.filter(lambda s: 'a' in s).count()
 numBs = logData.filter(lambda s: 'b' in s).count()
-print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
+print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
 {% endhighlight %}
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 95945eb7fc8a0..d31baa080cbce 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -570,7 +570,7 @@ teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 
 # The results of SQL queries are RDDs and support all the normal RDD operations.
 teenNames = teenagers.map(lambda p: "Name: " + p.name)
 for teenName in teenNames.collect():
-    print teenName
+    print(teenName)
 {% endhighlight %}
@@ -752,7 +752,7 @@ results = sqlContext.sql("SELECT name FROM people")
 
 # The results of SQL queries are RDDs and support all the normal RDD operations.
 names = results.map(lambda p: "Name: " + p.name)
 for name in names.collect():
-    print name
+    print(name)
 {% endhighlight %}
@@ -1006,7 +1006,7 @@ parquetFile.registerTempTable("parquetFile");
 teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
 teenNames = teenagers.map(lambda p: "Name: " + p.name)
 for teenName in teenNames.collect():
-    print teenName
+    print(teenName)
 {% endhighlight %}
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 2f3013b533eb0..4663b3f14c527 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1525,7 +1525,7 @@ def getSqlContextInstance(sparkContext):
 words = ... # DStream of strings
 
 def process(time, rdd):
-    print "========= %s =========" % str(time)
+    print("========= %s =========" % str(time))
     try:
         # Get the singleton instance of SQLContext
         sqlContext = getSqlContextInstance(rdd.context)
diff --git a/pom.xml b/pom.xml
index e351c7c19df96..1371a1b6bd9f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,9 +160,6 @@
     2.4.4
     1.1.1.7
     1.1.2
-
-    false
-    ${java.home}
-    ${create.dependency.reduced.pom}
@@ -1836,26 +1835,6 @@
-
-    release
-
-    true
-