[SPARK-16693][SPARKR] Remove deprecated methods
## What changes were proposed in this pull request?

Remove deprecated functions and methods, including (see the migration sketch after this list):

- SQLContext/HiveContext entry points (e.g. `sparkRSQL.init`, `sparkRHive.init`, and the SQLContext-accepting variants of functions such as `createDataFrame` and `read.json`)
- `sparkR.init`
- `jsonFile`
- `parquetFile`
- `registerTempTable`
- `saveAsParquetFile`
- `unionAll`
- `createExternalTable`
- `dropTempTable`
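
For reference, a minimal migration sketch in SparkR, showing the replacement that each removed API's `.Deprecated()` message (or the already-exported `read.*`/`sparkR.session` equivalent) points to. The file paths, app name, and view name below are hypothetical placeholders, not values taken from this change.

```r
# Minimal migration sketch: deprecated call -> current replacement.
# Paths, the app name, and the view name are illustrative placeholders.
library(SparkR)

# was: sc <- sparkR.init(); sqlContext <- sparkRSQL.init(sc)
sparkR.session(appName = "deprecation-migration")

# was: df <- jsonFile(sqlContext, "path/to/file.json")
df <- read.json("path/to/file.json")

# was: registerTempTable(df, "json_df")
createOrReplaceTempView(df, "json_df")
new_df <- sql("SELECT * FROM json_df")

# was: saveAsParquetFile(df, "/tmp/sparkr-out/")
write.parquet(df, "/tmp/sparkr-out/")

# was: pf <- parquetFile(sqlContext, "/tmp/sparkr-out/")
pf <- read.parquet("/tmp/sparkr-out/")

# was: combined <- unionAll(df, pf)
combined <- union(df, pf)

# was: dropTempTable(sqlContext, "json_df")
dropTempView("json_df")

sparkR.session.stop()
```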

## How was this patch tested?

Jenkins

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes apache#22843 from felixcheung/rrddapi.
felixcheung authored and Felix Cheung committed Oct 27, 2018
1 parent d5573c5 commit 41e1416
Showing 9 changed files with 32 additions and 453 deletions.
11 changes: 1 addition & 10 deletions R/pkg/NAMESPACE
@@ -28,9 +28,8 @@ importFrom("utils", "download.file", "object.size", "packageVersion", "tail", "u

# S3 methods exported
export("sparkR.session")
export("sparkR.init")
export("sparkR.stop")
export("sparkR.session.stop")
export("sparkR.stop")
export("sparkR.conf")
export("sparkR.version")
export("sparkR.uiWebUrl")
@@ -42,9 +41,6 @@ export("sparkR.callJStatic")

export("install.spark")

export("sparkRSQL.init",
"sparkRHive.init")

# MLlib integration
exportMethods("glm",
"spark.glm",
@@ -151,15 +147,13 @@ exportMethods("arrange",
"printSchema",
"randomSplit",
"rbind",
"registerTempTable",
"rename",
"repartition",
"repartitionByRange",
"rollup",
"sample",
"sample_frac",
"sampleBy",
"saveAsParquetFile",
"saveAsTable",
"saveDF",
"schema",
@@ -175,7 +169,6 @@ exportMethods("arrange",
"toJSON",
"transform",
"union",
"unionAll",
"unionByName",
"unique",
"unpersist",
@@ -415,10 +408,8 @@ export("as.DataFrame",
"cacheTable",
"clearCache",
"createDataFrame",
"createExternalTable",
"createTable",
"currentDatabase",
"dropTempTable",
"dropTempView",
"listColumns",
"listDatabases",
52 changes: 1 addition & 51 deletions R/pkg/R/DataFrame.R
@@ -227,7 +227,7 @@ setMethod("showDF",
#' show
#'
#' If eager evaluation is enabled and the Spark object is a SparkDataFrame, evaluate the
#' SparkDataFrame and print top rows of the SparkDataFrame, otherwise, print the class
#' SparkDataFrame and print top rows of the SparkDataFrame, otherwise, print the class
#' and type information of the Spark object.
#'
#' @param object a Spark object. Can be a SparkDataFrame, Column, GroupedData, WindowSpec.
@@ -521,32 +521,6 @@ setMethod("createOrReplaceTempView",
invisible(callJMethod(x@sdf, "createOrReplaceTempView", viewName))
})

#' (Deprecated) Register Temporary Table
#'
#' Registers a SparkDataFrame as a Temporary Table in the SparkSession
#' @param x A SparkDataFrame
#' @param tableName A character vector containing the name of the table
#'
#' @seealso \link{createOrReplaceTempView}
#' @rdname registerTempTable-deprecated
#' @name registerTempTable
#' @aliases registerTempTable,SparkDataFrame,character-method
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' registerTempTable(df, "json_df")
#' new_df <- sql("SELECT * FROM json_df")
#'}
#' @note registerTempTable since 1.4.0
setMethod("registerTempTable",
signature(x = "SparkDataFrame", tableName = "character"),
function(x, tableName) {
.Deprecated("createOrReplaceTempView")
invisible(callJMethod(x@sdf, "createOrReplaceTempView", tableName))
})

#' insertInto
#'
#' Insert the contents of a SparkDataFrame into a table registered in the current SparkSession.
@@ -956,7 +930,6 @@ setMethod("write.orc",
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' write.parquet(df, "/tmp/sparkr-tmp1/")
#' saveAsParquetFile(df, "/tmp/sparkr-tmp2/")
#'}
#' @note write.parquet since 1.6.0
setMethod("write.parquet",
@@ -967,17 +940,6 @@ setMethod("write.parquet",
invisible(handledCallJMethod(write, "parquet", path))
})

#' @rdname write.parquet
#' @name saveAsParquetFile
#' @aliases saveAsParquetFile,SparkDataFrame,character-method
#' @note saveAsParquetFile since 1.4.0
setMethod("saveAsParquetFile",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
.Deprecated("write.parquet")
write.parquet(x, path)
})

#' Save the content of SparkDataFrame in a text file at the specified path.
#'
#' Save the content of the SparkDataFrame in a text file at the specified path.
@@ -2762,18 +2724,6 @@ setMethod("union",
dataFrame(unioned)
})

#' unionAll is deprecated - use union instead
#' @rdname union
#' @name unionAll
#' @aliases unionAll,SparkDataFrame,SparkDataFrame-method
#' @note unionAll since 1.4.0
setMethod("unionAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
.Deprecated("union")
union(x, y)
})

#' Return a new SparkDataFrame containing the union of rows, matched by column names
#'
#' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
95 changes: 10 additions & 85 deletions R/pkg/R/SQLContext.R
@@ -37,37 +37,6 @@ getInternalType <- function(x) {
stop(paste("Unsupported type for SparkDataFrame:", class(x))))
}

#' Temporary function to reroute old S3 Method call to new
#' This function is specifically implemented to remove SQLContext from the parameter list.
#' It determines the target to route the call by checking the parent of this callsite (say 'func').
#' The target should be called 'func.default'.
#' We need to check the class of x to ensure it is SQLContext/HiveContext before dispatching.
#' @param newFuncSig name of the function the user should call instead in the deprecation message
#' @param x the first parameter of the original call
#' @param ... the rest of parameter to pass along
#' @return whatever the target returns
#' @noRd
dispatchFunc <- function(newFuncSig, x, ...) {
# When called with SparkR::createDataFrame, sys.call()[[1]] returns c(::, SparkR, createDataFrame)
callsite <- as.character(sys.call(sys.parent())[[1]])
funcName <- callsite[[length(callsite)]]
f <- get(paste0(funcName, ".default"))
# Strip sqlContext from list of parameters and then pass the rest along.
contextNames <- c("org.apache.spark.sql.SQLContext",
"org.apache.spark.sql.hive.HiveContext",
"org.apache.spark.sql.hive.test.TestHiveContext",
"org.apache.spark.sql.SparkSession")
if (missing(x) && length(list(...)) == 0) {
f()
} else if (class(x) == "jobj" &&
any(grepl(paste(contextNames, collapse = "|"), getClassName.jobj(x)))) {
.Deprecated(newFuncSig, old = paste0(funcName, "(sqlContext...)"))
f(...)
} else {
f(x, ...)
}
}

#' return the SparkSession
#' @noRd
getSparkSession <- function() {
@@ -198,11 +167,10 @@ getDefaultSqlSource <- function() {
#' df4 <- createDataFrame(cars, numPartitions = 2)
#' }
#' @name createDataFrame
#' @method createDataFrame default
#' @note createDataFrame since 1.4.0
# TODO(davies): support sampling and infer type from NA
createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0,
numPartitions = NULL) {
createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
numPartitions = NULL) {
sparkSession <- getSparkSession()

if (is.data.frame(data)) {
@@ -285,31 +253,18 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0,
dataFrame(sdf)
}

createDataFrame <- function(x, ...) {
dispatchFunc("createDataFrame(data, schema = NULL)", x, ...)
}

#' @rdname createDataFrame
#' @aliases createDataFrame
#' @method as.DataFrame default
#' @note as.DataFrame since 1.6.0
as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) {
as.DataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) {
createDataFrame(data, schema, samplingRatio, numPartitions)
}

#' @param ... additional argument(s).
#' @rdname createDataFrame
#' @aliases as.DataFrame
as.DataFrame <- function(data, ...) {
dispatchFunc("as.DataFrame(data, schema = NULL)", data, ...)
}

#' toDF
#'
#' Converts an RDD to a SparkDataFrame by inferring the types.
#'
#' @param x An RDD
#'
#' @rdname SparkDataFrame
#' @noRd
#' @examples
@@ -345,9 +300,8 @@ setMethod("toDF", signature(x = "RDD"),
#' df <- read.json(path, multiLine = TRUE)
#' }
#' @name read.json
#' @method read.json default
#' @note read.json since 1.6.0
read.json.default <- function(path, ...) {
read.json <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the text file path
@@ -358,10 +312,6 @@ read.json.default <- function(path, ...) {
dataFrame(sdf)
}

read.json <- function(x, ...) {
dispatchFunc("read.json(path)", x, ...)
}

#' Create a SparkDataFrame from an ORC file.
#'
#' Loads an ORC file, returning the result as a SparkDataFrame.
@@ -388,13 +338,12 @@ read.orc <- function(path, ...) {
#' Loads a Parquet file, returning the result as a SparkDataFrame.
#'
#' @param path path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
#' @param ... additional data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.parquet
#' @name read.parquet
#' @method read.parquet default
#' @note read.parquet since 1.6.0
read.parquet.default <- function(path, ...) {
read.parquet <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the Parquet file path
@@ -405,10 +354,6 @@ read.parquet.default <- function(path, ...) {
dataFrame(sdf)
}

read.parquet <- function(x, ...) {
dispatchFunc("read.parquet(...)", x, ...)
}

#' Create a SparkDataFrame from a text file.
#'
#' Loads text files and returns a SparkDataFrame whose schema starts with
@@ -428,9 +373,8 @@ read.parquet <- function(x, ...) {
#' df <- read.text(path)
#' }
#' @name read.text
#' @method read.text default
#' @note read.text since 1.6.1
read.text.default <- function(path, ...) {
read.text <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the text file path
@@ -441,10 +385,6 @@ read.text.default <- function(path, ...) {
dataFrame(sdf)
}

read.text <- function(x, ...) {
dispatchFunc("read.text(path)", x, ...)
}

#' SQL Query
#'
#' Executes a SQL query using Spark, returning the result as a SparkDataFrame.
@@ -461,18 +401,13 @@ read.text <- function(x, ...) {
#' new_df <- sql("SELECT * FROM table")
#' }
#' @name sql
#' @method sql default
#' @note sql since 1.4.0
sql.default <- function(sqlQuery) {
sql <- function(sqlQuery) {
sparkSession <- getSparkSession()
sdf <- callJMethod(sparkSession, "sql", sqlQuery)
dataFrame(sdf)
}

sql <- function(x, ...) {
dispatchFunc("sql(sqlQuery)", x, ...)
}

#' Create a SparkDataFrame from a SparkSQL table or view
#'
#' Returns the specified table or view as a SparkDataFrame. The table or view must already exist or
@@ -531,9 +466,8 @@ tableToDF <- function(tableName) {
#' df4 <- read.df(mapTypeJsonPath, "json", stringSchema, multiLine = TRUE)
#' }
#' @name read.df
#' @method read.df default
#' @note read.df since 1.4.0
read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
read.df <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
if (!is.null(path) && !is.character(path)) {
stop("path should be character, NULL or omitted.")
}
Expand Down Expand Up @@ -568,22 +502,13 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
dataFrame(sdf)
}

read.df <- function(x = NULL, ...) {
dispatchFunc("read.df(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
}

#' @rdname read.df
#' @name loadDF
#' @method loadDF default
#' @note loadDF since 1.6.0
loadDF.default <- function(path = NULL, source = NULL, schema = NULL, ...) {
loadDF <- function(path = NULL, source = NULL, schema = NULL, ...) {
read.df(path, source, schema, ...)
}

loadDF <- function(x = NULL, ...) {
dispatchFunc("loadDF(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
}

#' Create a SparkDataFrame representing the database table accessible via JDBC URL
#'
#' Additional JDBC database connection properties can be set (...)