From 20ddf5fddf40b543edc61d6e4687988489dea64c Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 24 Mar 2016 22:59:35 -0700 Subject: [PATCH] [SPARK-14014][SQL] Integrate session catalog (attempt #2) ## What changes were proposed in this pull request? This reopens #11836, which was merged but promptly reverted because it introduced flaky Hive tests. ## How was this patch tested? See `CatalogTestCases`, `SessionCatalogSuite` and `HiveContextSuite`. Author: Andrew Or Closes #11938 from andrewor14/session-catalog-again. --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 3 +- project/MimaExcludes.scala | 3 + python/pyspark/sql/context.py | 2 +- .../sql/catalyst/analysis/Analyzer.scala | 20 +- .../spark/sql/catalyst/analysis/Catalog.scala | 218 -------- .../sql/catalyst/analysis/unresolved.scala | 2 +- .../catalyst/catalog/InMemoryCatalog.scala | 35 +- .../sql/catalyst/catalog/SessionCatalog.scala | 123 +++-- .../sql/catalyst/catalog/interface.scala | 2 + .../sql/catalyst/analysis/AnalysisSuite.scala | 6 +- .../sql/catalyst/analysis/AnalysisTest.scala | 23 +- .../analysis/DecimalPrecisionSuite.scala | 25 +- .../catalyst/catalog/CatalogTestCases.scala | 3 +- .../catalog/SessionCatalogSuite.scala | 20 +- .../BooleanSimplificationSuite.scala | 11 +- .../optimizer/EliminateSortsSuite.scala | 5 +- .../org/apache/spark/sql/SQLContext.scala | 73 ++- .../sql/execution/command/commands.scala | 8 +- .../spark/sql/execution/datasources/ddl.scala | 24 +- .../sql/execution/datasources/rules.scala | 10 +- .../spark/sql/internal/SessionState.scala | 7 +- .../apache/spark/sql/ListTablesSuite.scala | 15 +- .../apache/spark/sql/SQLContextSuite.scala | 9 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 22 +- .../parquet/ParquetQuerySuite.scala | 6 +- .../apache/spark/sql/test/SQLTestUtils.scala | 4 +- .../hive/thriftserver/SparkSQLCLIDriver.scala | 3 +- .../sql/hive/thriftserver/CliSuite.scala | 5 +- .../execution/HiveCompatibilitySuite.scala | 23 +- .../apache/spark/sql/hive/HiveCatalog.scala | 5 +- .../apache/spark/sql/hive/HiveContext.scala | 498 ++++++++++-------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 60 +-- .../spark/sql/hive/HiveSessionCatalog.scala | 104 ++++ .../spark/sql/hive/HiveSessionState.scala | 10 +- .../spark/sql/hive/client/HiveClient.scala | 3 - .../sql/hive/client/HiveClientImpl.scala | 4 - .../hive/execution/CreateTableAsSelect.scala | 4 +- .../hive/execution/CreateViewAsSelect.scala | 4 +- .../hive/execution/InsertIntoHiveTable.scala | 14 +- .../spark/sql/hive/execution/commands.scala | 9 +- .../apache/spark/sql/hive/test/TestHive.scala | 172 ++++-- .../hive/JavaMetastoreDataSourcesSuite.java | 5 +- .../sql/hive/test/TestHiveSingleton.scala | 38 ++ .../spark/sql/hive/ExpressionToSQLSuite.scala | 11 +- .../spark/sql/hive/HiveContextSuite.scala | 37 ++ .../hive/HiveDataFrameAnalyticsSuite.scala | 7 +- .../sql/hive/HiveMetastoreCatalogSuite.scala | 10 +- .../spark/sql/hive/ListTablesSuite.scala | 17 +- .../sql/hive/LogicalPlanToSQLSuite.scala | 15 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 32 +- .../spark/sql/hive/MultiDatabaseSuite.scala | 5 +- .../spark/sql/hive/StatisticsSuite.scala | 3 +- .../spark/sql/hive/client/VersionsSuite.scala | 4 - .../execution/AggregationQuerySuite.scala | 12 +- .../sql/hive/execution/HiveQuerySuite.scala | 16 +- .../sql/hive/execution/HiveUDFSuite.scala | 2 +- .../sql/hive/execution/SQLQuerySuite.scala | 4 +- .../sql/hive/execution/WindowQuerySuite.scala | 7 +- .../spark/sql/hive/orc/OrcQuerySuite.scala | 4 +- .../spark/sql/hive/orc/OrcSourceSuite.scala | 8 +- .../apache/spark/sql/hive/parquetSuites.scala | 25 +- 61 files changed, 1043 insertions(+), 816 deletions(-) delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala create mode 100644 sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 63acbadfa6a16..eef365b42e56d 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1817,7 +1817,8 @@ test_that("approxQuantile() on a DataFrame", { test_that("SQL error message is returned from JVM", { retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e) - expect_equal(grepl("Table not found: blah", retError), TRUE) + expect_equal(grepl("Table not found", retError), TRUE) + expect_equal(grepl("blah", retError), TRUE) }) irisDF <- suppressWarnings(createDataFrame(sqlContext, iris)) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 42eafcb0f52d0..915898389ca00 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -562,6 +562,9 @@ object MimaExcludes { ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.Logging.initializeLogIfNecessary"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerEvent.logEvent"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.OutputWriterFactory.newInstance") + ) ++ Seq( + // [SPARK-14014] Replace existing analysis.Catalog with SessionCatalog + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.this") ) ++ Seq( // [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Logging"), diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 9c2f6a3c5660f..4008332c84d0a 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -554,7 +554,7 @@ def tableNames(self, dbName=None): >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> "table1" in sqlContext.tableNames() True - >>> "table1" in sqlContext.tableNames("db") + >>> "table1" in sqlContext.tableNames("default") True """ if dbName is None: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d0a31e7620bb0..b344e041a5721 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -36,23 +37,22 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.types._ /** - * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing - * when all relations are already filled in and the analyzer needs only to resolve attribute - * references. + * A trivial [[Analyzer]] with an dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]]. + * Used for testing when all relations are already filled in and the analyzer needs only + * to resolve attribute references. */ object SimpleAnalyzer - extends Analyzer( - EmptyCatalog, - EmptyFunctionRegistry, - new SimpleCatalystConf(caseSensitiveAnalysis = true)) + extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true)) +class SimpleAnalyzer(conf: CatalystConf) + extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf) /** * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and - * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and - * a [[FunctionRegistry]]. + * [[UnresolvedRelation]]s into fully typed objects using information in a + * [[SessionCatalog]] and a [[FunctionRegistry]]. */ class Analyzer( - catalog: Catalog, + catalog: SessionCatalog, registry: FunctionRegistry, conf: CatalystConf, maxIterations: Int = 100) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala deleted file mode 100644 index 2f0a4dbc107aa..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.analysis - -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} - - -/** - * An interface for looking up relations by name. Used by an [[Analyzer]]. - */ -trait Catalog { - - val conf: CatalystConf - - def tableExists(tableIdent: TableIdentifier): Boolean - - def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan - - def setCurrentDatabase(databaseName: String): Unit = { - throw new UnsupportedOperationException - } - - /** - * Returns tuples of (tableName, isTemporary) for all tables in the given database. - * isTemporary is a Boolean value indicates if a table is a temporary or not. - */ - def getTables(databaseName: Option[String]): Seq[(String, Boolean)] - - def refreshTable(tableIdent: TableIdentifier): Unit - - def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit - - def unregisterTable(tableIdent: TableIdentifier): Unit - - def unregisterAllTables(): Unit - - /** - * Get the table name of TableIdentifier for temporary tables. - */ - protected def getTableName(tableIdent: TableIdentifier): String = { - // It is not allowed to specify database name for temporary tables. - // We check it here and throw exception if database is defined. - if (tableIdent.database.isDefined) { - throw new AnalysisException("Specifying database name or other qualifiers are not allowed " + - "for temporary tables. If the table name has dots (.) in it, please quote the " + - "table name with backticks (`).") - } - if (conf.caseSensitiveAnalysis) { - tableIdent.table - } else { - tableIdent.table.toLowerCase - } - } -} - -class SimpleCatalog(val conf: CatalystConf) extends Catalog { - private[this] val tables = new ConcurrentHashMap[String, LogicalPlan] - - override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { - tables.put(getTableName(tableIdent), plan) - } - - override def unregisterTable(tableIdent: TableIdentifier): Unit = { - tables.remove(getTableName(tableIdent)) - } - - override def unregisterAllTables(): Unit = { - tables.clear() - } - - override def tableExists(tableIdent: TableIdentifier): Boolean = { - tables.containsKey(getTableName(tableIdent)) - } - - override def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String] = None): LogicalPlan = { - val tableName = getTableName(tableIdent) - val table = tables.get(tableName) - if (table == null) { - throw new AnalysisException("Table not found: " + tableName) - } - val qualifiedTable = SubqueryAlias(tableName, table) - - // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are - // properly qualified with this alias. - alias - .map(a => SubqueryAlias(a, qualifiedTable)) - .getOrElse(qualifiedTable) - } - - override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - tables.keySet().asScala.map(_ -> true).toSeq - } - - override def refreshTable(tableIdent: TableIdentifier): Unit = { - throw new UnsupportedOperationException - } -} - -/** - * A trait that can be mixed in with other Catalogs allowing specific tables to be overridden with - * new logical plans. This can be used to bind query result to virtual tables, or replace tables - * with in-memory cached versions. Note that the set of overrides is stored in memory and thus - * lost when the JVM exits. - */ -trait OverrideCatalog extends Catalog { - private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan] - - private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = { - if (tableIdent.database.isDefined) { - None - } else { - Option(overrides.get(getTableName(tableIdent))) - } - } - - abstract override def tableExists(tableIdent: TableIdentifier): Boolean = { - getOverriddenTable(tableIdent) match { - case Some(_) => true - case None => super.tableExists(tableIdent) - } - } - - abstract override def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String] = None): LogicalPlan = { - getOverriddenTable(tableIdent) match { - case Some(table) => - val tableName = getTableName(tableIdent) - val qualifiedTable = SubqueryAlias(tableName, table) - - // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes - // are properly qualified with this alias. - alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) - - case None => super.lookupRelation(tableIdent, alias) - } - } - - abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName) - } - - override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { - overrides.put(getTableName(tableIdent), plan) - } - - override def unregisterTable(tableIdent: TableIdentifier): Unit = { - if (tableIdent.database.isEmpty) { - overrides.remove(getTableName(tableIdent)) - } - } - - override def unregisterAllTables(): Unit = { - overrides.clear() - } -} - -/** - * A trivial catalog that returns an error when a relation is requested. Used for testing when all - * relations are already filled in and the analyzer needs only to resolve attribute references. - */ -object EmptyCatalog extends Catalog { - - override val conf: CatalystConf = EmptyConf - - override def tableExists(tableIdent: TableIdentifier): Boolean = { - throw new UnsupportedOperationException - } - - override def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String] = None): LogicalPlan = { - throw new UnsupportedOperationException - } - - override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - throw new UnsupportedOperationException - } - - override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { - throw new UnsupportedOperationException - } - - override def unregisterTable(tableIdent: TableIdentifier): Unit = { - throw new UnsupportedOperationException - } - - override def unregisterAllTables(): Unit = { - throw new UnsupportedOperationException - } - - override def refreshTable(tableIdent: TableIdentifier): Unit = { - throw new UnsupportedOperationException - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 9518309fbf8ea..e73d367a730e4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null) /** - * Holds the name of a relation that has yet to be looked up in a [[Catalog]]. + * Holds the name of a relation that has yet to be looked up in a catalog. */ case class UnresolvedRelation( tableIdentifier: TableIdentifier, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 7ead1ddebe852..e216fa552804b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -52,37 +52,34 @@ class InMemoryCatalog extends ExternalCatalog { names.filter { funcName => regex.pattern.matcher(funcName).matches() } } - private def existsFunction(db: String, funcName: String): Boolean = { + private def functionExists(db: String, funcName: String): Boolean = { requireDbExists(db) catalog(db).functions.contains(funcName) } - private def existsTable(db: String, table: String): Boolean = { - requireDbExists(db) - catalog(db).tables.contains(table) - } - - private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = { + private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = { requireTableExists(db, table) catalog(db).tables(table).partitions.contains(spec) } private def requireFunctionExists(db: String, funcName: String): Unit = { - if (!existsFunction(db, funcName)) { - throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'") + if (!functionExists(db, funcName)) { + throw new AnalysisException( + s"Function not found: '$funcName' does not exist in database '$db'") } } private def requireTableExists(db: String, table: String): Unit = { - if (!existsTable(db, table)) { - throw new AnalysisException(s"Table '$table' does not exist in database '$db'") + if (!tableExists(db, table)) { + throw new AnalysisException( + s"Table not found: '$table' does not exist in database '$db'") } } private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = { - if (!existsPartition(db, table, spec)) { + if (!partitionExists(db, table, spec)) { throw new AnalysisException( - s"Partition does not exist in database '$db' table '$table': '$spec'") + s"Partition not found: database '$db' table '$table' does not contain: '$spec'") } } @@ -159,7 +156,7 @@ class InMemoryCatalog extends ExternalCatalog { ignoreIfExists: Boolean): Unit = synchronized { requireDbExists(db) val table = tableDefinition.name.table - if (existsTable(db, table)) { + if (tableExists(db, table)) { if (!ignoreIfExists) { throw new AnalysisException(s"Table '$table' already exists in database '$db'") } @@ -173,7 +170,7 @@ class InMemoryCatalog extends ExternalCatalog { table: String, ignoreIfNotExists: Boolean): Unit = synchronized { requireDbExists(db) - if (existsTable(db, table)) { + if (tableExists(db, table)) { catalog(db).tables.remove(table) } else { if (!ignoreIfNotExists) { @@ -200,13 +197,17 @@ class InMemoryCatalog extends ExternalCatalog { catalog(db).tables(table).table } + override def tableExists(db: String, table: String): Boolean = synchronized { + requireDbExists(db) + catalog(db).tables.contains(table) + } + override def listTables(db: String): Seq[String] = synchronized { requireDbExists(db) catalog(db).tables.keySet.toSeq } override def listTables(db: String, pattern: String): Seq[String] = synchronized { - requireDbExists(db) filterPattern(listTables(db), pattern) } @@ -295,7 +296,7 @@ class InMemoryCatalog extends ExternalCatalog { override def createFunction(db: String, func: CatalogFunction): Unit = synchronized { requireDbExists(db) - if (existsFunction(db, func.name.funcName)) { + if (functionExists(db, func.name.funcName)) { throw new AnalysisException(s"Function '$func' already exists in '$db' database") } else { catalog(db).functions.put(func.name.funcName, func) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 3ac2bcf7e8d03..34265faa74399 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} @@ -31,17 +32,34 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary * tables and functions of the Spark Session that it belongs to. */ -class SessionCatalog(externalCatalog: ExternalCatalog) { +class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { import ExternalCatalog._ - private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan] - private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction] + def this(externalCatalog: ExternalCatalog) { + this(externalCatalog, new SimpleCatalystConf(true)) + } + + protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan] + protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction] // Note: we track current database here because certain operations do not explicitly // specify the database (e.g. DROP TABLE my_table). In these cases we must first // check whether the temporary table or function exists, then, if not, operate on // the corresponding item in the current database. - private[this] var currentDb = "default" + protected[this] var currentDb = { + val defaultName = "default" + val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map()) + // Initialize default database if it doesn't already exist + createDatabase(defaultDbDefinition, ignoreIfExists = true) + defaultName + } + + /** + * Format table name, taking into account case sensitivity. + */ + protected[this] def formatTableName(name: String): String = { + if (conf.caseSensitiveAnalysis) name else name.toLowerCase + } // ---------------------------------------------------------------------------- // Databases @@ -105,8 +123,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { val db = tableDefinition.name.database.getOrElse(currentDb) - val newTableDefinition = tableDefinition.copy( - name = TableIdentifier(tableDefinition.name.table, Some(db))) + val table = formatTableName(tableDefinition.name.table) + val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db))) externalCatalog.createTable(db, newTableDefinition, ignoreIfExists) } @@ -121,8 +139,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def alterTable(tableDefinition: CatalogTable): Unit = { val db = tableDefinition.name.database.getOrElse(currentDb) - val newTableDefinition = tableDefinition.copy( - name = TableIdentifier(tableDefinition.name.table, Some(db))) + val table = formatTableName(tableDefinition.name.table) + val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db))) externalCatalog.alterTable(db, newTableDefinition) } @@ -132,7 +150,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def getTable(name: TableIdentifier): CatalogTable = { val db = name.database.getOrElse(currentDb) - externalCatalog.getTable(db, name.table) + val table = formatTableName(name.table) + externalCatalog.getTable(db, table) } // ------------------------------------------------------------- @@ -146,10 +165,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { name: String, tableDefinition: LogicalPlan, ignoreIfExists: Boolean): Unit = { - if (tempTables.containsKey(name) && !ignoreIfExists) { + val table = formatTableName(name) + if (tempTables.containsKey(table) && !ignoreIfExists) { throw new AnalysisException(s"Temporary table '$name' already exists.") } - tempTables.put(name, tableDefinition) + tempTables.put(table, tableDefinition) } /** @@ -166,11 +186,13 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { throw new AnalysisException("rename does not support moving tables across databases") } val db = oldName.database.getOrElse(currentDb) - if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) { - externalCatalog.renameTable(db, oldName.table, newName.table) + val oldTableName = formatTableName(oldName.table) + val newTableName = formatTableName(newName.table) + if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) { + externalCatalog.renameTable(db, oldTableName, newTableName) } else { - val table = tempTables.remove(oldName.table) - tempTables.put(newName.table, table) + val table = tempTables.remove(oldTableName) + tempTables.put(newTableName, table) } } @@ -183,10 +205,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = { val db = name.database.getOrElse(currentDb) - if (name.database.isDefined || !tempTables.containsKey(name.table)) { - externalCatalog.dropTable(db, name.table, ignoreIfNotExists) + val table = formatTableName(name.table) + if (name.database.isDefined || !tempTables.containsKey(table)) { + externalCatalog.dropTable(db, table, ignoreIfNotExists) } else { - tempTables.remove(name.table) + tempTables.remove(table) } } @@ -199,28 +222,43 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = { val db = name.database.getOrElse(currentDb) + val table = formatTableName(name.table) val relation = - if (name.database.isDefined || !tempTables.containsKey(name.table)) { - val metadata = externalCatalog.getTable(db, name.table) + if (name.database.isDefined || !tempTables.containsKey(table)) { + val metadata = externalCatalog.getTable(db, table) CatalogRelation(db, metadata, alias) } else { - tempTables.get(name.table) + tempTables.get(table) } - val qualifiedTable = SubqueryAlias(name.table, relation) + val qualifiedTable = SubqueryAlias(table, relation) // If an alias was specified by the lookup, wrap the plan in a subquery so that // attributes are properly qualified with this alias. alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) } /** - * List all tables in the specified database, including temporary tables. + * Return whether a table with the specified name exists. + * + * Note: If a database is explicitly specified, then this will return whether the table + * exists in that particular database instead. In that case, even if there is a temporary + * table with the same name, we will return false if the specified database does not + * contain the table. */ - def listTables(db: String): Seq[TableIdentifier] = { - val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) } - val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) } - dbTables ++ _tempTables + def tableExists(name: TableIdentifier): Boolean = { + val db = name.database.getOrElse(currentDb) + val table = formatTableName(name.table) + if (name.database.isDefined || !tempTables.containsKey(table)) { + externalCatalog.tableExists(db, table) + } else { + true // it's a temporary table + } } + /** + * List all tables in the specified database, including temporary tables. + */ + def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*") + /** * List all matching tables in the specified database, including temporary tables. */ @@ -234,6 +272,19 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { dbTables ++ _tempTables } + /** + * Refresh the cache entry for a metastore table, if any. + */ + def refreshTable(name: TableIdentifier): Unit = { /* no-op */ } + + /** + * Drop all existing temporary tables. + * For testing only. + */ + def clearTempTables(): Unit = { + tempTables.clear() + } + /** * Return a temporary table exactly as it was stored. * For testing only. @@ -263,7 +314,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists) + val table = formatTableName(tableName.table) + externalCatalog.createPartitions(db, table, parts, ignoreIfExists) } /** @@ -275,7 +327,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { parts: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists) + val table = formatTableName(tableName.table) + externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists) } /** @@ -289,7 +342,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { specs: Seq[TablePartitionSpec], newSpecs: Seq[TablePartitionSpec]): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs) + val table = formatTableName(tableName.table) + externalCatalog.renamePartitions(db, table, specs, newSpecs) } /** @@ -303,7 +357,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.alterPartitions(db, tableName.table, parts) + val table = formatTableName(tableName.table) + externalCatalog.alterPartitions(db, table, parts) } /** @@ -312,7 +367,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.getPartition(db, tableName.table, spec) + val table = formatTableName(tableName.table) + externalCatalog.getPartition(db, table, spec) } /** @@ -321,7 +377,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.listPartitions(db, tableName.table) + val table = formatTableName(tableName.table) + externalCatalog.listPartitions(db, table) } // ---------------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index c4e49614c5c35..34803133f6a61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -91,6 +91,8 @@ abstract class ExternalCatalog { def getTable(db: String, table: String): CatalogTable + def tableExists(db: String, table: String): Boolean + def listTables(db: String): Seq[String] def listTables(db: String, pattern: String): Seq[String] diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 9563f43259fbe..346e0524371d9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -161,14 +161,10 @@ class AnalysisSuite extends AnalysisTest { } test("resolve relations") { - assertAnalysisError( - UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe")) - + assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq()) checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation) - checkAnalysis( UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false) - checkAnalysis( UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index 39166c4f8ef73..6fa4beed99267 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -18,26 +18,21 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier} +import org.apache.spark.sql.catalyst.SimpleCatalystConf +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ trait AnalysisTest extends PlanTest { - val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = { - val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true) - val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false) + protected val caseSensitiveAnalyzer = makeAnalyzer(caseSensitive = true) + protected val caseInsensitiveAnalyzer = makeAnalyzer(caseSensitive = false) - val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf) - val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf) - - caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation) - caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation) - - new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) { - override val extendedResolutionRules = EliminateSubqueryAliases :: Nil - } -> - new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) { + private def makeAnalyzer(caseSensitive: Boolean): Analyzer = { + val conf = new SimpleCatalystConf(caseSensitive) + val catalog = new SessionCatalog(new InMemoryCatalog, conf) + catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true) + new Analyzer(catalog, EmptyFunctionRegistry, conf) { override val extendedResolutionRules = EliminateSubqueryAliases :: Nil } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index 9aa685e1e8f55..31501864a8e13 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.analysis import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier} +import org.apache.spark.sql.catalyst.SimpleCatalystConf +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -30,11 +31,11 @@ import org.apache.spark.sql.types._ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { - val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true) - val catalog = new SimpleCatalog(conf) - val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) + private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true) + private val catalog = new SessionCatalog(new InMemoryCatalog, conf) + private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) - val relation = LocalRelation( + private val relation = LocalRelation( AttributeReference("i", IntegerType)(), AttributeReference("d1", DecimalType(2, 1))(), AttributeReference("d2", DecimalType(5, 2))(), @@ -43,15 +44,15 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { AttributeReference("b", DoubleType)() ) - val i: Expression = UnresolvedAttribute("i") - val d1: Expression = UnresolvedAttribute("d1") - val d2: Expression = UnresolvedAttribute("d2") - val u: Expression = UnresolvedAttribute("u") - val f: Expression = UnresolvedAttribute("f") - val b: Expression = UnresolvedAttribute("b") + private val i: Expression = UnresolvedAttribute("i") + private val d1: Expression = UnresolvedAttribute("d1") + private val d2: Expression = UnresolvedAttribute("d2") + private val u: Expression = UnresolvedAttribute("u") + private val f: Expression = UnresolvedAttribute("f") + private val b: Expression = UnresolvedAttribute("b") before { - catalog.registerTable(TableIdentifier("table"), relation) + catalog.createTempTable("table", relation, ignoreIfExists = true) } private def checkType(expression: Expression, expectedType: DataType): Unit = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala index a1ea61920dd68..277c2d717e3dd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala @@ -225,13 +225,14 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { test("list tables without pattern") { val catalog = newBasicCatalog() + intercept[AnalysisException] { catalog.listTables("unknown_db") } assert(catalog.listTables("db1").toSet == Set.empty) assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) } test("list tables with pattern") { val catalog = newBasicCatalog() - intercept[AnalysisException] { catalog.listTables("unknown_db") } + intercept[AnalysisException] { catalog.listTables("unknown_db", "*") } assert(catalog.listTables("db1", "*").toSet == Set.empty) assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2")) assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index e1973ee258235..74e995cc5b4b9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -397,6 +397,24 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias) } + test("table exists") { + val catalog = new SessionCatalog(newBasicCatalog()) + assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2")))) + assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2")))) + assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) + assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1")))) + assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1")))) + // If database is explicitly specified, do not check temporary tables + val tempTable = Range(1, 10, 1, 10, Seq()) + catalog.createTempTable("tbl3", tempTable, ignoreIfExists = false) + assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) + // If database is not explicitly specified, check the current database + catalog.setCurrentDatabase("db2") + assert(catalog.tableExists(TableIdentifier("tbl1"))) + assert(catalog.tableExists(TableIdentifier("tbl2"))) + assert(catalog.tableExists(TableIdentifier("tbl3"))) + } + test("list tables without pattern") { val catalog = new SessionCatalog(newBasicCatalog()) val tempTable = Range(1, 10, 2, 10, Seq()) @@ -429,7 +447,7 @@ class SessionCatalogSuite extends SparkFunSuite { assert(catalog.listTables("db2", "*1").toSet == Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2")))) intercept[AnalysisException] { - catalog.listTables("unknown_db") + catalog.listTables("unknown_db", "*") } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index 2ab31eea8ab38..e2c76b700f51c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -137,11 +138,11 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d)) } - private val caseInsensitiveAnalyzer = - new Analyzer( - EmptyCatalog, - EmptyFunctionRegistry, - new SimpleCatalystConf(caseSensitiveAnalysis = false)) + private val caseInsensitiveConf = new SimpleCatalystConf(false) + private val caseInsensitiveAnalyzer = new Analyzer( + new SessionCatalog(new InMemoryCatalog, caseInsensitiveConf), + EmptyFunctionRegistry, + caseInsensitiveConf) test("(a && b) || (a && c) => a && (b || c) when case insensitive") { val plan = caseInsensitiveAnalyzer.execute( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala index a4c8d1c6d2aa8..3824c675630c4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.SimpleCatalystConf -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -28,7 +29,7 @@ import org.apache.spark.sql.catalyst.rules._ class EliminateSortsSuite extends PlanTest { val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false) - val catalog = new SimpleCatalog(conf) + val catalog = new SessionCatalog(new InMemoryCatalog, conf) val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) object Optimize extends RuleExecutor[LogicalPlan] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 853a74c827d47..e413e77bc1349 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -25,13 +25,14 @@ import scala.collection.JavaConverters._ import scala.collection.immutable import scala.reflect.runtime.universe.TypeTag -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.catalyst._ +import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog} import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range} @@ -65,13 +66,14 @@ class SQLContext private[sql]( @transient val sparkContext: SparkContext, @transient protected[sql] val cacheManager: CacheManager, @transient private[sql] val listener: SQLListener, - val isRootContext: Boolean) + val isRootContext: Boolean, + @transient private[sql] val externalCatalog: ExternalCatalog) extends Logging with Serializable { self => - def this(sparkContext: SparkContext) = { - this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true) + def this(sc: SparkContext) = { + this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog) } def this(sparkContext: JavaSparkContext) = this(sparkContext.sc) @@ -109,7 +111,8 @@ class SQLContext private[sql]( sparkContext = sparkContext, cacheManager = cacheManager, listener = listener, - isRootContext = false) + isRootContext = false, + externalCatalog = externalCatalog) } /** @@ -186,6 +189,12 @@ class SQLContext private[sql]( */ def getAllConfs: immutable.Map[String, String] = conf.getAllConfs + // Extract `spark.sql.*` entries and put it in our SQLConf. + // Subclasses may additionally set these entries in other confs. + SQLContext.getSQLProperties(sparkContext.getConf).asScala.foreach { case (k, v) => + setConf(k, v) + } + protected[sql] def parseSql(sql: String): LogicalPlan = sessionState.sqlParser.parsePlan(sql) protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql)) @@ -199,30 +208,6 @@ class SQLContext private[sql]( sparkContext.addJar(path) } - { - // We extract spark sql settings from SparkContext's conf and put them to - // Spark SQL's conf. - // First, we populate the SQLConf (conf). So, we can make sure that other values using - // those settings in their construction can get the correct settings. - // For example, metadataHive in HiveContext may need both spark.sql.hive.metastore.version - // and spark.sql.hive.metastore.jars to get correctly constructed. - val properties = new Properties - sparkContext.getConf.getAll.foreach { - case (key, value) if key.startsWith("spark.sql") => properties.setProperty(key, value) - case _ => - } - // We directly put those settings to conf to avoid of calling setConf, which may have - // side-effects. For example, in HiveContext, setConf may cause executionHive and metadataHive - // get constructed. If we call setConf directly, the constructed metadataHive may have - // wrong settings, or the construction may fail. - conf.setConf(properties) - // After we have populated SQLConf, we call setConf to populate other confs in the subclass - // (e.g. hiveconf in HiveContext). - properties.asScala.foreach { - case (key, value) => setConf(key, value) - } - } - /** * :: Experimental :: * A collection of methods that are considered experimental, but can be used to hook into @@ -683,8 +668,10 @@ class SQLContext private[sql]( * only during the lifetime of this instance of SQLContext. */ private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = { - sessionState.catalog.registerTable( - sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan) + sessionState.catalog.createTempTable( + sessionState.sqlParser.parseTableIdentifier(tableName).table, + df.logicalPlan, + ignoreIfExists = true) } /** @@ -697,7 +684,7 @@ class SQLContext private[sql]( */ def dropTempTable(tableName: String): Unit = { cacheManager.tryUncacheQuery(table(tableName)) - sessionState.catalog.unregisterTable(TableIdentifier(tableName)) + sessionState.catalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true) } /** @@ -824,9 +811,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tableNames(): Array[String] = { - sessionState.catalog.getTables(None).map { - case (tableName, _) => tableName - }.toArray + tableNames(sessionState.catalog.getCurrentDatabase) } /** @@ -836,9 +821,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tableNames(databaseName: String): Array[String] = { - sessionState.catalog.getTables(Some(databaseName)).map { - case (tableName, _) => tableName - }.toArray + sessionState.catalog.listTables(databaseName).map(_.table).toArray } @transient @@ -1025,4 +1008,18 @@ object SQLContext { } sqlListener.get() } + + /** + * Extract `spark.sql.*` properties from the conf and return them as a [[Properties]]. + */ + private[sql] def getSQLProperties(sparkConf: SparkConf): Properties = { + val properties = new Properties + sparkConf.getAll.foreach { case (key, value) => + if (key.startsWith("spark.sql")) { + properties.setProperty(key, value) + } + } + properties + } + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 59c3ffcf488c7..964f0a7a7b4e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -339,10 +339,12 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma override def run(sqlContext: SQLContext): Seq[Row] = { // Since we need to return a Seq of rows, we will call getTables directly // instead of calling tables in sqlContext. - val rows = sqlContext.sessionState.catalog.getTables(databaseName).map { - case (tableName, isTemporary) => Row(tableName, isTemporary) + val catalog = sqlContext.sessionState.catalog + val db = databaseName.getOrElse(catalog.getCurrentDatabase) + val rows = catalog.listTables(db).map { t => + val isTemp = t.database.isEmpty + Row(t.table, isTemp) } - rows } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 9e8e0352db644..24923bbb10c74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -93,15 +93,21 @@ case class CreateTempTableUsing( provider: String, options: Map[String, String]) extends RunnableCommand { + if (tableIdent.database.isDefined) { + throw new AnalysisException( + s"Temporary table '$tableIdent' should not have specified a database") + } + def run(sqlContext: SQLContext): Seq[Row] = { val dataSource = DataSource( sqlContext, userSpecifiedSchema = userSpecifiedSchema, className = provider, options = options) - sqlContext.sessionState.catalog.registerTable( - tableIdent, - Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan) + sqlContext.sessionState.catalog.createTempTable( + tableIdent.table, + Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan, + ignoreIfExists = true) Seq.empty[Row] } @@ -115,6 +121,11 @@ case class CreateTempTableUsingAsSelect( options: Map[String, String], query: LogicalPlan) extends RunnableCommand { + if (tableIdent.database.isDefined) { + throw new AnalysisException( + s"Temporary table '$tableIdent' should not have specified a database") + } + override def run(sqlContext: SQLContext): Seq[Row] = { val df = Dataset.ofRows(sqlContext, query) val dataSource = DataSource( @@ -124,9 +135,10 @@ case class CreateTempTableUsingAsSelect( bucketSpec = None, options = options) val result = dataSource.write(mode, df) - sqlContext.sessionState.catalog.registerTable( - tableIdent, - Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan) + sqlContext.sessionState.catalog.createTempTable( + tableIdent.table, + Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan, + ignoreIfExists = true) Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 63f0e4f8c96ac..28ac4583e9b25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext} import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation} /** @@ -99,7 +101,9 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { /** * A rule to do various checks before inserting into or writing to a data source table. */ -private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) { +private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) + extends (LogicalPlan => Unit) { + def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } def apply(plan: LogicalPlan): Unit = { @@ -139,7 +143,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => } PartitioningUtils.validatePartitionColumnDataTypes( - r.schema, part.keySet.toSeq, catalog.conf.caseSensitiveAnalysis) + r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis) // Get all input data source relations of the query. val srcRelations = query.collect { @@ -190,7 +194,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => } PartitioningUtils.validatePartitionColumnDataTypes( - c.child.schema, c.partitionColumns, catalog.conf.caseSensitiveAnalysis) + c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis) for { spec <- c.bucketSpec diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index e6be0ab3bc420..e5f02caabcca4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.internal import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration} -import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -45,7 +46,7 @@ private[sql] class SessionState(ctx: SQLContext) { /** * Internal catalog for managing table and database states. */ - lazy val catalog: Catalog = new SimpleCatalog(conf) + lazy val catalog = new SessionCatalog(ctx.externalCatalog, conf) /** * Internal catalog for managing functions registered by the user. @@ -68,7 +69,7 @@ private[sql] class SessionState(ctx: SQLContext) { DataSourceAnalysis :: (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil) - override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog)) + override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala index 2820e4fa23e13..bb54c525cb76d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala @@ -33,7 +33,8 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex } after { - sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) + sqlContext.sessionState.catalog.dropTable( + TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) } test("get all tables") { @@ -45,20 +46,22 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) - sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) + sqlContext.sessionState.catalog.dropTable( + TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0) } - test("getting all Tables with a database name has no impact on returned table names") { + test("getting all tables with a database name has no impact on returned table names") { checkAnswer( - sqlContext.tables("DB").filter("tableName = 'ListTablesSuiteTable'"), + sqlContext.tables("default").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) checkAnswer( - sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"), + sql("show TABLES in default").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) - sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) + sqlContext.sessionState.catalog.dropTable( + TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index 2ad92b52c4ff0..2f62ad4850dee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf -class SQLContextSuite extends SparkFunSuite with SharedSparkContext{ +class SQLContextSuite extends SparkFunSuite with SharedSparkContext { object DummyRule extends Rule[LogicalPlan] { def apply(p: LogicalPlan): LogicalPlan = p @@ -78,4 +78,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext{ sqlContext.experimental.extraOptimizations = Seq(DummyRule) assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule)) } + + test("SQLContext can access `spark.sql.*` configs") { + sc.conf.set("spark.sql.with.or.without.you", "my love") + val sqlContext = new SQLContext(sc) + assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love") + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 077e579931303..c958eac266d61 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1476,12 +1476,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("SPARK-4699 case sensitivity SQL query") { - sqlContext.setConf(SQLConf.CASE_SENSITIVE, false) - val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil - val rdd = sparkContext.parallelize((0 to 1).map(i => data(i))) - rdd.toDF().registerTempTable("testTable1") - checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1")) - sqlContext.setConf(SQLConf.CASE_SENSITIVE, true) + val orig = sqlContext.getConf(SQLConf.CASE_SENSITIVE) + try { + sqlContext.setConf(SQLConf.CASE_SENSITIVE, false) + val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil + val rdd = sparkContext.parallelize((0 to 1).map(i => data(i))) + rdd.toDF().registerTempTable("testTable1") + checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1")) + } finally { + sqlContext.setConf(SQLConf.CASE_SENSITIVE, orig) + } } test("SPARK-6145: ORDER BY test for nested fields") { @@ -1755,7 +1759,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { .format("parquet") .save(path) - val message = intercept[AnalysisException] { + // We don't support creating a temporary table while specifying a database + intercept[AnalysisException] { sqlContext.sql( s""" |CREATE TEMPORARY TABLE db.t @@ -1765,9 +1770,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { |) """.stripMargin) }.getMessage - assert(message.contains("Specifying database name or other qualifiers are not allowed")) - // If you use backticks to quote the name of a temporary table having dot in it. + // If you use backticks to quote the name then it's OK. sqlContext.sql( s""" |CREATE TEMPORARY TABLE `db.t` diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index f8166c7ddc4da..2f806ebba6f96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -51,7 +51,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple)) } - sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp")) + sqlContext.sessionState.catalog.dropTable( + TableIdentifier("tmp"), ignoreIfNotExists = true) } test("overwriting") { @@ -61,7 +62,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple)) } - sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp")) + sqlContext.sessionState.catalog.dropTable( + TableIdentifier("tmp"), ignoreIfNotExists = true) } test("self-join") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index d48358566e38e..80a85a6615974 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -189,8 +189,8 @@ private[sql] trait SQLTestUtils * `f` returns. */ protected def activateDatabase(db: String)(f: => Unit): Unit = { - sqlContext.sql(s"USE $db") - try f finally sqlContext.sql(s"USE default") + sqlContext.sessionState.catalog.setCurrentDatabase(db) + try f finally sqlContext.sessionState.catalog.setCurrentDatabase("default") } /** diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 7fe31b0025272..57693284b01df 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -150,7 +150,8 @@ private[hive] object SparkSQLCLIDriver extends Logging { } if (sessionState.database != null) { - SparkSQLEnv.hiveContext.runSqlHive(s"USE ${sessionState.database}") + SparkSQLEnv.hiveContext.sessionState.catalog.setCurrentDatabase( + s"${sessionState.database}") } // Execute -i init files (always in silent mode) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 032965d0d9c28..8e1ebe2937d23 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -193,10 +193,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { ) runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))( - "" - -> "OK", - "" - -> "hive_test" + "" -> "hive_test" ) } diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 05f59f15455ed..650797f7683e1 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -60,16 +60,19 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { } override def afterAll() { - TestHive.cacheTables = false - TimeZone.setDefault(originalTimeZone) - Locale.setDefault(originalLocale) - TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) - TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) - TestHive.sessionState.functionRegistry.restore() - - // For debugging dump some statistics about how much time was spent in various optimizer rules. - logWarning(RuleExecutor.dumpTimeSpent()) - super.afterAll() + try { + TestHive.cacheTables = false + TimeZone.setDefault(originalTimeZone) + Locale.setDefault(originalLocale) + TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) + TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) + TestHive.sessionState.functionRegistry.restore() + + // For debugging dump some statistics about how much time was spent in various optimizer rules. + logWarning(RuleExecutor.dumpTimeSpent()) + } finally { + super.afterAll() + } } /** A list of tests deemed out of scope currently and thus completely disregarded. */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala index 491f2aebb4f4a..0722fb02a8f9d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala @@ -85,7 +85,6 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit withClient { getTable(db, table) } } - // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- @@ -182,6 +181,10 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit client.getTable(db, table) } + override def tableExists(db: String, table: String): Boolean = withClient { + client.getTableOption(db, table).isDefined + } + override def listTables(db: String): Seq[String] = withClient { requireDbExists(db) client.listTables(db) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 914f8e9a9893a..ca3ce43591f5f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -28,6 +28,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.language.implicitConversions +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.common.`type`.HiveDecimal @@ -38,7 +39,7 @@ import org.apache.hadoop.hive.ql.parse.VariableSubstitution import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.hadoop.util.VersionInfo -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.sql._ @@ -52,6 +53,7 @@ import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand} import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SQLConfEntry import org.apache.spark.sql.internal.SQLConf.SQLConfEntry._ import org.apache.spark.sql.types._ @@ -67,7 +69,7 @@ private[hive] case class CurrentDatabase(ctx: HiveContext) override def foldable: Boolean = true override def nullable: Boolean = false override def eval(input: InternalRow): Any = { - UTF8String.fromString(ctx.metadataHive.currentDatabase) + UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase) } } @@ -81,15 +83,31 @@ class HiveContext private[hive]( sc: SparkContext, cacheManager: CacheManager, listener: SQLListener, - @transient private val execHive: HiveClientImpl, - @transient private val metaHive: HiveClient, - isRootContext: Boolean) - extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging { + @transient private[hive] val executionHive: HiveClientImpl, + @transient private[hive] val metadataHive: HiveClient, + isRootContext: Boolean, + @transient private[sql] val hiveCatalog: HiveCatalog) + extends SQLContext(sc, cacheManager, listener, isRootContext, hiveCatalog) with Logging { self => + private def this(sc: SparkContext, execHive: HiveClientImpl, metaHive: HiveClient) { + this( + sc, + new CacheManager, + SQLContext.createListenerAndUI(sc), + execHive, + metaHive, + true, + new HiveCatalog(metaHive)) + } + def this(sc: SparkContext) = { - this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), null, null, true) + this( + sc, + HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration), + HiveContext.newClientForMetadata(sc.conf, sc.hadoopConfiguration)) } + def this(sc: JavaSparkContext) = this(sc.sc) import org.apache.spark.sql.hive.HiveContext._ @@ -106,9 +124,10 @@ class HiveContext private[hive]( sc = sc, cacheManager = cacheManager, listener = listener, - execHive = executionHive.newSession(), - metaHive = metadataHive.newSession(), - isRootContext = false) + executionHive = executionHive.newSession(), + metadataHive = metadataHive.newSession(), + isRootContext = false, + hiveCatalog = hiveCatalog) } @transient @@ -149,41 +168,6 @@ class HiveContext private[hive]( */ protected[sql] def convertCTAS: Boolean = getConf(CONVERT_CTAS) - /** - * The version of the hive client that will be used to communicate with the metastore. Note that - * this does not necessarily need to be the same version of Hive that is used internally by - * Spark SQL for execution. - */ - protected[hive] def hiveMetastoreVersion: String = getConf(HIVE_METASTORE_VERSION) - - /** - * The location of the jars that should be used to instantiate the HiveMetastoreClient. This - * property can be one of three options: - * - a classpath in the standard format for both hive and hadoop. - * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This - * option is only valid when using the execution version of Hive. - * - maven - download the correct version of hive on demand from maven. - */ - protected[hive] def hiveMetastoreJars: String = getConf(HIVE_METASTORE_JARS) - - /** - * A comma separated list of class prefixes that should be loaded using the classloader that - * is shared between Spark SQL and a specific version of Hive. An example of classes that should - * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need - * to be shared are those that interact with classes that are already shared. For example, - * custom appenders that are used by log4j. - */ - protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] = - getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "") - - /** - * A comma separated list of class prefixes that should explicitly be reloaded for each version - * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a - * prefix that typically would be shared (i.e. org.apache.spark.*) - */ - protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] = - getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "") - /* * hive thrift server use background spark sql thread pool to execute sql queries */ @@ -195,29 +179,6 @@ class HiveContext private[hive]( @transient protected[sql] lazy val substitutor = new VariableSubstitution() - /** - * The copy of the hive client that is used for execution. Currently this must always be - * Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the - * client is used for execution related tasks like registering temporary functions or ensuring - * that the ThreadLocal SessionState is correctly populated. This copy of Hive is *not* used - * for storing persistent metadata, and only point to a dummy metastore in a temporary directory. - */ - @transient - protected[hive] lazy val executionHive: HiveClientImpl = if (execHive != null) { - execHive - } else { - logInfo(s"Initializing execution hive, version $hiveExecutionVersion") - val loader = new IsolatedClientLoader( - version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion), - sparkConf = sc.conf, - execJars = Seq(), - hadoopConf = sc.hadoopConfiguration, - config = newTemporaryConfiguration(useInMemoryDerby = true), - isolationOn = false, - baseClassLoader = Utils.getContextOrSparkClassLoader) - loader.createClient().asInstanceOf[HiveClientImpl] - } - /** * Overrides default Hive configurations to avoid breaking changes to Spark SQL users. * - allow SQL11 keywords to be used as identifiers @@ -228,111 +189,6 @@ class HiveContext private[hive]( defaultOverrides() - /** - * The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore. - * The version of the Hive client that is used here must match the metastore that is configured - * in the hive-site.xml file. - */ - @transient - protected[hive] lazy val metadataHive: HiveClient = if (metaHive != null) { - metaHive - } else { - val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) - - // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options - // into the isolated client loader - val metadataConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf]) - - val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir") - logInfo("default warehouse location is " + defaultWarehouseLocation) - - // `configure` goes second to override other settings. - val allConfig = metadataConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configure - - val isolatedLoader = if (hiveMetastoreJars == "builtin") { - if (hiveExecutionVersion != hiveMetastoreVersion) { - throw new IllegalArgumentException( - "Builtin jars can only be used when hive execution version == hive metastore version. " + - s"Execution: ${hiveExecutionVersion} != Metastore: ${hiveMetastoreVersion}. " + - "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " + - s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.") - } - - // We recursively find all jars in the class loader chain, - // starting from the given classLoader. - def allJars(classLoader: ClassLoader): Array[URL] = classLoader match { - case null => Array.empty[URL] - case urlClassLoader: URLClassLoader => - urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent) - case other => allJars(other.getParent) - } - - val classLoader = Utils.getContextOrSparkClassLoader - val jars = allJars(classLoader) - if (jars.length == 0) { - throw new IllegalArgumentException( - "Unable to locate hive jars to connect to metastore. " + - "Please set spark.sql.hive.metastore.jars.") - } - - logInfo( - s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.") - new IsolatedClientLoader( - version = metaVersion, - sparkConf = sc.conf, - execJars = jars.toSeq, - hadoopConf = sc.hadoopConfiguration, - config = allConfig, - isolationOn = true, - barrierPrefixes = hiveMetastoreBarrierPrefixes, - sharedPrefixes = hiveMetastoreSharedPrefixes) - } else if (hiveMetastoreJars == "maven") { - // TODO: Support for loading the jars from an already downloaded location. - logInfo( - s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.") - IsolatedClientLoader.forVersion( - hiveMetastoreVersion = hiveMetastoreVersion, - hadoopVersion = VersionInfo.getVersion, - sparkConf = sc.conf, - hadoopConf = sc.hadoopConfiguration, - config = allConfig, - barrierPrefixes = hiveMetastoreBarrierPrefixes, - sharedPrefixes = hiveMetastoreSharedPrefixes) - } else { - // Convert to files and expand any directories. - val jars = - hiveMetastoreJars - .split(File.pathSeparator) - .flatMap { - case path if new File(path).getName() == "*" => - val files = new File(path).getParentFile().listFiles() - if (files == null) { - logWarning(s"Hive jar path '$path' does not exist.") - Nil - } else { - files.filter(_.getName().toLowerCase().endsWith(".jar")) - } - case path => - new File(path) :: Nil - } - .map(_.toURI.toURL) - - logInfo( - s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + - s"using ${jars.mkString(":")}") - new IsolatedClientLoader( - version = metaVersion, - sparkConf = sc.conf, - execJars = jars.toSeq, - hadoopConf = sc.hadoopConfiguration, - config = allConfig, - isolationOn = true, - barrierPrefixes = hiveMetastoreBarrierPrefixes, - sharedPrefixes = hiveMetastoreSharedPrefixes) - } - isolatedLoader.createClient() - } - protected[sql] override def parseSql(sql: String): LogicalPlan = { executionHive.withHiveState { super.parseSql(substitutor.substitute(hiveconf, sql)) @@ -432,7 +288,7 @@ class HiveContext private[hive]( // recorded in the Hive metastore. // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). if (newTotalSize > 0 && newTotalSize != oldTotalSize) { - sessionState.catalog.client.alterTable( + sessionState.catalog.alterTable( relation.table.copy( properties = relation.table.properties + (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString))) @@ -459,64 +315,10 @@ class HiveContext private[hive]( setConf(entry.key, entry.stringConverter(value)) } - /** Overridden by child classes that need to set configuration before the client init. */ - protected def configure(): Map[String, String] = { - // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch - // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards- - // compatibility when users are trying to connecting to a Hive metastore of lower version, - // because these options are expected to be integral values in lower versions of Hive. - // - // Here we enumerate all time `ConfVar`s and convert their values to numeric strings according - // to their output time units. - Seq( - ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS, - ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS, - ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS, - ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS, - ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS, - ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS, - ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS, - ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS, - ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS, - ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS, - ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS, - ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS, - ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS, - ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS, - ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS, - ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS, - ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS, - ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS - ).map { case (confVar, unit) => - confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString - }.toMap - } - /** * SQLConf and HiveConf contracts: * - * 1. create a new SessionState for each HiveContext + * 1. create a new o.a.h.hive.ql.session.SessionState for each HiveContext * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be * set in the SQLConf *as well as* in the HiveConf. @@ -600,7 +402,7 @@ class HiveContext private[hive]( } -private[hive] object HiveContext { +private[hive] object HiveContext extends Logging { /** The version of hive used internally by Spark SQL. */ val hiveExecutionVersion: String = "1.2.1" @@ -666,6 +468,242 @@ private[hive] object HiveContext { defaultValue = Some(true), doc = "When set to true, Hive Thrift server executes SQL queries in an asynchronous way.") + /** + * The version of the hive client that will be used to communicate with the metastore. Note that + * this does not necessarily need to be the same version of Hive that is used internally by + * Spark SQL for execution. + */ + private def hiveMetastoreVersion(conf: SQLConf): String = { + conf.getConf(HIVE_METASTORE_VERSION) + } + + /** + * The location of the jars that should be used to instantiate the HiveMetastoreClient. This + * property can be one of three options: + * - a classpath in the standard format for both hive and hadoop. + * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This + * option is only valid when using the execution version of Hive. + * - maven - download the correct version of hive on demand from maven. + */ + private def hiveMetastoreJars(conf: SQLConf): String = { + conf.getConf(HIVE_METASTORE_JARS) + } + + /** + * A comma separated list of class prefixes that should be loaded using the classloader that + * is shared between Spark SQL and a specific version of Hive. An example of classes that should + * be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need + * to be shared are those that interact with classes that are already shared. For example, + * custom appenders that are used by log4j. + */ + private def hiveMetastoreSharedPrefixes(conf: SQLConf): Seq[String] = { + conf.getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "") + } + + /** + * A comma separated list of class prefixes that should explicitly be reloaded for each version + * of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a + * prefix that typically would be shared (i.e. org.apache.spark.*) + */ + private def hiveMetastoreBarrierPrefixes(conf: SQLConf): Seq[String] = { + conf.getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "") + } + + /** + * Configurations needed to create a [[HiveClient]]. + */ + private[hive] def hiveClientConfigurations(hiveconf: HiveConf): Map[String, String] = { + // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch + // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards- + // compatibility when users are trying to connecting to a Hive metastore of lower version, + // because these options are expected to be integral values in lower versions of Hive. + // + // Here we enumerate all time `ConfVar`s and convert their values to numeric strings according + // to their output time units. + Seq( + ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY -> TimeUnit.SECONDS, + ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME -> TimeUnit.SECONDS, + ConfVars.HMSHANDLERINTERVAL -> TimeUnit.MILLISECONDS, + ConfVars.METASTORE_EVENT_DB_LISTENER_TTL -> TimeUnit.SECONDS, + ConfVars.METASTORE_EVENT_CLEAN_FREQ -> TimeUnit.SECONDS, + ConfVars.METASTORE_EVENT_EXPIRY_DURATION -> TimeUnit.SECONDS, + ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL -> TimeUnit.SECONDS, + ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_WRITER_WAIT -> TimeUnit.MILLISECONDS, + ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS, + ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS, + ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_TXN_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL -> TimeUnit.SECONDS, + ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_THRIFT_HTTP_COOKIE_MAX_AGE -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME -> TimeUnit.SECONDS, + ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS, + ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT -> TimeUnit.MILLISECONDS, + ConfVars.SERVER_READ_SOCKET_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL -> TimeUnit.MILLISECONDS, + ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.SPARK_JOB_MONITOR_TIMEOUT -> TimeUnit.SECONDS, + ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT -> TimeUnit.MILLISECONDS, + ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT -> TimeUnit.MILLISECONDS + ).map { case (confVar, unit) => + confVar.varname -> hiveconf.getTimeVar(confVar, unit).toString + }.toMap + } + + /** + * Create a [[HiveClient]] used for execution. + * + * Currently this must always be Hive 13 as this is the version of Hive that is packaged + * with Spark SQL. This copy of the client is used for execution related tasks like + * registering temporary functions or ensuring that the ThreadLocal SessionState is + * correctly populated. This copy of Hive is *not* used for storing persistent metadata, + * and only point to a dummy metastore in a temporary directory. + */ + protected[hive] def newClientForExecution( + conf: SparkConf, + hadoopConf: Configuration): HiveClientImpl = { + logInfo(s"Initializing execution hive, version $hiveExecutionVersion") + val loader = new IsolatedClientLoader( + version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion), + sparkConf = conf, + execJars = Seq(), + hadoopConf = hadoopConf, + config = newTemporaryConfiguration(useInMemoryDerby = true), + isolationOn = false, + baseClassLoader = Utils.getContextOrSparkClassLoader) + loader.createClient().asInstanceOf[HiveClientImpl] + } + + /** + * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore. + * + * The version of the Hive client that is used here must match the metastore that is configured + * in the hive-site.xml file. + */ + private def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration): HiveClient = { + val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf]) + val configurations = hiveClientConfigurations(hiveConf) + newClientForMetadata(conf, hiveConf, hadoopConf, configurations) + } + + protected[hive] def newClientForMetadata( + conf: SparkConf, + hiveConf: HiveConf, + hadoopConf: Configuration, + configurations: Map[String, String]): HiveClient = { + val sqlConf = new SQLConf + sqlConf.setConf(SQLContext.getSQLProperties(conf)) + val hiveMetastoreVersion = HiveContext.hiveMetastoreVersion(sqlConf) + val hiveMetastoreJars = HiveContext.hiveMetastoreJars(sqlConf) + val hiveMetastoreSharedPrefixes = HiveContext.hiveMetastoreSharedPrefixes(sqlConf) + val hiveMetastoreBarrierPrefixes = HiveContext.hiveMetastoreBarrierPrefixes(sqlConf) + val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) + + val defaultWarehouseLocation = hiveConf.get("hive.metastore.warehouse.dir") + logInfo("default warehouse location is " + defaultWarehouseLocation) + + // `configure` goes second to override other settings. + val allConfig = hiveConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configurations + + val isolatedLoader = if (hiveMetastoreJars == "builtin") { + if (hiveExecutionVersion != hiveMetastoreVersion) { + throw new IllegalArgumentException( + "Builtin jars can only be used when hive execution version == hive metastore version. " + + s"Execution: $hiveExecutionVersion != Metastore: $hiveMetastoreVersion. " + + "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " + + s"or change ${HIVE_METASTORE_VERSION.key} to $hiveExecutionVersion.") + } + + // We recursively find all jars in the class loader chain, + // starting from the given classLoader. + def allJars(classLoader: ClassLoader): Array[URL] = classLoader match { + case null => Array.empty[URL] + case urlClassLoader: URLClassLoader => + urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent) + case other => allJars(other.getParent) + } + + val classLoader = Utils.getContextOrSparkClassLoader + val jars = allJars(classLoader) + if (jars.length == 0) { + throw new IllegalArgumentException( + "Unable to locate hive jars to connect to metastore. " + + "Please set spark.sql.hive.metastore.jars.") + } + + logInfo( + s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.") + new IsolatedClientLoader( + version = metaVersion, + sparkConf = conf, + hadoopConf = hadoopConf, + execJars = jars.toSeq, + config = allConfig, + isolationOn = true, + barrierPrefixes = hiveMetastoreBarrierPrefixes, + sharedPrefixes = hiveMetastoreSharedPrefixes) + } else if (hiveMetastoreJars == "maven") { + // TODO: Support for loading the jars from an already downloaded location. + logInfo( + s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.") + IsolatedClientLoader.forVersion( + hiveMetastoreVersion = hiveMetastoreVersion, + hadoopVersion = VersionInfo.getVersion, + sparkConf = conf, + hadoopConf = hadoopConf, + config = allConfig, + barrierPrefixes = hiveMetastoreBarrierPrefixes, + sharedPrefixes = hiveMetastoreSharedPrefixes) + } else { + // Convert to files and expand any directories. + val jars = + hiveMetastoreJars + .split(File.pathSeparator) + .flatMap { + case path if new File(path).getName == "*" => + val files = new File(path).getParentFile.listFiles() + if (files == null) { + logWarning(s"Hive jar path '$path' does not exist.") + Nil + } else { + files.filter(_.getName.toLowerCase.endsWith(".jar")) + } + case path => + new File(path) :: Nil + } + .map(_.toURI.toURL) + + logInfo( + s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + + s"using ${jars.mkString(":")}") + new IsolatedClientLoader( + version = metaVersion, + sparkConf = conf, + hadoopConf = hadoopConf, + execJars = jars.toSeq, + config = allConfig, + isolationOn = true, + barrierPrefixes = hiveMetastoreBarrierPrefixes, + sharedPrefixes = hiveMetastoreSharedPrefixes) + } + isolatedLoader.createClient() + } + /** Constructs a configuration for hive, where the metastore is located in a temp directory. */ def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String] = { val withInMemoryMode = if (useInMemoryDerby) "memory:" else "" diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 27e4cfc103bee..c7066d73631af 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog} +import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.DataTypeParser @@ -98,27 +98,33 @@ private[hive] object HiveSerDe { } -// TODO: replace this with o.a.s.sql.hive.HiveCatalog once we merge SQLContext and HiveContext +/** + * Legacy catalog for interacting with the Hive metastore. + * + * This is still used for things like creating data source tables, but in the future will be + * cleaned up to integrate more nicely with [[HiveCatalog]]. + */ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveContext) - extends Catalog with Logging { + extends Logging { val conf = hive.conf - /** Usages should lock on `this`. */ - protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf) - /** A fully qualified identifier for a table (i.e., database.tableName) */ case class QualifiedTableName(database: String, name: String) - private def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = { + private def getCurrentDatabase: String = { + hive.sessionState.catalog.getCurrentDatabase + } + + def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = { QualifiedTableName( - tableIdent.database.getOrElse(client.currentDatabase).toLowerCase, + tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase, tableIdent.table.toLowerCase) } private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = { QualifiedTableName( - t.name.database.getOrElse(client.currentDatabase).toLowerCase, + t.name.database.getOrElse(getCurrentDatabase).toLowerCase, t.name.table.toLowerCase) } @@ -194,7 +200,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader) } - override def refreshTable(tableIdent: TableIdentifier): Unit = { + def refreshTable(tableIdent: TableIdentifier): Unit = { // refreshTable does not eagerly reload the cache. It just invalidate the cache. // Next time when we use the table, it will be populated in the cache. // Since we also cache ParquetRelations converted from Hive Parquet tables and @@ -408,12 +414,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString } - override def tableExists(tableIdent: TableIdentifier): Boolean = { - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) - client.getTableOption(dbName, tblName).isDefined - } - - override def lookupRelation( + def lookupRelation( tableIdent: TableIdentifier, alias: Option[String]): LogicalPlan = { val qualifiedTableName = getQualifiedTableName(tableIdent) @@ -555,12 +556,6 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) } - override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - val db = databaseName.getOrElse(client.currentDatabase) - - client.listTables(db).map(tableName => (tableName, false)) - } - /** * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet * data source relations for better performance. @@ -716,27 +711,6 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte } } - /** - * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. - * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. - */ - override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { - throw new UnsupportedOperationException - } - - /** - * UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore. - * For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]]. - */ - override def unregisterTable(tableIdent: TableIdentifier): Unit = { - throw new UnsupportedOperationException - } - - override def unregisterAllTables(): Unit = {} - - override def setCurrentDatabase(databaseName: String): Unit = { - client.setCurrentDatabase(databaseName) - } } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala new file mode 100644 index 0000000000000..aa44cba4b5641 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.BucketSpec +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + + +class HiveSessionCatalog( + externalCatalog: HiveCatalog, + client: HiveClient, + context: HiveContext, + conf: SQLConf) + extends SessionCatalog(externalCatalog, conf) { + + override def setCurrentDatabase(db: String): Unit = { + super.setCurrentDatabase(db) + client.setCurrentDatabase(db) + } + + override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = { + val table = formatTableName(name.table) + if (name.database.isDefined || !tempTables.containsKey(table)) { + val newName = name.copy(table = table) + metastoreCatalog.lookupRelation(newName, alias) + } else { + val relation = tempTables.get(table) + val tableWithQualifiers = SubqueryAlias(table, relation) + // If an alias was specified by the lookup, wrap the plan in a subquery so that + // attributes are properly qualified with this alias. + alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers) + } + } + + // ---------------------------------------------------------------- + // | Methods and fields for interacting with HiveMetastoreCatalog | + // ---------------------------------------------------------------- + + // Catalog for handling data source tables. TODO: This really doesn't belong here since it is + // essentially a cache for metastore tables. However, it relies on a lot of session-specific + // things so it would be a lot of work to split its functionality between HiveSessionCatalog + // and HiveCatalog. We should still do it at some point... + private val metastoreCatalog = new HiveMetastoreCatalog(client, context) + + val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions + val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables + val PreInsertionCasts: Rule[LogicalPlan] = metastoreCatalog.PreInsertionCasts + + override def refreshTable(name: TableIdentifier): Unit = { + metastoreCatalog.refreshTable(name) + } + + def invalidateTable(name: TableIdentifier): Unit = { + metastoreCatalog.invalidateTable(name) + } + + def invalidateCache(): Unit = { + metastoreCatalog.cachedDataSourceTables.invalidateAll() + } + + def createDataSourceTable( + name: TableIdentifier, + userSpecifiedSchema: Option[StructType], + partitionColumns: Array[String], + bucketSpec: Option[BucketSpec], + provider: String, + options: Map[String, String], + isExternal: Boolean): Unit = { + metastoreCatalog.createDataSourceTable( + name, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options, isExternal) + } + + def hiveDefaultTableFilePath(name: TableIdentifier): String = { + metastoreCatalog.hiveDefaultTableFilePath(name) + } + + // For testing only + private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = { + val key = metastoreCatalog.getQualifiedTableName(table) + metastoreCatalog.cachedDataSourceTables.getIfPresent(key) + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index d9cd96d66f493..caa7f296ed16a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.execution.{python, SparkPlanner} import org.apache.spark.sql.execution.datasources._ @@ -35,9 +35,11 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) } /** - * A metadata catalog that points to the Hive metastore. + * Internal catalog for managing table and database states. */ - override lazy val catalog = new HiveMetastoreCatalog(ctx.metadataHive, ctx) with OverrideCatalog + override lazy val catalog = { + new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, conf) + } /** * Internal catalog for managing functions registered by the user. @@ -61,7 +63,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) DataSourceAnalysis :: (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil) - override val extendedCheckRules = Seq(PreWriteCheck(catalog)) + override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index d214e5288eff0..f4d30358cafa8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -53,9 +53,6 @@ private[hive] trait HiveClient { /** Returns the names of tables in the given database that matches the given pattern. */ def listTables(dbName: String, pattern: String): Seq[String] - /** Returns the name of the active database. */ - def currentDatabase: String - /** Sets the name of current database. */ def setCurrentDatabase(databaseName: String): Unit diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 928408c52bd23..e4e15d13df658 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -241,10 +241,6 @@ private[hive] class HiveClientImpl( state.err = stream } - override def currentDatabase: String = withHiveState { - state.getCurrentDatabase - } - override def setCurrentDatabase(databaseName: String): Unit = withHiveState { if (getDatabaseOption(databaseName).isDefined) { state.setCurrentDatabase(databaseName) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 391e2975d0086..5a61eef0f2439 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -69,10 +69,10 @@ case class CreateTableAsSelect( withFormat } - hiveContext.sessionState.catalog.client.createTable(withSchema, ignoreIfExists = false) + hiveContext.sessionState.catalog.createTable(withSchema, ignoreIfExists = false) // Get the Metastore Relation - hiveContext.sessionState.catalog.lookupRelation(tableIdentifier, None) match { + hiveContext.sessionState.catalog.lookupRelation(tableIdentifier) match { case r: MetastoreRelation => r } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 8a1cf2caaaaaa..9ff520da1d41d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -56,7 +56,7 @@ private[hive] case class CreateViewAsSelect( case true if orReplace => // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` - hiveContext.sessionState.catalog.client.alertView(prepareTable(sqlContext)) + hiveContext.metadataHive.alertView(prepareTable(sqlContext)) case true => // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already @@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect( "CREATE OR REPLACE VIEW AS") case false => - hiveContext.sessionState.catalog.client.createView(prepareTable(sqlContext)) + hiveContext.metadataHive.createView(prepareTable(sqlContext)) } Seq.empty[Row] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 4ffd868242b86..430fa4616fc2b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -45,7 +45,7 @@ case class InsertIntoHiveTable( @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext] @transient private lazy val hiveContext = new Context(sc.hiveconf) - @transient private lazy val catalog = sc.sessionState.catalog + @transient private lazy val client = sc.metadataHive def output: Seq[Attribute] = Seq.empty @@ -186,8 +186,8 @@ case class InsertIntoHiveTable( // TODO: Correctly set isSkewedStoreAsSubdir. val isSkewedStoreAsSubdir = false if (numDynamicPartitions > 0) { - catalog.synchronized { - catalog.client.loadDynamicPartitions( + client.synchronized { + client.loadDynamicPartitions( outputPath.toString, qualifiedTableName, orderedPartitionSpec, @@ -202,12 +202,12 @@ case class InsertIntoHiveTable( // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries // scalastyle:on val oldPart = - catalog.client.getPartitionOption( - catalog.client.getTable(table.databaseName, table.tableName), + client.getPartitionOption( + client.getTable(table.databaseName, table.tableName), partitionSpec) if (oldPart.isEmpty || !ifNotExists) { - catalog.client.loadPartition( + client.loadPartition( outputPath.toString, qualifiedTableName, orderedPartitionSpec, @@ -218,7 +218,7 @@ case class InsertIntoHiveTable( } } } else { - catalog.client.loadTable( + client.loadTable( outputPath.toString, // TODO: URI qualifiedTableName, overwrite, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 226b8e179604d..cd26a68f357c7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -71,7 +71,8 @@ case class DropTable( } hiveContext.invalidateTable(tableName) hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") - hiveContext.sessionState.catalog.unregisterTable(TableIdentifier(tableName)) + hiveContext.sessionState.catalog.dropTable( + TableIdentifier(tableName), ignoreIfNotExists = true) Seq.empty[Row] } } @@ -142,7 +143,8 @@ case class CreateMetastoreDataSource( val optionsWithPath = if (!options.contains("path") && managedIfNoPath) { isExternal = false - options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> + hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) } else { options } @@ -200,7 +202,8 @@ case class CreateMetastoreDataSourceAsSelect( val optionsWithPath = if (!options.contains("path")) { isExternal = false - options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) + options + ("path" -> + hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent)) } else { options } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 19c05f9cb0d9c..a1785ca03883e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -24,6 +24,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.implicitConversions +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.exec.FunctionRegistry import org.apache.hadoop.hive.ql.processors._ @@ -35,9 +37,11 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.ExpressionInfo import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.command.CacheTableCommand +import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.hive._ -import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl} import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{ShutdownHookManager, Utils} @@ -55,10 +59,6 @@ object TestHive // SPARK-8910 .set("spark.ui.enabled", "false"))) -trait TestHiveSingleton { - protected val sqlContext: SQLContext = TestHive - protected val hiveContext: TestHiveContext = TestHive -} /** * A locally running test instance of Spark's Hive execution engine. @@ -71,10 +71,87 @@ trait TestHiveSingleton { * hive metastore seems to lead to weird non-deterministic failures. Therefore, the execution of * test cases that rely on TestHive must be serialized. */ -class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { - self => +class TestHiveContext private[hive]( + sc: SparkContext, + cacheManager: CacheManager, + listener: SQLListener, + executionHive: HiveClientImpl, + metadataHive: HiveClient, + isRootContext: Boolean, + hiveCatalog: HiveCatalog, + val warehousePath: File, + val scratchDirPath: File, + metastoreTemporaryConf: Map[String, String]) + extends HiveContext( + sc, + cacheManager, + listener, + executionHive, + metadataHive, + isRootContext, + hiveCatalog) { self => + + // Unfortunately, due to the complex interactions between the construction parameters + // and the limitations in scala constructors, we need many of these constructors to + // provide a shorthand to create a new TestHiveContext with only a SparkContext. + // This is not a great design pattern but it's necessary here. + + private def this( + sc: SparkContext, + executionHive: HiveClientImpl, + metadataHive: HiveClient, + warehousePath: File, + scratchDirPath: File, + metastoreTemporaryConf: Map[String, String]) { + this( + sc, + new CacheManager, + SQLContext.createListenerAndUI(sc), + executionHive, + metadataHive, + true, + new HiveCatalog(metadataHive), + warehousePath, + scratchDirPath, + metastoreTemporaryConf) + } + + private def this( + sc: SparkContext, + warehousePath: File, + scratchDirPath: File, + metastoreTemporaryConf: Map[String, String]) { + this( + sc, + HiveContext.newClientForExecution(sc.conf, sc.hadoopConfiguration), + TestHiveContext.newClientForMetadata( + sc.conf, sc.hadoopConfiguration, warehousePath, scratchDirPath, metastoreTemporaryConf), + warehousePath, + scratchDirPath, + metastoreTemporaryConf) + } - import HiveContext._ + def this(sc: SparkContext) { + this( + sc, + Utils.createTempDir(namePrefix = "warehouse"), + TestHiveContext.makeScratchDir(), + HiveContext.newTemporaryConfiguration(useInMemoryDerby = false)) + } + + override def newSession(): HiveContext = { + new TestHiveContext( + sc = sc, + cacheManager = cacheManager, + listener = listener, + executionHive = executionHive.newSession(), + metadataHive = metadataHive.newSession(), + isRootContext = false, + hiveCatalog = hiveCatalog, + warehousePath = warehousePath, + scratchDirPath = scratchDirPath, + metastoreTemporaryConf = metastoreTemporaryConf) + } // By clearing the port we force Spark to pick a new one. This allows us to rerun tests // without restarting the JVM. @@ -83,24 +160,12 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { hiveconf.set("hive.plan.serialization.format", "javaXML") - lazy val warehousePath = Utils.createTempDir(namePrefix = "warehouse-") - - lazy val scratchDirPath = { - val dir = Utils.createTempDir(namePrefix = "scratch-") - dir.delete() - dir - } - - private lazy val temporaryConfig = newTemporaryConfiguration(useInMemoryDerby = false) - - /** Sets up the system initially or after a RESET command */ - protected override def configure(): Map[String, String] = { - super.configure() ++ temporaryConfig ++ Map( - ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString, - ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true", - ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString, - ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1" - ) + // A snapshot of the entries in the starting SQLConf + // We save this because tests can mutate this singleton object if they want + val initialSQLConf: SQLConf = { + val snapshot = new SQLConf + conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) } + snapshot } val testTempDir = Utils.createTempDir() @@ -427,9 +492,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { cacheManager.clearCache() loadedTables.clear() - sessionState.catalog.cachedDataSourceTables.invalidateAll() - sessionState.catalog.client.reset() - sessionState.catalog.unregisterAllTables() + sessionState.catalog.clearTempTables() + sessionState.catalog.invalidateCache() + metadataHive.reset() FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)). foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } @@ -448,13 +513,13 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { // Lots of tests fail if we do not change the partition whitelist from the default. runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*") - configure().foreach { - case (k, v) => - metadataHive.runSqlHive(s"SET $k=$v") - } + // In case a test changed any of these values, restore all the original ones here. + TestHiveContext.hiveClientConfigurations( + hiveconf, warehousePath, scratchDirPath, metastoreTemporaryConf) + .foreach { case (k, v) => metadataHive.runSqlHive(s"SET $k=$v") } defaultOverrides() - runSqlHive("USE default") + sessionState.catalog.setCurrentDatabase("default") } catch { case e: Exception => logError("FATAL ERROR: Failed to reset TestDB state.", e) @@ -490,4 +555,43 @@ private[hive] object TestHiveContext { // Fewer shuffle partitions to speed up testing. SQLConf.SHUFFLE_PARTITIONS.key -> "5" ) + + /** + * Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore. + */ + private def newClientForMetadata( + conf: SparkConf, + hadoopConf: Configuration, + warehousePath: File, + scratchDirPath: File, + metastoreTemporaryConf: Map[String, String]): HiveClient = { + val hiveConf = new HiveConf(hadoopConf, classOf[HiveConf]) + HiveContext.newClientForMetadata( + conf, + hiveConf, + hadoopConf, + hiveClientConfigurations(hiveConf, warehousePath, scratchDirPath, metastoreTemporaryConf)) + } + + /** + * Configurations needed to create a [[HiveClient]]. + */ + private def hiveClientConfigurations( + hiveconf: HiveConf, + warehousePath: File, + scratchDirPath: File, + metastoreTemporaryConf: Map[String, String]): Map[String, String] = { + HiveContext.hiveClientConfigurations(hiveconf) ++ metastoreTemporaryConf ++ Map( + ConfVars.METASTOREWAREHOUSE.varname -> warehousePath.toURI.toString, + ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true", + ConfVars.SCRATCHDIR.varname -> scratchDirPath.toURI.toString, + ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY.varname -> "1") + } + + private def makeScratchDir(): File = { + val scratchDir = Utils.createTempDir(namePrefix = "scratch") + scratchDir.delete() + scratchDir + } + } diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index bd14a243eaeb4..2fc38e2b2d2e7 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -70,8 +70,9 @@ public void setUp() throws IOException { if (path.exists()) { path.delete(); } - hiveManagedPath = new Path(sqlContext.sessionState().catalog().hiveDefaultTableFilePath( - new TableIdentifier("javaSavedTable"))); + hiveManagedPath = new Path( + sqlContext.sessionState().catalog().hiveDefaultTableFilePath( + new TableIdentifier("javaSavedTable"))); fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration()); if (fs.exists(hiveManagedPath)){ fs.delete(hiveManagedPath, true); diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala new file mode 100644 index 0000000000000..154ada3daae51 --- /dev/null +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.test + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.SQLContext + + +trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll { + protected val sqlContext: SQLContext = TestHive + protected val hiveContext: TestHiveContext = TestHive + + protected override def afterAll(): Unit = { + try { + hiveContext.reset() + } finally { + super.afterAll() + } + } + +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala index 72765f05e7e49..4c9c48a25c5b1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionToSQLSuite.scala @@ -26,6 +26,7 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils { import testImplicits._ protected override def beforeAll(): Unit = { + super.beforeAll() sql("DROP TABLE IF EXISTS t0") sql("DROP TABLE IF EXISTS t1") sql("DROP TABLE IF EXISTS t2") @@ -43,9 +44,13 @@ class ExpressionToSQLSuite extends SQLBuilderTest with SQLTestUtils { } override protected def afterAll(): Unit = { - sql("DROP TABLE IF EXISTS t0") - sql("DROP TABLE IF EXISTS t1") - sql("DROP TABLE IF EXISTS t2") + try { + sql("DROP TABLE IF EXISTS t0") + sql("DROP TABLE IF EXISTS t1") + sql("DROP TABLE IF EXISTS t2") + } finally { + super.afterAll() + } } private def checkSqlGeneration(hiveQl: String): Unit = { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala new file mode 100644 index 0000000000000..b644a50613337 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala @@ -0,0 +1,37 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.hive + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.hive.test.TestHive + + +class HiveContextSuite extends SparkFunSuite { + + test("HiveContext can access `spark.sql.*` configs") { + // Avoid creating another SparkContext in the same JVM + val sc = TestHive.sparkContext + require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") == + "org.apache.spark.sql.hive.execution.PairSerDe") + assert(TestHive.initialSQLConf.getConfString("spark.sql.hive.metastore.barrierPrefixes") == + "org.apache.spark.sql.hive.execution.PairSerDe") + assert(TestHive.metadataHive.getConf("spark.sql.hive.metastore.barrierPrefixes", "") == + "org.apache.spark.sql.hive.execution.PairSerDe") + } + +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala index 35e433964da91..57f96e725a044 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala @@ -33,12 +33,17 @@ class HiveDataFrameAnalyticsSuite extends QueryTest with TestHiveSingleton with private var testData: DataFrame = _ override def beforeAll() { + super.beforeAll() testData = Seq((1, 2), (2, 2), (3, 4)).toDF("a", "b") hiveContext.registerDataFrameAsTable(testData, "mytable") } override def afterAll(): Unit = { - hiveContext.dropTempTable("mytable") + try { + hiveContext.dropTempTable("mytable") + } finally { + super.afterAll() + } } test("rollup") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index ce7b08ab72f79..69673956135db 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -19,15 +19,15 @@ package org.apache.spark.sql.hive import java.io.File -import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{QueryTest, Row, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils} import org.apache.spark.sql.types.{DecimalType, StringType, StructType} -class HiveMetastoreCatalogSuite extends SparkFunSuite with TestHiveSingleton { +class HiveMetastoreCatalogSuite extends TestHiveSingleton { import hiveContext.implicits._ test("struct field should accept underscore in sub-column name") { @@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite .saveAsTable("t") } - val hiveTable = sessionState.catalog.client.getTable("default", "t") + val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default"))) assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) @@ -114,7 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite .saveAsTable("t") } - val hiveTable = sessionState.catalog.client.getTable("default", "t") + val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default"))) assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) @@ -144,7 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite |AS SELECT 1 AS d1, "val_1" AS d2 """.stripMargin) - val hiveTable = sessionState.catalog.client.getTable("default", "t") + val hiveTable = sessionState.catalog.getTable(TableIdentifier("t", Some("default"))) assert(hiveTable.storage.inputFormat === Some(inputFormat)) assert(hiveTable.storage.outputFormat === Some(outputFormat)) assert(hiveTable.storage.serde === Some(serde)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala index 0a31ac64a20f5..5272f4192e4c8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala @@ -31,18 +31,25 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft val df = sparkContext.parallelize((1 to 10).map(i => (i, s"str$i"))).toDF("key", "value") override def beforeAll(): Unit = { + super.beforeAll() // The catalog in HiveContext is a case insensitive one. - sessionState.catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan) + sessionState.catalog.createTempTable( + "ListTablesSuiteTable", df.logicalPlan, ignoreIfExists = true) sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)") sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB") sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)") } override def afterAll(): Unit = { - sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) - sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable") - sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable") - sql("DROP DATABASE IF EXISTS ListTablesSuiteDB") + try { + sessionState.catalog.dropTable( + TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) + sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable") + sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable") + sql("DROP DATABASE IF EXISTS ListTablesSuiteDB") + } finally { + super.afterAll() + } } test("get all tables of current database") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index f6b9072da4449..c9bcf819effaf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -27,6 +27,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { import testImplicits._ protected override def beforeAll(): Unit = { + super.beforeAll() sql("DROP TABLE IF EXISTS parquet_t0") sql("DROP TABLE IF EXISTS parquet_t1") sql("DROP TABLE IF EXISTS parquet_t2") @@ -64,11 +65,15 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { } override protected def afterAll(): Unit = { - sql("DROP TABLE IF EXISTS parquet_t0") - sql("DROP TABLE IF EXISTS parquet_t1") - sql("DROP TABLE IF EXISTS parquet_t2") - sql("DROP TABLE IF EXISTS parquet_t3") - sql("DROP TABLE IF EXISTS t0") + try { + sql("DROP TABLE IF EXISTS parquet_t0") + sql("DROP TABLE IF EXISTS parquet_t1") + sql("DROP TABLE IF EXISTS parquet_t2") + sql("DROP TABLE IF EXISTS parquet_t3") + sql("DROP TABLE IF EXISTS t0") + } finally { + super.afterAll() + } } private def checkHiveQl(hiveQl: String): Unit = { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 3f3d0692b7b61..71652897e6548 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -44,6 +44,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv var jsonFilePath: String = _ override def beforeAll(): Unit = { + super.beforeAll() jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile } @@ -693,13 +694,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv test("SPARK-6024 wide schema support") { withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") { withTable("wide_schema") { - withTempDir( tempDir => { + withTempDir { tempDir => // We will need 80 splits for this schema if the threshold is 4000. val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true))) // Manually create a metastore data source table. sessionState.catalog.createDataSourceTable( - tableIdent = TableIdentifier("wide_schema"), + name = TableIdentifier("wide_schema"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], bucketSpec = None, @@ -711,7 +712,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val actualSchema = table("wide_schema").schema assert(schema === actualSchema) - }) + } } } } @@ -737,7 +738,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv "spark.sql.sources.schema" -> schema.json, "EXTERNAL" -> "FALSE")) - sessionState.catalog.client.createTable(hiveTable, ignoreIfExists = false) + hiveCatalog.createTable("default", hiveTable, ignoreIfExists = false) invalidateTable(tableName) val actualSchema = table(tableName).schema @@ -752,7 +753,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTable(tableName) { df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName) invalidateTable(tableName) - val metastoreTable = sessionState.catalog.client.getTable("default", tableName) + val metastoreTable = hiveCatalog.getTable("default", tableName) val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt @@ -787,7 +788,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv .sortBy("c") .saveAsTable(tableName) invalidateTable(tableName) - val metastoreTable = sessionState.catalog.client.getTable("default", tableName) + val metastoreTable = hiveCatalog.getTable("default", tableName) val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) val expectedSortByColumns = StructType(df.schema("c") :: Nil) @@ -903,11 +904,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv test("skip hive metadata on table creation") { - withTempDir(tempPath => { + withTempDir { tempPath => val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType))) sessionState.catalog.createDataSourceTable( - tableIdent = TableIdentifier("not_skip_hive_metadata"), + name = TableIdentifier("not_skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], bucketSpec = None, @@ -917,11 +918,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv // As a proxy for verifying that the table was stored in Hive compatible format, // we verify that each column of the table is of native type StringType. - assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema + assert(hiveCatalog.getTable("default", "not_skip_hive_metadata").schema .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType)) sessionState.catalog.createDataSourceTable( - tableIdent = TableIdentifier("skip_hive_metadata"), + name = TableIdentifier("skip_hive_metadata"), userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], bucketSpec = None, @@ -929,10 +930,11 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true"), isExternal = false) - // As a proxy for verifying that the table was stored in SparkSQL format, we verify that - // the table has a column type as array of StringType. - assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema - .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType))) - }) + // As a proxy for verifying that the table was stored in SparkSQL format, + // we verify that the table has a column type as array of StringType. + assert(hiveCatalog.getTable("default", "skip_hive_metadata").schema.forall { c => + HiveMetastoreTypes.toDataType(c.dataType) == ArrayType(StringType) + }) + } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index f3af60a0186f2..3c003506efcb1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -25,9 +25,8 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle private lazy val df = sqlContext.range(10).coalesce(1).toDF() private def checkTablePath(dbName: String, tableName: String): Unit = { - val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName) - val expectedPath = - hiveContext.sessionState.catalog.client.getDatabase(dbName).locationUri + "/" + tableName + val metastoreTable = hiveContext.hiveCatalog.getTable(dbName, tableName) + val expectedPath = hiveContext.hiveCatalog.getDatabase(dbName).locationUri + "/" + tableName assert(metastoreTable.storage.serdeProperties("path") === expectedPath) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 151aacbdd1c44..ae026ed4964eb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -121,7 +121,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { intercept[UnsupportedOperationException] { hiveContext.analyze("tempTable") } - hiveContext.sessionState.catalog.unregisterTable(TableIdentifier("tempTable")) + hiveContext.sessionState.catalog.dropTable( + TableIdentifier("tempTable"), ignoreIfNotExists = true) } test("estimates the size of a test MetastoreRelation") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 295069228fea1..d59bca4c7ee4d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -171,10 +171,6 @@ class VersionsSuite extends SparkFunSuite with Logging { assert(client.listTables("default") === Seq("src")) } - test(s"$version: currentDatabase") { - assert(client.currentDatabase === "default") - } - test(s"$version: getDatabase") { client.getDatabase("default") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 81fd71201d338..94fbcb7ee2056 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -189,10 +189,14 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te } override def afterAll(): Unit = { - sqlContext.sql("DROP TABLE IF EXISTS agg1") - sqlContext.sql("DROP TABLE IF EXISTS agg2") - sqlContext.sql("DROP TABLE IF EXISTS agg3") - sqlContext.dropTempTable("emptyTable") + try { + sqlContext.sql("DROP TABLE IF EXISTS agg1") + sqlContext.sql("DROP TABLE IF EXISTS agg2") + sqlContext.sql("DROP TABLE IF EXISTS agg3") + sqlContext.dropTempTable("emptyTable") + } finally { + super.afterAll() + } } test("group by function") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 5fe85eaef2b55..197a123905d2a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -49,6 +49,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { import org.apache.spark.sql.hive.test.TestHive.implicits._ override def beforeAll() { + super.beforeAll() TestHive.cacheTables = true // Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*) TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) @@ -57,11 +58,14 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } override def afterAll() { - TestHive.cacheTables = false - TimeZone.setDefault(originalTimeZone) - Locale.setDefault(originalLocale) - sql("DROP TEMPORARY FUNCTION udtf_count2") - super.afterAll() + try { + TestHive.cacheTables = false + TimeZone.setDefault(originalTimeZone) + Locale.setDefault(originalLocale) + sql("DROP TEMPORARY FUNCTION udtf_count2") + } finally { + super.afterAll() + } } test("SPARK-4908: concurrent hive native commands") { @@ -1209,7 +1213,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { sql("USE hive_test_db") assert("hive_test_db" == sql("select current_database()").first().getString(0)) - intercept[NoSuchDatabaseException] { + intercept[AnalysisException] { sql("USE not_existing_db") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index d7c529ab0e10d..b0e263dff986a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -47,7 +47,7 @@ case class ListStringCaseClass(l: Seq[String]) */ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { - import hiveContext.{udf, sql} + import hiveContext.udf import hiveContext.implicits._ test("spark sql udf test that returns a struct") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index bc8896d4bd0ba..6199253d34db0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1325,6 +1325,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { .format("parquet") .save(path) + // We don't support creating a temporary table while specifying a database val message = intercept[AnalysisException] { sqlContext.sql( s""" @@ -1335,9 +1336,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |) """.stripMargin) }.getMessage - assert(message.contains("Specifying database name or other qualifiers are not allowed")) - // If you use backticks to quote the name of a temporary table having dot in it. + // If you use backticks to quote the name then it's OK. sqlContext.sql( s""" |CREATE TEMPORARY TABLE `db.t` diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala index ea82b8c459695..c6b7eb63662c5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.test.SQLTestUtils class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { override def beforeAll(): Unit = { + super.beforeAll() sql("DROP TABLE IF EXISTS part") sql( """ @@ -50,7 +51,11 @@ class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleto } override def afterAll(): Unit = { - sql("DROP TABLE IF EXISTS part") + try { + sql("DROP TABLE IF EXISTS part") + } finally { + super.afterAll() + } } test("windowing.q -- 15. testExpressions") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index cc412241fb4da..92f424bac7ff3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(table("t"), (data ++ data).map(Row.fromTuple)) } - sessionState.catalog.unregisterTable(TableIdentifier("tmp")) + sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true) } test("overwriting") { @@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(table("t"), data.map(Row.fromTuple)) } - sessionState.catalog.unregisterTable(TableIdentifier("tmp")) + sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true) } test("self-join") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index fe446774effbb..bdd3428a89742 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -68,8 +68,12 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA } override def afterAll(): Unit = { - orcTableDir.delete() - orcTableAsDir.delete() + try { + orcTableDir.delete() + orcTableAsDir.delete() + } finally { + super.afterAll() + } } test("create temporary orc table") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index bb53179c3cce3..b6fc61d453c4c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.DataSourceScan import org.apache.spark.sql.execution.command.ExecutedCommand import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} @@ -425,10 +426,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { } test("Caching converted data source Parquet Relations") { - val _catalog = sessionState.catalog - def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = { + def checkCached(tableIdentifier: TableIdentifier): Unit = { // Converted test_parquet should be cached. - sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match { + sessionState.catalog.getCachedDataSourceTable(tableIdentifier) match { case null => fail("Converted test_parquet should be cached in the cache.") case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) => // OK case other => @@ -453,17 +453,17 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet") + var tableIdentifier = TableIdentifier("test_insert_parquet", Some("default")) // First, make sure the converted test_parquet is not cached. - assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) // Table lookup will make the table cached. table("test_insert_parquet") checkCached(tableIdentifier) // For insert into non-partitioned table, we will do the conversion, // so the converted test_insert_parquet should be cached. invalidateTable("test_insert_parquet") - assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_insert_parquet @@ -476,7 +476,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { sql("select a, b from jt").collect()) // Invalidate the cache. invalidateTable("test_insert_parquet") - assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) // Create a partitioned table. sql( @@ -493,8 +493,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' """.stripMargin) - tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test") - assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + tableIdentifier = TableIdentifier("test_parquet_partitioned_cache_test", Some("default")) + assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test @@ -503,14 +503,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) // Right now, insert into a partitioned Parquet is not supported in data source Parquet. // So, we expect it is not cached. - assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) sql( """ |INSERT INTO TABLE test_parquet_partitioned_cache_test |PARTITION (`date`='2015-04-02') |select a, b from jt """.stripMargin) - assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) // Make sure we can cache the partitioned table. table("test_parquet_partitioned_cache_test") @@ -526,7 +526,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin).collect()) invalidateTable("test_parquet_partitioned_cache_test") - assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null) + assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null) dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test") } @@ -700,6 +700,7 @@ abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with var partitionedTableDirWithKeyAndComplexTypes: File = null override def beforeAll(): Unit = { + super.beforeAll() partitionedTableDir = Utils.createTempDir() normalTableDir = Utils.createTempDir()