[SPARK-36128][SQL] Apply spark.sql.hive.metastorePartitionPruning for non-Hive tables that use Hive metastore for partition management

### What changes were proposed in this pull request?

In `CatalogFileIndex.filterPartitions`, check the config `spark.sql.hive.metastorePartitionPruning` and do not push down predicates to the remote HMS if it is false. Instead, fall back to the `listPartitions` API and do the filtering on the client side.
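
A simplified sketch of the resulting behavior (mirroring the helper added to `ExternalCatalogUtils` in the diff below):

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.internal.SQLConf

// Choose between metastore-side and client-side partition pruning, driven by
// spark.sql.hive.metastorePartitionPruning.
def listPartitionsByFilter(
    conf: SQLConf,
    catalog: SessionCatalog,
    table: CatalogTable,
    partitionFilters: Seq[Expression]): Seq[CatalogTablePartition] = {
  if (conf.metastorePartitionPruning) {
    // Push the partition predicates down to the Hive metastore.
    catalog.listPartitionsByFilter(table.identifier, partitionFilters)
  } else {
    // List all partitions from the metastore and prune them on the Spark side.
    ExternalCatalogUtils.prunePartitionsByFilter(
      table, catalog.listPartitions(table.identifier),
      partitionFilters, conf.sessionLocalTimeZone)
  }
}
```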

### Why are the changes needed?

Currently the config `spark.sql.hive.metastorePartitionPruning` is only effective for Hive tables, and for non-Hive tables we always use the `listPartitionsByFilter` API from the HMS client. On the other hand, all data source tables also manage their partitions through HMS by default, as long as the config `spark.sql.hive.manageFilesourcePartitions` is turned on. Therefore, it seems reasonable to extend the above config to non-Hive tables as well.
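
For example (a sketch using the two session configs involved; both default to `true`):

```scala
// When partition management is enabled, data source tables keep their partition
// metadata in HMS. With this change, partition predicates are pushed to the
// metastore only when metastorePartitionPruning is also true; otherwise Spark
// lists all partitions and filters them on the client side.
spark.conf.set("spark.sql.hive.manageFilesourcePartitions", "true")
spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")
```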

In certain cases the remote HMS service could throw exceptions when the `listPartitionsByFilter` API is used, which is currently unrecoverable on the Spark side. Therefore it would be better to allow users to avoid the API through the above config.

For instance, HMS only allows pushing down predicates on date columns when direct SQL (rather than JDO) is used to interact with the underlying RDBMS, and throws an exception otherwise. Even though the Spark Hive client will attempt to recover when the exception happens, it only does so when the config `hive.metastore.try.direct.sql` on the remote HMS is `false`. There could be cases where `hive.metastore.try.direct.sql` is `true` but the remote HMS still throws the exception.
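
In that situation a user can avoid the pushdown entirely. A hedged example (the table `events` and its date partition column `dt` are hypothetical):

```scala
// Workaround sketch: disable metastore-side pruning so Spark falls back to
// listPartitions plus client-side filtering instead of listPartitionsByFilter.
spark.conf.set("spark.sql.hive.metastorePartitionPruning", "false")

// `events` stands in for a data source table partitioned by a DATE column `dt`.
spark.sql("SELECT count(*) FROM events WHERE dt = DATE'2021-07-01'").show()
```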

### Does this PR introduce _any_ user-facing change?

Yes. The config `spark.sql.hive.metastorePartitionPruning` now also applies to non-Hive tables that use HMS to manage their partition metadata.

### How was this patch tested?

Added a new unit test:
```
build/sbt "hive/testOnly *PruneFileSourcePartitionsSuite -- -z SPARK-36128"
```

Closes apache#33348 from sunchao/SPARK-36128-by-filter.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
sunchao authored and viirya committed Jul 16, 2021
1 parent 3218e4e commit 37dc3f9
Showing 5 changed files with 40 additions and 25 deletions.
ExternalCatalogUtils.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, Predicate}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf

object ExternalCatalogUtils {
// This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since catalyst doesn't
@@ -132,6 +133,19 @@ object ExternalCatalogUtils {
escapePathName(col) + "=" + partitionString
}

  def listPartitionsByFilter(
      conf: SQLConf,
      catalog: SessionCatalog,
      table: CatalogTable,
      partitionFilters: Seq[Expression]): Seq[CatalogTablePartition] = {
    if (conf.metastorePartitionPruning) {
      catalog.listPartitionsByFilter(table.identifier, partitionFilters)
    } else {
      ExternalCatalogUtils.prunePartitionsByFilter(table, catalog.listPartitions(table.identifier),
        partitionFilters, conf.sessionLocalTimeZone)
    }
  }

def prunePartitionsByFilter(
catalogTable: CatalogTable,
inputPartitions: Seq[CatalogTablePartition],
SQLConf.scala
@@ -979,9 +979,7 @@
val HIVE_METASTORE_PARTITION_PRUNING =
buildConf("spark.sql.hive.metastorePartitionPruning")
.doc("When true, some predicates will be pushed down into the Hive metastore so that " +
"unmatching partitions can be eliminated earlier. This only affects Hive tables " +
"not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " +
"HiveUtils.CONVERT_METASTORE_ORC for more information).")
"unmatching partitions can be eliminated earlier.")
.version("1.5.0")
.booleanConf
.createWithDefault(true)
@@ -1005,7 +1003,8 @@
.doc("When true, enable metastore partition management for file source tables as well. " +
"This includes both datasource and converted Hive tables. When partition management " +
"is enabled, datasource tables store partition in the Hive metastore, and use the " +
"metastore to prune partitions during query planning.")
s"metastore to prune partitions during query planning when " +
s"$HIVE_METASTORE_PARTITION_PRUNING is set to true.")
.version("2.1.1")
.booleanConf
.createWithDefault(true)
CatalogFileIndex.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType

@@ -70,8 +70,8 @@ class CatalogFileIndex(
def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
if (table.partitionColumnNames.nonEmpty) {
val startTime = System.nanoTime()
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val selectedPartitions = ExternalCatalogUtils.listPartitionsByFilter(
sparkSession.sessionState.conf, sparkSession.sessionState.catalog, table, filters)
val partitions = selectedPartitions.map { p =>
val path = new Path(p.location)
val fs = path.getFileSystem(hadoopConf)
PruneHiveTablePartitions.scala
@@ -56,22 +56,6 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession)
normalizedFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionColumnSet)))
}

/**
* Prune the hive table using filters on the partitions of the table.
*/
private def prunePartitions(
relation: HiveTableRelation,
partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
if (conf.metastorePartitionPruning) {
session.sessionState.catalog.listPartitionsByFilter(
relation.tableMeta.identifier, partitionFilters.toSeq)
} else {
ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
partitionFilters.toSeq, conf.sessionLocalTimeZone)
}
}

/**
* Update the statistics of the table.
*/
@@ -111,7 +95,8 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession)
if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
if (partitionKeyFilters.nonEmpty) {
val newPartitions = prunePartitions(relation, partitionKeyFilters)
val newPartitions = ExternalCatalogUtils.listPartitionsByFilter(conf,
session.sessionState.catalog, relation.tableMeta, partitionKeyFilters.toSeq)
val newTableMeta = updateTableMeta(relation, newPartitions, partitionKeyFilters)
val newRelation = relation.copy(
tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
PruneFileSourcePartitionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.execution

import org.scalatest.matchers.should.Matchers._

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -115,7 +117,7 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase {
withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, "")) {
withTempPath { dir =>
spark.range(10).selectExpr("id", "id % 3 as p")
.write.partitionBy("p").parquet(dir.getCanonicalPath)
.write.partitionBy("p").parquet(dir.getCanonicalPath)
withTempView("tmp") {
spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
assertPrunedPartitions("SELECT COUNT(*) FROM tmp WHERE p = 0", 1, "(tmp.p = 0)")
@@ -125,6 +127,21 @@
}
}

test("SPARK-36128: spark.sql.hive.metastorePartitionPruning should work for file data sources") {
Seq(true, false).foreach { enablePruning =>
withTable("tbl") {
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> enablePruning.toString) {
spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl")
HiveCatalogMetrics.reset()
QueryTest.checkAnswer(sql("SELECT id FROM tbl WHERE p = 1"),
Seq(1, 4, 7).map(Row.apply(_)), checkToRDD = false) // avoid analyzing the query twice
val expectedCount = if (enablePruning) 1 else 3
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount == expectedCount)
}
}
}
}

override def getScanExecPartitionSize(plan: SparkPlan): Long = {
plan.collectFirst {
case p: FileSourceScanExec => p.selectedPartitions.length
