README.md (108 changes: 107 additions & 1 deletion)
@@ -19,7 +19,7 @@

**Laygo** is the lightweight Python library for data pipelines that I wish existed when I first started. It's designed from the ground up to make data engineering simpler, cleaner, and more intuitive, letting you build resilient, in-memory data workflows with an elegant, fluent API.

It's built to grow with you. Scale seamlessly from a single local script to thousands of concurrent serverless functions with minimal operational overhead.

**Key Features:**

@@ -31,6 +31,8 @@ It's built to grow with you. Scale seamlessly from a single local script to thou

- **Effortless Parallelism**: Accelerate CPU-intensive tasks seamlessly.

- **Fan-out Processing**: Split pipelines into multiple concurrent branches for parallel analysis of the same dataset.

- **Distributed by Design**: Your pipeline script is both the manager and the worker. When deployed as a serverless function or a container, this design allows you to scale out massively by simply running more instances of the same code. Your logic scales the same way on a thousand cores as it does on one.

- **Powerful Context Management**: Share state and configuration across your entire pipeline for advanced, stateful processing.
@@ -197,6 +199,110 @@ results = (
)
```

### Pipeline Branching (Fan-out Processing)

```python
from laygo import Pipeline
from laygo.transformers.transformer import createTransformer

# Sample data: customer orders
orders = [
{"id": 1, "customer": "Alice", "amount": 150, "product": "laptop"},
{"id": 2, "customer": "Bob", "amount": 25, "product": "book"},
{"id": 3, "customer": "Charlie", "amount": 75, "product": "headphones"},
{"id": 4, "customer": "Diana", "amount": 200, "product": "monitor"},
{"id": 5, "customer": "Eve", "amount": 30, "product": "mouse"},
]

# Create different analysis branches
high_value_analysis = (
createTransformer(dict)
.filter(lambda order: order["amount"] > 100)
.map(lambda order: {
"customer": order["customer"],
"amount": order["amount"],
"category": "high_value"
})
)

product_summary = (
createTransformer(dict)
.map(lambda order: {"product": order["product"], "count": 1})
# Group by product and sum counts (simplified example)
)

customer_spending = (
createTransformer(dict)
.map(lambda order: {
"customer": order["customer"],
"total_spent": order["amount"]
})
)

# Branch the pipeline into multiple concurrent analyses
results = Pipeline(orders).branch({
"high_value_orders": high_value_analysis,
"products": product_summary,
"customer_totals": customer_spending
})

print("High value orders:", results["high_value_orders"])
# [{'customer': 'Alice', 'amount': 150, 'category': 'high_value'},
# {'customer': 'Diana', 'amount': 200, 'category': 'high_value'}]

print("Product analysis:", len(results["products"]))
# 5 (all products processed)

print("Customer spending:", len(results["customer_totals"]))
# 5 (all customers processed)
```
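
The `products` branch above only tags each order with a count of 1; the grouping step is left out for brevity. A minimal way to finish that aggregation after `branch()` returns, using plain Python rather than any Laygo API, is:

```python
from collections import Counter

# Count how many orders mention each product (each record carries count=1).
product_counts = Counter(record["product"] for record in results["products"])
print(product_counts)
# Each product appears once in this sample data.
```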

### Advanced Branching with Error Isolation

```python
from laygo import Pipeline
from laygo.transformers.transformer import createTransformer

# Data with potential issues
mixed_data = [1, 2, "invalid", 4, 5, None, 7, 8]

# Branch 1: Safe numeric processing
safe_numbers = (
createTransformer(int | str | None)
.filter(lambda x: isinstance(x, int) and x is not None)
.map(lambda x: x * 2)
)

# Branch 2: String processing with error handling
string_processing = (
createTransformer(int | str | None)
.filter(lambda x: isinstance(x, str))
.map(lambda x: f"processed_{x}")
.catch(lambda t: t.map(lambda x: "error_handled"))
)

# Branch 3: Statistical analysis
stats_analysis = (
createTransformer(int | str | None)
.filter(lambda x: isinstance(x, int) and x is not None)
.map(lambda x: x) # Pass through for stats
)

# Execute all branches concurrently
results = Pipeline(mixed_data).branch({
"numbers": safe_numbers,
"strings": string_processing,
"stats": stats_analysis
}, batch_size=100)

print("Processed numbers:", results["numbers"]) # [2, 4, 8, 10, 14, 16]
print("Processed strings:", results["strings"]) # ['processed_invalid']
print("Stats data:", results["stats"]) # [1, 2, 4, 5, 7, 8]

# Each branch processes the complete dataset independently
# Errors in one branch don't affect others
```
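
Because `branch()` is a terminal operation that consumes the pipeline's iterator, a pipeline can only be fanned out once; to run another fan-out, build a fresh `Pipeline` over the source data. A minimal sketch (it assumes transformer instances can be reused across runs):

```python
# branch() consumes the pipeline, so construct a new Pipeline for each pass.
first_pass = Pipeline(mixed_data).branch({"numbers": safe_numbers})
second_pass = Pipeline(mixed_data).branch({"strings": string_processing})
```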

### Error Handling and Recovery

laygo/pipeline.py (137 changes: 121 additions & 16 deletions)
@@ -27,6 +27,21 @@ class Pipeline[T]:
A Pipeline provides a high-level interface for data processing by chaining
transformers together. It automatically manages a multiprocessing-safe
shared context that can be accessed by all transformers in the chain.

The Pipeline supports both streaming and batch processing patterns, with
built-in support for buffering, branching (fan-out), and parallel processing.

Example:
>>> data = [1, 2, 3, 4, 5]
>>> result = (Pipeline(data)
... .transform(lambda t: t.filter(lambda x: x % 2 == 0))
... .transform(lambda t: t.map(lambda x: x * 2))
... .to_list())
>>> result # [4, 8]

Note:
Most pipeline operations consume the internal iterator, making the
pipeline effectively single-use unless the data source is re-initialized.
"""

def __init__(self, *data: Iterable[T]) -> None:
@@ -64,14 +79,22 @@ def __del__(self) -> None:
def context(self, ctx: PipelineContext) -> "Pipeline[T]":
"""Update the pipeline context and store a reference to the original context.

The provided context will be used during pipeline execution and any
modifications made by transformers will be synchronized back to the
original context when the pipeline finishes processing.

Args:
ctx: The pipeline context dictionary to use for this pipeline execution.
This should be a mutable dictionary-like object that transformers
can use to share state and communicate.

Returns:
The pipeline instance for method chaining.
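
Example:
A minimal sketch (assumes a plain dict can serve as the pipeline context):

>>> ctx = {"multiplier": 3}
>>> result = (Pipeline([1, 2, 3])
...     .context(ctx)
...     .apply(lambda data, context: (x * context["multiplier"] for x in data))
...     .to_list())
>>> result  # [3, 6, 9]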

Note:
Changes made to the context during pipeline execution will be
automatically synchronized back to the original context object
when the pipeline is destroyed or processing completes.
"""
# Store reference to the original context
self._original_context_ref = ctx
@@ -96,13 +119,21 @@ def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]":
"""Apply a transformation using a lambda function.

Creates a Transformer under the hood and applies it to the pipeline.
This is a shorthand method for simple transformations that allows
chaining transformer operations in a functional style.

Args:
t: A callable that takes a transformer and returns a transformed transformer.
Typically used with lambda expressions like:
`lambda t: t.map(func).filter(predicate)`

Returns:
A new Pipeline with the transformed data type.

Example:
>>> pipeline = Pipeline([1, 2, 3, 4, 5])
>>> result = pipeline.transform(lambda t: t.filter(lambda x: x % 2 == 0).map(lambda x: x * 2))
>>> result.to_list() # [4, 8]
"""
# Create a new transformer and apply the transformation function
transformer = t(Transformer[T, T]())
@@ -125,17 +156,28 @@ def apply[U](
) -> "Pipeline[U]":
"""Apply a transformer to the current data source.

This method accepts various types of transformers and applies them to
the pipeline data. The pipeline's managed context is automatically
passed to context-aware transformers.

Args:
transformer: One of the following:
- A Transformer instance (preferred for complex operations)
- A callable function that takes an iterable and returns an iterator
- A context-aware callable that takes an iterable and context

Returns:
The same Pipeline instance with transformed data (for method chaining).

Raises:
TypeError: If the transformer is not a supported type.

Example:
>>> pipeline = Pipeline([1, 2, 3])
>>> # Using a Transformer instance
>>> pipeline.apply(createTransformer(int).map(lambda x: x * 2))
>>> # Using a simple function
>>> pipeline.apply(lambda data: (x * 2 for x in data))
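>>> # Context-aware form (a sketch; the second argument receives the pipeline context)
>>> pipeline.apply(lambda data, ctx: (x * 2 for x in data))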
"""
match transformer:
case Transformer():
Expand All @@ -157,7 +199,34 @@ def branch(
max_batch_buffer: int = 1,
use_queue_chunks: bool = True,
) -> dict[str, list[Any]]:
"""Forks the pipeline into multiple branches for concurrent, parallel processing."""
"""Forks the pipeline into multiple branches for concurrent, parallel processing.

This is a **terminal operation** that implements a fan-out pattern where
the entire dataset is copied to each branch for independent processing.
Each branch processes the complete dataset concurrently using separate
transformers, and results are collected and returned in a dictionary.

Args:
branches: A dictionary where keys are branch names (str) and values
are `Transformer` instances of any subtype.
batch_size: The number of items to batch together when sending data
to branches. Larger batches can improve throughput but
use more memory. Defaults to 1000.
max_batch_buffer: The maximum number of batches to buffer for each
branch queue. Controls memory usage and creates
backpressure. Defaults to 1.
use_queue_chunks: Whether to use passthrough chunking for the
transformers. When True, batches are processed
as chunks. Defaults to True.

Returns:
A dictionary where keys are the branch names and values are lists
of all items processed by that branch's transformer.
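
Example:
A minimal sketch of fanning out one dataset into two branches:

>>> results = Pipeline([1, 2, 3, 4]).branch({
...     "evens": createTransformer(int).filter(lambda x: x % 2 == 0),
...     "doubled": createTransformer(int).map(lambda x: x * 2),
... })
>>> results  # {'evens': [2, 4], 'doubled': [2, 4, 6, 8]}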

Note:
This operation consumes the pipeline's iterator, making subsequent
operations on the same pipeline return empty results.
"""
if not branches:
self.consume()
return {}
@@ -258,48 +327,84 @@ def _producer() -> None:
def __iter__(self) -> Iterator[T]:
"""Allow the pipeline to be iterated over.

This makes the Pipeline compatible with Python's iterator protocol,
allowing it to be used in for loops, list comprehensions, and other
contexts that expect an iterable.

Returns:
An iterator over the processed data.

Note:
This operation consumes the pipeline's iterator, making subsequent
operations on the same pipeline return empty results.
"""
yield from self.processed_data

def to_list(self) -> list[T]:
"""Execute the pipeline and return the results as a list.

This is a terminal operation that consumes the pipeline's iterator
and materializes all results into memory.

Returns:
A list containing all processed items from the pipeline.

Note:
This operation consumes the pipeline's iterator, making subsequent
operations on the same pipeline return empty results.
"""
return list(self.processed_data)

def each(self, function: PipelineFunction[T]) -> None:
"""Apply a function to each element (terminal operation).

This is a terminal operation that processes each element for side effects
and consumes the pipeline's iterator without returning results.

Args:
function: The function to apply to each element. Should be used for
side effects like logging, updating external state, etc.
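
Example:
A minimal sketch; `print` stands in for any side-effecting function:

>>> Pipeline(["a", "b", "c"]).each(print)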

Note:
This operation consumes the pipeline's iterator, making subsequent
operations on the same pipeline return empty results.
"""
for item in self.processed_data:
function(item)

def first(self, n: int = 1) -> list[T]:
"""Get the first n elements of the pipeline (terminal operation).

This is a terminal operation that consumes up to n elements from the
pipeline's iterator and returns them as a list.

Args:
n: The number of elements to retrieve. Must be at least 1.

Returns:
A list containing the first n elements, or fewer if the pipeline
contains fewer than n elements.

Raises:
AssertionError: If n is less than 1.
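
Example:
>>> Pipeline([1, 2, 3, 4, 5]).first(2)  # [1, 2]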

Note:
This operation partially consumes the pipeline's iterator. Subsequent
operations will continue from where this operation left off.
"""
assert n >= 1, "n must be at least 1"
return list(itertools.islice(self.processed_data, n))

def consume(self) -> None:
"""Consume the pipeline without returning results.
"""Consume the pipeline without returning results (terminal operation).

This is a terminal operation that processes all elements in the pipeline
for their side effects without materializing any results. Useful when
the pipeline operations have side effects and you don't need the results.

Note:
This operation consumes the pipeline's iterator, making subsequent
operations on the same pipeline return empty results.
"""
for _ in self.processed_data:
pass