Skip to content

Commit

Permalink
MOTOR-86 Can't return values from internal coros.
Browse files Browse the repository at this point in the history
Consequences of PEP 479.

Motor not only supports users' coroutines, it uses coroutines to implement of
some of its own features, like client.open() and gridfs.put(). Some of those
coroutines return values.

In Tornado and Python 2.6, Motor's internal coros could return values with
"raise gen.Return(value)", and in Python 2.7 could alternatively use "raise
StopIteration(value)". The latter also works in Python 3.3+ and asyncio, so
this worked everywhere:

@motor_coroutine
def f():
    raise framework.return_value(value)

... where framework.return_value is gen.Return or StopIteration.

In Python 3.5 with asyncio, however, raising StopIteration to return from a
coroutine is prohibited (PEP 492 says, "PEP 479 is enabled by default for
coroutines"). Only "return value" works.

It's possible there's a workaround for now, but it will break in Python 3.7
(PEP 479 transition plan: "Python 3.7: Enable new semantics everywhere"). For
the future's sake, stop returning values from Motor's internal coroutes and use
callbacks and explicit Futures to implement returns.

Still TODO: reimplement parallel_collection_scan.
  • Loading branch information
ajdavis committed Sep 27, 2015
1 parent 5ed98b6 commit dc19418
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 110 deletions.
77 changes: 49 additions & 28 deletions motor/core.py
Expand Up @@ -522,8 +522,7 @@ def __init__(self, *args, **kwargs):
# 'MotorClient' that create_class_with_framework created. # 'MotorClient' that create_class_with_framework created.
super(self.__class__, self).__init__(io_loop, *args, **kwargs) super(self.__class__, self).__init__(io_loop, *args, **kwargs)


@motor_coroutine def open(self, callback=None):
def open(self):
"""Connect to the server. """Connect to the server.
Takes an optional callback, or returns a Future that resolves to Takes an optional callback, or returns a Future that resolves to
Expand All @@ -548,8 +547,10 @@ def open(self):
:class:`MotorClient` now opens itself on demand, calling ``open`` :class:`MotorClient` now opens itself on demand, calling ``open``
explicitly is now optional. explicitly is now optional.
""" """
yield self._framework.yieldable(self._ensure_connected()) return self._framework.future_or_callback(self._ensure_connected(),
self._framework.return_value(self) callback,
self.get_io_loop(),
self)


def _get_member(self): def _get_member(self):
# TODO: expose the PyMongo Member, or otherwise avoid this. # TODO: expose the PyMongo Member, or otherwise avoid this.
Expand Down Expand Up @@ -600,8 +601,7 @@ def __init__(self, *args, **kwargs):
# 'MotorClient' that create_class_with_framework created. # 'MotorClient' that create_class_with_framework created.
super(self.__class__, self).__init__(io_loop, *args, **kwargs) super(self.__class__, self).__init__(io_loop, *args, **kwargs)


@motor_coroutine def open(self, callback=None):
def open(self):
"""Connect to the server. """Connect to the server.
Takes an optional callback, or returns a Future that resolves to Takes an optional callback, or returns a Future that resolves to
Expand All @@ -626,11 +626,10 @@ def open(self):
:class:`MotorReplicaSetClient` now opens itself on demand, calling :class:`MotorReplicaSetClient` now opens itself on demand, calling
``open`` explicitly is now optional. ``open`` explicitly is now optional.
""" """
yield self._framework.yieldable(self._ensure_connected(True)) return self._framework.future_or_callback(self._ensure_connected(),
primary = self._get_member() callback,
if not primary: self.get_io_loop(),
raise pymongo.errors.AutoReconnect('no primary is available') self)
self._framework.return_value(self)


def _get_member(self): def _get_member(self):
# TODO: expose the PyMongo RSC members, or otherwise avoid this. # TODO: expose the PyMongo RSC members, or otherwise avoid this.
Expand Down Expand Up @@ -1218,18 +1217,18 @@ def fetch_next(self):
.. _`large batches`: http://docs.mongodb.org/manual/core/read-operations/#cursor-behaviors .. _`large batches`: http://docs.mongodb.org/manual/core/read-operations/#cursor-behaviors
""" """
future = self._framework.get_future(self.get_io_loop())

if not self._buffer_size() and self.alive: if not self._buffer_size() and self.alive:
# Return the Future, which resolves to number of docs fetched or 0. # Return the Future, which resolves to number of docs fetched or 0.
return self._get_more() return self._get_more()
elif self._buffer_size(): elif self._buffer_size():
future = self._framework.get_future(self.get_io_loop())
future.set_result(True) future.set_result(True)
return future return future
else: else:
# Dead # Dead
future = self._framework.get_future(self.get_io_loop())
future.set_result(False) future.set_result(False)
return future return future


def next_object(self): def next_object(self):
"""Get a document from the most recently fetched batch, or ``None``. """Get a document from the most recently fetched batch, or ``None``.
Expand Down Expand Up @@ -1320,8 +1319,7 @@ def _each_got_more(self, callback, future):
self.get_io_loop(), self.get_io_loop(),
functools.partial(callback, None, None)) functools.partial(callback, None, None))


@motor_coroutine def to_list(self, length, callback=None):
def to_list(self, length):
"""Get a list of documents. """Get a list of documents.
.. testsetup:: to_list .. testsetup:: to_list
Expand Down Expand Up @@ -1370,24 +1368,47 @@ def to_list(self, length):
raise pymongo.errors.InvalidOperation( raise pymongo.errors.InvalidOperation(
"Can't call to_list on tailable cursor") "Can't call to_list on tailable cursor")


the_list = [] future = self._framework.get_future(self.get_io_loop())
collection = self.collection
fix_outgoing = collection.database.delegate._fix_outgoing # Run future_or_callback's type checking before we change anything.
retval = self._framework.future_or_callback(future,
callback,
self.get_io_loop())


self.started = True self.started = True
while True: the_list = []
yield self._framework.yieldable(self._refresh()) self._refresh(callback=functools.partial(self._to_list,
while (self._buffer_size() > 0 and length,
(length is None or len(the_list) < length)): the_list,
future))


doc = self._data().popleft() return retval
the_list.append(fix_outgoing(doc, collection))
def _to_list(self, length, the_list, future, result, error):
if error:
# TODO: lost exc_info
future.set_exception(error)
else:
collection = self.collection
fix_outgoing = collection.database.delegate._fix_outgoing

if length is None:
n = result
else:
n = min(length, result)

for _ in range(n):
the_list.append(fix_outgoing(self._data().popleft(),
collection))


reached_length = (length is not None and len(the_list) >= length) reached_length = (length is not None and len(the_list) >= length)
if reached_length or not self.alive: if reached_length or not self.alive:
break future.set_result(the_list)

else:
self._framework.return_value(the_list) self._refresh(callback=functools.partial(self._to_list,
length,
the_list,
future))


def get_io_loop(self): def get_io_loop(self):
return self.collection.get_io_loop() return self.collection.get_io_loop()
Expand Down
49 changes: 44 additions & 5 deletions motor/frameworks/asyncio.py
Expand Up @@ -39,15 +39,34 @@ def check_event_loop(loop):
"not %r" % loop) "not %r" % loop)




def return_value(value):
# In Python 3.3, StopIteration can accept a value.
raise StopIteration(value)


def get_future(loop): def get_future(loop):
return asyncio.Future(loop=loop) return asyncio.Future(loop=loop)




_DEFAULT = object()


def future_or_callback(future, callback, loop, return_value=_DEFAULT):
if callback:
raise NotImplementedError("Motor with asyncio prohibits callbacks")

if return_value is _DEFAULT:
return future

chained = asyncio.Future(loop=loop)

def done_callback(_future):
try:
result = _future.result()
chained.set_result(result if return_value is _DEFAULT
else return_value)
except Exception as exc:
chained.set_exception(exc)

future.add_done_callback(done_callback)
return chained


def is_future(f): def is_future(f):
return isinstance(f, asyncio.Future) return isinstance(f, asyncio.Future)


Expand Down Expand Up @@ -92,6 +111,26 @@ def close_resolver(resolver):
coroutine = asyncio.coroutine coroutine = asyncio.coroutine




def pymongo_class_wrapper(f, pymongo_class):
"""Executes the coroutine f and wraps its result in a Motor class.
See WrapAsync.
"""
@functools.wraps(f)
@asyncio.coroutine
def _wrapper(self, *args, **kwargs):
result = yield from f(self, *args, **kwargs)

# Don't call isinstance(), not checking subclasses.
if result.__class__ == pymongo_class:
# Delegate to the current object to wrap the result.
return self.wrap(result)
else:
return result

return _wrapper


def yieldable(future): def yieldable(future):
# TODO: really explain. # TODO: really explain.
return next(iter(future)) return next(iter(future))
Expand Down
62 changes: 58 additions & 4 deletions motor/frameworks/tornado.py
Expand Up @@ -51,10 +51,6 @@ def check_event_loop(loop):
"io_loop must be instance of IOLoop, not %r" % loop) "io_loop must be instance of IOLoop, not %r" % loop)




def return_value(value):
raise gen.Return(value)


# Beginning in Tornado 4, TracebackFuture is a deprecated alias for Future. # Beginning in Tornado 4, TracebackFuture is a deprecated alias for Future.
# Future-proof Motor in case TracebackFuture is some day removed. Remove this # Future-proof Motor in case TracebackFuture is some day removed. Remove this
# Future-proofing once we drop support for Tornado 3. # Future-proofing once we drop support for Tornado 3.
Expand All @@ -68,6 +64,44 @@ def get_future(loop):
return _TornadoFuture() return _TornadoFuture()




_DEFAULT = object()


def future_or_callback(future, callback, io_loop, return_value=_DEFAULT):
if callback:
if not callable(callback):
raise callback_type_error

# Motor's callback convention is "callback(result, error)".
def done_callback(_future):
try:
result = _future.result()
callback(result if return_value is _DEFAULT else return_value,
None)
except Exception as exc:
callback(None, exc)

future.add_done_callback(done_callback)

elif return_value is not _DEFAULT:
chained = _TornadoFuture()

def done_callback(_future):
try:
result = _future.result()
chained.set_result(result if return_value is _DEFAULT
else return_value)
except Exception as exc:
# TODO: exc_info
chained.set_exception(exc)

future.add_done_callback(done_callback)
return chained

else:
return future


def is_future(f): def is_future(f):
return isinstance(f, concurrent.Future) return isinstance(f, concurrent.Future)


Expand Down Expand Up @@ -137,6 +171,26 @@ def _callback(_future):
return wrapper return wrapper




def pymongo_class_wrapper(f, pymongo_class):
"""Executes the coroutine f and wraps its result in a Motor class.
See WrapAsync.
"""
@functools.wraps(f)
@coroutine
def _wrapper(self, *args, **kwargs):
result = yield f(self, *args, **kwargs)

# Don't call isinstance(), not checking subclasses.
if result.__class__ == pymongo_class:
# Delegate to the current object to wrap the result.
raise gen.Return(self.wrap(result))
else:
raise gen.Return(result)

return _wrapper


def yieldable(future): def yieldable(future):
# TODO: really explain. # TODO: really explain.
return future return future
Expand Down
22 changes: 7 additions & 15 deletions motor/metaprogramming.py
Expand Up @@ -107,11 +107,15 @@ def call_method():




def motor_coroutine(f): def motor_coroutine(f):
"""Used by Motor class to mark functions as coroutines. """Used by Motor classes to mark functions as coroutines.
create_class_with_framework will decorate the function with a framework- create_class_with_framework will decorate the function with a framework-
specific coroutine decorator, like asyncio.coroutine or Tornado's specific coroutine decorator, like asyncio.coroutine or Tornado's
gen.coroutine. gen.coroutine.
You cannot return a value from a motor_coroutine, the syntax differences
between Tornado on Python 2 and asyncio with Python 3.5 are impossible to
bridge.
""" """
f._is_motor_coroutine = _coro_token f._is_motor_coroutine = _coro_token
return f return f
Expand Down Expand Up @@ -195,20 +199,8 @@ def __init__(self, prop, original_class):
def create_attribute(self, cls, attr_name): def create_attribute(self, cls, attr_name):
async_method = self.property.create_attribute(cls, attr_name) async_method = self.property.create_attribute(cls, attr_name)
original_class = self.original_class original_class = self.original_class

wrapper = cls._framework.pymongo_class_wrapper(async_method,
@functools.wraps(async_method) original_class)
@cls._framework.coroutine
def wrapper(self, *args, **kwargs):
future = async_method(self, *args, **kwargs)
result = yield cls._framework.yieldable(future)

# Don't call isinstance(), not checking subclasses.
if result.__class__ == original_class:
# Delegate to the current object to wrap the result.
cls._framework.return_value(self.wrap(result))
else:
cls._framework.return_value(result)

if self.doc: if self.doc:
wrapper.__doc__ = self.doc wrapper.__doc__ = self.doc


Expand Down

0 comments on commit dc19418

Please sign in to comment.