Exact table statistics for the optimizer (DataFusion 54) #201
shuffle) and, in future, join-driven partition pruning. The native
engine is **eager** (it materialises the result) and does not register
Python scalar UDFs; use the default engine for ``cftime`` filters or
multi-dimension-group datasets.
That the native engine is eager and not lazy is a merge blocker. Laziness is a must.
Fixed in a4258a6. The native engine is now lazy: NativeContext.sql() returns a lazy NativeDataFrame (planned, not executed) that streams result batches instead of collecting, and it round-trips through the same to_pandas/to_dataset — including the chunked, dask-backed path — as the FFI engine. A reduction or chunked scan over a larger-than-memory store never materialises the whole result. Covered by test_native_lazy_chunked_roundtrip and test_native_sql_returns_lazy_frame.
Generated by Claude Code
if self._native is not None:
    raise NotImplementedError(
        "The native engine does not yet support datasets whose "
        "variables span multiple dimension groups (which register as "
        f"a SQL namespace). {name!r} has groups {list(groups)}. Use "
        "engine='ffi' for namespaced datasets."
    )
We also needed this support.
Added in ab7a847. The native engine now supports multi-dimension-group datasets as SQL namespaces (NativeContext.register_table accepts a schema-qualified name like era5.surface and creates the schema on demand) and the cftime() filter UDF (a native ScalarUDFImpl that calls the Python cftime callable, since a datafusion-python UDF can't cross the FFI boundary). Tests: test_native_multigroup_namespace and test_native_cftime_udf. Let me know if "this support" meant something else.
Generated by Claude Code
inner = SessionContext().from_arrow(table)
return XarrayDataFrame(inner, templates=self._registered_datasets)

def explain_native(self, query: str) -> str:
I don't think this should be a public API
Agreed — made it private in a4258a6. It's now _explain_native (used only by tests to confirm stats/plan shape), so it's no longer part of the public surface.
Generated by Claude Code
the existing :class:`XarrayDataFrame` round-trip — ``to_pandas`` and
``to_dataset`` — works unchanged.
"""
batches = self._native.sql(query)
This needs to be lazy. What if we wanted to load ARCO-ERA5 (several petabytes)?
Fixed in a4258a6 (same lazy-engine change). The result is no longer collected — NativeDataFrame.execute_stream() pulls batches on demand (releasing the GIL per batch), and the chunked to_dataset path re-plans per chunk with a coordinate filter that pushes into the scan and prunes source partitions. So a SELECT over ARCO-ERA5-scale data reads chunk-by-chunk and never holds the whole input or output in memory.
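The per-chunk re-planning described in this reply can be pictured with a small sketch. It is an illustration only, not the code from a4258a6: `chunk_filters` and `plan_chunks` are hypothetical helpers, and it assumes the chunked dimension is addressed by integer coordinates so that each chunk maps to a half-open range filter.

```python
from typing import Iterator


def chunk_filters(dim: str, size: int, chunk: int) -> Iterator[str]:
    """Yield one half-open coordinate filter per chunk along `dim` (hypothetical helper)."""
    for start in range(0, size, chunk):
        stop = min(start + chunk, size)
        yield f"{dim} >= {start} AND {dim} < {stop}"


def plan_chunks(base_query: str, dim: str, size: int, chunk: int) -> Iterator[str]:
    """Lazily produce one SQL statement per chunk.

    This is a generator: no statement is built, let alone executed, until
    the caller asks for the next chunk.
    """
    for predicate in chunk_filters(dim, size, chunk):
        yield f"SELECT * FROM ({base_query}) AS q WHERE {predicate}"


# Hypothetical table and column names, for illustration only.
queries = plan_chunks("SELECT time, t2m FROM era5", "time", size=10, chunk=4)
first = next(queries)  # only the first chunk's statement exists at this point
```

Because the filter is on a coordinate column, a scan that prunes partitions by coordinate bounds would only need to read the partitions overlapping each range.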
Generated by Claude Code
return self._batch


class NativeFrame:
Does this serve the entire DataFrame API surface? Can we subclass it so it does?
ab7a847 to 913707e
/// Execution, schema, ordering, and partitioning are delegated verbatim to the
/// inner plan (so projection mechanics are reused unchanged); the only thing
/// this node adds is real cardinality. When consumed natively (not across the
/// FFI boundary, which drops statistics entirely), this is what lets
I think this comment is stale
913707e to 997f73d
/// what lets `COUNT(*)` be answered without a scan. Column min/max would add
/// range hints but do not change those decisions (WHERE filters are already
/// handled by partition pruning), so they are omitted to keep this simple.
fn build_scan_statistics(output_schema: &Schema, metas: &[&PartitionMetadata]) -> Statistics {
    let mut stats = Statistics::new_unknown(output_schema);
    stats.num_rows = sum_row_counts(metas.iter().copied());
    stats
This may be too simple. I think adding all the relevant stats that we have is good. It may help us later.
Agreed — expanded in c7550c5. build_scan_statistics now reports every stat we can derive from coordinate metadata without touching the data, each exact:
- num_rows (as before)
- total_byte_size = num_rows × fixed row width (derived in Rust from the projected schema's primitive widths; Absent if any column is variable-width)
- per dimension-coordinate column: exact min/max (folded coordinate bounds, i.e. the join/filter keys) and null_count = 0 (grid axes are always fully populated)
Verified through the FFI boundary — EXPLAIN on a projected scan now shows Rows=Exact(2000), Bytes=Exact(48000), [(Col[0]: Min=Exact(Int64(0)) Max=Exact(Int64(3)) Null=Exact(0)), …], with data-variable columns correctly carrying no bounds. Pinned by test_exact_byte_size_in_scan_statistics and test_dimension_column_min_max_in_scan_statistics.
I left distinct_count and sum_value as Absent: distinct count would need per-dimension cardinality (not present in the min/max metadata) plus a coordinate-uniqueness guarantee we don't enforce, and reporting an inexact value as Exact would mislead the optimizer.
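To make the arithmetic behind these statistics concrete, here is a minimal Python sketch of the same derivation. It is not the Rust `build_scan_statistics`: `ChunkMeta`, `FIXED_WIDTHS`, and `scan_statistics` are hypothetical names, `None` stands in for Absent, and the two-chunk layout is invented so that the totals match the Rows=Exact(2000), Bytes=Exact(48000) figures quoted above (the real test dataset's layout is not shown in this thread).

```python
from dataclasses import dataclass
from math import prod

# Hypothetical byte widths for a few fixed-width primitive types.
FIXED_WIDTHS = {"int32": 4, "int64": 8, "float32": 4, "float64": 8}


@dataclass
class ChunkMeta:
    """Hypothetical per-partition coordinate metadata."""
    bounds: dict  # dimension name -> (min, max) coordinate value in this chunk
    sizes: dict   # dimension name -> number of coordinate values in this chunk


def scan_statistics(schema: dict, chunks: list) -> dict:
    # num_rows: sum over chunks of the product of each chunk's dimension sizes.
    num_rows = sum(prod(c.sizes.values()) for c in chunks)

    # total_byte_size: exact only if every projected column is fixed-width.
    widths = [FIXED_WIDTHS.get(dtype) for dtype in schema.values()]
    total_byte_size = None if None in widths else num_rows * sum(widths)

    # min/max: fold per-chunk bounds for dimension columns; data variables
    # have no coordinate bounds, so they stay unknown (None).
    columns = {}
    for name in schema:
        per_chunk = [c.bounds[name] for c in chunks if name in c.bounds]
        if per_chunk:
            columns[name] = {
                "min": min(lo for lo, _ in per_chunk),
                "max": max(hi for _, hi in per_chunk),
                "null_count": 0,  # grid axes are fully populated
            }
        else:
            columns[name] = None
    return {"num_rows": num_rows, "total_byte_size": total_byte_size, "columns": columns}


# Invented layout: dimension "a" has 4 values split into two chunks, "b" has 500.
schema = {"a": "int64", "b": "int64", "value": "float64"}
chunks = [
    ChunkMeta(bounds={"a": (0, 1), "b": (0, 499)}, sizes={"a": 2, "b": 500}),
    ChunkMeta(bounds={"a": (2, 3), "b": (0, 499)}, sizes={"a": 2, "b": 500}),
]
stats = scan_statistics(schema, chunks)
```

With three 8-byte columns the row width is 24 bytes, so 2000 rows give 48000 bytes. Adding a variable-width column to `schema` makes `total_byte_size` unknown while `num_rows` stays exact.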
Generated by Claude Code
c7550c5 to 24f2a8d
Report exact statistics from the xarray scan so DataFusion's cost-based optimizer can plan joins and aggregations well, without a second engine.

DataFusion 54's datafusion-ffi forwards ExecutionPlan statistics across the FFI boundary (52/53 dropped them), so the statistics the scan reports now reach the optimizer on the ordinary path.

- XarrayScanExec wraps the StreamingTableExec from scan() and reports exact Statistics: num_rows is the summed product of each chunk's dimension sizes (exact, not an estimate), plus exact min/max for numeric dimension columns. Per-partition row counts are plumbed from Python as a third tuple element (factory, metadata, num_rows); the 2-tuple form still works.
- Upgrade datafusion + datafusion-ffi 52 -> 54 (and arrow 57 -> 58, pyo3 0.26 -> 0.28 to match), and the datafusion Python dep to 54.

Verified: a big-vs-small join now plans as HashJoinExec mode=CollectLeft with the small side's Rows=Exact(64) carried through the FFI boundary (FFI_ExecutionPlan: XarrayScanExec), and COUNT(*) is answered from the exact statistics without scanning. The reader tests that used COUNT(*) to force a scan now use SELECT * (COUNT(*) is metadata-only once statistics are exact).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_019VuSeCio99NcME5eubcN3N
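The per-partition row count and the optional third tuple element can be sketched in Python as follows. This is an assumption-laden illustration rather than the project's code: `block_len` mimics what the description calls the product of the chunk's slice extents, `normalize_partition` and `total_rows` are hypothetical helpers, and treating the total as unknown when any partition omits its count is a design assumption, not something the commit states.

```python
from math import prod
from typing import Any, Callable, Optional, Tuple


def block_len(block: dict) -> int:
    """Rows in one chunk: the product of its slice extents (stop - start per dimension)."""
    return prod(s.stop - s.start for s in block.values())


def normalize_partition(entry: tuple) -> Tuple[Callable, Any, Optional[int]]:
    """Accept (factory, metadata) or (factory, metadata, num_rows)."""
    if len(entry) == 2:
        factory, metadata = entry
        return factory, metadata, None  # legacy 2-tuple: row count unknown
    factory, metadata, num_rows = entry
    return factory, metadata, num_rows


def total_rows(entries) -> Optional[int]:
    """Sum per-partition counts; unknown (None) if any partition lacks one (assumption)."""
    counts = [normalize_partition(e)[2] for e in entries]
    return None if any(c is None for c in counts) else sum(counts)


# Hypothetical chunk: 8 time steps x 4 latitudes x 2 longitudes.
block = {"time": slice(0, 8), "lat": slice(0, 4), "lon": slice(0, 2)}
rows = block_len(block)
```

Keeping the 2-tuple form valid means existing partition producers keep working; they simply contribute no exact cardinality.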
24f2a8d to 5f05dce
Report exact statistics from the xarray scan so DataFusion's cost-based optimizer can plan joins and aggregations well: on the ordinary engine, with no second engine and the full datafusion-python DataFrame surface preserved.

Why this works now

Earlier exploration went down a "native in-process engine" path because datafusion-ffi (52/53) dropped Statistics across the FFI boundary, so a foreign scan reported unknown cardinality. DataFusion 54 forwards ExecutionPlan statistics across FFI (FFI_ExecutionPlan gained partition_statistics), so the statistics our scan reports now reach the optimizer through the normal FFI table-provider path. The native engine is gone.

What's here

- XarrayScanExec wraps the StreamingTableExec from scan() and reports exact Statistics. Every stat is derived from coordinate metadata xarray already knows (none of it scans the data), and each is exact, not an estimate:
  - num_rows: summed product of each surviving chunk's dimension sizes. Drives JoinSelection's build-side choice and lets COUNT(*) skip the scan.
  - total_byte_size: num_rows × fixed row width (derived in Rust from the projected schema's primitive widths).
  - Per dimension-coordinate column: exact min/max (the join/filter keys) and null_count = 0 (grid axes are always fully populated). Data variables are left unknown; their bounds would require a scan.
- Statistics are injected at the plan node (not only via TableProvider::statistics()): the physical JoinSelection rule reads statistics off the ExecutionPlan, not off the TableProvider, so injecting them at the plan node is what actually drives build-side selection, even under DF 54's FFI forwarding. Confirmed empirically: TableProvider::statistics() alone leaves the join as mode=Partitioned.
- num_rows is plumbed from Python as an optional third tuple element (factory, metadata, num_rows). Its value is _block_len(block), the product of the chunk's slice extents (the 2-tuple form still works). The min/max bounds reuse the same coordinate metadata already computed for partition pruning. Nothing is recomputed from the data.
- Upgrade datafusion + datafusion-ffi 52 → 54 (and arrow 57 → 58, pyo3 0.26 → 0.28 to match), plus the datafusion Python dep → 54.

Verified

- A big-vs-small join now plans as HashJoinExec: mode=CollectLeft with the small side's Rows=Exact(64) carried through the FFI boundary (FFI_ExecutionPlan: XarrayScanExec). This is the cost-based build-side choice that statistics unlock.
- EXPLAIN shows Rows=Exact(2000), Bytes=Exact(48000), [(Col[0]: Min=Exact(Int64(0)) Max=Exact(Int64(3)) Null=Exact(0)), …], with data-variable columns carrying no bounds.
- COUNT(*) is answered from the exact statistics without scanning (tests/test_stats.py pins the statistics contract; reader tests that used COUNT(*) to force a scan now use SELECT *, since COUNT(*) is metadata-only once statistics are exact).

🤖 Generated with Claude Code
https://claude.ai/code/session_019VuSeCio99NcME5eubcN3N
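Why exact row counts change the join plan can be shown with a deliberately simplified model. This is a toy, not DataFusion's JoinSelection rule: `SideStats`, `choose_join_mode`, and the `threshold_rows` value are all hypothetical, and the real rule weighs more than a single row threshold.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class SideStats:
    num_rows: Optional[int]  # None models unknown (absent) statistics


def choose_join_mode(
    left: SideStats, right: SideStats, threshold_rows: int = 100_000
) -> Tuple[str, Optional[str]]:
    """Return (mode, build_side) for a toy hash-join planner.

    A side can be collected into a single in-memory build table only if its
    row count is known and small; otherwise both inputs are repartitioned.
    The threshold is an arbitrary illustrative number.
    """
    candidates = [
        (stats.num_rows, name)
        for name, stats in (("left", left), ("right", right))
        if stats.num_rows is not None and stats.num_rows <= threshold_rows
    ]
    if not candidates:
        return ("Partitioned", None)
    _, build_side = min(candidates)  # smallest known side becomes the build side
    return ("CollectLeft", build_side)


with_stats = choose_join_mode(SideStats(1_000_000), SideStats(64))
without_stats = choose_join_mode(SideStats(None), SideStats(None))
```

In this model the small side is picked as the build side only when its cardinality is known, which mirrors the observation above that a scan reporting unknown statistics leaves the join as mode=Partitioned.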