# Getting Started

`pypekit` is a Python library for designing and running data-processing pipelines.  
Describe each task once and `pypekit` will automatically construct every valid pipeline for you and execute them with caching.


## Installation

```
pip install pypekit
```


## Defining Tasks

Create a task by subclassing `Task`, overriding the `input_types` and `output_types` properties, and implementing the `run()` method.

| Property           | Purpose                                                              |
| ------------------ | -------------------------------------------------------------------- |
| **`input_types`**  | A list of strings representing input types that the task can accept. |
| **`output_types`** | A list of strings representing output types that the task produces.  |

**`run()`** holds the task's logic: it receives the output of the upstream task and returns this task's own output. A source task can ignore its argument, as `Source` does below.

### Pipeline Construction Rules

* Pipelines start with tasks that have `"source"` in **`input_types`** and end with tasks that have `"sink"` in **`output_types`**.
* All other types are intermediate and dictate how tasks can be chained.
* Two tasks connect when **at least one** output type of the upstream task matches an input type of the downstream task.
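The connection rule amounts to a set intersection between the upstream task's `output_types` and the downstream task's `input_types`. The sketch below illustrates only that check. `can_connect` is a hypothetical helper, not part of pypekit, and the type lists are copied from the example tasks that follow.

```python
def can_connect(upstream_outputs: list[str], downstream_inputs: list[str]) -> bool:
    """Return True if at least one upstream output type is accepted downstream."""
    return bool(set(upstream_outputs) & set(downstream_inputs))


# Type declarations copied from the example tasks below
source_out = ["a"]
transform2_in, transform2_out = ["a", "b"], ["b"]
sink_in = ["b"]

print(can_connect(source_out, transform2_in))   # True: "a" is shared
print(can_connect(transform2_out, sink_in))     # True: "b" is shared
print(can_connect(source_out, sink_in))         # False: no shared type
```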


```python
from pypekit import Task

class Source(Task):
    input_types = ["source"]
    output_types = ["a"]

    def run(self, _):
        print("Running Source")
        return "source"

class Transform1(Task):
    input_types = ["a"]
    output_types = ["b"]

    def run(self, x):
        print("Running Transform1")
        return x + "_transformed-1"

class Transform2(Task):
    input_types = ["a", "b"]
    output_types = ["b"]

    def run(self, x):
        print("Running Transform2")
        return x + "_transformed-2"

class Sink1(Task):
    input_types = ["b"]
    output_types = ["sink"]

    def run(self, x):
        print("Running Sink1")
        return x + "_sink-1"

class Sink2(Task):
    input_types = ["b"]
    output_types = ["sink"]

    def run(self, x):
        print("Running Sink2")
        return x + "_sink-2"
```
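Running a pipeline chains these `run()` methods, feeding each task's return value into the next task. The sketch below mimics that with plain functions and `functools.reduce`, without pypekit. It assumes the first task receives `None`, since this guide does not say what pypekit passes to a source task, and `run_chain` is a hypothetical helper.

```python
from functools import reduce
from typing import Any, Callable

# Plain-function stand-ins for Source, Transform1 and Sink1 (not pypekit tasks)
steps: list[Callable[[Any], Any]] = [
    lambda _: "source",
    lambda x: x + "_transformed-1",
    lambda x: x + "_sink-1",
]


def run_chain(steps: list[Callable[[Any], Any]], initial: Any = None) -> Any:
    """Feed each step's return value into the next step, like chained run() calls."""
    return reduce(lambda value, step: step(value), steps, initial)


print(run_chain(steps))  # source_transformed-1_sink-1
```

For the chain Source → Transform1 → Sink1 this yields `"source_transformed-1_sink-1"`, which matches the corresponding entry under Inspect Results.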


## Building a Repository

Collect your task classes in a `Repository`, then call `build_tree()` to let `pypekit` construct a tree of tasks based on the defined `input_types` and `output_types`.
The resulting tree represents every source-to-sink pathway implied by your task definitions.


```python
from pypekit import Repository

repository = Repository([
    Source,
    Transform1,
    Transform2,
    Sink1,
    Sink2
])

root = repository.build_tree()
```

## Inspect the Tree

To inspect the tree structure, call `build_tree_string()`, which returns a string representation of the tree.

```python
tree_representation = repository.build_tree_string()
print(tree_representation)
```

```
└── Root()
    └── Source()
        ├── Transform1()
        │   ├── Transform2()
        │   │   ├── Sink1()
        │   │   └── Sink2()
        │   ├── Sink1()
        │   └── Sink2()
        └── Transform2()
            ├── Sink1()
            └── Sink2()
```
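The tree above can be reproduced from the type declarations alone. The sketch below is an illustration, not pypekit's implementation: it attaches every task that shares a type with its parent, and it assumes a task may appear at most once on a path, a rule inferred from the output above (`Transform2` accepts its own output type `b`, yet never follows itself). `build_children` and `render` are hypothetical helpers.

```python
# Type declarations of the example tasks, in repository order
TASKS = {
    "Source": (["source"], ["a"]),
    "Transform1": (["a"], ["b"]),
    "Transform2": (["a", "b"], ["b"]),
    "Sink1": (["b"], ["sink"]),
    "Sink2": (["b"], ["sink"]),
}


def build_children(outputs, used):
    """Return (name, children) pairs for every task that can follow `outputs`."""
    children = []
    for name, (inputs, outs) in TASKS.items():
        # Connect on any shared type; skip tasks already on this path (assumed rule)
        if name not in used and set(outputs) & set(inputs):
            children.append((name, build_children(outs, used | {name})))
    return children


def render(name, children, prefix="", is_last=True, lines=None):
    """Collect box-drawing lines for a node and its whole subtree."""
    if lines is None:
        lines = []
    lines.append(prefix + ("└── " if is_last else "├── ") + name + "()")
    child_prefix = prefix + ("    " if is_last else "│   ")
    for i, (child, grandchildren) in enumerate(children):
        render(child, grandchildren, child_prefix, i == len(children) - 1, lines)
    return lines


# The root acts as if it outputs the "source" marker, so only source tasks attach
tree = build_children(["source"], frozenset())
print("\n".join(render("Root", tree)))
```

Running it prints the same eleven lines as the `build_tree_string()` output above.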



## Build Pipelines

To get all viable pipelines from the tree, call `build_pipelines()`.

```python
pipelines = repository.build_pipelines()
for p in pipelines:
    print(p)
```

```
Pipeline(tasks=[Source(), Transform1(), Transform2(), Sink1()])
Pipeline(tasks=[Source(), Transform1(), Transform2(), Sink2()])
Pipeline(tasks=[Source(), Transform1(), Sink1()])
Pipeline(tasks=[Source(), Transform1(), Sink2()])
Pipeline(tasks=[Source(), Transform2(), Sink1()])
Pipeline(tasks=[Source(), Transform2(), Sink2()])
```
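Each pipeline corresponds to one root-to-leaf path in the tree. A depth-first search over the same type declarations, again assuming no task repeats within a pipeline, yields the six task sequences in the order listed above. `enumerate_pipelines` is a hypothetical helper, not part of pypekit.

```python
from typing import Iterator

# Type declarations of the example tasks, in repository order
TASKS = {
    "Source": (["source"], ["a"]),
    "Transform1": (["a"], ["b"]),
    "Transform2": (["a", "b"], ["b"]),
    "Sink1": (["b"], ["sink"]),
    "Sink2": (["b"], ["sink"]),
}


def enumerate_pipelines(outputs=("source",), path=()) -> Iterator[list[str]]:
    """Depth-first search yielding every source-to-sink chain of task names."""
    if "sink" in outputs:
        yield list(path)  # reached a sink: the path is a complete pipeline
        return
    for name, (inputs, outs) in TASKS.items():
        if name not in path and set(outputs) & set(inputs):
            yield from enumerate_pipelines(tuple(outs), path + (name,))
    # Paths that never reach a sink yield nothing and are dropped


for p in enumerate_pipelines():
    print(p)
```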


## Execute Pipelines with Caching

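Running many similar pipelines is wasteful when they share sub-chains: all six pipelines above start with `Source`, and four of them continue with `Transform1`.
To cache intermediate results, pass the list of pipelines to a `CachedExecutor` and call its `run()` method.

The executor only runs a task if it has not already been executed with the same input, and reuses cached results otherwise. In the log below, `Source` and `Transform1` each run once, while `Transform2` runs twice because it receives two different inputs (`"source_transformed-1"` and `"source"`).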

```python
from pypekit import CachedExecutor

executor = CachedExecutor(pipelines, verbose=True)
results = executor.run()
```

```
Running Source
Running Transform1
Running Transform2
Running Sink1
Pipeline 1/6 completed. Runtime: 0.00s.
Running Sink2
Pipeline 2/6 completed. Runtime: 0.00s.
Running Sink1
Pipeline 3/6 completed. Runtime: 0.00s.
Running Sink2
Pipeline 4/6 completed. Runtime: 0.00s.
Running Transform2
Running Sink1
Pipeline 5/6 completed. Runtime: 0.00s.
Running Sink2
Pipeline 6/6 completed. Runtime: 0.00s.
```
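The caching behaviour described above can be sketched with a dictionary keyed on `(task name, input)`. This is an illustration under assumptions, not pypekit's implementation: the tasks are plain-function stand-ins, inputs must be hashable, and `run_cached` is a hypothetical helper. Like the `runtime` field described under Inspect Results, it adds a cached task's recorded runtime to the pipeline total.

```python
import time

# Plain-function stand-ins for the example tasks (not pypekit tasks)
FUNCS = {
    "Source": lambda _: "source",
    "Transform1": lambda x: x + "_transformed-1",
    "Transform2": lambda x: x + "_transformed-2",
    "Sink1": lambda x: x + "_sink-1",
    "Sink2": lambda x: x + "_sink-2",
}

PIPELINES = [
    ["Source", "Transform1", "Transform2", "Sink1"],
    ["Source", "Transform1", "Transform2", "Sink2"],
    ["Source", "Transform1", "Sink1"],
    ["Source", "Transform1", "Sink2"],
    ["Source", "Transform2", "Sink1"],
    ["Source", "Transform2", "Sink2"],
]


def run_cached(pipelines, cache=None):
    """Run each pipeline, reusing (output, runtime) cached under (task name, input)."""
    cache = {} if cache is None else cache
    executed = []  # names of tasks that actually ran, in order
    results = []
    for tasks in pipelines:
        value, total = None, 0.0
        for name in tasks:
            key = (name, value)
            if key not in cache:
                start = time.perf_counter()
                output = FUNCS[name](value)
                cache[key] = (output, time.perf_counter() - start)
                executed.append(name)
            value, runtime = cache[key]
            total += runtime  # cached tasks contribute their recorded runtime
        results.append({"output": value, "runtime": total, "tasks": tasks})
    return results, executed, cache


results, executed, cache = run_cached(PIPELINES)
print(executed)
```

`executed` lists the tasks in the same order as the `Running ...` lines in the log above: ten task executions instead of the 20 that running each pipeline from scratch would need. Passing the returned `cache` back into `run_cached` executes nothing. Keying on the raw input only works for hashable values such as these strings; arrays or DataFrames would need a different key, for example a hash of the input or the chain of upstream tasks.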


## Inspect Results

After `run()` finishes, the results (returned by `run()` and also stored in `executor.results`) are a nested dict whose keys are pipeline IDs.
Each entry records:

* **`output`** – the output of the pipeline's final task,
* **`runtime`** – the total runtime in seconds, summed over all tasks; for tasks served from the cache, the runtime recorded when they first ran is used,
* **`tasks`** – the list of tasks that make up the pipeline.
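Because `runtime` already includes cached time, entries can be compared directly, for example to find the fastest pipeline. The sketch below uses two entries copied from the output below; the outer keys `"id-1"` and `"id-2"` are placeholders, since the format of pypekit's pipeline IDs is not shown in this guide.

```python
# Two entries copied from the printed results; outer keys are placeholder IDs
results = {
    "id-1": {
        "output": "source_transformed-1_transformed-2_sink-1",
        "runtime": 7.836699933250202e-05,
        "tasks": ["Source()", "Transform1()", "Transform2()", "Sink1()"],
    },
    "id-2": {
        "output": "source_transformed-2_sink-2",
        "runtime": 6.663499880232848e-05,
        "tasks": ["Source()", "Transform2()", "Sink2()"],
    },
}

# Pick the entry with the smallest recorded runtime
fastest_id, fastest = min(results.items(), key=lambda item: item[1]["runtime"])
print(fastest_id, " -> ".join(fastest["tasks"]))
```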

```python
import json

for r in results.values():
    print(json.dumps(r, indent=2))
```

```
{
  "output": "source_transformed-1_transformed-2_sink-1",
  "runtime": 7.836699933250202e-05,
  "tasks": [
    "Source()",
    "Transform1()",
    "Transform2()",
    "Sink1()"
  ]
}
{
  "output": "source_transformed-1_transformed-2_sink-2",
  "runtime": 8.094199893093901e-05,
  "tasks": [
    "Source()",
    "Transform1()",
    "Transform2()",
    "Sink2()"
  ]
}
{
  "output": "source_transformed-1_sink-1",
  "runtime": 7.130399899324402e-05,
  "tasks": [
    "Source()",
    "Transform1()",
    "Sink1()"
  ]
}
{
  "output": "source_transformed-1_sink-2",
  "runtime": 7.185499907791382e-05,
  "tasks": [
    "Source()",
    "Transform1()",
    "Sink2()"
  ]
}
{
  "output": "source_transformed-2_sink-1",
  "runtime": 6.648499856964918e-05,
  "tasks": [
    "Source()",
    "Transform2()",
    "Sink1()"
  ]
}
{
  "output": "source_transformed-2_sink-2",
  "runtime": 6.663499880232848e-05,
  "tasks": [
    "Source()",
    "Transform2()",
    "Sink2()"
  ]
}
```


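## Reusing the Cache

If you already have a cache from a previous run, you can reuse it by passing it as the `cache` argument of `CachedExecutor`. Because every task result is already cached here, no task runs again: the log below contains no `Running ...` lines.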

```python
new_executor = CachedExecutor(pipelines, cache=executor.cache, verbose=True)
new_executor.run()
```

```
Pipeline 1/6 completed. Runtime: 0.00s.
Pipeline 2/6 completed. Runtime: 0.00s.
Pipeline 3/6 completed. Runtime: 0.00s.
Pipeline 4/6 completed. Runtime: 0.00s.
Pipeline 5/6 completed. Runtime: 0.00s.
Pipeline 6/6 completed. Runtime: 0.00s.
```
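A cache reused this way only lives as long as the Python session. To reuse it across sessions, one option is to pickle it to disk. This sketch rests on an assumption this guide does not confirm, namely that `executor.cache` and the task outputs it holds are picklable; `save_cache` and `load_cache` are hypothetical helpers, not pypekit functions.

```python
import pickle
from typing import Any


def save_cache(cache: Any, path: str) -> None:
    """Serialize a cache object to `path` with pickle."""
    with open(path, "wb") as f:
        pickle.dump(cache, f)


def load_cache(path: str) -> Any:
    """Load a cache written by save_cache. Only unpickle files you trust."""
    with open(path, "rb") as f:
        return pickle.load(f)
```

For example, call `save_cache(executor.cache, "pypekit_cache.pkl")` at the end of one session, then `CachedExecutor(pipelines, cache=load_cache("pypekit_cache.pkl"))` in the next. Loading a pickle can execute arbitrary code, so never load cache files from untrusted sources.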


## Instances, Parameters and Pipelines

A repository can mix and match several flavours of “tasks”:

| What you pass                  | How the repository treats it                                            |
| ------------------------------ | ----------------------------------------------------------------------- |
| A **class**                    | A new instance is created at every node of the tree.                    |
| An **instance** (`Task()`)     | The same instance is reused everywhere it is needed.                    |
| A **tuple** (`Task`, `kwargs`) | A new instance is created at every node, using the given `kwargs`.      |
| An existing **Pipeline**       | Used like a task instance, appearing as a single node in the tree.      |

This flexibility lets you:

* reuse heavyweight objects (e.g. a loaded ML model),
* scan hyper-parameters by specifying multiple (class, kwargs) tuples,
* embed pre-fabricated sub-pipelines inside larger graphs.
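For a hyper-parameter scan, the `(class, kwargs)` tuples can be generated from a grid rather than written out by hand. In the sketch below, `Model` is a hypothetical stand-in for a `Task` subclass that accepts keyword arguments, and `grid_entries` is a hypothetical helper built on `itertools.product`.

```python
from itertools import product


class Model:
    """Hypothetical stand-in for a Task subclass whose __init__ takes kwargs."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


def grid_entries(task_cls: type, grid: dict[str, list]) -> list[tuple[type, dict]]:
    """Return one (class, kwargs) tuple per combination of the values in `grid`."""
    keys = list(grid)
    return [(task_cls, dict(zip(keys, values))) for values in product(*grid.values())]


entries = grid_entries(Model, {"depth": [2, 4], "learning_rate": [0.1, 0.01]})
for cls, kwargs in entries:
    print(cls.__name__, kwargs)
```

With a real `Task` subclass in place of `Model`, the list could then be unpacked into a repository, e.g. `Repository([Source, *entries, Sink1])`, assuming each tuple becomes its own task as the bullet above describes.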

```python
from pypekit import Pipeline

# New task that takes arguments
class Transform3(Task):
    input_types = ["a"]
    output_types = ["b"]

    def __init__(self, **kwargs):
        self.test = kwargs.get("test", False)

    def run(self, x):
        print("Running Transform3 with test =", self.test)
        return x + "_transformed-3" + ("-test" if self.test else "")

pipeline = Pipeline([
    Transform1(),
    Sink2()
])

repository = Repository([
    Source,
    Transform1,
    (Transform3, {"test": True}),   # Transform3 will be instantiated with the argument test=True every time it is used
    Sink1(),                        # Every node with the task Sink1 will have the same instance
    pipeline                        # Pipeline instance as task
])

repository.build_tree()
print(repository.build_tree_string())
```

```
└── Root()
    └── Source()
        ├── Transform1()
        │   └── Sink1()
        ├── Transform3(test=True)
        │   └── Sink1()
        └── Pipeline(tasks=[Transform1(), Sink2()])
```
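The instantiation rules from the table above can be summarised as a small dispatch function. This is a sketch under assumptions, not pypekit's code: `BaseTask` stands in for `Task`, `materialize` is a hypothetical helper, and the tuple case assumes the keyword arguments are passed to the constructor as `cls(**kwargs)`. A `Pipeline` would fall into the last branch, since the table says it is used like an instance.

```python
from typing import Any


class BaseTask:
    """Minimal stand-in for pypekit's Task base class (illustrative only)."""

    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


def materialize(entry: Any) -> BaseTask:
    """Return the task object a tree node would hold for one repository entry."""
    if isinstance(entry, type) and issubclass(entry, BaseTask):
        return entry()  # class: a fresh instance per node
    if isinstance(entry, tuple):
        cls, kwargs = entry
        return cls(**kwargs)  # (class, kwargs): fresh instance built with kwargs
    return entry  # instance (or pipeline): reused as-is


class Sink1(BaseTask):
    pass


shared = Sink1()
print(materialize(shared) is materialize(shared))      # True: same instance reused
print(materialize(Sink1) is materialize(Sink1))        # False: new instance each time
print(materialize((BaseTask, {"test": True})).kwargs)  # {'test': True}
```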

