From c8c7f14a15578654c4d74f24c87efe83466b5001 Mon Sep 17 00:00:00 2001 From: nik-localstack Date: Sat, 16 May 2026 13:03:21 +0300 Subject: [PATCH 1/2] fix(proxy): add backpressure handling to prevent hang on large responses When forwarding large responses, the proxy's send to the destination socket can block: the receiver's TCP buffer fills up, which fills the proxy's send buffer, which stalls the sender via flow control. With no EVENT_WRITE handling the proxy had no way to retry the stalled send, causing a deadlock for responses larger than the TCP send buffer (~400KB on macOS, ~256KB on Linux). Catch BlockingIOError explicitly (before the generic OSError handler) and register the socket for EVENT_WRITE so the selector retries the flush when buffer space becomes available. Also add return guards after connection close in the EVENT_READ path to prevent fall-through into the now-stale redirect_conn state. Add test_various_payload_sizes covering 1B, 1KB, 100KB, 1MB, 10MB and 10k/100k rows, over both plain and SSL connections, to catch regressions. Co-Authored-By: Claude Sonnet 4.6 --- postgresql_proxy/proxy.py | 28 +++++++++++++++++ tests/test_proxy.py | 64 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/postgresql_proxy/proxy.py b/postgresql_proxy/proxy.py index ba8b31e..46761fe 100644 --- a/postgresql_proxy/proxy.py +++ b/postgresql_proxy/proxy.py @@ -251,10 +251,29 @@ def service_connection(self, key: SelectorKeyProxy, mask): LOG.debug('%s connection closing %s', conn.name, conn.address) # A file object shall be unregistered prior to being closed. sock.close() + return except OSError as e: # it means the socket was closed by peer LOG.debug('%s connection closed by peer %s: %s', conn.name, conn.address, e) self._unregister_conn(conn) + return + + if mask & selectors.EVENT_WRITE: + # Socket has buffer space — flush this connection's backlogged output. + try: + while conn.out_bytes: + sent = sock.send(conn.out_bytes) + conn.sent(sent) + # All data drained; stop watching for writability. + conn.events = selectors.EVENT_READ + self.selector.modify(sock, selectors.EVENT_READ, data=conn) + except BlockingIOError: + pass # Still full; will retry on the next EVENT_WRITE notification. + except OSError as e: + LOG.debug('%s closed while flushing backlog: %s', conn.name, e) + self._unregister_conn(conn) + sock.close() + return next_conn = conn.redirect_conn if next_conn and next_conn.out_bytes: @@ -263,6 +282,15 @@ def service_connection(self, key: SelectorKeyProxy, mask): LOG.debug('sending to %s:\n%s', next_conn.name, next_conn.out_bytes) sent = next_conn.sock.send(next_conn.out_bytes) next_conn.sent(sent) + # All sent; clear write interest if it was previously registered. + if next_conn.events & selectors.EVENT_WRITE: + next_conn.events = selectors.EVENT_READ + self.selector.modify(next_conn.sock, selectors.EVENT_READ, data=next_conn) + except BlockingIOError: + # next_conn's send buffer is full — register for writability so we retry when there's space. + if not (next_conn.events & selectors.EVENT_WRITE): + next_conn.events = selectors.EVENT_READ | selectors.EVENT_WRITE + self.selector.modify(next_conn.sock, next_conn.events, data=next_conn) except OSError: # If one side is closed, close the other one # this can happen in the case where the client disconnects, and postgres still return a response diff --git a/tests/test_proxy.py b/tests/test_proxy.py index 56a2993..4d374a9 100644 --- a/tests/test_proxy.py +++ b/tests/test_proxy.py @@ -226,6 +226,70 @@ def test_repeated_connect_query_smoke_no_hang(postgres_settings, plain_proxy_por assert cur.fetchone() == (i,) +@pytest.mark.timeout(60) +@pytest.mark.parametrize("sslmode", ["disable", "require"]) +@pytest.mark.parametrize( + ["sql", "expected"], + [ + pytest.param( + "SELECT 1", + [(1,)], + id="tiny-1B", + ), + pytest.param( + "SELECT repeat('x', 1024)", + [("x" * 1024,)], + id="small-1KB", + ), + pytest.param( + "SELECT repeat('x', 102400)", + [("x" * 102400,)], + id="medium-100KB", + ), + pytest.param( + "SELECT repeat('x', 1048576)", + [("x" * 1048576,)], + id="large-1MB", + ), + pytest.param( + "SELECT repeat('x', 10485760)", + [("x" * 10485760,)], + id="xlarge-10MB", + ), + pytest.param( + "SELECT i FROM generate_series(1, 10000) AS t(i)", + [(i,) for i in range(1, 10001)], + id="rows-10k", + ), + pytest.param( + "SELECT i FROM generate_series(1, 100000) AS t(i)", + [(i,) for i in range(1, 100001)], + id="rows-100k", + ), + ] +) +def test_various_payload_sizes( + postgres_settings, + plain_proxy_port, + ssl_proxy_port, + sslmode, + sql, + expected, +): + with psycopg2.connect( + host="127.0.0.1", + port=plain_proxy_port if sslmode == "disable" else ssl_proxy_port, + user=postgres_settings["user"], + password=postgres_settings["password"], + dbname=postgres_settings["dbname"], + sslmode=sslmode, + connect_timeout=3, + ) as conn: + with conn.cursor() as cur: + cur.execute(sql) + assert cur.fetchall() == expected + + @pytest.mark.timeout(60) def test_psql_ssl_file_batch_stress_no_hang(postgres_settings, ssl_proxy_port): if shutil.which("psql") is None: From 0ea4e3d6cecde58d4848e98f8694f6d8eb33a1a2 Mon Sep 17 00:00:00 2001 From: nik-localstack Date: Sat, 16 May 2026 14:10:44 +0300 Subject: [PATCH 2/2] fix(proxy): handle ssl.SSLWantWriteError for large SSL responses ssl.SSLSocket.send() raises ssl.SSLWantWriteError (not BlockingIOError) when the underlying TCP buffer is full on a non-blocking SSL socket. SSLWantWriteError is a subclass of OSError, so it was caught by the generic connection-close handler, closing the connection mid-response. The client socket stayed open, leaving the caller hanging indefinitely. Catch SSLWantWriteError alongside BlockingIOError in both send paths so SSL connections correctly register EVENT_WRITE and retry when buffer space becomes available. Co-Authored-By: Claude Sonnet 4.6 --- postgresql_proxy/proxy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/postgresql_proxy/proxy.py b/postgresql_proxy/proxy.py index 46761fe..3d4f70e 100644 --- a/postgresql_proxy/proxy.py +++ b/postgresql_proxy/proxy.py @@ -267,7 +267,7 @@ def service_connection(self, key: SelectorKeyProxy, mask): # All data drained; stop watching for writability. conn.events = selectors.EVENT_READ self.selector.modify(sock, selectors.EVENT_READ, data=conn) - except BlockingIOError: + except (BlockingIOError, ssl.SSLWantWriteError): pass # Still full; will retry on the next EVENT_WRITE notification. except OSError as e: LOG.debug('%s closed while flushing backlog: %s', conn.name, e) @@ -286,7 +286,7 @@ def service_connection(self, key: SelectorKeyProxy, mask): if next_conn.events & selectors.EVENT_WRITE: next_conn.events = selectors.EVENT_READ self.selector.modify(next_conn.sock, selectors.EVENT_READ, data=next_conn) - except BlockingIOError: + except (BlockingIOError, ssl.SSLWantWriteError): # next_conn's send buffer is full — register for writability so we retry when there's space. if not (next_conn.events & selectors.EVENT_WRITE): next_conn.events = selectors.EVENT_READ | selectors.EVENT_WRITE