[SPARK-24478][SQL][FOLLOWUP] Move projection and filter push down to physical conversion

This is a follow-up of apache#21503, to completely move operator pushdown to the planner rule.

The code is mostly from apache#21319.

Tested with existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21574 from cloud-fan/followup.

Ref: LIHADOOP-48531

RB=1853689
G=superfriends-reviewers
R=zolin,fli,yezhou,mshen,latang
A=
cloud-fan authored and otterc committed Oct 24, 2019
1 parent 548fc8d commit 3d863f3
Showing 3 changed files with 123 additions and 117 deletions.
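Before the file-by-file diff, a rough idea of the end state this follow-up aims for. The sketch below is a heavily simplified, self-contained Scala model (all names such as ToyReader, ToyRelation and ToyScan are illustrative, not Spark's classes): the logical relation carries only the source, options and schema, and both filter pushdown and column pruning happen inside the planner rule, when the relation is converted to a physical scan.

object PushdownInPlannerSketch {
  final case class Column(name: String)
  final case class Filter(description: String)

  // Toy reader with mutable pushdown state, created fresh for each planning run.
  class ToyReader(var columns: Seq[Column]) {
    var pushedFilters: Seq[Filter] = Nil
    def pushFilters(filters: Seq[Filter]): Unit = pushedFilters = filters
    def pruneColumns(required: Seq[Column]): Unit = columns = required
  }

  // The logical relation holds no reader and no pushdown state.
  final case class ToyRelation(allColumns: Seq[Column]) {
    def newReader(): ToyReader = new ToyReader(allColumns)
  }

  final case class ToyScan(output: Seq[Column], pushed: Seq[Filter])

  // The "planner rule": every pushdown decision is made here, at physical conversion time.
  def plan(relation: ToyRelation, project: Seq[Column], filters: Seq[Filter]): ToyScan = {
    val reader = relation.newReader()
    reader.pushFilters(filters)
    reader.pruneColumns(project)
    ToyScan(reader.columns, reader.pushedFilters)
  }

  def main(args: Array[String]): Unit = {
    val relation = ToyRelation(Seq(Column("id"), Column("name"), Column("age")))
    val scan = plan(relation, project = Seq(Column("id")), filters = Seq(Filter("age > 21")))
    println(scan)
  }
}

In the diff itself, DataSourceV2Relation plays the role of ToyRelation and DataSourceV2Strategy plays the role of the plan function.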
SupportsReportStatistics.java
@@ -23,10 +23,9 @@
* A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to report statistics to Spark.
*
* Statistics are reported to the optimizer before a projection or any filters are pushed to the
* DataSourceReader. Implementations that return more accurate statistics based on projection and
* filters will not improve query performance until the planner can push operators before getting
* stats.
* Statistics are reported to the optimizer before any operator is pushed to the DataSourceReader.
* Implementations that return more accurate statistics based on pushed operators will not improve
* query performance until the planner can push operators before getting stats.
*/
@InterfaceStability.Evolving
public interface SupportsReportStatistics extends DataSourceReader {
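The javadoc change above is the crux of this hunk: statistics are still obtained from a reader before any operator has been pushed to it, so implementations whose statistics depend on pushed operators cannot influence the optimizer yet. A toy, self-contained sketch of that ordering (the names StatsBeforePushdownDemo, ToyReader and Stats are illustrative, not the Spark v2 API):

object StatsBeforePushdownDemo {
  final case class Stats(numRows: Long)

  // Toy reader: the statistics it reports never reflect filter pushdown.
  class ToyReader(totalRows: Long) {
    private var pushed: Option[Long => Boolean] = None
    def pushFilter(predicate: Long => Boolean): Unit = pushed = Some(predicate)
    def estimateStatistics(): Stats = Stats(totalRows) // full-table stats, ignores `pushed`
  }

  def main(args: Array[String]): Unit = {
    // The optimizer asks a fresh reader for statistics first...
    val statsReader = new ToyReader(totalRows = 1000000L)
    println(s"stats seen by the optimizer: ${statsReader.estimateStatistics()}")

    // ...and only later does the planner create another reader and push a filter into it,
    // so the selectivity of the pushed filter never shows up in the stats above.
    val scanReader = new ToyReader(totalRows = 1000000L)
    scanReader.pushFilter(_ % 100 == 0)
  }
}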
DataSourceV2Relation.scala
@@ -23,17 +23,24 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

/**
* A logical plan representing a data source v2 scan.
*
* @param source An instance of a [[DataSourceV2]] implementation.
* @param options The options for this scan. Used to create a fresh [[DataSourceReader]].
* @param userSpecifiedSchema The user-specified schema for this scan. Used to create a fresh
* [[DataSourceReader]].
*/
case class DataSourceV2Relation(
source: DataSourceV2,
output: Seq[AttributeReference],
options: Map[String, String],
userSpecifiedSchema: Option[StructType] = None)
userSpecifiedSchema: Option[StructType])
extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {

import DataSourceV2Relation._
@@ -42,14 +49,7 @@ case class DataSourceV2Relation(

override def simpleString: String = "RelationV2 " + metadataString

lazy val v2Options: DataSourceOptions = makeV2Options(options)

def newReader: DataSourceReader = userSpecifiedSchema match {
case Some(userSchema) =>
source.asReadSupportWithSchema.createReader(userSchema, v2Options)
case None =>
source.asReadSupport.createReader(v2Options)
}
def newReader(): DataSourceReader = source.createReader(options, userSpecifiedSchema)

override def computeStats(): Statistics = newReader match {
case r: SupportsReportStatistics =>
@@ -139,83 +139,26 @@ object DataSourceV2Relation {
source.getClass.getSimpleName
}
}
}

private def makeV2Options(options: Map[String, String]): DataSourceOptions = {
new DataSourceOptions(options.asJava)
}

private def schema(
source: DataSourceV2,
v2Options: DataSourceOptions,
userSchema: Option[StructType]): StructType = {
val reader = userSchema match {
case Some(s) =>
source.asReadSupportWithSchema.createReader(s, v2Options)
case _ =>
source.asReadSupport.createReader(v2Options)
def createReader(
options: Map[String, String],
userSpecifiedSchema: Option[StructType]): DataSourceReader = {
val v2Options = new DataSourceOptions(options.asJava)
userSpecifiedSchema match {
case Some(s) =>
asReadSupportWithSchema.createReader(s, v2Options)
case _ =>
asReadSupport.createReader(v2Options)
}
}
reader.readSchema()
}

def create(
source: DataSourceV2,
options: Map[String, String],
userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
val output = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
DataSourceV2Relation(source, output, options, userSpecifiedSchema)
}

def pushRequiredColumns(
relation: DataSourceV2Relation,
reader: DataSourceReader,
struct: StructType): Seq[AttributeReference] = {
reader match {
case projectionSupport: SupportsPushDownRequiredColumns =>
projectionSupport.pruneColumns(struct)
// return the output columns from the relation that were projected
val attrMap = relation.output.map(a => a.name -> a).toMap
projectionSupport.readSchema().map(f => attrMap(f.name))
case _ =>
relation.output
}
}

def pushFilters(
reader: DataSourceReader,
filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
reader match {
case r: SupportsPushDownCatalystFilters =>
val postScanFilters = r.pushCatalystFilters(filters.toArray)
val pushedFilters = r.pushedCatalystFilters()
(postScanFilters, pushedFilters)

case r: SupportsPushDownFilters =>
// A map from translated data source filters to original catalyst filter expressions.
val translatedFilterToExpr = scala.collection.mutable.HashMap.empty[Filter, Expression]
// Catalyst filter expressions that can't be translated to data source filters.
val untranslatableExprs = scala.collection.mutable.ArrayBuffer.empty[Expression]

for (filterExpr <- filters) {
val translated = DataSourceStrategy.translateFilter(filterExpr)
if (translated.isDefined) {
translatedFilterToExpr(translated.get) = filterExpr
} else {
untranslatableExprs += filterExpr
}
}

// Data source filters that need to be evaluated again after scanning, which means
// the data source cannot guarantee the rows returned can pass these filters.
// As a result we must return them so Spark can plan an extra filter operator.
val postScanFilters =
r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
// The filters which are marked as pushed to this data source
val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)

(untranslatableExprs ++ postScanFilters, pushedFilters)

case _ => (filters, Nil)
}
userSpecifiedSchema: Option[StructType]): DataSourceV2Relation = {
val reader = source.createReader(options, userSpecifiedSchema)
DataSourceV2Relation(
source, reader.readSchema().toAttributes, options, userSpecifiedSchema)
}
}
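The reshaped DataSourceV2Relation above keeps no reader and no pushdown state: the create factory uses a throwaway reader only to discover the schema, and newReader() hands every consumer a fresh reader. A minimal sketch of that factory pattern with made-up types (RelationFactorySketch, ToySource, ToyReader and friends are illustrative, not the Spark classes):

object RelationFactorySketch {
  final case class Field(name: String)
  final case class Schema(fields: Seq[Field])

  trait ToyReader {
    def readSchema(): Schema
  }

  trait ToySource {
    // Mirrors the shape of the createReader(options, userSpecifiedSchema) helper above.
    def createReader(options: Map[String, String], userSchema: Option[Schema]): ToyReader
  }

  // The relation keeps only what is needed to recreate a reader later.
  final case class ToyRelation(
      source: ToySource,
      output: Seq[Field],
      options: Map[String, String],
      userSchema: Option[Schema]) {
    def newReader(): ToyReader = source.createReader(options, userSchema)
  }

  // The factory uses a throwaway reader once, just to discover the schema.
  def create(
      source: ToySource,
      options: Map[String, String],
      userSchema: Option[Schema]): ToyRelation = {
    val reader = source.createReader(options, userSchema)
    ToyRelation(source, reader.readSchema().fields, options, userSchema)
  }

  def main(args: Array[String]): Unit = {
    val source = new ToySource {
      def createReader(options: Map[String, String], userSchema: Option[Schema]): ToyReader =
        new ToyReader {
          def readSchema(): Schema = userSchema.getOrElse(Schema(Seq(Field("id"), Field("name"))))
        }
    }
    println(create(source, Map("path" -> "/tmp/data"), None))
  }
}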
DataSourceV2Strategy.scala
@@ -17,51 +17,115 @@

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.{execution, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
import scala.collection.mutable

import org.apache.spark.sql.{sources, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns}

object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
val projectSet = AttributeSet(project.flatMap(_.references))
val filterSet = AttributeSet(filters.flatMap(_.references))

val projection = if (filterSet.subsetOf(projectSet) &&
AttributeSet(relation.output) == projectSet) {
// When the required projection contains all of the filter columns and column pruning alone
// can produce the required projection, push the required projection.
// A final projection may still be needed if the data source produces a different column
// order or if it cannot prune all of the nested columns.
relation.output
} else {
// When there are filter columns not already in the required projection or when the required
// projection is more complicated than column pruning, base column pruning on the set of
// all columns needed by both.
(projectSet ++ filterSet).toSeq
}

val reader = relation.newReader
/**
* Pushes down filters to the data source reader.
*
* @return pushed filters and post-scan filters.
*/
private def pushFilters(
reader: DataSourceReader,
filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
reader match {
case r: SupportsPushDownCatalystFilters =>
val postScanFilters = r.pushCatalystFilters(filters.toArray)
val pushedFilters = r.pushedCatalystFilters()
(pushedFilters, postScanFilters)

case r: SupportsPushDownFilters =>
// A map from translated data source filters to original catalyst filter expressions.
val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
// Catalyst filter expressions that can't be translated to data source filters.
val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]

for (filterExpr <- filters) {
val translated = DataSourceStrategy.translateFilter(filterExpr)
if (translated.isDefined) {
translatedFilterToExpr(translated.get) = filterExpr
} else {
untranslatableExprs += filterExpr
}
}

// Data source filters that need to be evaluated again after scanning, which means
// the data source cannot guarantee the rows returned can pass these filters.
// As a result we must return them so Spark can plan an extra filter operator.
val postScanFilters = r.pushFilters(translatedFilterToExpr.keys.toArray)
.map(translatedFilterToExpr)
// The filters which are marked as pushed to this data source
val pushedFilters = r.pushedFilters().map(translatedFilterToExpr)
(pushedFilters, untranslatableExprs ++ postScanFilters)

case _ => (Nil, filters)
}
}

val output = DataSourceV2Relation.pushRequiredColumns(relation, reader,
projection.asInstanceOf[Seq[AttributeReference]].toStructType)
/**
* Applies column pruning to the data source, w.r.t. the references of the given expressions.
*
* @return new output attributes after column pruning.
*/
// TODO: nested column pruning.
private def pruneColumns(
reader: DataSourceReader,
relation: DataSourceV2Relation,
exprs: Seq[Expression]): Seq[AttributeReference] = {
reader match {
case r: SupportsPushDownRequiredColumns =>
val requiredColumns = AttributeSet(exprs.flatMap(_.references))
val neededOutput = relation.output.filter(requiredColumns.contains)
if (neededOutput != relation.output) {
r.pruneColumns(neededOutput.toStructType)
val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
r.readSchema().toAttributes.map {
// We have to keep the attribute id during transformation.
a => a.withExprId(nameToAttr(a.name).exprId)
}
} else {
relation.output
}

case _ => relation.output
}
}

val (postScanFilters, pushedFilters) = DataSourceV2Relation.pushFilters(reader, filters)

logInfo(s"Post-Scan Filters: ${postScanFilters.mkString(",")}")
logInfo(s"Pushed Filters: ${pushedFilters.mkString(", ")}")
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
val reader = relation.newReader()
// `pushedFilters` will be pushed down and evaluated in the underlying data sources.
// `postScanFilters` need to be evaluated after the scan.
// `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
val (pushedFilters, postScanFilters) = pushFilters(reader, filters)
val output = pruneColumns(reader, relation, project ++ postScanFilters)
logInfo(
s"""
|Pushing operators to ${relation.source.getClass}
|Pushed Filters: ${pushedFilters.mkString(", ")}
|Post-Scan Filters: ${postScanFilters.mkString(",")}
|Output: ${output.mkString(", ")}
""".stripMargin)

val scan = DataSourceV2ScanExec(
output, relation.source, relation.options, pushedFilters, reader)

val filter = postScanFilters.reduceLeftOption(And)
val withFilter = filter.map(execution.FilterExec(_, scan)).getOrElse(scan)
val filterCondition = postScanFilters.reduceLeftOption(And)
val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)

val withProjection = if (withFilter.output != project) {
execution.ProjectExec(project, withFilter)
ProjectExec(project, withFilter)
} else {
withFilter
}
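To make the pushFilters contract in the strategy concrete, here is a self-contained toy version of the filter split (the expression and filter types below are invented for illustration; they are not Spark's Expression or sources.Filter): translatable predicates are offered to the reader, whatever the reader hands back must be re-evaluated after the scan, and untranslatable predicates always stay as post-scan filters.

object FilterSplitSketch {
  sealed trait Expr
  final case class EqualTo(col: String, value: Int) extends Expr
  final case class Custom(description: String) extends Expr // stands in for an untranslatable predicate

  final case class SourceFilter(col: String, value: Int)

  // Toy reader: accepts pushed filters and reports which ones it cannot fully guarantee.
  class ToyReader {
    private var pushed: Seq[SourceFilter] = Nil
    def pushFilters(filters: Seq[SourceFilter]): Seq[SourceFilter] = { pushed = filters; Nil }
    def pushedFilters: Seq[SourceFilter] = pushed
  }

  private def translate(e: Expr): Option[SourceFilter] = e match {
    case EqualTo(c, v) => Some(SourceFilter(c, v))
    case _             => None
  }

  // Returns (pushed filters, post-scan filters), mirroring the split done by the strategy.
  def split(reader: ToyReader, filters: Seq[Expr]): (Seq[Expr], Seq[Expr]) = {
    val translatedToExpr = scala.collection.mutable.HashMap.empty[SourceFilter, Expr]
    val untranslatable = scala.collection.mutable.ArrayBuffer.empty[Expr]
    filters.foreach { f =>
      translate(f) match {
        case Some(t) => translatedToExpr(t) = f
        case None    => untranslatable += f
      }
    }
    // Filters the reader hands back must be re-evaluated after the scan.
    val postScan = reader.pushFilters(translatedToExpr.keys.toSeq).map(translatedToExpr)
    val pushed = reader.pushedFilters.map(translatedToExpr)
    (pushed, (untranslatable ++ postScan).toSeq)
  }

  def main(args: Array[String]): Unit = {
    val (pushed, postScan) = split(new ToyReader, Seq(EqualTo("id", 1), Custom("udf(name) > 0")))
    println(s"pushed: $pushed, post-scan: $postScan")
  }
}

Note that the tuple order (pushed, post-scan) matches the new pushFilters above, whereas the old helper removed from DataSourceV2Relation returned (post-scan, pushed).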
