From fc91ec82914d700dd97f649adc3c388a955259d3 Mon Sep 17 00:00:00 2001 From: Alex Bianchi Date: Mon, 27 Apr 2026 12:20:41 -0400 Subject: [PATCH] Fix CI workflow + accept layered Router's --- .github/workflows/datafusion-ci.yml | 2 + .../src/datafusion_api/setup.rs | 63 ++++++++++--------- 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/.github/workflows/datafusion-ci.yml b/.github/workflows/datafusion-ci.yml index 14b8f8b8bd0..0600f1d77fb 100644 --- a/.github/workflows/datafusion-ci.yml +++ b/.github/workflows/datafusion-ci.yml @@ -14,6 +14,7 @@ on: permissions: contents: read + pull-requests: read env: CARGO_INCREMENTAL: 0 @@ -32,6 +33,7 @@ jobs: timeout-minutes: 60 permissions: contents: read + pull-requests: read actions: write steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 diff --git a/quickwit/quickwit-serve/src/datafusion_api/setup.rs b/quickwit/quickwit-serve/src/datafusion_api/setup.rs index 30bc5689845..211efe4b547 100644 --- a/quickwit/quickwit-serve/src/datafusion_api/setup.rs +++ b/quickwit/quickwit-serve/src/datafusion_api/setup.rs @@ -183,26 +183,37 @@ fn insert_datafusion_worker( /// Adapter that appends the DataFusion query and worker gRPC services to the /// tonic `Router`. Construct via [`build_datafusion_mount`]; [`grpc.rs`] calls /// [`Self::apply`] on the assembled router. -/// -/// Hiding the closure behind a struct avoids exposing the -/// `WorkerServiceServer` type publicly (it comes from a -/// `pub(crate)` module inside `datafusion-distributed`). pub(crate) struct DataFusionMount { - router_mod: Option Router + Send>>, + session_builder: Option>, + max_message_size_bytes: usize, } impl DataFusionMount { /// Returns a no-op mount — used when `services.datafusion_session_builder` /// is `None` (startup toggle off). fn noop() -> Self { - Self { router_mod: None } + Self { + session_builder: None, + max_message_size_bytes: 0, + } } - pub fn apply(self, router: Router) -> Router { - match self.router_mod { - Some(f) => f(router), - None => router, - } + pub fn apply(self, router: Router) -> Router { + let Some(session_builder) = self.session_builder else { + return router; + }; + + let query_server = DataFusionServiceServer::new(DataFusionServiceGrpcImpl::new( + DataFusionService::new(Arc::clone(&session_builder)), + )) + .max_decoding_message_size(self.max_message_size_bytes) + .max_encoding_message_size(self.max_message_size_bytes); + + let worker = build_worker(session_builder); + + router + .add_service(query_server) + .add_service(worker.into_worker_server()) } } @@ -225,34 +236,28 @@ pub(crate) fn build_datafusion_mount( enabled.insert("datafusion-worker"); file_descriptor_sets.push(quickwit_datafusion::proto::DATAFUSION_FILE_DESCRIPTOR_SET); - let max_size = max_message_size_bytes; - let session_builder = Arc::clone(session_builder); - - let router_mod = Box::new(move |router: Router| { - let query_server = DataFusionServiceServer::new(DataFusionServiceGrpcImpl::new( - DataFusionService::new(Arc::clone(&session_builder)), - )) - .max_decoding_message_size(max_size) - .max_encoding_message_size(max_size); - - let worker = build_worker(Arc::clone(&session_builder)); - - router - .add_service(query_server) - .add_service(worker.into_worker_server()) - }); - DataFusionMount { - router_mod: Some(router_mod), + session_builder: Some(Arc::clone(session_builder)), + max_message_size_bytes, } } #[cfg(test)] mod tests { use quickwit_proto::ingest::ingester::IngesterStatus; + use tower::layer::util::Identity; use super::*; + #[test] + fn datafusion_mount_accepts_layered_tonic_router() { + let (_health_reporter, health_service) = tonic_health::server::health_reporter(); + let mut server = tonic::transport::Server::builder().layer(Identity::new()); + let router = server.add_service(health_service); + + let _router = DataFusionMount::noop().apply(router); + } + #[tokio::test] async fn datafusion_worker_changes_ignore_non_searcher_adds() { let node = ClusterNode::for_test(