Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benchmarks/duckdb-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ publish = false
[dependencies]
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
custom-labels = { workspace = true }
similar = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand Down
40 changes: 29 additions & 11 deletions benchmarks/duckdb-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::path::PathBuf;

use clap::Parser;
use clap::value_parser;
use custom_labels::Labelset;
use duckdb_bench::DuckClient;
use tokio::runtime::Runtime;
use vortex::metrics::tracing::set_global_labels;
Expand All @@ -25,6 +26,27 @@ use vortex_bench::runner::SqlBenchmarkRunner;
use vortex_bench::runner::filter_queries;
use vortex_bench::setup_logging_and_tracing;

fn with_query_labels<T>(
benchmark_name: &str,
query_idx: usize,
format: Format,
f: impl FnOnce() -> T,
) -> T {
let labels = vec![
("format", format.to_string()),
("benchmark_name", benchmark_name.to_owned()),
("query_idx", query_idx.to_string()),
];
set_global_labels(labels.clone());

let mut labelset = Labelset::clone_from_current();
for (key, value) in labels {
labelset.set(key, value);
}

labelset.enter(f)
}

/// Common arguments shared across benchmarks
#[derive(Parser)]
struct Args {
Expand Down Expand Up @@ -171,17 +193,13 @@ fn main() -> anyhow::Result<()> {
Ok(ctx)
},
|ctx, query_idx, format, query| {
set_global_labels(vec![
("format", format.to_string()),
("benchmark_name", benchmark_name.clone()),
("query_idx", query_idx.to_string()),
]);

// Make sure to reopen the duckdb connection between iterations
if !args.reuse {
ctx.reopen()?;
}
ctx.execute_query_result(query)
with_query_labels(&benchmark_name, query_idx, format, || {
// Make sure to reopen the duckdb connection between iterations
if !args.reuse {
ctx.reopen()?;
}
ctx.execute_query_result(query)
})
},
)?;

Expand Down
34 changes: 20 additions & 14 deletions vortex-duckdb/src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,21 @@ pub struct DataSourceLocal {
batch_id: Option<u64>,
}

fn sync_current_labels_from_global() {
unsafe {
use custom_labels::sys;

if sys::current().is_null() {
let ls = sys::new(0);
sys::replace(ls);
};
}

for (key, value) in get_global_labels() {
CURRENT_LABELSET.set(key, value);
}
}

/// Returns scan progress as a percentage (0.0–100.0).
fn progress(bytes_read: &AtomicU64, bytes_total: &AtomicU64) -> f64 {
let read = bytes_read.load(Ordering::Relaxed);
Expand Down Expand Up @@ -364,20 +379,7 @@ impl<T: DataSourceTableFunction> TableFunction for T {
_init: &TableInitInput<Self>,
global: &Self::GlobalState,
) -> VortexResult<Self::LocalState> {
unsafe {
use custom_labels::sys;

if sys::current().is_null() {
let ls = sys::new(0);
sys::replace(ls);
};
}

let global_labels = get_global_labels();

for (key, value) in global_labels {
CURRENT_LABELSET.set(key, value);
}
sync_current_labels_from_global();

Ok(DataSourceLocal {
iterator: global.iterator.clone(),
Expand All @@ -393,6 +395,10 @@ impl<T: DataSourceTableFunction> TableFunction for T {
global_state: &Self::GlobalState,
chunk: &mut DataChunkRef,
) -> VortexResult<()> {
// DuckDB can reuse a thread-local state across multiple benchmark queries, so refresh
// the thread-local labels on every callback to keep `format`/`query_idx` current.
sync_current_labels_from_global();

loop {
if local_state.exporter.is_none() {
let mut ctx = SESSION.create_execution_ctx();
Expand Down
Loading