-
Notifications
You must be signed in to change notification settings - Fork 37
/
_asyncio.py
59 lines (48 loc) · 2.04 KB
/
_asyncio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# -*- coding: utf-8 -*-
from asyncio import ensure_future, Future, iscoroutine
from pyee._base import BaseEventEmitter
__all__ = ['AsyncIOEventEmitter']
class AsyncIOEventEmitter(BaseEventEmitter):
"""An event emitter class which can run asyncio coroutines in addition to
synchronous blocking functions. For example::
@ee.on('event')
async def async_handler(*args, **kwargs):
await returns_a_future()
On emit, the event emitter will automatically schedule the coroutine using
``asyncio.ensure_future`` and the configured event loop (defaults to
``asyncio.get_event_loop()``).
Unlike the case with the BaseEventEmitter, all exceptions raised by
event handlers are automatically emitted on the ``error`` event. This is
important for asyncio coroutines specifically but is also handled for
synchronous functions for consistency.
When ``loop`` is specified, the supplied event loop will be used when
scheduling work with ``ensure_future``. Otherwise, the default asyncio
event loop is used.
For asyncio coroutine event handlers, calling emit is non-blocking.
In other words, you do not have to await any results from emit, and the
coroutine is scheduled in a fire-and-forget fashion.
"""
def __init__(self, loop=None):
super(AsyncIOEventEmitter, self).__init__()
self._loop = loop
def _emit_run(self, f, args, kwargs):
try:
coro = f(*args, **kwargs)
except Exception as exc:
self.emit('error', exc)
else:
if iscoroutine(coro):
if self._loop:
f = ensure_future(coro, loop=self._loop)
else:
f = ensure_future(coro)
elif isinstance(coro, Future):
f = coro
else:
f = None
if f:
@f.add_done_callback
def _callback(f):
exc = f.exception()
if exc:
self.emit('error', exc)