Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions .github/codeql/codeql-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,25 @@
# test source. Excluding the directory keeps SAST focused on
# library code that ships to consumers.
#
# src/pxf/table_reader.cc — TableReader stores a non-owning
# `std::istream*` parameter into a member (the canonical C++
# pattern for "borrows, doesn't own"). `cpp/stack-address-escape`
# flags this because a caller *could* pass the address of a
# stack-allocated istringstream that doesn't outlive the reader.
# That's a documented lifetime contract (the istream MUST outlive
# the reader, same pattern as fopen's FILE*), not a defect we can
# fix without an API change that would force every caller to give
# up ownership of their istream. The file is mostly mechanical
# byte-scanning over already-validated input, so the additional
# SAST coverage we lose by path-ignoring the file is minimal.
#
# Scope is narrow: library code under `pxf/`, `pb/`, `sbe/`,
# `envelope/`, `detail/`, etc. remains under full
# security-and-quality analysis.
# `envelope/`, `detail/`, etc. (excluding the one file above)
# remains under full security-and-quality analysis.

name: "protowire-cpp CodeQL config"

paths-ignore:
- cmd/check_decode/**
- test/**
- src/pxf/table_reader.cc
33 changes: 33 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,39 @@ format changes.

### Added

- **`TableReader` streaming `@table` consumption + `Scan` / `BindRow`
per-row binding** (draft §3.4.4). `UnmarshalFull` materializes
every row of an `@table` directive into `Result::Tables()`; that
works for small datasets and breaks for the CSV-replacement
workload `@table` was designed for. New
`<protowire/pxf/table_reader.h>` exposes:
- `TableReader::Create(std::istream*)` — consumes any leading
directives and the `@table TYPE ( cols )` header, returns a
reader positioned at the first row. Header is capped at 64 KiB
(`kDefaultHeaderMaxBytes`) to fail-fast when a non-`@table`
document is handed in by mistake.
- `Type()` / `Columns()` / `Directives()` accessors.
- `Next(TableRow*)` pulls one row at a time from the underlying
stream; working-set memory is bounded by the largest single row,
not the full table. Per-row arity and v1 cell-grammar checks
happen at consume time (not deferred to EOF), matching the
spec's streaming-consumer requirements.
- `Scan(Message*)` — convenience: `Next` + `BindRow`.
- `Tail()` — returns the unconsumed buffer plus the remaining
underlying source as a fresh `std::istream`, so callers can chain
a second `Create()` for documents with multiple `@table`
directives.
- `BindRow(Message*, columns, row)` — exported helper for callers
iterating `Result::Tables()[i].rows` from the materializing
path. Strategy is format-and-reparse: render the row as a
synthetic PXF body (`<col> = <val>` per non-`std::nullopt` cell)
and run it through `Unmarshal`. This reuses every branch of the
existing decoder — WKT timestamps / durations, wrapper
nullability, enum-by-name resolution, `pxf.required` /
`pxf.default`, oneof handling — instead of growing a parallel
Value→FieldDescriptor switch. `SkipValidate` avoids re-running
the reserved-name check per row.

- **`Result::Directives()` and `Result::Tables()` accessors.** The
fast-path direct decoder now populates the document-root directive
list and `@table` directive list on `Result` during
Expand Down
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ endif()
_pwx_existing_sources(_pxf_srcs
src/pxf/lexer.cc src/pxf/ast.cc src/pxf/parser.cc src/pxf/decode_fast.cc
src/pxf/decode.cc src/pxf/encode.cc src/pxf/format.cc src/pxf/wellknown.cc
src/pxf/annotations.cc src/pxf/result.cc src/pxf/options.cc src/pxf/schema.cc)
src/pxf/annotations.cc src/pxf/result.cc src/pxf/options.cc src/pxf/schema.cc
src/pxf/table_reader.cc)
_pwx_existing_sources(_sbe_srcs
src/sbe/annotations.cc src/sbe/template.cc src/sbe/codec.cc
src/sbe/marshal.cc src/sbe/unmarshal.cc src/sbe/view.cc
Expand Down
3 changes: 2 additions & 1 deletion include/protowire/pxf.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
#include "protowire/pxf/options.h"
#include "protowire/pxf/parser.h" // Document, Parse
#include "protowire/pxf/result.h"
#include "protowire/pxf/schema.h" // ValidateDescriptor, Violation
#include "protowire/pxf/schema.h" // ValidateDescriptor, Violation
#include "protowire/pxf/table_reader.h" // TableReader, BindRow

namespace protowire::pxf {

Expand Down
142 changes: 142 additions & 0 deletions include/protowire/pxf/table_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// SPDX-License-Identifier: MIT
// Copyright (c) 2026 TrendVidia, LLC.
//
// Streaming consumption for the `@table` directive (draft §3.4.4).
//
// `UnmarshalFull` materializes an entire `@table` directive — every row
// — into `Result::Tables()`. That works for small datasets and breaks
// for the CSV-replacement workload `@table` was designed to serve.
// `TableReader` provides the streaming alternative: it pulls bytes from
// a `std::istream` on demand and yields one `TableRow` per `Next()`
// call, with working-set memory bounded by the size of the largest
// single row.
//
// Per draft §3.4.4: streaming consumers MUST enforce per-row arity and
// the v1 cell-grammar rule on each row as it is consumed (not deferred
// to end-of-input) and MUST yield rows in source order. Both invariants
// fall out of this implementation: the byte-level row scanner produces
// one (...) slice at a time, and the same `Parse()` used by the
// materializing path validates it.
//
// Convenience: `Scan(msg)` reads the next row and binds its cells to
// `msg`'s fields by column name; `BindRow` is exported for callers that
// iterate the materializing path's `Result::Tables()[i].rows`.

#pragma once

#include <istream>
#include <memory>
#include <string>
#include <vector>

#include <google/protobuf/message.h>

#include "protowire/detail/status.h"
#include "protowire/pxf/ast.h" // Directive, TableRow

namespace protowire::pxf {

// Default cap on the @table header (leading directives plus the
// `@table TYPE ( cols )` declaration). Real headers are tiny — a few
// hundred bytes at most. The cap exists to fail-fast on misuse: a
// TableReader pointed at a multi-gigabyte document with no `@table`
// directive shouldn't OOM trying to find one.
constexpr int kDefaultHeaderMaxBytes = 64 * 1024;

// Streaming row reader for a single `@table` directive.
//
// A TableReader is positioned at the first row after `Create()`
// returns. Call `Next(&row)` in a loop until `Done()` returns true;
// the table's row sequence is exhausted at that point. Any parse or
// I/O error makes the reader sticky: subsequent `Next` / `Scan` calls
// return the same Status.
//
// For documents containing multiple `@table` directives, call
// `Create()` again on `tr->Tail()` to read the next table.
//
// A TableReader is NOT safe for concurrent use.
class TableReader {
public:
// Construct a TableReader and consume the leading directives and the
// `@table TYPE ( cols )` header. `src` must outlive the reader.
// Returns a non-OK Status if the input ends before any `@table`
// directive is seen (the message contains "no @table directive in
// stream") or on a parse / I/O error.
static StatusOr<std::unique_ptr<TableReader>> Create(std::istream* src);

// Row message type declared by the @table header (e.g. "trades.v1.Trade").
const std::string& Type() const { return type_; }

// Column field names declared by the @table header, in source order.
const std::vector<std::string>& Columns() const { return columns_; }

// Side-channel directives (`@<name>` / `@entry` / etc., NOT `@type`
// or `@table`) that appeared before the `@table` header. Stable for
// the lifetime of the reader.
const std::vector<Directive>& Directives() const { return directives_; }

// Reads the next row into `*out`. Returns OK on success; returns OK
// and sets `done_` when the row sequence is exhausted (callers check
// `Done()` to distinguish "got a row" from "EOF"). Returns a non-OK
// Status on parse / I/O error.
//
// After EOF or error, all subsequent calls return the same sticky
// result.
Status Next(TableRow* out);

// Reads the next row and binds its cells to fields of `msg` by column
// name. Equivalent to `Next` + `BindRow`. At EOF, returns OK and sets
// `done_` — callers check `Done()`.
Status Scan(google::protobuf::Message* msg);

// True once the row sequence has been exhausted.
bool Done() const { return finished_; }

// Sticky error (or OK if none).
const Status& StickyStatus() const { return err_; }

// Returns a fresh istream-derived source that yields the bytes the
// reader buffered but didn't consume, followed by the remaining
// bytes from the underlying source. Use to chain a second
// `Create()` for documents with multiple `@table` directives.
//
// MUST only be called after `Next` has reported `Done()`. Calling
// earlier returns bytes the current reader still intends to consume,
// which will desync the next reader.
std::unique_ptr<std::istream> Tail();

private:
TableReader() = default;

Status ReadHeader();
Status Pull(size_t n);

std::istream* src_ = nullptr;
std::string pending_; // bytes pulled from src_ but not yet consumed
bool src_eof_ = false;
bool finished_ = false;
Status err_;
std::string type_;
std::vector<std::string> columns_;
std::vector<Directive> directives_;
};

// BindRow renders the row's cells as a synthetic PXF body and runs it
// through the standard `Unmarshal` pipeline against `msg`. The
// `columns` slice MUST have the same length as `row.cells`.
//
// Cell-state semantics (mirrors draft §3.4.4):
// - std::nullopt cell (empty cell) — field absent. (pxf.default) is
// applied if declared on the field; (pxf.required) errors if
// neither default nor value is present.
// - *NullVal — field cleared, per draft §3.9 (clears optional /
// wrapper / oneof; rejects on non-nullable scalars).
// - any other Value — field set to that value.
//
// Exported so callers iterating `Result::Tables()[i].rows` can reuse
// the same logic.
Status BindRow(google::protobuf::Message* msg,
const std::vector<std::string>& columns,
const TableRow& row);

} // namespace protowire::pxf
Loading
Loading