-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PYTHON-4493 - Use asyncio protocols instead of sockets for network IO #2151
base: master
Are you sure you want to change the base?
Conversation
Passing build with every non-PyPy and PyOpenSSL task run: https://spruce.mongodb.com/version/67c76d41c4b4b40007969d8e/tasks?sorts=STATUS%3AASC%3BBASE_STATUS%3ADESC. The serverless failures are also present in master: https://spruce.mongodb.com/task/mongo_python_driver_serverless_rhel8_python3.13_test_serverless_patch_5ac262783f7fae2bc95ba354aad0ab27d5c23423_67c86a77bfabe100079556ed_25_03_05_15_15_10?execution=0&sortBy=STATUS&sortDir=ASC The OCSP failures are either flakey or fixed in master: https://spruce.mongodb.com/version/67c8734c6a4a7d000729f75b/tasks?sorts=STATUS%3AASC%3BBASE_STATUS%3ADESC |
pymongo/network_layer.py
Outdated
self._is_compressed = False | ||
self._compressor_id = None | ||
# If at least one header's worth of data remains after the current message, reprocess all leftover data | ||
if self._end_index - self._start_index >= 16: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for readability, do we want to make some var like HEADER_LENGTH = 16 and use that instead? (not a must, just a thought)
bson/__init__.py
Outdated
raise InvalidBSON("invalid object size") | ||
raise InvalidBSON( | ||
f"invalid object size: expected {obj_size}, got {data_len - position}" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we split this change into its own ticket and add a test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I was using this change for more helpful debugging but this should be shifted into a separate fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tracked in PYTHON-5214.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping.
async def async_sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None: | ||
# These socket-based I/O methods are for KMS requests and any other network operations that do not use | ||
# the MongoDB wire protocol | ||
async def async_socket_sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually we should have a protocol implementation of KMS right? Can you open a ticket.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tracked in PYTHON-5215.
@@ -1821,7 +1822,7 @@ async def test_exhaust_cursor_db_set(self): | |||
|
|||
listener.reset() | |||
|
|||
result = await c.find({}, cursor_type=pymongo.CursorType.EXHAUST, batch_size=1).to_list() | |||
result = list(await c.find({}, cursor_type=pymongo.CursorType.EXHAUST, batch_size=1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks confusing to call list(await ...). Can we go back to to_list()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why I did this change, sure.
) | ||
# Initialize the client with a larger timeout to help make test less flakey | ||
with pymongo.timeout(10): | ||
await client.admin.command("ping") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why make these changes in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Must have been debugging, reverted.
pyproject.toml
Outdated
@@ -115,6 +115,12 @@ filterwarnings = [ | |||
"module:unclosed <socket.socket:ResourceWarning", | |||
"module:unclosed <ssl.SSLSocket:ResourceWarning", | |||
"module:unclosed <socket object:ResourceWarning", | |||
# TODO: Remove the next five as part of PYTHON-5036. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do these resource warning have to do with unittest? They seem like unrelated resource leaks in the test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping.
def receive_message( | ||
conn: Connection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE | ||
) -> Union[_OpReply, _OpMsg]: | ||
"""Receive a raw BSON message or raise socket.error.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"raw BSON message" -> "MongoDB wire protocol message"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The synchronous receive_message
still receives raw BSON messages, so the docstring is correct.
pymongo/network_layer.py
Outdated
message[4], | ||
message[5], | ||
message[6], | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be simplified to:
start, end, op_code, is_compressed, compressor_id, overflow, overflow_index = message
@ShaneHarvey results for protocols vs sockets for 8.0 no ssl:
And protocols vs sockets for 6.0 ssl:
|
pymongo/network_layer.py
Outdated
assert not self._paused | ||
self._paused = True | ||
|
||
def resume_writing(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are the defaults for set_write_buffer_limits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From Noah:
>>> get_write_buffer_limits()
(16384, 65536)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we call set_write_buffer_limits(MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE)
, that will fix the large message perf issues?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For posterity: using set_write_buffer_limits(MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE)
resolved the large message perf issues, along with simplifying the protocol state management.
raise OSError("Connection is closed") | ||
self.transport.write(message) | ||
await self._drain_helper() | ||
self.transport.resume_reading() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we resume before writing?:
self.transport.resume_reading()
self.transport.write(message)
Will that improve latency?
We can answer this in a follow-up ticket.
I tested Performance does seem to improve: Now the only significant perf regressions are GridFsDownload and FindManyAndEmptyCursor. This indicates a regression with reading large messages off the socket. |
I updated the protocol with
It also appears to be more efficient for reading large messages compare to the previous approach. I'm running the benchmarks again to confirm: Here's another view, baseline here is 8.0-standalone non-ssl from PYTHON-5144 - Add async performance benchmarks:
Vs Noah's last commit:
So we've resolved the GridFsDownload and FindManyAndEmptyCursor perf issues. I believe the reset of the differences can be attributed to noise. |
def receive_message( | ||
conn: Connection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE | ||
) -> Union[_OpReply, _OpMsg]: | ||
"""Receive a raw BSON message or raise socket.error.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping.
pyproject.toml
Outdated
@@ -115,6 +115,12 @@ filterwarnings = [ | |||
"module:unclosed <socket.socket:ResourceWarning", | |||
"module:unclosed <ssl.SSLSocket:ResourceWarning", | |||
"module:unclosed <socket object:ResourceWarning", | |||
# TODO: Remove the next five as part of PYTHON-5036. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping.
pyproject.toml
Outdated
"module:unclosed <_SelectorSocketTransport:ResourceWarning", | ||
"module:Unclosed AsyncMongoClient:ResourceWarning", | ||
"module:coroutine 'AsyncMongoClient.close' was never awaited:RuntimeWarning", | ||
"module:coroutine 'UnifiedSpecTestMixinV1.kill_all_sessions' was never awaited:RuntimeWarning", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These seem RuntimeWarnings seems like legitimate bugs. Can we remove them and address it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only two tests raise ResourceWarning: unclosed transport
warnings after removing these, test/asynchronous/test_pooling.py::TestPooling::test_maxConnecting
and test/asynchronous/test_pooling.py::TestPooling::test_pool_reuses_open_socket
. These warnings only happen with nossl
. None of the other warnings here are raised locally.
bson/__init__.py
Outdated
raise InvalidBSON("invalid object size") | ||
raise InvalidBSON( | ||
f"invalid object size: expected {obj_size}, got {data_len - position}" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping.
pymongo/message.py
Outdated
@@ -1535,7 +1535,9 @@ def unpack(cls, msg: bytes) -> _OpMsg: | |||
raise ProtocolError(f"Unsupported OP_MSG payload type: 0x{first_payload_type:x}") | |||
|
|||
if len(msg) != first_payload_size + 5: | |||
raise ProtocolError("Unsupported OP_MSG reply: >1 section") | |||
raise ProtocolError( | |||
f"Unsupported OP_MSG reply: >1 section, {len(msg)} vs {first_payload_size + 5}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a debugging change or does it add actionable info?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Debugging change, reverted.
timeoutMS=2000, | ||
w="majority", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add readConcernLevel/readPreference/w options here? They seem unneeded.
Opening this now to begin the review process, as these changes are significant and especially critical to the driver's behavior.
Remaining TODOs:
batch_size
messages at a time, rather than all at once.A first review pass should focus on the structure of the code, especially
PyMongoProtocol
andasync_receive_message
.PyMongoProtocol
reads complete MongoDB wire protocol messages instead of raw bytes. This significantly increases the complexity of itsread
andbuffer_updated
methods, but allows us to maximize performance and encapsulate the wire protocol within.For additional context on the Python Transport/Protocol API, see the official docs and this discuss thread that forms the backbone of our approach here.