Skip to content

Commit

Permalink
[SPARK-23877][SQL][FOLLOWUP] use PhysicalOperation to simplify the ha…
Browse files Browse the repository at this point in the history
…ndling of Project and Filter over partitioned relation

A followup of apache#20988

`PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation.

existing test

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21111 from cloud-fan/refactor.

(cherry picked from commit f70f46d)

Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
  • Loading branch information
cloud-fan authored and jzhuge committed Feb 7, 2019
1 parent ebb45b6 commit b65d684
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ object LocalRelation {
}
}

/**
* Logical plan node for scanning data from a local collection.
*
* @param data The local collection holding the data. It doesn't need to be sent to executors
* and then doesn't need to be serializable.
*/
case class LocalRelation(
output: Seq[Attribute],
data: Seq[InternalRow] = Nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import org.apache.spark.sql.execution.metric.SQLMetrics

/**
* Physical plan node for scanning data from a local collection.
*
* `Seq` may not be serializable and ideally we should not send `rows` and `unsafeRows`
* to the executors. Thus marking them as transient.
*/
case class LocalTableScanExec(
output: Seq[Attribute],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
Expand All @@ -49,9 +50,13 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
}

plan.transform {
case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(_, attrs, filters, rel)) =>
case a @ Aggregate(_, aggExprs, child @ PhysicalOperation(
projectList, filters, PartitionedRelation(partAttrs, rel))) =>
// We only apply this optimization when only partitioned attributes are scanned.
if (a.references.subsetOf(attrs)) {
if (AttributeSet((projectList ++ filters).flatMap(_.references)).subsetOf(partAttrs)) {
// The project list and filters all only refer to partition attributes, which means the
// the Aggregator operator can also only refer to partition attributes, and filters are
// all partition filters. This is a metadata only query we can optimize.
val aggFunctions = aggExprs.flatMap(_.collect {
case agg: AggregateExpression => agg
})
Expand Down Expand Up @@ -107,7 +112,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
partFilters: Seq[Expression]): LogicalPlan = {
// this logic comes from PruneFileSourcePartitions. it ensures that the filter names match the
// relation's schema. PartitionedRelation ensures that the filters only reference partition cols
val relFilters = partFilters.map { e =>
val normalizedFilters = partFilters.map { e =>
e transform {
case a: AttributeReference =>
a.withName(relation.output.find(_.semanticEquals(a)).get.name)
Expand All @@ -119,11 +124,8 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
relation match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) =>
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
val partitionData = fsRelation.location.listFiles(relFilters, Nil)
// partition data may be a stream, which can cause serialization to hit stack level too
// deep exceptions because it is a recursive structure in memory. converting to array
// avoids the problem.
LocalRelation(partAttrs, partitionData.map(_.values).toArray, isStreaming)
val partitionData = fsRelation.location.listFiles(normalizedFilters, Nil)
LocalRelation(partAttrs, partitionData.map(_.values), isStreaming)

case relation: HiveTableRelation =>
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
Expand All @@ -132,7 +134,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(SQLConf.get.sessionLocalTimeZone)
val partitions = if (partFilters.nonEmpty) {
catalog.listPartitionsByFilter(relation.tableMeta.identifier, relFilters)
catalog.listPartitionsByFilter(relation.tableMeta.identifier, normalizedFilters)
} else {
catalog.listPartitions(relation.tableMeta.identifier)
}
Expand All @@ -142,10 +144,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval()
})
}
// partition data may be a stream, which can cause serialization to hit stack level too
// deep exceptions because it is a recursive structure in memory. converting to array
// avoids the problem.
LocalRelation(partAttrs, partitionData.toArray)
LocalRelation(partAttrs, partitionData)

case _ =>
throw new IllegalStateException(s"unrecognized table scan node: $relation, " +
Expand All @@ -156,44 +155,21 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic

/**
* A pattern that finds the partitioned table relation node inside the given plan, and returns a
* pair of the partition attributes, partition filters, and the table relation node.
*
* It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
* deterministic expressions, and returns result after reaching the partitioned table relation
* node.
* pair of the partition attributes and the table relation node.
*/
object PartitionedRelation extends PredicateHelper {

def unapply(
plan: LogicalPlan): Option[(AttributeSet, AttributeSet, Seq[Expression], LogicalPlan)] = {
def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = {
plan match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
if fsRelation.partitionSchema.nonEmpty =>
if fsRelation.partitionSchema.nonEmpty =>
val partAttrs = AttributeSet(getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l))
Some((partAttrs, partAttrs, Nil, l))
Some((partAttrs, l))

case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
val partAttrs = AttributeSet(
getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation))
Some((partAttrs, partAttrs, Nil, relation))

case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
if (p.references.subsetOf(attrs)) {
Some((partAttrs, p.outputSet, filters, relation))
} else {
None
}
}

case f @ Filter(condition, child) if condition.deterministic =>
unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
if (f.references.subsetOf(partAttrs)) {
Some((partAttrs, attrs, splitConjunctivePredicates(condition) ++ filters, relation))
} else {
None
}
}
Some((partAttrs, relation))

case _ => None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, Project, SubqueryAlias}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

Expand All @@ -32,13 +33,22 @@ class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleto

import spark.implicits._

before {
override def beforeAll(): Unit = {
super.beforeAll()
sql("CREATE TABLE metadata_only (id bigint, data string) PARTITIONED BY (part int)")
(0 to 10).foreach(p => sql(s"ALTER TABLE metadata_only ADD PARTITION (part=$p)"))
}

override protected def afterAll(): Unit = {
try {
sql("DROP TABLE IF EXISTS metadata_only")
} finally {
super.afterAll()
}
}

test("SPARK-23877: validate metadata-only query pushes filters to metastore") {
withTable("metadata_only") {
withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount

// verify the number of matching partitions
Expand All @@ -50,7 +60,7 @@ class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleto
}

test("SPARK-23877: filter on projected expression") {
withTable("metadata_only") {
withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount

// verify the matching partitions
Expand Down

0 comments on commit b65d684

Please sign in to comment.