Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/datafusion-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ on:

permissions:
contents: read
pull-requests: read

env:
CARGO_INCREMENTAL: 0
Expand All @@ -32,6 +33,7 @@ jobs:
timeout-minutes: 60
permissions:
contents: read
pull-requests: read
actions: write
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
Expand Down
63 changes: 34 additions & 29 deletions quickwit/quickwit-serve/src/datafusion_api/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,26 +183,37 @@ fn insert_datafusion_worker(
/// Adapter that appends the DataFusion query and worker gRPC services to the
/// tonic `Router`. Construct via [`build_datafusion_mount`]; [`grpc.rs`] calls
/// [`Self::apply`] on the assembled router.
///
/// Hiding the closure behind a struct avoids exposing the
/// `WorkerServiceServer<Worker>` type publicly (it comes from a
/// `pub(crate)` module inside `datafusion-distributed`).
pub(crate) struct DataFusionMount {
router_mod: Option<Box<dyn FnOnce(Router) -> Router + Send>>,
session_builder: Option<Arc<DataFusionSessionBuilder>>,
max_message_size_bytes: usize,
}

impl DataFusionMount {
/// Returns a no-op mount — used when `services.datafusion_session_builder`
/// is `None` (startup toggle off).
fn noop() -> Self {
Self { router_mod: None }
Self {
session_builder: None,
max_message_size_bytes: 0,
}
}

pub fn apply(self, router: Router) -> Router {
match self.router_mod {
Some(f) => f(router),
None => router,
}
pub fn apply<L>(self, router: Router<L>) -> Router<L> {
let Some(session_builder) = self.session_builder else {
return router;
};

let query_server = DataFusionServiceServer::new(DataFusionServiceGrpcImpl::new(
DataFusionService::new(Arc::clone(&session_builder)),
))
.max_decoding_message_size(self.max_message_size_bytes)
.max_encoding_message_size(self.max_message_size_bytes);

let worker = build_worker(session_builder);

router
.add_service(query_server)
.add_service(worker.into_worker_server())
}
}

Expand All @@ -225,34 +236,28 @@ pub(crate) fn build_datafusion_mount(
enabled.insert("datafusion-worker");
file_descriptor_sets.push(quickwit_datafusion::proto::DATAFUSION_FILE_DESCRIPTOR_SET);

let max_size = max_message_size_bytes;
let session_builder = Arc::clone(session_builder);

let router_mod = Box::new(move |router: Router| {
let query_server = DataFusionServiceServer::new(DataFusionServiceGrpcImpl::new(
DataFusionService::new(Arc::clone(&session_builder)),
))
.max_decoding_message_size(max_size)
.max_encoding_message_size(max_size);

let worker = build_worker(Arc::clone(&session_builder));

router
.add_service(query_server)
.add_service(worker.into_worker_server())
});

DataFusionMount {
router_mod: Some(router_mod),
session_builder: Some(Arc::clone(session_builder)),
max_message_size_bytes,
}
}

#[cfg(test)]
mod tests {
use quickwit_proto::ingest::ingester::IngesterStatus;
use tower::layer::util::Identity;

use super::*;

#[test]
fn datafusion_mount_accepts_layered_tonic_router() {
let (_health_reporter, health_service) = tonic_health::server::health_reporter();
let mut server = tonic::transport::Server::builder().layer(Identity::new());
let router = server.add_service(health_service);

let _router = DataFusionMount::noop().apply(router);
}

#[tokio::test]
async fn datafusion_worker_changes_ignore_non_searcher_adds() {
let node = ClusterNode::for_test(
Expand Down
Loading