Skip to content
Permalink
Browse files

MOTOR-86 Can't return values from internal coros.

Consequences of PEP 479.

Motor not only supports users' coroutines, it uses coroutines to implement of
some of its own features, like client.open() and gridfs.put(). Some of those
coroutines return values.

In Tornado and Python 2.6, Motor's internal coros could return values with
"raise gen.Return(value)", and in Python 2.7 could alternatively use "raise
StopIteration(value)". The latter also works in Python 3.3+ and asyncio, so
this worked everywhere:

@motor_coroutine
def f():
    raise framework.return_value(value)

... where framework.return_value is gen.Return or StopIteration.

In Python 3.5 with asyncio, however, raising StopIteration to return from a
coroutine is prohibited (PEP 492 says, "PEP 479 is enabled by default for
coroutines"). Only "return value" works.

It's possible there's a workaround for now, but it will break in Python 3.7
(PEP 479 transition plan: "Python 3.7: Enable new semantics everywhere"). For
the future's sake, stop returning values from Motor's internal coroutes and use
callbacks and explicit Futures to implement returns.

Still TODO: reimplement parallel_collection_scan.
  • Loading branch information...
ajdavis committed Sep 27, 2015
1 parent 5ed98b6 commit dc19418cf90e86119039ecf4a983324127baed0e
@@ -522,8 +522,7 @@ def __init__(self, *args, **kwargs):
# 'MotorClient' that create_class_with_framework created.
super(self.__class__, self).__init__(io_loop, *args, **kwargs)

@motor_coroutine
def open(self):
def open(self, callback=None):
"""Connect to the server.
Takes an optional callback, or returns a Future that resolves to
@@ -548,8 +547,10 @@ def open(self):
:class:`MotorClient` now opens itself on demand, calling ``open``
explicitly is now optional.
"""
yield self._framework.yieldable(self._ensure_connected())
self._framework.return_value(self)
return self._framework.future_or_callback(self._ensure_connected(),
callback,
self.get_io_loop(),
self)

def _get_member(self):
# TODO: expose the PyMongo Member, or otherwise avoid this.
@@ -600,8 +601,7 @@ def __init__(self, *args, **kwargs):
# 'MotorClient' that create_class_with_framework created.
super(self.__class__, self).__init__(io_loop, *args, **kwargs)

@motor_coroutine
def open(self):
def open(self, callback=None):
"""Connect to the server.
Takes an optional callback, or returns a Future that resolves to
@@ -626,11 +626,10 @@ def open(self):
:class:`MotorReplicaSetClient` now opens itself on demand, calling
``open`` explicitly is now optional.
"""
yield self._framework.yieldable(self._ensure_connected(True))
primary = self._get_member()
if not primary:
raise pymongo.errors.AutoReconnect('no primary is available')
self._framework.return_value(self)
return self._framework.future_or_callback(self._ensure_connected(),
callback,
self.get_io_loop(),
self)

def _get_member(self):
# TODO: expose the PyMongo RSC members, or otherwise avoid this.
@@ -1218,18 +1217,18 @@ def fetch_next(self):
.. _`large batches`: http://docs.mongodb.org/manual/core/read-operations/#cursor-behaviors
"""
future = self._framework.get_future(self.get_io_loop())

if not self._buffer_size() and self.alive:
# Return the Future, which resolves to number of docs fetched or 0.
return self._get_more()
elif self._buffer_size():
future = self._framework.get_future(self.get_io_loop())
future.set_result(True)
return future
else:
# Dead
future = self._framework.get_future(self.get_io_loop())
future.set_result(False)
return future
return future

def next_object(self):
"""Get a document from the most recently fetched batch, or ``None``.
@@ -1320,8 +1319,7 @@ def _each_got_more(self, callback, future):
self.get_io_loop(),
functools.partial(callback, None, None))

@motor_coroutine
def to_list(self, length):
def to_list(self, length, callback=None):
"""Get a list of documents.
.. testsetup:: to_list
@@ -1370,24 +1368,47 @@ def to_list(self, length):
raise pymongo.errors.InvalidOperation(
"Can't call to_list on tailable cursor")

the_list = []
collection = self.collection
fix_outgoing = collection.database.delegate._fix_outgoing
future = self._framework.get_future(self.get_io_loop())

# Run future_or_callback's type checking before we change anything.
retval = self._framework.future_or_callback(future,
callback,
self.get_io_loop())

self.started = True
while True:
yield self._framework.yieldable(self._refresh())
while (self._buffer_size() > 0 and
(length is None or len(the_list) < length)):
the_list = []
self._refresh(callback=functools.partial(self._to_list,
length,
the_list,
future))

doc = self._data().popleft()
the_list.append(fix_outgoing(doc, collection))
return retval

def _to_list(self, length, the_list, future, result, error):
if error:
# TODO: lost exc_info
future.set_exception(error)
else:
collection = self.collection
fix_outgoing = collection.database.delegate._fix_outgoing

if length is None:
n = result
else:
n = min(length, result)

for _ in range(n):
the_list.append(fix_outgoing(self._data().popleft(),
collection))

reached_length = (length is not None and len(the_list) >= length)
if reached_length or not self.alive:
break

self._framework.return_value(the_list)
future.set_result(the_list)
else:
self._refresh(callback=functools.partial(self._to_list,
length,
the_list,
future))

def get_io_loop(self):
return self.collection.get_io_loop()
@@ -39,15 +39,34 @@ def check_event_loop(loop):
"not %r" % loop)


def return_value(value):
# In Python 3.3, StopIteration can accept a value.
raise StopIteration(value)


def get_future(loop):
return asyncio.Future(loop=loop)


_DEFAULT = object()


def future_or_callback(future, callback, loop, return_value=_DEFAULT):
if callback:
raise NotImplementedError("Motor with asyncio prohibits callbacks")

if return_value is _DEFAULT:
return future

chained = asyncio.Future(loop=loop)

def done_callback(_future):
try:
result = _future.result()
chained.set_result(result if return_value is _DEFAULT
else return_value)
except Exception as exc:
chained.set_exception(exc)

future.add_done_callback(done_callback)
return chained


def is_future(f):
return isinstance(f, asyncio.Future)

@@ -92,6 +111,26 @@ def close_resolver(resolver):
coroutine = asyncio.coroutine


def pymongo_class_wrapper(f, pymongo_class):
"""Executes the coroutine f and wraps its result in a Motor class.
See WrapAsync.
"""
@functools.wraps(f)
@asyncio.coroutine
def _wrapper(self, *args, **kwargs):
result = yield from f(self, *args, **kwargs)

# Don't call isinstance(), not checking subclasses.
if result.__class__ == pymongo_class:
# Delegate to the current object to wrap the result.
return self.wrap(result)
else:
return result

return _wrapper


def yieldable(future):
# TODO: really explain.
return next(iter(future))
@@ -51,10 +51,6 @@ def check_event_loop(loop):
"io_loop must be instance of IOLoop, not %r" % loop)


def return_value(value):
raise gen.Return(value)


# Beginning in Tornado 4, TracebackFuture is a deprecated alias for Future.
# Future-proof Motor in case TracebackFuture is some day removed. Remove this
# Future-proofing once we drop support for Tornado 3.
@@ -68,6 +64,44 @@ def get_future(loop):
return _TornadoFuture()


_DEFAULT = object()


def future_or_callback(future, callback, io_loop, return_value=_DEFAULT):
if callback:
if not callable(callback):
raise callback_type_error

# Motor's callback convention is "callback(result, error)".
def done_callback(_future):
try:
result = _future.result()
callback(result if return_value is _DEFAULT else return_value,
None)
except Exception as exc:
callback(None, exc)

future.add_done_callback(done_callback)

elif return_value is not _DEFAULT:
chained = _TornadoFuture()

def done_callback(_future):
try:
result = _future.result()
chained.set_result(result if return_value is _DEFAULT
else return_value)
except Exception as exc:
# TODO: exc_info
chained.set_exception(exc)

future.add_done_callback(done_callback)
return chained

else:
return future


def is_future(f):
return isinstance(f, concurrent.Future)

@@ -137,6 +171,26 @@ def _callback(_future):
return wrapper


def pymongo_class_wrapper(f, pymongo_class):
"""Executes the coroutine f and wraps its result in a Motor class.
See WrapAsync.
"""
@functools.wraps(f)
@coroutine
def _wrapper(self, *args, **kwargs):
result = yield f(self, *args, **kwargs)

# Don't call isinstance(), not checking subclasses.
if result.__class__ == pymongo_class:
# Delegate to the current object to wrap the result.
raise gen.Return(self.wrap(result))
else:
raise gen.Return(result)

return _wrapper


def yieldable(future):
# TODO: really explain.
return future
@@ -107,11 +107,15 @@ def call_method():


def motor_coroutine(f):
"""Used by Motor class to mark functions as coroutines.
"""Used by Motor classes to mark functions as coroutines.
create_class_with_framework will decorate the function with a framework-
specific coroutine decorator, like asyncio.coroutine or Tornado's
gen.coroutine.
You cannot return a value from a motor_coroutine, the syntax differences
between Tornado on Python 2 and asyncio with Python 3.5 are impossible to
bridge.
"""
f._is_motor_coroutine = _coro_token
return f
@@ -195,20 +199,8 @@ def __init__(self, prop, original_class):
def create_attribute(self, cls, attr_name):
async_method = self.property.create_attribute(cls, attr_name)
original_class = self.original_class

@functools.wraps(async_method)
@cls._framework.coroutine
def wrapper(self, *args, **kwargs):
future = async_method(self, *args, **kwargs)
result = yield cls._framework.yieldable(future)

# Don't call isinstance(), not checking subclasses.
if result.__class__ == original_class:
# Delegate to the current object to wrap the result.
cls._framework.return_value(self.wrap(result))
else:
cls._framework.return_value(result)

wrapper = cls._framework.pymongo_class_wrapper(async_method,
original_class)
if self.doc:
wrapper.__doc__ = self.doc

0 comments on commit dc19418

Please sign in to comment.
You can’t perform that action at this time.