Skip to content

Commit

Permalink
[SPARK-14014][SQL] Integrate session catalog (attempt #2)
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This reopens apache#11836, which was merged but promptly reverted because it introduced flaky Hive tests.

## How was this patch tested?

See `CatalogTestCases`, `SessionCatalogSuite` and `HiveContextSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes apache#11938 from andrewor14/session-catalog-again.
  • Loading branch information
Andrew Or committed Mar 25, 2016
1 parent 1c70b76 commit 20ddf5f
Show file tree
Hide file tree
Showing 61 changed files with 1,043 additions and 816 deletions.
3 changes: 2 additions & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1817,7 +1817,8 @@ test_that("approxQuantile() on a DataFrame", {

test_that("SQL error message is returned from JVM", {
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
expect_equal(grepl("Table not found: blah", retError), TRUE)
expect_equal(grepl("Table not found", retError), TRUE)
expect_equal(grepl("blah", retError), TRUE)
})

irisDF <- suppressWarnings(createDataFrame(sqlContext, iris))
Expand Down
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,9 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.Logging.initializeLogIfNecessary"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerEvent.logEvent"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.OutputWriterFactory.newInstance")
) ++ Seq(
// [SPARK-14014] Replace existing analysis.Catalog with SessionCatalog
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.this")
) ++ Seq(
// [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Logging"),
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ def tableNames(self, dbName=None):
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlContext.tableNames()
True
>>> "table1" in sqlContext.tableNames("db")
>>> "table1" in sqlContext.tableNames("default")
True
"""
if dbName is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
Expand All @@ -36,23 +37,22 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.types._

/**
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
* when all relations are already filled in and the analyzer needs only to resolve attribute
* references.
* A trivial [[Analyzer]] with an dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
* Used for testing when all relations are already filled in and the analyzer needs only
* to resolve attribute references.
*/
object SimpleAnalyzer
extends Analyzer(
EmptyCatalog,
EmptyFunctionRegistry,
new SimpleCatalystConf(caseSensitiveAnalysis = true))
extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true))
class SimpleAnalyzer(conf: CatalystConf)
extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf)

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
* a [[FunctionRegistry]].
* [[UnresolvedRelation]]s into fully typed objects using information in a
* [[SessionCatalog]] and a [[FunctionRegistry]].
*/
class Analyzer(
catalog: Catalog,
catalog: SessionCatalog,
registry: FunctionRegistry,
conf: CatalystConf,
maxIterations: Int = 100)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)

/**
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
* Holds the name of a relation that has yet to be looked up in a catalog.
*/
case class UnresolvedRelation(
tableIdentifier: TableIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,37 +52,34 @@ class InMemoryCatalog extends ExternalCatalog {
names.filter { funcName => regex.pattern.matcher(funcName).matches() }
}

private def existsFunction(db: String, funcName: String): Boolean = {
private def functionExists(db: String, funcName: String): Boolean = {
requireDbExists(db)
catalog(db).functions.contains(funcName)
}

private def existsTable(db: String, table: String): Boolean = {
requireDbExists(db)
catalog(db).tables.contains(table)
}

private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = {
private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = {
requireTableExists(db, table)
catalog(db).tables(table).partitions.contains(spec)
}

private def requireFunctionExists(db: String, funcName: String): Unit = {
if (!existsFunction(db, funcName)) {
throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'")
if (!functionExists(db, funcName)) {
throw new AnalysisException(
s"Function not found: '$funcName' does not exist in database '$db'")
}
}

private def requireTableExists(db: String, table: String): Unit = {
if (!existsTable(db, table)) {
throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
if (!tableExists(db, table)) {
throw new AnalysisException(
s"Table not found: '$table' does not exist in database '$db'")
}
}

private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
if (!existsPartition(db, table, spec)) {
if (!partitionExists(db, table, spec)) {
throw new AnalysisException(
s"Partition does not exist in database '$db' table '$table': '$spec'")
s"Partition not found: database '$db' table '$table' does not contain: '$spec'")
}
}

Expand Down Expand Up @@ -159,7 +156,7 @@ class InMemoryCatalog extends ExternalCatalog {
ignoreIfExists: Boolean): Unit = synchronized {
requireDbExists(db)
val table = tableDefinition.name.table
if (existsTable(db, table)) {
if (tableExists(db, table)) {
if (!ignoreIfExists) {
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
}
Expand All @@ -173,7 +170,7 @@ class InMemoryCatalog extends ExternalCatalog {
table: String,
ignoreIfNotExists: Boolean): Unit = synchronized {
requireDbExists(db)
if (existsTable(db, table)) {
if (tableExists(db, table)) {
catalog(db).tables.remove(table)
} else {
if (!ignoreIfNotExists) {
Expand All @@ -200,13 +197,17 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).tables(table).table
}

override def tableExists(db: String, table: String): Boolean = synchronized {
requireDbExists(db)
catalog(db).tables.contains(table)
}

override def listTables(db: String): Seq[String] = synchronized {
requireDbExists(db)
catalog(db).tables.keySet.toSeq
}

override def listTables(db: String, pattern: String): Seq[String] = synchronized {
requireDbExists(db)
filterPattern(listTables(db), pattern)
}

Expand Down Expand Up @@ -295,7 +296,7 @@ class InMemoryCatalog extends ExternalCatalog {

override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
requireDbExists(db)
if (existsFunction(db, func.name.funcName)) {
if (functionExists(db, func.name.funcName)) {
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
} else {
catalog(db).functions.put(func.name.funcName, func)
Expand Down
Loading

0 comments on commit 20ddf5f

Please sign in to comment.