# tinypipe Demo Notebook

This notebook demonstrates the key features of **tinypipe**, a lightweight, Pydantic-based library for building sequential pipelines in Python. tinypipe suits data processing, scientific workflows (e.g., ligand-binding assays), and any task that can be expressed as a clear, linear sequence of steps.

## Features Demonstrated
- **Sequential Workflows**: Run steps in a fixed order.
- **Decorator API**: Define steps with `@pipestep` for simplicity.
- **Pydantic Context**: Type-safe state management with `PipelineContext`.
- **Smart Caching**: Skip unchanged steps using `fingerprint_keys`.
- **Flexible Control**: Use `ctx.abort_pass()` or `ctx.abort_pipeline()`.
- **Composable Steps**: Nest steps inside a parent `Step`.
- **Bio-Inspired Workflow**: Example for ligand-binding assay (LBA) processing.
- **Observability**: Inspect timing and cache hits via `step_meta`.

Let's get started!

In [None]:
# Import tinypipe components
from tinypipe import Step, pipestep, Pipeline, PipelineContext
import logging

# Configure logging to see pipeline execution
logging.basicConfig(level=logging.INFO, format="%(message)s")

## 1. Sequential Workflows

tinypipe runs steps in a fixed order within a `Pipeline`. Here, we create a simple pipeline with three steps to process data sequentially.

In [None]:
@pipestep(name="load_data")
def load_data(ctx: PipelineContext) -> PipelineContext:
    ctx.data = [1.0, 2.0, 3.0]  # Simulated input data
    return ctx


@pipestep(name="double")
def double(ctx: PipelineContext) -> PipelineContext:
    ctx.data = [x * 2 for x in ctx.data]
    return ctx


@pipestep(name="offset")
def offset(ctx: PipelineContext) -> PipelineContext:
    ctx.data = [x + 1 for x in ctx.data]
    return ctx


# Create and run the pipeline
pipeline = Pipeline([load_data, double, offset], name="simple_pipeline")
ctx = PipelineContext()
pipeline.run(ctx)

print(f"Result: {ctx.data}")  # Expected: [3.0, 5.0, 7.0]
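Conceptually, one pass threads a single context object through each step in list order, with each step's returned context feeding the next. The plain-Python sketch below illustrates that idea without tinypipe: `types.SimpleNamespace` stands in for `PipelineContext`, and `run_sequential` and the `plain_*` functions are hypothetical names.

```python
from types import SimpleNamespace
from typing import Callable, List

StepFn = Callable[[SimpleNamespace], SimpleNamespace]


def run_sequential(steps: List[StepFn], ctx: SimpleNamespace) -> SimpleNamespace:
    """Hypothetical helper: pass the context through each step in order."""
    for step in steps:
        ctx = step(ctx)  # each step's returned context feeds the next step
    return ctx


def plain_load(ctx: SimpleNamespace) -> SimpleNamespace:
    ctx.data = [1.0, 2.0, 3.0]
    return ctx


def plain_double(ctx: SimpleNamespace) -> SimpleNamespace:
    ctx.data = [x * 2 for x in ctx.data]
    return ctx


def plain_offset(ctx: SimpleNamespace) -> SimpleNamespace:
    ctx.data = [x + 1 for x in ctx.data]
    return ctx


result = run_sequential([plain_load, plain_double, plain_offset], SimpleNamespace())
print(result.data)  # [3.0, 5.0, 7.0]
```

Because each step sees its predecessor's output, order matters: swapping `plain_double` and `plain_offset` would give `[4.0, 6.0, 8.0]` instead.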

## 2. Decorator API and Pydantic Context

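The `@pipestep` decorator turns a plain function into a step, and `PipelineContext` (a Pydantic model) carries state between steps. Because the context is configured with `extra="allow"`, steps can attach new attributes (such as `ctx.output`) without declaring them first; only declared fields receive Pydantic's type validation. Here, `@pipestep` defines a cleaning step that drops `None` values from `ctx.input` and stores the result in `ctx.output`.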

In [None]:
@pipestep(name="clean_data", fingerprint_keys=("input",))
def clean_data(ctx: PipelineContext) -> PipelineContext:
    ctx.output = [x for x in ctx.input if x is not None]
    return ctx


# Run a single step
ctx = PipelineContext(input=[1, None, 3, None, 5])
clean_data.run(ctx)
print(f"Cleaned data: {ctx.output}")  # Expected: [1, 3, 5]
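Under the hood, a decorator like `@pipestep` only has to wrap the function in an object that carries its name and configuration and exposes a `run` method. The sketch below shows one way to do that with a dataclass; `MiniStep` and `mini_pipestep` are hypothetical names, and tinypipe's own step class presumably also handles caching, metadata, and children.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Callable, Tuple


@dataclass
class MiniStep:
    """Hypothetical step object: a named function plus its fingerprint keys."""
    name: str
    func: Callable[[SimpleNamespace], SimpleNamespace]
    fingerprint_keys: Tuple[str, ...] = ()

    def run(self, ctx: SimpleNamespace) -> SimpleNamespace:
        # A fuller implementation would consult a cache before calling func.
        return self.func(ctx)


def mini_pipestep(name: str, fingerprint_keys: Tuple[str, ...] = ()):
    """Hypothetical decorator factory: wrap a plain function in a MiniStep."""
    def wrap(func: Callable[[SimpleNamespace], SimpleNamespace]) -> MiniStep:
        return MiniStep(name=name, func=func, fingerprint_keys=fingerprint_keys)
    return wrap


@mini_pipestep(name="clean_data_sketch", fingerprint_keys=("input",))
def clean_data_sketch(ctx: SimpleNamespace) -> SimpleNamespace:
    ctx.output = [x for x in ctx.input if x is not None]
    return ctx


ctx_sketch = SimpleNamespace(input=[1, None, 3, None, 5])
clean_data_sketch.run(ctx_sketch)
print(clean_data_sketch.name, ctx_sketch.output)  # clean_data_sketch [1, 3, 5]
```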

## 3. Smart Caching

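Steps declared with `fingerprint_keys` are skipped when their inputs are unchanged. tinypipe hashes the listed context fields and, if the hash matches the one recorded from the step's previous run (the `input_hash` shown in `step_meta` in section 7), skips the step (a cache hit); the step's earlier output stays on the context. Below, `clean_data` runs twice on the same context and is then re-run after its input changes.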

In [None]:
# Run twice with same input
ctx = PipelineContext(input=[1, None, 3, None, 5])
clean_data.run(ctx)
print(f"Cleaned data: {ctx.output}")  # Expected: [1, 3, 5]
clean_data.run(ctx)  # Should log "Skipping clean_data (cache hit)"
print(f"Cleaned data (cached): {ctx.output}")

# Change a fingerprinted field on the same context to invalidate the cache
ctx.input = [1, None, 4, None, 5]
clean_data.run(ctx)  # Should recompute
print(f"Cleaned data (new input): {ctx.output}")  # Expected: [1, 4, 5]
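One plausible way to implement key-based skipping is to hash the listed fields and compare the result with the hash stored from the previous run. The sketch below assumes per-context storage, as suggested by the `input_hash` entry in `step_meta`; `fingerprint`, `run_cached`, and the `step_meta_sketch` attribute are hypothetical, and SHA-256 over a sorted-key JSON encoding is just one reasonable choice of fingerprint.

```python
import hashlib
import json
from types import SimpleNamespace
from typing import Callable, Tuple


def fingerprint(ctx: SimpleNamespace, keys: Tuple[str, ...]) -> str:
    """Hash only the listed context fields, using sorted-key JSON for a stable encoding."""
    payload = {key: getattr(ctx, key, None) for key in keys}
    encoded = json.dumps(payload, sort_keys=True, default=repr).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def run_cached(name: str, func: Callable, keys: Tuple[str, ...], ctx: SimpleNamespace) -> bool:
    """Run func unless its fingerprint matches the previous run on this context.

    Returns True for a cache hit (step skipped) and False when the step ran.
    """
    if not hasattr(ctx, "step_meta_sketch"):
        ctx.step_meta_sketch = {}
    digest = fingerprint(ctx, keys)
    previous = ctx.step_meta_sketch.get(name, {})
    if previous.get("input_hash") == digest:
        return True  # inputs unchanged: keep the earlier output
    func(ctx)
    ctx.step_meta_sketch[name] = {"input_hash": digest}
    return False


def clean_sketch(ctx: SimpleNamespace) -> SimpleNamespace:
    ctx.output = [x for x in ctx.input if x is not None]
    return ctx


ctx_c = SimpleNamespace(input=[1, None, 3, None, 5])
hits = [run_cached("clean", clean_sketch, ("input",), ctx_c) for _ in range(2)]
print(hits, ctx_c.output)  # [False, True] [1, 3, 5]

ctx_c.input = [1, None, 4, None, 5]  # a fingerprinted field changed
third = run_cached("clean", clean_sketch, ("input",), ctx_c)
print(third, ctx_c.output)  # False [1, 4, 5]
```

Only the listed keys are hashed, so in this sketch (and in any scheme keyed the same way) a step that reads a field missing from `fingerprint_keys` can be skipped even though its real input changed. Listing every field a step reads keeps the cache honest.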

## 4. Flexible Control with Pipeline Signals

Use the context helpers:
- `ctx.abort_pass()` to stop the current pass and let the `Pipeline` run another (up to `max_passes`).
- `ctx.abort_pipeline()` to stop everything immediately.

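Here, `validate` calls `ctx.abort_pass()` whenever the data contains a negative value. The first pass loads invalid data, so the pipeline starts a second pass, in which `load_with_error` returns valid data and the run completes.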

In [None]:
@pipestep(name="load_with_error")
def load_with_error(ctx: PipelineContext) -> PipelineContext:
    ctx.attempt = getattr(ctx, "attempt", 0) + 1
    if ctx.attempt == 1:
        ctx.data = [-1.0, 2.0, 3.0]  # Invalid data (negative value)
    else:
        ctx.data = [1.0, 2.0, 3.0]  # Fixed data
    return ctx


@pipestep(name="validate", fingerprint_keys=("data",))
def validate(ctx: PipelineContext) -> PipelineContext:
    if any(x < 0 for x in ctx.data):
        ctx.abort_pass()  # Retry another pass
    return ctx


pipeline = Pipeline([load_with_error, validate], name="retry_pipeline", max_passes=3)
ctx = PipelineContext()
pipeline.run(ctx)

print(f"Final data: {ctx.data}")  # Expected: [1.0, 2.0, 3.0]
print(f"Attempts: {ctx.attempt}")  # Expected: 2
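One way a runner could support `abort_pass()` and `abort_pipeline()` is a flag on the context that the runner resets at the start of each pass and checks after every step. The sketch below takes that approach to replay the retry example with plain functions; `SketchContext`, `run_passes`, and the `signal` attribute are hypothetical, and tinypipe's actual mechanism may differ.

```python
from typing import Callable, List


class SketchContext:
    """Hypothetical stand-in for PipelineContext with flag-setting signal helpers."""

    def __init__(self, **fields):
        self.__dict__.update(fields)
        self.signal = None  # None, "abort_pass", or "abort_pipeline"

    def abort_pass(self) -> None:
        self.signal = "abort_pass"

    def abort_pipeline(self) -> None:
        self.signal = "abort_pipeline"


def run_passes(steps: List[Callable], ctx: SketchContext, max_passes: int = 3) -> int:
    """Run all steps per pass; start a new pass only after abort_pass. Returns passes used."""
    for pass_no in range(1, max_passes + 1):
        ctx.signal = None  # reset at the start of every pass
        for step in steps:
            step(ctx)
            if ctx.signal is not None:
                break  # skip the remaining steps of this pass
        if ctx.signal != "abort_pass":
            return pass_no  # clean pass or abort_pipeline: stop here
    return max_passes  # retry budget exhausted


def load_with_error_sketch(ctx):
    ctx.attempt = getattr(ctx, "attempt", 0) + 1
    ctx.data = [-1.0, 2.0, 3.0] if ctx.attempt == 1 else [1.0, 2.0, 3.0]
    return ctx


def validate_sketch(ctx):
    if any(x < 0 for x in ctx.data):
        ctx.abort_pass()
    return ctx


ctx_d = SketchContext()
passes = run_passes([load_with_error_sketch, validate_sketch], ctx_d, max_passes=3)
print(passes, ctx_d.attempt, ctx_d.data)  # 2 2 [1.0, 2.0, 3.0]
```

This sketch simply stops once `max_passes` is used up; how tinypipe reports an exhausted retry budget is not shown in this notebook.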

## 5. Composable Steps

A `Step` can contain child steps, which run in order when the parent runs. Here, `process` groups `calibrate` and `validate`, and the pipeline treats it as a single step after `load_data`.

In [None]:
@pipestep(name="calibrate", fingerprint_keys=("data",))
def calibrate(ctx: PipelineContext) -> PipelineContext:
    ctx.calibrated = [x * 1.5 for x in ctx.data]  # Calibration factor
    return ctx


# Reuse validate from above
process = Step("process", children=[calibrate, validate])
pipeline = Pipeline([load_data, process], name="nested_pipeline")
ctx = PipelineContext()
pipeline.run(ctx)

print(f"Calibrated data: {ctx.calibrated}")  # Expected: [1.5, 3.0, 4.5]
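Nesting works naturally when a parent step exposes the same interface as its children. In the sketch below, the hypothetical `CompositeStep` is callable with a context, just like a plain step function, so composites can contain other composites. This illustrates the composite pattern in general, not tinypipe's `Step(..., children=...)` implementation.

```python
from types import SimpleNamespace
from typing import Callable, List


class CompositeStep:
    """Hypothetical parent step: runs its children in order and behaves like one step."""

    def __init__(self, name: str, children: List[Callable]):
        self.name = name
        self.children = list(children)

    def __call__(self, ctx):
        for child in self.children:
            ctx = child(ctx)
        return ctx


def sketch_load(ctx):
    ctx.data = [1.0, 2.0, 3.0]
    return ctx


def sketch_calibrate(ctx):
    ctx.calibrated = [x * 1.5 for x in ctx.data]
    return ctx


def sketch_check(ctx):
    ctx.valid = all(x >= 0 for x in ctx.data)
    return ctx


process_sketch = CompositeStep("process", [sketch_calibrate, sketch_check])
# A composite is itself callable like a step, so it can be nested again.
outer = CompositeStep("outer", [sketch_load, process_sketch])
ctx_e = outer(SimpleNamespace())
print(ctx_e.calibrated, ctx_e.valid)  # [1.5, 3.0, 4.5] True
```

A benefit of this design is that the runner needs no special case for nested steps: it calls `outer` exactly as it would call any single step.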

## 6. Bio-Inspired Workflow (LBA)

tinypipe was initially designed for ligand-binding assay (LBA) pipelines. This example mimics loading, calibrating, and validating assay data.

In [None]:
@pipestep(name="load_assay_data")
def load_assay_data(ctx: PipelineContext) -> PipelineContext:
    ctx.assay_data = [0.1, 0.2, 0.3]  # Simulated absorbance readings
    return ctx


@pipestep(name="calibrate_assay", fingerprint_keys=("assay_data",))
def calibrate_assay(ctx: PipelineContext) -> PipelineContext:
    # Apply calibration curve (simplified)
    ctx.concentrations = [x * 100 for x in ctx.assay_data]  # Convert to ng/mL
    return ctx


@pipestep(name="validate_assay", fingerprint_keys=("concentrations",))
def validate_assay(ctx: PipelineContext) -> PipelineContext:
    if any(x < 0 for x in ctx.concentrations):
        ctx.abort_pass()  # Retry if invalid
    return ctx


lba_process = Step("lba_process", children=[calibrate_assay, validate_assay])
pipeline = Pipeline([load_assay_data, lba_process], name="lba_pipeline", max_passes=3)
ctx = PipelineContext()
pipeline.run(ctx)

print(f"Concentrations: {ctx.concentrations}")  # Expected: [10.0, 20.0, 30.0]
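The `x * 100` factor stands in for a real calibration curve. In practice, concentrations are back-calculated from a standard curve fitted to samples of known concentration; ligand-binding assays commonly use a four-parameter logistic (4PL) fit, but a straight line is enough to show the idea. The standards below are hypothetical values chosen so the fitted line reproduces the factor of 100 used in `calibrate_assay`, and `fit_line` and `back_calculate` are illustrative helpers, not part of tinypipe.

```python
from typing import List, Sequence, Tuple


def fit_line(xs: Sequence[float], ys: Sequence[float]) -> Tuple[float, float]:
    """Ordinary least-squares fit of y = slope * x + intercept."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


def back_calculate(absorbances: Sequence[float], slope: float, intercept: float) -> List[float]:
    """Invert the standard curve: concentration = (absorbance - intercept) / slope."""
    return [(a - intercept) / slope for a in absorbances]


# Hypothetical standards (ng/mL -> absorbance), chosen so slope = 0.01 per ng/mL.
standard_conc = [0.0, 10.0, 20.0, 40.0]
standard_abs = [0.0, 0.1, 0.2, 0.4]

slope, intercept = fit_line(standard_conc, standard_abs)
concentrations = back_calculate([0.1, 0.2, 0.3], slope, intercept)
print([round(c, 6) for c in concentrations])  # [10.0, 20.0, 30.0]
```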

## 7. Observability with step_meta

The `step_meta` dictionary tracks timing, cache hits, and status for each step. Let's inspect it after running the LBA pipeline.

In [None]:
print("Step metadata:")
for step_name, meta in ctx.step_meta.items():
    print(f"{step_name}: {meta}")
    # Example: {'input_hash': '...', 'status': 'ok', 'duration': 0.001}
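Because `step_meta` is a plain mapping, ordinary Python is enough to summarize it. The sketch below assumes each entry is a dict with optional `status` and `duration` keys, matching the example shape shown in the loop comment; the sample values are made up for illustration, `summarize` is a hypothetical helper, and real entries may carry different fields.

```python
from collections import Counter
from typing import Dict, Tuple

# Hypothetical metadata in the shape of the example entry; values are illustrative only.
sample_meta: Dict[str, dict] = {
    "load_assay_data": {"status": "ok", "duration": 0.0004},
    "calibrate_assay": {"input_hash": "abc123", "status": "ok", "duration": 0.0011},
    "validate_assay": {"input_hash": "def456", "status": "ok", "duration": 0.0002},
}


def summarize(meta: Dict[str, dict]) -> Tuple[Counter, str, float]:
    """Count steps per status and report the slowest step and the total time."""
    statuses = Counter(entry.get("status", "unknown") for entry in meta.values())
    slowest = max(meta, key=lambda name: meta[name].get("duration", 0.0))
    total = sum(entry.get("duration", 0.0) for entry in meta.values())
    return statuses, slowest, total


statuses, slowest, total = summarize(sample_meta)
print(dict(statuses), slowest, round(total, 4))  # {'ok': 3} calibrate_assay 0.0017
```

In the notebook, `summarize(ctx.step_meta)` would apply the same logic to real entries, provided they are dicts; if they are objects, swap `.get` for `getattr`.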

## 8. Error Handling and Pipeline Abort

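A step can stop the whole pipeline by raising an exception or by calling `ctx.abort_pipeline()`; unlike `abort_pass()`, no further steps or passes run. Here, `faulty_step` simulates a fatal condition with `abort_pipeline()`, so the data loaded by `load_data` is still on the context afterward.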

In [None]:
@pipestep(name="faulty_step")
def faulty_step(ctx: PipelineContext) -> PipelineContext:
    ctx.abort_pipeline()  # Stop the pipeline
    return ctx


pipeline = Pipeline([load_data, faulty_step], name="error_pipeline")
ctx = PipelineContext()
pipeline.run(ctx)  # Logs: aborted after N pass(es)

print(f"Data after abort: {getattr(ctx, 'data', None)}")  # Expected: [1.0, 2.0, 3.0]
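To see how an abort differs from an exception, the sketch below uses a hypothetical single-pass runner (`run_once`) and context (`SketchContext`), not tinypipe's code. In this flag-based design, `abort_pipeline()` makes the runner return normally with earlier results intact, while an exception escapes to the caller. Whether tinypipe catches, logs, or re-raises step exceptions is not shown in this notebook, so the sketch simply lets them propagate.

```python
from typing import Callable, List, Optional


class SketchContext:
    """Hypothetical stand-in for PipelineContext with an abort flag."""

    def __init__(self, **fields):
        self.__dict__.update(fields)
        self.aborted = False

    def abort_pipeline(self) -> None:
        self.aborted = True


def run_once(steps: List[Callable], ctx: SketchContext) -> bool:
    """Run steps in order; return False if a step called abort_pipeline()."""
    for step in steps:
        step(ctx)  # an exception raised by a step propagates to the caller unchanged
        if ctx.aborted:
            return False  # later steps never run; earlier results stay on ctx
    return True


def sketch_load(ctx):
    ctx.data = [1.0, 2.0, 3.0]
    return ctx


def sketch_stop(ctx):
    ctx.abort_pipeline()
    return ctx


def sketch_overwrite(ctx):
    ctx.data = []  # never reached in the aborted run below
    return ctx


def sketch_crash(ctx):
    raise ValueError("simulated fatal error")


ctx_h = SketchContext()
completed = run_once([sketch_load, sketch_stop, sketch_overwrite], ctx_h)
print(completed, ctx_h.data)  # False [1.0, 2.0, 3.0]

caught: Optional[str] = None
try:
    run_once([sketch_load, sketch_crash], SketchContext())
except ValueError as exc:
    caught = str(exc)
print(caught)  # simulated fatal error
```

A reasonable rule of thumb: abort when stopping is an expected outcome the caller can inspect on the context, and raise when the pipeline hit a genuine bug or an unrecoverable condition.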

## Summary

tinypipe provides a lightweight way to build sequential pipelines with Pydantic-backed state, fingerprint-based caching, nested steps, and multi-pass control. The `@pipestep` decorator keeps step definitions short, and the `PipelineContext` helpers `abort_pass` and `abort_pipeline` let a step request a retry or stop the run outright.