Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions vortex-duckdb/cpp/include/duckdb_vx/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,19 @@ typedef struct {
bool has_max_cardinality;
} duckdb_vx_node_statistics;

// Bit flags stored in duckdb_column_statistics.flags. Each constant occupies
// its own bit, so values can be OR-ed together and tested with `&`.
typedef enum {
// Bit 0: max_string_length carries a usable value; when clear, the string
// statistics builder does not set a max string length.
HasMaxStringLength = 1 << 0,
// Bit 1: the column may contain NULLs; when clear, the statistics builders
// mark the column as CANNOT_HAVE_NULL_VALUES.
HasNull = 1 << 1,
} ColumnStatisticsFlags;

typedef struct {
// Set only for strings and primitive types
duckdb_value min;
duckdb_value max;
// upper bit: "length is set". lower 32 bits: DuckDB's max string length.
// set only for strings
uint64_t max_string_length;
bool has_null;
uint32_t max_string_length;
// 1: has max string length
// 2: has null
ColumnStatisticsFlags flags;
} duckdb_column_statistics;

const idx_t INVALID_IDX = UINT64_MAX;
Expand Down
8 changes: 4 additions & 4 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ unique_ptr<BaseStatistics> numeric_stats(duckdb_column_statistics &stats, Logica
NumericStats::SetMax(out, UnwrapValue(stats.max));
duckdb_destroy_value(&stats.max);
}
if (!stats.has_null) {
if (!(stats.flags & ColumnStatisticsFlags::HasNull)) {
out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
}
return out.ToUnique();
Expand All @@ -120,10 +120,10 @@ unique_ptr<BaseStatistics> string_stats(duckdb_column_statistics &stats, Logical
StringStats::SetMax(out, StringValue::Get(UnwrapValue(stats.max)));
duckdb_destroy_value(&stats.max);
}
if (stats.max_string_length >> 63) {
if (stats.flags & ColumnStatisticsFlags::HasMaxStringLength) {
StringStats::SetMaxStringLength(out, uint32_t(stats.max_string_length));
}
if (!stats.has_null) {
if (!(stats.flags & ColumnStatisticsFlags::HasNull)) {
out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
}

Expand All @@ -132,7 +132,7 @@ unique_ptr<BaseStatistics> string_stats(duckdb_column_statistics &stats, Logical

unique_ptr<BaseStatistics> base_stats(duckdb_column_statistics &stats, LogicalType type) {
BaseStatistics out = StringStats::CreateUnknown(type);
if (!stats.has_null) {
if (!(stats.flags & ColumnStatisticsFlags::HasNull)) {
out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
}
return out.ToUnique();
Expand Down
137 changes: 115 additions & 22 deletions vortex-duckdb/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use custom_labels::CURRENT_LABELSET;
use futures::StreamExt;
use itertools::Itertools;
use num_traits::AsPrimitive;
use tracing::debug;
use tracing::{debug, trace};
use vortex::array::ArrayRef;
use vortex::array::Canonical;
use vortex::array::VortexSessionExecute;
Expand All @@ -32,6 +32,7 @@ use vortex::dtype::FieldNames;
use vortex::error::VortexExpect;
use vortex::error::VortexResult;
use vortex::error::vortex_err;
use vortex::error::vortex_panic;
use vortex::expr::Expression;
use vortex::expr::and_collect;
use vortex::expr::col;
Expand All @@ -41,6 +42,8 @@ use vortex::expr::root;
use vortex::expr::select;
use vortex::expr::stats::Precision;
use vortex::expr::stats::Stat;
use vortex::file::multi::MultiFileSessionExt;
use vortex::file::multi::VortexFileReaderFactory;
use vortex::file::v2::FileStatsLayoutReader;
use vortex::io::kanal_ext::KanalExt;
use vortex::io::runtime::BlockingRuntime;
Expand All @@ -55,6 +58,7 @@ use vortex::scalar_fn::fns::pack::Pack;
use vortex::scan::DataSource;
use vortex::scan::ScanRequest;
use vortex::scan::selection::Selection;
use vortex_utils::aliases::dash_map::DashMap;
use vortex_utils::aliases::hash_set::HashSet;
use vortex_utils::parallelism::get_available_parallelism;

Expand Down Expand Up @@ -118,6 +122,7 @@ pub struct DataSourceBindData {
data_source: Arc<MultiLayoutDataSource>,
filter_exprs: Vec<Expression>,
column_fields: Vec<DuckdbField>,
stats_cache: Arc<DashMap<usize, ColumnStatisticsAggregate>>,
}

impl Clone for DataSourceBindData {
Expand All @@ -127,6 +132,7 @@ impl Clone for DataSourceBindData {
// filter_exprs are consumed once in `init_global`.
filter_exprs: vec![],
column_fields: self.column_fields.clone(),
stats_cache: Arc::clone(&self.stats_cache),
}
}
}
Expand Down Expand Up @@ -191,18 +197,22 @@ impl ColumnStatistics {
.vortex_expect("can't convert Scalar to duckdb Value")
});

let max_string_length = stats
.max_string_length
.map_or(0, |len| (1u64 << 63) | (len as u64));
let max_string_length = stats.max_string_length.unwrap_or(0);

let mut flags = 0u32;
if stats.max_string_length.is_some() {
flags |= 1;
}
// Useful estimate if we didn't get null count stats
let has_null = stats.has_null && dtype.is_nullable();
if stats.has_null && dtype.is_nullable() {
flags |= 2;
}

Self {
min,
max,
max_string_length,
has_null,
flags,
}
}
}
Expand All @@ -219,11 +229,11 @@ pub struct ColumnStatisticsAggregate {
impl ColumnStatisticsAggregate {
pub fn new(stats: &StatsSet) -> Self {
let min = match stats.get(Stat::Min) {
Some(Precision::Exact(min)) => Some(min),
Some(Precision::Exact(min) | Precision::Inexact(min)) => Some(min),
_ => None,
};
let max = match stats.get(Stat::Max) {
Some(Precision::Exact(max)) => Some(max),
Some(Precision::Exact(max) | Precision::Inexact(max)) => Some(max),
_ => None,
};

Expand All @@ -250,6 +260,41 @@ impl ColumnStatisticsAggregate {
has_null,
}
}

/// Folds the statistics of `other` into `self`.
///
/// * `min` / `max` keep the smaller / larger of the two values. They become
///   `None` when either side has no value or when the two values cannot be
///   ordered by `partial_cmp_scalar_value`.
/// * `max_string_length` keeps the larger length, or becomes `None` when
///   either side has no length.
/// * `has_null` is set if either side has it set.
pub fn merge(&mut self, other: ColumnStatisticsAggregate) {
    use std::cmp::Ordering;

    // Lower bound: only replace ours when it is strictly greater than theirs.
    self.min = self
        .min
        .take()
        .zip(other.min)
        .and_then(|(ours, theirs)| match partial_cmp_scalar_value(&ours, &theirs)? {
            Ordering::Greater => Some(theirs),
            Ordering::Less | Ordering::Equal => Some(ours),
        });

    // Upper bound: only replace ours when it is strictly less than theirs.
    self.max = self
        .max
        .take()
        .zip(other.max)
        .and_then(|(ours, theirs)| match partial_cmp_scalar_value(&ours, &theirs)? {
            Ordering::Less => Some(theirs),
            Ordering::Greater | Ordering::Equal => Some(ours),
        });

    // A merged length is only known when both inputs know theirs.
    self.max_string_length = self
        .max_string_length
        .zip(other.max_string_length)
        .map(|(ours, theirs)| ours.max(theirs));

    if other.has_null {
        self.has_null = true;
    }
}
}

/// Orders two scalar values when both hold the same kind of payload.
///
/// Bool, Primitive, Decimal, Utf8 and Binary values are compared via their
/// payload's `partial_cmp`. Any other variant, or a pair of differing
/// variants, yields `None`.
fn partial_cmp_scalar_value(left: &ScalarValue, right: &ScalarValue) -> Option<std::cmp::Ordering> {
    match left {
        ScalarValue::Bool(lhs) => match right {
            ScalarValue::Bool(rhs) => lhs.partial_cmp(rhs),
            _ => None,
        },
        ScalarValue::Primitive(lhs) => match right {
            ScalarValue::Primitive(rhs) => lhs.partial_cmp(rhs),
            _ => None,
        },
        ScalarValue::Decimal(lhs) => match right {
            ScalarValue::Decimal(rhs) => lhs.partial_cmp(rhs),
            _ => None,
        },
        ScalarValue::Utf8(lhs) => match right {
            ScalarValue::Utf8(rhs) => lhs.partial_cmp(rhs),
            _ => None,
        },
        ScalarValue::Binary(lhs) => match right {
            ScalarValue::Binary(rhs) => lhs.partial_cmp(rhs),
            _ => None,
        },
        // Variants without a supported ordering are never comparable.
        _ => None,
    }
}

impl<T: DataSourceTableFunction> TableFunction for T {
Expand All @@ -275,6 +320,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
data_source: Arc::new(data_source),
filter_exprs: vec![],
column_fields,
stats_cache: Arc::new(DashMap::default()),
})
}

Expand Down Expand Up @@ -520,27 +566,74 @@ impl<T: DataSourceTableFunction> TableFunction for T {
Ok(false)
}

/// Get column-wise statistics. Available only if we're reading a single
/// file.
fn statistics(bind_data: &Self::BindData, column_index: usize) -> Option<ColumnStatistics> {
let children = bind_data.data_source.children();
// Otherwise we'd have to open all files eagerly which is a performance
// regression. Duckdb's Parquet reader only gets metadata for multiple
// files with a UNION BY NAME and we don't support it (yet)
// See duckdb/common/multi_file/multi_file_function.hpp#L691
if children.len() != 1 {
return None;
debug!(
children = children.len(),
column_index, "requested statistics"
);
use std::time::Instant;
let now = Instant::now();

let dtype = bind_data.column_fields[column_index].dtype.clone();
if let Some(cache) = bind_data.stats_cache.get(&column_index) {
let stats = ColumnStatistics::from(cache.value(), dtype);
let elapsed = now.elapsed();
debug!("Cached, took {elapsed:.2?}");
return Some(stats);
}

let MultiLayoutChild::Opened(reader) = &children[0] else {
return None;
};
let stats_sets = match reader.as_any().downcast_ref::<FileStatsLayoutReader>() {
Some(inner) => inner.file_stats().stats_sets(),
None => return None,
let Some(reader) = reader.as_any().downcast_ref::<FileStatsLayoutReader>() else {
vortex_panic!("File reader is not FileStatsLayoutReader");
};
let stats_aggregate = ColumnStatisticsAggregate::new(&stats_sets[column_index]);
let dtype = bind_data.column_fields[column_index].dtype.clone();
Some(ColumnStatistics::from(&stats_aggregate, dtype))
let stats_sets = reader.file_stats().stats_sets();
let mut stats_aggregate = ColumnStatisticsAggregate::new(&stats_sets[column_index]);

// DuckDB's statistics() for Parquet returns multi-file statistics only
// for UNION BY NAME. However, checking whether all footers are already
// cached is relatively cheap, so we always try to return multi-file
// statistics.
let session = (&*SESSION).multi_file();
let mut footers = Vec::with_capacity(children.len() - 1);
for child in &children[1..] {
let MultiLayoutChild::Deferred(factory) = child else {
vortex_panic!("Non-first file is opened eagerly");
};
let Some(factory) = factory.as_any().downcast_ref::<VortexFileReaderFactory>() else {
vortex_panic!("Layout factory is not file factory");
};
let path = factory.path();
let Some(footer) = session.get_footer(&path) else {
trace!(path, "No footer found");
return None;
};
if footer.statistics().is_none() {
trace!(path, "No statistics found for footer");
return None;
};
// Merging statistics is real work, so we skip it entirely if any
// footer, or any footer's statistics, is missing.
footers.push(footer);
}

for footer in footers {
let Some(stats_sets) = footer.statistics() else {
vortex_panic!("Statistics for footer disappeared");
};
stats_aggregate.merge(ColumnStatisticsAggregate::new(
&stats_sets.stats_sets()[column_index],
));
}

let stats = ColumnStatistics::from(&stats_aggregate, dtype);
bind_data.stats_cache.insert(column_index, stats_aggregate);

let elapsed = now.elapsed();
debug!("Took {elapsed:.2?}");
Some(stats)
}

fn cardinality(bind_data: &Self::BindData) -> Cardinality {
Expand Down
20 changes: 7 additions & 13 deletions vortex-duckdb/src/duckdb/macro_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@ macro_rules! lifetime_wrapper {
}
}

// --- Owned handle (calls destructor on drop) ---

$(#[$meta])*
#[allow(dead_code)]
pub struct $Name($ffi_type);
pub struct $Name(std::ptr::NonNull<std::ffi::c_void>);

#[allow(dead_code)]
impl $Name {
Expand All @@ -71,7 +69,7 @@ macro_rules! lifetime_wrapper {
"Attempted to create an owned wrapper from a null pointer"
);
}
Self(ptr)
Self(unsafe { std::ptr::NonNull::new_unchecked(ptr.cast()) })
}

/// Borrows the pointer as an immutable reference with explicit lifetime.
Expand Down Expand Up @@ -107,35 +105,31 @@ macro_rules! lifetime_wrapper {
/// calling the destructor.
pub fn into_ptr(self) -> $ffi_type {
let this = std::mem::ManuallyDrop::new(self);
this.0
(*this).0.as_ptr().cast()
}
}

impl std::ops::Deref for $Name {
type Target = [<$Name Ref>];

fn deref(&self) -> &Self::Target {
// SAFETY: The opaque [<$Name Ref>] is a ZST and the pointer is valid
// for the lifetime of the owned handle.
unsafe { &*(self.0 as *const [<$Name Ref>]) }
unsafe { &*(self.0.as_ptr() as *const [<$Name Ref>]) }
}
}

impl std::ops::DerefMut for $Name {
fn deref_mut(&mut self) -> &mut Self::Target {
// SAFETY: The opaque [<$Name Ref>] is a ZST and the pointer is valid
// for the lifetime of the owned handle. We have &mut self so
// exclusive access is guaranteed.
unsafe { &mut *(self.0 as *mut [<$Name Ref>]) }
unsafe { &mut *(self.0.as_ptr() as *mut [<$Name Ref>]) }
}
}

impl Drop for $Name {
fn drop(&mut self) {
let destructor = $destructor;
let mut ptr: $ffi_type = self.0.as_ptr().cast();
#[allow(unused_unsafe)]
unsafe {
destructor(&mut self.0)
destructor(&mut ptr)
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions vortex-duckdb/src/duckdb/table_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ pub struct PartitionData {
pub struct ColumnStatistics {
pub min: Option<Value>,
pub max: Option<Value>,
pub max_string_length: u64,
pub has_null: bool,
pub max_string_length: u32,
// 1: has max string length
// 2: has null
pub flags: u32,
}

// String map lifetime is managed by C++ code
Expand Down Expand Up @@ -196,7 +198,7 @@ unsafe extern "C-unwind" fn statistics<T: TableFunction>(
stats_out.min = stats.min.map_or(ptr::null_mut(), |v| v.into_ptr());
stats_out.max = stats.max.map_or(ptr::null_mut(), |v| v.into_ptr());
stats_out.max_string_length = stats.max_string_length;
stats_out.has_null = stats.has_null;
stats_out.flags = stats.flags;
true
}

Expand Down
14 changes: 5 additions & 9 deletions vortex-duckdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use std::ffi::CStr;
use std::ffi::c_char;
use std::sync::LazyLock;
use std::sync::OnceLock;

use vortex::VortexSessionDefault;
use vortex::error::VortexExpect;
Expand Down Expand Up @@ -50,14 +49,11 @@ static SESSION: LazyLock<VortexSession> =
// would be hard to integrate with tracing::. We use logging for
// debugging only anyway, so that's good enough.
fn init_tracing() {
static ONCE: OnceLock<()> = OnceLock::new();
ONCE.get_or_init(|| {
drop(
tracing_subscriber::fmt()
.with_writer(std::io::stdout)
.try_init(),
);
});
drop(
tracing_subscriber::fmt()
.with_writer(std::io::stdout)
.try_init(),
)
}

/// Initialize the Vortex extension by registering the extension functions.
Expand Down
Loading
Loading