Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions vortex-duckdb/cpp/include/duckdb_vx/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,20 @@ typedef struct {
bool has_max_cardinality;
} duckdb_vx_node_statistics;

// Bit flags qualifying the fields of a column-statistics record.
// Every enumerator is a distinct power of two, so flags can be OR-ed together
// and tested with a bitwise AND.
typedef enum {
    // The max_string_length field carries a usable value.
    HasMaxStringLength = 0x1,
    // The column may contain NULLs.
    HasNull = 0x2,
    // Set only when both min and max are set and both of them are exact
    // boundaries.
    MinMaxIsExact = 0x4
} ColumnStatisticsFlags;

typedef struct {
// Set only for strings and primitive types
duckdb_value min;
duckdb_value max;
// upper bit: "length is set". lower 32 bits: DuckDB's max string length.
// set only for strings
uint64_t max_string_length;
bool has_null;
} duckdb_column_statistics;
uint32_t max_string_length;
ColumnStatisticsFlags flags;
} duckdb_vx_column_statistics;

// vtable mimicking subset of TableFunction.
// See duckdb/include/function/tfunc.hpp
Expand Down Expand Up @@ -110,11 +115,6 @@ typedef struct {
duckdb_data_chunk data_chunk_out,
duckdb_vx_error *error_out);

bool (*statistics)(duckdb_client_context context,
const void *bind_data,
size_t column_index,
duckdb_column_statistics *stats_out);

void (*cardinality)(void *bind_data, duckdb_vx_node_statistics *node_stats_out);

bool (*pushdown_complex_filter)(void *bind_data, duckdb_vx_expr expr, duckdb_vx_error *error_out);
Expand All @@ -123,10 +123,18 @@ typedef struct {

double (*table_scan_progress)(duckdb_client_context ctx, void *bind_data, void *global_state);

idx_t (*get_partition_data)(const void *bind_data,
void *init_global_data,
void *init_local_data,
duckdb_vx_error *error_out);
// What file index are we currently exporting?
idx_t (*get_partition_data)(const void *init_local_data);

size_t (*get_partition_count)(const void *bind_data);
// false if any file metadata (=row counts) is missing
bool (*get_partition_row_counts)(const void *bind_data, uint64_t *data, size_t len);

// pass -1 to file_idx to get statistics for all files
bool (*get_column_stats)(const void *bind_data,
idx_t file_idx,
idx_t column_idx,
duckdb_vx_column_statistics *stats_out);
} duckdb_vx_tfunc_vtab_t;

// A single function for configuring the DuckDB table function vtable.
Expand Down
269 changes: 153 additions & 116 deletions vortex-duckdb/cpp/table_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,103 +89,6 @@ double c_table_scan_progress(ClientContext &context,
return bind.info->vtab.table_scan_progress(c_ctx, c_bind_data, c_global_state);
}

// Views an opaque duckdb_value handle as the Value object it points to.
// The handle is dereferenced unconditionally, so callers must check it for
// null before calling.
static Value &UnwrapValue(duckdb_value value) {
    auto *const unwrapped = reinterpret_cast<Value *>(value);
    return *unwrapped;
}

// Builds DuckDB numeric statistics for a column of `type` from FFI statistics.
//
// A min/max handle present in `stats` is applied to the result and then
// released with duckdb_destroy_value. When `stats.has_null` is false the
// result is marked as unable to contain NULL values.
unique_ptr<BaseStatistics> numeric_stats(duckdb_column_statistics &stats, LogicalType type) {
    // Start from the numeric "unknown" state: the NumericStats setters below
    // operate on numeric statistics, which StringStats::CreateUnknown does not
    // prepare.
    BaseStatistics out = NumericStats::CreateUnknown(type);
    if (stats.min) {
        NumericStats::SetMin(out, UnwrapValue(stats.min));
        duckdb_destroy_value(&stats.min);
    }
    if (stats.max) {
        NumericStats::SetMax(out, UnwrapValue(stats.max));
        duckdb_destroy_value(&stats.max);
    }
    if (!stats.has_null) {
        out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
    }
    return out.ToUnique();
}

// Builds DuckDB string statistics for a column of `type` from FFI statistics.
//
// A min/max handle present in `stats` is applied to the result and then
// released with duckdb_destroy_value.
unique_ptr<BaseStatistics> string_stats(duckdb_column_statistics &stats, LogicalType type) {
    BaseStatistics result = StringStats::CreateUnknown(type);

    if (stats.min) {
        const Value &lower = UnwrapValue(stats.min);
        StringStats::SetMin(result, StringValue::Get(lower));
        duckdb_destroy_value(&stats.min);
    }
    if (stats.max) {
        const Value &upper = UnwrapValue(stats.max);
        StringStats::SetMax(result, StringValue::Get(upper));
        duckdb_destroy_value(&stats.max);
    }

    // Bit 63 says whether a length was provided; the low 32 bits hold it.
    const uint64_t packed_length = stats.max_string_length;
    const bool length_known = (packed_length >> 63) != 0;
    if (length_known) {
        StringStats::SetMaxStringLength(result, static_cast<uint32_t>(packed_length));
    }

    const bool may_contain_null = stats.has_null;
    if (!may_contain_null) {
        result.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
    }
    return result.ToUnique();
}

// Builds type-agnostic statistics that only convey nullability.
//
// Used for column types that have no dedicated min/max handling; min/max in
// `stats` are not consulted here.
unique_ptr<BaseStatistics> base_stats(duckdb_column_statistics &stats, LogicalType type) {
    // Let DuckDB pick the statistics layout that matches `type`; this function
    // is reached for non-string types, so string statistics are the wrong
    // starting point.
    BaseStatistics out = BaseStatistics::CreateUnknown(type);
    if (!stats.has_null) {
        out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
    }
    return out.ToUnique();
}

// Statistics callback: asks the vtable for the statistics of one column and
// converts them to DuckDB's representation.
//
// Returns an empty pointer for virtual columns, when the vtable reports that
// no statistics are available, and for STRUCT columns.
unique_ptr<BaseStatistics>
c_statistics(ClientContext &context, const FunctionData *bind_data, column_t column_index) {
    if (IsVirtualColumn(column_index)) {
        return {};
    }

    const auto &table_bind = bind_data->Cast<CTableBindData>();
    void *const ffi_bind = table_bind.ffi_data->DataPtr();
    duckdb_client_context c_ctx = reinterpret_cast<duckdb_client_context>(&context);

    // Zero-initialised so that fields the callee does not fill read as absent.
    duckdb_column_statistics ffi_stats = {};
    const bool fetched = table_bind.info->vtab.statistics(c_ctx, ffi_bind, column_index, &ffi_stats);
    if (!fetched) {
        return {};
    }

    const LogicalType column_type = table_bind.types[column_index];
    switch (column_type.id()) {
    case LogicalTypeId::STRUCT:
        // TODO(myrrc)
        // Duckdb's has_null has a different semantics for structs.
        // If we propagate our has_null, this breaks Duckdb optimizer.
        // You can reproduce it in struct.slt test in vortex-sqllogictests:
        return {};

    case LogicalTypeId::VARCHAR:
    case LogicalTypeId::BLOB:
        return string_stats(ffi_stats, column_type);

    // Boolean, signed, unsigned and floating-point columns share the numeric path.
    case LogicalTypeId::BOOLEAN:
    case LogicalTypeId::TINYINT:
    case LogicalTypeId::SMALLINT:
    case LogicalTypeId::INTEGER:
    case LogicalTypeId::BIGINT:
    case LogicalTypeId::HUGEINT:
    case LogicalTypeId::UTINYINT:
    case LogicalTypeId::USMALLINT:
    case LogicalTypeId::UINTEGER:
    case LogicalTypeId::UBIGINT:
    case LogicalTypeId::UHUGEINT:
    case LogicalTypeId::FLOAT:
    case LogicalTypeId::DOUBLE:
        return numeric_stats(ffi_stats, column_type);

    default:
        return base_stats(ffi_stats, column_type);
    }
}

unique_ptr<FunctionData> c_bind(ClientContext &context,
TableFunctionBindInput &input,
vector<LogicalType> &return_types,
Expand Down Expand Up @@ -340,28 +243,161 @@ extern "C" void duckdb_vx_tfunc_bind_result_add_column(duckdb_vx_tfunc_bind_resu
result->return_types.emplace_back(*logical_type);
}

OperatorPartitionData c_get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
if (input.partition_info.RequiresPartitionColumns()) {
throw InternalException("TableScan::GetPartitionData: partition columns not supported");
// Views an opaque duckdb_value handle as the Value object it points to.
// The handle is dereferenced unconditionally, so callers must check it for
// null before calling.
static Value &UnwrapValue(duckdb_value value) {
    auto *const unwrapped = reinterpret_cast<Value *>(value);
    return *unwrapped;
}

using BaseStatisticsPtr = unique_ptr<BaseStatistics>;

// Builds DuckDB numeric statistics for a column of `type` from FFI statistics.
//
// A min/max handle present in `stats` is applied to the result and then
// released with duckdb_destroy_value. When the HasNull flag is absent the
// result is marked as unable to contain NULL values.
BaseStatisticsPtr numeric_stats(duckdb_vx_column_statistics &stats, LogicalType type) {
    // Start from the numeric "unknown" state: the NumericStats setters below
    // operate on numeric statistics, which StringStats::CreateUnknown does not
    // prepare.
    BaseStatistics out = NumericStats::CreateUnknown(type);
    if (stats.min) {
        NumericStats::SetMin(out, UnwrapValue(stats.min));
        duckdb_destroy_value(&stats.min);
    }
    if (stats.max) {
        NumericStats::SetMax(out, UnwrapValue(stats.max));
        duckdb_destroy_value(&stats.max);
    }
    if (!(stats.flags & ColumnStatisticsFlags::HasNull)) {
        out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
    }
    return out.ToUnique();
}

duckdb_vx_error error_out = nullptr;
const idx_t batch_index = bind.info->vtab.get_partition_data(ffi_bind, ffi_global, ffi_local, &error_out);
if (error_out) {
throw InvalidInputException(IntoErrString(error_out));
// Builds DuckDB string statistics for a column of `type` from FFI statistics.
//
// A min/max handle present in `stats` is applied to the result and then
// released with duckdb_destroy_value. The maximum string length is applied
// only when the HasMaxStringLength flag is set; when the HasNull flag is
// absent the result is marked as unable to contain NULL values.
BaseStatisticsPtr string_stats(duckdb_vx_column_statistics &stats, LogicalType type) {
    BaseStatistics out = StringStats::CreateUnknown(type);
    if (stats.min) {
        StringStats::SetMin(out, StringValue::Get(UnwrapValue(stats.min)));
        duckdb_destroy_value(&stats.min);
    }
    if (stats.max) {
        StringStats::SetMax(out, StringValue::Get(UnwrapValue(stats.max)));
        duckdb_destroy_value(&stats.max);
    }
    if (stats.flags & ColumnStatisticsFlags::HasMaxStringLength) {
        StringStats::SetMaxStringLength(out, stats.max_string_length);
    }
    if (!(stats.flags & ColumnStatisticsFlags::HasNull)) {
        out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
    }
    return out.ToUnique();
}

// Builds type-agnostic statistics that only convey nullability.
//
// Used for column types that have no dedicated min/max handling; min/max in
// `stats` are not consulted here.
BaseStatisticsPtr base_stats(duckdb_vx_column_statistics &stats, LogicalType type) {
    // Let DuckDB pick the statistics layout that matches `type`; this function
    // is reached for non-string types, so string statistics are the wrong
    // starting point.
    BaseStatistics out = BaseStatistics::CreateUnknown(type);
    if (!(stats.flags & ColumnStatisticsFlags::HasNull)) {
        out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES);
    }
    return out.ToUnique();
}

extern "C" void duckdb_vx_string_map_insert(duckdb_vx_string_map map, const char *key, const char *value) {
D_ASSERT(map);
D_ASSERT(key);
D_ASSERT(value);
reinterpret_cast<InsertionOrderPreservingMap<string> *>(map)->insert(key, value);
// Converts FFI column statistics to DuckDB statistics, choosing the converter
// from the column's logical type.
//
// Returns an empty pointer for STRUCT columns.
BaseStatisticsPtr to_duckdb_statistics(duckdb_vx_column_statistics &statistics, LogicalType type) {
    switch (type.id()) {
    case LogicalTypeId::STRUCT:
        // TODO(myrrc)
        // Duckdb's has_null has a different semantics for structs.
        // If we propagate our has_null, this breaks Duckdb optimizer.
        // You can reproduce it in struct.slt test in vortex-sqllogictests:
        return {};

    case LogicalTypeId::VARCHAR:
    case LogicalTypeId::BLOB:
        return string_stats(statistics, type);

    // Boolean, signed, unsigned and floating-point columns share the numeric path.
    case LogicalTypeId::BOOLEAN:
    case LogicalTypeId::TINYINT:
    case LogicalTypeId::SMALLINT:
    case LogicalTypeId::INTEGER:
    case LogicalTypeId::BIGINT:
    case LogicalTypeId::HUGEINT:
    case LogicalTypeId::UTINYINT:
    case LogicalTypeId::USMALLINT:
    case LogicalTypeId::UINTEGER:
    case LogicalTypeId::UBIGINT:
    case LogicalTypeId::UHUGEINT:
    case LogicalTypeId::FLOAT:
    case LogicalTypeId::DOUBLE:
        return numeric_stats(statistics, type);

    default:
        return base_stats(statistics, type);
    }
}

// Per-file partition handle that serves one file's column statistics to DuckDB.
//
// Stores a reference to the bind data rather than a copy, so the bind data
// must outlive this object.
struct VortexRowGroup final : PartitionRowGroup {
    VortexRowGroup(const CTableBindData &bind_data, idx_t file_idx)
        : bind_data(bind_data), file_idx(file_idx) {
    }
    ~VortexRowGroup() override = default;

    // Fetches the statistics of the column addressed by `storage_index` for
    // this file and converts them to DuckDB's representation. Returns an empty
    // pointer when the vtable reports that no statistics are available.
    BaseStatisticsPtr GetColumnStatistics(const StorageIndex &storage_index) override {
        const idx_t column_idx = storage_index.GetPrimaryIndex();
        D_ASSERT(!IsVirtualColumn(column_idx));
        D_ASSERT(column_idx < bind_data.types.size());

        // Drop the previous column's record so that fields the callee leaves
        // untouched read as absent instead of stale.
        statistics = duckdb_vx_column_statistics {};

        void *const bind_ffi = bind_data.ffi_data->DataPtr();
        if (!bind_data.info->vtab.get_column_stats(bind_ffi, file_idx, column_idx, &statistics)) {
            // Nothing usable was produced: make sure MinMaxIsExact answers false.
            statistics.flags = ColumnStatisticsFlags {};
            return {};
        }
        return to_duckdb_statistics(statistics, bind_data.types[column_idx]);
    }

    // Reports whether the most recently fetched min/max are exact bounds.
    bool MinMaxIsExact(const BaseStatistics &, const StorageIndex &) override {
        // This function is called after GetColumnStatistics with same storage
        // index, thus we can reuse calculated statistics
        // duckdb/src/optimizer/statistics/operator/propagate_aggregate.cpp:84
        return (statistics.flags & ColumnStatisticsFlags::MinMaxIsExact) != 0;
    }

    const CTableBindData &bind_data;
    const idx_t file_idx;
    // Result of the latest GetColumnStatistics call; zeroed until then so that
    // no flag reads as set.
    duckdb_vx_column_statistics statistics = {};
};

// Partition-statistics callback: produces one entry per partition reported by
// the vtable, with an exact row count and a running start offset.
//
// Returns an empty vector when the vtable cannot supply the row counts.
vector<PartitionStatistics> get_partition_stats(ClientContext &, GetPartitionStatsInput &input) {
    const CTableBindData &table_bind = input.bind_data->Cast<CTableBindData>();
    void *const ffi_bind = table_bind.ffi_data->DataPtr();
    const auto &vtab = table_bind.info->vtab;

    const size_t partition_count = vtab.get_partition_count(ffi_bind);
    vector<idx_t> rows_per_partition(partition_count);
    const bool counts_known = vtab.get_partition_row_counts(ffi_bind, rows_per_partition.data(), partition_count);
    if (!counts_known) {
        return {};
    }

    vector<PartitionStatistics> result;
    result.reserve(partition_count);
    // First row of the partition being described; grows by each partition's count.
    idx_t next_row_start = 0;
    for (idx_t partition = 0; partition < partition_count; ++partition) {
        const idx_t rows = rows_per_partition[partition];
        result.emplace_back();
        PartitionStatistics &entry = result.back();
        entry.row_start = next_row_start;
        entry.count = rows;
        entry.count_type = CountType::COUNT_EXACT;
        entry.partition_row_group = make_shared_ptr<VortexRowGroup>(table_bind, partition);
        next_row_start += rows;
    }
    return result;
}

// Statistics callback: returns statistics for one column across all files.
//
// Returns an empty pointer for virtual columns and when the vtable reports
// that no statistics are available.
BaseStatisticsPtr statistics(ClientContext &, const FunctionData *bind_data, column_t column_idx) {
    if (IsVirtualColumn(column_idx)) {
        return {};
    }
    const auto &bind = bind_data->Cast<CTableBindData>();
    // Zero-initialised so that fields the callee leaves untouched read as
    // absent: the converters test min/max and flags before using them.
    duckdb_vx_column_statistics column_stats = {};
    void *const ffi_bind = bind.ffi_data->DataPtr();
    // -1 converted to idx_t is the value that requests statistics for all files.
    const idx_t all_files = static_cast<idx_t>(-1);
    if (!bind.info->vtab.get_column_stats(ffi_bind, all_files, column_idx, &column_stats)) {
        return {};
    }
    return to_duckdb_statistics(column_stats, bind.types[column_idx]);
}

// Partition-data callback: wraps the index that the vtable returns for the
// calling thread's local state in an OperatorPartitionData.
//
// Throws InternalException when DuckDB asks for partition columns, which are
// not supported.
OperatorPartitionData get_partition_data(ClientContext &, TableFunctionGetPartitionInput &input) {
    const bool wants_partition_columns = input.partition_info.RequiresPartitionColumns();
    if (wants_partition_columns) {
        throw InternalException("TableScan::GetPartitionData: partition columns not supported");
    }

    const auto &table_bind = input.bind_data->Cast<CTableBindData>();
    const auto &local_state = input.local_state->Cast<CTableLocalData>();
    void *const ffi_local = local_state.ffi_data->DataPtr();

    const idx_t partition_index = table_bind.info->vtab.get_partition_data(ffi_local);
    return OperatorPartitionData(partition_index);
}

InsertionOrderPreservingMap<string> c_to_string(TableFunctionToStringInput &input) {
Expand Down Expand Up @@ -391,10 +427,11 @@ extern "C" duckdb_state duckdb_vx_tfunc_register(duckdb_database ffi_db, const d

tf.pushdown_complex_filter = c_pushdown_complex_filter;
tf.cardinality = c_cardinality;
tf.get_partition_data = c_get_partition_data;
tf.to_string = c_to_string;
tf.table_scan_progress = c_table_scan_progress;
tf.statistics = c_statistics;
tf.get_partition_data = get_partition_data;
tf.get_partition_stats = get_partition_stats;
tf.statistics = statistics;

tf.get_virtual_columns = [](auto &, auto) -> virtual_column_map_t {
return {{COLUMN_IDENTIFIER_EMPTY, TableColumn("", LogicalTypeId::BOOLEAN)}};
Expand Down
Loading
Loading