Skip to content

Commit

Permalink
fix: Cancel query on Ctrl-C
Browse files Browse the repository at this point in the history
On KeyboardInterrupt, send a cancel to the server and keep waiting for
the result of the cancel, which is expected to raise a QueryCanceled,
then re-raise KeyboardInterrupt.

Before this, the connection was left in ACTIVE state, so it couldn't be rolled
back.

Only fixed on sync connections. Left a failing test for async
connections; the test fails with an output from the script such as:

    error ignored in rollback on <psycopg.AsyncConnection [ACTIVE] ...>:
    sending query failed: another command is already in progress
    Traceback (most recent call last):
      File "<string>", line 27, in <module>
      File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
        self.run_forever()
      File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
        self._run_once()
      File "/usr/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
        event_list = self._selector.select(timeout)
      File "/usr/lib/python3.8/selectors.py", line 468, in select
        fd_event_list = self._selector.poll(timeout, max_ev)
    KeyboardInterrupt

And the except branch in `AsyncConnection.wait()` is not reached.

See #231
  • Loading branch information
dvarrazzo committed Mar 2, 2022
1 parent 7c7fe2a commit c883f05
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 4 deletions.
7 changes: 7 additions & 0 deletions docs/news.rst
Expand Up @@ -21,6 +21,13 @@ Psycopg 3.1 (unreleased)
- Drop support for Python 3.6.


Psycopg 3.0.10 (unreleased)
^^^^^^^^^^^^^^^^^^^^^^^^^^^

- Leave the connection working after interrupting a query with Ctrl-C
(currently only for sync connections, :ticket:`#231`).


Current release
---------------

Expand Down
13 changes: 12 additions & 1 deletion psycopg/psycopg/connection.py
Expand Up @@ -857,7 +857,18 @@ def wait(self, gen: PQGen[RV], timeout: Optional[float] = 0.1) -> RV:
The function must be used on generators that don't change connection
fd (i.e. not on connect and reset).
"""
return waiting.wait(gen, self.pgconn.socket, timeout=timeout)
try:
return waiting.wait(gen, self.pgconn.socket, timeout=timeout)
except KeyboardInterrupt:
# On Ctrl-C, try to cancel the query in the server, otherwise
# otherwise the connection will be stuck in ACTIVE state

This comment has been minimized.

Copy link
@lelit

lelit Mar 2, 2022

repeated word: s/otherwise//

This comment has been minimized.

Copy link
@dvarrazzo

dvarrazzo Mar 3, 2022

Author Member

Thank you: fixed :)

c = self.pgconn.get_cancel()
c.cancel()
try:
waiting.wait(gen, self.pgconn.socket, timeout=timeout)
except e.QueryCanceled:
pass # as expected
raise

@classmethod
def _wait_conn(cls, gen: PQGenConn[RV], timeout: Optional[int]) -> RV:
Expand Down
17 changes: 16 additions & 1 deletion psycopg/psycopg/connection_async.py
Expand Up @@ -293,7 +293,22 @@ async def notifies(self) -> AsyncGenerator[Notify, None]:
yield n

async def wait(self, gen: PQGen[RV]) -> RV:
return await waiting.wait_async(gen, self.pgconn.socket)
try:
return await waiting.wait_async(gen, self.pgconn.socket)
except KeyboardInterrupt:
# TODO: this doesn't seem to work as it does for sync connections
# see tests/test_concurrency_async.py::test_ctrl_c
# In the test, the code doesn't reach this branch.

# On Ctrl-C, try to cancel the query in the server, otherwise
# otherwise the connection will be stuck in ACTIVE state
c = self.pgconn.get_cancel()
c.cancel()
try:
await waiting.wait_async(gen, self.pgconn.socket)
except e.QueryCanceled:
pass # as expected
raise

@classmethod
async def _wait_conn(cls, gen: PQGenConn[RV], timeout: Optional[int]) -> RV:
Expand Down
57 changes: 56 additions & 1 deletion tests/test_concurrency.py
Expand Up @@ -6,11 +6,13 @@
import sys
import time
import queue
import pytest
import signal
import threading
import subprocess as sp
from typing import List

import pytest

import psycopg


Expand Down Expand Up @@ -201,3 +203,56 @@ def closer():
finally:
conn.close()
conn2.close()


@pytest.mark.slow
@pytest.mark.subprocess
def test_ctrl_c(dsn):
if sys.platform == "win32":
sig = int(signal.CTRL_C_EVENT)
# Or pytest will receive the Ctrl-C too
creationflags = sp.CREATE_NEW_PROCESS_GROUP
else:
sig = int(signal.SIGINT)
creationflags = 0

script = f"""\
import os
import time
import psycopg
from threading import Thread
def tired_of_life():
time.sleep(1)
os.kill(os.getpid(), {sig!r})
t = Thread(target=tired_of_life, daemon=True)
t.start()
with psycopg.connect({dsn!r}) as conn:
cur = conn.cursor()
ctrl_c = False
try:
cur.execute("select pg_sleep(2)")
except KeyboardInterrupt:
ctrl_c = True
assert ctrl_c, "ctrl-c not received"
assert (
conn.info.transaction_status == psycopg.pq.TransactionStatus.INERROR
), f"transaction status: {{conn.info.transaction_status!r}}"
conn.rollback()
assert (
conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE
), f"transaction status: {{conn.info.transaction_status!r}}"
cur.execute("select 1")
assert cur.fetchone() == (1,)
"""
t0 = time.time()
proc = sp.Popen([sys.executable, "-s", "-c", script], creationflags=creationflags)
proc.communicate()
t = time.time() - t0
assert proc.returncode == 0
assert 1 < t < 2
55 changes: 54 additions & 1 deletion tests/test_concurrency_async.py
@@ -1,9 +1,13 @@
import sys
import time
import pytest
import signal
import asyncio
import subprocess as sp
from asyncio.queues import Queue
from typing import List, Tuple

import pytest

import psycopg
from psycopg._compat import create_task

Expand Down Expand Up @@ -151,3 +155,52 @@ async def closer():
finally:
await aconn.close()
await conn2.close()


@pytest.mark.xfail(reason="fix #231 for async connection")
@pytest.mark.slow
@pytest.mark.subprocess
async def test_ctrl_c(dsn):
script = f"""\
import asyncio
import psycopg
ctrl_c = False
async def main():
async with await psycopg.AsyncConnection.connect({dsn!r}) as conn:
cur = conn.cursor()
try:
await cur.execute("select pg_sleep(2)")
except KeyboardInterrupt:
ctrl_c = True
assert ctrl_c, "ctrl-c not received"
assert (
conn.info.transaction_status == psycopg.pq.TransactionStatus.INERROR
), f"transaction status: {{conn.info.transaction_status!r}}"
await conn.rollback()
assert (
conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE
), f"transaction status: {{conn.info.transaction_status!r}}"
await cur.execute("select 1")
assert (await cur.fetchone()) == (1,)
asyncio.run(main())
"""
if sys.platform == "win32":
creationflags = sp.CREATE_NEW_PROCESS_GROUP
sig = signal.CTRL_C_EVENT
else:
creationflags = 0
sig = signal.SIGINT

proc = sp.Popen([sys.executable, "-s", "-c", script], creationflags=creationflags)
with pytest.raises(sp.TimeoutExpired):
outs, errs = proc.communicate(timeout=1)

proc.send_signal(sig)
proc.communicate()
assert proc.returncode == 0

0 comments on commit c883f05

Please sign in to comment.