From 46a3b2a7c88eae591d4869ed006c2ce1d38ed8a6 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 24 Apr 2026 11:55:41 +0100 Subject: [PATCH] Fix custom-labels for DuckDB Signed-off-by: Adam Gutglick --- Cargo.lock | 1 + benchmarks/duckdb-bench/Cargo.toml | 1 + benchmarks/duckdb-bench/src/main.rs | 40 +++++++++++++++++++++-------- vortex-duckdb/src/datasource.rs | 34 ++++++++++++++---------- 4 files changed, 51 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 069aa68ebab..14cff1264a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3642,6 +3642,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", + "custom-labels", "similar", "tokio", "tracing", diff --git a/benchmarks/duckdb-bench/Cargo.toml b/benchmarks/duckdb-bench/Cargo.toml index b70375cf963..7d499250ca0 100644 --- a/benchmarks/duckdb-bench/Cargo.toml +++ b/benchmarks/duckdb-bench/Cargo.toml @@ -17,6 +17,7 @@ publish = false [dependencies] anyhow = { workspace = true } clap = { workspace = true, features = ["derive"] } +custom-labels = { workspace = true } similar = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/benchmarks/duckdb-bench/src/main.rs b/benchmarks/duckdb-bench/src/main.rs index 7ab8f1ac7ab..d5d5fbea5b4 100644 --- a/benchmarks/duckdb-bench/src/main.rs +++ b/benchmarks/duckdb-bench/src/main.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use clap::Parser; use clap::value_parser; +use custom_labels::Labelset; use duckdb_bench::DuckClient; use tokio::runtime::Runtime; use vortex::metrics::tracing::set_global_labels; @@ -25,6 +26,27 @@ use vortex_bench::runner::SqlBenchmarkRunner; use vortex_bench::runner::filter_queries; use vortex_bench::setup_logging_and_tracing; +fn with_query_labels( + benchmark_name: &str, + query_idx: usize, + format: Format, + f: impl FnOnce() -> T, +) -> T { + let labels = vec![ + ("format", format.to_string()), + ("benchmark_name", benchmark_name.to_owned()), + ("query_idx", query_idx.to_string()), + ]; + set_global_labels(labels.clone()); + + let mut labelset = Labelset::clone_from_current(); + for (key, value) in labels { + labelset.set(key, value); + } + + labelset.enter(f) +} + /// Common arguments shared across benchmarks #[derive(Parser)] struct Args { @@ -171,17 +193,13 @@ fn main() -> anyhow::Result<()> { Ok(ctx) }, |ctx, query_idx, format, query| { - set_global_labels(vec![ - ("format", format.to_string()), - ("benchmark_name", benchmark_name.clone()), - ("query_idx", query_idx.to_string()), - ]); - - // Make sure to reopen the duckdb connection between iterations - if !args.reuse { - ctx.reopen()?; - } - ctx.execute_query_result(query) + with_query_labels(&benchmark_name, query_idx, format, || { + // Make sure to reopen the duckdb connection between iterations + if !args.reuse { + ctx.reopen()?; + } + ctx.execute_query_result(query) + }) }, )?; diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index c59fe6cbc2b..c744e9c49b8 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -158,6 +158,21 @@ pub struct DataSourceLocal { batch_id: Option, } +fn sync_current_labels_from_global() { + unsafe { + use custom_labels::sys; + + if sys::current().is_null() { + let ls = sys::new(0); + sys::replace(ls); + }; + } + + for (key, value) in get_global_labels() { + CURRENT_LABELSET.set(key, value); + } +} + /// Returns scan progress as a percentage (0.0–100.0). fn progress(bytes_read: &AtomicU64, bytes_total: &AtomicU64) -> f64 { let read = bytes_read.load(Ordering::Relaxed); @@ -364,20 +379,7 @@ impl TableFunction for T { _init: &TableInitInput, global: &Self::GlobalState, ) -> VortexResult { - unsafe { - use custom_labels::sys; - - if sys::current().is_null() { - let ls = sys::new(0); - sys::replace(ls); - }; - } - - let global_labels = get_global_labels(); - - for (key, value) in global_labels { - CURRENT_LABELSET.set(key, value); - } + sync_current_labels_from_global(); Ok(DataSourceLocal { iterator: global.iterator.clone(), @@ -393,6 +395,10 @@ impl TableFunction for T { global_state: &Self::GlobalState, chunk: &mut DataChunkRef, ) -> VortexResult<()> { + // DuckDB can reuse a thread-local state across multiple benchmark queries, so refresh + // the thread-local labels on every callback to keep `format`/`query_idx` current. + sync_current_labels_from_global(); + loop { if local_state.exporter.is_none() { let mut ctx = SESSION.create_execution_ctx();