Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
njsmith committed Oct 7, 2018
1 parent 3d3b8c7 commit 41c96dd
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 16 deletions.
110 changes: 102 additions & 8 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,115 @@
:target: https://codecov.io/gh/python-trio/trimeter
:alt: Test coverage

Warning
=======

This library isn't ready for release yet. (It even depends on an
unreleased version of trio!) Feedback welcome!


Trimeter
========

Trio is a friendly Python library for async concurrency and
networking. Trimeter is a simple but powerful job scheduler for
programs using Trio.
programs using Trio, released under your choice of the MIT or Apache 2
licenses.

Trimeter's core purpose is to make it easy to execute lots tasks
concurrently, with rich options to **control the degree of
concurrency** and to **collect the task results**.

Say you have 1000 urls that you want to fetch and process somehow:

.. code-block:: python3
# Old slow way
for url in urls:
await fetch_and_process(url)
That's slow, so you want to do several at the same time... but to
avoid overloading the network, you want to limit it to at most 5 calls
at once. Oh, and there's a request quota, so we have to throttle it
down to 1 per second. No problem:

.. code-block:: python3
# New and fancy way
await trimeter.run_on_each(
fetch_and_process, urls, max_at_once=5, max_per_second=1
)
What if we want to get the result from each call as it finishes, so we
can do something further with it?

.. code-block:: python3
async with trimeter.amap(fetch_and_process, urls, ...) as results:
# Then iterate over the return values, as they become available
# (i.e., not necessarily in the original order)
async for result in results:
...
What if we want to use the `outcome library
<https://outcome.readthedocs.io/>`__ to capture exceptions, so one
call crashing doesn't terminate the whole program? And also, we want
to pass through the original url alongside each result, so we know
which result is which?

.. code-block:: python3
async with trimeter.amap(
fetch_and_process,
urls,
capture_outcome=True,
include_value=True,
) as outcomes:
# Then iterate over the return values, as they become available
# (i.e., not necessarily in the original order)
async for url, outcome in outcomes:
try:
return_value = outcome.unwrap()
except Exception as exc:
print(f"error while processing {url}: {exc!r}")
What if we just want to call a few functions in parallel and then get
the results as a list, like `asyncio.gather
<https://docs.python.org/3/library/asyncio-task.html#asyncio.gather>`__
or `Promise.all
<https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/all>`__?

.. code-block:: python3
return_values = await trimeter.run_all([
async_fn1,
async_fn2,
functools.partial(async_fn3, extra_arg, kwarg="yeah"),
])
Of course, this takes all the same options as the other functions, so
you can control the degree of parallelism, use ``capture_outcome`` to
capture exceptions, and so forth.

For more details, see `the fine manual
<https://trimeter.readthedocs.io>`__.


Can you summarize that in iambic trimeter?
------------------------------------------

`Iambic trimeter <https://en.wikipedia.org/wiki/Iambic_trimeter>`__?
No problem:

| Trimeter gives you tools
| for running lots of tasks
| to do your work real fast
| but not so fast you crash.
Or, in `iambic trimeter
<https://en.wikipedia.org/wiki/Iambic_trimeter>`__:

| Trimeter gives you tools
| for running lots of tasks
| to do your work real fast
| but not so fast you crash.
Code of conduct
---------------

License: Your choice of MIT or Apache License 2.0
Contributors are requested to follow our `code of conduct
<https://trio.readthedocs.io/en/latest/code-of-conduct.html>`__ in all
project spaces.
42 changes: 34 additions & 8 deletions trimeter/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
# has infinite buffer so it doesn't want to apply backpressure)
# - keyed meters
# - user-defined meters
# - maybe make max_at_once not required, b/c people may prefer to pass
# meters? maybe require at least 1 meter?
# - maybe require at least 1 meter? kind of annoying for run_all maybe...
# - support multiple iterables (like map)?
# - interaction between KeyboardInterrupt and capture_outcome?
# - meaningful task names
Expand Down Expand Up @@ -171,7 +170,7 @@ async def run_on_each(
async_fn,
iterable,
*,
max_at_once,
max_at_once=None,
max_per_second=None,
max_burst=1,
iterable_is_async="guess",
Expand All @@ -181,8 +180,10 @@ async def run_on_each(
include_value=False,
):
try:
# XX: allow users to pass in their own custom meters
meters = []
meter_states.append(MaxMeter(max_at_once))
if max_at_once is not None:
meter_states.append(MaxMeter(max_at_once))
if max_per_second is not None:
meters.append(TokenBucketMeter(max_per_second, max_burst))
meter_states = [meter.new_state() for meter in meters]
Expand Down Expand Up @@ -222,21 +223,46 @@ async def run_on_each(

@asynccontextmanager
@async_generator
async def amap(async_fn, iterable, *, max_buffer_size=0, **kwargs):
async def amap(
async_fn,
iterable,
*,
max_at_once=None,
max_per_second=None,
max_burst=1,
iterable_is_async="guess",
capture_outcome=False,
include_index=False,
include_value=False,
max_buffer_size=0
):
send_channel, receive_channel = trio.open_memory_channel(max_buffer_size)
async with receive_channel:
async with trio.open_nursery() as nursery:
kwargs["send_to"] = send_channel
nursery.start_soon(
partial(run_on_each, async_fn, iterable, **kwargs)
partial(
run_on_each,
# Pass through:
async_fn,
iterable,
max_at_once=max_at_once,
max_per_second=max_per_second,
max_burst=max_burst,
iterable_is_async=iterable_is_async,
capture_outcome=capture_outcome,
include_index=include_index,
include_value=include_value,
# Not a simple pass-through:
send_to=send_channel,
)
)
await yield_(receive_channel)


async def run_all(
async_fns,
*,
max_at_once,
max_at_once=None,
max_per_second=None,
max_burst=1,
iterable_is_async="guess",
Expand Down

0 comments on commit 41c96dd

Please sign in to comment.