Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vortex-duckdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ num-traits = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
parking_lot = { workspace = true }
paste = { workspace = true }
static_assertions = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions vortex-duckdb/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ fn cpp(duckdb_include_dir: &Path) {
.flags(["-Wall", "-Wextra", "-Wpedantic"])
.cpp(true)
.include(duckdb_include_dir)
.include("include")
.include("cpp/include")
.files(SOURCE_FILES)
.compile("vortex-duckdb-extras");
Expand Down
4 changes: 4 additions & 0 deletions vortex-duckdb/cbindgen.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
language = "C"
cpp_compat = true

header = """
// SPDX-License-Identifier: Apache-2.0
Expand All @@ -23,3 +24,6 @@ trailer = """

// clang-format on
"""

[fn]
prefix = "extern"
2 changes: 1 addition & 1 deletion vortex-duckdb/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ else()
)
endif()

include_directories(include ${DUCKDB_INCLUDE})
include_directories(include ${DUCKDB_INCLUDE} ../include)

# Auto-discover C++ source files
file(GLOB CPP_SOURCES "*.cpp")
Expand Down
55 changes: 4 additions & 51 deletions vortex-duckdb/cpp/include/duckdb_vx/table_function.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

/**
* We redefine a C API for DuckDB Table Functions in order to expose the full functionality of the C++ API.
*
* Since this C API has no stability requirements (it's versioned lock-step with the Rust bindings), we can
* take a transparent vtable struct to populate the C++ Table Function vtable.
*/
#pragma once

#include "error.h"
#include "table_filter.h"
#include "duckdb_vx/data.h"
#include "duckdb_vx/error.h"
#include "duckdb_vx/expr.h"
#include "table_filter.h"
#include <stdint.h>

#ifdef __cplusplus
Expand Down Expand Up @@ -73,9 +67,7 @@ typedef struct {
// Result data returned from the cardinality callback.
// The C++ wrapper zero-initializes this struct, hands it to the callback as an
// out-parameter and then copies fields into DuckDB's NodeStatistics.
// Field order is part of the FFI layout; do not reorder.
typedef struct {
// Cardinality estimate; copied to NodeStatistics::estimated_cardinality.
idx_t estimated_cardinality;
// Maximum cardinality; presumably only meaningful when has_max_cardinality is true -- TODO confirm.
idx_t max_cardinality;
// Copied to NodeStatistics::has_estimated_cardinality.
bool has_estimated_cardinality;
// Companion flag for max_cardinality.
bool has_max_cardinality;
} duckdb_vx_node_statistics;

typedef struct {
Expand All @@ -98,46 +90,7 @@ typedef struct {
size_t file_index;
} duckdb_vx_partition_data;

// vtable mimicking subset of TableFunction.
// See duckdb/include/function/tfunc.hpp
//
// The struct is copied by value when the table function is registered, and the
// C++ wrapper forwards each DuckDB table-function hook to the matching entry.
// Callbacks that take `error_out` report failure by storing a non-null error
// there; the wrapper converts it into a C++ exception.
typedef struct {
// Name the table function is registered under.
const char *name;
// Array of `parameter_count` logical types; copied into the function's argument list.
const duckdb_logical_type *parameters;
// Number of entries in `parameters`.
size_t parameter_count;

// Bind hook. Receives the client context plus opaque handles to the bind
// input and to the result (return types and column names). Returns the
// bind data, which the wrapper takes ownership of.
// A non-null *error_out is rethrown as a BinderException.
duckdb_vx_data (*bind)(duckdb_client_context ctx,
duckdb_vx_tfunc_bind_input input,
duckdb_vx_tfunc_bind_result result,
duckdb_vx_error *error_out);

// Produces a copy of the bind data; used when DuckDB copies the FunctionData.
// A non-null *error_out is rethrown as a BinderException.
duckdb_vx_data (*bind_data_clone)(const void *bind_data, duckdb_vx_error *error_out);

// Creates the global scan state.
// A non-null *error_out is rethrown as a BinderException.
duckdb_vx_data (*init_global)(const duckdb_vx_tfunc_init_input *input, duckdb_vx_error *error_out);

// Creates the local scan state from the global state's data pointer.
// Has no error channel.
duckdb_vx_data (*init_local)(void *init_global_data);

// Scan hook: fills `data_chunk_out` using the global and local state.
// A non-null *error_out is rethrown as an InvalidInputException.
void (*function)(void *init_global_data,
void *init_local_data,
duckdb_data_chunk data_chunk_out,
duckdb_vx_error *error_out);

// Writes statistics for `column_index` into `stats_out`. When this returns
// false the wrapper reports no statistics for the column.
bool (*statistics)(const void *bind_data, size_t column_index, duckdb_column_statistics *stats_out);

// Fills `node_stats_out`, which the wrapper zero-initializes beforehand.
void (*cardinality)(void *bind_data, duckdb_vx_node_statistics *node_stats_out);

// Offered one filter expression at a time; the return value says whether
// the filter was pushed down.
// A non-null *error_out is rethrown as a BinderException.
bool (*pushdown_complex_filter)(void *bind_data, duckdb_vx_expr expr, duckdb_vx_error *error_out);

// Adds entries to `map`, which the wrapper returns from the to_string hook.
void (*to_string)(void *bind_data, duckdb_vx_string_map map);

// Returns scan progress for the given global state; the value is forwarded
// to DuckDB unchanged.
double (*table_scan_progress)(void *global_state);

// Fills `partition_data_out` for the current global and local state.
void (*get_partition_data)(void *init_global_data,
void *init_local_data,
duckdb_vx_partition_data *partition_data_out);
} duckdb_vx_tfunc_vtab_t;

// A single function for configuring the DuckDB table function vtable.
duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const duckdb_vx_tfunc_vtab_t *vtab);
duckdb_state duckdb_vx_register_table_functions(duckdb_database ffi_db);

#ifdef __cplusplus
}
Expand Down
111 changes: 45 additions & 66 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "duckdb_vx/duckdb_diagnostics.h"
#include "duckdb_vx/error.hpp"
#include "duckdb_vx/table_function.h"
#include "vortex.h"

DUCKDB_INCLUDES_BEGIN
#include "duckdb.h"
Expand All @@ -15,58 +16,42 @@ DUCKDB_INCLUDES_BEGIN
#include "duckdb/main/capi/capi_internal.hpp"
#include "duckdb/main/connection.hpp"
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
#include "duckdb/planner/expression/bound_operator_expression.hpp"
#include "duckdb/planner/expression/bound_comparison_expression.hpp"
#include "duckdb/planner/expression/bound_between_expression.hpp"
#include "duckdb/planner/expression/bound_conjunction_expression.hpp"
#include "duckdb/planner/expression/bound_function_expression.hpp"
DUCKDB_INCLUDES_END

using namespace std::string_literals;
using namespace duckdb;
using vortex::CData;
using vortex::IntoErrString;
constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX;
constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER;

// Function-info payload stored on the registered TableFunction. It owns an
// immutable, by-value copy of the C vtable so that the hooks can look up
// their callbacks after registration has returned.
struct CTableFunctionInfo final : TableFunctionInfo {
    // Copy of the vtable supplied at registration time.
    const duckdb_vx_tfunc_vtab_t vtab;

    // Snapshots `source_vtab`; the caller's struct need not outlive this object.
    explicit CTableFunctionInfo(const duckdb_vx_tfunc_vtab_t &source_vtab) : vtab(source_vtab) {
    }
};

struct CTableBindData final : FunctionData {
CTableBindData(const CTableFunctionInfo &info,
unique_ptr<CData> ffi_data_p,
const vector<LogicalType> &types)
: info(info), ffi_data(std::move(ffi_data_p)), types(types) {
CTableBindData(unique_ptr<CData> ffi_data_p, const vector<LogicalType> &types)
: ffi_data(std::move(ffi_data_p)), types(types) {
}
unique_ptr<FunctionData> Copy() const override;
bool Equals(const FunctionData &other_base) const override;

// Table function info lives for as long as TableFunction is alive as it's
// stored inside TableFunction, so it's safe to store a reference.
const CTableFunctionInfo &info;
unique_ptr<CData> ffi_data;
vector<LogicalType> types;
};

unique_ptr<FunctionData> CTableBindData::Copy() const {
duckdb_vx_error error_out = nullptr;
const auto copied_ffi_data = info.vtab.bind_data_clone(ffi_data->DataPtr(), &error_out);
const auto copied_ffi_data = duckdb_table_function_bind_data_clone(ffi_data->DataPtr(), &error_out);
if (error_out) {
throw BinderException(IntoErrString(error_out));
}

auto ffi_data_p = unique_ptr<CData>(reinterpret_cast<CData *>(copied_ffi_data));
return make_uniq<CTableBindData>(info, std::move(ffi_data_p), types);
return make_uniq<CTableBindData>(std::move(ffi_data_p), types);
}

bool CTableBindData::Equals(const FunctionData &other_base) const {
const CTableBindData &other = other_base.Cast<CTableBindData>();
// if "types" are different, "ffi_data" would also be different as it
// contains types inside, so omit "types" from comparison.
return &info == &other.info && ffi_data.get() == other.ffi_data.get();
return ffi_data.get() == other.ffi_data.get();
}

struct CTableGlobalData final : GlobalTableFunctionState {
Expand All @@ -87,12 +72,10 @@ struct CTableLocalData final : LocalTableFunctionState {
unique_ptr<CData> ffi_data;
};

double table_scan_progress(ClientContext &,
const FunctionData *bind_data,
const GlobalTableFunctionState *global_state) {
auto &bind = bind_data->Cast<CTableBindData>();
double
table_scan_progress(ClientContext &, const FunctionData *, const GlobalTableFunctionState *global_state) {
void *const c_global_state = global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
return bind.info.vtab.table_scan_progress(c_global_state);
return duckdb_table_function_scan_progress(c_global_state);
}

static Value &UnwrapValue(duckdb_value value) {
Expand Down Expand Up @@ -152,7 +135,7 @@ unique_ptr<BaseStatistics> statistics(ClientContext &, const FunctionData *bind_
void *const ffi_bind = bind.ffi_data->DataPtr();

duckdb_column_statistics statistics = {};
if (!bind.info.vtab.statistics(ffi_bind, column_index, &statistics)) {
if (!duckdb_table_function_statistics(ffi_bind, column_index, &statistics)) {
return {};
}

Expand Down Expand Up @@ -204,21 +187,20 @@ unique_ptr<FunctionData> c_bind(ClientContext &context,
TableFunctionBindInput &input,
vector<LogicalType> &return_types,
vector<string> &names) {
const auto &info = input.table_function.function_info->Cast<CTableFunctionInfo>();
CTableBindResult result = {return_types, names};

duckdb_vx_error error_out = nullptr;
auto ctx = reinterpret_cast<duckdb_client_context>(&context);
auto ffi_bind_data = info.vtab.bind(ctx,
reinterpret_cast<duckdb_vx_tfunc_bind_input>(&input),
reinterpret_cast<duckdb_vx_tfunc_bind_result>(&result),
&error_out);
auto ffi_bind_data = duckdb_table_function_bind(ctx,
reinterpret_cast<duckdb_vx_tfunc_bind_input>(&input),
reinterpret_cast<duckdb_vx_tfunc_bind_result>(&result),
&error_out);
if (error_out) {
throw BinderException(IntoErrString(error_out));
}

auto cdata = unique_ptr<CData>(reinterpret_cast<CData *>(ffi_bind_data));
return make_uniq<CTableBindData>(info, std::move(cdata), return_types);
return make_uniq<CTableBindData>(std::move(cdata), return_types);
}

unique_ptr<GlobalTableFunctionState> c_init_global(ClientContext &context, TableFunctionInitInput &input) {
Expand All @@ -235,7 +217,7 @@ unique_ptr<GlobalTableFunctionState> c_init_global(ClientContext &context, Table
};

duckdb_vx_error error_out = nullptr;
duckdb_vx_data ffi_global_data = bind.info.vtab.init_global(&ffi_input, &error_out);
duckdb_vx_data ffi_global_data = duckdb_table_function_init_global(&ffi_input, &error_out);
if (error_out) {
throw BinderException(IntoErrString(error_out));
}
Expand All @@ -245,24 +227,21 @@ unique_ptr<GlobalTableFunctionState> c_init_global(ClientContext &context, Table
}

unique_ptr<LocalTableFunctionState>
init_local(ExecutionContext &, TableFunctionInitInput &input, GlobalTableFunctionState *global_state) {
const auto &bind = input.bind_data->Cast<CTableBindData>();
init_local(ExecutionContext &, TableFunctionInitInput &, GlobalTableFunctionState *global_state) {
void *const ffi_global = global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();

duckdb_vx_data ffi_local_data = bind.info.vtab.init_local(ffi_global);
duckdb_vx_data ffi_local_data = duckdb_table_function_init_local(ffi_global);
auto cdata = unique_ptr<CData>(reinterpret_cast<CData *>(ffi_local_data));
return make_uniq<CTableLocalData>(std::move(cdata));
}

void function(ClientContext &, TableFunctionInput &input, DataChunk &output) {
const auto &bind = input.bind_data->Cast<CTableBindData>();

void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();

duckdb_data_chunk chunk = reinterpret_cast<duckdb_data_chunk>(&output);
duckdb_vx_error error_out = nullptr;
bind.info.vtab.function(ffi_global, ffi_local, chunk, &error_out);
duckdb_table_function_scan(ffi_global, ffi_local, chunk, &error_out);
if (error_out) {
throw InvalidInputException(IntoErrString(error_out));
}
Expand Down Expand Up @@ -290,7 +269,7 @@ void pushdown_complex_filter(const FunctionData &bind_data, FilterVec &filters)
for (auto iter = filters.begin(); iter != filters.end();) {
duckdb_vx_expr ffi_expr = reinterpret_cast<duckdb_vx_expr>(iter->get());

const bool pushed = bind.info.vtab.pushdown_complex_filter(ffi_bind, ffi_expr, &error_out);
const bool pushed = duckdb_table_function_pushdown_complex_filter(ffi_bind, ffi_expr, &error_out);
if (error_out) {
throw BinderException(IntoErrString(error_out));
}
Expand All @@ -302,13 +281,12 @@ unique_ptr<NodeStatistics> c_cardinality(ClientContext &, const FunctionData *bi
auto &bind = bind_data->Cast<CTableBindData>();

duckdb_vx_node_statistics stats = {};
bind.info.vtab.cardinality(bind.ffi_data->DataPtr(), &stats);
duckdb_table_function_cardinality(bind.ffi_data->DataPtr(), &stats);

auto out = make_uniq<NodeStatistics>();
out->has_estimated_cardinality = stats.has_estimated_cardinality;
out->estimated_cardinality = stats.estimated_cardinality;
out->has_max_cardinality = stats.has_max_cardinality;
out->max_cardinality = stats.max_cardinality;
out->has_max_cardinality = false;

return out;
}
Expand Down Expand Up @@ -357,11 +335,10 @@ TablePartitionInfo get_partition_info(ClientContext &, TableFunctionPartitionInp
* each partition ~ exported array file_index is constant.
*/
OperatorPartitionData get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
auto &bind = input.bind_data->Cast<CTableBindData>();
void *const ffi_global = input.global_state->Cast<CTableGlobalData>().ffi_data->DataPtr();
void *const ffi_local = input.local_state->Cast<CTableLocalData>().ffi_data->DataPtr();
duckdb_vx_partition_data partition_data;
bind.info.vtab.get_partition_data(ffi_global, ffi_local, &partition_data);
duckdb_table_function_get_partition_data(ffi_global, ffi_local, &partition_data);

OperatorPartitionData out(partition_data.partition_index);

Expand Down Expand Up @@ -390,29 +367,20 @@ InsertionOrderPreservingMap<string> c_to_string(TableFunctionToStringInput &inpu
InsertionOrderPreservingMap<string> result;
duckdb_vx_string_map ffi_map = reinterpret_cast<duckdb_vx_string_map>(&result);
void *const ffi_bind = input.bind_data->Cast<CTableBindData>().ffi_data->DataPtr();
const auto &info = static_cast<CTableFunctionInfo &>(*input.table_function.function_info);
info.vtab.to_string(ffi_bind, ffi_map);
duckdb_table_function_to_string(ffi_bind, ffi_map);
return result;
}

// pushdown_expression is not given the FunctionData, so it can't be placed in the vtab
extern "C" bool duckdb_vx_pushdown_expression(duckdb_vx_expr expr);

extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const duckdb_vx_tfunc_vtab_t *vtab) {
D_ASSERT(ffi_db);
D_ASSERT(vtab);

const DatabaseWrapper &wrapper = *reinterpret_cast<DatabaseWrapper *>(ffi_db);
DatabaseInstance &db = *wrapper.database->instance;
TableFunction tf(vtab->name, {}, function, c_bind, c_init_global, init_local);
duckdb_state register_table_function(DatabaseInstance &db, LogicalType parameter, const std::string &name) {
TableFunction tf(name, {}, function, c_bind, c_init_global, init_local);

tf.projection_pushdown = true;
tf.filter_pushdown = true;
tf.filter_prune = true;
tf.sampling_pushdown = false;

tf.pushdown_expression = [](auto &, const auto &, Expression &expression) {
return duckdb_vx_pushdown_expression(reinterpret_cast<duckdb_vx_expr>(&expression));
return duckdb_table_function_pushdown_expression(reinterpret_cast<duckdb_vx_expr>(&expression));
};
tf.pushdown_complex_filter = [](auto &, auto &, FunctionData *bind_data, FilterVec &filters) {
pushdown_complex_filter(*bind_data, filters);
Expand Down Expand Up @@ -442,12 +410,8 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
};
};

tf.arguments.resize(vtab->parameter_count);
for (size_t i = 0; i < vtab->parameter_count; i++) {
tf.arguments[i] = *reinterpret_cast<LogicalType *>(vtab->parameters[i]);
}

tf.function_info = make_shared_ptr<CTableFunctionInfo>(*vtab);
tf.arguments.resize(1);
tf.arguments[0] = parameter;

try {
auto &system_catalog = Catalog::GetSystemCatalog(db);
Expand All @@ -462,3 +426,18 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d
}
return DuckDBSuccess;
}

/**
 * Registers the table function under both of its names ("read_vortex" and
 * "vortex_scan"), once with a single VARCHAR argument and once with a single
 * LIST(VARCHAR) argument -- four registrations in total.
 *
 * @param ffi_db DuckDB C API database handle.
 * @return DuckDBSuccess when every registration succeeds. DuckDBError when
 *         `ffi_db` is null or as soon as one registration fails; registrations
 *         made before the failure are not rolled back.
 */
extern "C" duckdb_state duckdb_vx_register_table_functions(duckdb_database ffi_db) {
    // D_ASSERT is compiled out of release builds, so a null handle must be
    // rejected explicitly rather than dereferenced below.
    if (!ffi_db) {
        return DuckDBError;
    }
    const DatabaseWrapper &wrapper = *reinterpret_cast<DatabaseWrapper *>(ffi_db);
    DatabaseInstance &db = *wrapper.database->instance;

    // Bind by const reference: initializer_list elements are const, and this
    // avoids one extra LogicalType copy per outer iteration.
    for (const LogicalType &type : {LogicalType(LogicalType::VARCHAR), LogicalType::LIST(LogicalType::VARCHAR)}) {
        for (const std::string &name : {"read_vortex"s, "vortex_scan"s}) {
            if (register_table_function(db, type, name) == DuckDBError) {
                return DuckDBError;
            }
        }
    }
    return DuckDBSuccess;
}
Loading
Loading