PYTHON-4493 - Use asyncio protocols instead of sockets for network IO #2151

Open
wants to merge 70 commits into
base: master

NoahStapp
Contributor

@NoahStapp NoahStapp commented Feb 19, 2025

Opening this now to begin the review process, as these changes are significant and especially critical to the driver's behavior.

Remaining TODOs:

  • Ensure exhaust cursors fetch only batch_size messages at a time, rather than all at once.
  • Verify our entire EG test suite passes
  • Async performance benchmarks (PYTHON-5144)

A first review pass should focus on the structure of the code, especially PyMongoProtocol and async_receive_message.

PyMongoProtocol reads complete MongoDB wire protocol messages instead of raw bytes. This significantly increases the complexity of its read and buffer_updated methods, but allows us to maximize performance and encapsulate the wire protocol within.

For additional context on the Python Transport/Protocol API, see the official docs and this discuss thread that forms the backbone of our approach here.

self._is_compressed = False
self._compressor_id = None
# If at least one header's worth of data remains after the current message, reprocess all leftover data
if self._end_index - self._start_index >= 16:
Contributor

for readability, do we want to make some var like HEADER_LENGTH = 16 and use that instead? (not a must, just a thought)

bson/__init__.py Outdated
- raise InvalidBSON("invalid object size")
+ raise InvalidBSON(
+     f"invalid object size: expected {obj_size}, got {data_len - position}"
+ )
Member

Can we split this change into its own ticket and add a test?

Contributor Author

Sure, I was using this change for more helpful debugging but this should be shifted into a separate fix.

Contributor Author

Tracked in PYTHON-5214.

Member

Ping.

- async def async_sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None:
+ # These socket-based I/O methods are for KMS requests and any other network operations that do not use
+ # the MongoDB wire protocol
+ async def async_socket_sendall(sock: Union[socket.socket, _sslConn], buf: bytes) -> None:
Member

Eventually we should have a protocol implementation of KMS, right? Can you open a ticket?

Contributor Author

Tracked in PYTHON-5215.

@@ -1821,7 +1822,7 @@ async def test_exhaust_cursor_db_set(self):

listener.reset()

- result = await c.find({}, cursor_type=pymongo.CursorType.EXHAUST, batch_size=1).to_list()
+ result = list(await c.find({}, cursor_type=pymongo.CursorType.EXHAUST, batch_size=1))
Member

It looks confusing to call list(await ...). Can we go back to to_list()?

Contributor Author

Sure. I'm not sure why I made this change.

)
# Initialize the client with a larger timeout to help make test less flakey
with pymongo.timeout(10):
await client.admin.command("ping")
Member

Why make these changes in this PR?

Contributor Author

Must have been debugging, reverted.

pyproject.toml Outdated
@@ -115,6 +115,12 @@ filterwarnings = [
"module:unclosed <socket.socket:ResourceWarning",
"module:unclosed <ssl.SSLSocket:ResourceWarning",
"module:unclosed <socket object:ResourceWarning",
# TODO: Remove the next five as part of PYTHON-5036.
Member

What do these resource warnings have to do with unittest? They seem like unrelated resource leaks in the test.

Member

Ping.

def receive_message(
conn: Connection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE
) -> Union[_OpReply, _OpMsg]:
"""Receive a raw BSON message or raise socket.error."""
Member

"raw BSON message" -> "MongoDB wire protocol message"

Member

Ping.

Contributor Author

The synchronous receive_message still receives raw BSON messages, so the docstring is correct.

message[4],
message[5],
message[6],
)
Member

This can be simplified to:

start, end, op_code, is_compressed, compressor_id, overflow, overflow_index = message
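To illustrate the difference, here is a small standalone sketch. The tuple contents are made-up placeholder values, and only the seven field names come from the suggestion above.

```python
# Placeholder 7-tuple standing in for the message metadata; the values are made up.
message = (0, 128, 2013, False, None, b"", 0)

# Index-based access: each field needs its own subscript, and a wrong index
# silently picks up the wrong field.
start_by_index, end_by_index, op_code_by_index = message[0], message[1], message[2]

# Unpacking names every field in one statement and raises ValueError if the
# tuple ever has a different number of elements.
start, end, op_code, is_compressed, compressor_id, overflow, overflow_index = message

print(start, end, op_code, is_compressed, compressor_id, overflow, overflow_index)
```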

@NoahStapp
Contributor Author

NoahStapp commented Mar 14, 2025

@ShaneHarvey results for protocols vs sockets for 8.0 no ssl:

Protocols throughput / sockets throughput:
	FindManyAndEmptyCursor: 0.9703507038392449
	FindManyAndEmptyCursor80Tasks: 1.0650953172933906
	FindManyAndEmptyCursor8Tasks: 0.9775536901241683
	FindOneByID: 1.46147262094986
	FindOneByID80Tasks: 1.4069179290234897
	FindOneByID8Tasks: 1.3381678794014957
	FindOneByIDUnlimitedTasks: 1.7597285568316348
	GridFsDownload: 0.6186035254783006
	GridFsUpload: 0.8580698951398888
	LargeDocBulkInsert: 0.8025809271466768
	LargeDocClientBulkInsert: 0.8284300217652859
	LargeDocInsertOne: 0.7958393829114708
	LargeDocInsertOneUnlimitedTasks: 0.6921674824365835
	RunCommand: 1.5270634310466316
	RunCommand80Tasks: 1.5297638111746044
	RunCommand8Tasks: 1.4591083572975148
	RunCommandUnlimitedTasks: 1.7314534944040327
	SmallDocBulkInsert: 0.8669518930819982
	SmallDocBulkMixedOps: 1.4989227527182083
	SmallDocClientBulkInsert: 0.9279355775863976
	SmallDocClientBulkMixedOps: 1.0482811945271218
	SmallDocInsertOne: 1.5143680814769194
	SmallDocInsertOneUnlimitedTasks: 1.754511082924126

And protocols vs sockets for 6.0 ssl:

Protocols throughput / sockets throughput:
	FindManyAndEmptyCursor: 1.0672273249852247
	FindManyAndEmptyCursor80Tasks: 1.0554761260568173
	FindManyAndEmptyCursor8Tasks: 1.004977518204416
	FindOneByID: 1.3637094086549113
	FindOneByID80Tasks: 1.422098420891551
	FindOneByID8Tasks: 1.2411505015962017
	FindOneByIDUnlimitedTasks: 1.7764270578284072
	GridFsDownload: 0.8628237726987754
	GridFsUpload: 0.8999105894010296
	LargeDocBulkInsert: 0.7910248314608147
	LargeDocInsertOne: 0.7787282630464124
	LargeDocInsertOneUnlimitedTasks: 0.7016240808656158
	RunCommand: 1.4062308622380693
	RunCommand80Tasks: 1.5994168409487448
	RunCommand8Tasks: 1.3972976028982338
	RunCommandUnlimitedTasks: 1.7735264640532364
	SmallDocBulkInsert: 0.8359165476288549
	SmallDocBulkMixedOps: 1.37245161361126
	SmallDocInsertOne: 1.4027430862050714
	SmallDocInsertOneUnlimitedTasks: 1.8038574351861747
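One way to read these ratios is to flag anything below a cutoff. The sketch below uses a subset of the 8.0 no-ssl numbers. The 0.9 threshold and the regressions helper are assumptions for illustration, not something the benchmark suite defines.

```python
# Ratios copied from the 8.0 no-ssl results above (subset only).
RATIOS = {
    "FindOneByID": 1.46147262094986,
    "GridFsDownload": 0.6186035254783006,
    "GridFsUpload": 0.8580698951398888,
    "LargeDocInsertOne": 0.7958393829114708,
    "RunCommand": 1.5270634310466316,
    "SmallDocClientBulkInsert": 0.9279355775863976,
}


def regressions(ratios: dict, threshold: float = 0.9) -> list:
    """Return benchmark names whose ratio is below threshold, worst first."""
    slow = [name for name, ratio in ratios.items() if ratio < threshold]
    return sorted(slow, key=ratios.__getitem__)


for name in regressions(RATIOS):
    print(f"{name}: {RATIOS[name]:.2f}x of sockets throughput")
```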

assert not self._paused
self._paused = True

def resume_writing(self) -> None:
Member

What are the defaults for set_write_buffer_limits?

Member

From Noah:

>>> get_write_buffer_limits()
(16384, 65536)

Member

I wonder whether calling set_write_buffer_limits(MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE) would fix the large message perf issues?

Member

For posterity: using set_write_buffer_limits(MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE) resolved the large message perf issues, along with simplifying the protocol state management.
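For readers unfamiliar with the transport API, a minimal standalone sketch of the calls discussed in this thread follows. It uses a local socket pair instead of a MongoDB connection, and the MAX_MESSAGE_SIZE value here is an illustrative placeholder rather than the driver's constant.

```python
import asyncio
import socket

# Illustrative value only; the PR uses the driver's own MAX_MESSAGE_SIZE constant.
MAX_MESSAGE_SIZE = 48_000_000


async def show_limits() -> tuple:
    loop = asyncio.get_running_loop()
    ours, theirs = socket.socketpair()
    transport, _ = await loop.create_connection(asyncio.Protocol, sock=ours)
    try:
        # get_write_buffer_limits() returns (low, high).
        before = transport.get_write_buffer_limits()
        # Positional arguments are (high, low). With both set this high,
        # pause_writing() is not called until more than MAX_MESSAGE_SIZE
        # bytes are sitting in the write buffer.
        transport.set_write_buffer_limits(MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE)
        after = transport.get_write_buffer_limits()
    finally:
        transport.close()
        theirs.close()
    return before, after


before, after = asyncio.run(show_limits())
print("default limits:", before)
print("raised limits:", after)
```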

raise OSError("Connection is closed")
self.transport.write(message)
await self._drain_helper()
self.transport.resume_reading()
Member

What if we resume before writing?

        self.transport.resume_reading()
        self.transport.write(message)

Will that improve latency?

We can answer this in a follow-up ticket.

@ShaneHarvey
Member

ShaneHarvey commented Mar 14, 2025

I tested self.transport.set_write_buffer_limits(MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE) here:
https://spruce.mongodb.com/version/67d48d795671830008ada0b7

Performance does seem to improve:
[Screenshot of benchmark results, 2025-03-14 2:49 PM; image not included]

Now the only significant perf regressions are GridFsDownload and FindManyAndEmptyCursor. This indicates a regression with reading large messages off the socket.

@ShaneHarvey
Member

ShaneHarvey commented Mar 21, 2025

I updated the protocol with set_write_buffer_limits(MAX_MESSAGE_SIZE, MAX_MESSAGE_SIZE) and changed the buffer implementation. Instead of allocating a static buffer and sharing it between multiple messages, the code now only reads the header (into a static 16-byte buffer), then allocates a buffer for the single message. This simplifies the implementation (88 insertions(+), 164 deletions(-)) because it avoids the complexity of handling:

  • multiple messages in a single buffer
  • messages that span across buffer boundaries
  • messages that wrap around the end of the buffer
  • recursive calls to buffer_updated
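The header-then-body approach described above can be sketched with asyncio.BufferedProtocol as follows. This is a simplified illustration, not the PR's PyMongoProtocol: the class name, the messages queue, and the demo function are hypothetical, and compression, size limits, and error handling are left out.

```python
import asyncio
import socket
import struct
from typing import Optional

HEADER_LENGTH = 16  # four little-endian int32 fields


class HeaderThenBodyProtocol(asyncio.BufferedProtocol):
    """Hypothetical sketch: static header buffer, one fresh buffer per message."""

    def __init__(self) -> None:
        self._header = memoryview(bytearray(HEADER_LENGTH))
        self._header_pos = 0
        self._body: Optional[memoryview] = None
        self._body_pos = 0
        self._op_code = 0
        self.messages: asyncio.Queue = asyncio.Queue()

    def get_buffer(self, sizehint: int) -> memoryview:
        # Hand the transport only the unfilled part of the current target,
        # so a read can never run past the end of the current message.
        if self._body is None:
            return self._header[self._header_pos:]
        return self._body[self._body_pos:]

    def buffer_updated(self, nbytes: int) -> None:
        if self._body is None:
            self._header_pos += nbytes
            if self._header_pos < HEADER_LENGTH:
                return  # header still incomplete
            length, _, _, self._op_code = struct.unpack("<iiii", self._header)
            self._header_pos = 0
            self._body = memoryview(bytearray(length - HEADER_LENGTH))
            self._body_pos = 0
        else:
            self._body_pos += nbytes
        if self._body_pos == len(self._body):
            self.messages.put_nowait((self._op_code, bytes(self._body)))
            self._body = None  # next read goes back into the header buffer


async def demo() -> tuple:
    reader_sock, writer_sock = socket.socketpair()
    loop = asyncio.get_running_loop()
    transport, proto = await loop.create_connection(
        HeaderThenBodyProtocol, sock=reader_sock
    )
    body = b"hello"
    frame = struct.pack("<iiii", HEADER_LENGTH + len(body), 1, 0, 2013) + body
    writer_sock.sendall(frame[:10])  # split mid-header on purpose
    writer_sock.sendall(frame[10:])
    message = await asyncio.wait_for(proto.messages.get(), timeout=5)
    transport.close()
    writer_sock.close()
    return message


print(asyncio.run(demo()))
```

Because get_buffer never exposes more than the bytes still missing from the current header or body, each read stays within a single message, which is what removes the multi-message and wrap-around cases listed above.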

It also appears to be more efficient for reading large messages compared to the previous approach. I'm running the benchmarks again to confirm:

Here are the results:
[Screenshot of benchmark results, 2025-03-21 12:18 PM; image not included]

Here's another view. The baseline here is 8.0-standalone non-ssl from PYTHON-5144 (Add async performance benchmarks):

Async throughput / sync throughput:
        FindManyAndEmptyCursor: 1.028769979946231
        FindManyAndEmptyCursor8Tasks: 1.16685531423854
        FindOneByID: 0.6658267269977103
        FindOneByID8Tasks: 0.7242021792991123
        GridFsDownload: 1.002698716143819
        GridFsUpload: 0.9699820038127586
        LargeDocBulkInsert: 0.9825807412819852
        LargeDocClientBulkInsert: 0.985247929305518
        LargeDocInsertOne: 0.9439773395818303
        RunCommand: 0.5116982253490802
        RunCommand8Tasks: 0.7460176924516458
        SmallDocBulkInsert: 0.9857789599984034
        SmallDocBulkMixedOps: 0.6924633024031408
        SmallDocClientBulkInsert: 0.9668354933162755
        SmallDocClientBulkMixedOps: 0.9902208314563807
        SmallDocInsertOne: 0.6112980369718504
Async-only test throughput:
        FindManyAndEmptyCursor80Tasks: 63.238690067968605
        FindOneByID80Tasks: 4.405040402943344
        FindOneByIDUnlimitedTasks: 3.8027140136828654
        LargeDocInsertOneUnlimitedTasks: 102.12955671927966
        RunCommand80Tasks: 0.04559851294032789
        RunCommandUnlimitedTasks: 0.043668460646305696
        SmallDocInsertOneUnlimitedTasks: 0.7610258000089632

Vs Noah's last commit:

Async throughput / sync throughput:
        FindManyAndEmptyCursor: 0.969500486061596
        FindManyAndEmptyCursor8Tasks: 1.1166730680637451
        FindOneByID: 0.6915328339047162
        FindOneByID8Tasks: 0.7635200774802731
        GridFsDownload: 0.7252275764460623
        GridFsUpload: 0.9675281572848322
        LargeDocBulkInsert: 0.9893640963704542
        LargeDocClientBulkInsert: 0.9997139927333727
        LargeDocInsertOne: 0.9483924199445896
        RunCommand: 0.5623174123607358
        RunCommand8Tasks: 0.7794363241154477
        SmallDocBulkInsert: 0.985122000549431
        SmallDocBulkMixedOps: 0.7261259860601634
        SmallDocClientBulkInsert: 0.9672046911214328
        SmallDocClientBulkMixedOps: 1.0093383464500998
        SmallDocInsertOne: 0.6393223110436699
Async-only test throughput:
        FindManyAndEmptyCursor80Tasks: 61.188263991950066
        FindOneByID80Tasks: 4.456931899691728
        FindOneByIDUnlimitedTasks: 3.982004465080102
        LargeDocInsertOneUnlimitedTasks: 102.91116882621556
        RunCommand80Tasks: 0.046401864771913694
        RunCommandUnlimitedTasks: 0.04435688474252977
        SmallDocInsertOneUnlimitedTasks: 0.7883753154057945

So we've resolved the GridFsDownload and FindManyAndEmptyCursor perf issues. I believe the rest of the differences can be attributed to noise.

@ShaneHarvey ShaneHarvey requested a review from sleepyStick March 21, 2025 19:17
@NoahStapp NoahStapp requested a review from ShaneHarvey March 26, 2025 20:24
pyproject.toml Outdated
"module:unclosed <_SelectorSocketTransport:ResourceWarning",
"module:Unclosed AsyncMongoClient:ResourceWarning",
"module:coroutine 'AsyncMongoClient.close' was never awaited:RuntimeWarning",
"module:coroutine 'UnifiedSpecTestMixinV1.kill_all_sessions' was never awaited:RuntimeWarning",
Member

These RuntimeWarnings seem like legitimate bugs. Can we remove the filters and address them?

Contributor Author

After removing these, only two tests raise ResourceWarning (unclosed transport): test/asynchronous/test_pooling.py::TestPooling::test_maxConnecting and test/asynchronous/test_pooling.py::TestPooling::test_pool_reuses_open_socket. These warnings only happen with nossl. None of the other warnings here are raised locally.
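For context on why a "was never awaited" RuntimeWarning usually points at a real bug, here is a minimal standalone reproduction. FakeClient is a hypothetical stand-in, not AsyncMongoClient; the warning appears because close() is called without await, so the coroutine is discarded without ever running.

```python
import gc
import warnings


class FakeClient:
    """Hypothetical stand-in for an async client; not the driver's class."""

    async def close(self) -> None:
        pass


def collect_unawaited_warnings() -> list:
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        FakeClient().close()  # bug: the coroutine is created but never awaited
        gc.collect()  # make sure the dropped coroutine has been finalized
    return [
        str(w.message) for w in caught if issubclass(w.category, RuntimeWarning)
    ]


print(collect_unawaited_warnings())
```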

@@ -1535,7 +1535,9 @@ def unpack(cls, msg: bytes) -> _OpMsg:
raise ProtocolError(f"Unsupported OP_MSG payload type: 0x{first_payload_type:x}")

if len(msg) != first_payload_size + 5:
- raise ProtocolError("Unsupported OP_MSG reply: >1 section")
+ raise ProtocolError(
+     f"Unsupported OP_MSG reply: >1 section, {len(msg)} vs {first_payload_size + 5}"
Member

Is this a debugging change or does it add actionable info?

Contributor Author

Debugging change, reverted.

timeoutMS=2000,
w="majority",
Member

Why add readConcernLevel/readPreference/w options here? They seem unneeded.
