Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ use datafusion_common::{internal_err, Result};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::{
file::{FileSource, FileSourceFilterPushdownResult},
file_scan_config::FileScanConfig,
file_stream::FileOpener,
file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener,
};
use datafusion_expr::test::function_stub::count_udaf;
use datafusion_physical_expr::expressions::col;
Expand All @@ -47,6 +45,7 @@ use datafusion_physical_plan::{
coalesce_batches::CoalesceBatchesExec,
filter::FilterExec,
repartition::RepartitionExec,
FilterPushdownResult,
};
use datafusion_physical_plan::{
displayable, filter_pushdown::FilterPushdownSupport,
Expand Down Expand Up @@ -150,7 +149,7 @@ impl FileSource for TestSource {
&self,
filters: &[PhysicalExprRef],
config: &ConfigOptions,
) -> Result<FileSourceFilterPushdownResult> {
) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
let support = match self.support {
Some(support) => support,
None => {
Expand All @@ -166,10 +165,7 @@ impl FileSource for TestSource {
predicate: Some(conjunction(filters.iter().map(Arc::clone))),
statistics: self.statistics.clone(),
});
Ok(FileSourceFilterPushdownResult::new(
new,
vec![support; filters.len()],
))
Ok(FilterPushdownResult::new(new, vec![support; filters.len()]))
}
}

Expand Down
6 changes: 2 additions & 4 deletions datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ pub trait FileSource: Send + Sync {
&self,
_filters: &[PhysicalExprRef],
_config: &ConfigOptions,
) -> Result<FileSourceFilterPushdownResult> {
Ok(FileSourceFilterPushdownResult::NotPushed)
) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
Ok(FilterPushdownResult::NotPushed)
}
}

pub type FileSourceFilterPushdownResult = FilterPushdownResult<Arc<dyn FileSource>>;
16 changes: 7 additions & 9 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@ use datafusion_physical_plan::{
display::{display_orderings, ProjectSchemaDisplay},
metrics::ExecutionPlanMetricsSet,
projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
DisplayAs, DisplayFormatType, ExecutionPlan,
DisplayAs, DisplayFormatType, ExecutionPlan, FilterPushdownResult,
};
use log::{debug, warn};

use crate::file_groups::FileGroup;
use crate::{
display::FileGroupsDisplay,
file::{FileSource, FileSourceFilterPushdownResult},
file::FileSource,
file_compression_type::FileCompressionType,
file_stream::FileStream,
source::{DataSource, DataSourceExec},
statistics::MinMaxStatistics,
PartitionedFile,
};
use crate::{file_groups::FileGroup, source::DataSourceFilterPushdownResult};

/// The base configurations for a [`DataSourceExec`], the physical plan for
/// any given file format.
Expand Down Expand Up @@ -591,18 +591,16 @@ impl DataSource for FileScanConfig {
&self,
filters: &[PhysicalExprRef],
config: &ConfigOptions,
) -> Result<DataSourceFilterPushdownResult> {
) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
match self.file_source.try_pushdown_filters(filters, config)? {
FileSourceFilterPushdownResult::NotPushed => {
Ok(DataSourceFilterPushdownResult::NotPushed)
}
FileSourceFilterPushdownResult::Pushed { inner, support } => {
FilterPushdownResult::NotPushed => Ok(FilterPushdownResult::NotPushed),
FilterPushdownResult::Pushed { inner, support } => {
let new_self = Arc::new(
FileScanConfigBuilder::from(self.clone())
.with_source(inner)
.build(),
);
Ok(DataSourceFilterPushdownResult::Pushed {
Ok(FilterPushdownResult::Pushed {
inner: new_self,
support,
})
Expand Down
19 changes: 7 additions & 12 deletions datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanFilterPushdownResult,
FilterPushdownResult, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, FilterPushdownResult, PlanProperties,
};

use crate::file_scan_config::FileScanConfig;
Expand Down Expand Up @@ -86,13 +85,11 @@ pub trait DataSource: Send + Sync + Debug {
&self,
_filters: &[PhysicalExprRef],
_config: &ConfigOptions,
) -> Result<DataSourceFilterPushdownResult> {
Ok(DataSourceFilterPushdownResult::NotPushed)
) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
Ok(FilterPushdownResult::NotPushed)
}
}

pub type DataSourceFilterPushdownResult = FilterPushdownResult<Arc<dyn DataSource>>;

/// [`ExecutionPlan`] handles different file formats like JSON, CSV, AVRO, ARROW, PARQUET
///
/// `DataSourceExec` implements common functionality such as applying projections,
Expand Down Expand Up @@ -210,17 +207,15 @@ impl ExecutionPlan for DataSourceExec {
_plan: &Arc<dyn ExecutionPlan>,
parent_filters: &[PhysicalExprRef],
config: &ConfigOptions,
) -> Result<ExecutionPlanFilterPushdownResult> {
) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
match self
.data_source
.try_pushdown_filters(parent_filters, config)?
{
DataSourceFilterPushdownResult::NotPushed => {
Ok(ExecutionPlanFilterPushdownResult::NotPushed)
}
DataSourceFilterPushdownResult::Pushed { inner, support } => {
FilterPushdownResult::NotPushed => Ok(FilterPushdownResult::NotPushed),
FilterPushdownResult::Pushed { inner, support } => {
let new_self = Arc::new(DataSourceExec::new(inner));
Ok(ExecutionPlanFilterPushdownResult::Pushed {
Ok(FilterPushdownResult::Pushed {
inner: new_self,
support,
})
Expand Down
8 changes: 3 additions & 5 deletions datafusion/physical-optimizer/src/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
use std::sync::Arc;

use datafusion_common::{config::ConfigOptions, DataFusionError, Result};
use datafusion_physical_plan::{
execution_plan::ExecutionPlanFilterPushdownResult, ExecutionPlan,
};
use datafusion_physical_plan::{ExecutionPlan, FilterPushdownResult};

use crate::PhysicalOptimizerRule;

Expand Down Expand Up @@ -48,8 +46,8 @@ impl PhysicalOptimizerRule for PushdownFilter {
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
match plan.try_pushdown_filters(&plan, &Vec::new(), config)? {
ExecutionPlanFilterPushdownResult::NotPushed => Ok(plan),
ExecutionPlanFilterPushdownResult::Pushed { inner, support } => {
FilterPushdownResult::NotPushed => Ok(plan),
FilterPushdownResult::Pushed { inner, support } => {
if !support.is_empty() {
return Err(
DataFusionError::Plan(
Expand Down
6 changes: 4 additions & 2 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
use super::{
DisplayAs, ExecutionPlanProperties, FilterPushdownResult, PlanProperties, Statistics,
};
use crate::{
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};
Expand Down Expand Up @@ -219,7 +221,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
plan: &Arc<dyn ExecutionPlan>,
parent_filters: &[datafusion_physical_expr::PhysicalExprRef],
config: &ConfigOptions,
) -> Result<crate::ExecutionPlanFilterPushdownResult> {
) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
try_pushdown_filters_to_input(plan, &self.input, parent_filters, config)
}
}
Expand Down
22 changes: 10 additions & 12 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
plan: &Arc<dyn ExecutionPlan>,
parent_filters: &[PhysicalExprRef],
config: &ConfigOptions,
) -> Result<ExecutionPlanFilterPushdownResult> {
) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
// By default assume that:
// * Parent filters can't be passed onto children.
// * We have no filters to contribute.
Expand All @@ -843,11 +843,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
let mut pushed = false;
for child in self.children() {
match child.try_pushdown_filters(child, &Vec::new(), config)? {
ExecutionPlanFilterPushdownResult::NotPushed => {
FilterPushdownResult::NotPushed => {
// No pushdown possible, keep this child as is
new_children.push(Arc::clone(child));
}
ExecutionPlanFilterPushdownResult::Pushed { inner, support } => {
FilterPushdownResult::Pushed { inner, support } => {
// We have a child that has pushed down some filters
new_children.push(inner);
pushed = true;
Expand All @@ -863,18 +863,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
if pushed {
let new_inner =
with_new_children_if_necessary(Arc::clone(plan), new_children)?;
Ok(ExecutionPlanFilterPushdownResult::Pushed {
Ok(FilterPushdownResult::Pushed {
inner: new_inner,
support: vec![FilterPushdownSupport::Unsupported; parent_filters.len()],
})
} else {
Ok(ExecutionPlanFilterPushdownResult::NotPushed)
Ok(FilterPushdownResult::NotPushed)
}
}
}

pub type ExecutionPlanFilterPushdownResult = FilterPushdownResult<Arc<dyn ExecutionPlan>>;

/// A default implementation of [`ExecutionPlan::try_pushdown_filters`] that
/// pushes down filters transparently to an input.
///
Expand All @@ -887,17 +885,17 @@ pub fn try_pushdown_filters_to_input(
input: &Arc<dyn ExecutionPlan>,
parent_filters: &[PhysicalExprRef],
config: &ConfigOptions,
) -> Result<ExecutionPlanFilterPushdownResult> {
) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
match input.try_pushdown_filters(input, parent_filters, config)? {
ExecutionPlanFilterPushdownResult::NotPushed => {
FilterPushdownResult::NotPushed => {
// No pushdown possible, keep this child as is
Ok(ExecutionPlanFilterPushdownResult::NotPushed)
Ok(FilterPushdownResult::NotPushed)
}
ExecutionPlanFilterPushdownResult::Pushed { inner, support } => {
FilterPushdownResult::Pushed { inner, support } => {
// We have a child that has pushed down some filters
let new_inner =
with_new_children_if_necessary(Arc::clone(plan), vec![inner])?;
Ok(ExecutionPlanFilterPushdownResult::Pushed {
Ok(FilterPushdownResult::Pushed {
inner: new_inner,
support,
})
Expand Down
17 changes: 8 additions & 9 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use std::sync::Arc;
use std::task::{ready, Context, Poll};

use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
ColumnStatistics, DisplayAs, ExecutionPlanProperties, FilterPushdownResult,
PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::common::can_project;
use crate::execution_plan::CardinalityEffect;
Expand All @@ -31,7 +31,6 @@ use crate::projection::{
make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
ProjectionExec,
};
use crate::ExecutionPlanFilterPushdownResult;
use crate::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
DisplayFormatType, ExecutionPlan,
Expand Down Expand Up @@ -442,7 +441,7 @@ impl ExecutionPlan for FilterExec {
_plan: &Arc<dyn ExecutionPlan>,
parent_filters: &[PhysicalExprRef],
config: &ConfigOptions,
) -> Result<ExecutionPlanFilterPushdownResult> {
) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
// filters are in terms of the output columns of this plan
let mut all_filters = parent_filters.to_vec();
all_filters.push(Arc::clone(&self.predicate));
Expand All @@ -460,13 +459,13 @@ impl ExecutionPlan for FilterExec {
.input
.try_pushdown_filters(&self.input, &all_filters, config)?
{
ExecutionPlanFilterPushdownResult::NotPushed => {
FilterPushdownResult::NotPushed => {
if parent_filters.is_empty() {
return Ok(ExecutionPlanFilterPushdownResult::NotPushed);
return Ok(FilterPushdownResult::NotPushed);
}
(conjunction(all_filters), Arc::clone(&self.input))
}
ExecutionPlanFilterPushdownResult::Pushed { inner, support } => {
FilterPushdownResult::Pushed { inner, support } => {
// Split out the filters that the child plan handled and the ones it did not
let unhandled_filters = all_filters
.into_iter()
Expand All @@ -481,7 +480,7 @@ impl ExecutionPlan for FilterExec {
.collect::<Vec<_>>();
// If there are no unhandled filters and we have no projection, return the inner plan
if unhandled_filters.is_empty() && self.projection.is_none() {
return Ok(ExecutionPlanFilterPushdownResult::Pushed {
return Ok(FilterPushdownResult::Pushed {
inner,
support: vec![
FilterPushdownSupport::Exact;
Expand All @@ -508,7 +507,7 @@ impl ExecutionPlan for FilterExec {
cache,
projection: self.projection.clone(),
};
Ok(ExecutionPlanFilterPushdownResult::Pushed {
Ok(FilterPushdownResult::Pushed {
inner: Arc::new(new_self),
support: vec![FilterPushdownSupport::Exact; parent_filters.len()],
})
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDi
pub use crate::execution_plan::{
collect, collect_partitioned, displayable, execute_input_stream, execute_stream,
execute_stream_partitioned, get_plan_string, with_new_children_if_necessary,
ExecutionPlan, ExecutionPlanFilterPushdownResult, ExecutionPlanProperties,
PlanProperties,
ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
pub use crate::filter_pushdown::FilterPushdownResult;
pub use crate::metrics::Metric;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use std::{any::Any, vec};
use super::common::SharedMemoryReservation;
use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::{
DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
DisplayAs, ExecutionPlanProperties, FilterPushdownResult, RecordBatchStream,
SendableRecordBatchStream,
};
use crate::execution_plan::{try_pushdown_filters_to_input, CardinalityEffect};
use crate::hash_utils::create_hashes;
Expand Down Expand Up @@ -730,7 +731,7 @@ impl ExecutionPlan for RepartitionExec {
plan: &Arc<dyn ExecutionPlan>,
parent_filters: &[datafusion_physical_expr::PhysicalExprRef],
config: &ConfigOptions,
) -> Result<crate::ExecutionPlanFilterPushdownResult> {
) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
try_pushdown_filters_to_input(plan, &self.input, parent_filters, config)
}
}
Expand Down