feat(jni): add Java bindings for compute_zonemap_batch + write_consolidated_zonemap_segment#6780
Open
LuciferYang wants to merge 4 commits into
Open
feat(jni): add Java bindings for compute_zonemap_batch + write_consolidated_zonemap_segment#6780LuciferYang wants to merge 4 commits into
LuciferYang wants to merge 4 commits into
Conversation
Contributor
|
ACTION NEEDED The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error please inspect the "PR Title Check" action. |
Expose the API surface needed for a coordinator (e.g. a Spark driver in lance-spark) to consolidate per-fragment zonemap batches computed in parallel by worker tasks into a single committed zonemap.lance index file, without re-running the train phase on the driver. Public additions in lance-index::scalar::zonemap: - `ZONEMAP_FILENAME` const — the canonical filename inside the IndexStore so external callers don't hardcode the string. - `zonemap_stats_schema(value_type)` — canonical Arrow schema factory for the on-disk zone stats batch. min/max take a caller-supplied type (the indexed column's type); the remaining metadata columns are fixed types and non-nullable. - `validate_zonemap_stats_schema(schema)` — strict structural validator (7 columns in canonical order, fixed metadata types and nullability, min/max sharing one type). Public so coordinators can pre-flight validate before write. - `write_zonemap_index_from_batch(batch, params, store)` — free fn that writes a pre-computed batch via the IndexStore without re-running the train phase. Takes `params: &ZoneMapIndexBuilderParams` so future ZoneMap knobs that affect on-disk metadata can extend the parameter set without breaking the signature. - `ZoneMapIndexBuilder::zonemap_stats_as_batch` made `pub` for the worker-side `compute_zonemap_batch` consumer. Public addition in lance::index::scalar: - `compute_zonemap_batch(dataset, column, fragment_ids, params)` — computes the zone-stats RecordBatch for a fragment subset WITHOUT writing. The min/max value type is derived from the post-scan training stream schema (not `dataset.schema()`), so dictionary → primitive type adaptation in the scanner is reflected correctly. Tests: - Validator: 7 unit tests covering each rejection branch plus the documented permissiveness on min/max nullability. - Round-trip: writes a consolidated zonemap.lance from concatenated per-fragment batches, reads it back, asserts schema is preserved and per-fragment zone counts match ceil(rows_per_frag/rows_per_zone) via a BTreeMap multiset (set-membership would mask duplicated rows). Uses rows_per_zone=4 with 10 rows per fragment so each fragment contributes multiple zones — covering the multi-zone-per- fragment shape rather than the trivial single-zone case.
Completes the build-time-consolidation API surface paired with the
existing worker-side compute_zonemap_batch. The coordinator (Spark
driver, Python driver, etc.) does:
1. Per-fragment-subset workers call compute_zonemap_batch and
return Arrow batches.
2. Coordinator concatenates them.
3. Coordinator calls write_consolidated_zonemap_segment, which:
- validates the indexed column exists, captures its field id
- derives the IndexMetadata.fragment_bitmap from the batch's
fragment_id column (so a fragment hole in the inputs ⇒ a
hole in the bitmap, no implicit "all fragments" fiction)
- allocates a fresh UUID directory under indices/
- writes zonemap.lance via the existing
write_zonemap_index_from_batch (which structurally validates
the batch on the way in)
- returns a full IndexMetadata populated with the canonical
ZoneMap index_version, prost-encoded ZoneMapIndexDetails,
and the file listing for skip-HEAD optimisations.
4. Coordinator passes the returned IndexMetadata to the existing
commitExistingIndexSegments to land one IndexMetadata entry
covering all input fragments — a single segment instead of N.
ZONEMAP_INDEX_VERSION is now pub so this helper (and any future
external coordinator) can populate IndexMetadata.index_version
without duplicating the literal. Other scalar index versions stay
file-internal until they grow a similar need.
End-to-end test verifies (a) every input fragment lands in the
returned bitmap, (b) the field id matches the schema lookup,
(c) the index_version equals ZONEMAP_INDEX_VERSION, (d) the
freshly written directory contains zonemap.lance.
5caec47 to
2412bae
Compare
Worker-side entry point for build-time zonemap consolidation: a Spark driver fans out per-fragment compute calls to worker tasks, each returning an Arrow RecordBatch of zone stats. The driver then concatenates and writes a single consolidated zonemap.lance, bypassing the per-segment commit shape that scales poorly with fragment count at plan time. JNI side (java/lance-jni/src/blocking_dataset.rs): - `nativeComputeZonemapBatch(column, fragmentIds, paramsJson, arrayAddr, schemaAddr)` exposes the Rust `compute_zonemap_batch` pub fn over Arrow C Data Interface. Java allocates ArrowArray + ArrowSchema buffers; Rust writes FFI_ArrowArray + FFI_ArrowSchema to those addresses. fragmentIds=null means "all fragments" (matches the Rust Option<Vec<u32>> contract); an empty long[] is rejected — pass null to mean "every fragment". paramsJson="" falls through to ZoneMapIndexBuilderParams::default(). - Negative or > u32::MAX fragment ids from Java are rejected explicitly rather than silently wrapping. Java side (java/src/main/java/org/lance/Dataset.java): - `computeZonemapBatch(column, fragmentIds, paramsJson, allocator) -> VectorSchemaRoot` — caller owns and must close the returned root. Uses Arrow Java's Data.exportVectorSchemaRoot share-by- refcount semantics for the FFI export; sources remain valid afterward. FFI handles tracked-immediately-after-allocation and cleaned up via release()+close() in finally with NullPointerException swallow for the path where Data.importVectorSchemaRoot has already consumed the array. Tests (ComputeZonemapBatchTest): - All-fragments + explicit rows_per_zone: verifies the canonical 7-column schema (min, max, null_count, nan_count, fragment_id, zone_start, zone_length) round-trips, with type AND nullability pinned per column. Uses explicit rows_per_zone so the test doesn't depend on LANCE_ZONEMAP_DEFAULT_ROWS_PER_ZONE env var. - Fragment subset + rows_per_zone=5: verifies both knobs reach Rust, and that the returned batch's fragment_id column literally contains the requested subset (not just the right zone count). - Empty fragment array, null column, null/empty paramsJson: precondition + Rust-side rejection paths.
Driver-side completion of the build-time zonemap consolidation JNI
surface. Pairs with computeZonemapBatch: workers compute per-fragment
batches → driver concatenates them via Rust → driver writes one
consolidated zonemap.lance and gets back an Index handle → driver
passes the handle to the existing commitExistingIndexSegments to
land one IndexMetadata covering every input fragment, instead of
one IndexMetadata per fragment.
JNI side (java/lance-jni/src/blocking_dataset.rs):
- `nativeWriteZonemapIndexFromBatches(indexName, column, arrayAddrs,
schemaAddrs, paramsJson) -> Index` takes parallel long[] arrays of
FFI_ArrowArray / FFI_ArrowSchema pointers — one pair per worker
batch. Rust reconstructs each RecordBatch via from_ffi_and_data_type
(matching the existing inner_write_batch ownership-transfer
contract), validates batch[0].schema() == every other batch's
schema (strict equality, not asymmetric Schema::contains), and
passes the concatenated batch to write_consolidated_zonemap_segment.
- Canonical join schema for concat is `batches[0].schema()`, NOT
`dataset.schema()` — compute_zonemap_batch derives min/max
value_type from the post-scan training stream (dictionary→primitive
adaptation, extension-type unwrap), so worker batches and dataset
schema can legitimately diverge on adapted-type columns.
Java side (java/src/main/java/org/lance/Dataset.java):
- `writeZonemapIndexFromBatches(indexName, column, List<VSR>,
paramsJson, allocator) -> Index`. Two-pass: validate every list
element non-null before any FFI export, then allocate one
(ArrowArray, ArrowSchema) handle per batch and add to a cleanup
list IMMEDIATELY after each allocation — an OOM on the second
allocation cannot strand the first one unreleased. The finally
block calls release()+close() on every tracked handle: release()
fires the producer-side callback that Data.exportVectorSchemaRoot
installed (so any retained ref to source buffers is dropped),
then close() frees the 80-byte struct holder. close() alone would
leak producer-side private_data on a JNI-error-before-from_raw
path (empirically demonstrated: a malformed paramsJson on a 2-
batch call leaks ~4.6KB without this fix).
- The returned Index is uncommitted; caller hands it to
commitExistingIndexSegments(name, column, List.of(index)) to land
one IndexMetadata for all fragments.
Driver helper doc (rust/lance/src/index/scalar.rs): spells out (a)
which IndexMetadata fields survive commitExistingIndexSegments
(uuid, fragment_bitmap, index_details, index_version) vs which are
re-derived from the segment template (name, dataset_version,
created_at, files, base_id), and (b) the column-provenance contract
— the `column` argument MUST match the column every batch's stats
were computed against; no read-time marker would detect a coordinator
that fed batches computed for column A into a write call for column B.
Tests (WriteZonemapIndexFromBatchesTest):
- endToEndConsolidation: 4 fragments × 20 rows; two simulated workers
each compute batches for half (rows_per_zone=5 → 4 zones each).
Driver consolidates + writes + commits one segment. Reopens the
dataset and verifies (a) exactly one IndexMetadata for the index
name (the consolidation invariant), (b) getZonemapStats returns
16 zones, (c) per-zone EXACT min/max equality bounds (not just
fragment-wide range — a regression emitting fragment-wide
constants would have passed the looser check), (d) monotone-
nondecreasing min along zone_start within each fragment.
- singleWorkerBatch: Spark num_partitions=1 degenerate case.
- fragmentIdHolePreserved: workers cover {fragA, fragC}, fragB is
deliberately unindexed — committed bitmap and getZonemapStats
must literally hold the hole.
- overlappingFragmentIdsAcrossWorkers: pins the observable behaviour
for a coordinator-bug (overlapping subsets produce duplicated zone
rows on disk; RoaringBitmap dedups in the IndexMetadata).
- allNullColumn: every value NULL; nullCount==zoneLength per zone;
min/max round-trip as null.
- nanNotConflatedWithNull: Float64 column with NaN values must
preserve nullCount=0 through the FFI pipeline (NaN must not be
counted as NULL).
- rejectsDivergentBatchSchemas: two batches with different min/max
types (Int32 vs Int64) — strict Schema-equality check rejects with
IllegalArgumentException naming "schema".
- rejectsEmptyBatchList / rejectsNullBatchInList: precondition gates.
- invalidParamsJsonDoesNotLeakFfiHandles: malformed paramsJson
triggers JNI error AFTER batches have been exported into FFI
handles. Without the release()+close() finally fix, RootAllocator
detects ~4.6KB leak; with the fix the test passes cleanly.
- aboveU32MaxFragmentIdRejected / negativeFragmentIdRejected: full
out-of-range contract for the long[] fragment-id parameter.
- unknownColumnRejected: column-not-found path surfaces as
IllegalArgumentException naming the missing column.
All tests use a getFragments()-based discovery pattern instead of
hardcoded fragment ids, so future Lance changes to fragment-id
allocation don't silently break the tests.
2412bae to
3f17183
Compare
Contributor
Author
|
CI's Could a maintainer with admin rights trigger a re-run of run 25841274377? |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds Java bindings for the two Rust APIs introduced in #6779:
Dataset.computeZonemapBatch(column, fragmentIds, paramsJson, allocator) → VectorSchemaRootDataset.writeZonemapIndexFromBatches(indexName, column, batches, paramsJson, allocator) → IndexTogether they let an external coordinator (Spark, Python, anything sitting on top of the Java SDK) compute per-fragment zone batches on workers and write a single consolidated zonemap segment from the driver.
Stacked on #6779
This PR depends on #6779 (
feat(zonemap): public API + driver helper for build-time consolidation). The diff currently shows 4 commits (the 2 from #6779 + the 2 added here); once #6779 lands I'll rebase this ontomainand the diff drops to the 2 commits added here.Please don't merge until #6779 is in.
FFI lifecycle
Both bindings thread data via the Arrow C Data Interface (
FFI_ArrowArray+FFI_ArrowSchema):computeZonemapBatchallocates one(array, schema)pair and imports into aVectorSchemaRoot. The import consumes the array on success; on failure both are released and closed defensively.writeZonemapIndexFromBatchesexports one pair per input batch, tracks them in parallel lists, and releases + closes infinallyregardless of native result. SourceVectorSchemaRoots remain valid after the call (export shares buffers by refcount).Tests
ComputeZonemapBatchTest— 5 cases (round-trip,nullvs emptylong[]semantics, missing column, all-NULL column, NaN handling).WriteZonemapIndexFromBatchesTest— 13 cases covering end-to-end consolidation, fragment-id hole preservation, all-NULL column, NaN-vs-NULL distinction, schema-mismatch rejection, and an explicit FFI-handle-leak check on the failure path.