From babfac82646d4e7c837d68cbf97137b525d498bd Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 3 Jun 2026 15:08:04 +0100 Subject: [PATCH] Try pushdown sort for DF Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/source.rs | 66 +++++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index aed2024d5d9..70e505b76c0 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -14,13 +14,17 @@ use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; use datafusion_execution::cache::cache_manager::FileMetadataCache; +use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr::conjunction; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_plan::DisplayFormatType; use datafusion_physical_plan::PhysicalExpr; +use datafusion_physical_plan::SortOrderPushdownResult; use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::filter_pushdown::PushedDown; use datafusion_physical_plan::filter_pushdown::PushedDownPredicate; @@ -190,6 +194,7 @@ pub struct VortexSource { natural_split_ranges: Arc]>>>, expression_convertor: Arc, pub(crate) vortex_reader_factory: Option>, + pub(crate) ordered: bool, vx_metrics_registry: Arc, file_metadata_cache: Option>, /// Whether to enable expression pushdown into the underlying Vortex scan. @@ -224,6 +229,7 @@ impl VortexSource { vortex_reader_factory: None, vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()), file_metadata_cache: None, + ordered: false, options: VortexTableOptions::default(), } } @@ -336,7 +342,7 @@ impl VortexSource { metrics_registry: Arc::clone(&self.vx_metrics_registry), layout_readers: Arc::clone(&self.layout_readers), natural_split_ranges: Arc::clone(&self.natural_split_ranges), - has_output_ordering: !base_config.output_ordering.is_empty(), + has_output_ordering: !base_config.output_ordering.is_empty() || self.ordered, expression_convertor: Arc::clone(&self.expression_convertor), file_metadata_cache: self.file_metadata_cache.clone(), projection_pushdown: self.options.projection_pushdown, @@ -383,6 +389,64 @@ impl FileSource for VortexSource { VORTEX_FILE_EXTENSION } + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + eq_properties: &EquivalenceProperties, + ) -> DFResult>> { + if order.is_empty() { + return Ok(SortOrderPushdownResult::Unsupported); + } + + if eq_properties.ordering_satisfy(order.iter().cloned())? { + let mut this = self.clone(); + this.ordered = true; + + return Ok(SortOrderPushdownResult::Exact { + inner: Arc::new(this) as Arc, + }); + } + + for prefix_len in 1..order.len() { + let prefix = order[..prefix_len].to_vec(); + if eq_properties.ordering_satisfy(prefix.iter().cloned())? { + return Ok(SortOrderPushdownResult::Unsupported); + } + } + + let sort_order = LexOrdering::new(order.iter().cloned()); + let column_in_file_schema = sort_order.as_ref().is_some_and(|s| { + s.first() + .expr + .as_any() + .downcast_ref::() + .is_some_and(|col| { + self.table_schema + .file_schema() + .field_with_name(col.name()) + .is_ok() + }) + }); + + if !column_in_file_schema { + return Ok(SortOrderPushdownResult::Unsupported); + } + + let is_descending = sort_order + .as_ref() + .is_some_and(|s| s.first().options.descending); + + if !is_descending { + let mut this = self.clone(); + this.ordered = true; + return Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(this) as Arc, + }); + } + + Ok(SortOrderPushdownResult::Unsupported) + } + fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => {