Skip to content

Commit

Permalink
AsyncIOThreadEngine replaced by AsyncIOExecutorEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
sirkonst committed Sep 7, 2016
1 parent 6cfbdcc commit abe03c5
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 24 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Expand Up @@ -2,5 +2,6 @@ sudo: false
language: python
python:
- "3.5"
install: pip install tox-travis
install:
- pip install -U tox tox-travis pbr setuptools
script: make test
5 changes: 5 additions & 0 deletions ChangeLog
@@ -1,6 +1,11 @@
CHANGES
=======

0.3.0
-----

* AsyncIOThreadEngine replaced by AsyncIOExecutorEngine

0.2.0
-----

Expand Down
19 changes: 13 additions & 6 deletions README.rst
Expand Up @@ -69,23 +69,30 @@ Default engine for concurrently run code as asyncio coroutines::
await fetch_urls()


AsyncIOThreadEngine
-------------------
AsyncIOExecutorEngine
---------------------

Concurrently run code in system threads by use asyncio executor::
Concurrently run code by asyncio executor::

from concurrently import concurrently, AsyncIOThreadEngine
from concurrent.futures import ThreadPoolExecutor
from concurrently import concurrently, AsyncIOExecutorEngine

...
@concurrently(2, engine=AsyncIOThreadEngine)
my_pool = ThreadPoolExecutor()

@concurrently(2, engine=AsyncIOExecutorEngine, loop=loop, executor=my_pool)
def fetch_urls(): # not async def
...

await fetch_urls()

If ``executor`` is ``None`` or not set will using default asyncio executor.

Note: ``ProcessPoolExecutor`` is not supported.


ThreadEngine
----------------
------------

Concurrently run code in system threads::

Expand Down
4 changes: 2 additions & 2 deletions concurrently/__init__.py
@@ -1,10 +1,10 @@
from concurrently._concurrently import concurrently, set_default_engine
from concurrently.engines.asyncio import AsyncIOEngine, AsyncIOThreadEngine
from concurrently.engines.asyncio import AsyncIOEngine, AsyncIOExecutorEngine
from concurrently.engines.thread_pool import ThreadPoolEngine
from concurrently.engines.thread import ThreadEngine
from concurrently.engines.process import ProcessEngine

__all__ = 'concurrently', 'AsyncIOEngine', 'AsyncIOThreadEngine', \
__all__ = 'concurrently', 'AsyncIOEngine', 'AsyncIOExecutorEngine', \
'ThreadPoolEngine', 'ThreadEngine', 'ProcessEngine'

set_default_engine(AsyncIOEngine)
11 changes: 7 additions & 4 deletions concurrently/engines/asyncio.py
@@ -1,5 +1,5 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from typing import Callable, List

from . import AbstractEngine, AbstractWaiter
Expand Down Expand Up @@ -42,11 +42,14 @@ def waiter_factory(self, fs: List[asyncio.Future]):
return AsyncIOWaiter(fs=fs, loop=self.loop)


class AsyncIOThreadEngine(AsyncIOEngine):
class AsyncIOExecutorEngine(AsyncIOEngine):

def __init__(self, *, loop=None, executor=None):
assert not isinstance(executor, ProcessPoolExecutor), \
'ProcessPoolExecutor is not supported'

def __init__(self, *, loop=None):
super().__init__(loop=loop)
self.pool = ThreadPoolExecutor() # TODO: shutdown pool
self.pool = executor

def create_task(self, fn: Callable[[], None]) -> asyncio.Future:
return self.loop.run_in_executor(self.pool, fn)
@@ -1,37 +1,56 @@
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from queue import Queue

import pytest

from concurrently import concurrently, AsyncIOThreadEngine
from concurrently import concurrently, AsyncIOExecutorEngine


def process(data):
time.sleep(data)
return time.time()


paramz_executor = pytest.mark.parametrize(
'executor', [None, ThreadPoolExecutor()],
ids=lambda v: 'exec %s' % type(v).__name__
)


@pytest.mark.asyncio(forbid_global_loop=True)
@pytest.mark.parametrize(
'conc_count', range(1, 5), ids=lambda v: 'counc %s' % v
)
@pytest.mark.parametrize(
'data_count', range(1, 5), ids=lambda v: 'data %s' % v
)
async def test_concurrently(conc_count, data_count, event_loop):
@paramz_executor
async def test_concurrently(conc_count, data_count, executor, event_loop):
data = range(data_count)
i_data = iter(data)
results = {}
q_data = Queue()
for d in data:
q_data.put(d)
q_results = Queue()
start_time = time.time()

@concurrently(conc_count, engine=AsyncIOThreadEngine, loop=event_loop)
@concurrently(
conc_count, engine=AsyncIOExecutorEngine, loop=event_loop,
executor=executor
)
def _parallel():
for d in i_data:
while not q_data.empty():
d = q_data.get()
res = process(d)
results[d] = res
q_results.put({d: res})

await _parallel()

results = {}
while not q_results.empty():
results.update(q_results.get())

def calc_delta(n):
if n // conc_count == 0:
return n
Expand All @@ -44,13 +63,16 @@ def calc_delta(n):


@pytest.mark.asyncio(forbid_global_loop=True)
async def test_stop(event_loop):
@paramz_executor
async def test_stop(executor, event_loop):
data = range(3)
i_data = iter(data)
results = {}
start_time = time.time()

@concurrently(2, engine=AsyncIOThreadEngine, loop=event_loop)
@concurrently(
2, engine=AsyncIOExecutorEngine, loop=event_loop, executor=executor
)
def _parallel():
for d in i_data:
r = process(d)
Expand All @@ -64,13 +86,16 @@ def _parallel():


@pytest.mark.asyncio(forbid_global_loop=True)
async def test_exception(event_loop):
@paramz_executor
async def test_exception(executor, event_loop):
data = range(2)
i_data = iter(data)
results = {}
start_time = time.time()

@concurrently(2, engine=AsyncIOThreadEngine, loop=event_loop)
@concurrently(
2, engine=AsyncIOExecutorEngine, loop=event_loop, executor=executor
)
def _parallel():
for d in i_data:
if d == 1:
Expand All @@ -86,3 +111,17 @@ def _parallel():
exc_list = _parallel.exceptions()
assert len(exc_list) == 1
assert isinstance(exc_list[0], RuntimeError)


@pytest.mark.asyncio(forbid_global_loop=True)
async def test_processpool_isnot_supported(event_loop):

with pytest.raises(AssertionError) as exc:
@concurrently(
engine=AsyncIOExecutorEngine, loop=event_loop,
executor=ProcessPoolExecutor()
)
def _parallel():
pass

assert str(exc.value) == 'ProcessPoolExecutor is not supported'

0 comments on commit abe03c5

Please sign in to comment.