diff --git a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h index 4b60207a036..38b2b81b42f 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h @@ -78,14 +78,19 @@ typedef struct { bool has_max_cardinality; } duckdb_vx_node_statistics; +typedef enum { + HasMaxStringLength = 1 << 0, + HasNull = 1 << 1, +} ColumnStatisticsFlags; + typedef struct { // Set only for strings and primitive types duckdb_value min; duckdb_value max; - // upper bit: "length is set". lower 32 bits: DuckDB's max string length. - // set only for strings - uint64_t max_string_length; - bool has_null; + uint32_t max_string_length; + // 1: has max string length + // 2: has null + ColumnStatisticsFlags flags; } duckdb_column_statistics; const idx_t INVALID_IDX = UINT64_MAX; diff --git a/vortex-duckdb/cpp/table_function.cpp b/vortex-duckdb/cpp/table_function.cpp index 92d989d30d4..14ef990388a 100644 --- a/vortex-duckdb/cpp/table_function.cpp +++ b/vortex-duckdb/cpp/table_function.cpp @@ -104,7 +104,7 @@ unique_ptr numeric_stats(duckdb_column_statistics &stats, Logica NumericStats::SetMax(out, UnwrapValue(stats.max)); duckdb_destroy_value(&stats.max); } - if (!stats.has_null) { + if (!(stats.flags & ColumnStatisticsFlags::HasNull)) { out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); } return out.ToUnique(); @@ -120,10 +120,10 @@ unique_ptr string_stats(duckdb_column_statistics &stats, Logical StringStats::SetMax(out, StringValue::Get(UnwrapValue(stats.max))); duckdb_destroy_value(&stats.max); } - if (stats.max_string_length >> 63) { + if (stats.flags & ColumnStatisticsFlags::HasMaxStringLength) { StringStats::SetMaxStringLength(out, uint32_t(stats.max_string_length)); } - if (!stats.has_null) { + if (!(stats.flags & ColumnStatisticsFlags::HasNull)) { out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); } @@ -132,7 +132,7 @@ unique_ptr string_stats(duckdb_column_statistics &stats, Logical unique_ptr base_stats(duckdb_column_statistics &stats, LogicalType type) { BaseStatistics out = StringStats::CreateUnknown(type); - if (!stats.has_null) { + if (!(stats.flags & ColumnStatisticsFlags::HasNull)) { out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); } return out.ToUnique(); diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index a7d3eaf6f68..ef909566a21 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -17,7 +17,7 @@ use custom_labels::CURRENT_LABELSET; use futures::StreamExt; use itertools::Itertools; use num_traits::AsPrimitive; -use tracing::debug; +use tracing::{debug, trace}; use vortex::array::ArrayRef; use vortex::array::Canonical; use vortex::array::VortexSessionExecute; @@ -32,6 +32,7 @@ use vortex::dtype::FieldNames; use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_err; +use vortex::error::vortex_panic; use vortex::expr::Expression; use vortex::expr::and_collect; use vortex::expr::col; @@ -41,6 +42,8 @@ use vortex::expr::root; use vortex::expr::select; use vortex::expr::stats::Precision; use vortex::expr::stats::Stat; +use vortex::file::multi::MultiFileSessionExt; +use vortex::file::multi::VortexFileReaderFactory; use vortex::file::v2::FileStatsLayoutReader; use vortex::io::kanal_ext::KanalExt; use vortex::io::runtime::BlockingRuntime; @@ -55,6 +58,7 @@ use vortex::scalar_fn::fns::pack::Pack; use vortex::scan::DataSource; use vortex::scan::ScanRequest; use vortex::scan::selection::Selection; +use vortex_utils::aliases::dash_map::DashMap; use vortex_utils::aliases::hash_set::HashSet; use vortex_utils::parallelism::get_available_parallelism; @@ -118,6 +122,7 @@ pub struct DataSourceBindData { data_source: Arc, filter_exprs: Vec, column_fields: Vec, + stats_cache: Arc>, } impl Clone for DataSourceBindData { @@ -127,6 +132,7 @@ impl Clone for DataSourceBindData { // filter_exprs are consumed once in `init_global`. filter_exprs: vec![], column_fields: self.column_fields.clone(), + stats_cache: Arc::clone(&self.stats_cache), } } } @@ -191,18 +197,22 @@ impl ColumnStatistics { .vortex_expect("can't convert Scalar to duckdb Value") }); - let max_string_length = stats - .max_string_length - .map_or(0, |len| (1u64 << 63) | (len as u64)); + let max_string_length = stats.max_string_length.unwrap_or(0); + let mut flags = 0u32; + if stats.max_string_length.is_some() { + flags |= 1; + } // Useful estimate if we didn't get null count stats - let has_null = stats.has_null && dtype.is_nullable(); + if stats.has_null && dtype.is_nullable() { + flags |= 2; + } Self { min, max, max_string_length, - has_null, + flags, } } } @@ -219,11 +229,11 @@ pub struct ColumnStatisticsAggregate { impl ColumnStatisticsAggregate { pub fn new(stats: &StatsSet) -> Self { let min = match stats.get(Stat::Min) { - Some(Precision::Exact(min)) => Some(min), + Some(Precision::Exact(min) | Precision::Inexact(min)) => Some(min), _ => None, }; let max = match stats.get(Stat::Max) { - Some(Precision::Exact(max)) => Some(max), + Some(Precision::Exact(max) | Precision::Inexact(max)) => Some(max), _ => None, }; @@ -250,6 +260,41 @@ impl ColumnStatisticsAggregate { has_null, } } + + pub fn merge(&mut self, other: ColumnStatisticsAggregate) { + self.min = match (self.min.take(), other.min) { + (Some(a), Some(b)) => match partial_cmp_scalar_value(&a, &b) { + Some(std::cmp::Ordering::Greater) => Some(b), + Some(_) => Some(a), + None => None, + }, + _ => None, + }; + self.max = match (self.max.take(), other.max) { + (Some(a), Some(b)) => match partial_cmp_scalar_value(&a, &b) { + Some(std::cmp::Ordering::Less) => Some(b), + Some(_) => Some(a), + None => None, + }, + _ => None, + }; + self.max_string_length = match (self.max_string_length, other.max_string_length) { + (Some(a), Some(b)) => Some(a.max(b)), + _ => None, + }; + self.has_null |= other.has_null; + } +} + +fn partial_cmp_scalar_value(left: &ScalarValue, right: &ScalarValue) -> Option { + match (left, right) { + (ScalarValue::Bool(la), ScalarValue::Bool(lb)) => la.partial_cmp(lb), + (ScalarValue::Primitive(la), ScalarValue::Primitive(lb)) => la.partial_cmp(lb), + (ScalarValue::Decimal(la), ScalarValue::Decimal(lb)) => la.partial_cmp(lb), + (ScalarValue::Utf8(la), ScalarValue::Utf8(lb)) => la.partial_cmp(lb), + (ScalarValue::Binary(la), ScalarValue::Binary(lb)) => la.partial_cmp(lb), + _ => None, + } } impl TableFunction for T { @@ -275,6 +320,7 @@ impl TableFunction for T { data_source: Arc::new(data_source), filter_exprs: vec![], column_fields, + stats_cache: Arc::new(DashMap::default()), }) } @@ -520,27 +566,74 @@ impl TableFunction for T { Ok(false) } - /// Get column-wise statistics. Available only if we're reading a single - /// file. fn statistics(bind_data: &Self::BindData, column_index: usize) -> Option { let children = bind_data.data_source.children(); - // Otherwise we'd have to open all files eagerly which is a performance - // regression. Duckdb's Parquet reader only gets metadata for multiple - // files with a UNION BY NAME and we don't support it (yet) - // See duckdb/common/multi_file/multi_file_function.hpp#L691 - if children.len() != 1 { - return None; + debug!( + children = children.len(), + column_index, "requested statistics" + ); + use std::time::Instant; + let now = Instant::now(); + + let dtype = bind_data.column_fields[column_index].dtype.clone(); + if let Some(cache) = bind_data.stats_cache.get(&column_index) { + let stats = ColumnStatistics::from(cache.value(), dtype); + let elapsed = now.elapsed(); + debug!("Cached, took {elapsed:.2?}"); + return Some(stats); } + let MultiLayoutChild::Opened(reader) = &children[0] else { return None; }; - let stats_sets = match reader.as_any().downcast_ref::() { - Some(inner) => inner.file_stats().stats_sets(), - None => return None, + let Some(reader) = reader.as_any().downcast_ref::() else { + vortex_panic!("File reader is not FileStatsLayoutReader"); }; - let stats_aggregate = ColumnStatisticsAggregate::new(&stats_sets[column_index]); - let dtype = bind_data.column_fields[column_index].dtype.clone(); - Some(ColumnStatistics::from(&stats_aggregate, dtype)) + let stats_sets = reader.file_stats().stats_sets(); + let mut stats_aggregate = ColumnStatisticsAggregate::new(&stats_sets[column_index]); + + // Duckdb's statistics() for Parquet return multi-file statistics only + // for UNION BY NAME. However, checking whether we have all footers + // cached is relatively cheap, so we try to always return multi-file + // statistics. + let session = (&*SESSION).multi_file(); + let mut footers = Vec::with_capacity(children.len() - 1); + for child in &children[1..] { + let MultiLayoutChild::Deferred(factory) = child else { + vortex_panic!("Non-first file is opened eagerly"); + }; + let Some(factory) = factory.as_any().downcast_ref::() else { + vortex_panic!("Layout factory is not file factory"); + }; + let path = factory.path(); + let Some(footer) = session.get_footer(&path) else { + trace!(path, "No footer found"); + return None; + }; + if footer.statistics().is_none() { + trace!(path, "No statistics found for footer"); + return None; + }; + // Statistics merge is meaningful work, we want to skip it if any + // footer or footer statistics is missing. + footers.push(footer); + } + + for footer in footers { + let Some(stats_sets) = footer.statistics() else { + vortex_panic!("Statistics for footer disappeared"); + }; + stats_aggregate.merge(ColumnStatisticsAggregate::new( + &stats_sets.stats_sets()[column_index], + )); + } + + let stats = ColumnStatistics::from(&stats_aggregate, dtype); + bind_data.stats_cache.insert(column_index, stats_aggregate); + + let elapsed = now.elapsed(); + debug!("Took {elapsed:.2?}"); + Some(stats) } fn cardinality(bind_data: &Self::BindData) -> Cardinality { diff --git a/vortex-duckdb/src/duckdb/macro_.rs b/vortex-duckdb/src/duckdb/macro_.rs index 11587f8efb1..824d5cf4b56 100644 --- a/vortex-duckdb/src/duckdb/macro_.rs +++ b/vortex-duckdb/src/duckdb/macro_.rs @@ -51,11 +51,9 @@ macro_rules! lifetime_wrapper { } } - // --- Owned handle (calls destructor on drop) --- - $(#[$meta])* #[allow(dead_code)] - pub struct $Name($ffi_type); + pub struct $Name(std::ptr::NonNull); #[allow(dead_code)] impl $Name { @@ -71,7 +69,7 @@ macro_rules! lifetime_wrapper { "Attempted to create an owned wrapper from a null pointer" ); } - Self(ptr) + Self(unsafe { std::ptr::NonNull::new_unchecked(ptr.cast()) }) } /// Borrows the pointer as an immutable reference with explicit lifetime. @@ -107,7 +105,7 @@ macro_rules! lifetime_wrapper { /// calling the destructor. pub fn into_ptr(self) -> $ffi_type { let this = std::mem::ManuallyDrop::new(self); - this.0 + (*this).0.as_ptr().cast() } } @@ -115,27 +113,23 @@ macro_rules! lifetime_wrapper { type Target = [<$Name Ref>]; fn deref(&self) -> &Self::Target { - // SAFETY: The opaque [<$Name Ref>] is a ZST and the pointer is valid - // for the lifetime of the owned handle. - unsafe { &*(self.0 as *const [<$Name Ref>]) } + unsafe { &*(self.0.as_ptr() as *const [<$Name Ref>]) } } } impl std::ops::DerefMut for $Name { fn deref_mut(&mut self) -> &mut Self::Target { - // SAFETY: The opaque [<$Name Ref>] is a ZST and the pointer is valid - // for the lifetime of the owned handle. We have &mut self so - // exclusive access is guaranteed. - unsafe { &mut *(self.0 as *mut [<$Name Ref>]) } + unsafe { &mut *(self.0.as_ptr() as *mut [<$Name Ref>]) } } } impl Drop for $Name { fn drop(&mut self) { let destructor = $destructor; + let mut ptr: $ffi_type = self.0.as_ptr().cast(); #[allow(unused_unsafe)] unsafe { - destructor(&mut self.0) + destructor(&mut ptr) } } } diff --git a/vortex-duckdb/src/duckdb/table_function/mod.rs b/vortex-duckdb/src/duckdb/table_function/mod.rs index 986ac64d100..54a9e8ff4ac 100644 --- a/vortex-duckdb/src/duckdb/table_function/mod.rs +++ b/vortex-duckdb/src/duckdb/table_function/mod.rs @@ -39,8 +39,10 @@ pub struct PartitionData { pub struct ColumnStatistics { pub min: Option, pub max: Option, - pub max_string_length: u64, - pub has_null: bool, + pub max_string_length: u32, + // 1: has string length + // 2: has null + pub flags: u32, } // String map lifetime is managed by C++ code @@ -196,7 +198,7 @@ unsafe extern "C-unwind" fn statistics( stats_out.min = stats.min.map_or(ptr::null_mut(), |v| v.into_ptr()); stats_out.max = stats.max.map_or(ptr::null_mut(), |v| v.into_ptr()); stats_out.max_string_length = stats.max_string_length; - stats_out.has_null = stats.has_null; + stats_out.flags = stats.flags; true } diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index 410d241a766..f1f1f38b535 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -6,7 +6,6 @@ use std::ffi::CStr; use std::ffi::c_char; use std::sync::LazyLock; -use std::sync::OnceLock; use vortex::VortexSessionDefault; use vortex::error::VortexExpect; @@ -50,14 +49,11 @@ static SESSION: LazyLock = // would be hard to integrate with tracing::. We use logging for // debugging only anyway, so that's good enough. fn init_tracing() { - static ONCE: OnceLock<()> = OnceLock::new(); - ONCE.get_or_init(|| { - drop( - tracing_subscriber::fmt() - .with_writer(std::io::stdout) - .try_init(), - ); - }); + drop( + tracing_subscriber::fmt() + .with_writer(std::io::stdout) + .try_init(), + ) } /// Initialize the Vortex extension by registering the extension functions. diff --git a/vortex-file/src/multi/mod.rs b/vortex-file/src/multi/mod.rs index 78c18ea438e..ec66db6e632 100644 --- a/vortex-file/src/multi/mod.rs +++ b/vortex-file/src/multi/mod.rs @@ -5,11 +5,13 @@ mod session; +use std::any::Any; use std::sync::Arc; use async_trait::async_trait; use futures::TryStreamExt; -use session::MultiFileSessionExt; +pub use session::MultiFileSession; +pub use session::MultiFileSessionExt; use tracing::debug; use vortex_error::VortexResult; use vortex_error::vortex_bail; @@ -252,13 +254,19 @@ fn layout_reader_with_stats(file: &crate::VortexFile) -> VortexResult VortexOpenOptions + Send + Sync>, } +impl VortexFileReaderFactory { + pub fn path(&self) -> String { + self.file.path.clone() + } +} + #[async_trait] impl LayoutReaderFactory for VortexFileReaderFactory { async fn open(&self) -> VortexResult> { @@ -271,4 +279,8 @@ impl LayoutReaderFactory for VortexFileReaderFactory { .await?; Ok(Some(layout_reader_with_stats(&file)?)) } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/vortex-file/src/multi/session.rs b/vortex-file/src/multi/session.rs index 7023d3cd020..7829b75ff04 100644 --- a/vortex-file/src/multi/session.rs +++ b/vortex-file/src/multi/session.rs @@ -22,7 +22,7 @@ use crate::footer::Footer; /// /// Consider generalizing this cache into [`VortexOpenOptions`](crate::VortexOpenOptions) so /// that single-file opens also benefit from session-level footer caching. -pub(super) struct MultiFileSession { +pub struct MultiFileSession { footer_cache: moka::sync::Cache, } @@ -74,7 +74,7 @@ impl SessionVar for MultiFileSession { } /// Extension trait for accessing the [`MultiFileSession`] from a session. -pub(super) trait MultiFileSessionExt: SessionExt { +pub trait MultiFileSessionExt: SessionExt { /// Returns a reference to the [`MultiFileSession`] state. fn multi_file(&self) -> vortex_session::Ref<'_, MultiFileSession> { self.get::() diff --git a/vortex-layout/src/scan/multi.rs b/vortex-layout/src/scan/multi.rs index 38503c77a63..b2827cc6a8e 100644 --- a/vortex-layout/src/scan/multi.rs +++ b/vortex-layout/src/scan/multi.rs @@ -70,6 +70,7 @@ const DEFAULT_CONCURRENCY: usize = 8; pub trait LayoutReaderFactory: 'static + Send + Sync { /// Opens the layout reader, or returns `None` if it should be skipped. async fn open(&self) -> VortexResult>; + fn as_any(&self) -> &dyn Any; } /// A [`DataSource`] that combines multiple [`LayoutReaderRef`]s into a single scannable source.