19 changes: 9 additions & 10 deletions docs/src/docs/ApiReference/task.md
@@ -52,11 +52,6 @@ The function or callable object defining the logic of the task. This is a positi…
```python
from pyper import task

@task
def add_one(x: int):
return x + 1

# OR
def add_one(x: int):
return x + 1

```

@@ -123,19 +118,20 @@ When `join` is `False`, a producer-consumer takes each individual output from th…

```python
from typing import Iterable
from pyper import task

@task(branch=True)
def create_data(x: int):
return [x + 1, x + 2, x + 3]

@task(branch=True, join=True)
def running_total(data: Iterable[int]):
total = 0
for item in data:
total += item
yield total

if __name__ == "__main__":
pipeline = create_data | running_total
pipeline = (
task(create_data, branch=True)
| task(running_total, branch=True, join=True)
)
for output in pipeline(0):
print(output)
#> 1
```

@@ -190,15 +186,18 @@ The parameter `throttle` determines the maximum size of a task's output queue. T…

```python
import time
from pyper import task

@task(branch=True, throttle=5000)
def fast_producer():
for i in range(1_000_000):
yield i

@task
def slow_consumer(data: int):
time.sleep(10)
return data

pipeline = (
    task(fast_producer, branch=True, throttle=5000)
| task(slow_consumer)
)
```

In the example above, workers on `fast_producer` are paused after `5000` values have been generated, until workers for `slow_consumer` are ready to start processing again.
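
As a rough illustration, the throttled pipeline is consumed like any other — a minimal sketch assuming the definitions above (the `slow_consumer` sleep makes a full run impractical, so this is illustrative only):

```python
if __name__ == "__main__":
    # fast_producer takes no arguments, so the pipeline is called with none
    for value in pipeline():
        print(value)
```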
27 changes: 13 additions & 14 deletions docs/src/docs/UserGuide/AdvancedConcepts.md
@@ -72,19 +72,16 @@ Executing CPU-bound tasks concurrently does not improve performance, as CPU-boun…
The correct way to optimize the performance of CPU-bound tasks is through parallel execution, using multiprocessing.

```python
# Okay
@task(workers=10, multiprocess=True)
def long_computation(data: int):
for i in range(1, 1_000_000):
data *= i
return data

# Okay
pipeline = task(long_computation, workers=10, multiprocess=True)

# Bad -- cannot benefit from concurrency
@task(workers=10)
def long_computation(data: int):
for i in range(1, 1_000_000):
data *= i
return data
pipeline = task(long_computation, workers=10)
```

Note, however, that processes incur a very high overhead cost (performance cost in creation and memory cost in inter-process communication). Specific cases should be benchmarked to fine-tune the task parameters for your program / your machine.
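
As a rough illustration of such a benchmark — a minimal sketch with hypothetical input counts and worker counts, not a definitive harness — comparing threaded and multiprocessed workers for the same CPU-bound task:

```python
import time

from pyper import task

def generate(n: int):
    # Produce n independent inputs for the CPU-bound task
    yield from range(n)

def long_computation(data: int):
    for i in range(1, 1_000_000):
        data *= i
    return data

def bench(pipeline, n: int) -> float:
    # Time a full run of the pipeline over n inputs
    start = time.perf_counter()
    for _ in pipeline(n):
        pass
    return time.perf_counter() - start

if __name__ == "__main__":
    threaded = task(generate, branch=True) | task(long_computation, workers=10)
    multiprocessed = task(generate, branch=True) | task(long_computation, workers=10, multiprocess=True)
    print(f"threads:   {bench(threaded, 20):.2f}s")
    print(f"processes: {bench(multiprocessed, 20):.2f}s")
```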
@@ -115,7 +112,6 @@ In Pyper, it is especially important to separate out different types of work int…

```python
# Bad -- functions not separated
@task(branch=True, workers=20)
def get_data(endpoint: str):
# IO-bound work
r = requests.get(endpoint)
    data = r.json()

# CPU-bound work
for item in data["results"]:
yield process_data(item)

pipeline = task(get_data, branch=True, workers=20)
```

Whilst it makes sense to handle the network request concurrently, the call to `process_data` within the same task is blocking and will harm concurrency.
Whilst it makes sense to handle the network request concurrently, the call to `process_data` within the same task requires holding onto the GIL and will harm concurrency.
Instead, `process_data` should be implemented as a separate function:

```python
@task(branch=True, workers=20)
def get_data(endpoint: str):
# IO-bound work
r = requests.get(endpoint)
data = r.json()
return data["results"]

@task(workers=10, multiprocess=True)
def process_data(data):
# CPU-bound work
return ...

pipeline = (
task(get_data, branch=True, workers=20)
    | task(process_data, workers=10, multiprocess=True)
)
```

### Resource Management
@@ -225,14 +226,12 @@

```python
import typing
from pyper import task

# Okay
@task(branch=True)
def generate_values_lazily() -> typing.Iterable[dict]:
for i in range(10_000_000):
yield {"data": i}

# Bad -- this creates 10 million values in memory
# Subsequent tasks also cannot start executing until the entire list is created
@task(branch=True)
# Within a pipeline, subsequent tasks also cannot start executing until the entire list is created
def create_values_in_list() -> typing.List[dict]:
return [{"data": i} for i in range(10_000_000)]
```
18 changes: 10 additions & 8 deletions docs/src/docs/UserGuide/BasicConcepts.md
@@ -19,23 +19,24 @@ Pyper follows the [functional paradigm](https://docs.python.org/3/howto/function…
* Python functions are the building blocks used to create `Pipeline` objects
* `Pipeline` objects can themselves be thought of as functions

For example, to create a simple pipeline, we can wrap a function in the `task` decorator:
For example, to create a simple pipeline, we can wrap a function in the `task` class:

```python
from pyper import task

@task
def len_strings(x: str, y: str) -> int:
return len(x) + len(y)

pipeline = task(len_strings)
```

This defines `len_strings` as a pipeline consisting of a single task. It takes the parameters `(x: str, y: str)` and generates `int` outputs from an output queue:
This defines `pipeline` as a pipeline consisting of a single task. It takes the parameters `(x: str, y: str)` and generates `int` outputs from an output queue:

<img src="../../assets/img/diagram1.png" alt="Diagram" style="height: 250px; width: auto;">
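
For example, consuming this single-task pipeline might look like the following (a minimal sketch, assuming the definition above):

```python
if __name__ == "__main__":
    for output in pipeline("hello", "world"):
        print(output)
    #> 10
```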

**Key Concepts**

* A <b style="color:#3399FF;">pipeline</b> is a functional reprentation of data-flow _(Pyper API)_
* A <b style="color:#3399FF;">Pipeline</b> is a representation of data-flow _(Pyper API)_
* A **task** represents a single functional operation within a pipeline _(user defined)_
* Under the hood, tasks pass data along via <b style="color:#FF8000;">workers</b> and <b style="color:#FF8000;">queues</b> _(Pyper internal)_

@@ -45,21 +46,22 @@ Pipelines are composable components; to create a pipeline which runs multiple ta…

```python
import time
from pyper import task

@task
def len_strings(x: str, y: str) -> int:
return len(x) + len(y)

@task(workers=3)
def sleep(data: int) -> int:
time.sleep(data)
return data

@task(workers=2)
def calculate(data: int) -> bool:
time.sleep(data)
return data % 2 == 0

pipeline = len_strings | sleep | calculate
pipeline = (
task(len_strings)
| task(sleep, workers=3)
| task(calculate, workers=2)
)
```

This defines `pipeline` as a series of tasks, taking the parameters `(x: str, y: str)` and generating `bool` outputs:
24 changes: 16 additions & 8 deletions docs/src/docs/UserGuide/ComposingPipelines.md
@@ -53,12 +53,10 @@

```python
from typing import Dict, Iterable

from pyper import task

@task(branch=True)
def step1(limit: int):
for i in range(limit):
yield {"data": i}

@task
def step2(data: Dict):
return data | {"hello": "world"}

@@ -72,18 +70,26 @@ class JsonFileWriter:
json.dump(data_list, f, indent=4)

if __name__ == "__main__":
pipeline = step1 | step2 # The pipeline
pipeline = task(step1, branch=True) | task(step2) # The pipeline
writer = JsonFileWriter("data.json") # A consumer
writer(pipeline(limit=10)) # Run
```

The `>` operator (again inspired by UNIX syntax) pipes a `Pipeline` into a consumer function (any callable that takes an `Iterable` of inputs), returning a function that handles the 'run' operation. This is syntactic sugar for the `Pipeline.consume` method.
```python
if __name__ == "__main__":
run = step1 | step2 > JsonFileWriter("data.json")
run = (
task(step1, branch=True)
| task(step2)
> JsonFileWriter("data.json")
)
run(limit=10)
# OR
run = step1.pipe(step2).consume(JsonFileWriter("data.json"))
    run = (
        task(step1, branch=True)
        .pipe(task(step2))
        .consume(JsonFileWriter("data.json"))
    )
run(limit=10)
```

@@ -163,12 +169,10 @@

```python
from typing import AsyncIterable, Dict

from pyper import task

@task(branch=True)
async def step1(limit: int):
for i in range(limit):
yield {"data": i}

@task
def step2(data: Dict):
return data | {"hello": "world"}

Expand All @@ -182,7 +186,11 @@ class AsyncJsonFileWriter:
json.dump(data_list, f, indent=4)

async def main():
run = step1 | step2 > AsyncJsonFileWriter("data.json")
run = (
task(step1, branch=True)
| task(step2)
> AsyncJsonFileWriter("data.json")
)
await run(limit=10)

if __name__ == "__main__":
    asyncio.run(main())
```
16 changes: 3 additions & 13 deletions docs/src/docs/UserGuide/CreatingPipelines.md
@@ -19,26 +19,16 @@ Pyper's `task` decorator is the means by which we instantiate pipelines and cont…
```python
from pyper import task, Pipeline

@task
def func(x: int):
return x + 1

assert isinstance(func, Pipeline)
pipeline = task(func)

assert isinstance(pipeline, Pipeline)
```

This creates a `Pipeline` object consisting of one 'task' (one step of data transformation).

The `task` decorator can also be used more dynamically, which is preferable in most cases as this separates execution logic from the functional definitions themselves:

```python
from pyper import task

def func(x: int):
return x + 1

pipeline = task(func)
```

In addition to functions, anything `callable` in Python can be wrapped in `task` in the same way:

```python
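# A minimal sketch (hypothetical example, assuming the same task API as above):
# any object implementing __call__ can be wrapped in task in the same way.
from pyper import task

class Multiplier:
    def __init__(self, factor: int):
        self.factor = factor

    def __call__(self, x: int) -> int:
        return x * self.factor

pipeline = task(Multiplier(2))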
```
16 changes: 13 additions & 3 deletions src/pyper/_core/task.py
@@ -48,9 +48,19 @@ def __init__(
or inspect.iscoroutinefunction(func.__call__) \
or inspect.isasyncgenfunction(func.__call__)

if self.is_async and multiprocess:
raise ValueError("multiprocess cannot be True for an async task")

if multiprocess:
# Asynchronous functions cannot be multiprocessed
if self.is_async:
raise ValueError("multiprocess cannot be True for an async task")

# The function needs to be globally accessible to be multiprocessed
# This excludes objects like lambdas and closures
# We capture these cases to throw a clear error message
module = inspect.getmodule(func)
if module is None or getattr(module, func.__name__, None) is not func:
raise RuntimeError(f"{func} cannot be multiprocessed because it is not globally accessible"
f" -- it must be a globally defined object accessible by the name {func.__name__}")

self.func = func if bind is None else functools.partial(func, *bind[0], **bind[1])
self.branch = branch
self.join = join
20 changes: 19 additions & 1 deletion tests/test_task.py
@@ -74,14 +74,32 @@ def test_raise_for_invalid_func():
else:
raise AssertionError

def test_raise_for_invalid_multiprocess():
def test_raise_for_async_multiprocess():
try:
task(afunc, multiprocess=True)
except Exception as e:
assert isinstance(e, ValueError)
else:
raise AssertionError

def test_raise_for_lambda_multiprocess():
try:
task(lambda x: x, multiprocess=True)
except Exception as e:
assert isinstance(e, RuntimeError)
else:
raise AssertionError

def test_raise_for_non_global_multiprocess():
try:
@task(multiprocess=True)
def f(x):
return x
except Exception as e:
assert isinstance(e, RuntimeError)
else:
raise AssertionError

def test_async_task():
p = task(afunc)
assert isinstance(p, AsyncPipeline)