Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion vortex-duckdb/cpp/include/duckdb_vx/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, cons

// Input data passed into the init_global and init_local callbacks.
typedef struct {
const void *bind_data;
void *bind_data;

/**
* Projected columns that are requested to be read. These are not
Expand Down
63 changes: 57 additions & 6 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,52 @@ static Value &UnwrapValue(duckdb_value value) {
return *(reinterpret_cast<Value *>(value));
}

// Compute the exact distinct-value range for one integral type.
//
// The subtraction is done in uint64_t: for signed types the operands are
// sign-extended first, and unsigned wraparound then yields the correct
// non-negative difference whenever lo <= hi. Doing the subtraction in the
// signed type itself (as e.g. `int32 - int32`) would be undefined behavior
// when the range exceeds the type's maximum (max=INT32_MAX, min=INT32_MIN).
//
// Returns 0 when no safe estimate is possible (inconsistent stats where
// hi < lo, or a full-width range whose count would not fit in 64 bits).
template <typename T>
static idx_t distinct_from_range(const Value &min, const Value &max) {
	const T lo = min.GetValueUnsafe<T>();
	const T hi = max.GetValueUnsafe<T>();
	if (hi < lo) {
		// Inconsistent min/max pair; don't guess.
		return 0;
	}
	const uint64_t range = static_cast<uint64_t>(hi) - static_cast<uint64_t>(lo);
	// `range + 1` would wrap to 0 for a full 64-bit range; 0 already means
	// "no estimate", which is the right answer for that degenerate case.
	return range == UINT64_MAX ? 0 : static_cast<idx_t>(range + 1);
}

// For boolean or integral types, derive distinct count from min/max pair.
// Returns 0 when no estimate can be derived for the given logical type.
idx_t integer_distinct(LogicalTypeId id, const Value &min, const Value &max) {
	switch (id) {
	case LogicalTypeId::BOOLEAN:
		return distinct_from_range<bool>(min, max);
	case LogicalTypeId::UTINYINT:
		return distinct_from_range<uint8_t>(min, max);
	case LogicalTypeId::USMALLINT:
		return distinct_from_range<uint16_t>(min, max);
	case LogicalTypeId::UINTEGER:
		return distinct_from_range<uint32_t>(min, max);
	case LogicalTypeId::UBIGINT:
		return distinct_from_range<uint64_t>(min, max);
	case LogicalTypeId::TINYINT:
		return distinct_from_range<int8_t>(min, max);
	case LogicalTypeId::SMALLINT:
		return distinct_from_range<int16_t>(min, max);
	case LogicalTypeId::INTEGER:
		return distinct_from_range<int32_t>(min, max);
	case LogicalTypeId::BIGINT:
		return distinct_from_range<int64_t>(min, max);
	// Don't estimate distinct for huge ints since result may not fit in u64.
	default:
		return 0;
	}
}

unique_ptr<BaseStatistics> numeric_stats(duckdb_column_statistics &stats, LogicalType type) {
BaseStatistics out = StringStats::CreateUnknown(type);
if (stats.min) {
if (stats.min && stats.max) {
const Value &min = UnwrapValue(stats.min);
NumericStats::SetMin(out, min);

const Value &max = UnwrapValue(stats.max);
NumericStats::SetMax(out, max);

if (const idx_t distinct = integer_distinct(type.id(), min, max); distinct > 0) {
out.SetDistinctCount(distinct);
}

duckdb_destroy_value(&stats.min);
duckdb_destroy_value(&stats.max);
Comment on lines +98 to +138
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be in the C++ wrapper?

I would like this to be in Rust.

} else if (stats.min) {
NumericStats::SetMin(out, UnwrapValue(stats.min));
duckdb_destroy_value(&stats.min);
}
if (stats.max) {
} else if (stats.max) {
NumericStats::SetMax(out, UnwrapValue(stats.max));
duckdb_destroy_value(&stats.max);
}
Expand All @@ -112,14 +151,26 @@ unique_ptr<BaseStatistics> numeric_stats(duckdb_column_statistics &stats, Logica

unique_ptr<BaseStatistics> string_stats(duckdb_column_statistics &stats, LogicalType type) {
BaseStatistics out = StringStats::CreateUnknown(type);
if (stats.min) {
if (stats.min && stats.max) {
const std::string &min = StringValue::Get(UnwrapValue(stats.min));
StringStats::SetMin(out, min);
duckdb_destroy_value(&stats.min);

const std::string &max = StringValue::Get(UnwrapValue(stats.max));
StringStats::SetMax(out, max);
duckdb_destroy_value(&stats.max);

if (min == max) {
out.SetDistinctCount(1);
}
} else if (stats.min) {
StringStats::SetMin(out, StringValue::Get(UnwrapValue(stats.min)));
duckdb_destroy_value(&stats.min);
}
if (stats.max) {
} else if (stats.max) {
StringStats::SetMax(out, StringValue::Get(UnwrapValue(stats.max)));
duckdb_destroy_value(&stats.max);
}

if (stats.max_string_length >> 63) {
StringStats::SetMaxStringLength(out, uint32_t(stats.max_string_length));
}
Expand Down
62 changes: 47 additions & 15 deletions vortex-duckdb/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! to get a blanket [`TableFunction`] implementation covering init, scan, progress, filter
//! pushdown, cardinality, and partitioning.

use std::cmp::max;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -51,6 +52,8 @@ use vortex::layout::scan::multi::MultiLayoutDataSource;
use vortex::metrics::tracing::get_global_labels;
use vortex::scalar::Scalar;
use vortex::scalar::ScalarValue;
use vortex::scalar_fn::fns::binary::Binary;
use vortex::scalar_fn::fns::operators::Operator;
use vortex::scalar_fn::fns::pack::Pack;
use vortex::scan::DataSource;
use vortex::scan::ScanRequest;
Expand All @@ -74,6 +77,7 @@ use crate::duckdb::DuckdbStringMapRef;
use crate::duckdb::ExpressionRef;
use crate::duckdb::LogicalType;
use crate::duckdb::PartitionData;
use crate::duckdb::TableFilterClass;
use crate::duckdb::TableFilterSetRef;
use crate::duckdb::TableFunction;
use crate::duckdb::TableInitInput;
Expand Down Expand Up @@ -118,6 +122,7 @@ pub struct DataSourceBindData {
data_source: Arc<MultiLayoutDataSource>,
filter_exprs: Vec<Expression>,
column_fields: Vec<DuckdbField>,
has_non_optional_filter: bool,
}

impl Clone for DataSourceBindData {
Expand All @@ -127,6 +132,7 @@ impl Clone for DataSourceBindData {
// filter_exprs are consumed once in `init_global`.
filter_exprs: vec![],
column_fields: self.column_fields.clone(),
has_non_optional_filter: self.has_non_optional_filter,
}
}
}
Expand Down Expand Up @@ -252,6 +258,20 @@ impl ColumnStatisticsAggregate {
}
}

// DuckDB wants cardinality estimates that already account for filtering;
// otherwise its join planner may flip join sides, which is a huge regression
// for some queries (e.g. ~1000x for TPC-DS query 85).
//
// Mirrors duckdb/src/optimizer/join_order/relation_statistics_helper.cpp.
const DEFAULT_SELECTIVITY: f64 = 0.2;
fn postfilter_cardinality(cardinality: u64, has_non_optional_filter: bool) -> u64 {
    if !has_non_optional_filter {
        // No mandatory filter: the raw row count is the best estimate we have.
        return cardinality;
    }
    // Apply DuckDB's default selectivity heuristic, never estimating
    // fewer than one surviving row.
    let estimate = (cardinality as f64 * DEFAULT_SELECTIVITY) as u64;
    estimate.max(1)
}

impl<T: DataSourceTableFunction> TableFunction for T {
type BindData = DataSourceBindData;
type GlobalState = DataSourceGlobal;
Expand All @@ -275,6 +295,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
data_source: Arc::new(data_source),
filter_exprs: vec![],
column_fields,
has_non_optional_filter: false,
})
}

Expand All @@ -297,13 +318,15 @@ impl<T: DataSourceTableFunction> TableFunction for T {
row_range,
file_selection,
file_range,
has_non_optional_filter,
} = extract_table_filter_expr(
init_input.table_filter_set(),
column_ids,
&bind_data.column_fields,
&bind_data.filter_exprs,
bind_data.data_source.dtype(),
)?;
bind_data.has_non_optional_filter = has_non_optional_filter;

debug!(
%projection,
Expand Down Expand Up @@ -506,18 +529,19 @@ impl<T: DataSourceTableFunction> TableFunction for T {
debug!(%expr, "failed to push down expression");
return Ok(false);
};
debug!(%expr, "pushed down expression");
bind_data.filter_exprs.push(expr);

// NOTE(ngates): Vortex does indeed run exact filters, so in theory we should return `true`
// here to tell DuckDB we've handled the filter. However, DuckDB applies some crude
// cardinality estimation heuristics (e.g. an equality filter => 20% selectivity) that
// means by returning false, DuckDB runs an additional filter (a little bit of overhead)
// but tends to end up with a better query plan.
// If we plumb row count estimation into the layout tree, perhaps we could use zone maps
// etc. to return estimates. But this function is probably called too late anyway. Maybe
// we need our own cardinality heuristics.
Ok(false)
// Default selectivity filter gives too high cardinality bounds for
// equality operators which flips join sides, see tpch sf=10, query 17.
// All other operators estimate is mostly correct.
// To fix this, we need to start collecting distinct counts for columns.
let report_pushed = !expr
.as_opt::<Binary>()
.map(|op| *op == Operator::Eq)
.unwrap_or(false);
Comment on lines +537 to +540
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about compound filters, such as a conjunction (> /\ =) or a disjunction (> \/ =)? What should we report for those?


debug!(%expr, report_pushed, "pushed down expression");
bind_data.filter_exprs.push(expr);
Ok(report_pushed)
}

/// Get column-wise statistics. Available only if we're reading a single
Expand Down Expand Up @@ -545,8 +569,10 @@ impl<T: DataSourceTableFunction> TableFunction for T {

fn cardinality(bind_data: &Self::BindData) -> Cardinality {
match bind_data.data_source.row_count() {
Some(Precision::Exact(v)) => Cardinality::Maximum(v),
Some(Precision::Inexact(v)) => Cardinality::Estimate(v),
Some(Precision::Exact(v) | Precision::Inexact(v)) => {
// Post-filter estimate is always a heuristic.
Cardinality::Estimate(postfilter_cardinality(v, bind_data.has_non_optional_filter))
}
None => Cardinality::Unknown,
}
}
Expand All @@ -565,8 +591,8 @@ impl<T: DataSourceTableFunction> TableFunction for T {
fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef) {
map.push("Function", "Vortex Scan");
if !bind_data.filter_exprs.is_empty() {
let mut filters = bind_data.filter_exprs.iter().map(|f| format!("{}", f));
map.push("Filters", &filters.join(" /\\\n"));
let mut filters = bind_data.filter_exprs.iter().map(|f| format!("{f}"));
map.push("Filters", &filters.join("\n"));
}
}
}
Expand Down Expand Up @@ -687,6 +713,7 @@ struct FilterWithVirtualColumns {
row_range: Option<Range<u64>>,
file_selection: Selection,
file_range: Option<Range<u64>>,
has_non_optional_filter: bool,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

define this!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does duckdb do? what about or and other filters?

Copy link
Copy Markdown
Contributor Author

@myrrc myrrc May 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what duckdb does. The only distinction is "at least one non-optional filter"

}

/// Creates a table filter expression, row selection, and row range from the table filter set,
Expand All @@ -698,6 +725,8 @@ fn extract_table_filter_expr(
additional_filters: &[Expression],
dtype: &DType,
) -> VortexResult<FilterWithVirtualColumns> {
let mut has_non_optional_filter = false;

let mut table_filter_exprs: HashSet<Expression> = if let Some(filter) = table_filter_set {
filter
.into_iter()
Expand All @@ -706,6 +735,8 @@ fn extract_table_filter_expr(
!is_virtual_column(column_ids[idx_u])
})
.map(|(idx, ex)| {
has_non_optional_filter |= !matches!(ex.as_class(), TableFilterClass::Optional(_));

let idx_u: usize = idx.as_();
let col_idx: usize = column_ids[idx_u].as_();
let name = &column_fields.get(col_idx).vortex_expect("exists").name;
Expand Down Expand Up @@ -741,6 +772,7 @@ fn extract_table_filter_expr(
row_range,
file_selection,
file_range,
has_non_optional_filter,
};
Ok(out)
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-duckdb/src/duckdb/table_function/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ impl<'a, T: TableFunction> TableInitInput<'a, T> {
}

/// Returns the bind data for the table function.
pub fn bind_data(&self) -> &T::BindData {
unsafe { &*self.input.bind_data.cast::<T::BindData>() }
pub fn bind_data(&self) -> &mut T::BindData {
unsafe { &mut *self.input.bind_data.cast::<T::BindData>() }
}

pub fn column_ids(&self) -> &[u64] {
Expand Down
Loading