Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ def get_running_loop():

This function is thread-specific.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
loop = _get_running_loop()
if loop is None:
raise RuntimeError('no running event loop')
Expand All @@ -664,6 +665,7 @@ def _get_running_loop():
This is a low-level function intended to be used by event loops.
This function is thread-specific.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
running_loop, pid = _running_loop.loop_pid
if running_loop is not None and pid == os.getpid():
return running_loop
Expand All @@ -675,6 +677,7 @@ def _set_running_loop(loop):
This is a low-level function intended to be used by event loops.
This function is thread-specific.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
_running_loop.loop_pid = (loop, os.getpid())


Expand Down Expand Up @@ -711,6 +714,7 @@ def get_event_loop():
If there is no running event loop set, the function will return
the result of `get_event_loop_policy().get_event_loop()` call.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
Expand All @@ -736,3 +740,26 @@ def set_child_watcher(watcher):
"""Equivalent to calling
get_event_loop_policy().set_child_watcher(watcher)."""
return get_event_loop_policy().set_child_watcher(watcher)


# Alias pure-Python implementations for testing purposes.
_py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop


try:
# get_event_loop() is one of the most frequently called
# functions in asyncio. Pure Python implementation is
# about 4 times slower than C-accelerated.
from _asyncio import (_get_running_loop, _set_running_loop,
get_running_loop, get_event_loop)
except ImportError:
pass
else:
# Alias C implementations for testing purposes.
_c__get_running_loop = _get_running_loop
_c__set_running_loop = _set_running_loop
_c_get_running_loop = get_running_loop
_c_get_event_loop = get_event_loop
133 changes: 113 additions & 20 deletions Lib/test/test_asyncio/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import asyncio
from asyncio import coroutines
from asyncio import events
from asyncio import proactor_events
from asyncio import selector_events
from test.test_asyncio import utils as test_utils
Expand Down Expand Up @@ -2145,23 +2146,6 @@ def tearDown(self):
asyncio.set_child_watcher(None)
super().tearDown()

def test_get_event_loop_new_process(self):
# Issue bpo-32126: The multiprocessing module used by
# ProcessPoolExecutor is not functional when the
# multiprocessing.synchronize module cannot be imported.
support.import_module('multiprocessing.synchronize')
async def main():
pool = concurrent.futures.ProcessPoolExecutor()
result = await self.loop.run_in_executor(
pool, _test_get_event_loop_new_process__sub_proc)
pool.shutdown()
return result

self.unpatch_get_running_loop()

self.assertEqual(
self.loop.run_until_complete(main()),
'hello')

if hasattr(selectors, 'KqueueSelector'):
class KqueueEventLoopTests(UnixEventLoopTestsMixin,
Expand Down Expand Up @@ -2722,17 +2706,95 @@ def test_set_event_loop_policy(self):
self.assertIs(policy, asyncio.get_event_loop_policy())
self.assertIsNot(policy, old_policy)


class GetEventLoopTestsMixin:

_get_running_loop_impl = None
_set_running_loop_impl = None
get_running_loop_impl = None
get_event_loop_impl = None

def setUp(self):
self._get_running_loop_saved = events._get_running_loop
self._set_running_loop_saved = events._set_running_loop
self.get_running_loop_saved = events.get_running_loop
self.get_event_loop_saved = events.get_event_loop

events._get_running_loop = type(self)._get_running_loop_impl
events._set_running_loop = type(self)._set_running_loop_impl
events.get_running_loop = type(self).get_running_loop_impl
events.get_event_loop = type(self).get_event_loop_impl

asyncio._get_running_loop = type(self)._get_running_loop_impl
asyncio._set_running_loop = type(self)._set_running_loop_impl
asyncio.get_running_loop = type(self).get_running_loop_impl
asyncio.get_event_loop = type(self).get_event_loop_impl

super().setUp()

self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

watcher = asyncio.SafeChildWatcher()
watcher.attach_loop(self.loop)
asyncio.set_child_watcher(watcher)

def tearDown(self):
try:
asyncio.set_child_watcher(None)
super().tearDown()
finally:
self.loop.close()
asyncio.set_event_loop(None)

events._get_running_loop = self._get_running_loop_saved
events._set_running_loop = self._set_running_loop_saved
events.get_running_loop = self.get_running_loop_saved
events.get_event_loop = self.get_event_loop_saved

asyncio._get_running_loop = self._get_running_loop_saved
asyncio._set_running_loop = self._set_running_loop_saved
asyncio.get_running_loop = self.get_running_loop_saved
asyncio.get_event_loop = self.get_event_loop_saved

if sys.platform != 'win32':

def test_get_event_loop_new_process(self):
# Issue bpo-32126: The multiprocessing module used by
# ProcessPoolExecutor is not functional when the
# multiprocessing.synchronize module cannot be imported.
support.import_module('multiprocessing.synchronize')

async def main():
pool = concurrent.futures.ProcessPoolExecutor()
result = await self.loop.run_in_executor(
pool, _test_get_event_loop_new_process__sub_proc)
pool.shutdown()
return result

self.assertEqual(
self.loop.run_until_complete(main()),
'hello')

def test_get_event_loop_returns_running_loop(self):
class TestError(Exception):
pass

class Policy(asyncio.DefaultEventLoopPolicy):
def get_event_loop(self):
raise NotImplementedError

loop = None
raise TestError

old_policy = asyncio.get_event_loop_policy()
try:
asyncio.set_event_loop_policy(Policy())
loop = asyncio.new_event_loop()

with self.assertRaises(TestError):
asyncio.get_event_loop()
asyncio.set_event_loop(None)
with self.assertRaises(TestError):
asyncio.get_event_loop()

with self.assertRaisesRegex(RuntimeError, 'no running'):
self.assertIs(asyncio.get_running_loop(), None)
self.assertIs(asyncio._get_running_loop(), None)
Expand All @@ -2743,6 +2805,15 @@ async def func():
self.assertIs(asyncio._get_running_loop(), loop)

loop.run_until_complete(func())

asyncio.set_event_loop(loop)
with self.assertRaises(TestError):
asyncio.get_event_loop()

asyncio.set_event_loop(None)
with self.assertRaises(TestError):
asyncio.get_event_loop()

finally:
asyncio.set_event_loop_policy(old_policy)
if loop is not None:
Expand All @@ -2754,5 +2825,27 @@ async def func():
self.assertIs(asyncio._get_running_loop(), None)


class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):

_get_running_loop_impl = events._py__get_running_loop
_set_running_loop_impl = events._py__set_running_loop
get_running_loop_impl = events._py_get_running_loop
get_event_loop_impl = events._py_get_event_loop


try:
import _asyncio # NoQA
except ImportError:
pass
else:

class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):

_get_running_loop_impl = events._c__get_running_loop
_set_running_loop_impl = events._c__set_running_loop
get_running_loop_impl = events._c_get_running_loop
get_event_loop_impl = events._c_get_event_loop


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Implement asyncio._get_running_loop() and get_event_loop() in C. This makes
them 4x faster.
Loading