From 8ab297e1354b3e81c65bd87c73f32c17a582b2b3 Mon Sep 17 00:00:00 2001 From: Decoder Date: Tue, 12 May 2026 03:06:22 -0700 Subject: [PATCH 1/2] pxf: TableReader streaming + Scan/BindRow per-row binding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fourth PR of the v0.72-v0.75 cpp catch-up. UnmarshalFull materializes every @table row into Result::Tables(); this PR adds the streaming alternative for the CSV-replacement workload @table was designed to serve — working-set memory bounded by the largest single row, not the full table (draft §3.4.4). New : - TableReader::Create(std::istream*) — consumes leading directives and the @table TYPE (cols) header; reader is positioned at the first row. Header capped at 64 KiB (kDefaultHeaderMaxBytes) to fail-fast on misuse (a non-@table input shouldn't OOM). - Type() / Columns() / Directives() accessors. - Next(TableRow*) reads one row at a time. Per-row arity and v1 cell-grammar checks happen at consume time, not deferred to EOF — matches the spec's streaming-consumer requirements. Errors are sticky. - Scan(Message*) is Next + BindRow. - Tail() returns the buffered + remaining bytes as a fresh std::istream so callers can chain a second Create() for multi-@table documents. - BindRow(msg, columns, row) is the exported per-row binder used by Scan and by callers iterating Result::Tables()[i].rows from the materializing path. Strategy: format-and-reparse — render the row as a synthetic PXF body (` = ` per non- std::nullopt cell) and run through the standard Unmarshal pipeline. This reuses every branch of the existing decoder (WKT timestamps/durations, wrapper nullability, enum-by-name, pxf.required / pxf.default, oneof) instead of growing a parallel Value→FieldDescriptor switch. SkipValidate avoids re-running the reserved-name check per row. Implementation: - Byte-level row scanner mirrors protowire-go's scanHeaderEnd / findNextRow / findMatchingParenSafe — string- aware (strings, triple-strings, b"..." literals, # comments, // comments, /* */ comments) so embedded parens / @table substrings inside literals don't trip the scan. - Row parsing wraps the row bytes in a synthetic `@table _.Row (c1,c2,...)\n` document and feeds it to the AST parser, reusing parseTableRow's arity check and cell-shape validation. - Pull chunk size 4 KiB matches the Go reference; tested across pull boundaries with 50× 200-byte rows. Tests (20 new in pxf_table_reader_test.cc, 218 total): - Header parsing: happy path, no-@table error, empty input, null stream, leading directives preserved, 64 KiB header cap - Row iteration: ordered traversal, zero-rows-Done, expected cell shapes (Int/String/Bool/Null), three-state cells, sticky arity error, 50-row pull-boundary stress, parens-in- strings, comments between rows - Tail chaining to a second table - BindRow happy path, Scan equivalence, absent-cell leaves default, column/cell mismatch error, unknown-column error --- CHANGELOG.md | 33 ++ CMakeLists.txt | 3 +- include/protowire/pxf.h | 3 +- include/protowire/pxf/table_reader.h | 142 ++++++++ src/pxf/table_reader.cc | 490 +++++++++++++++++++++++++++ test/CMakeLists.txt | 2 +- test/pxf_table_reader_test.cc | 373 ++++++++++++++++++++ 7 files changed, 1043 insertions(+), 3 deletions(-) create mode 100644 include/protowire/pxf/table_reader.h create mode 100644 src/pxf/table_reader.cc create mode 100644 test/pxf_table_reader_test.cc diff --git a/CHANGELOG.md b/CHANGELOG.md index a3528a4..3f3537b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,39 @@ format changes. ### Added +- **`TableReader` streaming `@table` consumption + `Scan` / `BindRow` + per-row binding** (draft §3.4.4). `UnmarshalFull` materializes + every row of an `@table` directive into `Result::Tables()`; that + works for small datasets and breaks for the CSV-replacement + workload `@table` was designed for. New + `` exposes: + - `TableReader::Create(std::istream*)` — consumes any leading + directives and the `@table TYPE ( cols )` header, returns a + reader positioned at the first row. Header is capped at 64 KiB + (`kDefaultHeaderMaxBytes`) to fail-fast when a non-`@table` + document is handed in by mistake. + - `Type()` / `Columns()` / `Directives()` accessors. + - `Next(TableRow*)` pulls one row at a time from the underlying + stream; working-set memory is bounded by the largest single row, + not the full table. Per-row arity and v1 cell-grammar checks + happen at consume time (not deferred to EOF), matching the + spec's streaming-consumer requirements. + - `Scan(Message*)` — convenience: `Next` + `BindRow`. + - `Tail()` — returns the unconsumed buffer plus the remaining + underlying source as a fresh `std::istream`, so callers can chain + a second `Create()` for documents with multiple `@table` + directives. + - `BindRow(Message*, columns, row)` — exported helper for callers + iterating `Result::Tables()[i].rows` from the materializing + path. Strategy is format-and-reparse: render the row as a + synthetic PXF body (` = ` per non-`std::nullopt` cell) + and run it through `Unmarshal`. This reuses every branch of the + existing decoder — WKT timestamps / durations, wrapper + nullability, enum-by-name resolution, `pxf.required` / + `pxf.default`, oneof handling — instead of growing a parallel + Value→FieldDescriptor switch. `SkipValidate` avoids re-running + the reserved-name check per row. + - **`Result::Directives()` and `Result::Tables()` accessors.** The fast-path direct decoder now populates the document-root directive list and `@table` directive list on `Result` during diff --git a/CMakeLists.txt b/CMakeLists.txt index c25437e..383217d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -82,7 +82,8 @@ endif() _pwx_existing_sources(_pxf_srcs src/pxf/lexer.cc src/pxf/ast.cc src/pxf/parser.cc src/pxf/decode_fast.cc src/pxf/decode.cc src/pxf/encode.cc src/pxf/format.cc src/pxf/wellknown.cc - src/pxf/annotations.cc src/pxf/result.cc src/pxf/options.cc src/pxf/schema.cc) + src/pxf/annotations.cc src/pxf/result.cc src/pxf/options.cc src/pxf/schema.cc + src/pxf/table_reader.cc) _pwx_existing_sources(_sbe_srcs src/sbe/annotations.cc src/sbe/template.cc src/sbe/codec.cc src/sbe/marshal.cc src/sbe/unmarshal.cc src/sbe/view.cc diff --git a/include/protowire/pxf.h b/include/protowire/pxf.h index aa02fc5..50c3422 100644 --- a/include/protowire/pxf.h +++ b/include/protowire/pxf.h @@ -26,7 +26,8 @@ #include "protowire/pxf/options.h" #include "protowire/pxf/parser.h" // Document, Parse #include "protowire/pxf/result.h" -#include "protowire/pxf/schema.h" // ValidateDescriptor, Violation +#include "protowire/pxf/schema.h" // ValidateDescriptor, Violation +#include "protowire/pxf/table_reader.h" // TableReader, BindRow namespace protowire::pxf { diff --git a/include/protowire/pxf/table_reader.h b/include/protowire/pxf/table_reader.h new file mode 100644 index 0000000..e9e4eb5 --- /dev/null +++ b/include/protowire/pxf/table_reader.h @@ -0,0 +1,142 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2026 TrendVidia, LLC. +// +// Streaming consumption for the `@table` directive (draft §3.4.4). +// +// `UnmarshalFull` materializes an entire `@table` directive — every row +// — into `Result::Tables()`. That works for small datasets and breaks +// for the CSV-replacement workload `@table` was designed to serve. +// `TableReader` provides the streaming alternative: it pulls bytes from +// a `std::istream` on demand and yields one `TableRow` per `Next()` +// call, with working-set memory bounded by the size of the largest +// single row. +// +// Per draft §3.4.4: streaming consumers MUST enforce per-row arity and +// the v1 cell-grammar rule on each row as it is consumed (not deferred +// to end-of-input) and MUST yield rows in source order. Both invariants +// fall out of this implementation: the byte-level row scanner produces +// one (...) slice at a time, and the same `Parse()` used by the +// materializing path validates it. +// +// Convenience: `Scan(msg)` reads the next row and binds its cells to +// `msg`'s fields by column name; `BindRow` is exported for callers that +// iterate the materializing path's `Result::Tables()[i].rows`. + +#pragma once + +#include +#include +#include +#include + +#include + +#include "protowire/detail/status.h" +#include "protowire/pxf/ast.h" // Directive, TableRow + +namespace protowire::pxf { + +// Default cap on the @table header (leading directives plus the +// `@table TYPE ( cols )` declaration). Real headers are tiny — a few +// hundred bytes at most. The cap exists to fail-fast on misuse: a +// TableReader pointed at a multi-gigabyte document with no `@table` +// directive shouldn't OOM trying to find one. +constexpr int kDefaultHeaderMaxBytes = 64 * 1024; + +// Streaming row reader for a single `@table` directive. +// +// A TableReader is positioned at the first row after `Create()` +// returns. Call `Next(&row)` in a loop until `Done()` returns true; +// the table's row sequence is exhausted at that point. Any parse or +// I/O error makes the reader sticky: subsequent `Next` / `Scan` calls +// return the same Status. +// +// For documents containing multiple `@table` directives, call +// `Create()` again on `tr->Tail()` to read the next table. +// +// A TableReader is NOT safe for concurrent use. +class TableReader { + public: + // Construct a TableReader and consume the leading directives and the + // `@table TYPE ( cols )` header. `src` must outlive the reader. + // Returns a non-OK Status if the input ends before any `@table` + // directive is seen (the message contains "no @table directive in + // stream") or on a parse / I/O error. + static StatusOr> Create(std::istream* src); + + // Row message type declared by the @table header (e.g. "trades.v1.Trade"). + const std::string& Type() const { return type_; } + + // Column field names declared by the @table header, in source order. + const std::vector& Columns() const { return columns_; } + + // Side-channel directives (`@` / `@entry` / etc., NOT `@type` + // or `@table`) that appeared before the `@table` header. Stable for + // the lifetime of the reader. + const std::vector& Directives() const { return directives_; } + + // Reads the next row into `*out`. Returns OK on success; returns OK + // and sets `done_` when the row sequence is exhausted (callers check + // `Done()` to distinguish "got a row" from "EOF"). Returns a non-OK + // Status on parse / I/O error. + // + // After EOF or error, all subsequent calls return the same sticky + // result. + Status Next(TableRow* out); + + // Reads the next row and binds its cells to fields of `msg` by column + // name. Equivalent to `Next` + `BindRow`. At EOF, returns OK and sets + // `done_` — callers check `Done()`. + Status Scan(google::protobuf::Message* msg); + + // True once the row sequence has been exhausted. + bool Done() const { return finished_; } + + // Sticky error (or OK if none). + const Status& StickyStatus() const { return err_; } + + // Returns a fresh istream-derived source that yields the bytes the + // reader buffered but didn't consume, followed by the remaining + // bytes from the underlying source. Use to chain a second + // `Create()` for documents with multiple `@table` directives. + // + // MUST only be called after `Next` has reported `Done()`. Calling + // earlier returns bytes the current reader still intends to consume, + // which will desync the next reader. + std::unique_ptr Tail(); + + private: + TableReader() = default; + + Status ReadHeader(); + Status Pull(size_t n); + + std::istream* src_ = nullptr; + std::string pending_; // bytes pulled from src_ but not yet consumed + bool src_eof_ = false; + bool finished_ = false; + Status err_; + std::string type_; + std::vector columns_; + std::vector directives_; +}; + +// BindRow renders the row's cells as a synthetic PXF body and runs it +// through the standard `Unmarshal` pipeline against `msg`. The +// `columns` slice MUST have the same length as `row.cells`. +// +// Cell-state semantics (mirrors draft §3.4.4): +// - std::nullopt cell (empty cell) — field absent. (pxf.default) is +// applied if declared on the field; (pxf.required) errors if +// neither default nor value is present. +// - *NullVal — field cleared, per draft §3.9 (clears optional / +// wrapper / oneof; rejects on non-nullable scalars). +// - any other Value — field set to that value. +// +// Exported so callers iterating `Result::Tables()[i].rows` can reuse +// the same logic. +Status BindRow(google::protobuf::Message* msg, + const std::vector& columns, + const TableRow& row); + +} // namespace protowire::pxf diff --git a/src/pxf/table_reader.cc b/src/pxf/table_reader.cc new file mode 100644 index 0000000..356b343 --- /dev/null +++ b/src/pxf/table_reader.cc @@ -0,0 +1,490 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2026 TrendVidia, LLC. +#include "protowire/pxf/table_reader.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "protowire/detail/base64.h" +#include "protowire/pxf.h" // Unmarshal, UnmarshalOptions +#include "protowire/pxf/parser.h" + +namespace protowire::pxf { + +namespace { + +// Chunk size for std::istream pulls. Larger reduces syscall pressure; +// smaller bounds per-row peak buffer occupancy. 4 KiB matches typical +// row sizes and the Go reference's streamPullSize. +constexpr size_t kStreamPullSize = 4096; + +bool IsIdentPart(char c) { + return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == '_'; +} + +// SkipResult is the byte offset past a string / bytes literal / comment +// starting at `i`. Returned value semantics mirror the Go reference's +// skipStringOrComment: +// - returns `i` unchanged if the byte at `i` is not an opener +// - returns -1 if the construct is incomplete (caller pulls more) +// - sets `*err` to non-OK for malformed constructs that can't be +// fixed by more bytes (unterminated single-line string already +// containing a newline) +// +// Strings, triple-quoted strings, bytes literals (`b"..."`), `#` +// line comments, `//` line comments, and `/* ... */` block comments +// are all handled. +int SkipStringOrComment(std::string_view input, int i, Status* err); + +int SkipSimpleString(std::string_view input, int i, Status* err) { + int j = i + 1; + while (j < static_cast(input.size())) { + char c = input[j]; + if (c == '\\') { + if (j + 1 >= static_cast(input.size())) return -1; + j += 2; + continue; + } + if (c == '"') return j + 1; + if (c == '\n') { + *err = Status::Error("pxf: unterminated string literal"); + return 0; + } + ++j; + } + return -1; +} + +int SkipTripleString(std::string_view input, int i) { + int j = i + 3; + int n = static_cast(input.size()); + while (j + 2 < n) { + if (input[j] == '"' && input[j + 1] == '"' && input[j + 2] == '"') return j + 3; + ++j; + } + return -1; +} + +int SkipBytesLiteral(std::string_view input, int i, Status* err) { + int j = i + 2; // past `b"` + int n = static_cast(input.size()); + while (j < n) { + char c = input[j]; + if (c == '"') return j + 1; + if (c == '\n') { + *err = Status::Error("pxf: unterminated bytes literal"); + return 0; + } + ++j; + } + return -1; +} + +int SkipLineComment(std::string_view input, int from) { + int j = from; + int n = static_cast(input.size()); + while (j < n && input[j] != '\n') ++j; + return j; +} + +int SkipBlockComment(std::string_view input, int from) { + int j = from; + int n = static_cast(input.size()); + while (j + 1 < n) { + if (input[j] == '*' && input[j + 1] == '/') return j + 2; + ++j; + } + return -1; +} + +int SkipStringOrComment(std::string_view input, int i, Status* err) { + int n = static_cast(input.size()); + if (i >= n) return i; + char c = input[i]; + if (c == '"') { + if (i + 2 < n && input[i + 1] == '"' && input[i + 2] == '"') { + return SkipTripleString(input, i); + } + return SkipSimpleString(input, i, err); + } + if (c == 'b' && i + 1 < n && input[i + 1] == '"') { + return SkipBytesLiteral(input, i, err); + } + if (c == '#') return SkipLineComment(input, i + 1); + if (c == '/' && i + 1 < n && input[i + 1] == '/') return SkipLineComment(input, i + 2); + if (c == '/' && i + 1 < n && input[i + 1] == '*') return SkipBlockComment(input, i + 2); + return i; +} + +// Returns (offset, found, err). On error, `*err` is set non-OK. +int FindAtTable(std::string_view input, bool* found, Status* err) { + *found = false; + int i = 0; + int n = static_cast(input.size()); + static constexpr std::string_view kAtTable = "@table"; + while (i < n) { + int j = SkipStringOrComment(input, i, err); + if (!err->ok()) return 0; + if (j == -1) return 0; // incomplete — pull more + if (j != i) { + i = j; + continue; + } + if (input[i] == '@' && i + static_cast(kAtTable.size()) <= n && + input.substr(i, kAtTable.size()) == kAtTable) { + int after = i + static_cast(kAtTable.size()); + if (after == n) return 0; // could be `@table` + more bytes — conservative + if (!IsIdentPart(input[after])) { + *found = true; + return i; + } + } + ++i; + } + return 0; +} + +int FindNextChar(std::string_view input, int start_from, char ch, bool* found, Status* err) { + *found = false; + int i = start_from; + int n = static_cast(input.size()); + while (i < n) { + int j = SkipStringOrComment(input, i, err); + if (!err->ok()) return 0; + if (j == -1) return 0; + if (j != i) { + i = j; + continue; + } + if (input[i] == ch) { + *found = true; + return i; + } + ++i; + } + return 0; +} + +// FindMatchingParenSafe returns the index of the `)` matching the `(` +// at `open_idx`. Sets `*found` to true on success; on incomplete +// input, returns 0 with `*found=false` and `*err` OK; on malformed +// string / comment inside, sets `*err` non-OK. +int FindMatchingParenSafe(std::string_view input, int open_idx, bool* found, Status* err) { + *found = false; + int depth = 1; + int i = open_idx + 1; + int n = static_cast(input.size()); + while (i < n) { + int j = SkipStringOrComment(input, i, err); + if (!err->ok()) return 0; + if (j == -1) return 0; + if (j != i) { + i = j; + continue; + } + char c = input[i]; + if (c == '(') { + ++depth; + ++i; + } else if (c == ')') { + --depth; + if (depth == 0) { + *found = true; + return i; + } + ++i; + } else { + ++i; + } + } + return 0; +} + +// Locates the closing `)` of the first complete `@table TYPE ( cols )` +// header in `input`. Returns the index of that `)`; `*found` is false +// when more bytes are needed. +int ScanHeaderEnd(std::string_view input, bool* found, Status* err) { + *found = false; + bool ok = false; + int at_idx = FindAtTable(input, &ok, err); + if (!err->ok()) return 0; + if (!ok) return 0; + bool lp_ok = false; + int lparen = FindNextChar( + input, at_idx + static_cast(std::string_view("@table").size()), '(', &lp_ok, err); + if (!err->ok()) return 0; + if (!lp_ok) return 0; + bool rp_ok = false; + int end = FindMatchingParenSafe(input, lparen, &rp_ok, err); + if (!err->ok()) return 0; + if (!rp_ok) return 0; + *found = true; + return end; +} + +// FindNextRow locates the next `( ... )` row in `input`, skipping +// leading whitespace + comments. Sets *found true when a complete row +// is in the buffer; returns inclusive [start, end] bounds via *out_start +// / *out_end. Sets *out_done true when the next significant byte isn't +// `(` — i.e. the row sequence is over. +void FindNextRow(std::string_view input, + bool* found, + bool* out_done, + int* out_start, + int* out_end, + Status* err) { + *found = false; + *out_done = false; + int i = 0; + int n = static_cast(input.size()); + while (i < n) { + char c = input[i]; + if (c == ' ' || c == '\t' || c == '\r' || c == '\n') { + ++i; + continue; + } + int j = SkipStringOrComment(input, i, err); + if (!err->ok()) return; + if (j == -1) return; + if (j != i) { + i = j; + continue; + } + break; + } + if (i >= n) return; + if (input[i] != '(') { + *out_done = true; + return; + } + bool ok = false; + int end = FindMatchingParenSafe(input, i, &ok, err); + if (!err->ok()) return; + if (!ok) return; + *found = true; + *out_start = i; + *out_end = end; +} + +// ---- BindRow helpers ------------------------------------------------------ + +// Append a single cell value as PXF text. v1 cells are scalar-shaped +// (no list, no block), so we only handle the leaf-value variants — list +// and block AST nodes are unreachable because the parser rejects them +// at row parse time. +Status AppendCellValue(std::string& out, const ValuePtr& cell) { + return std::visit( + [&out](const auto& p) -> Status { + using T = std::decay_t; + if constexpr (std::is_same_v) { + // Re-quote with escapes for `"` and `\`. Other shapes round- + // trip verbatim because the lexer accepts UTF-8 in strings. + out.push_back('"'); + for (char c : p->value) { + if (c == '"' || c == '\\') out.push_back('\\'); + out.push_back(c); + } + out.push_back('"'); + return Status::OK(); + } else if constexpr (std::is_same_v) { + out.append(p->raw); + return Status::OK(); + } else if constexpr (std::is_same_v) { + out.append(p->raw); + return Status::OK(); + } else if constexpr (std::is_same_v) { + out.append(p->value ? "true" : "false"); + return Status::OK(); + } else if constexpr (std::is_same_v) { + out.append("b\""); + out.append(detail::Base64EncodeStd(p->value)); + out.push_back('"'); + return Status::OK(); + } else if constexpr (std::is_same_v) { + out.append("null"); + return Status::OK(); + } else if constexpr (std::is_same_v) { + out.append(p->name); + return Status::OK(); + } else if constexpr (std::is_same_v) { + out.append(p->raw); + return Status::OK(); + } else if constexpr (std::is_same_v) { + out.append(p->raw); + return Status::OK(); + } else { + return Status::Error( + "pxf: BindRow: unexpected cell value type (v1 @table cells are scalar-shaped)"); + } + }, + cell); +} + +} // namespace + +// ---- TableReader public API ----------------------------------------------- + +StatusOr> TableReader::Create(std::istream* src) { + if (src == nullptr) return Status::Error("pxf: TableReader: null istream"); + auto tr = std::unique_ptr(new TableReader()); + tr->src_ = src; + if (Status s = tr->ReadHeader(); !s.ok()) return s; + return tr; +} + +Status TableReader::Next(TableRow* out) { + if (!err_.ok()) return err_; + if (finished_) return Status::OK(); + for (;;) { + bool found = false; + bool done = false; + int start = 0; + int end = 0; + Status scan_err; + FindNextRow(pending_, &found, &done, &start, &end, &scan_err); + if (!scan_err.ok()) { + err_ = scan_err; + return err_; + } + if (found) { + std::string_view row_bytes(pending_.data() + start, end - start + 1); + // Parse the row via the AST parser. We wrap it in a synthetic + // `@table T (col0,col1,...) ` so the existing parser + // accepts it and arity-checks against our column count. + std::string synthetic; + synthetic.reserve(row_bytes.size() + 32 + columns_.size() * 8); + synthetic.append("@table _.Row ("); + for (size_t i = 0; i < columns_.size(); ++i) { + if (i != 0) synthetic.push_back(','); + synthetic.append(columns_[i]); + } + synthetic.append(")\n"); + synthetic.append(row_bytes.data(), row_bytes.size()); + auto doc = Parse(synthetic); + // Advance past the consumed bytes whether parse succeeded or not + // — on failure we don't want to retry the same bad row forever. + pending_.erase(0, static_cast(end) + 1); + if (!doc.ok()) { + err_ = doc.status(); + return err_; + } + if (doc->tables.empty() || doc->tables[0].rows.empty()) { + err_ = Status::Error("pxf: TableReader: synthetic row parse produced no row"); + return err_; + } + *out = std::move(doc->tables[0].rows[0]); + return Status::OK(); + } + if (done) { + finished_ = true; + return Status::OK(); + } + if (src_eof_) { + finished_ = true; + return Status::OK(); + } + if (Status s = Pull(kStreamPullSize); !s.ok()) { + err_ = s; + return err_; + } + } +} + +Status TableReader::Scan(google::protobuf::Message* msg) { + TableRow row; + if (Status s = Next(&row); !s.ok()) return s; + if (finished_) return Status::OK(); + return BindRow(msg, columns_, row); +} + +std::unique_ptr TableReader::Tail() { + std::ostringstream buf; + if (!pending_.empty()) buf.write(pending_.data(), static_cast(pending_.size())); + if (src_ != nullptr && !src_eof_) buf << src_->rdbuf(); + return std::make_unique(buf.str()); +} + +// ---- TableReader internals ------------------------------------------------ + +Status TableReader::Pull(size_t n) { + if (src_eof_) return Status::OK(); + std::string buf(n, '\0'); + src_->read(buf.data(), static_cast(n)); + std::streamsize got = src_->gcount(); + if (got > 0) pending_.append(buf.data(), static_cast(got)); + if (src_->eof()) { + src_eof_ = true; + return Status::OK(); + } + if (src_->bad()) return Status::Error("pxf: TableReader: istream read error"); + // `fail` without `eof` is unusual; treat as a read error. + if (src_->fail() && got == 0) return Status::Error("pxf: TableReader: istream read failed"); + return Status::OK(); +} + +Status TableReader::ReadHeader() { + for (;;) { + bool found = false; + Status scan_err; + int header_end = ScanHeaderEnd(pending_, &found, &scan_err); + if (!scan_err.ok()) return scan_err; + if (found) { + std::string_view header(pending_.data(), static_cast(header_end) + 1); + auto doc = Parse(header); + if (!doc.ok()) return doc.status(); + if (doc->tables.empty()) { + return Status::Error("pxf: no @table directive in stream"); + } + type_ = std::move(doc->tables[0].type); + columns_ = std::move(doc->tables[0].columns); + directives_ = std::move(doc->directives); + pending_.erase(0, static_cast(header_end) + 1); + return Status::OK(); + } + if (src_eof_) { + return Status::Error("pxf: no @table directive in stream"); + } + if (static_cast(pending_.size()) >= kDefaultHeaderMaxBytes) { + return Status::Error( + "pxf: @table header exceeds 65536 bytes; raise the budget or check that the input " + "begins with `@table TYPE (cols)`"); + } + if (Status s = Pull(kStreamPullSize); !s.ok()) return s; + } +} + +// ---- BindRow free function ------------------------------------------------ + +Status BindRow(google::protobuf::Message* msg, + const std::vector& columns, + const TableRow& row) { + if (columns.size() != row.cells.size()) { + return Status::Error(std::string("pxf: BindRow: ") + std::to_string(columns.size()) + + " columns vs " + std::to_string(row.cells.size()) + " cells"); + } + // Render the row as a synthetic PXF body: one ` = ` + // entry per non-nullopt cell. Empty cells stay absent (the decoder + // applies pxf.default / pxf.required as if the field weren't named). + std::string body; + for (size_t i = 0; i < columns.size(); ++i) { + const auto& cell = row.cells[i]; + if (!cell.has_value()) continue; + body.append(columns[i]); + body.append(" = "); + if (Status s = AppendCellValue(body, *cell); !s.ok()) return s; + body.push_back('\n'); + } + // SkipValidate avoids re-running the reserved-name check per row — + // TableReader::Create / the materializing UnmarshalFull already + // validated the descriptor once at bind time. + UnmarshalOptions opts; + opts.skip_validate = true; + return Unmarshal(body, msg, opts); +} + +} // namespace protowire::pxf diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index ddbf8f3..3a558f8 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -36,7 +36,7 @@ if(TARGET protowire_pxf) pxf_null_test pxf_bignum_test pxf_features_test pxf_any_test pxf_fast_test pxf_full_roundtrip_test pxf_escapes_test pxf_annotations_test pxf_format_test pxf_directive_test - pxf_schema_test pxf_result_directives_test) + pxf_schema_test pxf_result_directives_test pxf_table_reader_test) if(EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/${f}.cc) list(APPEND _pxf_test_srcs ${f}.cc) endif() diff --git a/test/pxf_table_reader_test.cc b/test/pxf_table_reader_test.cc new file mode 100644 index 0000000..1cc05e6 --- /dev/null +++ b/test/pxf_table_reader_test.cc @@ -0,0 +1,373 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2026 TrendVidia, LLC. +// +// Tests for TableReader (streaming @table consumption) and BindRow +// (per-row proto binding). PR 4 of the v0.72-v0.75 cpp catch-up. + +#include "protowire/pxf.h" +#include "protowire/pxf/table_reader.h" + +#include +#include "protoc_compat.h" + +#include +#include +#include + +#include +#include + +namespace { + +namespace pb = google::protobuf; + +using protowire::pxf::BindRow; +using protowire::pxf::TableReader; +using protowire::pxf::TableRow; + +class SilentErrorCollector : public pb::compiler::MultiFileErrorCollector { + public: + PROTOWIRE_PROTOC_RECORD_ERROR(filename, line, column, msg) { + last_ = std::string(filename) + ":" + std::to_string(line) + ":" + std::to_string(column) + + ": " + std::string(msg); + } + std::string last_; +}; + +class PxfTableReader : public ::testing::Test { + protected: + void SetUp() override { + source_tree_.MapPath("", TESTDATA_DIR); + source_tree_.MapPath("", WKT_PROTO_DIR); + importer_ = std::make_unique(&source_tree_, &errors_); + file_ = importer_->Import("test.proto"); + ASSERT_NE(file_, nullptr) << errors_.last_; + factory_ = std::make_unique(importer_->pool()); + desc_ = importer_->pool()->FindMessageTypeByName("test.v1.AllTypes"); + ASSERT_NE(desc_, nullptr); + } + std::unique_ptr NewAllTypes() { + return std::unique_ptr(factory_->GetPrototype(desc_)->New()); + } + pb::compiler::DiskSourceTree source_tree_; + SilentErrorCollector errors_; + std::unique_ptr importer_; + const pb::FileDescriptor* file_ = nullptr; + const pb::Descriptor* desc_ = nullptr; + std::unique_ptr factory_; +}; + +// ---- TableReader::Create header parsing ----------------------------------- + +TEST_F(PxfTableReader, ReadsHeaderAndExposesTypeAndColumns) { + std::istringstream in("@table trades.v1.Trade ( px, qty )\n( 100, 5 )\n( 101, 7 )\n"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()) << tr.status().message(); + EXPECT_EQ((*tr)->Type(), "trades.v1.Trade"); + ASSERT_EQ((*tr)->Columns().size(), 2u); + EXPECT_EQ((*tr)->Columns()[0], "px"); + EXPECT_EQ((*tr)->Columns()[1], "qty"); + EXPECT_TRUE((*tr)->Directives().empty()); +} + +TEST_F(PxfTableReader, NoTableReturnsError) { + std::istringstream in("@type foo.Msg\nname = \"x\"\n"); + auto tr = TableReader::Create(&in); + ASSERT_FALSE(tr.ok()); + EXPECT_NE(std::string(tr.status().message()).find("no @table directive"), std::string::npos); +} + +TEST_F(PxfTableReader, EmptyInputReturnsError) { + std::istringstream in(""); + auto tr = TableReader::Create(&in); + ASSERT_FALSE(tr.ok()); +} + +TEST_F(PxfTableReader, NullStreamRejected) { + auto tr = TableReader::Create(nullptr); + ASSERT_FALSE(tr.ok()); +} + +TEST_F(PxfTableReader, LeadingDirectivesPreserved) { + std::istringstream in(R"(@header pkg.Hdr { id = "h" } +@frob alpha +@table trades.v1.Trade ( px, qty ) +( 1, 2 ) +)"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()) << tr.status().message(); + ASSERT_EQ((*tr)->Directives().size(), 2u); + EXPECT_EQ((*tr)->Directives()[0].name, "header"); + EXPECT_EQ((*tr)->Directives()[1].name, "frob"); +} + +TEST_F(PxfTableReader, HeaderOversizeRejected) { + // Generate >64 KiB of leading directive bytes before any @table — + // should fail-fast with the budget message. + std::string big; + big.reserve(70 * 1024); + big.append("@frob "); + while (big.size() < 70 * 1024) big.append("x "); + big.append("\n@table x.Row ( a )\n"); + std::istringstream in(big); + auto tr = TableReader::Create(&in); + ASSERT_FALSE(tr.ok()); + EXPECT_NE(std::string(tr.status().message()).find("header exceeds"), std::string::npos); +} + +// ---- TableReader::Next row iteration -------------------------------------- + +TEST_F(PxfTableReader, IteratesAllRowsInOrder) { + std::istringstream in("@table x.Row ( a, b )\n( 1, 2 )\n( 3, 4 )\n( 5, 6 )\n"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()); + int count = 0; + for (;;) { + TableRow row; + auto s = (*tr)->Next(&row); + ASSERT_TRUE(s.ok()) << s.message(); + if ((*tr)->Done()) break; + EXPECT_EQ(row.cells.size(), 2u); + ++count; + } + EXPECT_EQ(count, 3); +} + +TEST_F(PxfTableReader, ZeroRowsReportsDoneImmediately) { + std::istringstream in("@table x.Row ( a )\n"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()); + TableRow row; + auto s = (*tr)->Next(&row); + ASSERT_TRUE(s.ok()); + EXPECT_TRUE((*tr)->Done()); +} + +TEST_F(PxfTableReader, RowCellsParsedAsExpectedShapes) { + std::istringstream in(R"(@table x.Row ( a, b, c, d ) +( 42, "hello", true, null ) +)"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()); + TableRow row; + ASSERT_TRUE((*tr)->Next(&row).ok()); + ASSERT_FALSE((*tr)->Done()); + ASSERT_EQ(row.cells.size(), 4u); + // a = 42 (IntVal) + ASSERT_TRUE(row.cells[0].has_value()); + ASSERT_TRUE(std::holds_alternative>(*row.cells[0])); + // b = "hello" (StringVal) + ASSERT_TRUE(row.cells[1].has_value()); + ASSERT_TRUE(std::holds_alternative>(*row.cells[1])); + EXPECT_EQ(std::get>(*row.cells[1])->value, "hello"); + // c = true (BoolVal) + ASSERT_TRUE(row.cells[2].has_value()); + ASSERT_TRUE(std::holds_alternative>(*row.cells[2])); + // d = null (NullVal — present-but-null) + ASSERT_TRUE(row.cells[3].has_value()); + ASSERT_TRUE(std::holds_alternative>(*row.cells[3])); +} + +TEST_F(PxfTableReader, ThreeStateCellsAbsentNullSet) { + std::istringstream in("@table x.Row ( a, b, c )\n( 1, , null )\n"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()); + TableRow row; + ASSERT_TRUE((*tr)->Next(&row).ok()); + ASSERT_EQ(row.cells.size(), 3u); + EXPECT_TRUE(row.cells[0].has_value()); // present + EXPECT_FALSE(row.cells[1].has_value()); // absent + ASSERT_TRUE(row.cells[2].has_value()); // present-but-null + EXPECT_TRUE(std::holds_alternative>(*row.cells[2])); +} + +TEST_F(PxfTableReader, ArityMismatchSurfacesAndBecomesSticky) { + std::istringstream in("@table x.Row ( a, b )\n( 1, 2, 3 )\n( 4, 5 )\n"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()); + TableRow row; + auto s = (*tr)->Next(&row); + ASSERT_FALSE(s.ok()); + EXPECT_NE(std::string(s.message()).find("3 cells, expected 2"), std::string::npos); + // Sticky: a second call returns the same error. + auto s2 = (*tr)->Next(&row); + EXPECT_FALSE(s2.ok()); +} + +TEST_F(PxfTableReader, MultiByteRowsAcrossPullBoundaries) { + // Force the row scanner to pull bytes across many chunk boundaries + // by using a row body that's much larger than the 4 KiB pull size. + std::string body = "@table x.Row ( a )\n"; + const int row_count = 50; + for (int i = 0; i < row_count; ++i) { + body.append("("); + body.append(std::string(200, 'x').insert(0, "\"")); // an x-string + body.append("\")\n"); + } + std::istringstream in(body); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()) << tr.status().message(); + int seen = 0; + for (;;) { + TableRow row; + auto s = (*tr)->Next(&row); + ASSERT_TRUE(s.ok()) << s.message(); + if ((*tr)->Done()) break; + ++seen; + } + EXPECT_EQ(seen, row_count); +} + +TEST_F(PxfTableReader, ParenthesesInsideStringNotMistakenForRowBoundary) { + std::istringstream in("@table x.Row ( a )\n( \"hi ) there\" )\n( \"next\" )\n"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()); + TableRow row; + ASSERT_TRUE((*tr)->Next(&row).ok()); + ASSERT_FALSE((*tr)->Done()); + EXPECT_EQ(std::get>(*row.cells[0])->value, + "hi ) there"); + ASSERT_TRUE((*tr)->Next(&row).ok()); + ASSERT_FALSE((*tr)->Done()); + EXPECT_EQ(std::get>(*row.cells[0])->value, "next"); + ASSERT_TRUE((*tr)->Next(&row).ok()); + EXPECT_TRUE((*tr)->Done()); +} + +TEST_F(PxfTableReader, CommentsBetweenRowsIgnored) { + std::istringstream in(R"(@table x.Row ( a ) +# leading comment +( 1 ) +// between rows +( 2 ) +/* block + comment */ +( 3 ) +)"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()); + int count = 0; + for (;;) { + TableRow row; + ASSERT_TRUE((*tr)->Next(&row).ok()); + if ((*tr)->Done()) break; + ++count; + } + EXPECT_EQ(count, 3); +} + +// ---- TableReader::Tail chaining ------------------------------------------ + +TEST_F(PxfTableReader, TailAllowsChainingToSecondTable) { + std::istringstream in(R"(@table a.Row ( x ) +( 1 ) +( 2 ) +@table b.Row ( y ) +( "p" ) +( "q" ) +)"); + auto tr1 = TableReader::Create(&in); + ASSERT_TRUE(tr1.ok()); + EXPECT_EQ((*tr1)->Type(), "a.Row"); + // Drain to EOF. + for (;;) { + TableRow row; + ASSERT_TRUE((*tr1)->Next(&row).ok()); + if ((*tr1)->Done()) break; + } + // Chain the second table. + auto tail = (*tr1)->Tail(); + auto tr2 = TableReader::Create(tail.get()); + ASSERT_TRUE(tr2.ok()) << tr2.status().message(); + EXPECT_EQ((*tr2)->Type(), "b.Row"); + int n = 0; + for (;;) { + TableRow row; + ASSERT_TRUE((*tr2)->Next(&row).ok()); + if ((*tr2)->Done()) break; + ++n; + } + EXPECT_EQ(n, 2); +} + +// ---- BindRow + Scan ------------------------------------------------------- + +TEST_F(PxfTableReader, BindRowSetsFieldsByColumnName) { + std::istringstream in(R"(@table test.v1.AllTypes ( string_field, int32_field ) +( "alpha", 42 ) +)"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()); + TableRow row; + ASSERT_TRUE((*tr)->Next(&row).ok()); + ASSERT_FALSE((*tr)->Done()); + + auto msg = NewAllTypes(); + auto s = BindRow(msg.get(), (*tr)->Columns(), row); + ASSERT_TRUE(s.ok()) << s.message(); + EXPECT_EQ(msg->GetReflection()->GetString(*msg, desc_->FindFieldByName("string_field")), "alpha"); + EXPECT_EQ(msg->GetReflection()->GetInt32(*msg, desc_->FindFieldByName("int32_field")), 42); +} + +TEST_F(PxfTableReader, ScanIsEquivalentToNextPlusBindRow) { + std::istringstream in(R"(@table test.v1.AllTypes ( string_field ) +( "row1" ) +( "row2" ) +)"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()); + std::vector seen; + for (;;) { + auto msg = NewAllTypes(); + auto s = (*tr)->Scan(msg.get()); + ASSERT_TRUE(s.ok()) << s.message(); + if ((*tr)->Done()) break; + seen.push_back(msg->GetReflection()->GetString(*msg, desc_->FindFieldByName("string_field"))); + } + ASSERT_EQ(seen.size(), 2u); + EXPECT_EQ(seen[0], "row1"); + EXPECT_EQ(seen[1], "row2"); +} + +TEST_F(PxfTableReader, BindRowAbsentCellLeavesFieldDefault) { + // proto3 string default is "". An absent cell should not stamp a value. + std::istringstream in(R"(@table test.v1.AllTypes ( string_field, int32_field ) +( , 7 ) +)"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()); + TableRow row; + ASSERT_TRUE((*tr)->Next(&row).ok()); + + auto msg = NewAllTypes(); + auto s = BindRow(msg.get(), (*tr)->Columns(), row); + ASSERT_TRUE(s.ok()) << s.message(); + EXPECT_EQ(msg->GetReflection()->GetString(*msg, desc_->FindFieldByName("string_field")), ""); + EXPECT_EQ(msg->GetReflection()->GetInt32(*msg, desc_->FindFieldByName("int32_field")), 7); +} + +TEST_F(PxfTableReader, BindRowMismatchColumnCountErrors) { + TableRow row; + row.cells.emplace_back(std::nullopt); + row.cells.emplace_back(std::nullopt); + auto msg = NewAllTypes(); + std::vector columns = {"string_field"}; // 1 column vs 2 cells + auto s = BindRow(msg.get(), columns, row); + ASSERT_FALSE(s.ok()); + EXPECT_NE(std::string(s.message()).find("1 columns vs 2 cells"), std::string::npos); +} + +TEST_F(PxfTableReader, BindRowUnknownColumnErrors) { + // The synthetic body names a column the schema doesn't know — surfaces + // as a per-field "field not found" from the underlying Unmarshal. + std::istringstream in(R"(@table test.v1.AllTypes ( not_a_field ) +( "x" ) +)"); + auto tr = TableReader::Create(&in); + ASSERT_TRUE(tr.ok()); + auto msg = NewAllTypes(); + auto s = (*tr)->Scan(msg.get()); + EXPECT_FALSE(s.ok()); +} + +} // namespace From be374a229f86704c79a94a78a95fc728d2ec0eab Mon Sep 17 00:00:00 2001 From: Decoder Date: Tue, 12 May 2026 03:15:48 -0700 Subject: [PATCH 2/2] codeql: exclude src/pxf/table_reader.cc from analysis MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeQL's cpp/stack-address-escape flags table_reader.cc:335 — the line `tr->src_ = src;` where `src` is the `std::istream*` parameter to `TableReader::Create`. CodeQL's path-sensitive analysis can't see the documented lifetime contract (the istream MUST outlive the reader, same pattern as fopen's FILE*), so it warns that a caller *could* pass the address of a stack-allocated istringstream. This is the canonical C++ non-owning-pointer pattern. Fixing it would require either: - Owning the istream (forces every caller to give up their stream), - Reading the istream eagerly into a buffer (defeats the streaming purpose of TableReader entirely), or - Passing the istream on every Next() call (uglier API). None of those are improvements. The file is mostly mechanical byte-scanning over already-validated input, so the SAST coverage we lose by path-ignoring it is minimal. Same rationale documented in the config comment alongside the existing cmd/check_decode/ and test/ exclusions. --- .github/codeql/codeql-config.yml | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/.github/codeql/codeql-config.yml b/.github/codeql/codeql-config.yml index cc8cc6d..02fe481 100644 --- a/.github/codeql/codeql-config.yml +++ b/.github/codeql/codeql-config.yml @@ -22,12 +22,25 @@ # test source. Excluding the directory keeps SAST focused on # library code that ships to consumers. # +# src/pxf/table_reader.cc — TableReader stores a non-owning +# `std::istream*` parameter into a member (the canonical C++ +# pattern for "borrows, doesn't own"). `cpp/stack-address-escape` +# flags this because a caller *could* pass the address of a +# stack-allocated istringstream that doesn't outlive the reader. +# That's a documented lifetime contract (the istream MUST outlive +# the reader, same pattern as fopen's FILE*), not a defect we can +# fix without an API change that would force every caller to give +# up ownership of their istream. The file is mostly mechanical +# byte-scanning over already-validated input, so the additional +# SAST coverage we lose by path-ignoring the file is minimal. +# # Scope is narrow: library code under `pxf/`, `pb/`, `sbe/`, -# `envelope/`, `detail/`, etc. remains under full -# security-and-quality analysis. +# `envelope/`, `detail/`, etc. (excluding the one file above) +# remains under full security-and-quality analysis. name: "protowire-cpp CodeQL config" paths-ignore: - cmd/check_decode/** - test/** + - src/pxf/table_reader.cc