Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 4 additions & 19 deletions vortex-duckdb/cpp/include/duckdb_vx/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@
#include "error.h"
#include "table_filter.h"
#include "duckdb_vx/data.h"
#include <stdint.h>

#ifdef __cplusplus
static_assert(sizeof(idx_t) == 8);
#endif

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -88,16 +83,6 @@ typedef struct {
bool has_null;
} duckdb_column_statistics;

const idx_t INVALID_IDX = UINT64_MAX;

// Out-parameter filled in by the vtable's get_partition_data callback. It
// identifies the partition that was just exported and carries the file_index
// value reported for that partition.
typedef struct {
// Index of the exported partition; used to construct the
// OperatorPartitionData returned to DuckDB.
idx_t partition_index;
// Either INVALID_IDX or position of column in output for file_index column
size_t file_index_column_pos;
// File index for the exported partition.
size_t file_index;
} duckdb_vx_partition_data;

// vtable mimicking subset of TableFunction.
// See duckdb/include/function/tfunc.hpp
typedef struct {
Expand Down Expand Up @@ -138,10 +123,10 @@ typedef struct {

double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state);

void (*get_partition_data)(const void *bind_data,
void *init_global_data,
void *init_local_data,
duckdb_vx_partition_data *partition_data_out);
idx_t (*get_partition_data)(const void *bind_data,
void *init_global_data,
void *init_local_data,
duckdb_vx_error *error_out);
} duckdb_vx_tfunc_vtab_t;

// A single function for configuring the DuckDB table function vtable.
Expand Down
71 changes: 15 additions & 56 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ DUCKDB_INCLUDES_BEGIN
#include "duckdb.h"
#include "duckdb/catalog/catalog.hpp"
#include "duckdb/common/insertion_order_preserving_map.hpp"
#include "duckdb/common/multi_file/multi_file_reader.hpp"
#include "duckdb/function/table_function.hpp"
#include "duckdb/main/capi/capi_internal.hpp"
#include "duckdb/main/connection.hpp"
Expand All @@ -20,8 +19,6 @@ DUCKDB_INCLUDES_END
using namespace duckdb;
using vortex::CData;
using vortex::IntoErrString;
constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX;
constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER;

struct CTableFunctionInfo final : TableFunctionInfo {
explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &vtab) : vtab(vtab) {
Expand Down Expand Up @@ -337,50 +334,21 @@ extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_resu
result.return_types.emplace_back(logical_type);
}

/**
 * Planning-time hook: reports whether the scan output is partitioned by the
 * requested set of columns. The requested columns are the GROUP BY keys, i.e.
 * the columns over which the query aggregates.
 *
 * Data is partitioned by array exporters. Each exporter processes a single
 * Array, and an Array belongs to a single file, so when the only requested
 * column is file_index there is exactly one value per partition. For any
 * other column set a partition may hold several values.
 */
TablePartitionInfo get_partition_info(ClientContext &, TableFunctionPartitionInput &input) {
	const auto &requested = input.partition_ids;
	const bool only_file_index = requested.size() == 1 && requested[0] == COLUMN_IDENTIFIER_FILE_INDEX;
	if (only_file_index) {
		return TablePartitionInfo::SINGLE_VALUE_PARTITIONS;
	}
	return TablePartitionInfo::NOT_PARTITIONED;
}

/**
 * DuckDB calls this function after a chunk has been exported. We answer with
 * the partition_index of the partition we exported, together with the values
 * of the columns that are constant within that partition. Because data is
 * partitioned by array exporters, each partition corresponds to one exported
 * array, and file_index is constant within it.
 */
OperatorPartitionData get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
if (input.partition_info.RequiresPartitionColumns()) {
throw InternalException("TableScan::GetPartitionData: partition columns not supported");
}
auto &bind = input.bind_data->Cast<CTableBindData>();
void *const ffi_bind = bind.ffi_data->DataPtr();
void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();
duckdb_vx_partition_data partition_data;
bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &partition_data);

OperatorPartitionData out(partition_data.partition_index);

// file_index_column_pos may be INVALID_IDX, but column_index will never
// be INVALID_IDX, so we can compare directly
for (const column_t column_index : input.partition_info.partition_columns) {
if (column_index == partition_data.file_index_column_pos) {
out.partition_data.emplace_back(Value::UBIGINT(partition_data.file_index));
} else {
throw InternalException(StringUtil::Format(
"get_partition_data: requested column_index %d is not constant for given partition",
column_index));
}

duckdb_vx_error error_out = nullptr;
const idx_t batch_index = bind.info.vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &error_out);
if (error_out) {
throw InvalidInputException(IntoErrString(error_out));
}
return out;
return OperatorPartitionData(batch_index);
}

extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) {
Expand Down Expand Up @@ -409,30 +377,21 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d

tf.projection_pushdown = true;
tf.filter_pushdown = true;
// We can prune out filter columns that are unused in the remainder of the query plan.
// e.g. in "SELECT i FROM tbl WHERE j = 42" j does not leave Vortex table function.
tf.filter_prune = true;
tf.sampling_pushdown = false;
tf.late_materialization = false;

tf.pushdown_complex_filter = c_pushdown_complex_filter;
tf.cardinality = c_cardinality;
tf.get_partition_info = get_partition_info;
tf.get_partition_data = get_partition_data;
tf.get_partition_data = c_get_partition_data;
tf.to_string = c_to_string;
tf.table_scan_progress = c_table_scan_progress;
tf.statistics = c_statistics;

tf.late_materialization = true;
// Columns that uniquely identify a row for deferred re-fetch in a multi
// file scan: (file index, row number in file).
tf.get_row_id_columns = [](auto &, auto) -> vector<column_t> {
return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER};
};

tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t {
return {
{COLUMN_IDENTIFIER_EMPTY, {"", LogicalTypeId::BOOLEAN}},
{COLUMN_IDENTIFIER_FILE_INDEX, {"file_index", LogicalType::UBIGINT}},
{COLUMN_IDENTIFIER_FILE_ROW_NUMBER, {"file_row_number", LogicalType::BIGINT}},
};
return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}};
};

tf.arguments.resize(vtab->parameter_count);
Expand Down
1 change: 0 additions & 1 deletion vortex-duckdb/src/convert/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ pub use dtype::from_duckdb_table;
pub use expr::try_from_bound_expression;
pub use scalar::*;
pub use table_filter::try_from_table_filter;
pub use table_filter::try_from_virtual_column_filter;
pub use vector::data_chunk_to_vortex;
99 changes: 0 additions & 99 deletions vortex-duckdb/src/convert/table_filter.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::ops::Range;
use std::sync::Arc;

use itertools::Itertools;
use vortex::buffer::Buffer;
use vortex::dtype::DType;
use vortex::dtype::Nullability;
use vortex::error::VortexExpect;
use vortex::error::VortexResult;
use vortex::error::vortex_bail;
use vortex::error::vortex_err;
use vortex::expr::Expression;
use vortex::expr::and_collect;
use vortex::expr::get_item;
Expand All @@ -24,13 +21,10 @@ use vortex::scalar::Scalar;
use vortex::scalar_fn::ScalarFnVTableExt;
use vortex::scalar_fn::fns::binary::Binary;
use vortex::scalar_fn::fns::operators::CompareOperator;
use vortex::scan::selection::Selection;

use crate::cpp::DUCKDB_VX_EXPR_TYPE;
use crate::duckdb::ExtractedValue;
use crate::duckdb::TableFilterClass;
use crate::duckdb::TableFilterRef;
use crate::duckdb::ValueRef;

pub fn try_from_table_filter(
value: &TableFilterRef,
Expand Down Expand Up @@ -131,96 +125,3 @@ pub fn try_from_table_filter(
}
}))
}

/// Reads an integer DuckDB value as a `u64`.
///
/// Unsigned variants are widened as-is; signed variants are accepted only
/// when they are not negative.
///
/// # Errors
///
/// Fails with `negative value: …` for a negative signed value and with
/// `unexpected value type` for any variant other than the four integer
/// variants handled here.
fn nonnegative_number_from_value(value: &ValueRef) -> VortexResult<u64> {
    let extracted = value.extract();
    match extracted {
        // Unsigned values can never be negative, so they convert infallibly.
        ExtractedValue::UBigInt(unsigned) => Ok(unsigned),
        ExtractedValue::UInteger(unsigned) => Ok(u64::from(unsigned)),
        // Signed values must be range-checked before conversion.
        ExtractedValue::BigInt(signed) => match u64::try_from(signed) {
            Ok(converted) => Ok(converted),
            Err(_) => Err(vortex_err!("negative value: {signed}")),
        },
        ExtractedValue::Integer(signed) => match u64::try_from(signed) {
            Ok(converted) => Ok(converted),
            Err(_) => Err(vortex_err!("negative value: {signed}")),
        },
        _ => vortex_bail!("unexpected value type"),
    }
}

/// Returns the values present in both `left` and `right`.
///
/// Both slices are walked front to back in a single merge-style pass, so the
/// result is only the true intersection when each input is sorted in
/// ascending order. A value repeated in both inputs is emitted once per
/// matching pair.
fn intersect_sorted(left: &[u64], right: &[u64]) -> Vec<u64> {
    use std::cmp::Ordering;

    let mut common = Vec::new();
    let (mut lhs, mut rhs) = (left, right);
    // Peel the head off whichever side is smaller; on a match, record it and
    // advance both sides. Stop as soon as either side is exhausted.
    while let ([a, lhs_rest @ ..], [b, rhs_rest @ ..]) = (lhs, rhs) {
        match a.cmp(b) {
            Ordering::Less => lhs = lhs_rest,
            Ordering::Greater => rhs = rhs_rest,
            Ordering::Equal => {
                common.push(*a);
                lhs = lhs_rest;
                rhs = rhs_rest;
            }
        }
    }
    common
}

/// Translates a table filter over the `file_index` or `file_row_number`
/// virtual column into a [`Selection`] plus an optional half-open range that
/// covers the same values as the filter expression.
///
/// * `IN` filters become `Selection::IncludeByIndex` with no range.
/// * Constant comparisons (`=`, `>=`, `>`, `<=`, `<`) become a range paired
///   with `Selection::All`; any other operator yields no range.
/// * `AND` conjunctions intersect the index lists and the ranges of their
///   children.
/// * Optional filters fall back to `(Selection::All, None)` when the child
///   cannot be converted.
/// * Every other filter class yields `(Selection::All, None)`.
///
/// # Errors
///
/// Returns an error when a filter constant is not a non-negative integer,
/// unless that filter sits beneath an optional filter.
pub fn try_from_virtual_column_filter(
    filter: &TableFilterRef,
) -> VortexResult<(Selection, Option<Range<u64>>)> {
    match filter.as_class() {
        TableFilterClass::InFilter(values) => {
            let indices = values
                .iter()
                .map(nonnegative_number_from_value)
                .collect::<VortexResult<Vec<u64>>>()?;
            Ok((Selection::IncludeByIndex(Buffer::from_iter(indices)), None))
        }
        TableFilterClass::ConstantComparison(const_) => {
            let n = nonnegative_number_from_value(const_.value)?;
            let range = match const_.operator {
                // Saturate like the other arms so `n == u64::MAX` cannot
                // overflow; a half-open range cannot include `u64::MAX`
                // anyway, so the result is an empty range in that case.
                DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_EQUAL => {
                    Some(n..n.saturating_add(1))
                }
                DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHANOREQUALTO => {
                    Some(n..u64::MAX)
                }
                DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_GREATERTHAN => {
                    Some(n.saturating_add(1)..u64::MAX)
                }
                DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHANOREQUALTO => {
                    Some(0..n.saturating_add(1))
                }
                DUCKDB_VX_EXPR_TYPE::DUCKDB_VX_EXPR_TYPE_COMPARE_LESSTHAN => Some(0..n),
                _ => None,
            };
            Ok((Selection::All, range))
        }
        TableFilterClass::ConjunctionAnd(conj) => {
            let mut start = 0u64;
            let mut end = u64::MAX;
            let mut indices: Option<Vec<u64>> = None;
            for child in conj.children() {
                let (sel, range) = try_from_virtual_column_filter(child)?;
                if let Selection::IncludeByIndex(buf) = sel {
                    indices = Some(match indices {
                        None => buf.iter().copied().collect(),
                        Some(mut existing) => {
                            // `intersect_sorted` is a single merge pass and is
                            // only correct on ascending input, but IN-list
                            // values arrive in whatever order the filter holds
                            // them. Sort both sides before intersecting.
                            let mut incoming: Vec<u64> = buf.iter().copied().collect();
                            existing.sort_unstable();
                            incoming.sort_unstable();
                            intersect_sorted(&existing, &incoming)
                        }
                    });
                }
                if let Some(r) = range {
                    start = start.max(r.start);
                    end = end.min(r.end);
                }
            }
            // NOTE(review): contradictory bounds (start >= end) collapse to
            // `None`, which reads as "no range restriction" rather than
            // "matches nothing" — confirm callers expect that.
            let range = (start < end).then_some(start..end);
            let sel = indices
                .map(|v| Selection::IncludeByIndex(Buffer::from_iter(v)))
                .unwrap_or(Selection::All);
            Ok((sel, range))
        }
        TableFilterClass::Optional(child) => {
            // Best effort: an optional filter that cannot be converted simply
            // imposes no restriction.
            try_from_virtual_column_filter(child).or_else(|_| Ok((Selection::All, None)))
        }
        _ => Ok((Selection::All, None)),
    }
}
Loading
Loading