From fd87612bf5dd486754f2a36f57bd68641d471b17 Mon Sep 17 00:00:00 2001
From: Adam Gutglick
Date: Thu, 30 Apr 2026 15:35:45 +0100
Subject: [PATCH] Actually wire the pluggable expression convertor

Signed-off-by: Adam Gutglick
---
 vortex-datafusion/src/persistent/source.rs | 105 +++++++++++++++++++--
 1 file changed, 99 insertions(+), 6 deletions(-)

diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs
index 0834529af3e..aed2024d5d9 100644
--- a/vortex-datafusion/src/persistent/source.rs
+++ b/vortex-datafusion/src/persistent/source.rs
@@ -301,15 +301,13 @@ impl VortexSource {
         self.options = opts;
         self
     }
-}

-impl FileSource for VortexSource {
-    fn create_file_opener(
+    fn create_vortex_opener(
         &self,
         object_store: Arc<dyn ObjectStore>,
         base_config: &FileScanConfig,
         partition: usize,
-    ) -> DFResult<Arc<dyn FileOpener>> {
+    ) -> DFResult<VortexOpener> {
         let batch_size = self
             .batch_size
             .vortex_expect("batch_size must be supplied to VortexSource");
@@ -339,13 +337,28 @@ impl FileSource for VortexSource {
             layout_readers: Arc::clone(&self.layout_readers),
             natural_split_ranges: Arc::clone(&self.natural_split_ranges),
             has_output_ordering: !base_config.output_ordering.is_empty(),
-            expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
+            expression_convertor: Arc::clone(&self.expression_convertor),
             file_metadata_cache: self.file_metadata_cache.clone(),
             projection_pushdown: self.options.projection_pushdown,
             scan_concurrency: self.options.scan_concurrency,
         };

-        Ok(Arc::new(opener))
+        Ok(opener)
+    }
+}
+
+impl FileSource for VortexSource {
+    fn create_file_opener(
+        &self,
+        object_store: Arc<dyn ObjectStore>,
+        base_config: &FileScanConfig,
+        partition: usize,
+    ) -> DFResult<Arc<dyn FileOpener>> {
+        Ok(Arc::new(self.create_vortex_opener(
+            object_store,
+            base_config,
+            partition,
+        )?))
     }

     fn as_any(&self) -> &dyn Any {
@@ -477,3 +490,83 @@ impl FileSource for VortexSource {
         &self.table_schema
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use arrow_schema::DataType;
+    use arrow_schema::Field;
+    use arrow_schema::Schema;
+    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+    use datafusion_execution::object_store::ObjectStoreUrl;
+    use object_store::memory::InMemory;
+    use vortex::VortexSessionDefault;
+
+    use super::*;
+    use crate::convert::exprs::ProcessedProjection;
+
+    struct TrackingExpressionConvertor {
+        inner: DefaultExpressionConvertor,
+    }
+
+    impl ExpressionConvertor for TrackingExpressionConvertor {
+        fn can_be_pushed_down(&self, expr: &PhysicalExprRef, schema: &Schema) -> bool {
+            self.inner.can_be_pushed_down(expr, schema)
+        }
+
+        fn convert(&self, expr: &dyn PhysicalExpr) -> DFResult<Expression> {
+            self.inner.convert(expr)
+        }
+
+        fn split_projection(
+            &self,
+            source_projection: ProjectionExprs,
+            input_schema: &Schema,
+            output_schema: &Schema,
+        ) -> DFResult<ProcessedProjection> {
+            self.inner
+                .split_projection(source_projection, input_schema, output_schema)
+        }
+
+        fn no_pushdown_projection(
+            &self,
+            source_projection: ProjectionExprs,
+            input_schema: &Schema,
+        ) -> DFResult<ProcessedProjection> {
+            self.inner
+                .no_pushdown_projection(source_projection, input_schema)
+        }
+    }
+
+    #[test]
+    fn create_vortex_opener_preserves_expression_convertor() -> anyhow::Result<()> {
+        let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let expression_convertor = Arc::new(TrackingExpressionConvertor {
+            inner: DefaultExpressionConvertor::default(),
+        }) as Arc<dyn ExpressionConvertor>;
+
+        let mut source = VortexSource::new(
+            TableSchema::from_file_schema(file_schema),
+            VortexSession::default(),
+        )
+        .with_expression_convertor(Arc::clone(&expression_convertor));
+        source.batch_size = Some(100);
+
+        let config = FileScanConfigBuilder::new(
+            ObjectStoreUrl::local_filesystem(),
+            Arc::new(source.clone()),
+        )
+        .build();
+
+        let opener = source.create_vortex_opener(
+            Arc::new(InMemory::new()) as Arc<dyn ObjectStore>,
+            &config,
+            0,
+        )?;
+
+        assert!(Arc::ptr_eq(
+            &opener.expression_convertor,
+            &expression_convertor
+        ));
+        Ok(())
+    }
+}