
Add quickwit-datafusion crate#6270

Draft
alexanderbianchi wants to merge 1 commit into quickwit-oss:main from
alexanderbianchi:bianchi/wide-metrics-df-quickwit-clean

Conversation


@alexanderbianchi alexanderbianchi commented Apr 6, 2026

Overview

Adds quickwit-datafusion — a DataFusion-based query execution layer for parquet metrics splits, built on top of PR #6237 (wide-schema parquet pipeline).

What's in this PR

  • quickwit-datafusion crate — self-contained query layer with a pluggable QuickwitDataSource trait, DataFusionSessionBuilder, QuickwitSchemaProvider, and DataFusionService (streaming gRPC: ExecuteSql + ExecuteSubstrait)
  • MetricsDataSource — parquet metrics source: metastore-backed split discovery, 30s object-store cache, filter pushdown with CAST-unwrapping fix, Substrait ReadRel consumption
  • Distributed execution — DistributedPhysicalOptimizerRule decomposes queries into tasks (one per split) via PartitionIsolatorExec. Tasks, not shuffles: parquet splits are self-contained, so no cross-worker repartitioning is needed
  • Opt-in — the DataFusionService only starts when QW_ENABLE_DATAFUSION_ENDPOINT=true is set; zero impact on existing deployments
  • Integration tests — pruning, aggregation, time-range filtering (CAST unwrap proven), GROUP BY, distributed task shape, NULL-column fill, Substrait named-table queries, rollup, partial schema projection
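To make the extension point above concrete, here is a minimal sketch of what a pluggable data-source trait can look like. The name QuickwitDataSource comes from the PR, but the method shapes below are illustrative assumptions, not the crate's actual API:

```rust
// Illustrative sketch only — the trait name is from the PR, but these
// method signatures are assumptions, not the real quickwit-datafusion API.

/// A pluggable source that contributes tables to the DataFusion session.
trait QuickwitDataSource {
    /// Name used to route table lookups (e.g. by a schema provider).
    fn name(&self) -> &str;
    /// Table names this source exposes.
    fn tables(&self) -> Vec<String>;
}

/// Hypothetical metrics-backed implementation.
struct MetricsDataSource {
    index_names: Vec<String>,
}

impl QuickwitDataSource for MetricsDataSource {
    fn name(&self) -> &str {
        "metrics"
    }
    fn tables(&self) -> Vec<String> {
        self.index_names.clone()
    }
}

fn main() {
    let src = MetricsDataSource { index_names: vec!["cpu_metrics".to_string()] };
    assert_eq!(src.name(), "metrics");
    assert_eq!(src.tables(), vec!["cpu_metrics".to_string()]);
    println!("ok");
}
```

The point of the trait boundary is that the session builder and schema provider stay generic while concrete sources (here, parquet metrics splits) plug in underneath.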

Execution Flow

```mermaid
flowchart TD
    Client -->|SQL or Substrait bytes| SVC[DataFusionService\nExecuteSql / ExecuteSubstrait]

    subgraph Coordinator
        SVC --> SB[DataFusionSessionBuilder\nbuild_session]
        SB --> CTX[SessionContext\n+ QuickwitSchemaProvider]

        CTX -->|Substrait| SC[QuickwitSubstraitConsumer\nconsume_read ReadRel]
        CTX -->|SQL| SP[QuickwitSchemaProvider\ntable lookup]

        SC --> MD[MetricsDataSource]
        SP --> MD

        MD --> IR[MetastoreIndexResolver\nresolve index_name]
        IR -->|split_provider · object_store · url\n30 s object-store cache| TP[MetricsTableProvider]

        TP --> OPT[DataFusion Optimizer]
        OPT -->|SearcherPool ≥ 2 nodes| DIST[DistributedPhysicalOptimizerRule]

        DIST --> EST[QuickwitTaskEstimator\nDesired N = num splits]
        EST --> PIE[PartitionIsolatorExec\nt0 gets splits 0,1 · t1 gets splits 2,3 …]
        PIE -->|WorkerService gRPC| WORKERS[(Workers)]
    end

    subgraph Worker [Worker — runs per task]
        WORKERS --> SCAN[MetricsTableProvider::scan\npushed-down filters]
        SCAN --> EXT[extract_split_filters\nMetricsSplitQuery\nCAST-unwrapping fix]
        EXT --> MSP[MetastoreSplitProvider\nlist_splits → metastore RPC]
        MSP -->|published splits matching\nmetric_names · time_range · tags| PS[ParquetSource\nbloom filter · page index · pushdown]
        PS --> OBS[QuickwitObjectStore\nquickwit_storage::Storage bridge]
    end

    OBS --> AGG[Partial aggregates]
    AGG --> NCE[NetworkCoalesceExec\ncoordinator merges]
    NCE -->|Arrow IPC stream| Client
```

Notes

  • When SearcherPool has only one node the distributed rule is a no-op — the plan runs locally as a standard DataSourceExec.
  • PartitionIsolatorExec assigns each split to a specific worker task; workers execute their local parquet scans and return partial aggregates. No NetworkShuffleExec (no cross-worker repartitioning) because splits are already self-contained.
  • QuickwitObjectStore is a read-only adapter: get_opts, get_range, head are implemented; all write/list operations return NotSupported.

alexanderbianchi force-pushed the bianchi/wide-metrics-df-quickwit-clean branch from a7618bc to f8c18a9 on April 6, 2026 at 21:10
alexanderbianchi force-pushed the bianchi/wide-metrics-df-quickwit-clean branch from f8c18a9 to 80da189 on April 6, 2026 at 21:52
@mattmkim
Contributor

mattmkim commented Apr 7, 2026

I asked Claude for a strategy on how to review this.

PR #6270 Review Plan: Add quickwit-datafusion crate

Scope

The local branch has the parent PR #6237 already merged to main. The actual PR changes are the uncommitted modifications (10 existing files) plus all untracked files (~5300 lines in the new crate, integration tests, proto definitions). Total: ~7100 additions.

Review Order (dependency-driven, bottom-up)

Phase 1: Protocol & Interface Contract

Review the proto definition and generated code first — everything else depends on this API surface.

| File | What to check |
| --- | --- |
| quickwit-proto/protos/quickwit/datafusion.proto | RPC signatures, message types, streaming semantics. Are the request/response types sufficient? Is bytes the right wire type for Substrait plans and Arrow IPC? |
| quickwit-proto/src/datafusion/mod.rs | Wrapper correctness, FILE_DESCRIPTOR_SET exposure |
| quickwit-proto/build.rs | Proto compilation config matches existing patterns |

Phase 2: Core Abstractions (the generic execution layer)

These files form the extension-point architecture. Review for soundness and future-proofing.

| File | What to check |
| --- | --- |
| quickwit-datafusion/src/data_source.rs | QuickwitDataSource trait: are the lifecycle hooks sufficient? Is the DataSourceContributions accumulator safe against conflicting contributions? |
| quickwit-datafusion/src/session.rs | DataFusionSessionBuilder: shared RuntimeEnv design, optimizer rule ordering, DataFusion 52 API usage correctness |
| quickwit-datafusion/src/catalog.rs | QuickwitSchemaProvider: table routing, information_schema support, error handling for missing indexes |
| quickwit-datafusion/src/storage_bridge.rs | QuickwitObjectStore: read-op correctness (byte range, head), error mapping, and that write stubs properly return NotSupported |
| quickwit-datafusion/src/substrait.rs | QuickwitSubstraitConsumer: ReadRel routing, ExtensionTable→NamedTable rewrite, schema hint propagation, fallback behavior |
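One quick way to frame the storage_bridge.rs byte-range check: object-store byte ranges are conventionally half-open (start inclusive, end exclusive), so any off-by-one shows up directly in slice lengths. A stdlib-only sketch of the semantics a get_range implementation must preserve (the real bridge goes through quickwit_storage::Storage, not a plain slice):

```rust
// Stdlib-only model of half-open byte-range semantics — the convention a
// get_range implementation must preserve. Not the actual storage bridge.
use std::ops::Range;

/// Return bytes [range.start, range.end) of `data`, object-store style.
fn get_range(data: &[u8], range: Range<usize>) -> &[u8] {
    &data[range]
}

fn main() {
    let file = b"PAR1....footer....PAR1";
    // Half-open: end is exclusive, so the slice length is end - start.
    let magic = get_range(file, 0..4);
    assert_eq!(magic, b"PAR1");
    assert_eq!(magic.len(), 4);
    // An off-by-one (0..3) drops the final byte — enough to corrupt a
    // parquet footer or page read.
    assert_eq!(get_range(file, 0..3), b"PAR");
    println!("ok");
}
```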

Phase 3: Distributed Execution

Critical path — correctness bugs here affect query results.

| File | What to check |
| --- | --- |
| quickwit-datafusion/src/task_estimator.rs | Split-count estimation: does it correctly count parquet file groups? Edge cases: 0 splits, 1 split, more splits than workers |
| quickwit-datafusion/src/resolver.rs | QuickwitWorkerResolver: SearcherPool→URL mapping, TLS flag correctness, port handling |
| quickwit-datafusion/src/worker.rs | Worker session builder: does it correctly mirror the coordinator's catalog/sources? |
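The "Desired N = num splits" rule plus the edge cases listed above (0 splits, 1 split, more splits than workers) can be exercised with a toy model. The clamp-to-worker-count policy below is an assumption for illustration, not necessarily what QuickwitTaskEstimator actually does:

```rust
// Toy model of split-count-driven task estimation. The clamping policy is
// an assumption for illustration, not the PR's actual estimator.

/// Desired task count: one task per split, but never more tasks than
/// workers, and zero tasks when there is nothing to scan.
fn estimate_tasks(num_splits: usize, num_workers: usize) -> usize {
    if num_splits == 0 || num_workers == 0 {
        return 0; // nothing to scan, or nowhere to run it
    }
    num_splits.min(num_workers)
}

fn main() {
    assert_eq!(estimate_tasks(0, 4), 0); // 0 splits: no tasks at all
    assert_eq!(estimate_tasks(1, 4), 1); // 1 split: a single task
    assert_eq!(estimate_tasks(9, 4), 4); // more splits than workers: cap
    println!("ok");
}
```

The review question is whether the real estimator handles each of these boundaries without panicking or producing empty tasks.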

Phase 4: Metrics Data Source (the concrete implementation)

The bulk of domain logic lives here.

| File | What to check |
| --- | --- |
| sources/metrics/mod.rs | MetricsDataSource: init/contributions/worker registration, object store caching (30s), schema handling |
| sources/metrics/table_provider.rs | MetricsTableProvider: filter pushdown integration, supports_filters_pushdown correctness, scan plan construction |
| sources/metrics/predicate.rs | High priority — filter extraction, CAST unwrapping, time-range boundary correctness (off-by-one?), tag pushdown limitations. Verify the 16 unit tests cover edge cases |
| sources/metrics/index_resolver.rs | MetastoreIndexResolver: N+1 RPC concern, caching strategy (or lack thereof) |
| sources/metrics/metastore_provider.rs | MetastoreSplitProvider: multi-value IN list limitation — is correctness preserved when pushdown falls through? |
| sources/metrics/factory.rs | DDL factory: STORED AS metrics registration |
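The multi-value IN limitation flagged above is really a filter-partitioning question: predicates the metastore cannot evaluate must survive as residual filters for the parquet scan rather than being silently dropped. A stdlib sketch of that split — the types and function are hypothetical, not the PR's actual code:

```rust
// Hypothetical filter-partitioning sketch: push single-value tag equality
// to the metastore; keep anything else (e.g. multi-value IN) as a residual
// filter the parquet scan must still apply. Not the PR's actual types.

#[derive(Debug, Clone, PartialEq)]
enum TagFilter {
    Eq(String, String),      // tag = 'v'   → metastore can evaluate
    In(String, Vec<String>), // tag IN (…)  → falls through to the scan
}

/// Split filters into (pushed-to-metastore, residual-for-scan).
fn partition_filters(filters: Vec<TagFilter>) -> (Vec<TagFilter>, Vec<TagFilter>) {
    filters.into_iter().partition(|f| matches!(f, TagFilter::Eq(_, _)))
}

fn main() {
    let filters = vec![
        TagFilter::Eq("region".to_string(), "us-east".to_string()),
        TagFilter::In("host".to_string(), vec!["a".to_string(), "b".to_string()]),
    ];
    let (pushed, residual) = partition_filters(filters);
    assert_eq!(pushed.len(), 1);
    // Correctness hinges on the IN filter not being dropped: it must be
    // re-applied at parquet scan time.
    assert_eq!(residual.len(), 1);
    println!("ok");
}
```

This is the invariant to verify in metastore_provider.rs: pushdown falling through may cost performance, but it must never cost correctness.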

Phase 5: Service Layer & gRPC Integration

Where the crate meets the outside world.

| File | What to check |
| --- | --- |
| quickwit-datafusion/src/service.rs | DataFusionService: SQL and Substrait execution paths, error handling, streaming semantics |
| quickwit-serve/src/datafusion_api/grpc_handler.rs | gRPC adapter: error→Status mapping, Arrow IPC serialization correctness, message size limits |
| quickwit-serve/src/grpc.rs | Service registration: ordering, conditional mounting, file descriptor sets |
| quickwit-serve/src/lib.rs | Startup flow: env var gating, setup_searcher return-value change (SearcherPool exposure), OtlpGrpcMetricsService addition (is this part of this PR or a separate concern?) |

Phase 6: Integration Tests

Validate end-to-end correctness claims.

| File | What to check |
| --- | --- |
| metrics_datafusion_tests.rs | Coverage: pruning, aggregation, time-range, GROUP BY, NULL fill, Substrait named-table, projection |
| metrics_distributed_tests.rs | Distributed task shape, multi-split partitioning |
| rollup_substrait.json | Substrait fixture validity |
| cluster_sandbox.rs | Test infra changes — do they affect other tests? |

Phase 7: Cross-Cutting Concerns

| Concern | What to check |
| --- | --- |
| Dependencies | datafusion-distributed is a git dep — is it pinned to a commit? datafusion = "52" vs workspace — should it be a workspace dep? url = "2" is not from the workspace |
| Feature gating | QW_ENABLE_DATAFUSION_ENDPOINT — is it consistently checked? Is there no code path that bypasses it? |
| Cargo.toml | Workspace member registration, dependency minimality per CLAUDE.md guidelines |
| OtlpMetrics additions | lib.rs adds OtlpGrpcMetricsService and changes the local_shards_update_listener condition — verify these are intentional for this PR and not accidental scope creep |
| setup_searcher signature change | Now returns SearcherPool — verify all callers are updated (including tests) |

Key Risk Areas to Prioritize

  1. predicate.rs CAST unwrapping — incorrectly unwrapping a CAST could silently produce wrong filter bounds (wrong query results)
  2. Distributed partitioning — PartitionIsolatorExec split assignment must be deterministic and exhaustive (no split dropped, no split double-counted)
  3. StorageBridge byte ranges — off-by-one in get_range could corrupt parquet reads
  4. Multi-value IN pushdown fallback — if metastore can't push multi-value tags, the remaining filter must still be applied at parquet scan level
  5. OtlpMetrics scope creep — changes to lib.rs that add metrics OTLP ingestion may deserve a separate PR
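Risk 1 is easy to model: a filter like CAST(ts AS BIGINT) >= 1700000000 only yields a split-level time bound if the CAST wrapping the column is peeled off first. A simplified sketch of the unwrapping logic — the enum below is illustrative, not DataFusion's Expr, and the functions are hypothetical:

```rust
// Simplified model of CAST unwrapping in a pushed-down comparison. This
// enum is illustrative; the real code operates on DataFusion expressions.

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Cast(Box<Expr>), // target type elided for brevity
    GtEq(Box<Expr>, Box<Expr>),
}

/// Peel CASTs so `CAST(col) >= lit` matches like `col >= lit`.
fn unwrap_cast(expr: &Expr) -> &Expr {
    match expr {
        Expr::Cast(inner) => unwrap_cast(inner),
        other => other,
    }
}

/// Extract a lower time bound from a `… >= literal` comparison.
fn lower_bound(filter: &Expr) -> Option<(String, i64)> {
    if let Expr::GtEq(lhs, rhs) = filter {
        if let (Expr::Column(name), Expr::Literal(v)) =
            (unwrap_cast(lhs), unwrap_cast(rhs))
        {
            return Some((name.clone(), *v));
        }
    }
    None
}

fn main() {
    let filter = Expr::GtEq(
        Box::new(Expr::Cast(Box::new(Expr::Column("ts".to_string())))),
        Box::new(Expr::Literal(1_700_000_000)),
    );
    // Without unwrapping, the CAST hides the column, no bound is extracted,
    // and the split-level time filter is silently lost.
    assert_eq!(lower_bound(&filter), Some(("ts".to_string(), 1_700_000_000)));
    println!("ok");
}
```

The review risk cuts the other way too: unwrapping a CAST that actually changes value semantics could produce wrong bounds, which is why predicate.rs deserves the closest read.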
