Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Submitting a coroutine to an asyncio event loop #273

Closed
wants to merge 35 commits into from

Conversation

vxgmichel
Copy link

This PR follows up on this conversation on the python-ideas mailing list. It is based on the implementation I posted and commented here, but discarding the executor.

This PR includes:

  • a connect_futures function, to safely connect two futures regardless of their types
  • a submit_to_loop function, to submit a coroutine from a thread to a given event loop
  • a test case for submit_to_loop (runs in about 15 ms)

It also affects wrap_future, refactored to use connect_futures and avoid code duplication.

I tried to gather my changes as much as possible. However, you might want to have a look at:

  • import asyncio (in futures.py). I needed ensure_future for submit_in_loop and couldn't import it with from . import tasks (because of circular import).
  • _copy_state (in futures.py). The code in Future._copy_state should probably move in _copy_state as I pointed out in a comment.
  • _schedule works exactly like ensure_future but adds an optional destination argument. There could be some refactoring here, though you probably don't want to change the ensure_future prototype.

New names and prototypes might also need to be rethought, along with docstrings and type testing. I'm not sure either about how to use the TestLoop, though the tests seem to pass properly.

@ajdavis
Copy link

ajdavis commented Sep 28, 2015

Tornado's chain_future is equivalent to your connect_futures so I propose using Tornado's name for this feature.

@vxgmichel
Copy link
Author

@ajdavis All right, let me know if you have other suggestions

@gvanrossum
Copy link
Member

Can you fill out the PSF contributor form at https://www.python.org/psf/contrib/contrib-form/ ? This is a one-time chore. :-)

"""Copy state from a future to another future.
Compatible with both asyncio.Future and concurrent.futures.Future."""
# Future._copy_state code should probably move in here
return Future._copy_state.__get__(destination)(source)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you refactor this so that the actual code is here and Future._copy_state invokes this? Or even better make it just a helper function -- the one place where Future._copy_state is invoked you can just change it to call the function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Actually I see you removed the direct call site. So you can just move the code here.)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review!
I just pushed a commit to make _copy_state a static method, is it ok? I had to update the tests and I noticed this line:

# Test the internal _copy_state method since it's being directly
# invoked in other modules.

I couldn't find any other usage in asyncio, is another project using this method?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That comment may be old, but it's best to have an explicit test for this function anyway.

Anyway, I hate static methods, can you just make it a function?

@gvanrossum
Copy link
Member

Thanks! This looks promising. I've left a bunch of refactoring suggestions. Let me know when you're ready for another code review.

@vxgmichel
Copy link
Author

All right, it should look better now!
I also changed the way chain_future decides whether to use call_soon_threadsafe or not. Now it compares the loops instead of checking the type of the futures.

@@ -360,25 +360,6 @@ def set_exception(self, exception):

# Truly internal methods.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is obsolete; just delete it.

@1st1
Copy link
Member

1st1 commented Sep 30, 2015

@vxgmichel @gvanrossum

Sure, what about submit_coroutine? It implies that the coroutine will run in an event loop (since it is the only place it can run in) and submit refers to Executor.submit that returns a concurrent.futures.Future.

I like submit_coroutine better, but still, it doesn't reflect that an instance of a concurrent.futures.Future is returned. If I'm not mistaken we don't have any other API that returns non-asyncio futures.

How about schedule_concurrent or schedule_coroutine_concurrently or run_coroutine_concurrently? They are very verbose, but descriptive.

chain_future is a generic version of wrap_future, and since chain_future already exists in tornado, I thought it made sense to expose it. However, it is true that chain_future and wrap_future are not directly useful since submit_coroutine and run_in_executor should cover most of the use cases.

It was requested by someone on bugs.python.org to better document wrap_future, since we decided not to support awaiting on concurrent futures in asyncio. I think it's a valid use case, in fact there is an issue to document this function.

chain_future is hardcoded to only accept concurrent futures and asyncio futures, it won't accept one from Tornado, for instance. Its implementation is rather complex, and it's hard to understand what it does. I'd really prefer to keep this function private if possible.

@vxgmichel
Copy link
Author

@1st1 @gvanrossum

How about schedule_concurrent or schedule_coroutine_concurrently or run_coroutine_concurrently? They are very verbose, but descriptive.

I'll let you guys settle on a name, I changed it to submit_coroutine for the moment.

I'd really prefer to keep this function private if possible.

All right, I removed chain_future from __all__. Should we rename it to _chain_future as well?

return
assert not destination.done()
if source.cancelled():
destination.cancel()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually doesn't work for a concurrent Future. Its cancel() method is a no-op once the state is RUNNING (which it is once set_running_or_notify_cancel() returns).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, this implementation of _copy_state is trickier than what I expected... I came up with this:

def _copy_state(source, destination):
    """Copy state from a future to another future.

    Compatible with both asyncio.Future and concurrent.futures.Future.
    """
    # Check source cancellation
    assert source.done()
    if source.cancelled():
        destination.cancel()
        return
    # Check destination cancellation
    if isinstance(destination, concurrent.futures.Future):
        if not destination.set_running_or_notify_cancel():
            return
    elif destination.cancelled():
        return
    assert not destination.done()
    # Set exception or result
    exception = source.exception()
    if exception is not None:
        destination.set_exception(exception)
    else:
        result = source.result()
        destination.set_result(result)

It should be ok to check the source cancellation before since a cancel method should never fail, regardless of the future type. I also wrote a new test to cover the case you described.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gvanrossum
I thought about what you said and separating the two _copy_state implementations. What about keeping the current implementation of Future._copy_state and changing _copy_state to:

def _copy_state(source, destination):
    # Use Future._copy_state implementation
    if isinstance(destination, Future):
        return destination._copy_state(source)
    # Implements _copy_state for concurrent.future.Futures
    assert source.done()
    if source.cancelled():
        destination.cancel()
    if not destination.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        destination.set_exception(exception)
    else:
        result = source.result()
        destination.set_result(result)

Pros:

  • Each implementation can be maintained separately
  • The second part can easily be moved somewhere else if needed

Cons:

  • A bit of code duplication

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea. I would go even further -- I would have a _copy_asyncio_future_state_to_concurrent_future() method (maybe use a shorter name :-) and decide which one to use when registering the callback.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the source type doesn't matter since the source is done already: the only thing we need to know about it is if it's cancelled, done with a result or done with an exception. This is why _copy_state fits very well as a method, since the only type that matters is the type of the instance itself. I thought of _set_concurrent_future_state with the following prototype:

def _set_concurrent_future_state(concurrent, other):

I think it's less confusing and more similar to:

    def _copy_state(self, other):

I'll push it in a second, please let me know what you think about it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, but please name the other argument source.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right, should I rename it in Future._copy_state too for consistency?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, please leave that alone. Consistency is overrated. ;-)

@gvanrossum
Copy link
Member

Thanks! I consider this done except for Python docs. I've manually applied your patch and pushed it to the repo. I'm in the middle of committing the patch to the CPython repo. I would love it if you volunteered doc updates for CPython! (Starting with 3.4.) See you in bugs.python.org for that...

@gvanrossum gvanrossum closed this Oct 3, 2015
@gvanrossum
Copy link
Member

Cross-reference CPython issue: http://bugs.python.org/issue25304

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants