diff --git a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h index ddf55b532a1..7d08e0cdecb 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h @@ -73,15 +73,20 @@ typedef struct { bool has_max_cardinality; } duckdb_vx_node_statistics; +typedef enum { + HasMaxStringLength = 1 << 0, + HasNull = 1 << 1, + // Set only when both min and max are set and both of them are exact + // boundaries + MinMaxIsExact = 1 << 2 +} ColumnStatisticsFlags; + typedef struct { - // Set only for strings and primitive types duckdb_value min; duckdb_value max; - // upper bit: "length is set". lower 32 bits: DuckDB's max string length. - // set only for strings - uint64_t max_string_length; - bool has_null; -} duckdb_column_statistics; + uint32_t max_string_length; + ColumnStatisticsFlags flags; +} duckdb_vx_column_statistics; // vtable mimicking subset of TableFunction. // See duckdb/include/function/tfunc.hpp @@ -110,11 +115,6 @@ typedef struct { duckdb_data_chunk data_chunk_out, duckdb_vx_error *error_out); - bool (*statistics)(duckdb_client_context context, - const void *bind_data, - size_t column_index, - duckdb_column_statistics *stats_out); - void (*cardinality)(void *bind_data, duckdb_vx_node_statistics *node_stats_out); bool (*pushdown_complex_filter)(void *bind_data, duckdb_vx_expr expr, duckdb_vx_error *error_out); @@ -123,10 +123,18 @@ typedef struct { double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state); - idx_t (*get_partition_data)(const void *bind_data, - void *init_global_data, - void *init_local_data, - duckdb_vx_error *error_out); + // What file index are we currently exporting? + idx_t (*get_partition_data)(const void *init_local_data); + + size_t (*get_partition_count)(const void *bind_data); + // false if any file metadata (=row counts) is missing + bool (*get_partition_row_counts)(const void *bind_data, uint64_t *data, size_t len); + + // pass -1 to file_idx to get statistics for all files + bool (*get_column_stats)(const void *bind_data, + idx_t file_idx, + idx_t column_idx, + duckdb_vx_column_statistics *stats_out); } duckdb_vx_tfunc_vtab_t; // A single function for configuring the DuckDB table function vtable. diff --git a/vortex-duckdb/cpp/table_function.cpp b/vortex-duckdb/cpp/table_function.cpp index 21832a20950..04ec676ca86 100644 --- a/vortex-duckdb/cpp/table_function.cpp +++ b/vortex-duckdb/cpp/table_function.cpp @@ -89,103 +89,6 @@ double c_table_scan_progress(ClientContext &context, return bind.info->vtab.table_scan_progress(c_ctx, c_bind_data, c_global_state); } -static Value &UnwrapValue(duckdb_value value) { - return *(reinterpret_cast(value)); -} - -unique_ptr numeric_stats(duckdb_column_statistics &stats, LogicalType type) { - BaseStatistics out = StringStats::CreateUnknown(type); - if (stats.min) { - NumericStats::SetMin(out, UnwrapValue(stats.min)); - duckdb_destroy_value(&stats.min); - } - if (stats.max) { - NumericStats::SetMax(out, UnwrapValue(stats.max)); - duckdb_destroy_value(&stats.max); - } - if (!stats.has_null) { - out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); - } - return out.ToUnique(); -} - -unique_ptr string_stats(duckdb_column_statistics &stats, LogicalType type) { - BaseStatistics out = StringStats::CreateUnknown(type); - if (stats.min) { - StringStats::SetMin(out, StringValue::Get(UnwrapValue(stats.min))); - duckdb_destroy_value(&stats.min); - } - if (stats.max) { - StringStats::SetMax(out, StringValue::Get(UnwrapValue(stats.max))); - duckdb_destroy_value(&stats.max); - } - if (stats.max_string_length >> 63) { - StringStats::SetMaxStringLength(out, uint32_t(stats.max_string_length)); - } - if (!stats.has_null) { - out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); - } - - return out.ToUnique(); -} - -unique_ptr base_stats(duckdb_column_statistics &stats, LogicalType type) { - BaseStatistics out = StringStats::CreateUnknown(type); - if (!stats.has_null) { - out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); - } - return out.ToUnique(); -} - -unique_ptr -c_statistics(ClientContext &context, const FunctionData *bind_data, column_t column_index) { - if (IsVirtualColumn(column_index)) { - return {}; - } - - const auto &bind = bind_data->Cast(); - void *const ffi_bind = bind.ffi_data->DataPtr(); - - duckdb_client_context c_ctx = reinterpret_cast(&context); - duckdb_column_statistics statistics = {}; - if (!bind.info->vtab.statistics(c_ctx, ffi_bind, column_index, &statistics)) { - return {}; - } - - const LogicalType type = bind.types[column_index]; - - switch (type.id()) { - case LogicalTypeId::BOOLEAN: - case LogicalTypeId::TINYINT: - case LogicalTypeId::SMALLINT: - case LogicalTypeId::INTEGER: - case LogicalTypeId::BIGINT: - case LogicalTypeId::FLOAT: - case LogicalTypeId::DOUBLE: - case LogicalTypeId::UTINYINT: - case LogicalTypeId::USMALLINT: - case LogicalTypeId::UINTEGER: - case LogicalTypeId::UBIGINT: - case LogicalTypeId::UHUGEINT: - case LogicalTypeId::HUGEINT: { - return numeric_stats(statistics, type); - } - case LogicalTypeId::VARCHAR: - case LogicalTypeId::BLOB: { - return string_stats(statistics, type); - } - case LogicalTypeId::STRUCT: { - // TODO(myrrc) - // Duckdb's has_null has a different semantics for structs. - // If we propagate our has_null, this breaks Duckdb optimizer. - // You can reproduce it in struct.slt test in vortex-sqllogictests: - return {}; - } - default: - return base_stats(statistics, type); - } -} - unique_ptr c_bind(ClientContext &context, TableFunctionBindInput &input, vector &return_types, @@ -340,28 +243,161 @@ extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_resu result->return_types.emplace_back(*logical_type); } -OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) { - if (input.partition_info.RequiresPartitionColumns()) { - throw InternalException("TableScan::GetPartitionData: partition columns not supported"); +static Value &UnwrapValue(duckdb_value value) { + return *(reinterpret_cast(value)); +} + +using BaseStatisticsPtr = unique_ptr; + +BaseStatisticsPtr numeric_stats(duckdb_vx_column_statistics &stats, LogicalType type) { + BaseStatistics out = StringStats::CreateUnknown(type); + if (stats.min) { + NumericStats::SetMin(out, UnwrapValue(stats.min)); + duckdb_destroy_value(&stats.min); } - auto &bind = input.bind_data->Cast(); - void *const ffi_bind = bind.ffi_data->DataPtr(); - void *const ffi_global = input.global_state->Cast().ffi_data->DataPtr(); - void *const ffi_local = input.local_state->Cast().ffi_data->DataPtr(); + if (stats.max) { + NumericStats::SetMax(out, UnwrapValue(stats.max)); + duckdb_destroy_value(&stats.max); + } + if (!(stats.flags & ColumnStatisticsFlags::HasNull)) { + out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); + } + return out.ToUnique(); +} - duckdb_vx_error error_out = nullptr; - const idx_t batch_index = bind.info->vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &error_out); - if (error_out) { - throw InvalidInputException(IntoErrString(error_out)); +BaseStatisticsPtr string_stats(duckdb_vx_column_statistics &stats, LogicalType type) { + BaseStatistics out = StringStats::CreateUnknown(type); + if (stats.min) { + StringStats::SetMin(out, StringValue::Get(UnwrapValue(stats.min))); + duckdb_destroy_value(&stats.min); } - return OperatorPartitionData(batch_index); + if (stats.max) { + StringStats::SetMax(out, StringValue::Get(UnwrapValue(stats.max))); + duckdb_destroy_value(&stats.max); + } + if (stats.flags & ColumnStatisticsFlags::HasMaxStringLength) { + StringStats::SetMaxStringLength(out, stats.max_string_length); + } + if (!(stats.flags & ColumnStatisticsFlags::HasNull)) { + out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); + } + return out.ToUnique(); +} + +BaseStatisticsPtr base_stats(duckdb_vx_column_statistics &stats, LogicalType type) { + BaseStatistics out = StringStats::CreateUnknown(type); + if (!(stats.flags & ColumnStatisticsFlags::HasNull)) { + out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); + } + return out.ToUnique(); } -extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) { - D_ASSERT(map); - D_ASSERT(key); - D_ASSERT(value); - reinterpret_cast *>(map)->insert(key, value); +BaseStatisticsPtr to_duckdb_statistics(duckdb_vx_column_statistics &statistics, LogicalType type) { + switch (type.id()) { + case LogicalTypeId::BOOLEAN: + case LogicalTypeId::TINYINT: + case LogicalTypeId::SMALLINT: + case LogicalTypeId::INTEGER: + case LogicalTypeId::BIGINT: + case LogicalTypeId::FLOAT: + case LogicalTypeId::DOUBLE: + case LogicalTypeId::UTINYINT: + case LogicalTypeId::USMALLINT: + case LogicalTypeId::UINTEGER: + case LogicalTypeId::UBIGINT: + case LogicalTypeId::UHUGEINT: + case LogicalTypeId::HUGEINT: { + return numeric_stats(statistics, type); + } + case LogicalTypeId::VARCHAR: + case LogicalTypeId::BLOB: { + return string_stats(statistics, type); + } + case LogicalTypeId::STRUCT: { + // TODO(myrrc) + // Duckdb's has_null has a different semantics for structs. + // If we propagate our has_null, this breaks Duckdb optimizer. + // You can reproduce it in struct.slt test in vortex-sqllogictests: + return {}; + } + default: + return base_stats(statistics, type); + } +} + +struct VortexRowGroup final : PartitionRowGroup { + VortexRowGroup(const CTableBindData &bind_data, idx_t file_idx) + : bind_data(bind_data), file_idx(file_idx) { + } + ~VortexRowGroup() override = default; + + BaseStatisticsPtr GetColumnStatistics(const StorageIndex &storage_index) override { + const idx_t column_idx = storage_index.GetPrimaryIndex(); + D_ASSERT(!IsVirtualColumn(column_idx)); + D_ASSERT(column_idx < bind_data.types.size()); + + void *const bind_ffi = bind_data.ffi_data->DataPtr(); + if (!bind_data.info->vtab.get_column_stats(bind_ffi, file_idx, column_idx, &statistics)) { + return {}; + } + return to_duckdb_statistics(statistics, bind_data.types[column_idx]); + } + + bool MinMaxIsExact(const BaseStatistics &, const StorageIndex &) override { + // This function is called after GetColumnStatistics with same storage + // index, thus we can reuse calculated statistics + // duckdb/src/optimizer/statistics/operator/propagate_aggregate.cpp:84 + return statistics.flags & ColumnStatisticsFlags::MinMaxIsExact; + } + + const CTableBindData &bind_data; + const idx_t file_idx; + duckdb_vx_column_statistics statistics; +}; + +vector get_partition_stats(ClientContext &, GetPartitionStatsInput &input) { + const CTableBindData &bind_data = input.bind_data->Cast(); + void *const ffi_bind = bind_data.ffi_data->DataPtr(); + + const size_t partitions_len = bind_data.info->vtab.get_partition_count(ffi_bind); + vector row_counts(partitions_len); + if (!bind_data.info->vtab.get_partition_row_counts(ffi_bind, row_counts.data(), partitions_len)) { + return {}; + } + + idx_t offset = 0; + vector out(partitions_len); + for (idx_t i = 0; i < partitions_len; ++i) { + out[i].row_start = offset; + out[i].count = row_counts[i]; + out[i].count_type = CountType::COUNT_EXACT; + out[i].partition_row_group = make_shared_ptr(bind_data, i); + offset += row_counts[i]; + } + return out; +} + +BaseStatisticsPtr statistics(ClientContext &, const FunctionData *bind_data, column_t column_idx) { + if (IsVirtualColumn(column_idx)) { + return {}; + } + const auto &bind = bind_data->Cast(); + duckdb_vx_column_statistics statistics; + void *const ffi_bind = bind.ffi_data->DataPtr(); + if (!bind.info->vtab.get_column_stats(ffi_bind, -1, column_idx, &statistics)) { + return {}; + } + return to_duckdb_statistics(statistics, bind.types[column_idx]); +} + +OperatorPartitionData get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) { + if (input.partition_info.RequiresPartitionColumns()) { + throw InternalException("TableScan::GetPartitionData: partition columns not supported"); + } + const auto &bind = input.bind_data->Cast(); + void *const ffi_local = input.local_state->Cast().ffi_data->DataPtr(); + const idx_t batch_index = bind.info->vtab.get_partition_data(ffi_local); + return OperatorPartitionData(batch_index); } InsertionOrderPreservingMap c_to_string(TableFunctionToStringInput &input) { @@ -391,10 +427,11 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d tf.pushdown_complex_filter = c_pushdown_complex_filter; tf.cardinality = c_cardinality; - tf.get_partition_data = c_get_partition_data; tf.to_string = c_to_string; tf.table_scan_progress = c_table_scan_progress; - tf.statistics = c_statistics; + tf.get_partition_data = get_partition_data; + tf.get_partition_stats = get_partition_stats; + tf.statistics = statistics; tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t { return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}}; diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index c59fe6cbc2b..0fe453b9054 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -31,6 +31,7 @@ use vortex::dtype::FieldNames; use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_err; +use vortex::error::vortex_panic; use vortex::expr::Expression; use vortex::expr::and_collect; use vortex::expr::col; @@ -38,6 +39,7 @@ use vortex::expr::root; use vortex::expr::select; use vortex::expr::stats::Precision; use vortex::expr::stats::Stat; +use vortex::file::multi::VortexFileReaderFactory; use vortex::file::v2::FileStatsLayoutReader; use vortex::io::kanal_ext::KanalExt; use vortex::io::runtime::BlockingRuntime; @@ -46,7 +48,6 @@ use vortex::layout::scan::multi::MultiLayoutChild; use vortex::layout::scan::multi::MultiLayoutDataSource; use vortex::metrics::tracing::get_global_labels; use vortex::scalar::Scalar; -use vortex::scalar::ScalarValue; use vortex::scalar_fn::fns::pack::Pack; use vortex::scan::DataSource; use vortex::scan::ScanRequest; @@ -140,12 +141,19 @@ impl Debug for DataSourceBindData { } } -type DataSourceIterator = ThreadSafeIterator)>>; +type BatchId = usize; +struct DataSourceBatch { + /// Monotonic non-decreasing id used for Duckdb data partitioning and + /// partition stats. Currently each id corresponds to a file. + id: BatchId, + array: ArrayRef, + cache: Arc, +} +type DataSourceIterator = ThreadSafeIterator>; /// Global scan state for driving a `DataSource` scan through DuckDB. pub struct DataSourceGlobal { iterator: DataSourceIterator, - batch_id: AtomicU64, bytes_total: Arc, bytes_read: AtomicU64, } @@ -154,8 +162,9 @@ pub struct DataSourceGlobal { pub struct DataSourceLocal { iterator: DataSourceIterator, exporter: Option, - /// The unique batch id of the last chunk exported via scan(). - batch_id: Option, + /// Monotonic non-decreasing id used for Duckdb data partitioning and + /// partition stats. Currently each id corresponds to a file. + batch_id: BatchId, } /// Returns scan progress as a percentage (0.0–100.0). @@ -167,86 +176,72 @@ fn progress(bytes_read: &AtomicU64, bytes_total: &AtomicU64) -> f64 { } impl ColumnStatistics { - fn from(stats: &ColumnStatisticsAggregate, dtype: DType) -> Self { - let min = stats.min.as_ref().map(|value| { - let value = value.clone(); + pub fn new(stats: &StatsSet, dtype: DType) -> Self { + let (min, min_exact) = match stats.get(Stat::Min) { + Some(Precision::Exact(min)) => (Some(min), true), + Some(Precision::Inexact(min)) => (Some(min), false), + _ => (None, false), + }; + let (max, max_exact) = match stats.get(Stat::Max) { + Some(Precision::Exact(max)) => (Some(max), true), + Some(Precision::Inexact(max)) => (Some(max), false), + _ => (None, false), + }; + + let min = min.map(|value| { Scalar::try_new(dtype.clone(), Some(value)) .vortex_expect("scalar dtype and value are incompatible") .try_to_duckdb_scalar() .vortex_expect("can't convert Scalar to duckdb Value") }); - let max = stats.max.as_ref().map(|value| { + let max = max.map(|value| { Scalar::try_new(dtype.clone(), Some(value.clone())) .vortex_expect("scalar dtype and value are incompatible") .try_to_duckdb_scalar() .vortex_expect("can't convert Scalar to duckdb Value") }); - let max_string_length = stats - .max_string_length - .map_or(0, |len| (1u64 << 63) | (len as u64)); - - // Useful estimate if we didn't get null count stats - let has_null = stats.has_null && dtype.is_nullable(); - - Self { - min, - max, - max_string_length, - has_null, - } - } -} - -#[derive(Default)] -pub struct ColumnStatisticsAggregate { - pub min: Option, - pub max: Option, - pub max_string_length: Option, - /// May be true if null count stat isn't present - pub has_null: bool, -} - -impl ColumnStatisticsAggregate { - pub fn new(stats: &StatsSet) -> Self { - let min = match stats.get(Stat::Min) { - Some(Precision::Exact(min)) => Some(min), - _ => None, - }; - let max = match stats.get(Stat::Max) { - Some(Precision::Exact(max)) => Some(max), - _ => None, - }; - - let max_string_length = + let (max_string_length, has_max_string_length) = if let Some(Precision::Exact(value)) = stats.get(Stat::UncompressedSizeInBytes) { // DuckDB's string length is u32 #[allow(clippy::cast_possible_truncation)] - Some(value.as_primitive().as_u64().vortex_expect("not a u64") as u32) + ( + value.as_primitive().as_u64().vortex_expect("not a u64") as u32, + true, + ) } else { - None + (0, false) }; + let mut flags = 0u32; + + if has_max_string_length { + flags |= 1; + } + let has_null = match stats.get(Stat::NullCount) { Some(Precision::Exact(cnt)) => { cnt.as_primitive().as_u64().vortex_expect("not a u64") > 0 } _ => true, }; + if has_null && dtype.is_nullable() { + flags |= 2; + } + + if min.is_some() && max.is_some() && min_exact && max_exact { + flags |= 4; + } Self { min, max, max_string_length, - has_null, + flags, } } } -// --------------------------------------------------------------------------- -// Blanket TableFunction implementation for any DataSourceTableFunction -// --------------------------------------------------------------------------- - impl TableFunction for T { type BindData = DataSourceBindData; type GlobalState = DataSourceGlobal; @@ -318,7 +313,8 @@ impl TableFunction for T { // first available array chunk. let stream = scan .partitions() - .map(move |partition| { + .enumerate() + .map(move |(id, partition)| { // We create a new conversion cache scoped to the partition, since there's no point // caching anything across partitions. let cache = Arc::new(ConversionCache::default()); @@ -334,7 +330,11 @@ impl TableFunction for T { }; while let Some(item) = stream.next().await { if tx - .send(item.map(|a| (a, Arc::clone(&cache)))) + .send(item.map(|array| DataSourceBatch { + id, + array, + cache: Arc::clone(&cache), + })) .await .is_err() { @@ -345,7 +345,9 @@ impl TableFunction for T { } }) }) - .buffer_unordered(num_workers); + // We need ordering guarantees as produced batch id for each item + // must be non-decreasing + .buffered(num_workers); // Spawn a task to drive the partition stream and push array chunks into the channel. RUNTIME.handle().spawn(stream.collect::<()>()).detach(); @@ -354,7 +356,6 @@ impl TableFunction for T { Ok(DataSourceGlobal { iterator, - batch_id: AtomicU64::new(0), bytes_total: Arc::new(AtomicU64::new(0)), bytes_read: AtomicU64::new(0), }) @@ -382,7 +383,7 @@ impl TableFunction for T { Ok(DataSourceLocal { iterator: global.iterator.clone(), exporter: None, - batch_id: None, + batch_id: 0, }) } @@ -399,13 +400,12 @@ impl TableFunction for T { let Some(result) = local_state.iterator.next() else { return Ok(()); }; - let (array_result, conversion_cache) = result?; - let array_result = array_result.optimize_recursive()?; + let DataSourceBatch { id, array, cache } = result?; + let array = array.optimize_recursive()?; - let array_result: StructArray = if let Some(array) = array_result.as_opt::() - { + let array: StructArray = if let Some(array) = array.as_opt::() { array.into_owned() - } else if let Some(array) = array_result.as_opt::() + } else if let Some(array) = array.as_opt::() && let Some(pack_options) = array.scalar_fn().as_opt::() { StructArray::new( @@ -415,16 +415,11 @@ impl TableFunction for T { pack_options.nullability.into(), ) } else { - array_result.execute::(&mut ctx)?.into_struct() + array.execute::(&mut ctx)?.into_struct() }; - local_state.exporter = Some(ArrayExporter::try_new( - &array_result, - &conversion_cache, - ctx, - )?); - // Relaxed since there is no intra-instruction ordering required. - local_state.batch_id = Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed)); + local_state.exporter = Some(ArrayExporter::try_new(&array, &cache, ctx)?); + local_state.batch_id = id; } let exporter = local_state @@ -440,7 +435,6 @@ impl TableFunction for T { if !has_more_data { // This exporter is fully consumed. local_state.exporter = None; - local_state.batch_id = None; } else { break; } @@ -480,31 +474,78 @@ impl TableFunction for T { Ok(false) } - /// Get column-wise statistics. Available only if we're reading a single - /// file. - fn statistics( - _client_context: &ClientContextRef, + fn column_stats( bind_data: &Self::BindData, + file_index: usize, column_index: usize, ) -> Option { let children = bind_data.data_source.children(); + + // Requesting general statistics. Output only if there is 1 file. // Otherwise we'd have to open all files eagerly which is a performance // regression. Duckdb's Parquet reader only gets metadata for multiple // files with a UNION BY NAME and we don't support it (yet) // See duckdb/common/multi_file/multi_file_function.hpp#L691 - if children.len() != 1 { + if file_index == usize::MAX && children.len() != 1 { return None; } - let MultiLayoutChild::Opened(ref reader) = children[0] else { - return None; + if file_index == usize::MAX { + let MultiLayoutChild::Opened(reader) = &children[0] else { + vortex_panic!("Single file should be eagerly opened"); + }; + let stats_sets = match reader.as_any().downcast_ref::() { + Some(inner) => inner.file_stats().stats_sets(), + None => { + vortex_panic!("Single file without metadata"); + } + }; + let stats_set = &stats_sets[column_index]; + let dtype = bind_data.column_fields[column_index].dtype.clone(); + return Some(ColumnStatistics::new(stats_set, dtype)); + } + assert!(file_index < children.len()); + let child = &children[file_index]; + + let MultiLayoutChild::Opened(reader) = child else { + panic!("File metadata not found"); }; let stats_sets = match reader.as_any().downcast_ref::() { Some(inner) => inner.file_stats().stats_sets(), None => return None, }; - let stats_aggregate = ColumnStatisticsAggregate::new(&stats_sets[column_index]); + let stats_set = &stats_sets[column_index]; let dtype = bind_data.column_fields[column_index].dtype.clone(); - Some(ColumnStatistics::from(&stats_aggregate, dtype)) + Some(ColumnStatistics::new(stats_set, dtype)) + } + + fn partition_data(local_init_data: &Self::LocalState) -> u64 { + local_init_data.batch_id as u64 + } + + fn partition_count(bind_data: &Self::BindData) -> usize { + bind_data.data_source.children().len() + } + + fn partition_row_counts(bind_data: &Self::BindData, row_counts: &mut [u64]) -> bool { + let files = bind_data.data_source.children(); + assert!(files.len() == row_counts.len()); + for (rc, file) in row_counts.iter_mut().zip(files) { + match file { + MultiLayoutChild::Opened(reader) => { + println!("partition_row_counts opened reader, returning row count"); + *rc = reader.row_count(); + } + MultiLayoutChild::Deferred(reader) => { + let path = match reader.as_any().downcast_ref::() { + Some(factory) => factory.file.path.clone(), + None => { + vortex_panic!("Deferred reader should be a VortexFileReaderFactory"); + } + }; + } + } + } + true } fn cardinality(bind_data: &Self::BindData) -> Cardinality { @@ -515,16 +556,6 @@ impl TableFunction for T { } } - fn partition_data( - _bind_data: &Self::BindData, - _global_init_data: &Self::GlobalState, - local_init_data: &mut Self::LocalState, - ) -> VortexResult { - local_init_data - .batch_id - .ok_or_else(|| vortex_err!("batch id missing, no batches exported")) - } - fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef) { map.push("Function", "Vortex Scan"); if !bind_data.filter_exprs.is_empty() { diff --git a/vortex-duckdb/src/duckdb/table_function/mod.rs b/vortex-duckdb/src/duckdb/table_function/mod.rs index ac671f409e8..a8a6ee6e9a2 100644 --- a/vortex-duckdb/src/duckdb/table_function/mod.rs +++ b/vortex-duckdb/src/duckdb/table_function/mod.rs @@ -1,19 +1,19 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use core::slice; use std::ffi::CStr; use std::ffi::CString; use std::ffi::c_void; use std::fmt::Debug; +use std::ptr; use vortex::error::VortexExpect; use vortex::error::VortexResult; mod bind; mod cardinality; mod init; -mod partition; mod pushdown_complex_filter; -mod statistics; mod table_scan_progress; pub use bind::*; @@ -21,6 +21,7 @@ pub use init::*; use crate::cpp; use crate::cpp::duckdb_client_context; +use crate::cpp::idx_t; use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; use crate::duckdb::DatabaseRef; @@ -30,9 +31,7 @@ use crate::duckdb::client_context::ClientContextRef; use crate::duckdb::data_chunk::DataChunkRef; use crate::duckdb::expr::ExpressionRef; use crate::duckdb::table_function::cardinality::cardinality_callback; -use crate::duckdb::table_function::partition::get_partition_data_callback; use crate::duckdb::table_function::pushdown_complex_filter::pushdown_complex_filter_callback; -use crate::duckdb::table_function::statistics::statistics; use crate::duckdb::table_function::table_scan_progress::table_scan_progress_callback; use crate::duckdb_try; @@ -40,8 +39,11 @@ use crate::duckdb_try; pub struct ColumnStatistics { pub min: Option, pub max: Option, - pub max_string_length: u64, - pub has_null: bool, + pub max_string_length: u32, + // has string length: 1 + // has null: 2 + // min max is exact: 4 + pub flags: u32, } // String map lifetime is managed by C++ code @@ -78,12 +80,10 @@ pub trait TableFunction: Sized + Debug { result: &mut BindResultRef, ) -> VortexResult; - /// Report column statistics for a file or collections of files e.g. - /// registered as a VIEW. - fn statistics( - client_context: &ClientContextRef, + fn column_stats( bind_data: &Self::BindData, - column_index: usize, + file_idx: usize, + column_idx: usize, ) -> Option; /// The function is called during query execution and is responsible for producing the output @@ -134,13 +134,10 @@ pub trait TableFunction: Sized + Debug { Cardinality::Unknown } - /// Returns the idx of the current partition being processed by a local threa. - /// This *must* be globally unique. - fn partition_data( - _bind_data: &Self::BindData, - _global_init_data: &Self::GlobalState, - _local_init_data: &mut Self::LocalState, - ) -> VortexResult; + // What file are we currently exporting? + fn partition_data(local_init_data: &Self::LocalState) -> u64; + fn partition_count(local_init_data: &Self::BindData) -> usize; + fn partition_row_counts(bind_data: &Self::BindData, row_counts: &mut [u64]) -> bool; /// Returns a vector of key-value pairs for EXPLAIN output fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef); @@ -174,12 +171,14 @@ impl DatabaseRef { init_global: Some(init_global_callback::), init_local: Some(init_local_callback::), function: Some(function::), - statistics: Some(statistics::), + get_column_stats: Some(get_column_stats::), cardinality: Some(cardinality_callback::), pushdown_complex_filter: Some(pushdown_complex_filter_callback::), to_string: Some(to_string_callback::), table_scan_progress: Some(table_scan_progress_callback::), - get_partition_data: Some(get_partition_data_callback::), + get_partition_data: Some(get_partition_data::), + get_partition_count: Some(get_partition_count::), + get_partition_row_counts: Some(get_partition_row_counts::), }; duckdb_try!( @@ -237,3 +236,49 @@ unsafe extern "C-unwind" fn function( }, } } + +unsafe extern "C-unwind" fn get_column_stats( + bind_data: *const c_void, + file_idx: u64, + column_idx: u64, + stats_out: *mut cpp::duckdb_vx_column_statistics, +) -> bool { + let stats_out = unsafe { &mut *stats_out }; + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null pointer"); + let Some(stats) = T::column_stats(bind_data, file_idx as usize, column_idx as usize) else { + return false; + }; + stats_out.min = stats.min.map_or(ptr::null_mut(), |v| v.into_ptr()); + stats_out.max = stats.max.map_or(ptr::null_mut(), |v| v.into_ptr()); + stats_out.max_string_length = stats.max_string_length; + stats_out.flags = stats.flags; + true +} + +unsafe extern "C-unwind" fn get_partition_data( + local_init_data: *const c_void, +) -> idx_t { + let local_init_data = unsafe { local_init_data.cast::().as_ref() } + .vortex_expect("local_init_data null pointer"); + T::partition_data(local_init_data) +} + +unsafe extern "C-unwind" fn get_partition_count( + bind_data: *const c_void, +) -> usize { + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null pointer"); + T::partition_count(bind_data) +} + +unsafe extern "C-unwind" fn get_partition_row_counts( + bind_data: *const c_void, + data: *mut u64, + len: usize, +) -> bool { + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null pointer"); + let row_counts = unsafe { slice::from_raw_parts_mut(data, len) }; + T::partition_row_counts(bind_data, row_counts) +} diff --git a/vortex-duckdb/src/duckdb/table_function/partition.rs b/vortex-duckdb/src/duckdb/table_function/partition.rs deleted file mode 100644 index 09d0e8fbd94..00000000000 --- a/vortex-duckdb/src/duckdb/table_function/partition.rs +++ /dev/null @@ -1,36 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::ffi::c_void; - -use cpp::duckdb_vx_error; -use vortex::error::VortexExpect; - -use crate::cpp; -use crate::cpp::idx_t; -use crate::duckdb::TableFunction; - -/// Native callback for the cardinality estimate of a table function. -pub(crate) unsafe extern "C-unwind" fn get_partition_data_callback( - bind_data: *const c_void, - global_init_data: *mut c_void, - local_init_data: *mut c_void, - error_out: *mut duckdb_vx_error, -) -> idx_t { - let bind_data = - unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null pointer"); - let global_init_data = unsafe { global_init_data.cast::().as_ref() } - .vortex_expect("global_init_data null pointer"); - let local_init_data = unsafe { local_init_data.cast::().as_mut() } - .vortex_expect("local_init_data null pointer"); - - match T::partition_data(bind_data, global_init_data, local_init_data) { - Ok(batch_id) => batch_id, - Err(e) => { - // Set the error in the error output. - let msg = e.to_string(); - unsafe { error_out.write(cpp::duckdb_vx_error_create(msg.as_ptr().cast(), msg.len())) }; - 0 - } - } -} diff --git a/vortex-duckdb/src/duckdb/table_function/statistics.rs b/vortex-duckdb/src/duckdb/table_function/statistics.rs deleted file mode 100644 index 7f923dddefd..00000000000 --- a/vortex-duckdb/src/duckdb/table_function/statistics.rs +++ /dev/null @@ -1,31 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::ffi::c_void; -use std::ptr; - -use vortex::error::VortexExpect; - -use crate::cpp; -use crate::duckdb::ClientContext; -use crate::duckdb::TableFunction; - -pub(crate) unsafe extern "C-unwind" fn statistics( - ctx: cpp::duckdb_client_context, - bind_data: *const c_void, - column_index: usize, - stats_out: *mut cpp::duckdb_column_statistics, -) -> bool { - let stats_out = unsafe { &mut *stats_out }; - let client_context = unsafe { ClientContext::borrow(ctx) }; - let bind_data = - unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null pointer"); - let Some(stats) = T::statistics(client_context, bind_data, column_index) else { - return false; - }; - stats_out.min = stats.min.map_or(ptr::null_mut(), |v| v.into_ptr()); - stats_out.max = stats.max.map_or(ptr::null_mut(), |v| v.into_ptr()); - stats_out.max_string_length = stats.max_string_length; - stats_out.has_null = stats.has_null; - true -} diff --git a/vortex-file/src/multi/mod.rs b/vortex-file/src/multi/mod.rs index 5e7f5466921..844d1b97fc8 100644 --- a/vortex-file/src/multi/mod.rs +++ b/vortex-file/src/multi/mod.rs @@ -5,6 +5,7 @@ mod session; +use std::any::Any; use std::sync::Arc; use async_trait::async_trait; @@ -241,9 +242,9 @@ fn layout_reader_with_stats(file: &crate::VortexFile) -> VortexResult VortexOpenOptions + Send + Sync>, } @@ -260,4 +261,8 @@ impl LayoutReaderFactory for VortexFileReaderFactory { .await?; Ok(Some(layout_reader_with_stats(&file)?)) } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/vortex-layout/src/scan/multi.rs b/vortex-layout/src/scan/multi.rs index 600d14ccffd..3e171911a85 100644 --- a/vortex-layout/src/scan/multi.rs +++ b/vortex-layout/src/scan/multi.rs @@ -68,6 +68,7 @@ const DEFAULT_CONCURRENCY: usize = 8; pub trait LayoutReaderFactory: 'static + Send + Sync { /// Opens the layout reader, or returns `None` if it should be skipped. async fn open(&self) -> VortexResult>; + fn as_any(&self) -> &dyn Any; } /// A [`DataSource`] that combines multiple [`LayoutReaderRef`]s into a single scannable source.