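# Basic Example

This notebook defines five toy tasks, lets a `Repository` enumerate every valid task pipeline, and runs those pipelines with a `CachedExecutor`.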

In [1]:
from pypekit import Task, Repository, CachedExecutor, Pipeline

class DataLoader(Task):
    """Toy first-stage task: accepts the "source" type and produces "raw".

    ``run`` ignores its input, prints a log line, and returns a placeholder
    string. The printed line shows up only when the task actually executes,
    which makes cache hits visible in the executor's log.
    """

    input_types = {"source"}
    output_types = {"raw"}

    def run(self, _):
        print("Running DataLoader")
        return "output"

class Processor1(Task):
    """Toy intermediate task: accepts "raw" or "processed", produces "processed".

    Because it accepts its own output type, it can follow another processor
    (e.g. Processor2 -> Processor1 in the tree printed below). It prints the
    generic message "Running Processor", the same one Processor2 prints, so
    the executor log cannot distinguish the two processors.
    """

    input_types = {"raw", "processed"}
    output_types = {"processed"}

    def run(self, _):
        print("Running Processor")
        return "output"
    
class Processor2(Task):
    """Toy intermediate task: accepts "raw" or "processed", produces "processed".

    Behaviorally identical to Processor1 apart from its class name; a second
    processor exists so the repository has more task combinations to
    enumerate. It prints the same generic "Running Processor" message as
    Processor1, so the executor log cannot tell the two apart.
    """

    input_types = {"raw", "processed"}
    output_types = {"processed"}

    def run(self, _):
        print("Running Processor")
        return "output"

class Classifier1(Task):
    """Toy final-stage task: accepts "raw" or "processed", produces "sink".

    It appears only as a leaf in the pipeline tree. It prints the generic
    message "Running Classifier", the same one Classifier2 prints, so the
    executor log cannot tell the two classifiers apart.
    """

    input_types = {"raw", "processed"}
    output_types = {"sink"}

    def run(self, _):
        print("Running Classifier")
        return "output"
    
class Classifier2(Task):
    """Toy final-stage task: accepts "raw" or "processed", produces "sink".

    Behaviorally identical to Classifier1 apart from its class name. It
    prints the same generic "Running Classifier" message, so the executor
    log cannot tell the two classifiers apart.
    """

    input_types = {"raw", "processed"}
    output_types = {"sink"}

    def run(self, _):
        print("Running Classifier")
        return "output"

# Register the task classes with a repository and render its task tree.
# Judging by the printed tree, the repository chains tasks whose input_types
# accept the previous task's output_types, starting from DataLoader ("source")
# and ending at a classifier ("sink").
# NOTE(review): the classes are passed as a set. Iteration order for classes
# follows their identity-based hashes, so the branch order in the tree (and in
# build_pipelines()) may change between kernel restarts. Confirm whether
# Repository sorts internally before relying on the recorded order.
task_classes = {
    DataLoader,
    Processor1,
    Processor2,
    Classifier1,
    Classifier2,
}
repository = Repository(task_classes)
repository.build_tree()

# print() is needed so the tree string's newlines render instead of a repr.
print(repository.build_tree_string())

└── Root
    └── DataLoader
        ├── Classifier2
        ├── Processor2
        │   ├── Classifier2
        │   ├── Processor1
        │   │   ├── Classifier2
        │   │   └── Classifier1
        │   └── Classifier1
        ├── Processor1
        │   ├── Classifier2
        │   ├── Processor2
        │   │   ├── Classifier2
        │   │   └── Classifier1
        │   └── Classifier1
        └── Classifier1



In [2]:
# Enumerate one Pipeline per root-to-leaf path of the task tree above
# (10 leaves -> 10 pipelines, in the same order as the tree).
pipelines = repository.build_pipelines()
pipelines

[Pipeline(tasks=['DataLoader', 'Classifier2']),
 Pipeline(tasks=['DataLoader', 'Processor2', 'Classifier2']),
 Pipeline(tasks=['DataLoader', 'Processor2', 'Processor1', 'Classifier2']),
 Pipeline(tasks=['DataLoader', 'Processor2', 'Processor1', 'Classifier1']),
 Pipeline(tasks=['DataLoader', 'Processor2', 'Classifier1']),
 Pipeline(tasks=['DataLoader', 'Processor1', 'Classifier2']),
 Pipeline(tasks=['DataLoader', 'Processor1', 'Processor2', 'Classifier2']),
 Pipeline(tasks=['DataLoader', 'Processor1', 'Processor2', 'Classifier1']),
 Pipeline(tasks=['DataLoader', 'Processor1', 'Classifier1']),
 Pipeline(tasks=['DataLoader', 'Classifier1'])]

In [3]:
# Run every pipeline. verbose=True prints one progress line per pipeline.
# The "Running ..." lines show which task run() calls actually execute.
# Tasks on a prefix shared with an earlier pipeline (e.g. DataLoader) do not
# print again, which indicates their outputs are served from the cache.
executor = CachedExecutor(pipelines, verbose=True)
results = executor.run()

Running DataLoader
Running Classifier
Pipeline 1/10 completed. Runtime: 0.00s.
Running Processor
Running Classifier
Pipeline 2/10 completed. Runtime: 0.00s.
Running Processor
Running Classifier
Pipeline 3/10 completed. Runtime: 0.00s.
Running Classifier
Pipeline 4/10 completed. Runtime: 0.00s.
Running Classifier
Pipeline 5/10 completed. Runtime: 0.00s.
Running Processor
Running Classifier
Pipeline 6/10 completed. Runtime: 0.00s.
Running Processor
Running Classifier
Pipeline 7/10 completed. Runtime: 0.00s.
Running Classifier
Pipeline 8/10 completed. Runtime: 0.00s.
Running Classifier
Pipeline 9/10 completed. Runtime: 0.00s.
Running Classifier
Pipeline 10/10 completed. Runtime: 0.00s.


In [4]:
# One summary dict per pipeline: the final output, the runtime (presumably
# seconds, matching the "Runtime: ...s" log above) and the task names run.
for result in results:
    print(result)

{'output': 'output', 'runtime': 7.217799975478556e-05, 'tasks': ['DataLoader', 'Classifier2']}
{'output': 'output', 'runtime': 7.419000030495226e-05, 'tasks': ['DataLoader', 'Processor2', 'Classifier2']}
{'output': 'output', 'runtime': 8.929999967222102e-05, 'tasks': ['DataLoader', 'Processor2', 'Processor1', 'Classifier2']}
{'output': 'output', 'runtime': 8.872800026438199e-05, 'tasks': ['DataLoader', 'Processor2', 'Processor1', 'Classifier1']}
{'output': 'output', 'runtime': 7.484199886675924e-05, 'tasks': ['DataLoader', 'Processor2', 'Classifier1']}
{'output': 'output', 'runtime': 7.367999933194369e-05, 'tasks': ['DataLoader', 'Processor1', 'Classifier2']}
{'output': 'output', 'runtime': 7.861099948058836e-05, 'tasks': ['DataLoader', 'Processor1', 'Processor2', 'Classifier2']}
{'output': 'output', 'runtime': 7.832999835954979e-05, 'tasks': ['DataLoader', 'Processor1', 'Processor2', 'Classifier1']}
{'output': 'output', 'runtime': 7.440199806296732e-05, 'tasks': ['DataLoader', 'Proces

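# Reusing the Cache

Passing `executor.cache` to a new `CachedExecutor` lets it serve every task result from the existing cache: none of the tasks prints a "Running ..." line below.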

In [5]:
# Hand the first executor's cache to a fresh executor. No "Running ..." lines
# appear in the output, so every task result comes from the shared cache.
# The trailing ';' suppresses display of the cell's return value.
new_executor = CachedExecutor(pipelines, cache=executor.cache, verbose=True)
new_executor.run();

Pipeline 1/10 completed. Runtime: 0.00s.
Pipeline 2/10 completed. Runtime: 0.00s.
Pipeline 3/10 completed. Runtime: 0.00s.
Pipeline 4/10 completed. Runtime: 0.00s.
Pipeline 5/10 completed. Runtime: 0.00s.
Pipeline 6/10 completed. Runtime: 0.00s.
Pipeline 7/10 completed. Runtime: 0.00s.
Pipeline 8/10 completed. Runtime: 0.00s.
Pipeline 9/10 completed. Runtime: 0.00s.
Pipeline 10/10 completed. Runtime: 0.00s.


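# Custom Pipelines

A `Pipeline` can also be assembled by hand from task instances instead of being generated by the repository.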

In [6]:
# Build a pipeline manually from task instances, bypassing the repository.
pipeline = Pipeline([DataLoader(), Processor2(), Classifier1()])
pipeline

Pipeline(tasks=['DataLoader', 'Processor2', 'Classifier1'])