feat(core): add local parquet metadata sidecar file for optimized query planning#6913
Conversation
|
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
739982c to
85bedbf
Compare
b6f00e6 to
f117804
Compare
…s and documentation
…ntext and O3PartitionJob
…ration after corrupt metadata
bb24c23 to
7c50bc3
Compare
- Renamed test methods for clarity and consistency. - Added tests to ensure migration handles corrupt and empty parquet files correctly. - Implemented checks for parquet metadata generation and validation after migration. - Enhanced error handling for corrupt parquet files during migration. - Introduced a method to patch parquet file size in transaction files. - Ensured migration respects the committed parquet file size over actual file length. - Added tests to verify behavior when encountering stale or missing metadata.
TableReader.openParquetMetadata previously opened the _pm file twice per partition: ParquetMetaFileReader.readParquetMetaFileSize opened the fd to read the 8-byte size header and closed it, then MemoryCMRDetachedImpl reopened the same path to mmap the file. Add MemoryCMRImpl.ofWithSizeFromHeader, which opens once, reads the mapping size from offset 0 through the just-opened fd, validates against ff.length(fd) to prevent SIGBUS on an over-large mapping, and maps. The detached subclass overrides the method to close the fd after mapping (the mmap survives), matching the existing of() override. TableReader.openParquetMetadata now constructs (or reuses) a MemoryCMRDetachedImpl slot and calls ofWithSizeFromHeader, halving the per-partition openRO count on the parquet-partition open path. The slot reuse pattern via parquetMetadataPartitions is preserved. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
[PR Coverage check]😍 pass : 11111 / 12398 (89.62%) file detail
|
Brings in 4 upstream commits (c0c5638..8735521): * feat(core): add local parquet metadata sidecar file for optimized query planning (#6913). Introduces the `_pm` sidecar produced by `ParquetMetadataWriter`, the qdb-parquet-meta Rust crate, the pm_generate / pm_inspect binaries, Mig940 to backfill existing partitions, and a refactor of attachPartition / O3 parquet paths to derive parquet state from the sidecar. * fix(sql): EMA, VWEMA and KSUM failures in combined window queries (#7030). * chore(core): another posting index fix avoiding corrupt results (#7062). * docs(core): bump Java requirement to 25 in core/README (#7036). Conflict 1: core/src/main/resources/io/questdb/bin/linux-x86-64/libquestdbr.so is a prebuilt native library; took master's version (same resolution as prior 046c874 / 4302e10 merges). Other native libs updated cleanly. Conflict 2: MemoryTag.java tag-name registration. Took both - the errand-side NATIVE_MEMFD_STORAGE entry and the new master-side MMAP_PARQUET_METADATA_READER entry are independent. Semantic merge: TableWriter.attachPartition. Master refactored the method to detect parquet partitions from the post-rename `_pm` file (`parquetFileSize > -1`) and dropped the local `boolean isParquet`. Auto-merge silently kept errand's three `isParquet` references (skip attachValidateMetadata, upsert column tops instead of iterateDir, skip configureAppendPosition) without their declaration. Restored `boolean isParquet` and the early `PARQUET_PARTITION_NAME` probe so errand's parquet-attach optimisations stay intact alongside master's new sidecar generation block. Path is reset via trimTo after the detection probe. Also relocated the parquet-testing nested submodule from core/rust/qdbr/parquet2/testing/parquet-testing to core/rust/parquet2/testing/parquet-testing per the upstream rename; removed the now-empty leftover directory under core/rust/qdbr/parquet2. Build: mvn package -DskipTests -pl core -am succeeds. Tests not run in this commit.
Closes questdb/roadmap#101
Tandem https://github.com/questdb/questdb-enterprise/pull/978
Summary
Introduces a compact binary
_pmsidecar file that accompanies eachdata.parquetpartition file. The_pmfile stores all metadata the query engine needs: column descriptors, QuestDB column types, per-row-group column chunk byte ranges, compression codecs, encodings, and min/max statistics.Replaces the JSON metadata blob previously embedded in the parquet footer's key-value section with a purpose-built binary format. The
_pmfile is the single authoritative source of partition metadata for parquet partitions.Enables row group pruning via min/max statistics and bloom filter offsets stored locally in
_pm, without reading the parquet file itself.Lays the groundwork for cold storage: when a parquet file lives in object storage (S3, GCS, Azure Blob), the
_pmfile stays local and provides everything the query planner needs to decide which column chunks to fetch by byte range, eliminating metadata round-trips.Adds migration
Mig940to generate_pmfiles for all existing parquet partitions on engine upgrade.Motivation
QuestDB's parquet partition support previously stored QuestDB-specific metadata (column types, column tops, symbol flags) as a JSON blob inside the parquet footer under the
"questdb"key. This approach has several limitations:No column chunk byte ranges. The JSON metadata does not include per-column byte offsets and lengths. To decode a specific column chunk, the engine must parse the full parquet footer (thrift-encoded) to locate byte ranges. For cold storage, this requires a network round-trip to read the footer from the remote object store before any data fetch.
No row group statistics. Min/max values per column per row group are available in the parquet footer but not extracted into any local structure. The query engine cannot prune row groups without first downloading and parsing the footer.
No bloom filter references. Bloom filter offsets and lengths live in the parquet footer. Without local access to these references, the engine cannot skip row groups for equality predicates without reading the remote file.
No incremental updates. When an O3 merge appends a new row group to an existing parquet partition, the entire parquet footer must be re-read and re-parsed to access the updated metadata.
The
_pmfile addresses all of these by storing a complete, locally accessible, binary-encoded copy of all metadata the query engine needs.The
_pmbinary formatAll multi-byte integers are little-endian.
File layout
A reader locates the footer by reading
footer_offsetfrom the header at a fixed position. The trailer at the end of the file is retained for standalone validation but is not used on the hot read path.Column chunk (64 bytes)
Each column chunk in a row group block stores the information needed to locate and decode a column's data in
data.parquet:Statistics encoding. For fixed-width types that fit in 8 bytes (BOOLEAN, BYTE, SHORT, CHAR, INT, LONG, FLOAT, DOUBLE, DATE, TIMESTAMP), stats are always inlined directly in the
min_stat/max_statfields. For variable-length or wider types (VARCHAR, STRING, UUID, LONG256), stats are stored out-of-line in the region after the column chunks. In that case, themin_stat/max_statfields encode[offset: u32 (>> 3), length: u32]pointing to the out-of-line data.Stat flags (u8):
Encoding bitmask (u8):
Column flags (i32)
Feature flags (u64)
The header carries a single
feature_flagsfield that gates optional sections in both the header and footer. This allows additive extensions without bumping the format version.Current feature bits:
Bit 1 requires bit 0. Bit 2 requires
designated_timestamp >= 0. When bit 2 isset,
sorting_column_counton disk is 0 and readers infer sorting by thedesignated timestamp column.
_txnintegrationEach partition entry in
_txnconsists of 4 longs (LONGS_PER_TX_ATTACHED_PARTITION = 4):Field 3 retains its original meaning: the byte size of
data.parquet. This preserves backward compatibility — older versions of QuestDB read field 3 to memory-map the parquet file, so changing its meaning would break rollback.The
_pmfile size is not stored in_txn. Instead, readersstat()the_pmfile and locate the correct footer version via the header'sfooter_offsetfield, using the parquet file size from field 3 as the MVCC version token (see Read path below).TxWriter.setPartitionParquetFormat()writes both the parquet format bit in field 1 and the parquet file size in field 3 within a single transaction commit.Write path
Initial creation (convert to parquet)
When the engine converts a native partition to parquet, the Rust parquet encoder generates both
data.parquetand_pmin a single pass. The encoder:data.parquet._pmfile in memory viaParquetMetaWriter: column descriptors from the table schema, one row group block per parquet row group with column chunk metadata (byte ranges, codecs, statistics)._pmfooter with a CRC32, setsfooter_offsetin the header to point to it, and setsprev_footer_offsetto 0 (first version).TxWriter.setPartitionParquetFormat()to record the parquet file size in_txnfield 3.Incremental update (O3 merge)
When an out-of-order insert merges data into an existing parquet partition,
O3PartitionJobusesPartitionUpdaterto append a new row group:PartitionUpdater.of()opens the existingdata.parquet(read + write) and_pmfile.data.parquet._pmafter the old footer (the old footer becomes dead space tracked in theunused_bytesfooter field).row_group_count, CRC32, andprev_footer_offsetpointing to the old footer.footer_offsetin the header is updated to point to the new footer. This is an 8-byte aligned atomic write and is the commit point for the_pmfile.O3PartitionJobcommits the new parquet file size to_txnfield 3.Readers using an older
_txnsnapshot see a different parquet file size in field 3 than the latest_pmfooter. They follow theprev_footer_offsetchain from the header'sfooter_offsetto locate the footer whoseparquet_footer_offset + parquet_footer_length + 8matches their_txnparquet file size. This provides lock-free concurrent read/write access without storing_pmfile size in_txn.Schema evolution
When the table schema changes via
ALTER TABLE ADD COLUMNorALTER TABLE DROP COLUMN,PartitionUpdater.setTargetSchema()receives the new schema. The Rust updater copies existing row groups, inserting null column chunks for added columns viacopyRowGroupWithNullColumns()and dropping chunks for removed columns. The_pmfile reflects the target schema: its column descriptors match the new schema, and column IDs link each descriptor to the corresponding writer index.Read path
Memory mapping
TableReaderstat()s the_pmfile and memory-maps its full size. The reader then locates the correct footer version by readingfooter_offsetfrom the header and walking theprev_footer_offsetchain until finding the footer whose parquet file size matches_txnfield 3. This avoids storing_pmfile size in_txn, preserving backward compatibility for rollback.If
footer_offsetexceeds the mapped size (rare race with a concurrent O3 merge extending the file betweenstat()and mmap), the reader remaps with a freshstat().The parquet file size is read from
_txnfield 3, so the engine does not need tostat()the parquet file.ParquetMetaFileReaderA zero-allocation reader that operates directly over mmapped memory via
Unsafeoffset arithmetic. It provides:getColumnCount(),getColumnId(),getColumnType(),getColumnName().getRowGroupCount(),getRowGroupSize(),getTotalRowCount().getChunkMinStat(),getChunkMaxStat(),getChunkStatFlags().ParquetRowGroupSkipperimplementation for filter pushdown (see below).of(addr, fileSize, parquetFileSize)initializes the reader: it readsfooter_offsetfrom the header, then walks theprev_footer_offsetchain to find the footer whose parquet file size matches theparquetFileSizeargument (the MVCC version token from_txnfield 3).The first call to
canSkipRowGroup()lazily allocates a native handle that caches the parsed_pmheader and footer. The handle is reused across subsequent calls and freed byclose().ParquetMetaPartitionDecoderReplaces footer-based
PartitionDecoderfor table partitions. Java owns all metadata viaParquetMetaFileReader; Rust acts as a stateless decode engine that receives explicit parameters per decode call:decodeRowGroup(): decodes a full row group given column indices, row range, and output buffers.decodeRowGroupWithRowFilter(): decodes with a row-level filter predicate.Column type overrides (Symbol to Varchar, Varchar to VarcharSlice) come from the Java-side column type, while base types come from the
_pmfile. For cold storage, the same decode API works: the engine downloads column chunks by byte range using offsets from_pmand passes the data to the same Rust decoder.Row group pruning
ParquetRowGroupFilter.prepareFilterList()accepts aParquetMetaFileReaderand builds a filter list from SQLWHEREclause conditions. The filter list references column indices resolved from_pmcolumn names and min/max statistics.At scan time,
ParquetMetaFileReader.canSkipRowGroup()evaluates the filter list against the row group's inlined statistics and returnstrueif the row group can be skipped entirely. This happens on the Java side without any JNI call for the common case of inlined fixed-width statistics; the native handle is only needed for out-of-line or complex comparisons.Bloom filter offsets stored in
_pmfooter feature sections (whenBLOOM_FILTERSflag is set) enable equality-predicate pruning for columns with bloom filters.Migration (
Mig940)Mig940runs during engine upgrade on tables with parquet partitions:_metato determinepartitionByand timestamp column type._txnfor read-only access._txnfield 3.data.parquetread-only._pmfile.ParquetMetadataWriter.generate(), which reads the parquet footer, extracts QuestDB metadata from the"questdb"JSON key-value pair, and writes the complete_pmfile. The parquet file size from_txnfield 3 is used as the authoritative parquet file size (notstat()on the parquet file).Mig940does not modify_txn. Field 3 retains its original meaning (parquet file size), preserving backward compatibility for rollback. The migration only generates_pmsidecar files.The migration is non-destructive: original
data.parquetfiles and_txnremain unchanged. It operates only on local partitions (no cold storage access required).Rollback safety
_txnfield 3 stores the parquet file size (not the_pmfile size). Older versions of QuestDB use field 3 to memory-mapdata.parquet, so keeping its original meaning preserves backward compatibility for rollback. Readers locate the_pmfooter via the header'sfooter_offsetfield and walk theprev_footer_offsetchain, using the parquet file size as an exact-match MVCC token.If a user rolls back to an older version, writes data that modifies
data.parquet(O3 merge, new partitions, etc.), and then re-upgrades, the_pmfiles become stale: the footer chain contains no footer matching the new parquet file size.ParquetMetaFileReader.of()detects this via exact matching and throws.Escape hatch: set
cairo.repeat.migration.from.versioninserver.confto forceMig940to re-run on the next restart:cairo.repeat.migration.from.version=427This causes the migration framework to reset the migration version and re-run
Mig940, which regenerates_pmfiles for all parquet partitions. Remove the property after the restart.SHOW PARTITIONSintegrationShowPartitionsRecordCursorFactoryusesParquetMetaFileReaderto extract min/max timestamps from parquet partitions without parsing the parquet footer. It reads the designated timestamp column index from the_pmheader, then fetches inlined min/max statistics from the first and last row group blocks.Test plan
_pmcreation: convert native partition to parquet, verify_pmfile is generated alongsidedata.parquetand matches expected metadata_pmis updated with the new row group and old data remains accessible_pmhandles repeated appends with growingunused_bytesfrom discarded footers_pmdescriptors_pmreflects the new schema with null chunks for added columnsMig940): existing parquet partitions without_pmfiles, verify migration generates correct_pmwithout modifying_txnParquetMetaFileReaderedge cases: zero row groups, large row group counts, format version validation, corrupted trailer detectionParquetMetaPartitionDecoderlifecycle: verifyof()releases pre-existing native handles,destroy()clears reader unconditionallycanSkipRowGroup()correctly prunes row groups based on min/max statistics for various column types and filter conditionsSHOW PARTITIONS: verify min/max timestamps from_pmmatch expected valuesparquet_footer_offset + parquet_footer_length + 8matches_txnfield 3 parquet file size_txnsnapshot walksprev_footer_offsetchain to find the correct footer versionfooter_offsetconcurrency: verify reader correctly handlesfooter_offsetexceeding mapped size (remap path)_pmfiles survive partition detach and reattachcargo test --libfor format serialization, deserialization, CRC computation, stat encoding, feature flag validationdecode_pm_e2e): write parquet, convert to_pm, extract fields, decode column chunks, compare with footer-based decode