diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 7c9988cd2688..2e00c80e4f98 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -30,9 +30,7 @@ use datafusion_common::{internal_err, Result}; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::{ - file::{FileSource, FileSourceFilterPushdownResult}, - file_scan_config::FileScanConfig, - file_stream::FileOpener, + file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, }; use datafusion_expr::test::function_stub::count_udaf; use datafusion_physical_expr::expressions::col; @@ -47,6 +45,7 @@ use datafusion_physical_plan::{ coalesce_batches::CoalesceBatchesExec, filter::FilterExec, repartition::RepartitionExec, + FilterPushdownResult, }; use datafusion_physical_plan::{ displayable, filter_pushdown::FilterPushdownSupport, @@ -150,7 +149,7 @@ impl FileSource for TestSource { &self, filters: &[PhysicalExprRef], config: &ConfigOptions, - ) -> Result { + ) -> Result>> { let support = match self.support { Some(support) => support, None => { @@ -166,10 +165,7 @@ impl FileSource for TestSource { predicate: Some(conjunction(filters.iter().map(Arc::clone))), statistics: self.statistics.clone(), }); - Ok(FileSourceFilterPushdownResult::new( - new, - vec![support; filters.len()], - )) + Ok(FilterPushdownResult::new(new, vec![support; filters.len()])) } } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index af0c68607edf..f360191873ac 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -103,9 +103,7 @@ pub trait FileSource: Send + Sync { &self, _filters: &[PhysicalExprRef], _config: &ConfigOptions, - ) -> Result { - Ok(FileSourceFilterPushdownResult::NotPushed) + ) -> Result>> { + Ok(FilterPushdownResult::NotPushed) } } - -pub type FileSourceFilterPushdownResult = FilterPushdownResult>; diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index f8235e7cff67..494171a2f062 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -46,20 +46,20 @@ use datafusion_physical_plan::{ display::{display_orderings, ProjectSchemaDisplay}, metrics::ExecutionPlanMetricsSet, projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec}, - DisplayAs, DisplayFormatType, ExecutionPlan, + DisplayAs, DisplayFormatType, ExecutionPlan, FilterPushdownResult, }; use log::{debug, warn}; +use crate::file_groups::FileGroup; use crate::{ display::FileGroupsDisplay, - file::{FileSource, FileSourceFilterPushdownResult}, + file::FileSource, file_compression_type::FileCompressionType, file_stream::FileStream, source::{DataSource, DataSourceExec}, statistics::MinMaxStatistics, PartitionedFile, }; -use crate::{file_groups::FileGroup, source::DataSourceFilterPushdownResult}; /// The base configurations for a [`DataSourceExec`], the a physical plan for /// any given file format. @@ -591,18 +591,16 @@ impl DataSource for FileScanConfig { &self, filters: &[PhysicalExprRef], config: &ConfigOptions, - ) -> Result { + ) -> Result>> { match self.file_source.try_pushdown_filters(filters, config)? { - FileSourceFilterPushdownResult::NotPushed => { - Ok(DataSourceFilterPushdownResult::NotPushed) - } - FileSourceFilterPushdownResult::Pushed { inner, support } => { + FilterPushdownResult::NotPushed => Ok(FilterPushdownResult::NotPushed), + FilterPushdownResult::Pushed { inner, support } => { let new_self = Arc::new( FileScanConfigBuilder::from(self.clone()) .with_source(inner) .build(), ); - Ok(DataSourceFilterPushdownResult::Pushed { + Ok(FilterPushdownResult::Pushed { inner: new_self, support, }) diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 9df818c941a0..d3bcc9a75c36 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -26,8 +26,7 @@ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanFilterPushdownResult, - FilterPushdownResult, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, FilterPushdownResult, PlanProperties, }; use crate::file_scan_config::FileScanConfig; @@ -86,13 +85,11 @@ pub trait DataSource: Send + Sync + Debug { &self, _filters: &[PhysicalExprRef], _config: &ConfigOptions, - ) -> Result { - Ok(DataSourceFilterPushdownResult::NotPushed) + ) -> Result>> { + Ok(FilterPushdownResult::NotPushed) } } -pub type DataSourceFilterPushdownResult = FilterPushdownResult>; - /// [`ExecutionPlan`] handles different file formats like JSON, CSV, AVRO, ARROW, PARQUET /// /// `DataSourceExec` implements common functionality such as applying projections, @@ -210,17 +207,15 @@ impl ExecutionPlan for DataSourceExec { _plan: &Arc, parent_filters: &[PhysicalExprRef], config: &ConfigOptions, - ) -> Result { + ) -> Result>> { match self .data_source .try_pushdown_filters(parent_filters, config)? { - DataSourceFilterPushdownResult::NotPushed => { - Ok(ExecutionPlanFilterPushdownResult::NotPushed) - } - DataSourceFilterPushdownResult::Pushed { inner, support } => { + FilterPushdownResult::NotPushed => Ok(FilterPushdownResult::NotPushed), + FilterPushdownResult::Pushed { inner, support } => { let new_self = Arc::new(DataSourceExec::new(inner)); - Ok(ExecutionPlanFilterPushdownResult::Pushed { + Ok(FilterPushdownResult::Pushed { inner: new_self, support, }) diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index c01f14e79c8d..2c486cc19ccb 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -18,9 +18,7 @@ use std::sync::Arc; use datafusion_common::{config::ConfigOptions, DataFusionError, Result}; -use datafusion_physical_plan::{ - execution_plan::ExecutionPlanFilterPushdownResult, ExecutionPlan, -}; +use datafusion_physical_plan::{ExecutionPlan, FilterPushdownResult}; use crate::PhysicalOptimizerRule; @@ -48,8 +46,8 @@ impl PhysicalOptimizerRule for PushdownFilter { config: &ConfigOptions, ) -> Result> { match plan.try_pushdown_filters(&plan, &Vec::new(), config)? { - ExecutionPlanFilterPushdownResult::NotPushed => Ok(plan), - ExecutionPlanFilterPushdownResult::Pushed { inner, support } => { + FilterPushdownResult::NotPushed => Ok(plan), + FilterPushdownResult::Pushed { inner, support } => { if !support.is_empty() { return Err( DataFusionError::Plan( diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 77dabb72aa76..e8246fe4aa1a 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -23,7 +23,9 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; -use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics}; +use super::{ + DisplayAs, ExecutionPlanProperties, FilterPushdownResult, PlanProperties, Statistics, +}; use crate::{ DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, }; @@ -219,7 +221,7 @@ impl ExecutionPlan for CoalesceBatchesExec { plan: &Arc, parent_filters: &[datafusion_physical_expr::PhysicalExprRef], config: &ConfigOptions, - ) -> Result { + ) -> Result>> { try_pushdown_filters_to_input(plan, &self.input, parent_filters, config) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 278b4607c955..d41437dab5b3 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -833,7 +833,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { plan: &Arc, parent_filters: &[PhysicalExprRef], config: &ConfigOptions, - ) -> Result { + ) -> Result>> { // By default assume that: // * Parent filters can't be passed onto children. // * We have no filters to contribute. @@ -843,11 +843,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { let mut pushed = false; for child in self.children() { match child.try_pushdown_filters(child, &Vec::new(), config)? { - ExecutionPlanFilterPushdownResult::NotPushed => { + FilterPushdownResult::NotPushed => { // No pushdown possible, keep this child as is new_children.push(Arc::clone(child)); } - ExecutionPlanFilterPushdownResult::Pushed { inner, support } => { + FilterPushdownResult::Pushed { inner, support } => { // We have a child that has pushed down some filters new_children.push(inner); pushed = true; @@ -863,18 +863,16 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { if pushed { let new_inner = with_new_children_if_necessary(Arc::clone(plan), new_children)?; - Ok(ExecutionPlanFilterPushdownResult::Pushed { + Ok(FilterPushdownResult::Pushed { inner: new_inner, support: vec![FilterPushdownSupport::Unsupported; parent_filters.len()], }) } else { - Ok(ExecutionPlanFilterPushdownResult::NotPushed) + Ok(FilterPushdownResult::NotPushed) } } } -pub type ExecutionPlanFilterPushdownResult = FilterPushdownResult>; - /// A default implementation of [`ExecutionPlan::try_pushdown_filters`] that /// pushes down filters transparently to an input. /// @@ -887,17 +885,17 @@ pub fn try_pushdown_filters_to_input( input: &Arc, parent_filters: &[PhysicalExprRef], config: &ConfigOptions, -) -> Result { +) -> Result>> { match input.try_pushdown_filters(input, parent_filters, config)? { - ExecutionPlanFilterPushdownResult::NotPushed => { + FilterPushdownResult::NotPushed => { // No pushdown possible, keep this child as is - Ok(ExecutionPlanFilterPushdownResult::NotPushed) + Ok(FilterPushdownResult::NotPushed) } - ExecutionPlanFilterPushdownResult::Pushed { inner, support } => { + FilterPushdownResult::Pushed { inner, support } => { // We have a child that has pushed down some filters let new_inner = with_new_children_if_necessary(Arc::clone(plan), vec![inner])?; - Ok(ExecutionPlanFilterPushdownResult::Pushed { + Ok(FilterPushdownResult::Pushed { inner: new_inner, support, }) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 5cf6064af9b4..46d439430017 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -21,8 +21,8 @@ use std::sync::Arc; use std::task::{ready, Context, Poll}; use super::{ - ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties, - RecordBatchStream, SendableRecordBatchStream, Statistics, + ColumnStatistics, DisplayAs, ExecutionPlanProperties, FilterPushdownResult, + PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::common::can_project; use crate::execution_plan::CardinalityEffect; @@ -31,7 +31,6 @@ use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, ProjectionExec, }; -use crate::ExecutionPlanFilterPushdownResult; use crate::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, DisplayFormatType, ExecutionPlan, @@ -442,7 +441,7 @@ impl ExecutionPlan for FilterExec { _plan: &Arc, parent_filters: &[PhysicalExprRef], config: &ConfigOptions, - ) -> Result { + ) -> Result>> { // filters are in terms of the output columns of this plan let mut all_filters = parent_filters.to_vec(); all_filters.push(Arc::clone(&self.predicate)); @@ -460,13 +459,13 @@ impl ExecutionPlan for FilterExec { .input .try_pushdown_filters(&self.input, &all_filters, config)? { - ExecutionPlanFilterPushdownResult::NotPushed => { + FilterPushdownResult::NotPushed => { if parent_filters.is_empty() { - return Ok(ExecutionPlanFilterPushdownResult::NotPushed); + return Ok(FilterPushdownResult::NotPushed); } (conjunction(all_filters), Arc::clone(&self.input)) } - ExecutionPlanFilterPushdownResult::Pushed { inner, support } => { + FilterPushdownResult::Pushed { inner, support } => { // Split out the filters that the child plan handled and the ones it did not let unhandled_filters = all_filters .into_iter() @@ -481,7 +480,7 @@ impl ExecutionPlan for FilterExec { .collect::>(); // If there are no unhandled filters and we have no projection, return the inner plan if unhandled_filters.is_empty() && self.projection.is_none() { - return Ok(ExecutionPlanFilterPushdownResult::Pushed { + return Ok(FilterPushdownResult::Pushed { inner, support: vec![ FilterPushdownSupport::Exact; @@ -508,7 +507,7 @@ impl ExecutionPlan for FilterExec { cache, projection: self.projection.clone(), }; - Ok(ExecutionPlanFilterPushdownResult::Pushed { + Ok(FilterPushdownResult::Pushed { inner: Arc::new(new_self), support: vec![FilterPushdownSupport::Exact; parent_filters.len()], }) diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 64e3d6f39a18..d2f52d79d988 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -43,8 +43,7 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDi pub use crate::execution_plan::{ collect, collect_partitioned, displayable, execute_input_stream, execute_stream, execute_stream_partitioned, get_plan_string, with_new_children_if_necessary, - ExecutionPlan, ExecutionPlanFilterPushdownResult, ExecutionPlanProperties, - PlanProperties, + ExecutionPlan, ExecutionPlanProperties, PlanProperties, }; pub use crate::filter_pushdown::FilterPushdownResult; pub use crate::metrics::Metric; diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 53d53900af2d..c5e59f4a265a 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -27,7 +27,8 @@ use std::{any::Any, vec}; use super::common::SharedMemoryReservation; use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{ - DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, + DisplayAs, ExecutionPlanProperties, FilterPushdownResult, RecordBatchStream, + SendableRecordBatchStream, }; use crate::execution_plan::{try_pushdown_filters_to_input, CardinalityEffect}; use crate::hash_utils::create_hashes; @@ -730,7 +731,7 @@ impl ExecutionPlan for RepartitionExec { plan: &Arc, parent_filters: &[datafusion_physical_expr::PhysicalExprRef], config: &ConfigOptions, - ) -> Result { + ) -> Result>> { try_pushdown_filters_to_input(plan, &self.input, parent_filters, config) } }