Add packfile writer #702
Pull request overview
Adds the packfile writer implementation for full-history RPC v2, including record-format definitions, optional content hashing, and an async Linux writeback hint to improve finish-time sync behavior.
Changes:
- Introduces `Writer` with serial and concurrent (pipeline) record assembly, compression/CRC handling, and optional SHA-256 content hashing.
- Adds packfile on-disk format constants/enums and extends the offset index with a compile-time trailer-size check.
- Adds Linux-only `sync_file_range(...WRITE)` writeback hint with a no-op fallback for other platforms.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| go.mod | Promotes golang.org/x/sys to a direct dependency (used by Linux writeback). |
| cmd/stellar-rpc/internal/packfile/packfile.go | Defines packfile magic/version/trailer constants, record formats, and trailer flag bits. |
| cmd/stellar-rpc/internal/packfile/index.go | Adds a compile-time constraint for groupSize trailer encoding. |
| cmd/stellar-rpc/internal/packfile/writer.go | Implements the packfile writer, including concurrency pipeline and content hashing support. |
| cmd/stellar-rpc/internal/packfile/contenthash.go | Adds chunked SHA-256 content-hash implementation for the serial path. |
| cmd/stellar-rpc/internal/packfile/writeback_linux.go | Implements Linux writeback hint via unix.SyncFileRange. |
| cmd/stellar-rpc/internal/packfile/writeback_other.go | No-op writeback implementation for non-Linux platforms. |
```go
itemsPerRecord, err := resolveItemsPerRecord(opts)
if err != nil {
	return nil, err
}
conc := opts.Concurrency
if opts.Format != Compressed && !opts.ContentHash {
	conc = 0 // no pipeline when nothing to parallelize
}
```
WriterOptions.Concurrency and BytesPerSync aren’t validated for negative values. A negative Concurrency silently forces the serial path, and a negative BytesPerSync silently disables writeback. Consider rejecting negatives in Create() (similar to ItemsPerRecord) to avoid surprising behavior from misconfiguration.
```go
contentHash  bool
serialHasher *contentHasher // serial path: streams items through contentHasher
digests      []byte         // concurrent path: accumulated 32-byte chunk digests
sizesPool    sync.Pool      // concurrent path: pooled []uint32 for hash goroutines
```
Content hashing currently retains all per-chunk digests in memory (digests []byte) and then hashes that buffer at the end. This makes memory usage grow with the number of records (32 bytes each). Consider streaming the final hash instead (e.g., keep a second sha256.Hash and Write(chunkDigest) as each chunk completes) so hashing stays O(1) memory.
```go
import "fmt"

const (
	magic = 0x534C4348 // "SLCH"
```
magic is written/read using binary.LittleEndian, but the constant value/comment (0x534C4348 // "SLCH") doesn’t match the on-disk byte order (this value would encode as "HCLS"). Either update the constant to match the intended on-disk ASCII bytes, or adjust the comment/endian so the file magic is unambiguous and self-consistent.
Suggested change:

```diff
-	magic = 0x534C4348 // "SLCH"
+	magic = 0x48434C53 // "SLCH" when encoded with binary.LittleEndian
```
```go
for _, tc := range cases {
	t.Run(tc.name, func(t *testing.T) {
		path := filepath.Join(dir, tc.name)
		_, err := Create(path, tc.opts)
		require.Error(t, err)
		require.Contains(t, err.Error(), tc.errSub)
	})
```
Test case names are used directly as filename components (filepath.Join(dir, tc.name)), but several names contain characters (notably >) that are invalid on Windows filesystems. Consider using a sanitized name (e.g., an index-based filename) so the tests remain portable.
```go
for i := range maxIters {
	if err := w.AppendItem(item); err != nil {
		return // success: error surfaced
	}
	_ = i
```
The loop declares i but then immediately discards it (_ = i). This can be simplified to for range maxIters (or use i in the error message) to avoid unnecessary noise in the test.
Suggested change:

```diff
-for i := range maxIters {
-	if err := w.AppendItem(item); err != nil {
-		return // success: error surfaced
-	}
-	_ = i
+for range maxIters {
+	if err := w.AppendItem(item); err != nil {
+		return // success: error surfaced
+	}
```
The helper had a dual-role parameter (`payload`): scratch buffer for the compressed branch, actual record content for the passthrough branch. Reading the helper required reconciling those two semantics.

Inline the 4-line operation at blockWorker and flushSerial, with a comment at each site about the aliasing constraint. The compress vs passthrough branching now reads as a single conditional that gets the record bytes into our own memory, followed by an unconditional append of the FOR index. Same allocation pattern, clearer intent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The on-disk format consistently calls the unit a "record" (ItemsPerRecord, recordCount), but the in-flight pipeline used "block" — same concept, different word. Unify on "record":

- blockWork -> pendingRecord (input to worker, awaiting processing)
- blockResult -> record (output from worker, ready to write)
- blockID -> ordinal (in-flight sequence number for ordering)
- nextBlockID -> nextOrdinal
- blockWorker -> recordWorker
- blockWg -> recordWg
- writeBlock -> writeRecord
- buildBlock -> buildRecord

Also renames runWriter's `pending` map to `reorderBuf` — accurately describes its role (a reorder buffer keyed by ordinal), and avoids collision with the new `pendingRecord` type. No functional change.

hashWork/hashResult left as-is since they're scoped to the inner hash sub-pipeline, not the outer record concept.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Workers are now spawned whenever a Writer has CPU work to do (NewCompressor != nil OR ContentHash). Pure passthrough writers (no compressor, no content hash) keep the inline write path on the main goroutine.

This eliminates the serial/concurrent dichotomy in code: there's a single flush() that dispatches to workers, a single hash implementation living in the worker's hash sub-goroutine, and no separate flushSerial/serialHasher/serialCompressor. Concurrency now means "number of worker goroutines" with 0 defaulting to 1. The previous "0 or 1 means serial path" semantics are gone; 1 worker still serializes against the main goroutine via the channel but produces async hash overlap.

Real wins:
- contenthash.go and the chunked SHA-256 contentHasher type deleted (~80 lines)
- flushSerial, hashSerial, serialHasher, serialCompressor removed
- Single flush path; the `if w.concurrency <= 1` branch is gone
- ~2.81% wall-time win on serial_compressed_hash from async hash via the 1-worker pipeline

Trade-offs:
- ~2 extra allocs per record on serial+compress configs (channel send overhead). Concurrent paths unchanged.
- Real-world backfill workloads use Concurrency >= 1 already, so this affects test paths more than production.

Bench (alternating runs at count=20×2s for thermal noise control):
- All concurrent configs: parity (p > 0.1)
- serial_compressed: parity (p=0.355)
- serial_compressed_hash: -2.81% (p<0.001)
- serial_passthrough_hash: parity (p=0.121)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address Copilot review: a caller-supplied NewCompressor is documented
to return a Compressor but the type system can't prevent returning
nil. Without a guard, the worker's deferred compressor.Close() would
panic on a nil interface, crashing the worker goroutine.
Add the 1-line guard in recordWorker, plus tests for previously
untested paths:
- TestNewCompressorReturnsNil — exercises the nil-from-factory case
- TestCompressorCloseErrorSurfaces — verifies Close errors from
worker defers propagate via the first-error-wins atomic
- TestAppendItemEdgeCases — documents AppendItem()/[]byte{}/multi-part
contract behavior
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The interface is more general than just compression — it's a per-record byte transformation. Typical implementations compress, but raw + CRC wrapping, encryption, or any other framing also fit. Naming the type "Compressor" misled readers into thinking the contract was specifically about compression.

Rename surface:
- Compressor interface -> RecordEncoder
- WriterOptions.NewCompressor -> WriterOptions.NewRecordEncoder
- Writer.newCompressor -> Writer.newRecordEncoder
- worker local compressor -> encoder
- xorCompressor (test) -> xorEncoder
- failingCompressor (test) -> failingEncoder
- failingCompress (test) -> failingEncode
- closingFailCompressor -> closingFailEncoder
- zstdBenchCompressor (bench) -> zstdBenchEncoder
- TestNewCompressorReturnsNil -> TestNewRecordEncoderReturnsNil
- TestCompressorCloseErrorSurfaces -> TestRecordEncoderCloseErrorSurfaces

Also reworks the doc to reflect the broader role: "transforms one record ... before it's written to disk. Typical implementations compress (e.g. zstd, which carries its own per-frame checksum) or add integrity (e.g. raw + a trailing CRC32C, for compressors that don't include a checksum)."

The zstd package's Compressor type is unchanged — it's a concrete type whose entire purpose is zstd compression, so the specific name is correct there.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
```go
// On-disk flag bits (uint8 at trailer offset 5). Only one flag is currently
// defined; the remaining bits are reserved for future use.
const (
	flagContentHash uint8 = 1 << 0
```
I think this `flagContentHash` also might need a doc update.
Flagging it for reference.
Acked, will include flagContentHash semantics in the design-doc update PR.
- TestPassthroughStoresVerbatim is now table-driven over
ItemsPerRecord ∈ {1, 2, 3} and uses decodeIndex to extract per-record
payloads, so it correctly verifies verbatim storage past the trailing
FOR offset index when items span multiple records.
- RecordEncoder.Encode doc now states explicitly that Encode must not
modify the input slice or return a slice that aliases it, since the
writer reads the input in parallel for content hashing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Third in a series of PRs checking in the packfile implementation for full-history RPC v2.
This PR adds the writer:
- `packfile.go` — on-disk format constants (magic, version, 76-byte trailer size, content-hash flag bit, `ErrContentHashMismatch`)
- `writer.go` — `Writer` with a unified worker pipeline for compression/hashing; caller-supplied per-record `Compressor` and optional SHA-256 content hashing
- `writeback_linux.go` / `writeback_other.go` — Linux-only `sync_file_range(SYNC_FILE_RANGE_WRITE)` hint for background page flushing; no-op elsewhere
- `writer_test.go` — validation, lifecycle, trailer/index format, passthrough, content-hash extract parity, and error propagation
- `writer_bench_test.go` — end-to-end write throughput across representative configurations

Also tightens `zstd.Compressor.Close` and `zstd.Decompressor.Close` to return `error` so they satisfy `io.Closer` directly (always return nil; the error return is purely for interface conformance).

Depends on the intpack + offset-index codec already merged in #649 and the zstd wrapper in #650.
Full roundtrip coverage (reader-side decode of record contents) arrives with the reader in the next PR.
Key design points
- `WriterOptions.NewCompressor func() Compressor` returns a per-worker `Compressor` (`Encode([]byte) ([]byte, error)` + `io.Closer`). The interface's `Close` lets stateful codecs (e.g. zstd's CGo context) release native resources deterministically when a worker exits or the writer finalizes, rather than waiting on GC finalizers. If nil, records are written as-is (passthrough). This supports both zstd compression and the rocksdb-passthrough use case (where ledgers are pre-compressed and stored verbatim).
- `Format` identifier: `WriterOptions.Format` is a caller-assigned `uint32` written to the trailer. The library does not interpret it; readers use it to dispatch to the matching decompressor.
- `ContentHash bool` enables a chunked SHA-256 over the logical item stream, independent of compression. `ContentHashExtract func([]byte) ([]byte, error)` lets callers normalize already-encoded items before hashing — so a packfile-built file and a rocksdb-built file produce the same content hash over the same logical ledger stream.
- `Concurrency` sets the number of parallel worker goroutines (0 defaults to 1). Each worker compresses (if a compressor is supplied) and hashes (if `ContentHash` is enabled, via an inner hash sub-goroutine that overlaps with compress). Records are dispatched in ordinal order; `runWriter` reorders worker results and appends to the file in order. Pure passthrough writers (no compressor, no content hash) bypass the pipeline and write records directly from the main goroutine.
- Errors propagate via an `atomic.Pointer[error]` so both the pipeline goroutine and the main goroutine observe it with proper happens-before; first-error-wins. This prevents a data-loss window where `AppendItem` could keep enqueuing records after a background write/compress error. `Compressor.Close` errors from worker defers are funneled through the same atomic.
- `TestCreateValidation` — input validation (Concurrency / BytesPerSync / ItemsPerRecord)
- Lifecycle: `TestCreateFailsIfFileExists`, `TestCloseWithoutFinishRemovesFile`, `TestFinishTwice`, `TestAppendItemAfterFinish`, `TestCloseIdempotent`, `TestEmptyPackfile`, `TestAppendItemEdgeCases` (no-args, empty item, multi-part concatenation)
- Format: `TestTrailerFields`, `TestOffsetIndexDecodes`, `TestAppDataPreserved`, `TestItemsPerRecordOne`, `TestOverwriteReplaces`
- `TestPassthroughStoresVerbatim` — items stored byte-identical when `NewCompressor` is nil; `TestNewCompressorReturnsNil` — caller's factory returning nil doesn't panic
- Content hashing: `TestContentHashParity` (different worker counts produce the same hash), `TestContentHashExtract` (extract hook), `TestContentHashExtractConcurrent` (extract hook with multiple workers), `TestConcurrentContentHashExtractError` (extract errors surface from worker goroutines)
- Error propagation: `TestSerialCompressErrorSurfaces`, `TestConcurrentCompressErrorSurfaces`, `TestCompressorCloseErrorSurfaces` (Close errors from worker defers surface from Finish; run under `-race`)
- `/dev/full` (Linux): `TestConcurrentWriteErrorSurfaces`, `TestSerialWriteErrorSurfaces` (run under `-race`)