feat: end-to-end streaming pack parser #1

Merged
ivarvong merged 4 commits into main from streaming-pack-parser
Apr 26, 2026

@ivarvong
Owner

What

Replaces the buffered pack-ingestion pipeline with a fully streaming one. The buffered pipeline kept pack_binary and object_list in the heap at the same time, 2–3× the pack size, which was enough to OOM on large repos (esp-idf, linux, the adafruit CircuitPython bundle).

Memory model after this PR

| Phase | Before | After |
|---|---|---|
| HTTP response | Full body in binary | One pkt-line at a time |
| Pack bytes | Full binary in heap | One chunk (~4 KB) |
| Object inflate | Full compressed buffer | Zlib port open across ingest/2 calls |
| Object storage | IO.iodata_to_binary + zlib.compress | Streaming deflate direct to store |
| Peak (opencode, 135 MB pack) | ~300–400 MB | ~store size only |

Changes

Phase 1 — HTTP streaming transport

  • PktLine.Decoder: incremental decoder for arbitrary network chunks
  • HTTP.do_fetch / do_ls_refs: Req into: streaming callback feeds each chunk through the decoder and a fetch state machine; no response body is ever accumulated
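
To illustrate the incremental decoding idea, here is a minimal sketch of a pkt-line decoder that accepts arbitrary chunk boundaries. It is not the PR's PktLine.Decoder: the module name `PktLineSketch` and the `feed/2` shape are assumptions made for the example, and malformed length prefixes are not handled. It relies only on the pkt-line framing itself (a 4-hex-digit length that includes the prefix, with `0000`, `0001` and `0002` as special packets).

```elixir
defmodule PktLineSketch do
  # Hypothetical module for illustration; the PR's decoder may be structured differently.

  # Feed one network chunk. Returns {decoded_packets, leftover_bytes};
  # the leftover is passed back in as `buffer` on the next call.
  def feed(buffer, chunk) when is_binary(buffer) and is_binary(chunk) do
    decode(buffer <> chunk, [])
  end

  # Special packets carry no payload.
  defp decode(<<"0000", rest::binary>>, acc), do: decode(rest, [:flush | acc])
  defp decode(<<"0001", rest::binary>>, acc), do: decode(rest, [:delim | acc])
  defp decode(<<"0002", rest::binary>>, acc), do: decode(rest, [:response_end | acc])

  # Data packet: the hex length counts the 4 prefix bytes as well.
  defp decode(<<hex::binary-size(4), rest::binary>> = buf, acc) do
    len = String.to_integer(hex, 16) - 4

    case rest do
      <<payload::binary-size(len), tail::binary>> ->
        decode(tail, [{:data, payload} | acc])

      _ ->
        # Payload not fully received yet: keep the whole packet for the next chunk.
        {Enum.reverse(acc), buf}
    end
  end

  # Fewer than 4 bytes buffered: wait for more input.
  defp decode(buf, acc), do: {Enum.reverse(acc), buf}
end
```

Only the unconsumed tail of the current packet is carried between calls, which is what bounds memory to roughly one pkt-line.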

Phase 2 — Pack.StreamParser

  • Forward-only per-object decode; zlib port kept open across ingest/2 calls (Phase 3)
  • Adler32 computed incrementally — locate_boundary never calls IO.iodata_to_binary
  • OFS/REF delta resolution through the object store; offset_to_sha and sha_to_depth maps
  • Key bug fixed: parse_next was discarding state2 (current + write_handle) on :need_more, causing O(n²) re-processing of the entire buffer on every new byte for large objects
  • Feed cap: inflate_upper_bound(expected_size) bytes per do_inflate call
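
A forward-only per-object decode starts at the pack entry header, which in the Git pack format is a variable-length integer carrying a 3-bit type and the inflated size. The sketch below shows one way to decode it while signalling `:need_more` on a short buffer. `PackHeaderSketch` is a hypothetical name, and the return shape is an assumption rather than what Pack.StreamParser does.

```elixir
defmodule PackHeaderSketch do
  import Bitwise

  # Hypothetical helper; type codes follow the Git pack format (5 is reserved).
  defp type_name(1), do: :commit
  defp type_name(2), do: :tree
  defp type_name(3), do: :blob
  defp type_name(4), do: :tag
  defp type_name(6), do: :ofs_delta
  defp type_name(7), do: :ref_delta
  defp type_name(_), do: :unknown

  # Returns {:ok, type, size, rest} or :need_more when the header is incomplete.
  def decode(<<more::1, type::3, low::4, rest::binary>>) do
    continue(more, rest, low, 4, type_name(type))
  end

  def decode(<<>>), do: :need_more

  # Continuation bit clear: the header is complete.
  defp continue(0, rest, size, _shift, type), do: {:ok, type, size, rest}

  # Each following byte contributes 7 more size bits, least significant first.
  defp continue(1, <<more::1, bits::7, rest::binary>>, size, shift, type) do
    continue(more, rest, size ||| (bits <<< shift), shift + 7, type)
  end

  defp continue(1, <<>>, _size, _shift, _type), do: :need_more
end
```

Because the header is tiny, restarting it on `:need_more` is cheap. The O(n²) bug described above concerns the much larger inflate state, which has to be carried forward rather than recomputed.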

Phase 3+ — Streaming object-store writes

  • New ObjectStore protocol callbacks: open_write/3, write_chunk/3, close_write/2, cancel_write/2
  • Memory store: streaming deflate port; raw content never coexists with compressed form
  • Disk store: temp file + streaming deflate + atomic rename
  • Promisor: delegates to inner Memory cache
  • Non-delta objects (blob/tag) use streaming writes; commits/trees go through the traditional path so their content lands in raw_cache for fast delta resolution
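
The streaming-write idea can be sketched with Erlang's `:zlib` and `:crypto` modules: each inflated chunk is hashed and deflated as it arrives, so the raw object is never assembled in one binary. The function names mirror the callbacks listed above, but the arities and handle shape here are assumptions (the real callbacks also take a store), and storing the `"type size\0"` header inside the compressed stream follows Git's loose-object convention, which this PR's stores may or may not use.

```elixir
defmodule StreamingWriteSketch do
  # Hypothetical handle-based API for illustration only.

  def open_write(type, size) do
    z = :zlib.open()
    :ok = :zlib.deflateInit(z)
    header = "#{type} #{size}\0"
    sha = :crypto.hash_init(:sha) |> :crypto.hash_update(header)
    %{z: z, sha: sha, compressed: [:zlib.deflate(z, header)]}
  end

  # Hash and deflate one chunk; only compressed output is retained.
  def write_chunk(handle, chunk) do
    %{
      handle
      | sha: :crypto.hash_update(handle.sha, chunk),
        compressed: [handle.compressed, :zlib.deflate(handle.z, chunk)]
    }
  end

  # Flush the deflate stream and return {hex_sha, compressed_binary}.
  def close_write(handle) do
    tail = :zlib.deflate(handle.z, [], :finish)
    :ok = :zlib.deflateEnd(handle.z)
    :ok = :zlib.close(handle.z)
    sha = handle.sha |> :crypto.hash_final() |> Base.encode16(case: :lower)
    {sha, IO.iodata_to_binary([handle.compressed, tail])}
  end
end
```

The object ID falls out of the same pass that produces the compressed bytes, so no second read of the content is needed.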

Phase 4 — Adversarial hardening

  • max_object_bytes, max_inflate_ratio (zip-bomb defence), max_delta_depth, max_objects, deadline
  • All enforced per-object during streaming parse, not as a post-parse check
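
As a rough illustration of enforcing limits during the parse rather than afterwards, the sketch below keeps running byte counters and rejects an object as soon as a limit is crossed. The module name, default values and error atoms are assumptions made for the example; only the option names `max_object_bytes` and `max_inflate_ratio` come from the PR.

```elixir
defmodule InflateLimitsSketch do
  # Hypothetical defaults; the PR does not state its actual values here.
  def new(opts \\ []) do
    %{
      max_object_bytes: Keyword.get(opts, :max_object_bytes, 100 * 1024 * 1024),
      max_inflate_ratio: Keyword.get(opts, :max_inflate_ratio, 1000),
      bytes_in: 0,
      bytes_out: 0
    }
  end

  # Call after every inflate step with the compressed bytes consumed
  # and the inflated bytes produced by that step.
  def account(limits, in_bytes, out_bytes) do
    limits = %{
      limits
      | bytes_in: limits.bytes_in + in_bytes,
        bytes_out: limits.bytes_out + out_bytes
    }

    cond do
      limits.bytes_out > limits.max_object_bytes ->
        {:error, :object_too_large}

      # Guard against dividing by zero by treating an empty input as one byte.
      limits.bytes_out > limits.max_inflate_ratio * max(limits.bytes_in, 1) ->
        {:error, :inflate_ratio_exceeded}

      true ->
        {:ok, limits}
    end
  end
end
```

Checking after each step means a zip bomb is cut off once it exceeds the ratio, instead of after it has been fully inflated.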

Wiring

  • HTTP.do_fetch, exgit.ex fetch_into, FS.prefetch_blobs, FS.do_fetch_commit_and_root_tree: all pass object_store: so every fetch path uses the streaming parser

Bug fix

  • Tree.decode: removed .gitmodules from reserved names — it is a legitimate file in any repo using submodules; the URL-injection concern only applies if we process submodule config, which we do not. This was crashing prefetch on adafruit/Adafruit_CircuitPython_Bundle.

grep optimisation

  • compile_grep_pattern: case-sensitive literals (no PCRE metacharacters) compile to {:literal, needle}
  • scan_once/2: routes literals through :binary.matches (Boyer-Moore-Horspool) — 9.5× faster per-blob scan; adafruit bundle greps from N/A to 2 ms, opencode literals from ~200 ms to ~130 ms
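
The literal fast path can be sketched as follows. `GrepSketch`, `compile/2` and `scan/2` are hypothetical names and the metacharacter set is an assumption; the only calls relied on are `:binary.matches/2` and `Regex.scan/3`, both of which return `{offset, length}` pairs here.

```elixir
defmodule GrepSketch do
  # Characters treated as regex syntax; an assumed set for this sketch.
  defp metachars, do: ~r/[\\.^$*+?()\[\]{}|]/

  # Case-sensitive patterns without metacharacters become {:literal, needle}.
  def compile(pattern, opts \\ []) when is_binary(pattern) do
    case_sensitive? = Keyword.get(opts, :case_sensitive, true)

    if case_sensitive? and pattern != "" and not Regex.match?(metachars(), pattern) do
      {:literal, pattern}
    else
      flags = if case_sensitive?, do: "", else: "i"
      {:regex, Regex.compile!(pattern, flags)}
    end
  end

  # Literal needles skip the regex engine entirely.
  def scan({:literal, needle}, blob), do: :binary.matches(blob, needle)

  def scan({:regex, regex}, blob) do
    regex |> Regex.scan(blob, return: :index) |> Enum.map(&hd/1)
  end
end
```

Both branches return the same shape, so callers do not need to know which path was taken.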

Tests added

  • Pack.StreamParserTest — correctness, chunked, byte-at-a-time, memory bound (18 tests)
  • Pack.StreamParserAdversarialTest — zip bomb, deep delta chain, oversized, malformed, checksum, OFS-before-pack (17 tests + 1 property)
  • Pack.StreamParserStreamingWriteTest — Memory/Disk streaming write correctness (9 tests)
  • PktLine.DecoderTest — incremental decode parity (11 tests)
  • HTTP streaming tests — 5000-packet reassembly, channel-3 abort, memory regression guard (3 tests)

626 tests, 0 failures.

Not in this PR

  • FS.prefetch warm option: decompressing blobs into repo.blob_cache during prefetch so repeated greps avoid the 140 ms zlib.uncompress tax. The design is agreed (state on the struct, caller opts in); left for a follow-up.

Phase 1 — HTTP streaming transport
  - New PktLine.Decoder: incremental stateful pkt-line decoder that
    accepts arbitrary network chunks, never materialises the full
    response body. Eliminates the O(response-size) intermediate binary.
  - HTTP.do_fetch / do_ls_refs: rewired from post_upload_pack (buffer
    everything) to stream_upload_pack (Req into: callback → decoder →
    handler). Memory bounded by one pkt-line at a time.

Phase 2 — Streaming pack parser (Pack.StreamParser)
  - Forward-only per-object decode: type/size varint + OFS/REF delta
    extras decoded, then incremental zlib inflate (port kept open across
    ingest/2 calls), then object written to store immediately.
  - offset_to_sha map for O(1) OFS_DELTA base lookup.
  - Rolling 20-byte SHA-1 delay: hashing lags the input by 20 bytes, so
    at finalize/1 the held-back sha_tail is the pack trailer and can be
    compared with the digest without buffering the rest of the pack.
  - Critical bug fixed: parse_next was discarding state2 (current +
    write_handle) on :need_more, causing O(n²) re-processing of the
    full buffer on every new byte for large objects.
  - Feed cap: inflate_upper_bound(expected_size) bytes per do_inflate
    call prevents O(buf_size × num_objects) zlib work.
  - raw_cache: 64 MB budget of decompressed content for fast OFS/REF
    delta base resolution, eliminating repeated zlib.uncompress +
    Object.decode + Object.encode round-trips that caused 226× slowdown
    vs Pack.Reader.

Phase 3 — Streaming inflate (port open across ingest calls)
  - Zlib inflate port opened on first @zlib_min bytes, fed incrementally
    on each subsequent ingest. Eliminates the O(compressed_object_size)
    buffer-accumulation spike.
  - Adler32 computed incrementally (inflate_adler field) for boundary
    detection without IO.iodata_to_binary on the full inflate output.
  - complete_inflate defers boundary detection until adler32 trailer
    has arrived in the buffer; returns {:ok, state} (port still open)
    rather than :need_more so the loop preserves state across ingests.
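
A minimal sketch of incremental inflate with a running Adler-32, assuming one zlib stream fed in arbitrary pieces. `IncrementalInflateSketch` is a hypothetical module; it does not attempt the boundary detection between consecutive pack objects that the PR describes. It only shows that the checksum can be maintained chunk by chunk with `:erlang.adler32/2` and compared with the 4-byte big-endian trailer that the zlib format places at the end of a stream.

```elixir
defmodule IncrementalInflateSketch do
  # Open one inflate stream and start the running checksum at its initial value.
  def new do
    z = :zlib.open()
    :ok = :zlib.inflateInit(z)
    %{z: z, adler: :erlang.adler32(<<>>), size: 0}
  end

  # Feed one compressed chunk; returns {inflated_iodata, updated_state}.
  def feed(state, chunk) do
    out = :zlib.inflate(state.z, chunk)

    {out,
     %{
       state
       | adler: :erlang.adler32(state.adler, out),
         size: state.size + IO.iodata_length(out)
     }}
  end

  def close(state), do: :zlib.close(state.z)
end
```

The state map is what must survive a `:need_more` result: dropping it would force the whole object to be re-inflated from the start on the next chunk.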

Phase 3+ — Streaming object-store writes
  - New ObjectStore protocol callbacks: open_write/3, write_chunk/3,
    close_write/2, cancel_write/2.
  - Memory store: streaming deflate port accumulates compressed chunks
    as inflate output arrives; raw content never coexists with the
    compressed form in the heap.
  - Disk store: temp file under objects/tmp/, streaming deflate direct
    to fd, atomic rename at close_write.
  - Promisor: delegates to inner Memory cache.
  - StreamParser: non-delta objects (blob/tag) use streaming write;
    commits/trees use traditional path so their raw content lands in
    raw_cache for fast delta base resolution.
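
The disk-store pattern (temp file, streaming deflate, atomic rename) might look roughly like the sketch below. `DiskWriteSketch.write_atomically/3`, the `tmp` subdirectory name and the temp-file naming are assumptions for the example, and it takes the chunks as an enumerable instead of the open/write/close callback sequence the PR uses.

```elixir
defmodule DiskWriteSketch do
  # Hypothetical helper: deflate `chunks` into dir/tmp/<unique>, then rename to dir/name.
  def write_atomically(dir, name, chunks) do
    tmp_dir = Path.join(dir, "tmp")
    File.mkdir_p!(tmp_dir)
    tmp = Path.join(tmp_dir, "obj-#{System.unique_integer([:positive])}")
    final = Path.join(dir, name)

    z = :zlib.open()
    :ok = :zlib.deflateInit(z)
    fd = File.open!(tmp, [:write, :binary])

    try do
      # Compressed output goes straight to the file; nothing accumulates in memory.
      Enum.each(chunks, fn chunk -> :ok = IO.binwrite(fd, :zlib.deflate(z, chunk)) end)
      :ok = IO.binwrite(fd, :zlib.deflate(z, [], :finish))
      :ok = File.close(fd)
      # A rename within one filesystem replaces the target in a single step.
      :ok = File.rename(tmp, final)
      {:ok, final}
    after
      # Cleanup is best-effort; results are ignored on purpose.
      _ = :zlib.close(z)
      _ = File.close(fd)
      _ = File.rm(tmp)
    end
  end
end
```

Readers either see the complete object at its final path or nothing at all; a crash mid-write leaves only a stray temp file.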

Phase 4 — Adversarial hardening
  - max_object_bytes: rejects oversized objects before inflating.
  - max_inflate_ratio: zip-bomb defence (uncompressed/compressed ratio).
  - max_delta_depth: caps OFS/REF delta chain length (default 50).
  - max_objects: rejects absurd pack header counts before parsing.
  - deadline: monotonic cutoff; ingest/2 returns :deadline_exceeded.
  - sha_to_depth map for REF_DELTA depth tracking across pack boundaries.
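
Delta-depth tracking with a SHA-to-depth map can be sketched as below. The module name, the `record/4` signature and the treatment of an unknown base as an error are all assumptions; the PR's parser may instead resolve unknown bases through the object store.

```elixir
defmodule DeltaDepthSketch do
  # depths maps an object SHA to its delta-chain depth (0 for non-delta objects).

  # Non-delta object: no base, depth 0.
  def record(depths, sha, nil, _max_depth), do: {:ok, Map.put(depths, sha, 0)}

  # Delta object: depth is one more than its base, capped at max_depth.
  def record(depths, sha, base_sha, max_depth) do
    case Map.fetch(depths, base_sha) do
      {:ok, d} when d + 1 > max_depth -> {:error, :delta_too_deep}
      {:ok, d} -> {:ok, Map.put(depths, sha, d + 1)}
      :error -> {:error, :unknown_base}
    end
  end
end
```

Recording the depth when each object is written makes the check a single map lookup per delta, with no need to walk the chain.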

Wiring
  - HTTP.do_fetch: passes object_store: opt to stream_upload_pack;
    StreamParser.finalize result threaded back via %{store: updated}.
  - exgit.ex fetch_into: passes object_store: repo.object_store to
    Transport.fetch; handles {:ok, <<>>, %{store: updated}} result.
  - FS.prefetch_blobs / do_fetch_commit_and_root_tree: pass
    object_store: so all prefetch paths use the streaming parser.

Bug fix
  - Tree.decode: removed .gitmodules from reserved names. It is a
    legitimate file in any repo using submodules. The URL-injection
    concern only applies if we process submodule config; we do not.
    Repos like adafruit/Adafruit_CircuitPython_Bundle were crashing
    on prefetch as a result.

grep optimisation
  - compile_grep_pattern: detects case-sensitive literal patterns
    (no PCRE metacharacters) and returns {:literal, needle}.
  - scan_once/2: routes literals through :binary.matches
    (Boyer-Moore-Horspool) instead of Regex.scan — 9.5× faster per
    blob scan on typical code-search patterns.
  - has_regex_metacharacters?/1: pure predicate, compiled once as a
    module attribute regex.

Tests
  - Pack.StreamParserTest (18 tests): correctness, chunked ingest,
    byte-at-a-time, memory bound.
  - Pack.StreamParserAdversarialTest (17 tests + 1 property): zip
    bomb, deep delta chain, oversized objects, malformed headers,
    checksum corruption, unknown type, OFS before pack start.
  - Pack.StreamParserStreamingWriteTest (9 tests): Memory and Disk
    store streaming write correctness and memory bounds.
  - PktLine.DecoderTest (11 tests): incremental decode parity with
    PktLine.decode_all.
  - HTTP streaming tests (3 tests): 5000-packet reassembly, channel-3
    abort, memory regression guard.

Memory impact (anomalyco/opencode, 135 MB pack, 85 815 objects)
  Before: full pack binary + full object list in heap ~= 2-3× pack
  After:  one chunk (~4 KB) + compressed store ~= store size only
…current numbers

- TL;DR table updated: opencode prefetch 57s → 6.4s, grep 451ms → 130ms (literal)
- Add adafruit/Adafruit_CircuitPython_Bundle as a new fixture
- Document streaming pack parser architecture and memory model
- Document parallelism failure mode (4.5× slower; why GC pressure kills it)
- Document literal grep fast path (:binary.matches, Boyer-Moore, 9.5×)
- Document .gitmodules bug fix and why it matters
- Move decompressed-blob cache and chunked parallel grep to "not doing"
  with explicit design rationale (state on struct only, no ETS/Process)
- Add memory model summary table
- Remove outdated candidate optimizations that shipped this cycle
Format:
  - lib/exgit/fs.ex: extra leading space on compile_grep_pattern
  - lib/exgit/object_store/disk.ex: close_write match arm layout

Credo:
  - stream_parser.ex: alias order (Object before Pack), TODO -> follow-up
  - http.ex: extract finalize_stream_parse/1 and finalize_pack_iolist/1
    to bring do_fetch cyclomatic complexity from 13 to within the 12 cap
  - stream_parser_streaming_write_test.exs: alias order, remove {Object}
  - stream_parser_test.exs: alias order, remove unused ObjectStore alias

Test warnings (--warnings-as-errors):
  - stream_parser_adversarial_test.exs: remove unused encode_object_raw/2
    and pack_with_declared_size/3 helpers
  - stream_parser_test.exs: remove default value from parse_chunked/2
    (always called with explicit chunk_size; default was never used)

Docs:
  - CLAUDE.md: document pre-commit checklist, CI steps, common Credo
    traps, architecture invariants, and test tags
…ing write

pattern_match — fs.ex:1154
  pattern_repr/1 had a {:literal, s} clause but the function is only ever
  called with the original user pattern (binary | %Regex{}), not the
  compiled {:literal, binary()} form. Remove the dead clause.

unmatched_return — disk.ex:554,563
  :file.close/1 and File.rm/1 inside close_write return :ok | {:error, _}
  but the results were discarded. Prefix both calls with _ = to mark the
  ignored results as intentional.

improper_list_constr — stream_parser.ex:573
  [iolist | binary()] builds an improper list; the cons tail must be a list.
  Change [inflate_out | chunk_bin] to [inflate_out, chunk_bin].
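
A short illustration of the difference, using placeholder values for inflate_out and chunk_bin. Both forms are accepted as iodata and flatten to the same bytes, but only the second is a proper list, which is what the inferred type requires.

```elixir
inflate_out = ["ab", "cd"]
chunk_bin = "ef"

# Cons with a binary tail: valid iodata, but an improper list.
improper = [inflate_out | chunk_bin]

# Two-element list: the tail is [], so this is a proper list.
proper = [inflate_out, chunk_bin]

same_bytes? = IO.iodata_to_binary(improper) == IO.iodata_to_binary(proper)
shapes = {List.improper?(improper), List.improper?(proper)}
```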

pattern_match on with-else — stream_parser.ex:773
  Dialyzer traced through the with chain and inferred that the else block
  only ever receives {:error, _} tuples, never a bare :error atom. The
  original else had {:error, _} = err -> err and :error -> fallback, so
  Dialyzer flagged the :error clause as unreachable (it was right: the
  bare :error clause was dead). Simplify to else err -> err.

docs: CLAUDE.md — add Dialyzer to required pre-commit checklist with the
four warning classes seen in this codebase and how to fix each.
@ivarvong ivarvong merged commit 6fa32e8 into main Apr 26, 2026
2 checks passed