feat: streaming FromRow mapping (stream_as) — constant-memory struct-mapped queries (#91)#94

Merged
StefanSteiner merged 10 commits into tableau:main from StefanSteiner:stream-as-from-row
Jun 1, 2026
@StefanSteiner

Summary

Closes #91.

fetch_all_as::<T>() was the only public API wiring FromRow into typed
struct mapping, but it calls fetch_all first, collecting all rows into a
Vec<Row> before any mapping occurs (memory O(total rows)). That makes it
unusable for large or unbounded result sets. The streaming path
(execute_query + next_chunk) only yielded untyped Rows, and
RowAccessor::{new,build_indices} are pub(crate), so callers couldn't wire a
#[derive(FromRow)] struct into the chunk loop themselves.

This PR adds Form 5: streaming FromRow mapping — a lazy, constant-memory
typed iterator/stream:

// Sync — lazy iterator, O(chunk_size) memory
for row in conn.stream_as::<User>("SELECT id, name FROM users")? {
    let user = row?;
}

// Async — impl Stream<Item = Result<T>>
// Note: next() requires a StreamExt trait in scope (for example futures::StreamExt).
let stream = conn.stream_as::<User>("SELECT id, name FROM users");
tokio::pin!(stream);
while let Some(row) = stream.next().await {
    let user = row?;
}

Design

The central constraint: RowAccessor<'a> holds indices: &'a HashMap<&'a str, usize> whose keys borrow from the ResultSchema. A lazy iterator must
persist the index map across next() calls, which would make it own both the
schema and a map borrowing from it — a self-referential struct, not expressible
in safe Rust.

Resolution: an owned-key index map (HashMap<String, usize>) for the
streaming path, exposed via a small crate-internal Indices<'a> enum on
RowAccessor:

  • Indices::Borrowed(&HashMap<&str, usize>) — the existing zero-alloc
    fetch_*_as hot path, unchanged.
  • Indices::Owned(&HashMap<String, usize>) — the streaming path, whose map the
    iterator/stream owns outright.

Both look up by &str via HashMap's Borrow bound, so the get/get_opt
getters are agnostic and their error shapes (Missing/Null/TypeMismatch)
are byte-for-byte unchanged. No unsafe.
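
A minimal sketch of the two-variant lookup described above, under simplifying assumptions. The `position` method and the `build_owned_indices` helper are hypothetical names chosen for illustration, not the crate's actual API. The sketch shows only how one `&str` lookup can serve both key types through `Borrow<str>`:

```rust
use std::collections::HashMap;

/// Sketch of a two-variant index map: borrowed keys or owned keys.
enum Indices<'a> {
    /// Keys borrow from a schema that outlives the accessor.
    Borrowed(&'a HashMap<&'a str, usize>),
    /// Keys are owned Strings, so the map's owner needs no schema borrow.
    Owned(&'a HashMap<String, usize>),
}

impl<'a> Indices<'a> {
    /// Look up a column position by name (hypothetical method name).
    /// Both variants accept a plain &str.
    fn position(&self, name: &str) -> Option<usize> {
        match self {
            // &str implements Borrow<str>, so get(&str) works on &str keys.
            Indices::Borrowed(map) => map.get(name).copied(),
            // String implements Borrow<str>, so the same call works here.
            Indices::Owned(map) => map.get(name).copied(),
        }
    }
}

/// Hypothetical helper: build an owned-key map from column names.
fn build_owned_indices(columns: &[&str]) -> HashMap<String, usize> {
    columns
        .iter()
        .enumerate()
        .map(|(i, c)| (c.to_string(), i))
        .collect()
}
```

Because the owned map has no lifetime tied to a schema, a struct can hold it as a plain field and hand out `Indices::Owned(&self.map)` on each call, which is what sidesteps the self-referential layout.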

  • Sync: TypedRowIterator<'conn, T> (crate-private; stream_as returns
    impl Iterator<Item = Result<T>>) mirrors the existing RowIterator
    chunk-buffering pattern.
  • Async: async_stream::try_stream! awaits next_chunk() in a loop;
    the query string is .to_owned() so nothing borrows the &str arg across an
    await.

The column-name → index map is built exactly once (on the first chunk,
guarded by Option::is_none) and reused for every row across all chunks, so
per-row mapping is O(1) in column count and total memory is O(chunk_size).
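
The build-once, chunk-buffered pattern can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's `TypedRowIterator`: rows are modeled as `Vec<String>`, chunks come from any iterator, column names are passed to the constructor instead of being read from a schema, and `TypedRows`, `User`, `field`, `map_user`, and the `index_builds` counter are all hypothetical names.

```rust
use std::collections::{HashMap, VecDeque};

/// Stand-in for an untyped row: one string value per column (assumption).
type Row = Vec<String>;

/// Hypothetical typed target; the real crate would use #[derive(FromRow)].
#[derive(Debug, PartialEq)]
struct User {
    id: u32,
    name: String,
}

/// Lazy iterator that buffers one chunk at a time and owns its index map.
struct TypedRows<C: Iterator<Item = Vec<Row>>> {
    chunks: C,
    columns: Vec<String>,
    indices: Option<HashMap<String, usize>>,
    buffer: VecDeque<Row>,
    index_builds: usize, // instrumentation for this sketch only
}

impl<C: Iterator<Item = Vec<Row>>> TypedRows<C> {
    fn new(columns: &[&str], chunks: C) -> Self {
        TypedRows {
            chunks,
            columns: columns.iter().map(|c| c.to_string()).collect(),
            indices: None,
            buffer: VecDeque::new(),
            index_builds: 0,
        }
    }
}

/// Fetch one field by column name through the owned-key map.
fn field<'r>(row: &'r Row, idx: &HashMap<String, usize>, name: &str) -> Result<&'r String, String> {
    let i = *idx.get(name).ok_or_else(|| format!("missing column {name}"))?;
    row.get(i).ok_or_else(|| format!("row too short for {name}"))
}

/// Map one untyped row into a User, reporting the first failing column.
fn map_user(row: &Row, idx: &HashMap<String, usize>) -> Result<User, String> {
    let id = field(row, idx, "id")?
        .parse::<u32>()
        .map_err(|e| format!("type mismatch in id: {e}"))?;
    let name = field(row, idx, "name")?.clone();
    Ok(User { id, name })
}

impl<C: Iterator<Item = Vec<Row>>> Iterator for TypedRows<C> {
    type Item = Result<User, String>;

    fn next(&mut self) -> Option<Self::Item> {
        // Refill from the next chunk only when the current one is drained.
        while self.buffer.is_empty() {
            let chunk = self.chunks.next()?;
            // Build the name-to-index map once, on the first chunk.
            if self.indices.is_none() {
                let map: HashMap<String, usize> = self
                    .columns
                    .iter()
                    .cloned()
                    .enumerate()
                    .map(|(i, c)| (c, i))
                    .collect();
                self.indices = Some(map);
                self.index_builds += 1;
            }
            self.buffer = VecDeque::from(chunk);
        }
        let row = self.buffer.pop_front()?;
        let idx = self.indices.as_ref()?;
        Some(map_user(&row, idx))
    }
}
```

At any moment the iterator holds at most one chunk in `buffer` plus the index map, which is where the O(chunk_size) bound comes from in this model.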

Acceptance criteria (issue #91)

  • Connection::stream_as::<T>(query) returns a lazy iterator of Result<T>
  • Index map built once per query (not per row or per chunk)
  • Memory stays O(chunk_size) regardless of total result-set size
  • AsyncConnection::stream_as returns impl Stream<Item = Result<T>>
  • Documented in docs/ROW_MAPPING.md as Form 5

Error semantics

Errors surface in two places and robust callers handle both:

  • The outer Result (sync) carries failures detected while opening the
    stream: transport/connection errors and, on the gRPC transport, SQL
    parse/server errors. On the default TCP transport the query streams
    lazily, so a SQL error such as a missing table is reported as the first
    yielded item, not by the outer Result. (The async stream is fully lazy,
    so its submission error is always the first Err item.)
  • Each item is itself a Result<T> — per-row mapping failures (missing column,
    type mismatch, NULL in a non-Option field) or a transport error hit while
    fetching a later chunk.

This is documented on both methods and in the Form 5 doc.
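
The two error sites can be illustrated with a small mock. This is a sketch only: `mock_stream_as`, `sum_ids`, and the `QueryError` enum are hypothetical stand-ins and do not reflect the crate's real error type or signatures. The point is the shape of the caller, which applies `?` once to the outer Result and once to each yielded item:

```rust
/// Hypothetical error type standing in for the crate's error enum.
#[derive(Debug, PartialEq)]
enum QueryError {
    /// Failure detected while opening the stream.
    Open(String),
    /// Failure surfaced as a yielded item (lazy SQL error, mapping error, ...).
    Row(String),
}

/// Mock of a sync stream_as-style call: the outer Result models open-time
/// failures, and each item models a per-row result.
fn mock_stream_as(
    fail_open: bool,
    rows: Vec<Result<u32, QueryError>>,
) -> Result<impl Iterator<Item = Result<u32, QueryError>>, QueryError> {
    if fail_open {
        return Err(QueryError::Open("connection refused".into()));
    }
    Ok(rows.into_iter())
}

/// Drain the stream, stopping at the first error from either layer.
fn sum_ids(fail_open: bool, rows: Vec<Result<u32, QueryError>>) -> Result<u32, QueryError> {
    let mut total = 0;
    // First `?`: errors raised while opening the stream.
    for item in mock_stream_as(fail_open, rows)? {
        // Second `?`: errors carried by an individual item.
        total += item?;
    }
    Ok(total)
}
```

A caller that checks only the outer Result would miss a lazily reported SQL error, since in that case the open succeeds and the failure arrives as the first item.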

Dependencies

  • async-stream = "0.3" (new) — powers the async try_stream!.
  • futures-core = "0.3" (promoted from transitive to a direct dep) — provides
    the Stream trait named in the public return type.
  • futures = "0.3" (dev-dependency only) — StreamExt/TryStreamExt for
    draining streams in tests.

Testing

11 new integration tests (sync + async parity): happy path matches
fetch_all_as, multi-chunk crossing the 65 536-row chunk boundary (140 000
rows — proving build-once-then-reuse across chunks), submit error, per-row map
error, empty result, and lenient extra-column. Plus owned-path unit tests in
row_accessor.rs.
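
The row counts can be checked with a little arithmetic. The sketch below assumes every chunk except the last is filled to the 65 536-row limit; that is an assumption for illustration, since the server may emit smaller chunks. The function names are hypothetical.

```rust
/// Chunk size quoted in this PR description.
const CHUNK_ROWS: usize = 65_536;

/// Number of chunks needed for `rows` rows (ceiling division),
/// assuming all chunks but the last are full.
fn chunks_needed(rows: usize) -> usize {
    (rows + CHUNK_ROWS - 1) / CHUNK_ROWS
}

/// Rows carried by the final chunk under the same assumption.
fn last_chunk_rows(rows: usize) -> usize {
    if rows == 0 {
        return 0;
    }
    let rem = rows % CHUNK_ROWS;
    if rem == 0 { CHUNK_ROWS } else { rem }
}

/// A test only exercises the cross-chunk path if it needs more than one chunk.
fn crosses_chunk_boundary(rows: usize) -> bool {
    chunks_needed(rows) > 1
}
```

Under that assumption, 5 000 rows fit in a single chunk, while 140 000 rows span three chunks (two full chunks of 65 536 plus a final chunk of 8 928 rows).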

Full verification (all clean):

  • cargo clippy --workspace --all-targets --all-features -- -D warnings
  • cargo doc -p hyperdb-api --no-deps --all-features with RUSTDOCFLAGS=-D warnings
  • cargo test --workspace (debug and release): 1331 passed, 0 failed
  • Release-mode workspace build of all targets + examples; async_usage,
    transactions, arrow, threaded_inserter examples run end-to-end (exit 0).

Review notes

The branch went through an adversarial review pass that caught and fixed three
issues before this PR:

  1. The multi-chunk tests originally used 5 000 rows, fewer than the 65 536-row
    chunk size, so they never actually crossed a chunk boundary (the cross-chunk
    path was untested despite the test name). Bumped to 140 000 rows.
  2. The sync iterator silently truncated (and the async stream silently skipped a
    chunk) if a schema were ever absent after a non-empty chunk. Both now fall
    back to an empty index map (→ per-row Missing error), matching
    fetch_all_as's unwrap_or_default().
  3. A stale rustdoc intra-doc link to a nonexistent Rowset::typed_rows method
    was removed.

- Multi-chunk tests now insert 140K rows (> 2x the 65536-row chunk
  size) so they genuinely cross a chunk boundary and exercise the
  build-once-then-reuse path. At 5000 rows the whole result fit in a
  single chunk and the cross-chunk path was untested.
- Schema-absent fallback now uses unwrap_or_default() (empty map ->
  per-row Missing error) in both sync and async paths, matching
  fetch_all_as, instead of sync silently truncating / async skipping
  the chunk.
- Remove broken intra-doc link to nonexistent Rowset::typed_rows.
…g forms

Runnable companion to docs/ROW_MAPPING.md: self-hosts a HyperProcess,
seeds the products table, and maps its rows via all five forms (manual
streaming, named access, hand-written FromRow, derive(FromRow), and
streaming stream_as), each printing identical output. Registered in
Cargo.toml and cross-referenced from the doc intro.
…n-all-examples scripts

- row_mapping_forms now also demonstrates AsyncConnection::stream_as
  (impl Stream) alongside the sync iterator, on a small current-thread
  Tokio runtime; all six sections print identical output.
- Register the example in run_all_examples.sh, run_all_examples.ps1, and
  run_examples_wsl.sh so it runs in the full example sweep.
run_examples_wsl.sh listed ~17 examples that no longer exist (reading_data,
struct_mapping, query_builder, geography, catalog_and_schema, the sea_query
feature example, etc.) — leftovers from a prior example layout that would
fail every run. Aligned its list (and the phantom feature-examples block) with
the canonical run_all_examples.sh set. Also added the two examples
run_all_examples.ps1 was missing (async_parity_smoke, prepared_statements).

All three scripts now run the identical set of registered hyperdb-api
examples; benchmarks and the feature-gated compile_time_validation example
remain intentionally excluded.
@StefanSteiner StefanSteiner changed the title chore: streaming FromRow mapping (stream_as) — constant-memory struct-mapped queries (#91) feat: streaming FromRow mapping (stream_as) — constant-memory struct-mapped queries (#91) Jun 1, 2026
@StefanSteiner StefanSteiner merged commit 3327fc0 into tableau:main Jun 1, 2026
11 checks passed
@StefanSteiner StefanSteiner mentioned this pull request Jun 1, 2026
Development

Successfully merging this pull request may close these issues.

feat: streaming FromRow mapping — fetch_rows_as / stream_as for constant-memory struct-mapped queries

1 participant