Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .release-notes/fix-process-again-double-delivery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## Fix double-delivery of pg_query_failed when close() races with error processing

When `close()` was called while the session was between processing an error response and processing the subsequent ready-for-query message, the `ResultReceiver` could receive `pg_query_failed` twice — once with the original error and again with `SessionClosed`. Query cycle messages are now processed synchronously, preventing other operations from interleaving.
87 changes: 50 additions & 37 deletions postgres/_response_message_parser.pony
Original file line number Diff line number Diff line change
@@ -1,47 +1,60 @@
use "buffered"

primitive _ResponseMessageParser
"""
Processes buffered messages synchronously within a query cycle, yielding
only after ReadyForQuery. This prevents other behaviors (like `close()`)
from interleaving between result delivery and query dequeuing, which would
cause double-delivery of `pg_query_failed`.

If a callback triggers shutdown, `on_shutdown` clears the read buffer,
causing the next parse to return `None` and exit the loop naturally.
"""
fun apply(s: Session ref, readbuf: Reader) =>
try
match _ResponseParser(readbuf)?
| let msg: _AuthenticationMD5PasswordMessage =>
s.state.on_authentication_md5_password(s, msg)
| _AuthenticationOkMessage =>
s.state.on_authentication_ok(s)
| let msg: _CommandCompleteMessage =>
s.state.on_command_complete(s, msg)
| let msg: _DataRowMessage =>
s.state.on_data_row(s, msg)
| let err: ErrorResponseMessage =>
match err.code
| "28000" =>
s.state.on_authentication_failed(s,
InvalidAuthenticationSpecification)
while true do
try
match _ResponseParser(readbuf)?
| let msg: _AuthenticationMD5PasswordMessage =>
s.state.on_authentication_md5_password(s, msg)
| _AuthenticationOkMessage =>
s.state.on_authentication_ok(s)
| let msg: _CommandCompleteMessage =>
s.state.on_command_complete(s, msg)
| let msg: _DataRowMessage =>
s.state.on_data_row(s, msg)
| let err: ErrorResponseMessage =>
match err.code
| "28000" =>
s.state.on_authentication_failed(s,
InvalidAuthenticationSpecification)
return
| "28P01" =>
s.state.on_authentication_failed(s, InvalidPassword)
return
else
s.state.on_error_response(s, err)
end
| let msg: _ReadyForQueryMessage =>
s.state.on_ready_for_query(s, msg)
// ReadyForQuery marks the end of a query cycle. Yield to other
// actors before processing the next cycle.
s._process_again()
return
| "28P01" =>
s.state.on_authentication_failed(s, InvalidPassword)
| let msg: _RowDescriptionMessage =>
s.state.on_row_description(s, msg)
| let msg: _EmptyQueryResponseMessage =>
s.state.on_empty_query_response(s)
| None =>
// No complete message was found. Stop parsing for now.
return
else
s.state.on_error_response(s, err)
end
| let msg: _ReadyForQueryMessage =>
s.state.on_ready_for_query(s, msg)
| let msg: _RowDescriptionMessage =>
s.state.on_row_description(s, msg)
| let msg: _EmptyQueryResponseMessage =>
s.state.on_empty_query_response(s)
| None =>
// No complete message was found. Stop parsing for now.
else
// An unrecoverable error was encountered while parsing. Once that
// happens, there's no way we are going to be able to figure out how
// to get the responses back into an understandable state. The only
// thing we can do is shut the session down.

s.state.shutdown(s)
return
end
else
// An unrecoverable error was encountered while parsing. Once that
// happens, there's no way we are going to be able to figure out how
// to get the responses back into an understandable state. The only
// thing we can do is shut the session down.

s.state.shutdown(s)
return
end

s._process_again()
4 changes: 4 additions & 0 deletions postgres/session.pony
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class ref _SessionConnected is _AuthenticableState
r.pg_query_failed(q, SessionNotAuthenticated)

fun ref on_shutdown(s: Session ref) =>
// Clearing the readbuf is required for _ResponseMessageParser's
// synchronous loop to exit — the next parse returns None.
_readbuf.clear()

fun user(): String =>
Expand Down Expand Up @@ -171,6 +173,8 @@ class _SessionLoggedIn is _AuthenticatedState
query_state.try_run_query(s, this)

fun ref on_shutdown(s: Session ref) =>
// Clearing the readbuf is required for _ResponseMessageParser's
// synchronous loop to exit — the next parse returns None.
_readbuf.clear()
for queue_item in query_queue.values() do
(let query, let receiver) = queue_item
Expand Down