From cd959bb455791f3a524de44d05e7c63e261da7f2 Mon Sep 17 00:00:00 2001 From: Richard Zhu Date: Mon, 9 Dec 2024 03:25:59 +0000 Subject: [PATCH] update readme --- README.md | 233 +++++++++++++++--------------------------------------- 1 file changed, 62 insertions(+), 171 deletions(-) diff --git a/README.md b/README.md index 5394e88..5aa8bfc 100644 --- a/README.md +++ b/README.md @@ -22,15 +22,15 @@ --- -Pyper is a generalized framework for concurrent data-processing, based on functional programming patterns. Used for 🌐 **Data Collection**, 🔀 **ETL systems**, and general-purpose 🛠️ **Python Scripting** +Pyper is a comprehensive framework for concurrent and parallel data-processing, based on functional programming patterns. Used for 🌐 **Data Collection**, 🔀 **ETL Systems**, and general-purpose 🛠️ **Python Scripting** See the [Documentation](https://pyper-dev.github.io/pyper/) Key features: -* 💡**Intuitive API**: Easy to learn, easy to think about. Implements clean abstractions to seamlessly unify threaded and asynchronous work. +* 💡**Intuitive API**: Easy to learn, easy to think about. Implements clean abstractions to seamlessly unify threaded, multiprocessed, and asynchronous work. * 🚀 **Functional Paradigm**: Python functions are the building blocks of data pipelines. Let's you write clean, reusable code naturally. -* 🛡️ **Safety**: Hides the heavy lifting of underlying task creation and execution. No more worrying about race conditions, memory leaks, and thread-level error handling. +* 🛡️ **Safety**: Hides the heavy lifting of underlying task execution and resource clean-up. No more worrying about race conditions, memory leaks, or thread-level error handling. * ⚡ **Efficiency**: Designed from the ground up for lazy execution, using queues, workers, and generators. * ✨ **Pure Python**: Lightweight, with zero sub-dependencies. @@ -46,235 +46,124 @@ Note that `python-pyper` is the [pypi](https://pypi.org/project/python-pyper) re ## Usage +In Pyper, the `task` decorator is used to transform functions into composable pipelines. + Let's simulate a pipeline that performs a series of transformations on some data. ```python import asyncio import time -from typing import AsyncIterable from pyper import task -def step1(limit: int): - """Generate some data.""" +def get_data(limit: int): for i in range(limit): yield i -async def step2(data: int): - """Simulate some asynchronous work.""" +async def step1(data: int): await asyncio.sleep(1) - print("Finished async sleep") - return data + 1 + print("Finished async wait", data) + return data -def step3(data: int): - """Simulate some IO-bound (non awaitable) work.""" +def step2(data: int): time.sleep(1) - print("Finished sync sleep") - return 2 * data - 1 + print("Finished sync wait", data) + return data -async def print_sum(data: AsyncIterable[int]): - """Print the sum of values from a data stream.""" - total = 0 - async for output in data: - total += output - print("Total ", total) +def step3(data: int): + for i in range(10_000_000): + _ = i*i + print("Finished heavy computation", data) + return data async def main(): # Define a pipeline of tasks using `pyper.task` - run = task(step1) | task(step2, concurrency=20) | task(step3, concurrency=20) > print_sum - await run(limit=20) - - -if __name__ == "__main__": - asyncio.run(main()) # takes ~2 seconds -``` - -Pyper provides an elegant abstraction of the concurrent execution of each function via `pyper.task`, allowing you to focus on building out the **logical** functions of your program. - -In our pipeline: - -* `task(step1)` generates 20 data values - -* `task(step2, concurrency=20)` spins up 20 asynchronous workers, taking each value as input and returning an output - -* `task(step3, concurrency=20)` spins up 20 threaded workers, taking each value as input and returning an output - -The script therefore takes ~2 seconds to complete, as `step2` and `step3` in the pipeline only take the 1 second of sleep time, performed concurrently. If you'd like, experiment with tweaking the `limit` and `concurrency` values for yourself. - ---- - -
-What does the logic translate to in non-concurrent code? - -
- -Having defined the logical operations we want to perform on our data as functions, all we are doing is piping the output of one function to the input of another. In sequential code, this could look like: - -```python -# Analogous to: -# pipeline = task(step1) | task(step2) | task(step3) -async def pipeline(limit: int): - for data in step1(limit): - data = await step2(data) - data = step3(data) - yield data + pipeline = task(get_data, branch=True) \ + | task(step1, workers=20) \ + | task(step2, workers=20) \ + | task(step3, workers=20, multiprocess=True) - -# Analogous to: -# run = pipeline > print_sum -async def run(limit: int): - await print_sum(pipeline(limit)) + # Call the pipeline + total = 0 + async for output in pipeline(limit=20): + total += output + print("Total:", total) -async def main(): - await run(20) # takes ~40 seconds +if __name__ == "__main__": + asyncio.run(main()) ``` -Pyper uses the `|` (motivated by Unix's pipe operator) syntax as a representation of this input-output piping between tasks. +Pyper provides an elegant abstraction of the execution of each function via `pyper.task`, allowing you to focus on building out the **logical** functions of your program. In the `main` function: -
+* `pipeline` defines a function; this takes the parameters of its first task (`get_data`) and yields each output from its last task (`step3`) +* Tasks are piped together using the `|` operator (motivated by Unix's pipe operator) as a syntactic representation of passing inputs/outputs between tasks. -
-What would the implementation look like without Pyper? +In the pipeline, we are executing three different types of work: -
- -Concurrent programming in Python is notoriously difficult to get right. In a concurrent data pipeline, some challenges are: - -* We want producers to concurrently execute tasks and send results to the next stage as soon as it's done processing -* We want consumers to lazily pick up output as soon as it's available from the previous stage -* We need to somehow unify the execution of threads and coroutines, without letting non-awaitable tasks clog up the event-loop +* `task(step1, workers=20)` spins up 20 `asyncio.Task`s to handle asynchronous IO-bound work -The basic approach to doing this is by using queues-- a simplified and very unabstracted implementation could be: +* `task(step2, workers=20)` spins up 20 `threads` to handle synchronous IO-bound work -```python -async def pipeline(limit: int): - q1 = asyncio.Queue() - q2 = asyncio.Queue() - q3 = asyncio.Queue() - - step2_concurrency=20 - step3_concurrency=20 - - async def worker1(): - for data in step1(limit): - await q1.put(data) - for _ in range(step2_concurrency): - await q1.put(None) - - worker2s_finished = 0 - async def worker2(): - nonlocal worker2s_finished - while True: - data = await q1.get() - if data is None: - break - output = await step2(data) - await q2.put(output) - worker2s_finished += 1 - if worker2s_finished == step2_concurrency: - for _ in range(step3_concurrency): - await q2.put(None) - - worker3s_finished = 0 - async def worker3(): - nonlocal worker3s_finished - loop = asyncio.get_running_loop() - while True: - data = await q2.get() - if data is None: - break - # Pyper uses a custom thread group handler instead of run_in_executor - output = await loop.run_in_executor(None, step3, data) - await q3.put(output) - worker3s_finished += 1 - if worker3s_finished == step3_concurrency: - await q3.put(None) - - async with asyncio.TaskGroup() as tg: - # Start all workers in the background - tg.create_task(worker1()) - for _ in range(step2_concurrency): - tg.create_task(worker2()) - for _ in range(step3_concurrency): - tg.create_task(worker3()) - # Yield data until all workers have stopped - while True: - data = await q3.get() - if data is None: - break - yield data - - -async def run(limit: int): - await print_sum(pipeline(limit)) +* `task(step3, workers=20, multiprocess=True)` spins up 20 `processes` to handle synchronous CPU-bound work +`task` acts as one intuitive API for unifying the execution of each different type of function. -async def main(): - await run(20) # takes ~2 seconds -``` +Each task submits their outputs to the next task within the pipeline via queue-based data structures, which is the mechanism underpinning how concurrency and parallelism are achieved. See the [docs](https://pyper-dev.github.io/pyper/docs/UserGuide/BasicConcepts) for a breakdown of what a pipeline looks like under the hood. -This implementation achieves the basic desired concurrent data flow, but still lacks some quality-of-life features that Pyper takes care of, like error handling within threads. - -Pyper handles the complexities of managing queues and workers, so that this code can be reduced to the two-line main function in the example above. +---
-Do I have to use async? +See a non-async example
-No-- not every program is asynchronous, so Pyper pipelines are by default synchronous, as long as their tasks are defined as synchronous functions. For example: +Pyper pipelines are by default non-async, as long as their tasks are defined as synchronous functions. For example: ```python import time -from typing import Iterable from pyper import task -def step1(limit: int): +def get_data(limit: int): for i in range(limit): yield i - -def step2(data: int): +def step1(data: int): time.sleep(1) - return data + 1 - + print("Finished sync wait", data) + return data -def step3(data: int): - time.sleep(1) - return 2 * data - 1 +def step2(data: int): + for i in range(10_000_000): + _ = i*i + print("Finished heavy computation", data) + return data -def print_sum(data: Iterable[int]): +def main(): + pipeline = task(get_data, branch=True) \ + | task(step1, workers=20) \ + | task(step2, workers=20, multiprocess=True) total = 0 - for output in data: + for output in pipeline(limit=20): total += output - print("Total ", total) - - -def main(): - run = task(step1) \ - | task(step2, concurrency=20) \ - | task(step3, concurrency=20) \ - > print_sum - # Run synchronously - run(limit=20) + print("Total:", total) if __name__ == "__main__": - main() # takes ~2 seconds + main() ``` -A pipeline consisting of _at least one asynchronous function_ becomes an `AsyncPipeline`, which exposes the same logical function, provided `async` and `await` syntax in all of the obvious places. This makes it effortless to unify synchronously defined and asynchronously defined functions where need be. +A pipeline consisting of _at least one asynchronous function_ becomes an `AsyncPipeline`, which exposes the same usage API, provided `async` and `await` syntax in the obvious places. This makes it effortless to combine synchronously defined and asynchronously defined functions where need be.
@@ -284,9 +173,11 @@ To explore more of Pyper's features, see some further [examples](https://pyper-d ## Dependencies -Pyper is implemented in pure Python, with no sub-dependencies. It relies heavily on the well-established built-in modules: -* [asyncio](https://docs.python.org/3/library/asyncio.html) for handling async-based concurrency -* [threading](https://docs.python.org/3/library/threading.html) for handling thread-based concurrency +Pyper is implemented in pure Python, with no sub-dependencies. It is built on top of the well-established built-in Python modules: +* [threading](https://docs.python.org/3/library/threading.html) for thread-based concurrency +* [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) for parallelism +* [asyncio](https://docs.python.org/3/library/asyncio.html) for async-based concurrency +* [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) for unifying threads, processes, and async code ## License