First round of code review feedback
Changes:
- Predicates that are pushed down to Parquet are now kept track of
- Predicates that are pushed down are removed from the higher-level
  filters
- Filter enable setting renamed to "spark.sql.hints.parquetFilterPushdown"
- Smaller changes, code formatting, imports, etc.
AndreSchumacher committed May 16, 2014
1 parent b0f7806 commit a86553b
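The renamed toggle is read via `sparkContext.conf.getBoolean(..., true)`, so pushdown stays on unless the user disables it explicitly. A minimal sketch of that lookup, using a plain `Map` as a stand-in for Spark's actual `SparkConf`:

```scala
// The real key from this commit; the Map-based conf is a stand-in for SparkConf.
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"

// Mirrors conf.getBoolean(key, defaultValue = true): absent means enabled.
def pushdownEnabled(conf: Map[String, String]): Boolean =
  conf.get(PARQUET_FILTER_PUSHDOWN_ENABLED).map(_.toBoolean).getOrElse(true)
```

With this default, only an explicit `"false"` setting disables the pushdown path.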
Showing 4 changed files with 364 additions and 105 deletions.
@@ -140,16 +140,35 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
// Note: we do not actually remove the filters that were pushed down to Parquet from
// the plan, in case that some of the predicates cannot be evaluated there because
// they contain complex operations, such as CASTs.
// TODO: rethink whether conjunctions that are handed down to Parquet should be removed
// from the list of higher-level filters.
pruneFilterProject(
projectList,
filters,
ParquetTableScan(_, relation, Some(filters))(sparkContext)) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
val remainingFilters =
if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
filters.filter {
// Note: filters cannot be pushed down to Parquet if they contain more complex
// expressions than simple "Attribute cmp Literal" comparisons. Here we remove
// all filters that have been pushed down. Note that a predicate such as
// "A AND B" can result in "A" being pushed down.
filter =>
val recordFilter = ParquetFilters.createFilter(filter)
if (!recordFilter.isDefined) {
// First case: the pushdown did not result in any record filter.
true
} else {
// Second case: a record filter was created; here we are conservative in
// the sense that even if "A" was pushed and we check for "A AND B" we
// still want to keep "A AND B" in the higher-level filter, not just "B".
!ParquetFilters.findExpression(recordFilter.get, filter).isDefined
}
}
} else {
filters
}
pruneFilterProject(
projectList,
remainingFilters,
ParquetTableScan(_, relation, Some(filters))(sparkContext)) :: Nil
}

case _ => Nil
}
}
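The pruning rule above can be sketched in isolation. This is a hedged model, not Spark's code: `Cmp`, `And`, and `Cast` are hypothetical stand-ins for Catalyst expressions, and `createFilter` imitates `ParquetFilters.createFilter` by accepting only simple "Attribute cmp Literal" comparisons (so `A AND B` may push down just `A`). A filter is dropped from the higher-level plan only when the created record filter covers the whole predicate, matching the conservative `findExpression` check:

```scala
// Hypothetical expression ADT standing in for Catalyst expressions.
sealed trait Expr
case class Cmp(attr: String, op: String, lit: Int) extends Expr // "Attribute cmp Literal"
case class And(left: Expr, right: Expr) extends Expr
case class Cast(child: Expr) extends Expr                       // stands in for complex ops

// Stand-in for ParquetFilters.createFilter: only simple comparisons
// (and conjunctions of them) produce a record filter.
def createFilter(e: Expr): Option[Expr] = e match {
  case c: Cmp => Some(c)
  case And(l, r) =>
    (createFilter(l), createFilter(r)) match {
      case (Some(lf), Some(rf)) => Some(And(lf, rf))
      case (Some(lf), None)     => Some(lf) // only "A" of "A AND B" is pushed
      case (None, Some(rf))     => Some(rf)
      case (None, None)         => None
    }
  case _: Cast => None
}

// Conservative pruning: drop a filter only if the record filter equals the
// entire predicate; a partial pushdown keeps "A AND B" in the plan.
def remainingFilters(filters: Seq[Expr]): Seq[Expr] =
  filters.filter { f =>
    createFilter(f) match {
      case None     => true    // nothing pushed down: keep the filter
      case Some(rf) => rf != f // partial pushdown: keep the whole predicate
    }
  }
```

Under this model a fully pushable predicate is removed from the higher-level filters, while a mixed predicate like `A AND CAST(...)` is evaluated both by Parquet (on `A`) and again by Spark, which is safe because re-applying a filter is idempotent.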
