Merged
Conversation
Streaming queries use the extended query protocol with Execute(max_rows) to deliver results in windowed batches via PortalSuspended, rather than buffering all rows in memory. The client controls flow through fetch_more() and close_stream() on Session, receiving batches via pg_stream_batch on StreamingResultReceiver. Adds _StreamingQueryInFlight query sub-state with _completing and _error guards to prevent duplicate Sync and post-completion Execute messages. Dispatch for fetch_more/close_stream uses match on concrete type in _SessionLoggedIn rather than the _QueryState interface. Design: #125
4c03ebb to
bada8bc
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Implement windowed batch delivery using the extended query protocol's Execute(max_rows) + PortalSuspended mechanism. Unlike
execute()which buffers all rows before delivery, streaming enables pull-based paged result consumption with bounded memory.New public API:
StreamingResultReceiverinterface (pg_stream_batch,pg_stream_complete,pg_stream_failed)Session.stream(query, window_size, receiver)— starts streamingSession.fetch_more()— requests next batchSession.close_stream()— ends streaming earlyUses Flush (not Sync) to keep the portal alive between batches. Sync is sent only on completion, error, or early close. A
_completingflag guards againstfetch_more/close_streamsending messages afteron_command_completehas already sent Sync — the receiver may callfetch_more()in response to the finalpg_stream_batchbeforeReadyForQueryarrives.Only
PreparedQueryandNamedPreparedQueryare supported — streaming requires the extended query protocol.Design: #72