PYTHON-5219 - Avoid awaiting coroutines when holding locks #2250
Conversation
pymongo/asynchronous/topology.py
Outdated
async with self._lock:
    # Close servers and clear the pools.
    for server in self._servers.values():
        await server.close()
        close_servers.append(server)
        if not _IS_SYNC:
            self._monitor_tasks.append(server._monitor)
Unrelated to this PR but why do we append to _monitor_tasks here?
To clean up all the monitor tasks owned by the closed servers.
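For illustration, a rough sketch of that deferred-cleanup idea (toy names, not the actual driver code): the closed server's monitor task is remembered so it can be cancelled and awaited later, outside any lock.

import asyncio

async def monitor_loop():
    # Stand-in for a server monitor's background coroutine.
    await asyncio.sleep(3600)

async def main():
    monitor_tasks = []

    # "Closing" a server: remember its monitor task instead of awaiting it here,
    # so the real cancellation/join can happen later, outside any lock.
    monitor = asyncio.create_task(monitor_loop())
    monitor_tasks.append(monitor)

    # Later, during client cleanup:
    for task in monitor_tasks:
        task.cancel()
    await asyncio.gather(*monitor_tasks, return_exceptions=True)

asyncio.run(main())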
Since asyncio doesn't support forking at all, should we just remove this? It seems like non-functional code.
What if we replace the entire fork branch here with a warning on async?
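A rough sketch of what that warning could look like (the helper name, message, and placement here are hypothetical):

import os
import warnings

_IS_SYNC = False  # async build of the driver

def check_fork(topology_pid: int) -> None:
    # Hypothetical: in the async client, warn instead of trying to reset after fork().
    if not _IS_SYNC and os.getpid() != topology_pid:
        warnings.warn(
            "AsyncMongoClient does not support fork(); "
            "create a new client in the child process instead.",
            RuntimeWarning,
            stacklevel=2,
        )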
Before we make any code changes someone should test out the current behavior of fork+asyncio. Depending on the behavior we might need to reopen https://jira.mongodb.org/browse/PYTHON-5249.
This example:
import os
from pymongo import AsyncMongoClient
import asyncio

async def test_func():
    client = AsyncMongoClient()
    await client.aconnect()
    pid = os.fork()
    if pid == 0:
        await client.db.test.insert_one({'a': 1})
        exit()
    print("Done!")

asyncio.run(test_func())
Produces the following error multiple times, each time with a different traceback:
Traceback (most recent call last):
File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/asyncio/runners.py", line 194, in run
return runner.run(main)
~~~~~~~~~~^^^^^^
File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/asyncio/base_events.py", line 708, in run_until_complete
self.run_forever()
~~~~~~~~~~~~~~~~^^
File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/asyncio/base_events.py", line 679, in run_forever
self._run_once()
~~~~~~~~~~~~~~^^
File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/asyncio/base_events.py", line 1989, in _run_once
event_list = self._selector.select(timeout)
File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/selectors.py", line 548, in select
kev_list = self._selector.control(None, max_ev, timeout)
ValueError: I/O operation on closed kqueue object
The same error happens without using pymongo at all:
import asyncio
import os

async def test_func():
    pid = os.fork()
    if pid == 0:
        await asyncio.sleep(.01)
        print("Done child!")
        exit()
    await asyncio.sleep(.1)
    print("Done parent!")

asyncio.run(test_func())
pymongo/asynchronous/topology.py
Outdated
async with self._lock:
    # Close servers and clear the pools.
    for server in self._servers.values():
        await server.close()
        close_servers.append(server)
We close the servers here but we leave self._servers untouched? Is using a client post fork broken right now? I don't see where the Server gets recreated.
Our test_fork.py tests are all passing. We re-open each server in self._servers in _ensure_opened, called at the end of open here.
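As a toy model of that flow (simplified, not the actual driver code), open() takes the lock and _ensure_opened() recreates a Server for every known address that is missing one:

import asyncio

class Server:
    def __init__(self, address):
        self.address = address

class Topology:
    def __init__(self, addresses):
        self._lock = asyncio.Lock()
        self._addresses = addresses
        self._servers = {}

    async def _ensure_opened(self):
        # Recreate a Server for every known address that doesn't have one.
        for address in self._addresses:
            if address not in self._servers:
                self._servers[address] = Server(address)

    async def open(self):
        # (fork detection and reset elided)
        async with self._lock:
            await self._ensure_opened()

async def main():
    topology = Topology(["localhost:27017"])
    await topology.open()
    print(sorted(topology._servers))

asyncio.run(main())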
Here's the race I'm concerned about:
- app forks with an open client
- child process starts 2 threads that both call find_one()
- both threads see a different PID and enter this if-block.
- T1 acquires the lock first, resets the servers, reopens and then proceeds to server selection.
- T2 then closes the server selected by T1 which causes a PoolClosedError in T1.
It could be that this is already possible with the current code. What do you think?
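A standalone toy (threads and sleeps only, not driver code) that shows the shape of that check-then-reset race:

import threading
import time

class Server:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

class Topology:
    def __init__(self):
        self._forked = True  # pretend both threads run right after fork()
        self._lock = threading.Lock()
        self._server = Server()

    def find_one(self, name):
        if self._forked:              # both threads pass this check...
            time.sleep(0.01)
            with self._lock:          # ...then reset one after the other
                self._server.close()
                self._server = Server()
                self._forked = False
        server = self._server         # "server selection"
        time.sleep(0.05)              # the other thread's reset can close it now
        if server.closed:
            # On most runs one thread ends up holding a server the other just closed.
            print(f"{name}: selected server was closed (PoolClosedError-like)")

topology = Topology()
threads = [threading.Thread(target=topology.find_one, args=(f"T{i}",)) for i in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()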
The addition of async hasn't changed the structure of this code, only the async/await syntax, so if this race condition does exist, it's existed for some time. Here's the identical PyMongo 4.8 version, before we added async support:
mongo-python-driver/pymongo/topology.py
Line 177 in de0f46a
def open(self) -> None:
Looking at the code, I agree that scenario certainly seems possible. We could add a flag set post-fork to prevent that race condition?
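One hedged way to do that, using a re-check of the pid under the lock in place of a separate flag (names are illustrative, not the driver's actual code):

import asyncio
import os

class Topology:
    def __init__(self):
        self._pid = os.getpid()
        self._lock = asyncio.Lock()

    async def _check_fork(self):
        if os.getpid() != self._pid:          # cheap unlocked check
            async with self._lock:
                if os.getpid() != self._pid:  # re-check under the lock so only the
                    self._pid = os.getpid()   # first task/thread performs the reset
                    await self._reset_post_fork()

    async def _reset_post_fork(self):
        pass  # close and recreate servers here

asyncio.run(Topology()._check_fork())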
Not concerned about async here. I'm concerned that delaying the server.close() call to after we release the lock will make this race more likely.
I just realized: since we don't support fork + async at all, this change is also non-functional and can be reverted.
I mention that this code hasn't changed besides the addition of async to show that this race condition has either been present for quite some time, or doesn't exist.
Yeah, let's undo the forking changes. Holding the lock while calling close() in the sync version isn't so bad in this case because it only happens once post fork().
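For reference, a minimal sketch of the pattern this PR applies elsewhere: collect the servers under the lock and await their close() coroutines only after releasing it (class and method names are illustrative, not the driver's actual code):

import asyncio

class Server:
    async def close(self):
        await asyncio.sleep(0)  # closing may await teardown work

class Topology:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._servers = {"a": Server(), "b": Server()}

    async def reset(self):
        to_close = []
        async with self._lock:
            # Only mutate shared state under the lock; defer the awaits.
            to_close.extend(self._servers.values())
            self._servers.clear()
        # Await the potentially slow coroutines without holding the lock.
        for server in to_close:
            await server.close()

asyncio.run(Topology().reset())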