From c883f05ca9e297ecf5fa11e49b3e0111d241163c Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Tue, 22 Feb 2022 04:02:13 +0100 Subject: [PATCH] fix: Cancel query on Ctrl-C On KeyboardInterrupt, send a cancel to the server and keep waiting for the result of the cancel, which is expected to raise a QueryCanceled, then re-raise KeyboardInterrupt. Before this, the connection was left in ACTIVE state, so it couldn't be rolled back. Only fixed on sync connections. Left a failing test for async connections; the test fails with an output from the script such as: error ignored in rollback on : sending query failed: another command is already in progress Traceback (most recent call last): File "", line 27, in File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run return loop.run_until_complete(main) File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete self.run_forever() File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever self._run_once() File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once event_list = self._selector.select(timeout) File "/usr/lib/python3.8/selectors.py", line 468, in select fd_event_list = self._selector.poll(timeout, max_ev) KeyboardInterrupt And the except branch in `AsyncConnection.wait()` is not reached. See #231 --- docs/news.rst | 7 ++++ psycopg/psycopg/connection.py | 13 ++++++- psycopg/psycopg/connection_async.py | 17 ++++++++- tests/test_concurrency.py | 57 ++++++++++++++++++++++++++++- tests/test_concurrency_async.py | 55 +++++++++++++++++++++++++++- 5 files changed, 145 insertions(+), 4 deletions(-) diff --git a/docs/news.rst b/docs/news.rst index 9388119d1..f96a54752 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -21,6 +21,13 @@ Psycopg 3.1 (unreleased) - Drop support for Python 3.6. +Psycopg 3.0.10 (unreleased) +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- Leave the connection working after interrupting a query with Ctrl-C + (currently only for sync connections, :ticket:`#231`). + + Current release --------------- diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index 0ca56d104..37a606eb7 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -857,7 +857,18 @@ def wait(self, gen: PQGen[RV], timeout: Optional[float] = 0.1) -> RV: The function must be used on generators that don't change connection fd (i.e. not on connect and reset). """ - return waiting.wait(gen, self.pgconn.socket, timeout=timeout) + try: + return waiting.wait(gen, self.pgconn.socket, timeout=timeout) + except KeyboardInterrupt: + # On Ctrl-C, try to cancel the query in the server, otherwise + # otherwise the connection will be stuck in ACTIVE state + c = self.pgconn.get_cancel() + c.cancel() + try: + waiting.wait(gen, self.pgconn.socket, timeout=timeout) + except e.QueryCanceled: + pass # as expected + raise @classmethod def _wait_conn(cls, gen: PQGenConn[RV], timeout: Optional[int]) -> RV: diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 8bd2ca73f..e34477865 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -293,7 +293,22 @@ async def notifies(self) -> AsyncGenerator[Notify, None]: yield n async def wait(self, gen: PQGen[RV]) -> RV: - return await waiting.wait_async(gen, self.pgconn.socket) + try: + return await waiting.wait_async(gen, self.pgconn.socket) + except KeyboardInterrupt: + # TODO: this doesn't seem to work as it does for sync connections + # see tests/test_concurrency_async.py::test_ctrl_c + # In the test, the code doesn't reach this branch. + + # On Ctrl-C, try to cancel the query in the server, otherwise + # otherwise the connection will be stuck in ACTIVE state + c = self.pgconn.get_cancel() + c.cancel() + try: + await waiting.wait_async(gen, self.pgconn.socket) + except e.QueryCanceled: + pass # as expected + raise @classmethod async def _wait_conn(cls, gen: PQGenConn[RV], timeout: Optional[int]) -> RV: diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py index 643c1f026..86e1fe410 100644 --- a/tests/test_concurrency.py +++ b/tests/test_concurrency.py @@ -6,11 +6,13 @@ import sys import time import queue -import pytest +import signal import threading import subprocess as sp from typing import List +import pytest + import psycopg @@ -201,3 +203,56 @@ def closer(): finally: conn.close() conn2.close() + + +@pytest.mark.slow +@pytest.mark.subprocess +def test_ctrl_c(dsn): + if sys.platform == "win32": + sig = int(signal.CTRL_C_EVENT) + # Or pytest will receive the Ctrl-C too + creationflags = sp.CREATE_NEW_PROCESS_GROUP + else: + sig = int(signal.SIGINT) + creationflags = 0 + + script = f"""\ +import os +import time +import psycopg +from threading import Thread + +def tired_of_life(): + time.sleep(1) + os.kill(os.getpid(), {sig!r}) + +t = Thread(target=tired_of_life, daemon=True) +t.start() + +with psycopg.connect({dsn!r}) as conn: + cur = conn.cursor() + ctrl_c = False + try: + cur.execute("select pg_sleep(2)") + except KeyboardInterrupt: + ctrl_c = True + + assert ctrl_c, "ctrl-c not received" + assert ( + conn.info.transaction_status == psycopg.pq.TransactionStatus.INERROR + ), f"transaction status: {{conn.info.transaction_status!r}}" + + conn.rollback() + assert ( + conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE + ), f"transaction status: {{conn.info.transaction_status!r}}" + + cur.execute("select 1") + assert cur.fetchone() == (1,) +""" + t0 = time.time() + proc = sp.Popen([sys.executable, "-s", "-c", script], creationflags=creationflags) + proc.communicate() + t = time.time() - t0 + assert proc.returncode == 0 + assert 1 < t < 2 diff --git a/tests/test_concurrency_async.py b/tests/test_concurrency_async.py index 6a1df9684..adc95ce3e 100644 --- a/tests/test_concurrency_async.py +++ b/tests/test_concurrency_async.py @@ -1,9 +1,13 @@ +import sys import time -import pytest +import signal import asyncio +import subprocess as sp from asyncio.queues import Queue from typing import List, Tuple +import pytest + import psycopg from psycopg._compat import create_task @@ -151,3 +155,52 @@ async def closer(): finally: await aconn.close() await conn2.close() + + +@pytest.mark.xfail(reason="fix #231 for async connection") +@pytest.mark.slow +@pytest.mark.subprocess +async def test_ctrl_c(dsn): + script = f"""\ +import asyncio +import psycopg + +ctrl_c = False + +async def main(): + async with await psycopg.AsyncConnection.connect({dsn!r}) as conn: + cur = conn.cursor() + try: + await cur.execute("select pg_sleep(2)") + except KeyboardInterrupt: + ctrl_c = True + + assert ctrl_c, "ctrl-c not received" + assert ( + conn.info.transaction_status == psycopg.pq.TransactionStatus.INERROR + ), f"transaction status: {{conn.info.transaction_status!r}}" + + await conn.rollback() + assert ( + conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE + ), f"transaction status: {{conn.info.transaction_status!r}}" + + await cur.execute("select 1") + assert (await cur.fetchone()) == (1,) + +asyncio.run(main()) +""" + if sys.platform == "win32": + creationflags = sp.CREATE_NEW_PROCESS_GROUP + sig = signal.CTRL_C_EVENT + else: + creationflags = 0 + sig = signal.SIGINT + + proc = sp.Popen([sys.executable, "-s", "-c", script], creationflags=creationflags) + with pytest.raises(sp.TimeoutExpired): + outs, errs = proc.communicate(timeout=1) + + proc.send_signal(sig) + proc.communicate() + assert proc.returncode == 0