Skip to content

transactions and concurrency  #176

@dlax

Description

@dlax

It seems that the transaction code does not work well when several concurrent tasks share a connection. For instance, with the following test:

def test_concurrency(conn):
    """Run transaction blocks on one shared connection from several threads.

    Each worker enters ``conn.transaction()``, executes a parametrised
    ``select`` and returns the resulting cursor. The test then checks that
    the values fetched from those cursors add up to the sum of the inputs.
    """
    conn.autocommit = True

    def fn(value):
        # Every worker uses the same ``conn`` object, so transaction blocks
        # started by different threads may overlap on that one connection.
        with conn.transaction():
            cur = conn.execute("select %s", (value,))
        # The cursor is handed back only after the transaction block exited.
        return cur

    values = range(2)
    with concurrent.futures.ThreadPoolExecutor() as e:
        # ``map`` returns an iterator; an exception raised inside a worker
        # is re-raised only when its result is pulled from that iterator.
        cursors = e.map(fn, values)
    assert sum(cur.fetchone()[0] for cur in cursors) == sum(values)

I get:

tests/test_transaction.py::test_concurrency FAILED                               [100%]

======================================= FAILURES =======================================
___________________________________ test_concurrency ___________________________________

conn = <psycopg.Connection [INTRANS] (host=localhost database=psycopg_test) at 0x7f7366336d00>

    def test_concurrency(conn):
        conn.autocommit = True
    
        def fn(value):
            with conn.transaction():
                cur = conn.execute("select %s", (value,))
            return cur
    
        values = range(2)
        with concurrent.futures.ThreadPoolExecutor() as e:
            cursors = e.map(fn, values)
>       assert sum(cur.fetchone()[0] for cur in cursors) == sum(values)

tests/test_transaction.py:665: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tests/test_transaction.py:665: in <genexpr>
    assert sum(cur.fetchone()[0] for cur in cursors) == sum(values)
/usr/lib/python3.9/concurrent/futures/_base.py:600: in result_iterator
    yield fs.pop().result()
/usr/lib/python3.9/concurrent/futures/_base.py:433: in result
    return self.__get_result()
/usr/lib/python3.9/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
/usr/lib/python3.9/concurrent/futures/thread.py:52: in run
    result = self.fn(*self.args, **self.kwargs)
tests/test_transaction.py:659: in fn
    cur = conn.execute("select %s", (value,))
/usr/lib/python3.9/contextlib.py:124: in __exit__
    next(self.gen)
psycopg/psycopg/connection.py:844: in transaction
    yield tx
psycopg/psycopg/transaction.py:215: in __exit__
    return self._conn.wait(self._exit_gen(exc_type, exc_val, exc_tb))
psycopg/psycopg/connection.py:867: in wait
    return waiting.wait(gen, self.pgconn.socket, timeout=timeout)
psycopg/psycopg/waiting.py:228: in wait_epoll
    s = next(gen)
psycopg/psycopg/transaction.py:122: in _exit_gen
    yield from self._commit_gen()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psycopg.Transaction (active) [INTRANS] (host=localhost database=psycopg_test) at 0x7f7366353100>

    def _commit_gen(self) -> PQGen[PGresult]:
>       assert self._conn._savepoints[-1] == self._savepoint_name
E       AssertionError

psycopg/psycopg/transaction.py:138: AssertionError

See dlax@73dfb40 (branch transaction-concurrency in my fork), which implements the tests.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions