From 979208be945de8eaa211243b29af7c13b0be8a0c Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 17 Sep 2025 13:52:46 -0400 Subject: [PATCH 01/13] PYTHON-2390 - Retryable reads use the same implicit session --- pymongo/asynchronous/mongo_client.py | 23 ++++++++-------- pymongo/synchronous/mongo_client.py | 23 ++++++++-------- test/asynchronous/test_retryable_reads.py | 32 +++++++++++++++++++++++ test/test_retryable_reads.py | 32 +++++++++++++++++++++++ 4 files changed, 88 insertions(+), 22 deletions(-) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index b616647791..1134abb4ce 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2048,17 +2048,18 @@ async def _retryable_read( retryable = bool( retryable and self.options.retry_reads and not (session and session.in_transaction) ) - return await self._retry_internal( - func, - session, - None, - operation, - is_read=True, - address=address, - read_pref=read_pref, - retryable=retryable, - operation_id=operation_id, - ) + async with self._tmp_session(session) as s: + return await self._retry_internal( + func, + s, + None, + operation, + is_read=True, + address=address, + read_pref=read_pref, + retryable=retryable, + operation_id=operation_id, + ) async def _retryable_write( self, diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index ef0663584c..226df202ed 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2044,17 +2044,18 @@ def _retryable_read( retryable = bool( retryable and self.options.retry_reads and not (session and session.in_transaction) ) - return self._retry_internal( - func, - session, - None, - operation, - is_read=True, - address=address, - read_pref=read_pref, - retryable=retryable, - operation_id=operation_id, - ) + with self._tmp_session(session) as s: + return self._retry_internal( + func, + s, + None, + operation, + is_read=True, + address=address, + read_pref=read_pref, + retryable=retryable, + operation_id=operation_id, + ) def _retryable_write( self, diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py index 26454b3823..507fcac931 100644 --- a/test/asynchronous/test_retryable_reads.py +++ b/test/asynchronous/test_retryable_reads.py @@ -218,6 +218,38 @@ async def test_retryable_reads_are_retried_on_the_same_mongos_when_no_others_are # Assert that both events occurred on the same mongos. assert listener.succeeded_events[0].connection_id == listener.failed_events[0].connection_id + @async_client_context.require_failCommand_fail_point + async def test_retryable_reads_are_retried_on_the_same_implicit_session(self): + fail_command = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": {"failCommands": ["count"], "errorCode": 6}, + } + + listener = OvertCommandListener() + client = await self.async_rs_or_single_client( + directConnection=False, + event_listeners=[listener], + retryReads=True, + ) + + await async_set_fail_point(client, fail_command) + + await client.t.t.estimated_document_count() + + # Disable failpoint. + fail_command["mode"] = "off" + await async_set_fail_point(client, fail_command) + + # Assert that both events occurred on the same session. + lsids = [ + event.command["lsid"] + for event in listener.started_events + if event.command_name == "count" + ] + self.assertEqual(len(lsids), 2) + self.assertEqual(lsids[0], lsids[1]) + if __name__ == "__main__": unittest.main() diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index fb8a374dac..05c7afb085 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -216,6 +216,38 @@ def test_retryable_reads_are_retried_on_the_same_mongos_when_no_others_are_avail # Assert that both events occurred on the same mongos. assert listener.succeeded_events[0].connection_id == listener.failed_events[0].connection_id + @client_context.require_failCommand_fail_point + def test_retryable_reads_are_retried_on_the_same_implicit_session(self): + fail_command = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": {"failCommands": ["count"], "errorCode": 6}, + } + + listener = OvertCommandListener() + client = self.rs_or_single_client( + directConnection=False, + event_listeners=[listener], + retryReads=True, + ) + + set_fail_point(client, fail_command) + + client.t.t.estimated_document_count() + + # Disable failpoint. + fail_command["mode"] = "off" + set_fail_point(client, fail_command) + + # Assert that both events occurred on the same session. + lsids = [ + event.command["lsid"] + for event in listener.started_events + if event.command_name == "count" + ] + self.assertEqual(len(lsids), 2) + self.assertEqual(lsids[0], lsids[1]) + if __name__ == "__main__": unittest.main() From 382a94c42844f29af3eac8bf0d70509f721079b4 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 17 Sep 2025 16:14:28 -0400 Subject: [PATCH 02/13] Add other retryable reads to test --- test/asynchronous/test_retryable_reads.py | 48 +++++++++++++---------- test/test_retryable_reads.py | 48 +++++++++++++---------- 2 files changed, 56 insertions(+), 40 deletions(-) diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py index 507fcac931..c77b8b5605 100644 --- a/test/asynchronous/test_retryable_reads.py +++ b/test/asynchronous/test_retryable_reads.py @@ -220,12 +220,6 @@ async def test_retryable_reads_are_retried_on_the_same_mongos_when_no_others_are @async_client_context.require_failCommand_fail_point async def test_retryable_reads_are_retried_on_the_same_implicit_session(self): - fail_command = { - "configureFailPoint": "failCommand", - "mode": {"times": 1}, - "data": {"failCommands": ["count"], "errorCode": 6}, - } - listener = OvertCommandListener() client = await self.async_rs_or_single_client( directConnection=False, @@ -233,22 +227,36 @@ async def test_retryable_reads_are_retried_on_the_same_implicit_session(self): retryReads=True, ) - await async_set_fail_point(client, fail_command) - - await client.t.t.estimated_document_count() + commands = [ + ("aggregate", lambda: client.t.t.count_documents({})), + ("aggregate", lambda: client.t.t.aggregate([{"$match": {}}])), + ("count", lambda: client.t.t.estimated_document_count()), + ("distinct", lambda: client.t.t.distinct("x")), + ("find", lambda: client.t.t.find_one({})), + ("listDatabases", lambda: client.list_databases()), + ("listCollections", lambda: client.t.list_collections()), + ("listIndexes", lambda: client.t.t.list_indexes()), + ] - # Disable failpoint. - fail_command["mode"] = "off" - await async_set_fail_point(client, fail_command) + for command_name, operation in commands: + listener.reset() + fail_command = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": {"failCommands": [command_name], "errorCode": 6}, + } - # Assert that both events occurred on the same session. - lsids = [ - event.command["lsid"] - for event in listener.started_events - if event.command_name == "count" - ] - self.assertEqual(len(lsids), 2) - self.assertEqual(lsids[0], lsids[1]) + async with self.fail_point(fail_command): + await operation() + + # Assert that both events occurred on the same session. + lsids = [ + event.command["lsid"] + for event in listener.started_events + if event.command_name == command_name + ] + self.assertEqual(len(lsids), 2) + self.assertEqual(lsids[0], lsids[1]) if __name__ == "__main__": diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index 05c7afb085..7d6d2d6666 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -218,12 +218,6 @@ def test_retryable_reads_are_retried_on_the_same_mongos_when_no_others_are_avail @client_context.require_failCommand_fail_point def test_retryable_reads_are_retried_on_the_same_implicit_session(self): - fail_command = { - "configureFailPoint": "failCommand", - "mode": {"times": 1}, - "data": {"failCommands": ["count"], "errorCode": 6}, - } - listener = OvertCommandListener() client = self.rs_or_single_client( directConnection=False, @@ -231,22 +225,36 @@ def test_retryable_reads_are_retried_on_the_same_implicit_session(self): retryReads=True, ) - set_fail_point(client, fail_command) - - client.t.t.estimated_document_count() + commands = [ + ("aggregate", lambda: client.t.t.count_documents({})), + ("aggregate", lambda: client.t.t.aggregate([{"$match": {}}])), + ("count", lambda: client.t.t.estimated_document_count()), + ("distinct", lambda: client.t.t.distinct("x")), + ("find", lambda: client.t.t.find_one({})), + ("listDatabases", lambda: client.list_databases()), + ("listCollections", lambda: client.t.list_collections()), + ("listIndexes", lambda: client.t.t.list_indexes()), + ] - # Disable failpoint. - fail_command["mode"] = "off" - set_fail_point(client, fail_command) + for command_name, operation in commands: + listener.reset() + fail_command = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": {"failCommands": [command_name], "errorCode": 6}, + } - # Assert that both events occurred on the same session. - lsids = [ - event.command["lsid"] - for event in listener.started_events - if event.command_name == "count" - ] - self.assertEqual(len(lsids), 2) - self.assertEqual(lsids[0], lsids[1]) + with self.fail_point(fail_command): + operation() + + # Assert that both events occurred on the same session. + lsids = [ + event.command["lsid"] + for event in listener.started_events + if event.command_name == command_name + ] + self.assertEqual(len(lsids), 2) + self.assertEqual(lsids[0], lsids[1]) if __name__ == "__main__": From e79290eee750816e242c28f5c560cf56aa2f3d1f Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 18 Sep 2025 10:58:14 -0400 Subject: [PATCH 03/13] Fix test --- test/asynchronous/test_retryable_reads.py | 11 +++++++---- test/test_retryable_reads.py | 11 +++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/test/asynchronous/test_retryable_reads.py b/test/asynchronous/test_retryable_reads.py index c77b8b5605..47ac91b0f5 100644 --- a/test/asynchronous/test_retryable_reads.py +++ b/test/asynchronous/test_retryable_reads.py @@ -227,6 +227,8 @@ async def test_retryable_reads_are_retried_on_the_same_implicit_session(self): retryReads=True, ) + await client.t.t.insert_one({"x": 1}) + commands = [ ("aggregate", lambda: client.t.t.count_documents({})), ("aggregate", lambda: client.t.t.aggregate([{"$match": {}}])), @@ -250,13 +252,14 @@ async def test_retryable_reads_are_retried_on_the_same_implicit_session(self): await operation() # Assert that both events occurred on the same session. - lsids = [ - event.command["lsid"] + command_docs = [ + event.command for event in listener.started_events if event.command_name == command_name ] - self.assertEqual(len(lsids), 2) - self.assertEqual(lsids[0], lsids[1]) + self.assertEqual(len(command_docs), 2) + self.assertEqual(command_docs[0]["lsid"], command_docs[1]["lsid"]) + self.assertIsNot(command_docs[0], command_docs[1]) if __name__ == "__main__": diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index 7d6d2d6666..c9f72ae547 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -225,6 +225,8 @@ def test_retryable_reads_are_retried_on_the_same_implicit_session(self): retryReads=True, ) + client.t.t.insert_one({"x": 1}) + commands = [ ("aggregate", lambda: client.t.t.count_documents({})), ("aggregate", lambda: client.t.t.aggregate([{"$match": {}}])), @@ -248,13 +250,14 @@ def test_retryable_reads_are_retried_on_the_same_implicit_session(self): operation() # Assert that both events occurred on the same session. - lsids = [ - event.command["lsid"] + command_docs = [ + event.command for event in listener.started_events if event.command_name == command_name ] - self.assertEqual(len(lsids), 2) - self.assertEqual(lsids[0], lsids[1]) + self.assertEqual(len(command_docs), 2) + self.assertEqual(command_docs[0]["lsid"], command_docs[1]["lsid"]) + self.assertIsNot(command_docs[0], command_docs[1]) if __name__ == "__main__": From daf5f844c2c3f0f87fe483604940b0ba01365a39 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Thu, 18 Sep 2025 17:04:30 -0400 Subject: [PATCH 04/13] WIP --- pymongo/asynchronous/aggregation.py | 3 --- pymongo/asynchronous/change_stream.py | 9 +++----- pymongo/asynchronous/client_bulk.py | 1 - pymongo/asynchronous/client_session.py | 19 +++++++++++++++++ pymongo/asynchronous/collection.py | 13 +++--------- pymongo/asynchronous/command_cursor.py | 29 +++++++++++++------------- pymongo/asynchronous/cursor.py | 22 +++++++++---------- pymongo/asynchronous/database.py | 8 +++---- pymongo/asynchronous/mongo_client.py | 16 +++++++------- pymongo/synchronous/aggregation.py | 3 --- pymongo/synchronous/change_stream.py | 7 ++----- pymongo/synchronous/client_bulk.py | 1 - pymongo/synchronous/client_session.py | 15 +++++++++++++ pymongo/synchronous/collection.py | 11 ++-------- pymongo/synchronous/command_cursor.py | 28 ++++++++++++------------- pymongo/synchronous/cursor.py | 22 +++++++++---------- pymongo/synchronous/database.py | 8 +++---- pymongo/synchronous/mongo_client.py | 13 +++++------- 18 files changed, 114 insertions(+), 114 deletions(-) diff --git a/pymongo/asynchronous/aggregation.py b/pymongo/asynchronous/aggregation.py index 059d698772..6ca60ad9c3 100644 --- a/pymongo/asynchronous/aggregation.py +++ b/pymongo/asynchronous/aggregation.py @@ -50,7 +50,6 @@ def __init__( cursor_class: type[AsyncCommandCursor[Any]], pipeline: _Pipeline, options: MutableMapping[str, Any], - explicit_session: bool, let: Optional[Mapping[str, Any]] = None, user_fields: Optional[MutableMapping[str, Any]] = None, result_processor: Optional[Callable[[Mapping[str, Any], AsyncConnection], None]] = None, @@ -92,7 +91,6 @@ def __init__( self._options["cursor"]["batchSize"] = self._batch_size self._cursor_class = cursor_class - self._explicit_session = explicit_session self._user_fields = user_fields self._result_processor = result_processor @@ -197,7 +195,6 @@ async def get_cursor( batch_size=self._batch_size or 0, max_await_time_ms=self._max_await_time_ms, session=session, - explicit_session=self._explicit_session, comment=self._options.get("comment"), ) await cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/asynchronous/change_stream.py b/pymongo/asynchronous/change_stream.py index 3940111df2..b2b78b0660 100644 --- a/pymongo/asynchronous/change_stream.py +++ b/pymongo/asynchronous/change_stream.py @@ -236,7 +236,7 @@ def _process_result(self, result: Mapping[str, Any], conn: AsyncConnection) -> N ) async def _run_aggregation_cmd( - self, session: Optional[AsyncClientSession], explicit_session: bool + self, session: Optional[AsyncClientSession] ) -> AsyncCommandCursor: # type: ignore[type-arg] """Run the full aggregation pipeline for this AsyncChangeStream and return the corresponding AsyncCommandCursor. @@ -246,7 +246,6 @@ async def _run_aggregation_cmd( AsyncCommandCursor, self._aggregation_pipeline(), self._command_options(), - explicit_session, result_processor=self._process_result, comment=self._comment, ) @@ -258,10 +257,8 @@ async def _run_aggregation_cmd( ) async def _create_cursor(self) -> AsyncCommandCursor: # type: ignore[type-arg] - async with self._client._tmp_session(self._session, close=False) as s: - return await self._run_aggregation_cmd( - session=s, explicit_session=self._session is not None - ) + async with self._client._tmp_session(self._session) as s: + return await self._run_aggregation_cmd(session=s) async def _resume(self) -> None: """Reestablish this change stream after a resumable error.""" diff --git a/pymongo/asynchronous/client_bulk.py b/pymongo/asynchronous/client_bulk.py index 45812b3400..7a54647431 100644 --- a/pymongo/asynchronous/client_bulk.py +++ b/pymongo/asynchronous/client_bulk.py @@ -449,7 +449,6 @@ async def _process_results_cursor( result["cursor"], conn.address, session=session, - explicit_session=session is not None, comment=self.comment, ) await cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index be02295cea..63a33ce00e 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -137,6 +137,7 @@ import collections import time +import traceback import uuid from collections.abc import Mapping as _Mapping from typing import ( @@ -513,6 +514,7 @@ def __init__( # Is this an implicitly created session? self._implicit = implicit self._transaction = _Transaction(None, client) + self._attached_to_cursor = False async def end_session(self) -> None: """Finish this session. If a transaction has started, abort it. @@ -536,11 +538,14 @@ async def _end_session(self, lock: bool) -> None: def _end_implicit_session(self) -> None: # Implicit sessions can't be part of transactions or pinned connections if self._server_session is not None: + # print(f"Ending session {self}, implicit: {self._implicit}, attached: {self._attached_to_cursor}") self._client._return_server_session(self._server_session) self._server_session = None def _check_ended(self) -> None: if self._server_session is None: + # print(f"Session {self} is already ended") + # print(f"Session {self} is already ended, implicit: {self._implicit}, attached: {self._attached_to_cursor}: {''.join(traceback.format_stack())}") raise InvalidOperation("Cannot use ended session") async def __aenter__(self) -> AsyncClientSession: @@ -588,6 +593,20 @@ def operation_time(self) -> Optional[Timestamp]: """ return self._operation_time + @property + def implicit(self) -> bool: + """Whether this session was implicitly created by the driver.""" + return self._implicit + + @property + def attached_to_cursor(self) -> bool: + """Whether this session is owned by a cursor.""" + return self._attached_to_cursor + + @attached_to_cursor.setter + def attached_to_cursor(self, value: bool) -> None: + self._attached_to_cursor = value + def _inherit_option(self, name: str, val: _T) -> _T: """Return the inherited TransactionOption value.""" if val: diff --git a/pymongo/asynchronous/collection.py b/pymongo/asynchronous/collection.py index 064231ccfc..6af1f4f782 100644 --- a/pymongo/asynchronous/collection.py +++ b/pymongo/asynchronous/collection.py @@ -2549,7 +2549,6 @@ async def _list_indexes( self.with_options(codec_options=codec_options, read_preference=ReadPreference.PRIMARY), ) read_pref = (session and session._txn_read_preference()) or ReadPreference.PRIMARY - explicit_session = session is not None async def _cmd( session: Optional[AsyncClientSession], @@ -2576,13 +2575,12 @@ async def _cmd( cursor, conn.address, session=session, - explicit_session=explicit_session, comment=cmd.get("comment"), ) await cmd_cursor._maybe_pin_connection(conn) return cmd_cursor - async with self._database.client._tmp_session(session, False) as s: + async with self._database.client._tmp_session(session) as s: return await self._database.client._retryable_read( _cmd, read_pref, s, operation=_Op.LIST_INDEXES ) @@ -2678,7 +2676,6 @@ async def list_search_indexes( AsyncCommandCursor, pipeline, kwargs, - explicit_session=session is not None, comment=comment, user_fields={"cursor": {"firstBatch": 1}}, ) @@ -2900,7 +2897,6 @@ async def _aggregate( pipeline: _Pipeline, cursor_class: Type[AsyncCommandCursor], # type: ignore[type-arg] session: Optional[AsyncClientSession], - explicit_session: bool, let: Optional[Mapping[str, Any]] = None, comment: Optional[Any] = None, **kwargs: Any, @@ -2912,7 +2908,6 @@ async def _aggregate( cursor_class, pipeline, kwargs, - explicit_session, let, user_fields={"cursor": {"firstBatch": 1}}, ) @@ -3018,13 +3013,12 @@ async def aggregate( .. _aggregate command: https://mongodb.com/docs/manual/reference/command/aggregate """ - async with self._database.client._tmp_session(session, close=False) as s: + async with self._database.client._tmp_session(session) as s: return await self._aggregate( _CollectionAggregationCommand, pipeline, AsyncCommandCursor, session=s, - explicit_session=session is not None, let=let, comment=comment, **kwargs, @@ -3065,7 +3059,7 @@ async def aggregate_raw_batches( raise InvalidOperation("aggregate_raw_batches does not support auto encryption") if comment is not None: kwargs["comment"] = comment - async with self._database.client._tmp_session(session, close=False) as s: + async with self._database.client._tmp_session(session) as s: return cast( AsyncRawBatchCursor[_DocumentType], await self._aggregate( @@ -3073,7 +3067,6 @@ async def aggregate_raw_batches( pipeline, AsyncRawBatchCommandCursor, session=s, - explicit_session=session is not None, **kwargs, ), ) diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index db7c2b6638..08b07de2e1 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -64,7 +64,6 @@ def __init__( batch_size: int = 0, max_await_time_ms: Optional[int] = None, session: Optional[AsyncClientSession] = None, - explicit_session: bool = False, comment: Any = None, ) -> None: """Create a new command cursor.""" @@ -80,7 +79,8 @@ def __init__( self._max_await_time_ms = max_await_time_ms self._timeout = self._collection.database.client.options.timeout self._session = session - self._explicit_session = explicit_session + if self._session is not None: + self._session.attached_to_cursor = True self._killed = self._id == 0 self._comment = comment if self._killed: @@ -197,7 +197,7 @@ def session(self) -> Optional[AsyncClientSession]: .. versionadded:: 3.6 """ - if self._explicit_session: + if not self._session.implicit: return self._session return None @@ -218,10 +218,11 @@ def _die_no_lock(self) -> None: """Closes this cursor without acquiring a lock.""" cursor_id, address = self._prepare_to_die() self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session, self._explicit_session + cursor_id, address, self._sock_mgr, self._session ) - if not self._explicit_session: - self._session = None + if self._session: + if self._session.implicit: + self._session = None self._sock_mgr = None async def _die_lock(self) -> None: @@ -232,16 +233,18 @@ async def _die_lock(self) -> None: address, self._sock_mgr, self._session, - self._explicit_session, ) - if not self._explicit_session: - self._session = None + if self._session: + if self._session.implicit: + self._session = None self._sock_mgr = None def _end_session(self) -> None: - if self._session and not self._explicit_session: - self._session._end_implicit_session() - self._session = None + if self._session: + if self._session.implicit: + # print(f"Ending session {self}, session: {self._session}") + self._session._end_implicit_session() + self._session = None async def close(self) -> None: """Explicitly close / kill this cursor.""" @@ -430,7 +433,6 @@ def __init__( batch_size: int = 0, max_await_time_ms: Optional[int] = None, session: Optional[AsyncClientSession] = None, - explicit_session: bool = False, comment: Any = None, ) -> None: """Create a new cursor / iterator over raw batches of BSON data. @@ -449,7 +451,6 @@ def __init__( batch_size, max_await_time_ms, session, - explicit_session, comment, ) diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index d9fdd576f4..bb3454a53a 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -138,10 +138,9 @@ def __init__( if session: self._session = session - self._explicit_session = True + self._session.attached_to_cursor = True else: self._session = None - self._explicit_session = False spec: Mapping[str, Any] = filter or {} validate_is_mapping("filter", spec) @@ -150,7 +149,7 @@ def __init__( if not isinstance(limit, int): raise TypeError(f"limit must be an instance of int, not {type(limit)}") validate_boolean("no_cursor_timeout", no_cursor_timeout) - if no_cursor_timeout and not self._explicit_session: + if no_cursor_timeout and self._session and self._session.implicit: warnings.warn( "use an explicit session with no_cursor_timeout=True " "otherwise the cursor may still timeout after " @@ -283,7 +282,7 @@ def clone(self) -> AsyncCursor[_DocumentType]: def _clone(self, deepcopy: bool = True, base: Optional[AsyncCursor] = None) -> AsyncCursor: # type: ignore[type-arg] """Internal clone helper.""" if not base: - if self._explicit_session: + if self._session and not self._session.implicit: base = self._clone_base(self._session) else: base = self._clone_base(None) @@ -945,7 +944,7 @@ def session(self) -> Optional[AsyncClientSession]: .. versionadded:: 3.6 """ - if self._explicit_session: + if self._session and not self._session.implicit: return self._session return None @@ -1034,10 +1033,11 @@ def _die_no_lock(self) -> None: cursor_id, address = self._prepare_to_die(already_killed) self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session, self._explicit_session + cursor_id, address, self._sock_mgr, self._session ) - if not self._explicit_session: - self._session = None + if self._session: + if self._session.implicit: + self._session = None self._sock_mgr = None async def _die_lock(self) -> None: @@ -1054,10 +1054,10 @@ async def _die_lock(self) -> None: address, self._sock_mgr, self._session, - self._explicit_session, ) - if not self._explicit_session: - self._session = None + if self._session: + if self._session.implicit: + self._session = None self._sock_mgr = None async def close(self) -> None: diff --git a/pymongo/asynchronous/database.py b/pymongo/asynchronous/database.py index f70c2b403f..7ba65133fa 100644 --- a/pymongo/asynchronous/database.py +++ b/pymongo/asynchronous/database.py @@ -699,7 +699,7 @@ async def aggregate( .. _aggregate command: https://mongodb.com/docs/manual/reference/command/aggregate """ - async with self.client._tmp_session(session, close=False) as s: + async with self.client._tmp_session(session) as s: cmd = _DatabaseAggregationCommand( self, AsyncCommandCursor, @@ -1011,7 +1011,7 @@ async def cursor_command( else: command_name = next(iter(command)) - async with self._client._tmp_session(session, close=False) as tmp_session: + async with self._client._tmp_session(session) as tmp_session: opts = codec_options or DEFAULT_CODEC_OPTIONS if read_preference is None: @@ -1043,7 +1043,6 @@ async def cursor_command( conn.address, max_await_time_ms=max_await_time_ms, session=tmp_session, - explicit_session=session is not None, comment=comment, ) await cmd_cursor._maybe_pin_connection(conn) @@ -1089,7 +1088,7 @@ async def _list_collections( ) cmd = {"listCollections": 1, "cursor": {}} cmd.update(kwargs) - async with self._client._tmp_session(session, close=False) as tmp_session: + async with self._client._tmp_session(session) as tmp_session: cursor = ( await self._command(conn, cmd, read_preference=read_preference, session=tmp_session) )["cursor"] @@ -1098,7 +1097,6 @@ async def _list_collections( cursor, conn.address, session=tmp_session, - explicit_session=session is not None, comment=cmd.get("comment"), ) await cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 1134abb4ce..43ef3c60f7 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -35,6 +35,7 @@ import asyncio import contextlib import os +import traceback import warnings import weakref from collections import defaultdict @@ -2084,6 +2085,7 @@ async def _retryable_write( :param bulk: bulk abstraction to execute operations in bulk, defaults to None """ async with self._tmp_session(session) as s: + # print(f"Called retryable write with session = {session!r} and got {s}: {s._server_session!r}") return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id) def _cleanup_cursor_no_lock( @@ -2092,7 +2094,6 @@ def _cleanup_cursor_no_lock( address: Optional[_CursorAddress], conn_mgr: _ConnectionManager, session: Optional[AsyncClientSession], - explicit_session: bool, ) -> None: """Cleanup a cursor from __del__ without locking. @@ -2107,7 +2108,7 @@ def _cleanup_cursor_no_lock( # The cursor will be closed later in a different session. if cursor_id or conn_mgr: self._close_cursor_soon(cursor_id, address, conn_mgr) - if session and not explicit_session: + if session and session.implicit: session._end_implicit_session() async def _cleanup_cursor_lock( @@ -2116,7 +2117,6 @@ async def _cleanup_cursor_lock( address: Optional[_CursorAddress], conn_mgr: _ConnectionManager, session: Optional[AsyncClientSession], - explicit_session: bool, ) -> None: """Cleanup a cursor from cursor.close() using a lock. @@ -2128,7 +2128,6 @@ async def _cleanup_cursor_lock( :param address: The _CursorAddress. :param conn_mgr: The _ConnectionManager for the pinned connection or None. :param session: The cursor's session. - :param explicit_session: True if the session was passed explicitly. """ if cursor_id: if conn_mgr and conn_mgr.more_to_come: @@ -2141,7 +2140,7 @@ async def _cleanup_cursor_lock( await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) if conn_mgr: await conn_mgr.close() - if session and not explicit_session: + if session and session.implicit: session._end_implicit_session() async def _close_cursor_now( @@ -2222,7 +2221,7 @@ async def _process_kill_cursors(self) -> None: for address, cursor_id, conn_mgr in pinned_cursors: try: - await self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False) + await self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None) except Exception as exc: if isinstance(exc, InvalidOperation) and self._topology._closed: # Raise the exception when client is closed so that it @@ -2267,7 +2266,7 @@ def _return_server_session( @contextlib.asynccontextmanager async def _tmp_session( - self, session: Optional[client_session.AsyncClientSession], close: bool = True + self, session: Optional[client_session.AsyncClientSession] ) -> AsyncGenerator[Optional[client_session.AsyncClientSession], None]: """If provided session is None, lend a temporary session.""" if session is not None: @@ -2292,7 +2291,8 @@ async def _tmp_session( raise finally: # Call end_session when we exit this scope. - if close: + if not s.attached_to_cursor: + # print(f"Ending session {s}: {''.join(traceback.format_stack(limit=10))}") await s.end_session() else: yield None diff --git a/pymongo/synchronous/aggregation.py b/pymongo/synchronous/aggregation.py index 9845f28b08..486768ab7d 100644 --- a/pymongo/synchronous/aggregation.py +++ b/pymongo/synchronous/aggregation.py @@ -50,7 +50,6 @@ def __init__( cursor_class: type[CommandCursor[Any]], pipeline: _Pipeline, options: MutableMapping[str, Any], - explicit_session: bool, let: Optional[Mapping[str, Any]] = None, user_fields: Optional[MutableMapping[str, Any]] = None, result_processor: Optional[Callable[[Mapping[str, Any], Connection], None]] = None, @@ -92,7 +91,6 @@ def __init__( self._options["cursor"]["batchSize"] = self._batch_size self._cursor_class = cursor_class - self._explicit_session = explicit_session self._user_fields = user_fields self._result_processor = result_processor @@ -197,7 +195,6 @@ def get_cursor( batch_size=self._batch_size or 0, max_await_time_ms=self._max_await_time_ms, session=session, - explicit_session=self._explicit_session, comment=self._options.get("comment"), ) cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/synchronous/change_stream.py b/pymongo/synchronous/change_stream.py index f5f6352186..35bca9926c 100644 --- a/pymongo/synchronous/change_stream.py +++ b/pymongo/synchronous/change_stream.py @@ -235,9 +235,7 @@ def _process_result(self, result: Mapping[str, Any], conn: Connection) -> None: f"response : {result!r}" ) - def _run_aggregation_cmd( - self, session: Optional[ClientSession], explicit_session: bool - ) -> CommandCursor: # type: ignore[type-arg] + def _run_aggregation_cmd(self, session: Optional[ClientSession]) -> CommandCursor: # type: ignore[type-arg] """Run the full aggregation pipeline for this ChangeStream and return the corresponding CommandCursor. """ @@ -246,7 +244,6 @@ def _run_aggregation_cmd( CommandCursor, self._aggregation_pipeline(), self._command_options(), - explicit_session, result_processor=self._process_result, comment=self._comment, ) @@ -259,7 +256,7 @@ def _run_aggregation_cmd( def _create_cursor(self) -> CommandCursor: # type: ignore[type-arg] with self._client._tmp_session(self._session, close=False) as s: - return self._run_aggregation_cmd(session=s, explicit_session=self._session is not None) + return self._run_aggregation_cmd(session=s) def _resume(self) -> None: """Reestablish this change stream after a resumable error.""" diff --git a/pymongo/synchronous/client_bulk.py b/pymongo/synchronous/client_bulk.py index 1076ceba99..794708b653 100644 --- a/pymongo/synchronous/client_bulk.py +++ b/pymongo/synchronous/client_bulk.py @@ -447,7 +447,6 @@ def _process_results_cursor( result["cursor"], conn.address, session=session, - explicit_session=session is not None, comment=self.comment, ) cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index 72a5b8e885..7d9f97fd83 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -512,6 +512,7 @@ def __init__( # Is this an implicitly created session? self._implicit = implicit self._transaction = _Transaction(None, client) + self._attached_to_cursor = False def end_session(self) -> None: """Finish this session. If a transaction has started, abort it. @@ -587,6 +588,20 @@ def operation_time(self) -> Optional[Timestamp]: """ return self._operation_time + @property + def implicit(self) -> bool: + """Whether this session was implicitly created by the driver.""" + return self._implicit + + @property + def attached_to_cursor(self) -> bool: + """Whether this session is owned by a cursor.""" + return self._attached_to_cursor + + @attached_to_cursor.setter + def attached_to_cursor(self, value: bool) -> None: + self._attached_to_cursor = value + def _inherit_option(self, name: str, val: _T) -> _T: """Return the inherited TransactionOption value.""" if val: diff --git a/pymongo/synchronous/collection.py b/pymongo/synchronous/collection.py index e5cc816cd3..99f9025548 100644 --- a/pymongo/synchronous/collection.py +++ b/pymongo/synchronous/collection.py @@ -2546,7 +2546,6 @@ def _list_indexes( self.with_options(codec_options=codec_options, read_preference=ReadPreference.PRIMARY), ) read_pref = (session and session._txn_read_preference()) or ReadPreference.PRIMARY - explicit_session = session is not None def _cmd( session: Optional[ClientSession], @@ -2573,7 +2572,6 @@ def _cmd( cursor, conn.address, session=session, - explicit_session=explicit_session, comment=cmd.get("comment"), ) cmd_cursor._maybe_pin_connection(conn) @@ -2675,7 +2673,6 @@ def list_search_indexes( CommandCursor, pipeline, kwargs, - explicit_session=session is not None, comment=comment, user_fields={"cursor": {"firstBatch": 1}}, ) @@ -2893,7 +2890,6 @@ def _aggregate( pipeline: _Pipeline, cursor_class: Type[CommandCursor], # type: ignore[type-arg] session: Optional[ClientSession], - explicit_session: bool, let: Optional[Mapping[str, Any]] = None, comment: Optional[Any] = None, **kwargs: Any, @@ -2905,7 +2901,6 @@ def _aggregate( cursor_class, pipeline, kwargs, - explicit_session, let, user_fields={"cursor": {"firstBatch": 1}}, ) @@ -3011,13 +3006,12 @@ def aggregate( .. _aggregate command: https://mongodb.com/docs/manual/reference/command/aggregate """ - with self._database.client._tmp_session(session, close=False) as s: + with self._database.client._tmp_session(session) as s: return self._aggregate( _CollectionAggregationCommand, pipeline, CommandCursor, session=s, - explicit_session=session is not None, let=let, comment=comment, **kwargs, @@ -3058,7 +3052,7 @@ def aggregate_raw_batches( raise InvalidOperation("aggregate_raw_batches does not support auto encryption") if comment is not None: kwargs["comment"] = comment - with self._database.client._tmp_session(session, close=False) as s: + with self._database.client._tmp_session(session) as s: return cast( RawBatchCursor[_DocumentType], self._aggregate( @@ -3066,7 +3060,6 @@ def aggregate_raw_batches( pipeline, RawBatchCommandCursor, session=s, - explicit_session=session is not None, **kwargs, ), ) diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index bcdeed5f94..b5665d4b42 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -64,7 +64,6 @@ def __init__( batch_size: int = 0, max_await_time_ms: Optional[int] = None, session: Optional[ClientSession] = None, - explicit_session: bool = False, comment: Any = None, ) -> None: """Create a new command cursor.""" @@ -80,7 +79,8 @@ def __init__( self._max_await_time_ms = max_await_time_ms self._timeout = self._collection.database.client.options.timeout self._session = session - self._explicit_session = explicit_session + if self._session is not None: + self._session.attached_to_cursor = True self._killed = self._id == 0 self._comment = comment if self._killed: @@ -197,7 +197,7 @@ def session(self) -> Optional[ClientSession]: .. versionadded:: 3.6 """ - if self._explicit_session: + if not self._session.implicit: return self._session return None @@ -218,10 +218,11 @@ def _die_no_lock(self) -> None: """Closes this cursor without acquiring a lock.""" cursor_id, address = self._prepare_to_die() self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session, self._explicit_session + cursor_id, address, self._sock_mgr, self._session ) - if not self._explicit_session: - self._session = None + if self._session: + if self._session.implicit: + self._session = None self._sock_mgr = None def _die_lock(self) -> None: @@ -232,16 +233,17 @@ def _die_lock(self) -> None: address, self._sock_mgr, self._session, - self._explicit_session, ) - if not self._explicit_session: - self._session = None + if self._session: + if self._session.implicit: + self._session = None self._sock_mgr = None def _end_session(self) -> None: - if self._session and not self._explicit_session: - self._session._end_implicit_session() - self._session = None + if self._session: + if self._session.implicit: + self._session._end_implicit_session() + self._session = None def close(self) -> None: """Explicitly close / kill this cursor.""" @@ -430,7 +432,6 @@ def __init__( batch_size: int = 0, max_await_time_ms: Optional[int] = None, session: Optional[ClientSession] = None, - explicit_session: bool = False, comment: Any = None, ) -> None: """Create a new cursor / iterator over raw batches of BSON data. @@ -449,7 +450,6 @@ def __init__( batch_size, max_await_time_ms, session, - explicit_session, comment, ) diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index 3dd550f4d5..1355969c48 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -138,10 +138,9 @@ def __init__( if session: self._session = session - self._explicit_session = True + self._session.attached_to_cursor = True else: self._session = None - self._explicit_session = False spec: Mapping[str, Any] = filter or {} validate_is_mapping("filter", spec) @@ -150,7 +149,7 @@ def __init__( if not isinstance(limit, int): raise TypeError(f"limit must be an instance of int, not {type(limit)}") validate_boolean("no_cursor_timeout", no_cursor_timeout) - if no_cursor_timeout and not self._explicit_session: + if no_cursor_timeout and self._session and self._session.implicit: warnings.warn( "use an explicit session with no_cursor_timeout=True " "otherwise the cursor may still timeout after " @@ -283,7 +282,7 @@ def clone(self) -> Cursor[_DocumentType]: def _clone(self, deepcopy: bool = True, base: Optional[Cursor] = None) -> Cursor: # type: ignore[type-arg] """Internal clone helper.""" if not base: - if self._explicit_session: + if self._session and not self._session.implicit: base = self._clone_base(self._session) else: base = self._clone_base(None) @@ -943,7 +942,7 @@ def session(self) -> Optional[ClientSession]: .. versionadded:: 3.6 """ - if self._explicit_session: + if self._session and not self._session.implicit: return self._session return None @@ -1032,10 +1031,11 @@ def _die_no_lock(self) -> None: cursor_id, address = self._prepare_to_die(already_killed) self._collection.database.client._cleanup_cursor_no_lock( - cursor_id, address, self._sock_mgr, self._session, self._explicit_session + cursor_id, address, self._sock_mgr, self._session ) - if not self._explicit_session: - self._session = None + if self._session: + if self._session.implicit: + self._session = None self._sock_mgr = None def _die_lock(self) -> None: @@ -1052,10 +1052,10 @@ def _die_lock(self) -> None: address, self._sock_mgr, self._session, - self._explicit_session, ) - if not self._explicit_session: - self._session = None + if self._session: + if self._session.implicit: + self._session = None self._sock_mgr = None def close(self) -> None: diff --git a/pymongo/synchronous/database.py b/pymongo/synchronous/database.py index e30f97817c..bd724c83d4 100644 --- a/pymongo/synchronous/database.py +++ b/pymongo/synchronous/database.py @@ -699,7 +699,7 @@ def aggregate( .. _aggregate command: https://mongodb.com/docs/manual/reference/command/aggregate """ - with self.client._tmp_session(session, close=False) as s: + with self.client._tmp_session(session) as s: cmd = _DatabaseAggregationCommand( self, CommandCursor, @@ -1009,7 +1009,7 @@ def cursor_command( else: command_name = next(iter(command)) - with self._client._tmp_session(session, close=False) as tmp_session: + with self._client._tmp_session(session) as tmp_session: opts = codec_options or DEFAULT_CODEC_OPTIONS if read_preference is None: @@ -1039,7 +1039,6 @@ def cursor_command( conn.address, max_await_time_ms=max_await_time_ms, session=tmp_session, - explicit_session=session is not None, comment=comment, ) cmd_cursor._maybe_pin_connection(conn) @@ -1085,7 +1084,7 @@ def _list_collections( ) cmd = {"listCollections": 1, "cursor": {}} cmd.update(kwargs) - with self._client._tmp_session(session, close=False) as tmp_session: + with self._client._tmp_session(session) as tmp_session: cursor = ( self._command(conn, cmd, read_preference=read_preference, session=tmp_session) )["cursor"] @@ -1094,7 +1093,6 @@ def _list_collections( cursor, conn.address, session=tmp_session, - explicit_session=session is not None, comment=cmd.get("comment"), ) cmd_cursor._maybe_pin_connection(conn) diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 226df202ed..f49e64a2b1 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2088,7 +2088,6 @@ def _cleanup_cursor_no_lock( address: Optional[_CursorAddress], conn_mgr: _ConnectionManager, session: Optional[ClientSession], - explicit_session: bool, ) -> None: """Cleanup a cursor from __del__ without locking. @@ -2103,7 +2102,7 @@ def _cleanup_cursor_no_lock( # The cursor will be closed later in a different session. if cursor_id or conn_mgr: self._close_cursor_soon(cursor_id, address, conn_mgr) - if session and not explicit_session: + if session and session.implicit: session._end_implicit_session() def _cleanup_cursor_lock( @@ -2112,7 +2111,6 @@ def _cleanup_cursor_lock( address: Optional[_CursorAddress], conn_mgr: _ConnectionManager, session: Optional[ClientSession], - explicit_session: bool, ) -> None: """Cleanup a cursor from cursor.close() using a lock. @@ -2124,7 +2122,6 @@ def _cleanup_cursor_lock( :param address: The _CursorAddress. :param conn_mgr: The _ConnectionManager for the pinned connection or None. :param session: The cursor's session. - :param explicit_session: True if the session was passed explicitly. """ if cursor_id: if conn_mgr and conn_mgr.more_to_come: @@ -2137,7 +2134,7 @@ def _cleanup_cursor_lock( self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) if conn_mgr: conn_mgr.close() - if session and not explicit_session: + if session and session.implicit: session._end_implicit_session() def _close_cursor_now( @@ -2218,7 +2215,7 @@ def _process_kill_cursors(self) -> None: for address, cursor_id, conn_mgr in pinned_cursors: try: - self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None, False) + self._cleanup_cursor_lock(cursor_id, address, conn_mgr, None) except Exception as exc: if isinstance(exc, InvalidOperation) and self._topology._closed: # Raise the exception when client is closed so that it @@ -2263,7 +2260,7 @@ def _return_server_session( @contextlib.contextmanager def _tmp_session( - self, session: Optional[client_session.ClientSession], close: bool = True + self, session: Optional[client_session.ClientSession] ) -> Generator[Optional[client_session.ClientSession], None]: """If provided session is None, lend a temporary session.""" if session is not None: @@ -2288,7 +2285,7 @@ def _tmp_session( raise finally: # Call end_session when we exit this scope. - if close: + if not s.attached_to_cursor: s.end_session() else: yield None From d65fe457d55ab0a77c2bd61a874cfb06a939d1c4 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Fri, 19 Sep 2025 10:12:16 -0400 Subject: [PATCH 05/13] WIP --- pymongo/asynchronous/client_session.py | 1 - pymongo/asynchronous/mongo_client.py | 1 - pymongo/synchronous/change_stream.py | 2 +- pymongo/synchronous/client_session.py | 3 +++ pymongo/synchronous/collection.py | 2 +- pymongo/synchronous/command_cursor.py | 1 + pymongo/synchronous/mongo_client.py | 2 ++ 7 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index 63a33ce00e..43621b7600 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -137,7 +137,6 @@ import collections import time -import traceback import uuid from collections.abc import Mapping as _Mapping from typing import ( diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 43ef3c60f7..872bf91a6b 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -35,7 +35,6 @@ import asyncio import contextlib import os -import traceback import warnings import weakref from collections import defaultdict diff --git a/pymongo/synchronous/change_stream.py b/pymongo/synchronous/change_stream.py index 35bca9926c..7e34d7b848 100644 --- a/pymongo/synchronous/change_stream.py +++ b/pymongo/synchronous/change_stream.py @@ -255,7 +255,7 @@ def _run_aggregation_cmd(self, session: Optional[ClientSession]) -> CommandCurso ) def _create_cursor(self) -> CommandCursor: # type: ignore[type-arg] - with self._client._tmp_session(self._session, close=False) as s: + with self._client._tmp_session(self._session) as s: return self._run_aggregation_cmd(session=s) def _resume(self) -> None: diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index 7d9f97fd83..fa946fce2c 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -536,11 +536,14 @@ def _end_session(self, lock: bool) -> None: def _end_implicit_session(self) -> None: # Implicit sessions can't be part of transactions or pinned connections if self._server_session is not None: + # print(f"Ending session {self}, implicit: {self._implicit}, attached: {self._attached_to_cursor}") self._client._return_server_session(self._server_session) self._server_session = None def _check_ended(self) -> None: if self._server_session is None: + # print(f"Session {self} is already ended") + # print(f"Session {self} is already ended, implicit: {self._implicit}, attached: {self._attached_to_cursor}: {''.join(traceback.format_stack())}") raise InvalidOperation("Cannot use ended session") def __enter__(self) -> ClientSession: diff --git a/pymongo/synchronous/collection.py b/pymongo/synchronous/collection.py index 99f9025548..b68e4befed 100644 --- a/pymongo/synchronous/collection.py +++ b/pymongo/synchronous/collection.py @@ -2577,7 +2577,7 @@ def _cmd( cmd_cursor._maybe_pin_connection(conn) return cmd_cursor - with self._database.client._tmp_session(session, False) as s: + with self._database.client._tmp_session(session) as s: return self._database.client._retryable_read( _cmd, read_pref, s, operation=_Op.LIST_INDEXES ) diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index b5665d4b42..232ee10b3c 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -242,6 +242,7 @@ def _die_lock(self) -> None: def _end_session(self) -> None: if self._session: if self._session.implicit: + # print(f"Ending session {self}, session: {self._session}") self._session._end_implicit_session() self._session = None diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index f49e64a2b1..fde8e7f363 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2080,6 +2080,7 @@ def _retryable_write( :param bulk: bulk abstraction to execute operations in bulk, defaults to None """ with self._tmp_session(session) as s: + # print(f"Called retryable write with session = {session!r} and got {s}: {s._server_session!r}") return self._retry_with_session(retryable, func, s, bulk, operation, operation_id) def _cleanup_cursor_no_lock( @@ -2286,6 +2287,7 @@ def _tmp_session( finally: # Call end_session when we exit this scope. if not s.attached_to_cursor: + # print(f"Ending session {s}: {''.join(traceback.format_stack(limit=10))}") s.end_session() else: yield None From 8026e7aab825a47006522a1cbf36439729328eb8 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 22 Sep 2025 10:24:39 -0400 Subject: [PATCH 06/13] Flag implicit sessions used by retryable operations that also use CommandCursors --- pymongo/asynchronous/client_session.py | 15 ++++++++++++++- pymongo/asynchronous/command_cursor.py | 7 +++++-- pymongo/asynchronous/database.py | 2 +- pymongo/synchronous/client_session.py | 15 ++++++++++++++- pymongo/synchronous/command_cursor.py | 7 +++++-- pymongo/synchronous/database.py | 2 +- 6 files changed, 40 insertions(+), 8 deletions(-) diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index 43621b7600..b4565e90ff 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -514,6 +514,7 @@ def __init__( self._implicit = implicit self._transaction = _Transaction(None, client) self._attached_to_cursor = False + self._leave_alive = False async def end_session(self) -> None: """Finish this session. If a transaction has started, abort it. @@ -536,7 +537,7 @@ async def _end_session(self, lock: bool) -> None: def _end_implicit_session(self) -> None: # Implicit sessions can't be part of transactions or pinned connections - if self._server_session is not None: + if not self._leave_alive and self._server_session is not None: # print(f"Ending session {self}, implicit: {self._implicit}, attached: {self._attached_to_cursor}") self._client._return_server_session(self._server_session) self._server_session = None @@ -606,6 +607,18 @@ def attached_to_cursor(self) -> bool: def attached_to_cursor(self, value: bool) -> None: self._attached_to_cursor = value + @property + def leave_alive(self) -> bool: + """Whether to leave this session alive when it is + no longer in use. + Typically used for implicit sessions that are used for multiple operations within a single larger operation. + """ + return self._leave_alive + + @leave_alive.setter + def leave_alive(self, value: bool) -> None: + self._leave_alive = value + def _inherit_option(self, name: str, val: _T) -> _T: """Return the inherited TransactionOption value.""" if val: diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index 08b07de2e1..ac2c5d7a9a 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -65,6 +65,7 @@ def __init__( max_await_time_ms: Optional[int] = None, session: Optional[AsyncClientSession] = None, comment: Any = None, + leave_session_alive: bool = False, ) -> None: """Create a new command cursor.""" self._sock_mgr: Any = None @@ -83,6 +84,7 @@ def __init__( self._session.attached_to_cursor = True self._killed = self._id == 0 self._comment = comment + self._leave_session_alive = leave_session_alive if self._killed: self._end_session() @@ -240,8 +242,9 @@ async def _die_lock(self) -> None: self._sock_mgr = None def _end_session(self) -> None: - if self._session: - if self._session.implicit: + if self._session and self._session.implicit: + self._session.attached_to_cursor = False + if not self._leave_session_alive: # print(f"Ending session {self}, session: {self._session}") self._session._end_implicit_session() self._session = None diff --git a/pymongo/asynchronous/database.py b/pymongo/asynchronous/database.py index 7ba65133fa..501051e9d3 100644 --- a/pymongo/asynchronous/database.py +++ b/pymongo/asynchronous/database.py @@ -611,6 +611,7 @@ async def create_collection( common.validate_is_mapping("clusteredIndex", clustered_index) async with self._client._tmp_session(session) as s: + s.leave_alive = True # Skip this check in a transaction where listCollections is not # supported. if ( @@ -705,7 +706,6 @@ async def aggregate( AsyncCommandCursor, pipeline, kwargs, - session is not None, user_fields={"cursor": {"firstBatch": 1}}, ) return await self.client._retryable_read( diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index fa946fce2c..48be914f4e 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -513,6 +513,7 @@ def __init__( self._implicit = implicit self._transaction = _Transaction(None, client) self._attached_to_cursor = False + self._leave_alive = False def end_session(self) -> None: """Finish this session. If a transaction has started, abort it. @@ -535,7 +536,7 @@ def _end_session(self, lock: bool) -> None: def _end_implicit_session(self) -> None: # Implicit sessions can't be part of transactions or pinned connections - if self._server_session is not None: + if not self._leave_alive and self._server_session is not None: # print(f"Ending session {self}, implicit: {self._implicit}, attached: {self._attached_to_cursor}") self._client._return_server_session(self._server_session) self._server_session = None @@ -605,6 +606,18 @@ def attached_to_cursor(self) -> bool: def attached_to_cursor(self, value: bool) -> None: self._attached_to_cursor = value + @property + def leave_alive(self) -> bool: + """Whether to leave this session alive when it is + no longer in use. + Typically used for implicit sessions that are used for multiple operations within a single larger operation. + """ + return self._leave_alive + + @leave_alive.setter + def leave_alive(self, value: bool) -> None: + self._leave_alive = value + def _inherit_option(self, name: str, val: _T) -> _T: """Return the inherited TransactionOption value.""" if val: diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index 232ee10b3c..b69b6b7758 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -65,6 +65,7 @@ def __init__( max_await_time_ms: Optional[int] = None, session: Optional[ClientSession] = None, comment: Any = None, + leave_session_alive: bool = False, ) -> None: """Create a new command cursor.""" self._sock_mgr: Any = None @@ -83,6 +84,7 @@ def __init__( self._session.attached_to_cursor = True self._killed = self._id == 0 self._comment = comment + self._leave_session_alive = leave_session_alive if self._killed: self._end_session() @@ -240,8 +242,9 @@ def _die_lock(self) -> None: self._sock_mgr = None def _end_session(self) -> None: - if self._session: - if self._session.implicit: + if self._session and self._session.implicit: + self._session.attached_to_cursor = False + if not self._leave_session_alive: # print(f"Ending session {self}, session: {self._session}") self._session._end_implicit_session() self._session = None diff --git a/pymongo/synchronous/database.py b/pymongo/synchronous/database.py index bd724c83d4..c8eadb733b 100644 --- a/pymongo/synchronous/database.py +++ b/pymongo/synchronous/database.py @@ -611,6 +611,7 @@ def create_collection( common.validate_is_mapping("clusteredIndex", clustered_index) with self._client._tmp_session(session) as s: + s.leave_alive = True # Skip this check in a transaction where listCollections is not # supported. if ( @@ -705,7 +706,6 @@ def aggregate( CommandCursor, pipeline, kwargs, - session is not None, user_fields={"cursor": {"firstBatch": 1}}, ) return self.client._retryable_read( From af572bd7211dd384918ded228c309c7749ab7403 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 22 Sep 2025 13:04:46 -0400 Subject: [PATCH 07/13] Remove debugging --- pymongo/asynchronous/change_stream.py | 1 + pymongo/asynchronous/client_bulk.py | 1 + pymongo/asynchronous/client_session.py | 3 --- pymongo/asynchronous/command_cursor.py | 22 +++++++++------------- pymongo/asynchronous/cursor.py | 12 ++++++------ pymongo/asynchronous/mongo_client.py | 6 ++---- pymongo/synchronous/change_stream.py | 1 + pymongo/synchronous/client_bulk.py | 1 + pymongo/synchronous/client_session.py | 3 --- pymongo/synchronous/command_cursor.py | 22 +++++++++------------- pymongo/synchronous/cursor.py | 12 ++++++------ pymongo/synchronous/mongo_client.py | 6 ++---- 12 files changed, 38 insertions(+), 52 deletions(-) diff --git a/pymongo/asynchronous/change_stream.py b/pymongo/asynchronous/change_stream.py index b2b78b0660..82961a5fd2 100644 --- a/pymongo/asynchronous/change_stream.py +++ b/pymongo/asynchronous/change_stream.py @@ -258,6 +258,7 @@ async def _run_aggregation_cmd( async def _create_cursor(self) -> AsyncCommandCursor: # type: ignore[type-arg] async with self._client._tmp_session(self._session) as s: + s.leave_alive = True return await self._run_aggregation_cmd(session=s) async def _resume(self) -> None: diff --git a/pymongo/asynchronous/client_bulk.py b/pymongo/asynchronous/client_bulk.py index 7a54647431..7b853bde0b 100644 --- a/pymongo/asynchronous/client_bulk.py +++ b/pymongo/asynchronous/client_bulk.py @@ -537,6 +537,7 @@ async def _execute_command( session._start_retryable_write() self.started_retryable_write = True session._apply_to(cmd, retryable, ReadPreference.PRIMARY, conn) + session.leave_alive = True conn.send_cluster_time(cmd, session, self.client) conn.add_server_api(cmd) # CSOT: apply timeout before encoding the command. diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index b4565e90ff..07f0f790bf 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -538,14 +538,11 @@ async def _end_session(self, lock: bool) -> None: def _end_implicit_session(self) -> None: # Implicit sessions can't be part of transactions or pinned connections if not self._leave_alive and self._server_session is not None: - # print(f"Ending session {self}, implicit: {self._implicit}, attached: {self._attached_to_cursor}") self._client._return_server_session(self._server_session) self._server_session = None def _check_ended(self) -> None: if self._server_session is None: - # print(f"Session {self} is already ended") - # print(f"Session {self} is already ended, implicit: {self._implicit}, attached: {self._attached_to_cursor}: {''.join(traceback.format_stack())}") raise InvalidOperation("Cannot use ended session") async def __aenter__(self) -> AsyncClientSession: diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index ac2c5d7a9a..df07ad0013 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -65,7 +65,6 @@ def __init__( max_await_time_ms: Optional[int] = None, session: Optional[AsyncClientSession] = None, comment: Any = None, - leave_session_alive: bool = False, ) -> None: """Create a new command cursor.""" self._sock_mgr: Any = None @@ -84,7 +83,6 @@ def __init__( self._session.attached_to_cursor = True self._killed = self._id == 0 self._comment = comment - self._leave_session_alive = leave_session_alive if self._killed: self._end_session() @@ -222,9 +220,9 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session: - if self._session.implicit: - self._session = None + if self._session and self._session.implicit: + self._session.attached_to_cursor = False + self._session = None self._sock_mgr = None async def _die_lock(self) -> None: @@ -236,18 +234,16 @@ async def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session: - if self._session.implicit: - self._session = None + if self._session and self._session.implicit: + self._session.attached_to_cursor = False + self._session = None self._sock_mgr = None def _end_session(self) -> None: - if self._session and self._session.implicit: + if self._session and self._session.implicit and not self._session.leave_alive: self._session.attached_to_cursor = False - if not self._leave_session_alive: - # print(f"Ending session {self}, session: {self._session}") - self._session._end_implicit_session() - self._session = None + self._session._end_implicit_session() + self._session = None async def close(self) -> None: """Explicitly close / kill this cursor.""" diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index bb3454a53a..bb632b1d41 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -1035,9 +1035,9 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session: - if self._session.implicit: - self._session = None + if self._session and self._session.implicit: + self._session.attached_to_cursor = False + self._session = None self._sock_mgr = None async def _die_lock(self) -> None: @@ -1055,9 +1055,9 @@ async def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session: - if self._session.implicit: - self._session = None + if self._session and self._session.implicit: + self._session.attached_to_cursor = False + self._session = None self._sock_mgr = None async def close(self) -> None: diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 872bf91a6b..3f4bdb1c61 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2084,7 +2084,6 @@ async def _retryable_write( :param bulk: bulk abstraction to execute operations in bulk, defaults to None """ async with self._tmp_session(session) as s: - # print(f"Called retryable write with session = {session!r} and got {s}: {s._server_session!r}") return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id) def _cleanup_cursor_no_lock( @@ -2107,7 +2106,7 @@ def _cleanup_cursor_no_lock( # The cursor will be closed later in a different session. if cursor_id or conn_mgr: self._close_cursor_soon(cursor_id, address, conn_mgr) - if session and session.implicit: + if session and session.implicit and not session.leave_alive: session._end_implicit_session() async def _cleanup_cursor_lock( @@ -2139,7 +2138,7 @@ async def _cleanup_cursor_lock( await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) if conn_mgr: await conn_mgr.close() - if session and session.implicit: + if session and session.implicit and not session.leave_alive: session._end_implicit_session() async def _close_cursor_now( @@ -2291,7 +2290,6 @@ async def _tmp_session( finally: # Call end_session when we exit this scope. if not s.attached_to_cursor: - # print(f"Ending session {s}: {''.join(traceback.format_stack(limit=10))}") await s.end_session() else: yield None diff --git a/pymongo/synchronous/change_stream.py b/pymongo/synchronous/change_stream.py index 7e34d7b848..431e2e903e 100644 --- a/pymongo/synchronous/change_stream.py +++ b/pymongo/synchronous/change_stream.py @@ -256,6 +256,7 @@ def _run_aggregation_cmd(self, session: Optional[ClientSession]) -> CommandCurso def _create_cursor(self) -> CommandCursor: # type: ignore[type-arg] with self._client._tmp_session(self._session) as s: + s.leave_alive = True return self._run_aggregation_cmd(session=s) def _resume(self) -> None: diff --git a/pymongo/synchronous/client_bulk.py b/pymongo/synchronous/client_bulk.py index 794708b653..f34f48057f 100644 --- a/pymongo/synchronous/client_bulk.py +++ b/pymongo/synchronous/client_bulk.py @@ -535,6 +535,7 @@ def _execute_command( session._start_retryable_write() self.started_retryable_write = True session._apply_to(cmd, retryable, ReadPreference.PRIMARY, conn) + session.leave_alive = True conn.send_cluster_time(cmd, session, self.client) conn.add_server_api(cmd) # CSOT: apply timeout before encoding the command. diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index 48be914f4e..cfff73519e 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -537,14 +537,11 @@ def _end_session(self, lock: bool) -> None: def _end_implicit_session(self) -> None: # Implicit sessions can't be part of transactions or pinned connections if not self._leave_alive and self._server_session is not None: - # print(f"Ending session {self}, implicit: {self._implicit}, attached: {self._attached_to_cursor}") self._client._return_server_session(self._server_session) self._server_session = None def _check_ended(self) -> None: if self._server_session is None: - # print(f"Session {self} is already ended") - # print(f"Session {self} is already ended, implicit: {self._implicit}, attached: {self._attached_to_cursor}: {''.join(traceback.format_stack())}") raise InvalidOperation("Cannot use ended session") def __enter__(self) -> ClientSession: diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index b69b6b7758..31b2d5ad57 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -65,7 +65,6 @@ def __init__( max_await_time_ms: Optional[int] = None, session: Optional[ClientSession] = None, comment: Any = None, - leave_session_alive: bool = False, ) -> None: """Create a new command cursor.""" self._sock_mgr: Any = None @@ -84,7 +83,6 @@ def __init__( self._session.attached_to_cursor = True self._killed = self._id == 0 self._comment = comment - self._leave_session_alive = leave_session_alive if self._killed: self._end_session() @@ -222,9 +220,9 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session: - if self._session.implicit: - self._session = None + if self._session and self._session.implicit: + self._session.attached_to_cursor = False + self._session = None self._sock_mgr = None def _die_lock(self) -> None: @@ -236,18 +234,16 @@ def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session: - if self._session.implicit: - self._session = None + if self._session and self._session.implicit: + self._session.attached_to_cursor = False + self._session = None self._sock_mgr = None def _end_session(self) -> None: - if self._session and self._session.implicit: + if self._session and self._session.implicit and not self._session.leave_alive: self._session.attached_to_cursor = False - if not self._leave_session_alive: - # print(f"Ending session {self}, session: {self._session}") - self._session._end_implicit_session() - self._session = None + self._session._end_implicit_session() + self._session = None def close(self) -> None: """Explicitly close / kill this cursor.""" diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index 1355969c48..4188fed528 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -1033,9 +1033,9 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session: - if self._session.implicit: - self._session = None + if self._session and self._session.implicit: + self._session.attached_to_cursor = False + self._session = None self._sock_mgr = None def _die_lock(self) -> None: @@ -1053,9 +1053,9 @@ def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session: - if self._session.implicit: - self._session = None + if self._session and self._session.implicit: + self._session.attached_to_cursor = False + self._session = None self._sock_mgr = None def close(self) -> None: diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index fde8e7f363..c642b6ca16 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2080,7 +2080,6 @@ def _retryable_write( :param bulk: bulk abstraction to execute operations in bulk, defaults to None """ with self._tmp_session(session) as s: - # print(f"Called retryable write with session = {session!r} and got {s}: {s._server_session!r}") return self._retry_with_session(retryable, func, s, bulk, operation, operation_id) def _cleanup_cursor_no_lock( @@ -2103,7 +2102,7 @@ def _cleanup_cursor_no_lock( # The cursor will be closed later in a different session. if cursor_id or conn_mgr: self._close_cursor_soon(cursor_id, address, conn_mgr) - if session and session.implicit: + if session and session.implicit and not session.leave_alive: session._end_implicit_session() def _cleanup_cursor_lock( @@ -2135,7 +2134,7 @@ def _cleanup_cursor_lock( self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) if conn_mgr: conn_mgr.close() - if session and session.implicit: + if session and session.implicit and not session.leave_alive: session._end_implicit_session() def _close_cursor_now( @@ -2287,7 +2286,6 @@ def _tmp_session( finally: # Call end_session when we exit this scope. if not s.attached_to_cursor: - # print(f"Ending session {s}: {''.join(traceback.format_stack(limit=10))}") s.end_session() else: yield None From 55aaee3efa00fa8f94dc32819be3e872d311f860 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 22 Sep 2025 13:18:03 -0400 Subject: [PATCH 08/13] Fix typing --- pymongo/asynchronous/change_stream.py | 3 ++- pymongo/asynchronous/command_cursor.py | 2 +- pymongo/asynchronous/database.py | 3 ++- pymongo/synchronous/change_stream.py | 3 ++- pymongo/synchronous/command_cursor.py | 2 +- pymongo/synchronous/database.py | 3 ++- 6 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pymongo/asynchronous/change_stream.py b/pymongo/asynchronous/change_stream.py index 82961a5fd2..7c2a0cede3 100644 --- a/pymongo/asynchronous/change_stream.py +++ b/pymongo/asynchronous/change_stream.py @@ -258,7 +258,8 @@ async def _run_aggregation_cmd( async def _create_cursor(self) -> AsyncCommandCursor: # type: ignore[type-arg] async with self._client._tmp_session(self._session) as s: - s.leave_alive = True + if s: + s.leave_alive = True return await self._run_aggregation_cmd(session=s) async def _resume(self) -> None: diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index df07ad0013..8f80a0a101 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -197,7 +197,7 @@ def session(self) -> Optional[AsyncClientSession]: .. versionadded:: 3.6 """ - if not self._session.implicit: + if self._session and not self._session.implicit: return self._session return None diff --git a/pymongo/asynchronous/database.py b/pymongo/asynchronous/database.py index 501051e9d3..7f6a90ad3b 100644 --- a/pymongo/asynchronous/database.py +++ b/pymongo/asynchronous/database.py @@ -611,7 +611,8 @@ async def create_collection( common.validate_is_mapping("clusteredIndex", clustered_index) async with self._client._tmp_session(session) as s: - s.leave_alive = True + if s: + s.leave_alive = True # Skip this check in a transaction where listCollections is not # supported. if ( diff --git a/pymongo/synchronous/change_stream.py b/pymongo/synchronous/change_stream.py index 431e2e903e..c0cfa98d60 100644 --- a/pymongo/synchronous/change_stream.py +++ b/pymongo/synchronous/change_stream.py @@ -256,7 +256,8 @@ def _run_aggregation_cmd(self, session: Optional[ClientSession]) -> CommandCurso def _create_cursor(self) -> CommandCursor: # type: ignore[type-arg] with self._client._tmp_session(self._session) as s: - s.leave_alive = True + if s: + s.leave_alive = True return self._run_aggregation_cmd(session=s) def _resume(self) -> None: diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index 31b2d5ad57..0a70186ce8 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -197,7 +197,7 @@ def session(self) -> Optional[ClientSession]: .. versionadded:: 3.6 """ - if not self._session.implicit: + if self._session and not self._session.implicit: return self._session return None diff --git a/pymongo/synchronous/database.py b/pymongo/synchronous/database.py index c8eadb733b..a72769d224 100644 --- a/pymongo/synchronous/database.py +++ b/pymongo/synchronous/database.py @@ -611,7 +611,8 @@ def create_collection( common.validate_is_mapping("clusteredIndex", clustered_index) with self._client._tmp_session(session) as s: - s.leave_alive = True + if s: + s.leave_alive = True # Skip this check in a transaction where listCollections is not # supported. if ( From 9a274453bb2d567f25e79a9addb45304a16cd3b6 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 22 Sep 2025 14:23:50 -0400 Subject: [PATCH 09/13] Remove leave_alive from changestream --- pymongo/asynchronous/change_stream.py | 2 -- pymongo/synchronous/change_stream.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/pymongo/asynchronous/change_stream.py b/pymongo/asynchronous/change_stream.py index 7c2a0cede3..b2b78b0660 100644 --- a/pymongo/asynchronous/change_stream.py +++ b/pymongo/asynchronous/change_stream.py @@ -258,8 +258,6 @@ async def _run_aggregation_cmd( async def _create_cursor(self) -> AsyncCommandCursor: # type: ignore[type-arg] async with self._client._tmp_session(self._session) as s: - if s: - s.leave_alive = True return await self._run_aggregation_cmd(session=s) async def _resume(self) -> None: diff --git a/pymongo/synchronous/change_stream.py b/pymongo/synchronous/change_stream.py index c0cfa98d60..7e34d7b848 100644 --- a/pymongo/synchronous/change_stream.py +++ b/pymongo/synchronous/change_stream.py @@ -256,8 +256,6 @@ def _run_aggregation_cmd(self, session: Optional[ClientSession]) -> CommandCurso def _create_cursor(self) -> CommandCursor: # type: ignore[type-arg] with self._client._tmp_session(self._session) as s: - if s: - s.leave_alive = True return self._run_aggregation_cmd(session=s) def _resume(self) -> None: From cee6c5a28ac7b8430821a30f9622188ee3775b5f Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 22 Sep 2025 14:26:39 -0400 Subject: [PATCH 10/13] Make new APIS private --- pymongo/asynchronous/client_session.py | 8 ++++---- pymongo/asynchronous/command_cursor.py | 16 ++++++++-------- pymongo/asynchronous/cursor.py | 16 ++++++++-------- pymongo/asynchronous/mongo_client.py | 6 +++--- pymongo/synchronous/client_session.py | 8 ++++---- pymongo/synchronous/command_cursor.py | 16 ++++++++-------- pymongo/synchronous/cursor.py | 16 ++++++++-------- pymongo/synchronous/mongo_client.py | 6 +++--- 8 files changed, 46 insertions(+), 46 deletions(-) diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index 07f0f790bf..3b8fb6aff0 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -591,17 +591,17 @@ def operation_time(self) -> Optional[Timestamp]: return self._operation_time @property - def implicit(self) -> bool: + def _is_implicit(self) -> bool: """Whether this session was implicitly created by the driver.""" return self._implicit @property - def attached_to_cursor(self) -> bool: + def _is_attached_to_cursor(self) -> bool: """Whether this session is owned by a cursor.""" return self._attached_to_cursor - @attached_to_cursor.setter - def attached_to_cursor(self, value: bool) -> None: + @_is_attached_to_cursor.setter + def _is_attached_to_cursor(self, value: bool) -> None: self._attached_to_cursor = value @property diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index 8f80a0a101..31a8ba4b25 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -80,7 +80,7 @@ def __init__( self._timeout = self._collection.database.client.options.timeout self._session = session if self._session is not None: - self._session.attached_to_cursor = True + self._session._is_attached_to_cursor = True self._killed = self._id == 0 self._comment = comment if self._killed: @@ -197,7 +197,7 @@ def session(self) -> Optional[AsyncClientSession]: .. versionadded:: 3.6 """ - if self._session and not self._session.implicit: + if self._session and not self._session._is_implicit: return self._session return None @@ -220,8 +220,8 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session and self._session.implicit: - self._session.attached_to_cursor = False + if self._session and self._session._is_implicit: + self._session._is_attached_to_cursor = False self._session = None self._sock_mgr = None @@ -234,14 +234,14 @@ async def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session and self._session.implicit: - self._session.attached_to_cursor = False + if self._session and self._session._is_implicit: + self._session._is_attached_to_cursor = False self._session = None self._sock_mgr = None def _end_session(self) -> None: - if self._session and self._session.implicit and not self._session.leave_alive: - self._session.attached_to_cursor = False + if self._session and self._session._is_implicit and not self._session.leave_alive: + self._session._is_attached_to_cursor = False self._session._end_implicit_session() self._session = None diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index bb632b1d41..3788be1184 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -138,7 +138,7 @@ def __init__( if session: self._session = session - self._session.attached_to_cursor = True + self._session._is_attached_to_cursor = True else: self._session = None @@ -149,7 +149,7 @@ def __init__( if not isinstance(limit, int): raise TypeError(f"limit must be an instance of int, not {type(limit)}") validate_boolean("no_cursor_timeout", no_cursor_timeout) - if no_cursor_timeout and self._session and self._session.implicit: + if no_cursor_timeout and self._session and self._session._is_implicit: warnings.warn( "use an explicit session with no_cursor_timeout=True " "otherwise the cursor may still timeout after " @@ -282,7 +282,7 @@ def clone(self) -> AsyncCursor[_DocumentType]: def _clone(self, deepcopy: bool = True, base: Optional[AsyncCursor] = None) -> AsyncCursor: # type: ignore[type-arg] """Internal clone helper.""" if not base: - if self._session and not self._session.implicit: + if self._session and not self._session._is_implicit: base = self._clone_base(self._session) else: base = self._clone_base(None) @@ -944,7 +944,7 @@ def session(self) -> Optional[AsyncClientSession]: .. versionadded:: 3.6 """ - if self._session and not self._session.implicit: + if self._session and not self._session._is_implicit: return self._session return None @@ -1035,8 +1035,8 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session and self._session.implicit: - self._session.attached_to_cursor = False + if self._session and self._session._is_implicit: + self._session._is_attached_to_cursor = False self._session = None self._sock_mgr = None @@ -1055,8 +1055,8 @@ async def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session and self._session.implicit: - self._session.attached_to_cursor = False + if self._session and self._session._is_implicit: + self._session._is_attached_to_cursor = False self._session = None self._sock_mgr = None diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 3f4bdb1c61..975dbd2758 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2106,7 +2106,7 @@ def _cleanup_cursor_no_lock( # The cursor will be closed later in a different session. if cursor_id or conn_mgr: self._close_cursor_soon(cursor_id, address, conn_mgr) - if session and session.implicit and not session.leave_alive: + if session and session._is_implicit and not session.leave_alive: session._end_implicit_session() async def _cleanup_cursor_lock( @@ -2138,7 +2138,7 @@ async def _cleanup_cursor_lock( await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) if conn_mgr: await conn_mgr.close() - if session and session.implicit and not session.leave_alive: + if session and session._is_implicit and not session.leave_alive: session._end_implicit_session() async def _close_cursor_now( @@ -2289,7 +2289,7 @@ async def _tmp_session( raise finally: # Call end_session when we exit this scope. - if not s.attached_to_cursor: + if not s._is_attached_to_cursor: await s.end_session() else: yield None diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index cfff73519e..7402144bc0 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -590,17 +590,17 @@ def operation_time(self) -> Optional[Timestamp]: return self._operation_time @property - def implicit(self) -> bool: + def _is_implicit(self) -> bool: """Whether this session was implicitly created by the driver.""" return self._implicit @property - def attached_to_cursor(self) -> bool: + def _is_attached_to_cursor(self) -> bool: """Whether this session is owned by a cursor.""" return self._attached_to_cursor - @attached_to_cursor.setter - def attached_to_cursor(self, value: bool) -> None: + @_is_attached_to_cursor.setter + def _is_attached_to_cursor(self, value: bool) -> None: self._attached_to_cursor = value @property diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index 0a70186ce8..743843ba77 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -80,7 +80,7 @@ def __init__( self._timeout = self._collection.database.client.options.timeout self._session = session if self._session is not None: - self._session.attached_to_cursor = True + self._session._is_attached_to_cursor = True self._killed = self._id == 0 self._comment = comment if self._killed: @@ -197,7 +197,7 @@ def session(self) -> Optional[ClientSession]: .. versionadded:: 3.6 """ - if self._session and not self._session.implicit: + if self._session and not self._session._is_implicit: return self._session return None @@ -220,8 +220,8 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session and self._session.implicit: - self._session.attached_to_cursor = False + if self._session and self._session._is_implicit: + self._session._is_attached_to_cursor = False self._session = None self._sock_mgr = None @@ -234,14 +234,14 @@ def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session and self._session.implicit: - self._session.attached_to_cursor = False + if self._session and self._session._is_implicit: + self._session._is_attached_to_cursor = False self._session = None self._sock_mgr = None def _end_session(self) -> None: - if self._session and self._session.implicit and not self._session.leave_alive: - self._session.attached_to_cursor = False + if self._session and self._session._is_implicit and not self._session.leave_alive: + self._session._is_attached_to_cursor = False self._session._end_implicit_session() self._session = None diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index 4188fed528..12c49ead5c 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -138,7 +138,7 @@ def __init__( if session: self._session = session - self._session.attached_to_cursor = True + self._session._is_attached_to_cursor = True else: self._session = None @@ -149,7 +149,7 @@ def __init__( if not isinstance(limit, int): raise TypeError(f"limit must be an instance of int, not {type(limit)}") validate_boolean("no_cursor_timeout", no_cursor_timeout) - if no_cursor_timeout and self._session and self._session.implicit: + if no_cursor_timeout and self._session and self._session._is_implicit: warnings.warn( "use an explicit session with no_cursor_timeout=True " "otherwise the cursor may still timeout after " @@ -282,7 +282,7 @@ def clone(self) -> Cursor[_DocumentType]: def _clone(self, deepcopy: bool = True, base: Optional[Cursor] = None) -> Cursor: # type: ignore[type-arg] """Internal clone helper.""" if not base: - if self._session and not self._session.implicit: + if self._session and not self._session._is_implicit: base = self._clone_base(self._session) else: base = self._clone_base(None) @@ -942,7 +942,7 @@ def session(self) -> Optional[ClientSession]: .. versionadded:: 3.6 """ - if self._session and not self._session.implicit: + if self._session and not self._session._is_implicit: return self._session return None @@ -1033,8 +1033,8 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session and self._session.implicit: - self._session.attached_to_cursor = False + if self._session and self._session._is_implicit: + self._session._is_attached_to_cursor = False self._session = None self._sock_mgr = None @@ -1053,8 +1053,8 @@ def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session and self._session.implicit: - self._session.attached_to_cursor = False + if self._session and self._session._is_implicit: + self._session._is_attached_to_cursor = False self._session = None self._sock_mgr = None diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index c642b6ca16..062fe299ec 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2102,7 +2102,7 @@ def _cleanup_cursor_no_lock( # The cursor will be closed later in a different session. if cursor_id or conn_mgr: self._close_cursor_soon(cursor_id, address, conn_mgr) - if session and session.implicit and not session.leave_alive: + if session and session._is_implicit and not session.leave_alive: session._end_implicit_session() def _cleanup_cursor_lock( @@ -2134,7 +2134,7 @@ def _cleanup_cursor_lock( self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) if conn_mgr: conn_mgr.close() - if session and session.implicit and not session.leave_alive: + if session and session._is_implicit and not session.leave_alive: session._end_implicit_session() def _close_cursor_now( @@ -2285,7 +2285,7 @@ def _tmp_session( raise finally: # Call end_session when we exit this scope. - if not s.attached_to_cursor: + if not s._is_attached_to_cursor: s.end_session() else: yield None From 6f5bda7f6280492e1ca39d941591ea6bce4eb260 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 22 Sep 2025 16:45:52 -0400 Subject: [PATCH 11/13] Remove property decorators --- pymongo/asynchronous/client_bulk.py | 2 +- pymongo/asynchronous/client_session.py | 28 ++------------------------ pymongo/asynchronous/command_cursor.py | 16 +++++++-------- pymongo/asynchronous/cursor.py | 16 +++++++-------- pymongo/asynchronous/database.py | 4 ++-- pymongo/asynchronous/mongo_client.py | 6 +++--- pymongo/synchronous/client_bulk.py | 2 +- pymongo/synchronous/client_session.py | 28 ++------------------------ pymongo/synchronous/command_cursor.py | 16 +++++++-------- pymongo/synchronous/cursor.py | 16 +++++++-------- pymongo/synchronous/database.py | 4 ++-- pymongo/synchronous/mongo_client.py | 6 +++--- 12 files changed, 48 insertions(+), 96 deletions(-) diff --git a/pymongo/asynchronous/client_bulk.py b/pymongo/asynchronous/client_bulk.py index 7b853bde0b..db239505d3 100644 --- a/pymongo/asynchronous/client_bulk.py +++ b/pymongo/asynchronous/client_bulk.py @@ -537,7 +537,7 @@ async def _execute_command( session._start_retryable_write() self.started_retryable_write = True session._apply_to(cmd, retryable, ReadPreference.PRIMARY, conn) - session.leave_alive = True + session._leave_alive = True conn.send_cluster_time(cmd, session, self.client) conn.add_server_api(cmd) # CSOT: apply timeout before encoding the command. diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index 3b8fb6aff0..8674e98447 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -513,7 +513,9 @@ def __init__( # Is this an implicitly created session? self._implicit = implicit self._transaction = _Transaction(None, client) + # Is this session attached to a cursor? self._attached_to_cursor = False + # Should we leave the session alive when the cursor is closed? self._leave_alive = False async def end_session(self) -> None: @@ -590,32 +592,6 @@ def operation_time(self) -> Optional[Timestamp]: """ return self._operation_time - @property - def _is_implicit(self) -> bool: - """Whether this session was implicitly created by the driver.""" - return self._implicit - - @property - def _is_attached_to_cursor(self) -> bool: - """Whether this session is owned by a cursor.""" - return self._attached_to_cursor - - @_is_attached_to_cursor.setter - def _is_attached_to_cursor(self, value: bool) -> None: - self._attached_to_cursor = value - - @property - def leave_alive(self) -> bool: - """Whether to leave this session alive when it is - no longer in use. - Typically used for implicit sessions that are used for multiple operations within a single larger operation. - """ - return self._leave_alive - - @leave_alive.setter - def leave_alive(self, value: bool) -> None: - self._leave_alive = value - def _inherit_option(self, name: str, val: _T) -> _T: """Return the inherited TransactionOption value.""" if val: diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index 31a8ba4b25..ce936376c8 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -80,7 +80,7 @@ def __init__( self._timeout = self._collection.database.client.options.timeout self._session = session if self._session is not None: - self._session._is_attached_to_cursor = True + self._session._attached_to_cursor = True self._killed = self._id == 0 self._comment = comment if self._killed: @@ -197,7 +197,7 @@ def session(self) -> Optional[AsyncClientSession]: .. versionadded:: 3.6 """ - if self._session and not self._session._is_implicit: + if self._session and not self._session._implicit: return self._session return None @@ -220,8 +220,8 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session and self._session._is_implicit: - self._session._is_attached_to_cursor = False + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None @@ -234,14 +234,14 @@ async def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session and self._session._is_implicit: - self._session._is_attached_to_cursor = False + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None def _end_session(self) -> None: - if self._session and self._session._is_implicit and not self._session.leave_alive: - self._session._is_attached_to_cursor = False + if self._session and self._session._implicit and not self._session._leave_alive: + self._session._attached_to_cursor = False self._session._end_implicit_session() self._session = None diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index 3788be1184..df060a4fa9 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -138,7 +138,7 @@ def __init__( if session: self._session = session - self._session._is_attached_to_cursor = True + self._session._attached_to_cursor = True else: self._session = None @@ -149,7 +149,7 @@ def __init__( if not isinstance(limit, int): raise TypeError(f"limit must be an instance of int, not {type(limit)}") validate_boolean("no_cursor_timeout", no_cursor_timeout) - if no_cursor_timeout and self._session and self._session._is_implicit: + if no_cursor_timeout and self._session and self._session._implicit: warnings.warn( "use an explicit session with no_cursor_timeout=True " "otherwise the cursor may still timeout after " @@ -282,7 +282,7 @@ def clone(self) -> AsyncCursor[_DocumentType]: def _clone(self, deepcopy: bool = True, base: Optional[AsyncCursor] = None) -> AsyncCursor: # type: ignore[type-arg] """Internal clone helper.""" if not base: - if self._session and not self._session._is_implicit: + if self._session and not self._session._implicit: base = self._clone_base(self._session) else: base = self._clone_base(None) @@ -944,7 +944,7 @@ def session(self) -> Optional[AsyncClientSession]: .. versionadded:: 3.6 """ - if self._session and not self._session._is_implicit: + if self._session and not self._session._implicit: return self._session return None @@ -1035,8 +1035,8 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session and self._session._is_implicit: - self._session._is_attached_to_cursor = False + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None @@ -1055,8 +1055,8 @@ async def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session and self._session._is_implicit: - self._session._is_attached_to_cursor = False + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None diff --git a/pymongo/asynchronous/database.py b/pymongo/asynchronous/database.py index 7f6a90ad3b..7bc2e8ad9d 100644 --- a/pymongo/asynchronous/database.py +++ b/pymongo/asynchronous/database.py @@ -611,8 +611,8 @@ async def create_collection( common.validate_is_mapping("clusteredIndex", clustered_index) async with self._client._tmp_session(session) as s: - if s: - s.leave_alive = True + if s and not s.in_transaction: + s._leave_alive = True # Skip this check in a transaction where listCollections is not # supported. if ( diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 975dbd2758..d9bf808d55 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2106,7 +2106,7 @@ def _cleanup_cursor_no_lock( # The cursor will be closed later in a different session. if cursor_id or conn_mgr: self._close_cursor_soon(cursor_id, address, conn_mgr) - if session and session._is_implicit and not session.leave_alive: + if session and session._implicit and not session._leave_alive: session._end_implicit_session() async def _cleanup_cursor_lock( @@ -2138,7 +2138,7 @@ async def _cleanup_cursor_lock( await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) if conn_mgr: await conn_mgr.close() - if session and session._is_implicit and not session.leave_alive: + if session and session._implicit and not session._leave_alive: session._end_implicit_session() async def _close_cursor_now( @@ -2289,7 +2289,7 @@ async def _tmp_session( raise finally: # Call end_session when we exit this scope. - if not s._is_attached_to_cursor: + if not s._attached_to_cursor: await s.end_session() else: yield None diff --git a/pymongo/synchronous/client_bulk.py b/pymongo/synchronous/client_bulk.py index f34f48057f..ba5d88994a 100644 --- a/pymongo/synchronous/client_bulk.py +++ b/pymongo/synchronous/client_bulk.py @@ -535,7 +535,7 @@ def _execute_command( session._start_retryable_write() self.started_retryable_write = True session._apply_to(cmd, retryable, ReadPreference.PRIMARY, conn) - session.leave_alive = True + session._leave_alive = True conn.send_cluster_time(cmd, session, self.client) conn.add_server_api(cmd) # CSOT: apply timeout before encoding the command. diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index 7402144bc0..9b547dc946 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -512,7 +512,9 @@ def __init__( # Is this an implicitly created session? self._implicit = implicit self._transaction = _Transaction(None, client) + # Is this session attached to a cursor? self._attached_to_cursor = False + # Should we leave the session alive when the cursor is closed? self._leave_alive = False def end_session(self) -> None: @@ -589,32 +591,6 @@ def operation_time(self) -> Optional[Timestamp]: """ return self._operation_time - @property - def _is_implicit(self) -> bool: - """Whether this session was implicitly created by the driver.""" - return self._implicit - - @property - def _is_attached_to_cursor(self) -> bool: - """Whether this session is owned by a cursor.""" - return self._attached_to_cursor - - @_is_attached_to_cursor.setter - def _is_attached_to_cursor(self, value: bool) -> None: - self._attached_to_cursor = value - - @property - def leave_alive(self) -> bool: - """Whether to leave this session alive when it is - no longer in use. - Typically used for implicit sessions that are used for multiple operations within a single larger operation. - """ - return self._leave_alive - - @leave_alive.setter - def leave_alive(self, value: bool) -> None: - self._leave_alive = value - def _inherit_option(self, name: str, val: _T) -> _T: """Return the inherited TransactionOption value.""" if val: diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index 743843ba77..84c959ad59 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -80,7 +80,7 @@ def __init__( self._timeout = self._collection.database.client.options.timeout self._session = session if self._session is not None: - self._session._is_attached_to_cursor = True + self._session._attached_to_cursor = True self._killed = self._id == 0 self._comment = comment if self._killed: @@ -197,7 +197,7 @@ def session(self) -> Optional[ClientSession]: .. versionadded:: 3.6 """ - if self._session and not self._session._is_implicit: + if self._session and not self._session._implicit: return self._session return None @@ -220,8 +220,8 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session and self._session._is_implicit: - self._session._is_attached_to_cursor = False + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None @@ -234,14 +234,14 @@ def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session and self._session._is_implicit: - self._session._is_attached_to_cursor = False + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None def _end_session(self) -> None: - if self._session and self._session._is_implicit and not self._session.leave_alive: - self._session._is_attached_to_cursor = False + if self._session and self._session._implicit and not self._session._leave_alive: + self._session._attached_to_cursor = False self._session._end_implicit_session() self._session = None diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index 12c49ead5c..2cecc5b38a 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -138,7 +138,7 @@ def __init__( if session: self._session = session - self._session._is_attached_to_cursor = True + self._session._attached_to_cursor = True else: self._session = None @@ -149,7 +149,7 @@ def __init__( if not isinstance(limit, int): raise TypeError(f"limit must be an instance of int, not {type(limit)}") validate_boolean("no_cursor_timeout", no_cursor_timeout) - if no_cursor_timeout and self._session and self._session._is_implicit: + if no_cursor_timeout and self._session and self._session._implicit: warnings.warn( "use an explicit session with no_cursor_timeout=True " "otherwise the cursor may still timeout after " @@ -282,7 +282,7 @@ def clone(self) -> Cursor[_DocumentType]: def _clone(self, deepcopy: bool = True, base: Optional[Cursor] = None) -> Cursor: # type: ignore[type-arg] """Internal clone helper.""" if not base: - if self._session and not self._session._is_implicit: + if self._session and not self._session._implicit: base = self._clone_base(self._session) else: base = self._clone_base(None) @@ -942,7 +942,7 @@ def session(self) -> Optional[ClientSession]: .. versionadded:: 3.6 """ - if self._session and not self._session._is_implicit: + if self._session and not self._session._implicit: return self._session return None @@ -1033,8 +1033,8 @@ def _die_no_lock(self) -> None: self._collection.database.client._cleanup_cursor_no_lock( cursor_id, address, self._sock_mgr, self._session ) - if self._session and self._session._is_implicit: - self._session._is_attached_to_cursor = False + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None @@ -1053,8 +1053,8 @@ def _die_lock(self) -> None: self._sock_mgr, self._session, ) - if self._session and self._session._is_implicit: - self._session._is_attached_to_cursor = False + if self._session and self._session._implicit: + self._session._attached_to_cursor = False self._session = None self._sock_mgr = None diff --git a/pymongo/synchronous/database.py b/pymongo/synchronous/database.py index a72769d224..78a3daaadb 100644 --- a/pymongo/synchronous/database.py +++ b/pymongo/synchronous/database.py @@ -611,8 +611,8 @@ def create_collection( common.validate_is_mapping("clusteredIndex", clustered_index) with self._client._tmp_session(session) as s: - if s: - s.leave_alive = True + if s and not s.in_transaction: + s._leave_alive = True # Skip this check in a transaction where listCollections is not # supported. if ( diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 062fe299ec..6e716402f4 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2102,7 +2102,7 @@ def _cleanup_cursor_no_lock( # The cursor will be closed later in a different session. if cursor_id or conn_mgr: self._close_cursor_soon(cursor_id, address, conn_mgr) - if session and session._is_implicit and not session.leave_alive: + if session and session._implicit and not session._leave_alive: session._end_implicit_session() def _cleanup_cursor_lock( @@ -2134,7 +2134,7 @@ def _cleanup_cursor_lock( self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr) if conn_mgr: conn_mgr.close() - if session and session._is_implicit and not session.leave_alive: + if session and session._implicit and not session._leave_alive: session._end_implicit_session() def _close_cursor_now( @@ -2285,7 +2285,7 @@ def _tmp_session( raise finally: # Call end_session when we exit this scope. - if not s._is_attached_to_cursor: + if not s._attached_to_cursor: s.end_session() else: yield None From 9479af6b50276a375fb25b7008e73d568b5d3aa0 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 22 Sep 2025 16:46:10 -0400 Subject: [PATCH 12/13] Try to reproduce on GHA --- .github/workflows/test-python.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-python.yml b/.github/workflows/test-python.yml index 6499e8ba8d..c3d1c23a69 100644 --- a/.github/workflows/test-python.yml +++ b/.github/workflows/test-python.yml @@ -74,7 +74,7 @@ jobs: with: version: "${{ matrix.mongodb-version }}" - name: Run tests - run: uv run --extra test pytest -v + run: uv run --extra test pytest -v test/asynchronous/test_session.py test/test_session.py doctest: runs-on: ubuntu-latest From d95ce5d8fac67ee106b3518f8974653645ef9834 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 22 Sep 2025 16:50:21 -0400 Subject: [PATCH 13/13] revert workflow changes --- .github/workflows/test-python.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-python.yml b/.github/workflows/test-python.yml index c3d1c23a69..6499e8ba8d 100644 --- a/.github/workflows/test-python.yml +++ b/.github/workflows/test-python.yml @@ -74,7 +74,7 @@ jobs: with: version: "${{ matrix.mongodb-version }}" - name: Run tests - run: uv run --extra test pytest -v test/asynchronous/test_session.py test/test_session.py + run: uv run --extra test pytest -v doctest: runs-on: ubuntu-latest