"Another operation is in progress" error when using transactions and asyncio.gather #313

Closed
jekel opened this issue Aug 21, 2018 · 20 comments
Labels
bug Describes a bug in the system.

@jekel
Contributor

jekel commented Aug 21, 2018

Hello!
When I need to run several database operations in parallel, I use asyncio.gather with tasks that each open a transaction. With GINO this does not work correctly and raises:
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

How to reproduce it:

    async def t():
        async with db.transaction():
            await Model.query.gino.first()

    await asyncio.gather(*[t() for i in range(5)])

Placing db.acquire before db.transaction does not change anything.

I think the problem is that each task spawned by asyncio.gather needs its own real connection, with its own transaction in it.

stacktrace

 await Model.query.gino.first()
  File "venv/project/lib/python3.7/site-packages/gino/api.py", line 135, in first
    **params)
  File "venv/project/lib/python3.7/site-packages/gino/engine.py", line 681, in first
    return await conn.first(clause, *multiparams, **params)
  File "venv/project/lib/python3.7/site-packages/gino/engine.py", line 336, in first
    return await result.execute(one=True)
  File "venv/project/lib/python3.7/site-packages/gino/dialects/base.py", line 207, in execute
    context.statement, context.timeout, args, 1 if one else 0)
  File "venv/project/lib/python3.7/site-packages/gino/dialects/asyncpg.py", line 175, in async_execute
    query, executor, timeout)
  File "venv/project/lib/python3.7/site-packages/asyncpg/connection.py", line 1400, in _do_execute
    stmt = await self._get_statement(query, None)
  File "venv/project/lib/python3.7/site-packages/asyncpg/connection.py", line 323, in _get_statement
    statement = await self._protocol.prepare(stmt_name, query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 165, in prepare
  File "asyncpg/protocol/protocol.pyx", line 675, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
  • GINO version: 0.7.5
  • Python version: 3.7.0
  • asyncpg version: 0.17.0
  • PostgreSQL version: 9.6.9
  • Quart version 0.6.4
@jekel
Contributor Author

jekel commented Aug 21, 2018

So, I investigated and found that unless the connection is passed explicitly, nested queries do not run on the connection of db.transaction :(

When the code is

    async def t():
        async with db.transaction(reuse=False, reusable=False) as tx:
            await Model.get(109887, bind=tx.connection)
    await asyncio.gather(*[t() for i in range(5)])

and I add print(cursor._conn, cursor._conn.gino_conn, cursor._conn.raw_connection)
to /gino/dialects/base.py:206,
the connections are different, as expected:

<gino.engine._DBAPIConnection object at 0x7fcdc69b2080> <gino.ext.quart.GinoConnection object at 0x7fcdc69b2fd0> <PoolConnectionProxy <asyncpg.connection.Connection object at 0x7fcdc69b3048> 0x7fcdc3b590b8>
<gino.engine._DBAPIConnection object at 0x7fcdc3b591d0> <gino.ext.quart.GinoConnection object at 0x7fcdc3b59278> <PoolConnectionProxy <asyncpg.connection.Connection object at 0x7fcdc69b3120> 0x7fcdc3b592b0>
<gino.engine._DBAPIConnection object at 0x7fcdc3b59438> <gino.ext.quart.GinoConnection object at 0x7fcdc3b594e0> <PoolConnectionProxy <asyncpg.connection.Connection object at 0x7fcdc69b32d0> 0x7fcdc3b59518>
<gino.engine._DBAPIConnection object at 0x7fcdc3b596a0> <gino.ext.quart.GinoConnection object at 0x7fcdc3b59748> <PoolConnectionProxy <asyncpg.connection.Connection object at 0x7fcdc69b31f8> 0x7fcdc3b59780>

Without bind=tx.connection the result is as follows:
only one connection is used, and the error from the first message is raised.

<gino.engine._ReusingDBAPIConnection object at 0x7ff45c77cd30> <gino.ext.quart.GinoConnection object at 0x7ff45c77ce10> <PoolConnectionProxy <asyncpg.connection.Connection object at 0x7ff45c821480> 0x7ff45c77ce48>
<gino.engine._ReusingDBAPIConnection object at 0x7ff45c77cef0> <gino.ext.quart.GinoConnection object at 0x7ff45c77cfd0> <PoolConnectionProxy <asyncpg.connection.Connection object at 0x7ff45c821480> 0x7ff45c77ce48>
<gino.engine._ReusingDBAPIConnection object at 0x7ff45c7b3080> <gino.ext.quart.GinoConnection object at 0x7ff45c7b3160> <PoolConnectionProxy <asyncpg.connection.Connection object at 0x7ff45c821480> 0x7ff45c77ce48>
<gino.engine._ReusingDBAPIConnection object at 0x7ff45c7b31d0> <gino.ext.quart.GinoConnection object at 0x7ff45c7b32b0> <PoolConnectionProxy <asyncpg.connection.Connection object at 0x7ff45c821480> 0x7ff45c77ce48>
<gino.engine._ReusingDBAPIConnection object at 0x7ff45c7b33c8> <gino.ext.quart.GinoConnection object at 0x7ff45c7b3518> <PoolConnectionProxy <asyncpg.connection.Connection object at 0x7ff45c821480> 0x7ff45c77ce48>

@fantix fantix added the bug Describes a bug in the system. label Aug 22, 2018
@fantix fantix added this to the v0.7.x milestone Aug 22, 2018
@fantix
Member

fantix commented Aug 22, 2018

Thanks for the report! I'm seeing two critical issues for GINO in Python 3.7:

  1. Sub-tasks are able to share connections when any connection was acquired and released in the main task before spawning the sub-tasks:
import asyncio
import gino

db = gino.Gino()


async def main():
    await db.set_bind('postgresql://localhost/gino')

    async with db.acquire():
        pass

    async def t():
        async with db.acquire(reuse=True):
            await db.scalar('SELECT now()')

    await asyncio.gather(*[t() for _ in range(5)])


asyncio.run(main())
  2. In the same scenario, db.acquire(reuse=False) won't fail, but db.transaction(reuse=False) will.

@jekel
Contributor Author

jekel commented Aug 22, 2018

This error also is valid for python 3.6.

I tried this as initialization:

    async with db.acquire(reuse=False, reusable=False):
        pass

and also different parameter combinations.

But this does not work either:

    async with db.transaction():
        pass

    async def t():
        async with db.transaction(reuse=True):
            await Model.get(...) 

    await asyncio.gather(*[t() for i in range(5)])

Is there any way to use db.acquire(reuse=False) for all nested queries inside the context block without passing any variables explicitly?

@fantix
Member

fantix commented Aug 24, 2018

> This error also is valid for python 3.6.

Could you please share your env and errors when above code fails in Python 3.6? I didn't reproduce in Python 3.6.

> Is there any way to use db.acquire(reuse=False) for all nested queries inside the context block without passing any variables explicitly?

FYI db.acquire() has reuse=False by default.

@jekel
Contributor Author

jekel commented Aug 24, 2018

Python 3.6.6 (default, Aug 20 2018, 16:26:45)

@app.route('/test')
async def test_save():
    async with db.acquire(reuse=False, reusable=False):
        pass

    async def t():
        async with db.transaction():
            await Model.get(...)

    await asyncio.gather(*[t() for i in range(5)])

    return {}
Traceback (most recent call last):
  File "/venv/project/lib/python3.6/site-packages/quart/app.py", line 1419, in handle_request
    return await self.full_dispatch_request(request_context)
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "/venv/project/lib/python3.6/site-packages/quart/app.py", line 1441, in full_dispatch_request
    result = await self.handle_user_exception(error)
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/venv/project/lib/python3.6/site-packages/quart/flask_patch/app.py", line 23, in new_handle_user_exception
    return await old_handle_user_exception(self, error)
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/venv/project/lib/python3.6/site-packages/quart/app.py", line 876, in handle_user_exception
    raise error
  File "/venv/project/lib/python3.6/site-packages/quart/app.py", line 1439, in full_dispatch_request
    result = await self.dispatch_request(request_context)
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "/venv/project/lib/python3.6/site-packages/quart/app.py", line 1487, in dispatch_request
    return await handler(**request_.view_args)
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 129, in throw
    return self.gen.throw(type, value, traceback)
  File "/project/views.py", line 594, in test_save
    await asyncio.gather(*[t() for i in range(5)])
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 126, in send
    return self.gen.send(value)
  File "/project/views.py", line 591, in t
    async with db.transaction():
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/venv/project/lib/python3.6/site-packages/gino/engine.py", line 168, in __aenter__
    return await self._tx_ctx.__aenter__()
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/venv/project/lib/python3.6/site-packages/gino/transaction.py", line 165, in __aenter__
    await self._begin()
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/venv/project/lib/python3.6/site-packages/gino/transaction.py", line 80, in _begin
    await self._tx.begin()
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/venv/project/lib/python3.6/site-packages/gino/dialects/asyncpg.py", line 238, in begin
    await self._tx.start()
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/venv/project/lib/python3.6/site-packages/asyncpg/transaction.py", line 138, in start
    await self._connection.execute(query)
  File "/.pyenv/versions/3.6.6/lib/python3.6/asyncio/coroutines.py", line 110, in __next__
    return self.gen.send(None)
  File "/venv/project/lib/python3.6/site-packages/asyncpg/connection.py", line 273, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 317, in query
  File "asyncpg/protocol/protocol.pyx", line 675, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
aiocontextvars==0.1.2
aiofiles==0.3.2
aiohttp==3.3.2
aioredis==1.1.0
aioresponses==0.4.2
alembic==0.9.9
aniso8601==1.3.0
aresponses==1.0.1
astroid==1.6.3
async-timeout==3.0.0
asyncpg==0.17.0
atomicwrites==1.1.5
attrs==18.1.0
backcall==0.1.0
blinker==1.4
cached-property==1.4.3
certifi==2017.11.5
chardet==3.0.4
click==6.7
cycler==0.10.0
decorator==4.3.0
enum34==1.1.6
flake8==3.5.0
flake8-import-order-spoqa==1.3.0
Flask==1.0.2
Flask-Cors==3.0.3
Flask-Environments==0.1
Flask-Mail==0.9.1
Flask-Migrate==2.1.1
Flask-SQLAlchemy==2.3.2
gino==0.7.5
h11==0.7.0
h2==3.0.1
hiredis==0.2.0
hpack==3.0.0
Hypercorn==0.2.3
hyperframe==5.1.0
idna==2.6
idna-ssl==1.0.1
ipython==6.5.0
ipython-genutils==0.2.0
isort==4.3.4
itsdangerous==0.24
jedi==0.12.1
Jinja2==2.10
kiwisolver==1.0.1
lazy-object-proxy==1.3.1
Mako==1.0.7
MarkupSafe==1.0
marshmallow==2.15.2
mccabe==0.6.1
more-itertools==4.2.0
multidict==4.3.1
networkx==2.1
parso==0.3.1
pep8==1.7.1
pexpect==4.6.0
pickleshare==0.7.4
pluggy==0.6.0
prompt-toolkit==1.0.15
ptyprocess==0.6.0
py==1.5.3
py-lru-cache==0.1.4
pycodestyle==2.3.1
pyflakes==1.6.0
Pygments==2.2.0
pylint==1.9.1
pyparsing==2.2.0
pytest==3.6.1
pytest-asyncio==0.8.0
python-dateutil==2.6.1
python-editor==1.0.3
pytoml==0.1.18
pytz==2017.3
PyYAML==3.12
Quart==0.6.4
Quart-CORS==0.1.0
requests==2.18.4
simplegeneric==0.8.1
six==1.11.0
sortedcontainers==2.0.4
SQLAlchemy==1.2.10
testfixtures==6.0.2
traitlets==4.3.2
typing-extensions==3.6.5
urllib3==1.22
uvloop==0.11.2
wcwidth==0.1.7
websocket-client==0.48.0
websockets==6.0
Werkzeug==0.14.1
wrapt==1.10.11
wsproto==0.11.0
yarl==1.2.4

@fantix
Member

fantix commented Aug 28, 2018

Ah yes, thanks for the update! To reproduce this issue in Python 3.6, aiocontextvars.enable_inherit() is required, which is the default for GINO's web framework plugins.

@jekel
Contributor Author

jekel commented Aug 29, 2018

@fantix The same issue also occurs with Sanic:

import asyncio
from sanic import Sanic
from sanic.exceptions import abort
from sanic.response import json
from gino.ext.sanic import Gino

app = Sanic()
app.config.DB_DSN = '...'

db = Gino()
db.init_app(app)


class Model(db.Model):
    __tablename__ = 'model'

    id = db.Column(db.Integer, primary_key=True)


@app.route("/test")
async def test(request):
    async def t():
        async with db.transaction():
            await Model.get_or_404(...)
    await asyncio.gather(*[t() for i in range(5)])
    return json({})


if __name__ == '__main__':
    app.run(debug=True)
    async with db.transaction():
  File "/venv//lib/python3.7/site-packages/gino/engine.py", line 168, in __aenter__
    return await self._tx_ctx.__aenter__()
  File "/venv//lib/python3.7/site-packages/gino/transaction.py", line 166, in __aenter__
    await self._begin()
  File "/venv//lib/python3.7/site-packages/gino/transaction.py", line 81, in _begin
    await self._tx.begin()
  File "/venv//lib/python3.7/site-packages/gino/dialects/asyncpg.py", line 238, in begin
    await self._tx.start()
  File "/venv//lib/python3.7/site-packages/asyncpg/transaction.py", line 138, in start
    await self._connection.execute(query)
  File "/venv//lib/python3.7/site-packages/asyncpg/connection.py", line 273, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 317, in query
  File "asyncpg/protocol/protocol.pyx", line 675, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

@fantix
Member

fantix commented Aug 29, 2018

Yes, thanks. All web framework plugins in GINO enable inherit. I'm trying to make that the default in the upstream contextvars PEP-567 backport.

@wwwjfy
Member

wwwjfy commented Sep 2, 2018

It doesn't matter whether the connection was acquired and released in the main task. It also happens in the following code:

import asyncio
import gino

db = gino.Gino()


async def main():
    await db.set_bind('postgresql://localhost/gino')

    async with db.acquire():
        async def t():
            async with db.acquire(reuse=True):
                print(await db.scalar('SELECT now()'))

        await asyncio.gather(*[t() for _ in range(5)])


asyncio.run(main())

The issue is that db.acquire() happens before the subtasks start, so they share a common context stack; the subsequent tasks then get the same connection that the first task pushed.

I don't have a clear answer in mind as to whether GINO should create a new connection or have a lock-like mechanism on the connection. I assume there are valid use cases for both.
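
The shared-stack effect described above can be sketched with the standard library alone (Python 3.7+). This is not GINO's code: `_stack` and `subtask` are hypothetical names, and the list stands in for the per-context connection stack. Each task receives a copy of the parent's context, but the copy still references the same list object, so a push by one task is visible to its siblings.

```python
import asyncio
import contextvars

# Hypothetical stand-in for a per-context connection stack (not GINO's API).
_stack = contextvars.ContextVar("stack")


async def subtask(name):
    # The task's context is a copy of the parent's, but the copy still
    # points at the same list object, so pushes are visible to siblings.
    stack = _stack.get()
    stack.append(name)
    await asyncio.sleep(0)  # yield so that the other subtasks run
    return id(stack), list(stack)


async def main():
    _stack.set([])  # the stack exists in the parent before subtasks start
    return await asyncio.gather(*[subtask(i) for i in range(3)])


if __name__ == "__main__":
    for stack_id, contents in asyncio.run(main()):
        print(stack_id, contents)
```

Every subtask reports the same list identity, which is the situation in which a `reuse=True` lookup would hand the same connection to several tasks.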

@fantix
Member

fantix commented Sep 4, 2018

@wwwjfy My idea is to remove stack instances from shared contexts once the stacks become empty. This requires unique remove APIs for Python 3.7 and aiocontextvars or preferably contextvars backport (MagicStack/contextvars#11).

@wwwjfy
Member

wwwjfy commented Sep 4, 2018

@fantix That fixes the problem as reported, but it can't prevent subtasks from getting the same reused connection, as in the snippet in my previous comment. Because scalar calls self.acquire(reuse=True), the exception is thrown if the connection is being used by another subtask.

@fantix
Member

fantix commented Sep 4, 2018

Ah, first of all, context should be inherited by subtasks. This was discussed in #84 and in PEP-550:

> The main reason for why tasks inherit the context, and threads do not, is the common usage intent. Tasks are often used for relatively short-running operations which are logically tied to the code that spawned the task (like running a coroutine with a timeout in asyncio). OS threads, on the other hand, are normally used for long-running, logically separate code.

and eventually implemented in PEP-567:

> Tasks in asyncio need to maintain their own context that they inherit from the point they were created at.

Regarding shared connections, I think it should follow the way contexts are inherited, so that people don't have to remember another rule. Therefore, if users spawn subtasks within an async with db.acquire() context, they are responsible for any such error.

In your example, if the user is in doubt whether the reused connection may be accessed concurrently, an explicit reuse=False should always be present. But in a case like this:

async def main():
    async with db.acquire():
        await asyncio.wait_for(db.status('SELECT pg_sleep(10)'), 1)

db.status() should reuse the connection explicitly acquired previously, even if asyncio.wait_for() puts it into a subtask.
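
The inheritance rule behind that expectation can be checked without a database. In this minimal sketch, `current_conn` is a hypothetical context variable standing in for "the connection acquired by the enclosing block". It is an assumption for illustration, not how GINO stores connections. A value set in the parent stays visible both in an explicitly created task and in a coroutine run through `asyncio.wait_for()`, whether or not the running Python version wraps that coroutine in a separate task.

```python
import asyncio
import contextvars

# Hypothetical stand-in for the connection acquired by the enclosing block.
current_conn = contextvars.ContextVar("current_conn", default=None)


async def status():
    # Reads whatever the calling context held when this coroutine was started.
    return current_conn.get()


async def main():
    current_conn.set("conn-1")  # plays the role of `async with db.acquire()`
    in_task = await asyncio.ensure_future(status())  # always a separate task
    in_wait_for = await asyncio.wait_for(status(), timeout=1)
    return in_task, in_wait_for


if __name__ == "__main__":
    print(asyncio.run(main()))
```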

Hopefully this explains. 😃

@wwwjfy
Member

wwwjfy commented Sep 4, 2018

I have no issue with context inheritance, but I'm not sure whether there are cases where the user wants to use only one connection for those subtasks, or within one request, in which case we'd need to lock and release it.

@fantix
Member

fantix commented Sep 4, 2018

Ah, got it. I think users should lock and release it themselves; likewise, asyncpg doesn't provide locking either.
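
A minimal sketch of such manual locking, assuming the subtasks deliberately share one connection. `FakeConnection` is a hypothetical stand-in that rejects concurrent use in the way the error in this issue does; it is not an asyncpg or GINO class. An `asyncio.Lock` serializes access so that only one subtask talks to the connection at a time.

```python
import asyncio


class FakeConnection:
    """Hypothetical connection that rejects concurrent use."""

    def __init__(self):
        self._busy = False

    async def query(self, value):
        if self._busy:
            raise RuntimeError("another operation is in progress")
        self._busy = True
        try:
            await asyncio.sleep(0)  # simulate a round trip to the server
            return value
        finally:
            self._busy = False


async def main():
    conn = FakeConnection()
    lock = asyncio.Lock()  # guards the single shared connection

    async def run(i):
        async with lock:  # one subtask at a time on the connection
            return await conn.query(i)

    return await asyncio.gather(*[run(i) for i in range(5)])


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Removing the `async with lock:` line makes the second subtask hit the busy connection and raise, which mirrors the failure reported here.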

@wwwjfy
Member

wwwjfy commented Sep 4, 2018

ok, fair enough.
Removing stack from context shouldn't be very hard.

@fantix
Member

fantix commented Sep 4, 2018

No it's not - I'm trying to remove aiocontextvars.ContextVar.delete() and honor contextvars.ContextVar.reset(token).
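
The idea of removing the stack once it is empty can be sketched with `contextvars.ContextVar.reset(token)`. This is a toy model under stated assumptions, not the code from the fix: the `acquire` class, `_stack`, and the use of `object()` as a connection are all hypothetical. The outermost `acquire` creates the stack and keeps the token; on exit it resets the variable, so tasks spawned afterwards do not inherit a leftover stack.

```python
import asyncio
import contextvars

_stack = contextvars.ContextVar("stack")  # hypothetical name


class acquire:
    """Toy model of a reusable acquire; object() plays a connection."""

    def __init__(self, reuse=False):
        self._reuse = reuse
        self._token = None

    async def __aenter__(self):
        stack = _stack.get(None)
        if stack is None:
            # Outermost acquire in this context: create the stack.
            stack = []
            self._token = _stack.set(stack)
        conn = stack[-1] if self._reuse and stack else object()
        stack.append(conn)
        return conn

    async def __aexit__(self, *exc_info):
        _stack.get().pop()
        if self._token is not None:
            # Whoever created the stack removes it again once it is empty.
            _stack.reset(self._token)


async def subtask():
    async with acquire(reuse=True) as conn:
        await asyncio.sleep(0)  # let the sibling subtasks run
        return conn


async def main():
    async with acquire():
        pass
    # The variable is unset again, so subtasks inherit no shared stack.
    assert _stack.get(None) is None
    return await asyncio.gather(*[subtask() for _ in range(3)])


if __name__ == "__main__":
    print(len(set(map(id, asyncio.run(main())))))
```

With the reset in place each subtask builds its own stack and ends up with its own connection object, even though it asks for `reuse=True`.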

@fantix fantix closed this as completed in 0a8a199 Sep 13, 2018
fantix added a commit that referenced this issue Sep 13, 2018
Fixed #313, remove stack when empty
@fantix
Member

fantix commented Sep 30, 2018

Guess this should be backported to GINO 0.7 without introducing aiocontextvars 0.2.

@skandalfo

Wow.

I just found that the fixes for this (the upcoming 0.8.0 with aiocontextvars 0.2?) likely also fix my woes with transactions.

I'm explicitly not using the aiohttp integration because the web part is just one side of my application, that also reacts to external events, so I didn't have the enable_inherit call anywhere. Long story short, whenever I used asyncio.gather to fork execution from within a transaction context, the current connection wouldn't be reused, so each of the coroutines I ran gather on used their own, transaction-less one, and I got plenty of "race conditions" that I was counting on the serializable isolation level to prevent.

I got to this by looking at PostgreSQL's statement logs where I couldn't see savepoints that I expected nested transactions to trigger. I dug all the code up to asyncpg's transaction support to verify they should be there. Eventually realized by the PIDs in the logs that different connections were in use when there should be only one.

I've just checked against master head, and now I see shared connections and nested savepoints, so I expect I'll see serialization failure retries now :-)

BTW. This is running on Python 3.5.3, and I made sure to have aiocontextvars imported before creating any loops.

@wwwjfy
Member

wwwjfy commented Oct 8, 2018

Glad it works for you. One thing I'm wondering about: with asyncio.gather, used the way you describe, different coroutines share the same connection. This may cause nested transactions to conflict with each other.
I don't use nested transactions much, so this may not be true. I'll try it later.

@skandalfo

@wwwjfy you're right; I had not thought of different nested transactions being started in different branches. That's not going to work.

But I don't need them. I just need to know whether a transaction is already active (and let it retry as a whole if it fails) or not (and create a new root one on the spot).

I'll likely upgrade my decorator to use aiocontextvars to implement that behavior rather than blindly starting a new (sub) transaction. That will also allow me to register actions to be run once a transaction has been irreversibly committed.
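
One way such a decorator could look, as a rough sketch only. The names `transactional`, `_in_tx` and `begun` are hypothetical, and the place where a real version would open a root transaction (for example with `db.transaction()`) is only marked by a comment. A context variable records whether a transaction is active; because subtasks inherit the context, coroutines run through `asyncio.gather` join the active transaction instead of starting their own.

```python
import asyncio
import contextvars
import functools

_in_tx = contextvars.ContextVar("in_tx", default=False)  # hypothetical flag
begun = []  # records each root transaction started, for illustration only


def transactional(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        if _in_tx.get():
            # A transaction is already active in this context: join it.
            return await func(*args, **kwargs)
        token = _in_tx.set(True)
        begun.append(func.__name__)  # a real version would begin a root transaction here
        try:
            return await func(*args, **kwargs)
        finally:
            _in_tx.reset(token)

    return wrapper


@transactional
async def child(i):
    await asyncio.sleep(0)
    return i


@transactional
async def parent():
    # The subtasks inherit the context, so they see the active flag.
    return await asyncio.gather(*[child(i) for i in range(3)])


if __name__ == "__main__":
    print(asyncio.run(parent()), begun)
```

Note that the joined coroutines would still share one connection, so concurrent queries on it need the same care discussed earlier in the thread.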

But losing the current connection just because I used asyncio.gather was completely unexpected.
