diff --git a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h index 3e2b0580684..ddf55b532a1 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/table_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/table_function.h @@ -12,11 +12,6 @@ #include "error.h" #include "table_filter.h" #include "duckdb_vx/data.h" -#include - -#ifdef __cplusplus -static_assert(sizeof(idx_t) == 8); -#endif #ifdef __cplusplus extern "C" { @@ -88,16 +83,6 @@ typedef struct { bool has_null; } duckdb_column_statistics; -const idx_t INVALID_IDX = UINT64_MAX; - -typedef struct { - idx_t partition_index; - // Either INVALID_IDX or position of column in output for file_index column - size_t file_index_column_pos; - // File index for the exported partition. - size_t file_index; -} duckdb_vx_partition_data; - // vtable mimicking subset of TableFunction. // See duckdb/include/function/tfunc.hpp typedef struct { @@ -138,10 +123,10 @@ typedef struct { double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state); - void (*get_partition_data)(const void *bind_data, - void *init_global_data, - void *init_local_data, - duckdb_vx_partition_data *partition_data_out); + idx_t (*get_partition_data)(const void *bind_data, + void *init_global_data, + void *init_local_data, + duckdb_vx_error *error_out); } duckdb_vx_tfunc_vtab_t; // A single function for configuring the DuckDB table function vtable. 
diff --git a/vortex-duckdb/cpp/table_function.cpp b/vortex-duckdb/cpp/table_function.cpp index d26d0fa2f04..a0ebabd068d 100644 --- a/vortex-duckdb/cpp/table_function.cpp +++ b/vortex-duckdb/cpp/table_function.cpp @@ -10,7 +10,6 @@ DUCKDB_INCLUDES_BEGIN #include "duckdb.h" #include "duckdb/catalog/catalog.hpp" #include "duckdb/common/insertion_order_preserving_map.hpp" -#include "duckdb/common/multi_file/multi_file_reader.hpp" #include "duckdb/function/table_function.hpp" #include "duckdb/main/capi/capi_internal.hpp" #include "duckdb/main/connection.hpp" @@ -20,8 +19,6 @@ DUCKDB_INCLUDES_END using namespace duckdb; using vortex::CData; using vortex::IntoErrString; -constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX; -constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER; struct CTableFunctionInfo final : TableFunctionInfo { explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) { @@ -337,50 +334,21 @@ extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_resu result.return_types.emplace_back(logical_type); } -/** - * Called at planning time to determine whether data is partitioned by a - * given set of columns. Requested columns are GROUP BY parameters i.e. columns - * over which the query aggregates. - */ -TablePartitionInfo get_partition_info(ClientContext &, TableFunctionPartitionInput &input) { - const vector &ids = input.partition_ids; - // Our data is partitioned by array exporters. Each exporter processes a - // single Array which belongs to a single file. If data is partitioned only - // by file_index, there is one unique value for an Array. Otherwise there - // may be multiple values. - return (ids.size() == 1 && ids[0] == COLUMN_IDENTIFIER_FILE_INDEX) - ? TablePartitionInfo::SINGLE_VALUE_PARTITIONS - : TablePartitionInfo::NOT_PARTITIONED; -} - -/** - * Duckdb requests this function after exporting the chunk. 
We answer with - * partition_index we have exported as well as information about constant - * columns in this partition. As data is partitioned by array exporters, in - * each partition ~ exported array file_index is constant. - */ -OperatorPartitionData get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) { +OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) { + if (input.partition_info.RequiresPartitionColumns()) { + throw InternalException("TableScan::GetPartitionData: partition columns not supported"); + } auto &bind = input.bind_data->Cast(); void *const ffi_bind = bind.ffi_data->DataPtr(); void *const ffi_global = input.global_state->Cast().ffi_data->DataPtr(); void *const ffi_local = input.local_state->Cast().ffi_data->DataPtr(); - duckdb_vx_partition_data partition_data; - bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data); - - OperatorPartitionData out(partition_data.partition_index); - - // file_index_column_pos may be INVALID_IDX, but column_index will never - // be INVALID_IDX, so we can compare directly - for (const column_t column_index : input.partition_info.partition_columns) { - if (column_index == partition_data.file_index_column_pos) { - out.partition_data.emplace_back(Value::UBIGINT(partition_data.file_index)); - } else { - throw InternalException(StringUtil::Format( - "get_partition_data: requested column_index %d is not constant for given partition", - column_index)); - } + + duckdb_vx_error error_out = nullptr; + const idx_t batch_index = bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &error_out); + if (error_out) { + throw InvalidInputException(IntoErrString(error_out)); } - return out; + return OperatorPartitionData(batch_index); } extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) { @@ -409,30 +377,21 @@ extern "C" duckdb_state 
duckdb_vx_tfunc_register(duckdb_database ffi_db, const d tf.projection_pushdown = true; tf.filter_pushdown = true; + // We can prune out filter columns that are unused in the remainder of the query plan. + // e.g. in "SELECT i FROM tbl WHERE j = 42" j does not leave Vortex table function. tf.filter_prune = true; tf.sampling_pushdown = false; + tf.late_materialization = false; tf.pushdown_complex_filter = c_pushdown_complex_filter; tf.cardinality = c_cardinality; - tf.get_partition_info = get_partition_info; - tf.get_partition_data = get_partition_data; + tf.get_partition_data = c_get_partition_data; tf.to_string = c_to_string; tf.table_scan_progress = c_table_scan_progress; tf.statistics = c_statistics; - tf.late_materialization = true; - // Columns that uniquely identify a row for deferred re-fetch in a multi - // file scan: (file index, row number in file). - tf.get_row_id_columns = [](auto &, auto) -> vector { - return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER}; - }; - tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t { - return { - {COLUMN_IDENTIFIER_EMPTY, {"", LogicalTypeId::BOOLEAN}}, - {COLUMN_IDENTIFIER_FILE_INDEX, {"file_index", LogicalType::UBIGINT}}, - {COLUMN_IDENTIFIER_FILE_ROW_NUMBER, {"file_row_number", LogicalType::BIGINT}}, - }; + return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}}; }; tf.arguments.resize(vtab->parameter_count); diff --git a/vortex-duckdb/src/convert/mod.rs b/vortex-duckdb/src/convert/mod.rs index d61aa81e005..8caab12b04c 100644 --- a/vortex-duckdb/src/convert/mod.rs +++ b/vortex-duckdb/src/convert/mod.rs @@ -11,5 +11,4 @@ pub use dtype::from_duckdb_table; pub use expr::try_from_bound_expression; pub use scalar::*; pub use table_filter::try_from_table_filter; -pub use table_filter::try_from_virtual_column_filter; pub use vector::data_chunk_to_vortex; diff --git a/vortex-duckdb/src/convert/table_filter.rs b/vortex-duckdb/src/convert/table_filter.rs index 
a1c046c678c..fa9003bc09c 100644 --- a/vortex-duckdb/src/convert/table_filter.rs +++ b/vortex-duckdb/src/convert/table_filter.rs @@ -1,17 +1,14 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::ops::Range; use std::sync::Arc; use itertools::Itertools; -use vortex::buffer::Buffer; use vortex::dtype::DType; use vortex::dtype::Nullability; use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_bail; -use vortex::error::vortex_err; use vortex::expr::Expression; use vortex::expr::and_collect; use vortex::expr::get_item; @@ -24,13 +21,10 @@ use vortex::scalar::Scalar; use vortex::scalar_fn::ScalarFnVTableExt; use vortex::scalar_fn::fns::binary::Binary; use vortex::scalar_fn::fns::operators::CompareOperator; -use vortex::scan::selection::Selection; use crate::cpp::DUCKDB_VX_EXPR_TYPE; -use crate::duckdb::ExtractedValue; use crate::duckdb::TableFilterClass; use crate::duckdb::TableFilterRef; -use crate::duckdb::ValueRef; pub fn try_from_table_filter( value: &TableFilterRef, @@ -131,96 +125,3 @@ pub fn try_from_table_filter( } })) } - -fn nonnegative_number_from_value(value: &ValueRef) -> VortexResult { - match value.extract() { - ExtractedValue::BigInt(i) => { - u64::try_from(i).map_err(|_| vortex_err!("negative value: {i}")) - } - ExtractedValue::Integer(i) => { - u64::try_from(i).map_err(|_| vortex_err!("negative value: {i}")) - } - ExtractedValue::UBigInt(u) => Ok(u), - ExtractedValue::UInteger(u) => Ok(u64::from(u)), - _ => vortex_bail!("unexpected value type"), - } -} - -fn intersect_sorted(left: &[u64], right: &[u64]) -> Vec { - let mut result = Vec::new(); - let (mut i, mut j) = (0, 0); - while i < left.len() && j < right.len() { - match left[i].cmp(&right[j]) { - std::cmp::Ordering::Equal => { - result.push(left[i]); - i += 1; - j += 1; - } - std::cmp::Ordering::Less => i += 1, - std::cmp::Ordering::Greater => j += 1, - } - } - result -} - -/// For constant comparison 
on IN filters over file_index or file_row_number -/// virtual column, create a selection and a range covering the same range as -/// expressions do. -pub fn try_from_virtual_column_filter( - filter: &TableFilterRef, -) -> VortexResult<(Selection, Option>)> { - match filter.as_class() { - TableFilterClass::InFilter(values) => { - let indices = values - .iter() - .map(nonnegative_number_from_value) - .collect::>>()?; - Ok((Selection::IncludeByIndex(Buffer::from_iter(indices)), None)) - } - TableFilterClass::ConstantComparison(const_) => { - let n = nonnegative_number_from_value(const_.value)?; - let range = match const_.operator { - DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_EQUAL => Some(n..n + 1), - DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHANOREQUALTO => { - Some(n..u64::MAX) - } - DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHAN => { - Some(n.saturating_add(1)..u64::MAX) - } - DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHANOREQUALTO => { - Some(0..n.saturating_add(1)) - } - DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHAN => Some(0..n), - _ => None, - }; - Ok((Selection::All, range)) - } - TableFilterClass::ConjunctionAnd(conj) => { - let mut start = 0u64; - let mut end = u64::MAX; - let mut indices: Option> = None; - for child in conj.children() { - let (sel, range) = try_from_virtual_column_filter(child)?; - if let Selection::IncludeByIndex(buf) = sel { - indices = Some(match indices { - None => buf.iter().copied().collect(), - Some(existing) => intersect_sorted(&existing, buf.as_ref()), - }); - } - if let Some(r) = range { - start = start.max(r.start); - end = end.min(r.end); - } - } - let range = (start < end).then_some(start..end); - let sel = indices - .map(|v| Selection::IncludeByIndex(Buffer::from_iter(v))) - .unwrap_or(Selection::All); - Ok((sel, range)) - } - TableFilterClass::Optional(child) => { - try_from_virtual_column_filter(child).or_else(|_| Ok((Selection::All, None))) - } - _ => 
Ok((Selection::All, None)), - } -} diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index 9ca9148433c..33bd8095b4f 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -8,7 +8,6 @@ //! pushdown, cardinality, and partitioning. use std::fmt::Debug; -use std::ops::Range; use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -29,16 +28,12 @@ use vortex::array::optimizer::ArrayOptimizer; use vortex::array::stats::StatsSet; use vortex::dtype::DType; use vortex::dtype::FieldNames; -use vortex::dtype::PType; use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_err; use vortex::expr::Expression; use vortex::expr::and_collect; -use vortex::expr::cast; use vortex::expr::col; -use vortex::expr::merge; -use vortex::expr::pack; use vortex::expr::root; use vortex::expr::select; use vortex::expr::stats::Precision; @@ -47,7 +42,6 @@ use vortex::file::v2::FileStatsLayoutReader; use vortex::io::kanal_ext::KanalExt; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::ThreadSafeIterator; -use vortex::layout::layouts::row_idx::row_idx; use vortex::layout::scan::multi::MultiLayoutChild; use vortex::layout::scan::multi::MultiLayoutDataSource; use vortex::metrics::tracing::get_global_labels; @@ -56,7 +50,6 @@ use vortex::scalar::ScalarValue; use vortex::scalar_fn::fns::pack::Pack; use vortex::scan::DataSource; use vortex::scan::ScanRequest; -use vortex::scan::selection::Selection; use vortex_utils::aliases::hash_set::HashSet; use vortex_utils::parallelism::get_available_parallelism; @@ -65,7 +58,6 @@ use crate::SESSION; use crate::convert::ToDuckDBScalar; use crate::convert::try_from_bound_expression; use crate::convert::try_from_table_filter; -use crate::convert::try_from_virtual_column_filter; use crate::duckdb::BindInputRef; use crate::duckdb::BindResultRef; use crate::duckdb::Cardinality; @@ -75,25 +67,25 @@ use 
crate::duckdb::DataChunkRef; use crate::duckdb::DuckdbStringMapRef; use crate::duckdb::ExpressionRef; use crate::duckdb::LogicalType; -use crate::duckdb::PartitionData; use crate::duckdb::TableFilterSetRef; use crate::duckdb::TableFunction; use crate::duckdb::TableInitInput; -use crate::duckdb::Value; use crate::exporter::ArrayExporter; use crate::exporter::ConversionCache; -// See MultiFileReader for constants - -/// "file_index" virtual column -static FILE_INDEX_COLUMN_IDX: u64 = 9223372036854775810; -/// "file_row_number" virtual column -static FILE_ROW_NUMBER_COLUMN_IDX: u64 = 9223372036854775809; - -/// See duckdb/src/common/constants.cpp -fn is_virtual_column(id: u64) -> bool { - id >= 9223372036854775808u64 -} +/// Taken from +/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/include/duckdb/common/constants.hpp#L44 +/// +/// If DuckDB requests a zero-column projection from read_vortex like count(*), +/// its planner tries to get any column: +/// https://github.com/duckdb/duckdb/blob/dc11eadd8f0a7c600f0034810706605ebe10d5b9/src/planner/operator/logical_get.cpp#L149 +/// +/// If you define COLUMN_IDENTIFIER_EMPTY, planner takes it, otherwise the +/// first column. As we don't want to fill the output chunk and we can leave +/// it uninitialized in this case, we define COLUMN_IDENTIFIER_EMPTY as a +/// virtual column. +/// See virtual_columns in vortex-duckdb/cpp/table_function.cpp +static EMPTY_COLUMN_IDX: u64 = 18446744073709551614; /// A trait for table functions that resolve to a [`DataSourceRef`]. /// @@ -157,16 +149,14 @@ pub struct DataSourceGlobal { batch_id: AtomicU64, bytes_total: Arc, bytes_read: AtomicU64, - file_index_column_pos: Option, - file_row_number_column_pos: Option, } /// Per-thread local scan state. pub struct DataSourceLocal { iterator: DataSourceIterator, exporter: Option, - partition_index: u64, - file_index: usize, + /// The unique batch id of the last chunk exported via scan(). 
+ batch_id: Option, } /// Returns scan progress as a percentage (0.0–100.0). @@ -291,19 +281,9 @@ impl TableFunction for T { let column_ids = init_input.column_ids(); let projection_ids = init_input.projection_ids(); - let ProjectionWithVirtualColumns { - projection, - file_index_column_pos, - file_row_number_column_pos, - } = extract_projection_expr(projection_ids, column_ids, &bind_data.column_fields); - - let FilterWithVirtualColumns { - filter, - row_selection, - row_range, - file_selection, - file_range, - } = extract_table_filter_expr( + let projection_expr = + extract_projection_expr(projection_ids, column_ids, &bind_data.column_fields); + let filter_expr = extract_table_filter_expr( init_input.table_filter_set(), column_ids, &bind_data.column_fields, @@ -311,24 +291,16 @@ impl TableFunction for T { bind_data.data_source.dtype(), )?; - let filter_expr_str = filter + let filter_expr_str = filter_expr .as_ref() .map_or_else(|| "true".to_string(), |f| f.to_string()); - debug!( - "Global init Vortex scan SELECT {projection} WHERE {filter_expr_str}\n - row selection: {row_selection:?}, row range: {row_range:?}, - file selection: {file_selection:?}, file range: {file_range:?}" - ); + debug!("Global init Vortex scan SELECT {projection_expr} WHERE {filter_expr_str}"); let request = ScanRequest { - projection, - filter, - ordered: file_row_number_column_pos.is_some(), - selection: row_selection, - row_range, - file_selection, - file_range, - limit: None, + projection: projection_expr, + filter: filter_expr, + ordered: false, + ..Default::default() }; let scan = RUNTIME.block_on(bind_data.data_source.scan(request))?; @@ -346,22 +318,13 @@ impl TableFunction for T { let stream = scan .partitions() .map(move |partition| { + // We create a new conversion cache scoped to the partition, since there's no point + // caching anything across partitions. 
+ let cache = Arc::new(ConversionCache::default()); let tx = tx.clone(); - RUNTIME.handle().spawn(async move { - let partition = match partition { - Ok(partition) => partition, - Err(e) => { - let _ = tx.send(Err(e)).await; - return; - } - }; - - let cache = Arc::new(ConversionCache { - file_index: partition.index(), - ..Default::default() - }); - let mut stream = match partition.execute() { + RUNTIME.handle().spawn(async move { + let mut stream = match partition.and_then(|p| p.execute()) { Ok(s) => s, Err(e) => { let _ = tx.send(Err(e)).await; @@ -393,8 +356,6 @@ impl TableFunction for T { batch_id: AtomicU64::new(0), bytes_total: Arc::new(AtomicU64::new(0)), bytes_read: AtomicU64::new(0), - file_index_column_pos, - file_row_number_column_pos, }) } @@ -420,8 +381,7 @@ impl TableFunction for T { Ok(DataSourceLocal { iterator: global.iterator.clone(), exporter: None, - partition_index: 0, - file_index: 0, + batch_id: None, }) } @@ -440,7 +400,6 @@ impl TableFunction for T { }; let (array_result, conversion_cache) = result?; let array_result = array_result.optimize_recursive(ctx.session())?; - local_state.file_index = conversion_cache.file_index; let array_result: StructArray = if let Some(array) = array_result.as_opt::() { @@ -464,19 +423,15 @@ impl TableFunction for T { ctx, )?); // Relaxed since there is no intra-instruction ordering required. - local_state.partition_index = global_state.batch_id.fetch_add(1, Ordering::Relaxed); + local_state.batch_id = Some(global_state.batch_id.fetch_add(1, Ordering::Relaxed)); } let exporter = local_state .exporter .as_mut() .vortex_expect("error: exporter missing"); - let has_more_data = exporter.export( - chunk, - global_state.file_index_column_pos, - global_state.file_row_number_column_pos, - )?; + let has_more_data = exporter.export(chunk)?; global_state .bytes_read .fetch_add(chunk.len(), Ordering::Relaxed); @@ -484,7 +439,7 @@ impl TableFunction for T { if !has_more_data { // This exporter is fully consumed. 
local_state.exporter = None; - local_state.partition_index = 0; + local_state.batch_id = None; } else { break; } @@ -492,12 +447,6 @@ impl TableFunction for T { assert!(!chunk.is_empty()); - if let Some(pos) = global_state.file_index_column_pos { - chunk - .get_vector_mut(pos) - .reference_value(&Value::from(local_state.file_index as u64)); - } - Ok(()) } @@ -567,14 +516,12 @@ impl TableFunction for T { fn partition_data( _bind_data: &Self::BindData, - global_init_data: &Self::GlobalState, + _global_init_data: &Self::GlobalState, local_init_data: &mut Self::LocalState, - ) -> PartitionData { - PartitionData { - partition_index: local_init_data.partition_index, - file_index_column_pos: global_init_data.file_index_column_pos, - file_index: local_init_data.file_index, - } + ) -> VortexResult { + local_init_data + .batch_id + .ok_or_else(|| vortex_err!("batch id missing, no batches exported")) } fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef) { @@ -610,96 +557,53 @@ fn extract_schema_from_dtype(dtype: &DType) -> VortexResult> { Ok(fields) } -struct ProjectionWithVirtualColumns { - projection: Expression, - file_index_column_pos: Option, - file_row_number_column_pos: Option, -} - -/// Creates a projection expression from raw projection/column ID slices and -/// column names. +/// Creates a projection expression from raw projection/column ID slices and column names. fn extract_projection_expr( projection_ids: Option<&[u64]>, column_ids: &[u64], column_fields: &[DuckdbField], -) -> ProjectionWithVirtualColumns { - // If projection ids are empty, use column_ids. 
+) -> Expression { + // Projection ids may be empty, in which case you need to use column_ids // See duckdb/src/planner/operator/logical_get.cpp#L168 - let (ids, has_projection_ids) = match projection_ids { + let (projection_ids, has_projection_ids) = match projection_ids { Some(ids) => (ids, true), None => (column_ids, false), }; - let mut file_index_column_pos = None; - let mut file_row_number_column_pos = None; - + // duckdb index is u64 (size_t) but in Rust u64 and usize are different things. #[expect(clippy::cast_possible_truncation)] - let names = ids + let names = projection_ids .iter() - .enumerate() - .map(|(column_pos, &column_id)| { - let column_id = if has_projection_ids { - column_ids[column_id as usize] - } else { - column_id - }; - - if column_id == FILE_INDEX_COLUMN_IDX { - file_index_column_pos = Some(column_pos); - } - if column_id == FILE_ROW_NUMBER_COLUMN_IDX { - file_row_number_column_pos = Some(column_pos); + .filter(|p| **p != EMPTY_COLUMN_IDX) + .map(|mut idx| { + if has_projection_ids { + idx = &column_ids[*idx as usize]; } - column_id + #[expect(clippy::cast_possible_truncation)] + &column_fields + .get(*idx as usize) + .vortex_expect("prune idx in column names") + .name }) - .filter(|&col_id| !is_virtual_column(col_id)) - .map(|col_id| Arc::from(column_fields[col_id as usize].name.as_str())) + .map(|s| Arc::from(s.as_str())) .collect::(); - // file_index column will be filled later when exporting the chunk. 
- let select = select(names, root()); - let projection = if file_row_number_column_pos.is_some() { - // row_idx will be rearranged to correct position in scan(), prepend - // here - let row_idx = cast(row_idx(), DType::Primitive(PType::I64, false.into())); - let row_idx_struct = pack([("file_row_number", row_idx)], false.into()); - merge([row_idx_struct, select]) - } else { - select - }; - - ProjectionWithVirtualColumns { - projection, - file_index_column_pos, - file_row_number_column_pos, - } -} - -struct FilterWithVirtualColumns { - filter: Option, - row_selection: Selection, - row_range: Option>, - file_selection: Selection, - file_range: Option>, + select(names, root()) } -/// Creates a table filter expression, row selection, and row range from the table filter set, -/// column metadata, additional filter expressions, and the top-level DType. +/// Creates a table filter expression from the table filter set, column metadata, additional +/// filter expressions, and the top-level DType. 
fn extract_table_filter_expr( table_filter_set: Option<&TableFilterSetRef>, column_ids: &[u64], column_fields: &[DuckdbField], additional_filters: &[Expression], dtype: &DType, -) -> VortexResult { +) -> VortexResult> { let mut table_filter_exprs: HashSet = if let Some(filter) = table_filter_set { filter .into_iter() - .filter(|(idx, _)| { - let idx_u: usize = idx.as_(); - !is_virtual_column(column_ids[idx_u]) - }) .map(|(idx, ex)| { let idx_u: usize = idx.as_(); let col_idx: usize = column_ids[idx_u].as_(); @@ -713,31 +617,7 @@ fn extract_table_filter_expr( }; table_filter_exprs.extend(additional_filters.iter().cloned()); - - let mut file_selection = Selection::All; - let mut row_selection = Selection::All; - let mut row_range = None; - let mut file_range = None; - if let Some(filter) = table_filter_set { - for (idx, expression) in filter.into_iter() { - let idx: usize = idx.as_(); - if column_ids[idx] == FILE_ROW_NUMBER_COLUMN_IDX { - (row_selection, row_range) = try_from_virtual_column_filter(expression)?; - } - if column_ids[idx] == FILE_INDEX_COLUMN_IDX { - (file_selection, file_range) = try_from_virtual_column_filter(expression)?; - } - } - }; - - let out = FilterWithVirtualColumns { - filter: and_collect(table_filter_exprs), - row_selection, - row_range, - file_selection, - file_range, - }; - Ok(out) + Ok(and_collect(table_filter_exprs)) } #[cfg(test)] diff --git a/vortex-duckdb/src/duckdb/table_function/mod.rs b/vortex-duckdb/src/duckdb/table_function/mod.rs index 1b3b8199e0e..ac671f409e8 100644 --- a/vortex-duckdb/src/duckdb/table_function/mod.rs +++ b/vortex-duckdb/src/duckdb/table_function/mod.rs @@ -36,12 +36,6 @@ use crate::duckdb::table_function::statistics::statistics; use crate::duckdb::table_function::table_scan_progress::table_scan_progress_callback; use crate::duckdb_try; -pub struct PartitionData { - pub partition_index: u64, - pub file_index_column_pos: Option, - pub file_index: usize, -} - #[derive(Debug, Default)] pub struct ColumnStatistics 
{ pub min: Option, @@ -143,10 +137,10 @@ pub trait TableFunction: Sized + Debug { /// Returns the idx of the current partition being processed by a local threa. /// This *must* be globally unique. fn partition_data( - bind_data: &Self::BindData, - global_init_data: &Self::GlobalState, - local_init_data: &mut Self::LocalState, - ) -> PartitionData; + _bind_data: &Self::BindData, + _global_init_data: &Self::GlobalState, + _local_init_data: &mut Self::LocalState, + ) -> VortexResult; /// Returns a vector of key-value pairs for EXPLAIN output fn to_string(bind_data: &Self::BindData, map: &mut DuckdbStringMapRef); diff --git a/vortex-duckdb/src/duckdb/table_function/partition.rs b/vortex-duckdb/src/duckdb/table_function/partition.rs index d373d6f5623..09d0e8fbd94 100644 --- a/vortex-duckdb/src/duckdb/table_function/partition.rs +++ b/vortex-duckdb/src/duckdb/table_function/partition.rs @@ -3,9 +3,11 @@ use std::ffi::c_void; +use cpp::duckdb_vx_error; use vortex::error::VortexExpect; use crate::cpp; +use crate::cpp::idx_t; use crate::duckdb::TableFunction; /// Native callback for the cardinality estimate of a table function. 
@@ -13,18 +15,22 @@ pub(crate) unsafe extern "C-unwind" fn get_partition_data_callback idx_t { let bind_data = unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null pointer"); let global_init_data = unsafe { global_init_data.cast::().as_ref() } .vortex_expect("global_init_data null pointer"); let local_init_data = unsafe { local_init_data.cast::().as_mut() } .vortex_expect("local_init_data null pointer"); - let data = T::partition_data(bind_data, global_init_data, local_init_data); - let out = unsafe { &mut *partition_data_out }; - out.partition_index = data.partition_index; - out.file_index_column_pos = data.file_index_column_pos.unwrap_or(usize::MAX); - out.file_index = data.file_index; + match T::partition_data(bind_data, global_init_data, local_init_data) { + Ok(batch_id) => batch_id, + Err(e) => { + // Set the error in the error output. + let msg = e.to_string(); + unsafe { error_out.write(cpp::duckdb_vx_error_create(msg.as_ptr().cast(), msg.len())) }; + 0 + } + } } diff --git a/vortex-duckdb/src/e2e_test/object_cache_test.rs b/vortex-duckdb/src/e2e_test/object_cache_test.rs index b21c10b1d11..ae9dc38d0db 100644 --- a/vortex-duckdb/src/e2e_test/object_cache_test.rs +++ b/vortex-duckdb/src/e2e_test/object_cache_test.rs @@ -15,7 +15,6 @@ use crate::duckdb::ClientContextRef; use crate::duckdb::ColumnStatistics; use crate::duckdb::DataChunkRef; use crate::duckdb::LogicalType; -use crate::duckdb::PartitionData; use crate::duckdb::TableFunction; use crate::duckdb::TableInitInput; @@ -111,12 +110,8 @@ impl TableFunction for TestTableFunction { _bind_data: &Self::BindData, _global_init_data: &Self::GlobalState, _local_init_data: &mut Self::LocalState, - ) -> PartitionData { - PartitionData { - partition_index: 0, - file_index_column_pos: None, - file_index: 0, - } + ) -> VortexResult { + Ok(0) } fn statistics( diff --git a/vortex-duckdb/src/exporter/cache.rs b/vortex-duckdb/src/exporter/cache.rs index 2f495ba9608..3b0fd496360 100644 --- 
a/vortex-duckdb/src/exporter/cache.rs +++ b/vortex-duckdb/src/exporter/cache.rs @@ -21,5 +21,4 @@ pub struct ConversionCache { pub dict_cache: DashMap, pub values_cache: DashMap>)>, pub canonical_cache: DashMap, - pub file_index: usize, } diff --git a/vortex-duckdb/src/exporter/mod.rs b/vortex-duckdb/src/exporter/mod.rs index 517776f5521..f76b3e41124 100644 --- a/vortex-duckdb/src/exporter/mod.rs +++ b/vortex-duckdb/src/exporter/mod.rs @@ -33,7 +33,6 @@ use vortex::array::arrays::struct_::StructArrayExt; use vortex::buffer::BitChunks; use vortex::encodings::runend::RunEnd; use vortex::encodings::sequence::Sequence; -use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::vortex_bail; @@ -75,22 +74,15 @@ impl ArrayExporter { /// Export the data into the next chunk. /// /// Returns `true` if a chunk was exported, `false` if all rows have been exported. - pub fn export( - &mut self, - chunk: &mut DataChunkRef, - file_index_column_pos: Option, - file_row_number_column_pos: Option, - ) -> VortexResult { + pub fn export(&mut self, chunk: &mut DataChunkRef) -> VortexResult { chunk.reset(); if self.remaining == 0 { return Ok(false); } - let zero_projection = self.fields.is_empty(); - - // file_row_number column is already populated in scan construction - let expected_cols = self.fields.len() + file_index_column_pos.is_some() as usize; + let expected_cols = self.fields.len(); let chunk_cols = chunk.column_count(); + let zero_projection = expected_cols == 0; if !zero_projection && chunk_cols != expected_cols { vortex_bail!("Expected {expected_cols} columns in output chunk, got {chunk_cols}"); } @@ -100,48 +92,14 @@ impl ArrayExporter { self.remaining -= chunk_len; chunk.set_len(chunk_len); - // If DuckDB requests a zero-column projection from read_vortex like count(*), - // its planner tries to get any column: - // See duckdb/src/planner/operator/logical_get.cpp#L149 - // - // If you define COLUMN_IDENTIFIER_EMPTY, planner takes it, otherwise the 
- // first column. As we don't want to fill the output chunk and we can leave - // it uninitialized in this case, we define COLUMN_IDENTIFIER_EMPTY as a - // virtual column. - // See virtual_columns in vortex-duckdb/cpp/table_function.cpp + // DuckDB asked us for zero columns. This may happen with aggregation + // functions like count(*). In such case we can leave chunk contents + // uninitialized. See EMPTY_COLUMN_IDX comment why this works. if zero_projection { return Ok(true); } - let mut fields = self.fields.iter(); - // file_row_number column is the first one if present. - if let Some(pos) = file_row_number_column_pos { - let field = fields.next().vortex_expect("field column mismatch"); - field.export( - position, - chunk_len, - chunk.get_vector_mut(pos), - &mut self.ctx, - )?; - } - - for i in 0..chunk_cols { - // file_index column: skip index - it will be filled after - // chunk export. - if let Some(pos) = file_index_column_pos - && i == pos - { - continue; - } - - // file_row_number column: skip index, already filled - if let Some(pos) = file_row_number_column_pos - && i == pos - { - continue; - } - - let field = fields.next().vortex_expect("field count mismatch"); + for (i, field) in self.fields.iter_mut().enumerate() { field.export(position, chunk_len, chunk.get_vector_mut(i), &mut self.ctx)?; } diff --git a/vortex-ffi/src/scan.rs b/vortex-ffi/src/scan.rs index 4e551d472c4..9a52aefa5f9 100644 --- a/vortex-ffi/src/scan.rs +++ b/vortex-ffi/src/scan.rs @@ -189,8 +189,6 @@ fn scan_request(opts: *const vx_scan_options) -> VortexResult { selection, ordered, limit, - file_selection: Selection::All, - file_range: None, }) } diff --git a/vortex-jni/src/scan.rs b/vortex-jni/src/scan.rs index cea046c990d..36cadfbb201 100644 --- a/vortex-jni/src/scan.rs +++ b/vortex-jni/src/scan.rs @@ -111,8 +111,6 @@ fn build_scan_request( selection, ordered, limit, - file_selection: Selection::All, - file_range: None, }) } diff --git a/vortex-layout/src/scan/layout.rs 
b/vortex-layout/src/scan/layout.rs index c1ccc0eea0e..0b46da11561 100644 --- a/vortex-layout/src/scan/layout.rs +++ b/vortex-layout/src/scan/layout.rs @@ -289,10 +289,6 @@ impl Partition for LayoutReaderSplit { self } - fn index(&self) -> usize { - 0 - } - fn row_count(&self) -> Option> { let row_count = self.row_range.end - self.row_range.start; let row_count = self.selection.row_count(row_count); @@ -355,10 +351,6 @@ impl Partition for Empty { self } - fn index(&self) -> usize { - 0 - } - fn row_count(&self) -> Option> { Some(Precision::exact(self.row_count)) } diff --git a/vortex-layout/src/scan/multi.rs b/vortex-layout/src/scan/multi.rs index bd84340fe45..756d21c5019 100644 --- a/vortex-layout/src/scan/multi.rs +++ b/vortex-layout/src/scan/multi.rs @@ -51,7 +51,6 @@ use vortex_scan::Partition; use vortex_scan::PartitionRef; use vortex_scan::PartitionStream; use vortex_scan::ScanRequest; -use vortex_scan::selection::Selection; use vortex_session::VortexSession; use vortex_utils::parallelism::get_available_parallelism; @@ -310,9 +309,8 @@ impl DataSourceScan for MultiLayoutScan { // `flat_map` is appropriate here. The real I/O work happens when `execute()` is called. ready_stream .chain(deferred_stream) - .enumerate() - .flat_map(move |(i, reader_result)| match reader_result { - Ok(reader) => reader_partition(i, reader, session.clone(), request.clone()), + .flat_map(move |reader_result| match reader_result { + Ok(reader) => reader_partition(reader, session.clone(), request.clone()), Err(e) => stream::once(async move { Err(e) }).boxed(), }) .boxed() @@ -325,7 +323,6 @@ impl DataSourceScan for MultiLayoutScan { /// can match, returns an empty stream. Otherwise, yields a single partition covering the /// reader's full row range. 
fn reader_partition( - partition_idx: usize, reader: LayoutReaderRef, session: VortexSession, request: ScanRequest, @@ -333,26 +330,6 @@ fn reader_partition( let row_count = reader.row_count(); let row_range = request.row_range.clone().unwrap_or(0..row_count); - let partition_idx_u64: u64 = partition_idx as u64; - if let Some(range) = &request.file_range - && !range.contains(&partition_idx_u64) - { - return stream::empty().boxed(); - }; - match &request.file_selection { - Selection::IncludeByIndex(buffer) => { - if buffer.as_slice().binary_search(&partition_idx_u64).is_err() { - return stream::empty().boxed(); - } - } - Selection::ExcludeByIndex(buffer) => { - if buffer.as_slice().binary_search(&partition_idx_u64).is_ok() { - return stream::empty().boxed(); - } - } - _ => {} - }; - // Check file-level pruning: if the filter can be proven false for the entire row range // using file-level statistics, skip this reader entirely. if let Some(filter) = &request.filter { @@ -374,7 +351,6 @@ fn reader_partition( row_range: Some(row_range), ..request }, - index: partition_idx, }) as PartitionRef) }) .boxed() @@ -388,7 +364,6 @@ struct MultiLayoutPartition { reader: LayoutReaderRef, session: VortexSession, request: ScanRequest, - index: usize, } impl Partition for MultiLayoutPartition { @@ -396,10 +371,6 @@ impl Partition for MultiLayoutPartition { self } - fn index(&self) -> usize { - self.index - } - fn row_count(&self) -> Option> { let row_range = self.request.row_range.as_ref()?; let row_count = row_range.end - row_range.start; diff --git a/vortex-scan/public-api.lock b/vortex-scan/public-api.lock index a80bad4dd43..2748e5cc037 100644 --- a/vortex-scan/public-api.lock +++ b/vortex-scan/public-api.lock @@ -54,10 +54,6 @@ pub fn vortex_scan::selection::Selection::fmt(&self, f: &mut core::fmt::Formatte pub struct vortex_scan::ScanRequest -pub vortex_scan::ScanRequest::file_range: core::option::Option> - -pub vortex_scan::ScanRequest::file_selection: 
vortex_scan::selection::Selection - pub vortex_scan::ScanRequest::filter: core::option::Option pub vortex_scan::ScanRequest::limit: core::option::Option @@ -122,8 +118,6 @@ pub fn vortex_scan::Partition::byte_size(&self) -> core::option::Option) -> vortex_error::VortexResult -pub fn vortex_scan::Partition::index(&self) -> usize - pub fn vortex_scan::Partition::row_count(&self) -> core::option::Option> pub fn vortex_scan::Partition::serialize(&self) -> vortex_error::VortexResult>> diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index d17528998b1..edcb5e29979 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -125,10 +125,6 @@ pub struct ScanRequest { /// A row selection to apply to the scan. The selection identifies rows within the specified /// row range. pub selection: Selection, - /// If we're operating on files, what files to read - pub file_selection: Selection, - /// If we're operating on files, what files to read - pub file_range: Option>, /// Whether the scan should preserve row order. If false, the scan may produce rows in any /// order, for example to enable parallel execution across partitions. pub ordered: bool, @@ -144,10 +140,8 @@ impl Default for ScanRequest { filter: None, row_range: None, selection: Selection::default(), - file_selection: Selection::default(), ordered: false, limit: None, - file_range: None, } } } @@ -175,11 +169,6 @@ pub trait Partition: 'static + Send { /// Downcast the partition to a concrete type. fn as_any(&self) -> &dyn Any; - /// Some unique identifier for partition if it's present. - /// Used mainly to filter out unused partitions with Duckdb's - /// late materialization support - fn index(&self) -> usize; - /// Returns an estimate of the row count for this partition. 
fn row_count(&self) -> Option>; diff --git a/vortex-sqllogictest/slt/duckdb/file_index.slt b/vortex-sqllogictest/slt/duckdb/file_index.slt deleted file mode 100644 index ea2693e81a3..00000000000 --- a/vortex-sqllogictest/slt/duckdb/file_index.slt +++ /dev/null @@ -1,50 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright the Vortex contributors - -include ../setup.slt - -query I -COPY (SELECT * FROM (VALUES ('Hello'), ('Hi'), ) AS t(str)) TO '$__TEST_DIR__/file_index_1.vortex'; ----- -2 - -query I -COPY (SELECT * FROM (VALUES ('1'), ('2'), ('3')) AS t(str)) TO '$__TEST_DIR__/file_index_2.vortex'; ----- -3 - -query TI -SELECT str, file_index FROM '$__TEST_DIR__/file_index_1.vortex'; ----- -Hello 0 -Hi 0 - -query IT -SELECT file_index, str FROM '$__TEST_DIR__/file_index_2.vortex'; ----- -0 1 -0 2 -0 3 - -query TB -SELECT *, file_index < 2 FROM '$__TEST_DIR__/*.vortex' -ORDER BY str; ----- -1 true -2 true -3 true -Hello true -Hi true - -query IB -SELECT count(*) AS cnt, sum(file_index) <= 3 FROM '$__TEST_DIR__/*.vortex' -ORDER BY cnt; ----- -5 true - -query B -SELECT file_index < 2 FROM '$__TEST_DIR__/*.vortex' -WHERE len(str) > 1; ----- -true -true diff --git a/vortex-sqllogictest/slt/duckdb/file_row_number.slt b/vortex-sqllogictest/slt/duckdb/file_row_number.slt deleted file mode 100644 index 4cb0a459480..00000000000 --- a/vortex-sqllogictest/slt/duckdb/file_row_number.slt +++ /dev/null @@ -1,56 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright the Vortex contributors - -include ../setup.slt - -query I -COPY (SELECT * FROM (VALUES ('Hello'), ('Hi')) AS t(str)) TO '$__TEST_DIR__/file_row_1.vortex'; ----- -2 - -query I -COPY (SELECT * FROM (VALUES ('1'), ('2'), ('3'), ('45')) AS t(str)) TO '$__TEST_DIR__/file_row_2.vortex'; ----- -4 - -query TI -SELECT str, file_row_number FROM '$__TEST_DIR__/file_row_1.vortex'; ----- -Hello 0 -Hi 1 - -query IT -SELECT file_row_number, str FROM 
'$__TEST_DIR__/file_row_2.vortex'; ----- -0 1 -1 2 -2 3 -3 45 - -query IT -SELECT file_row_number, str FROM '$__TEST_DIR__/file_row_2.vortex' -WHERE len(str) = 1; ----- -0 1 -1 2 -2 3 - -query TI -SELECT *, file_row_number FROM '$__TEST_DIR__/*.vortex' -ORDER BY str; ----- -1 0 -2 1 -3 2 -45 3 -Hello 0 -Hi 1 - -query I -SELECT file_row_number FROM '$__TEST_DIR__/*.vortex' -WHERE len(str) > 1 -ORDER BY file_row_number; ----- -0 -1 -3 diff --git a/vortex-sqllogictest/slt/duckdb/explain.slt b/vortex-sqllogictest/slt/explain.slt similarity index 91% rename from vortex-sqllogictest/slt/duckdb/explain.slt rename to vortex-sqllogictest/slt/explain.slt index 1917d9f0494..81619291bf6 100644 --- a/vortex-sqllogictest/slt/duckdb/explain.slt +++ b/vortex-sqllogictest/slt/explain.slt @@ -1,13 +1,15 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors -include ../setup.slt +include ./setup.slt +onlyif duckdb query I COPY (SELECT * FROM (VALUES ('Hello'), ('Hi'), ('Hey')) AS t(str)) TO '$__TEST_DIR__/explain.vortex'; ---- 3 +onlyif duckdb query TT EXPLAIN (FORMAT json) SELECT * FROM '$__TEST_DIR__/explain.vortex'; ----