Skip to content

Commit

Permalink
MOTOR-209 Wrap ClientSession in Motor class
Browse files Browse the repository at this point in the history
In Motor 1.2 we supported logical sessions using the ClientSession class
from PyMongo - there was no need to wrap it because it does no I/O and
has no async methods of its own.

Now, closing a session that is in a transaction sends a command that
commits or aborts the transaction, and the application must await the
outcome. Therefore, I wrap ClientSession in a new MotorClientSession
class and apps must use it in "async with" instead of "with".

This incompatible change motivates Motor 2.0.
  • Loading branch information
ajdavis committed Jun 9, 2018
1 parent 7c58cbf commit bba885f
Show file tree
Hide file tree
Showing 20 changed files with 341 additions and 38 deletions.
5 changes: 5 additions & 0 deletions doc/api-asyncio/asyncio_motor_client_session.rst
@@ -0,0 +1,5 @@
:class:`~motor.motor_asyncio.AsyncIOMotorClientSession` -- Sequence of operations
=================================================================================

.. autoclass:: motor.motor_asyncio.AsyncIOMotorClientSession
:members:
1 change: 1 addition & 0 deletions doc/api-asyncio/index.rst
Expand Up @@ -4,6 +4,7 @@ Motor asyncio API
.. toctree::

asyncio_motor_client
asyncio_motor_client_session
asyncio_motor_database
asyncio_motor_collection
asyncio_motor_cursor
Expand Down
1 change: 1 addition & 0 deletions doc/api-tornado/index.rst
Expand Up @@ -4,6 +4,7 @@ Motor Tornado API
.. toctree::

motor_client
motor_client_session
motor_database
motor_collection
motor_cursor
Expand Down
7 changes: 7 additions & 0 deletions doc/api-tornado/motor_client_session.rst
@@ -0,0 +1,7 @@
:class:`~motor.motor_tornado.MotorClientSession` -- Sequence of operations
==========================================================================

.. currentmodule:: motor.motor_tornado

.. autoclass:: motor.motor_tornado.MotorClientSession
:members:
1 change: 1 addition & 0 deletions doc/developer-guide.rst
Expand Up @@ -37,6 +37,7 @@ is a module implementing these properties and functions:
- ``get_future``
- ``is_event_loop``
- ``is_future``
- ``mark_coroutine``
- ``pymongo_class_wrapper``
- ``run_on_executor``
- ``yieldable``
Expand Down
4 changes: 3 additions & 1 deletion doc/requirements.rst
Expand Up @@ -8,7 +8,7 @@ The current version of Motor requires:

Motor can integrate with either Tornado or asyncio.

Requires the `futures`_ package from PyPI on Python 2.
Requires the `futures`_ and `singledispatch`_ packages from PyPI on Python 2.

The default authentication mechanism for MongoDB 3.0+ is SCRAM-SHA-1.
Install `backports.pbkdf2`_ for faster authentication with MongoDB 3.0+,
Expand All @@ -23,6 +23,8 @@ Building the docs requires `sphinx`_.

.. _futures: https://pypi.python.org/pypi/futures

.. _singledispatch: https://pypi.python.org/pypi/singledispatch

.. _backports.pbkdf2: https://pypi.python.org/pypi/backports.pbkdf2/

.. _sphinx: http://sphinx.pocoo.org/
Expand Down
162 changes: 157 additions & 5 deletions motor/core.py
Expand Up @@ -32,6 +32,7 @@
from pymongo.bulk import BulkOperationBuilder
from pymongo.database import Database
from pymongo.change_stream import ChangeStream
from pymongo.client_session import ClientSession
from pymongo.collection import Collection
from pymongo.cursor import Cursor, _QUERY_OPTIONS
from pymongo.command_cursor import CommandCursor
Expand All @@ -44,7 +45,9 @@
DelegateMethod,
motor_coroutine,
MotorCursorChainingMethod,
ReadOnlyProperty)
ReadOnlyProperty,
unwrap_args_session,
unwrap_kwargs_session)
from .motor_common import callback_type_error
from motor.docstrings import *

Expand Down Expand Up @@ -115,8 +118,8 @@ class AgnosticClient(AgnosticBaseProperties):
secondaries = ReadOnlyProperty()
server_info = AsyncRead()
server_selection_timeout = ReadOnlyProperty()
start_session = AsyncRead(doc=start_session_doc)
unlock = AsyncCommand()
_start_session = AsyncCommand(attr_name='start_session')

def __init__(self, *args, **kwargs):
"""Create a new connection to a single MongoDB instance at *host:port*.
Expand Down Expand Up @@ -161,6 +164,12 @@ def __getitem__(self, name):

return db_class(self, name)

def start_session(self, *args, **kwargs):
session_class = create_class_with_framework(
AgnosticClientSession, self._framework, self.__module__)

return session_class(self, *args, **kwargs)

def wrap(self, obj):
if obj.__class__ == Database:
db_class = create_class_with_framework(
Expand All @@ -176,6 +185,146 @@ def wrap(self, obj):
self.__module__)

return command_cursor_class(obj, self)
elif obj.__class__ == ClientSession:
session_class = create_class_with_framework(
AgnosticClientSession,
self._framework,
self.__module__)

return session_class(obj, self)


class AgnosticClientSession(AgnosticBase):
"""A session for ordering sequential operations.
See :meth:`MotorClient.start_session`.
.. versionadded:: 2.0
"""

__motor_class_name__ = 'MotorClientSession'
__delegate_class__ = ClientSession


def __init__(self, motor_client, *args, **kwargs):
# We can't do I/O in the constructor; create the delegate in __iter__.
AgnosticBase.__init__(self, delegate=None)
self._client = motor_client
self.args = args
self.kwargs = kwargs

def get_io_loop(self):
return self._client.get_io_loop()

@property
def client(self):
"""The :class:`~MotorClient` this session was created from. """
return self._client

@property
def cluster_time(self):
"""The cluster time returned by the last operation in this session."""
self._check_started()
return self.delegate.cluster_time

@property
def has_ended(self):
"""True if this session is finished."""
self._check_started()
return self.delegate.has_ended

@property
def options(self):
"""The :class:`SessionOptions` this session was created with."""
self._check_started()
return self.delegate.options

@property
def operation_time(self):
"""The operation time returned by the last operation in this session."""
self._check_started()
return self.delegate.operation_time

@property
def session_id(self):
"""A BSON document, the opaque server session identifier."""
self._check_started()
return self.delegate.session_id

def advance_cluster_time(self, cluster_time):
"""Update the cluster time for this session.
:Parameters:
- `cluster_time`: The :data:`~MotorClientSession.cluster_time` from
another :class:`MotorClientSession` instance.
"""
self._check_started()
return self.delegate.advance_cluster_time(cluster_time)

def advance_operation_time(self, operation_time):
"""Update the operation time for this session.
:Parameters:
- `operation_time`: The :data:`~MotorClientSession.operation_time`
from another :class:`MotorClientSession` instance.
"""
self._check_started()
return self.delegate.advance_operation_time(operation_time)

@motor_coroutine
def end_session(self):
"""Finish this session. If a transaction has started, abort it.
It is an error to use the session after the session has ended.
"""
self._check_started()
return self._end_session()

def _internal_init(self):
if self.delegate:
raise pymongo.errors.InvalidOperation(
"Session already started, do not use it in an 'await'"
" expression or 'async with' statement again")

io_loop = self.get_io_loop()
original_future = self._framework.get_future(io_loop)
self._framework.add_future(
io_loop,
self._client._start_session(),
self._on_started,
original_future)

return original_future

# In Py 3.4 asyncio, "yield from client.start_session()" starts the session.
def __iter__(self):
return self._internal_init().__iter__()

def _on_started(self, original_future, future):
try:
self.delegate = future.result()
original_future.set_result(self)
except Exception as exc:
original_future.set_exception(exc)

def _check_started(self):
if not self.delegate:
raise pymongo.errors.InvalidOperation(
"Start this session like 's = await client.start_session()' or"
" 'async with client.start_session() as s'")

if PY35:
exec(textwrap.dedent("""
__await__ = __iter__
async def __aenter__(self):
if not self.delegate:
await self
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.delegate.__exit__(exc_type, exc_val, exc_tb)
"""), globals(), locals())


class AgnosticDatabase(AgnosticBaseProperties):
Expand Down Expand Up @@ -388,7 +537,8 @@ def find(self, *args, **kwargs):
raise pymongo.errors.InvalidOperation(
"Pass a callback to each, to_list, or count, not to find.")

cursor = self.delegate.find(*args, **kwargs)
cursor = self.delegate.find(*unwrap_args_session(args),
**unwrap_kwargs_session(kwargs))
cursor_class = create_class_with_framework(
AgnosticCursor, self._framework, self.__module__)

Expand Down Expand Up @@ -453,7 +603,8 @@ async def f():
if kwargs.get('cursor') is False:
kwargs.pop('cursor')
# One-shot aggregation, no cursor. Send command now, return Future.
return self._async_aggregate(pipeline, **kwargs)
return self._async_aggregate(
pipeline, **unwrap_kwargs_session(kwargs))
else:
if 'callback' in kwargs:
raise pymongo.errors.InvalidOperation(
Expand All @@ -464,7 +615,8 @@ async def f():
AgnosticLatentCommandCursor, self._framework, self.__module__)

# Latent cursor that will send initial command on first "async for".
return cursor_class(self, self._async_aggregate, pipeline, **kwargs)
return cursor_class(self, self._async_aggregate, pipeline,
**unwrap_kwargs_session(kwargs))

def watch(self, pipeline=None, full_document='default', resume_after=None,
max_await_time_ms=None, batch_size=None, collation=None,
Expand Down
14 changes: 9 additions & 5 deletions motor/docstrings.py
Expand Up @@ -1001,7 +1001,7 @@ async def add_3_to_x():
async def coro():
collection = client.db.collection
with (await client.start_session()) as s:
async with await client.start_session() as s:
doc = {'_id': ObjectId(), 'x': 1}
await collection.insert_one(doc, session=s)
Expand All @@ -1017,13 +1017,17 @@ async def coro():
Requires MongoDB 3.6. It is an error to call :meth:`start_session`
if this client has been authenticated to multiple databases using the
deprecated method :meth:`~pymongo.database.Database.authenticate`.
deprecated method :meth:`~motor.motor_tornado.MotorDatabase.authenticate`.
A :class:`~pymongo.client_session.ClientSession` may only be used with
the MongoClient that started it.
A :class:`~MotorClientSession` may only be used with the MotorClient that
started it.
:Returns:
An instance of :class:`~pymongo.client_session.ClientSession`.
An instance of :class:`~MotorClientSession`.
.. versionchanged:: 2.0
Returns a :class:`~MotorClientSession`. Before, this
method returned a PyMongo :class:`~pymongo.client_session.ClientSession`.
.. versionadded:: 1.2
"""
4 changes: 4 additions & 0 deletions motor/frameworks/asyncio/__init__.py
Expand Up @@ -136,6 +136,10 @@ def is_future(f):
return isinstance(f, asyncio.Future)


def mark_coroutine(f):
pass


def call_soon(loop, callback, *args, **kwargs):
if kwargs:
loop.call_soon(functools.partial(callback, *args, **kwargs))
Expand Down
4 changes: 4 additions & 0 deletions motor/frameworks/tornado/__init__.py
Expand Up @@ -125,6 +125,10 @@ def is_future(f):
return isinstance(f, concurrent.Future)


def mark_coroutine(f):
f.__tornado_coroutine__ = True


def call_soon(loop, callback, *args, **kwargs):
if args or kwargs:
loop.add_callback(functools.partial(callback, *args, **kwargs))
Expand Down
47 changes: 38 additions & 9 deletions motor/metaprogramming.py
Expand Up @@ -49,20 +49,31 @@ def method(self, *args, **kwargs):
# Don't call isinstance(), not checking subclasses.
unwrapped_args = [
obj.delegate
if obj.__class__.__name__.endswith(unwrap_class)
if obj.__class__.__name__.endswith(
(unwrap_class, 'MotorClientSession'))
else obj
for obj in args]
unwrapped_kwargs = dict([
(key, obj.delegate
if obj.__class__.__name__.endswith(unwrap_class)
else obj)
for key, obj in kwargs.items()])
unwrapped_kwargs = {
key: (obj.delegate
if obj.__class__.__name__.endswith(
(unwrap_class, 'MotorClientSession'))
else obj)
for key, obj in kwargs.items()}
else:
unwrapped_args = args
unwrapped_kwargs = kwargs
# For speed, don't call unwrap_args_session/unwrap_kwargs_session.
unwrapped_args = [
obj.delegate
if obj.__class__.__name__.endswith('MotorClientSession')
else obj
for obj in args]
unwrapped_kwargs = {
key: (obj.delegate
if obj.__class__.__name__.endswith('MotorClientSession')
else obj)
for key, obj in kwargs.items()}

loop = self.get_io_loop()
callback = kwargs.pop('callback', None)
callback = unwrapped_kwargs.pop('callback', None)
future = framework.run_on_executor(loop,
sync_method,
self.delegate,
Expand All @@ -81,12 +92,30 @@ def method(self, *args, **kwargs):
name = sync_method.__name__
method.pymongo_method_name = name

framework.mark_coroutine(method)

if doc is not None:
method.__doc__ = doc

return method


def unwrap_args_session(args):
return (
obj.delegate
if obj.__class__.__name__.endswith('MotorClientSession')
else obj
for obj in args)


def unwrap_kwargs_session(kwargs):
return {
key: (obj.delegate
if obj.__class__.__name__.endswith('MotorClientSession')
else obj)
for key, obj in kwargs.items()}


_coro_token = object()


Expand Down

0 comments on commit bba885f

Please sign in to comment.