From 6f52972d82ecd8c71208a8b2af0dd5bd51e4ccee Mon Sep 17 00:00:00 2001
From: Richard Zhu
Date: Sun, 19 Jan 2025 12:08:30 +0000
Subject: [PATCH 1/2] add error message for invalid multiprocess

---
 src/pyper/_core/task.py | 16 +++++++++++++---
 tests/test_task.py      | 20 +++++++++++++++++++-
 2 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/src/pyper/_core/task.py b/src/pyper/_core/task.py
index e57ad6f..8e3ac70 100644
--- a/src/pyper/_core/task.py
+++ b/src/pyper/_core/task.py
@@ -48,9 +48,19 @@ def __init__(
             or inspect.iscoroutinefunction(func.__call__) \
             or inspect.isasyncgenfunction(func.__call__)
 
-        if self.is_async and multiprocess:
-            raise ValueError("multiprocess cannot be True for an async task")
-
+        if multiprocess:
+            # Asynchronous functions cannot be multiprocessed
+            if self.is_async:
+                raise ValueError("multiprocess cannot be True for an async task")
+
+            # The function needs to be globally accessible to be multiprocessed
+            # This excludes objects like lambdas and closures
+            # We capture these cases to raise a clear error message
+            module = inspect.getmodule(func)
+            if module is None or getattr(module, func.__name__, None) is not func:
+                raise RuntimeError(f"{func} cannot be multiprocessed because it is not globally accessible"
+                                   f" -- it must be a globally defined object accessible by the name {func.__name__}")
+
         self.func = func if bind is None else functools.partial(func, *bind[0], **bind[1])
         self.branch = branch
         self.join = join
diff --git a/tests/test_task.py b/tests/test_task.py
index 0f58372..2e216d1 100644
--- a/tests/test_task.py
+++ b/tests/test_task.py
@@ -74,7 +74,7 @@ def test_raise_for_invalid_func():
     else:
         raise AssertionError
 
-def test_raise_for_invalid_multiprocess():
+def test_raise_for_async_multiprocess():
     try:
         task(afunc, multiprocess=True)
     except Exception as e:
@@ -82,6 +82,24 @@ def test_raise_for_invalid_multiprocess():
     else:
         raise AssertionError
 
+def test_raise_for_lambda_multiprocess():
+    try:
+        task(lambda x: x, multiprocess=True)
+    except Exception as e:
+        assert isinstance(e, RuntimeError)
+    else:
+        raise AssertionError
+
+def test_raise_for_non_global_multiprocess():
+    try:
+        @task(multiprocess=True)
+        def f(x):
+            return x
+    except Exception as e:
+        assert isinstance(e, RuntimeError)
+    else:
+        raise AssertionError
+
 def test_async_task():
     p = task(afunc)
     assert isinstance(p, AsyncPipeline)

From aa4e7079193a12311a7661be28e4c677ea704492 Mon Sep 17 00:00:00 2001
From: Richard Zhu
Date: Sun, 19 Jan 2025 17:44:54 +0000
Subject: [PATCH 2/2] update docs (discourage @-syntax use of task)

---
 docs/src/docs/ApiReference/task.md            | 19 +++++++------
 docs/src/docs/UserGuide/AdvancedConcepts.md   | 27 +++++++++----------
 docs/src/docs/UserGuide/BasicConcepts.md      | 18 +++++++------
 docs/src/docs/UserGuide/ComposingPipelines.md | 24 +++++++++++------
 docs/src/docs/UserGuide/CreatingPipelines.md  | 16 +++--------
 5 files changed, 51 insertions(+), 53 deletions(-)

diff --git a/docs/src/docs/ApiReference/task.md b/docs/src/docs/ApiReference/task.md
index ac9d446..953bcaa 100644
--- a/docs/src/docs/ApiReference/task.md
+++ b/docs/src/docs/ApiReference/task.md
@@ -52,11 +52,6 @@ The function or callable object defining the logic of the task. This is a positi
 ```python
 from pyper import task
 
-@task
-def add_one(x: int):
-    return x + 1
-
-# OR
 def add_one(x: int):
     return x + 1
@@ -123,11 +118,9 @@ When `join` is `False`, a producer-consumer takes each individual output from th
 ```python
 from typing import Iterable
 from pyper import task
 
-@task(branch=True)
 def create_data(x: int):
     return [x + 1, x + 2, x + 3]
 
-@task(branch=True, join=True)
 def running_total(data: Iterable[int]):
     total = 0
     for item in data:
@@ -135,7 +128,10 @@ def running_total(data: Iterable[int]):
         yield total
 
 if __name__ == "__main__":
-    pipeline = create_data | running_total
+    pipeline = (
+        task(create_data, branch=True)
+        | task(running_total, branch=True, join=True)
+    )
     for output in pipeline(0):
         print(output)
         #> 1
@@ -190,15 +186,18 @@ The parameter `throttle` determines the maximum size of a task's output queue. T
 ```python
 import time
 from pyper import task
 
-@task(branch=True, throttle=5000)
 def fast_producer():
     for i in range(1_000_000):
         yield i
 
-@task
 def slow_consumer(data: int):
     time.sleep(10)
     return data
+
+pipeline = (
+    task(fast_producer, branch=True, throttle=5000)
+    | task(slow_consumer)
+)
 ```
 
 In the example above, workers on `fast_producer` are paused after `5000` values have been generated, until workers for `slow_consumer` are ready to start processing again.
diff --git a/docs/src/docs/UserGuide/AdvancedConcepts.md b/docs/src/docs/UserGuide/AdvancedConcepts.md
index c6e4285..02e4adf 100644
--- a/docs/src/docs/UserGuide/AdvancedConcepts.md
+++ b/docs/src/docs/UserGuide/AdvancedConcepts.md
@@ -72,19 +72,16 @@ Executing CPU-bound tasks concurrently does not improve performance, as CPU-boun
 The correct way to optimize the performance of CPU-bound tasks is through parallel execution, using multiprocessing.
 
 ```python
-# Okay
-@task(workers=10, multiprocess=True)
 def long_computation(data: int):
     for i in range(1, 1_000_000):
         data *= i
     return data
 
+# Okay
+pipeline = task(long_computation, workers=10, multiprocess=True)
+
 # Bad -- cannot benefit from concurrency
-@task(workers=10)
-def long_computation(data: int):
-    for i in range(1, 1_000_000):
-        data *= i
-    return data
+pipeline = task(long_computation, workers=10)
 ```
 
 Note, however, that processes incur a very high overhead cost (performance cost in creation and memory cost in inter-process communication). Specific cases should be benchmarked to fine-tune the task parameters for your program / your machine.
@@ -115,7 +112,6 @@ In Pyper, it is especially important to separate out different types of work int
 
 ```python
 # Bad -- functions not separated
-@task(branch=True, workers=20)
 def get_data(endpoint: str):
     # IO-bound work
     r = requests.get(endpoint)
@@ -124,23 +120,28 @@ def get_data(endpoint: str):
     # CPU-bound work
     for item in data["results"]:
         yield process_data(item)
+
+pipeline = task(get_data, branch=True, workers=20)
 ```
 
-Whilst it makes sense to handle the network request concurrently, the call to `process_data` within the same task is blocking and will harm concurrency.
+Whilst it makes sense to handle the network request concurrently, the call to `process_data` within the same task requires holding onto the GIL and will harm concurrency.
 
 Instead, `process_data` should be implemented as a separate function:
 
 ```python
-@task(branch=True, workers=20)
 def get_data(endpoint: str):
     # IO-bound work
     r = requests.get(endpoint)
     data = r.json()
     return data["results"]
 
-@task(workers=10, multiprocess=True)
 def process_data(data):
     # CPU-bound work
     return ...
+
+pipeline = (
+    task(get_data, branch=True, workers=20)
+    | task(process_data, workers=10, multiprocess=True)
+)
 ```
 
 ### Resource Management
@@ -225,14 +226,12 @@ import typing
 from pyper import task
 
 # Okay
-@task(branch=True)
 def generate_values_lazily() -> typing.Iterable[dict]:
     for i in range(10_000_000):
         yield {"data": i}
 
 # Bad -- this creates 10 million values in memory
-# Subsequent tasks also cannot start executing until the entire list is created
-@task(branch=True)
+# Within a pipeline, subsequent tasks also cannot start executing until the entire list is created
 def create_values_in_list() -> typing.List[dict]:
     return [{"data": i} for i in range(10_000_000)]
 ```
diff --git a/docs/src/docs/UserGuide/BasicConcepts.md b/docs/src/docs/UserGuide/BasicConcepts.md
index 1ad0d5c..b4eadaf 100644
--- a/docs/src/docs/UserGuide/BasicConcepts.md
+++ b/docs/src/docs/UserGuide/BasicConcepts.md
@@ -19,23 +19,24 @@ Pyper follows the [functional paradigm](https://docs.python.org/3/howto/function
 * Python functions are the building blocks used to create `Pipeline` objects
 * `Pipeline` objects can themselves be thought of as functions
 
-For example, to create a simple pipeline, we can wrap a function in the `task` decorator:
+For example, to create a simple pipeline, we can wrap a function in the `task` class:
 
 ```python
 from pyper import task
 
-@task
 def len_strings(x: str, y: str) -> int:
     return len(x) + len(y)
+
+pipeline = task(len_strings)
 ```
 
-This defines `len_strings` as a pipeline consisting of a single task. It takes the parameters `(x: str, y: str)` and generates `int` outputs from an output queue:
+This defines `pipeline` as a pipeline consisting of a single task. It takes the parameters `(x: str, y: str)` and generates `int` outputs from an output queue:
 
 Diagram
 
 **Key Concepts**
 
-* A pipeline is a functional reprentation of data-flow _(Pyper API)_
+* A Pipeline is a representation of data-flow _(Pyper API)_
 * A **task** represents a single functional operation within a pipeline _(user defined)_
 * Under the hood, tasks pass data along via workers and queues _(Pyper internal)_
@@ -45,21 +46,22 @@ Pipelines are composable components; to create a pipeline which runs multiple ta
 ```python
 import time
 from pyper import task
 
-@task
 def len_strings(x: str, y: str) -> int:
     return len(x) + len(y)
 
-@task(workers=3)
 def sleep(data: int) -> int:
     time.sleep(data)
     return data
 
-@task(workers=2)
 def calculate(data: int) -> bool:
     time.sleep(data)
     return data % 2 == 0
 
-pipeline = len_strings | sleep | calculate
+pipeline = (
+    task(len_strings)
+    | task(sleep, workers=3)
+    | task(calculate, workers=2)
+)
 ```
 
 This defines `pipeline` as a series of tasks, taking the parameters `(x: str, y: str)` and generating `bool` outputs:
diff --git a/docs/src/docs/UserGuide/ComposingPipelines.md b/docs/src/docs/UserGuide/ComposingPipelines.md
index 925be28..90b44ac 100644
--- a/docs/src/docs/UserGuide/ComposingPipelines.md
+++ b/docs/src/docs/UserGuide/ComposingPipelines.md
@@ -53,12 +53,10 @@ from typing import Dict, Iterable
 from pyper import task
 
-@task(branch=True)
 def step1(limit: int):
     for i in range(limit):
         yield {"data": i}
 
-@task
 def step2(data: Dict):
     return data | {"hello": "world"}
 
@@ -72,7 +70,7 @@ class JsonFileWriter:
     def __init__(self, filepath):
         self.filepath = filepath
 
     def __call__(self, data: Iterable[Dict]):
         data_list = list(data)
         with open(self.filepath, 'w', encoding='utf-8') as f:
             json.dump(data_list, f, indent=4)
 
 if __name__ == "__main__":
-    pipeline = step1 | step2  # The pipeline
+    pipeline = task(step1, branch=True) | task(step2)  # The pipeline
     writer = JsonFileWriter("data.json")  # A consumer
     writer(pipeline(limit=10))  # Run
 ```
@@ -80,10 +78,18 @@
 The `>` operator (again inspired by UNIX syntax) is used to pipe a `Pipeline` into a consumer function (any callable that takes an `Iterable` of inputs) returning simply a function that handles the 'run' operation. This is syntactic sugar for the `Pipeline.consume` method.
 
 ```python
 if __name__ == "__main__":
-    run = step1 | step2 > JsonFileWriter("data.json")
+    run = (
+        task(step1, branch=True)
+        | task(step2)
+        > JsonFileWriter("data.json")
+    )
     run(limit=10)
     # OR
-    run = step1.pipe(step2).consume(JsonFileWriter("data.json"))
+    run = (
+        task(step1, branch=True)
+        .pipe(task(step2))
+        .consume(JsonFileWriter("data.json"))
+    )
     run(limit=10)
 ```
@@ -163,12 +169,10 @@ from typing import AsyncIterable, Dict
 from pyper import task
 
-@task(branch=True)
 async def step1(limit: int):
     for i in range(limit):
         yield {"data": i}
 
-@task
 def step2(data: Dict):
     return data | {"hello": "world"}
 
@@ -182,7 +186,11 @@ class AsyncJsonFileWriter:
     def __init__(self, filepath):
         self.filepath = filepath
 
     async def __call__(self, data: AsyncIterable[Dict]):
         data_list = [item async for item in data]
         with open(self.filepath, 'w', encoding='utf-8') as f:
             json.dump(data_list, f, indent=4)
 
 async def main():
-    run = step1 | step2 > AsyncJsonFileWriter("data.json")
+    run = (
+        task(step1, branch=True)
+        | task(step2)
+        > AsyncJsonFileWriter("data.json")
+    )
     await run(limit=10)
 
 if __name__ == "__main__":
diff --git a/docs/src/docs/UserGuide/CreatingPipelines.md b/docs/src/docs/UserGuide/CreatingPipelines.md
index d80b015..7c9786a 100644
--- a/docs/src/docs/UserGuide/CreatingPipelines.md
+++ b/docs/src/docs/UserGuide/CreatingPipelines.md
@@ -19,26 +19,16 @@ Pyper's `task` decorator is the means by which we instantiate pipelines and cont
 ```python
 from pyper import task, Pipeline
 
-@task
 def func(x: int):
     return x + 1
 
-assert isinstance(func, Pipeline)
+pipeline = task(func)
+
+assert isinstance(pipeline, Pipeline)
 ```
 
 This creates a `Pipeline` object consisting of one 'task' (one step of data transformation).
 
-The `task` decorator can also be used more dynamically, which is preferable in most cases as this separates execution logic from the functional definitions themselves:
-
-```python
-from pyper import task
-
-def func(x: int):
-    return x + 1
-
-pipeline = task(func)
-```
-
 In addition to functions, anything `callable` in Python can be wrapped in `task` in the same way:
 
 ```python
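 # A minimal illustrative sketch, not text from the original docs page: the
 # `Doubler` class below is a hypothetical callable object; because an instance
 # defines __call__, it can be wrapped in `task` exactly like a plain function
 class Doubler:
     def __call__(self, x: int):
         return 2 * x
 
 pipeline = task(Doubler())
 
 for output in pipeline(10):
     print(output)
     #> 20
 ```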