# Basic Example

In [7]:
from pypekit import Task, Repository, CachedExecutor, Pipeline

class DataLoader(Task):
    """Demo task that turns a "source" input into a "raw" output.

    The type lists are presumably what Repository uses to decide which
    tasks may follow one another -- confirm against the pypekit docs.
    """

    input_types = ["source"]
    output_types = ["raw"]

    def run(self, _):
        """Print a progress message and return the placeholder "output".

        The positional argument is ignored.
        """
        # Plain literal: the message has no placeholders, so no f-prefix.
        print("Running DataLoader")
        return "output"

class Processor(Task):
    """Demo task that accepts "raw" or "processed" input and emits "processed".

    Because "processed" is both an input and an output type, one Processor
    can follow another; the pipelines printed in this notebook include such
    chains.
    """

    input_types = ["raw", "processed"]
    output_types = ["processed"]

    def run(self, _):
        """Print a progress message and return the placeholder "output".

        The positional argument is ignored.
        """
        # Plain literal: the message has no placeholders, so no f-prefix.
        print("Running Processor")
        return "output"

class Classifier(Task):
    """Demo task that turns a "processed" input into a "sink" output.

    Every pipeline printed in this notebook ends with a Classifier.
    """

    input_types = ["processed"]
    output_types = ["sink"]

    def run(self, _):
        """Print a progress message and return the placeholder "output".

        The positional argument is ignored.
        """
        # Plain literal: the message has no placeholders, so no f-prefix.
        print("Running Classifier")
        return "output"

# Named task instances available for pipeline construction. Each Processor
# and Classifier appears twice under different names.
basic_tasks = [
    ("data_loader", DataLoader()),
    ("processor_1", Processor()),
    ("processor_2", Processor()),
    ("classifier_1", Classifier()),
    ("classifier_2", Classifier()),
]
repository = Repository(basic_tasks)

# build_pipelines() returns a mapping; only its values (the Pipeline objects)
# are printed here, one per line.
pipeline_dict = repository.build_pipelines()
for candidate in pipeline_dict.values():
    print(candidate)

Pipeline(id=b59c5d9098aa49439d6407c031e22b66, tasks=['data_loader', 'processor_1', 'processor_2', 'classifier_1'])
Pipeline(id=111065e9b0f842039de94f8a0a0139e0, tasks=['data_loader', 'processor_1', 'processor_2', 'classifier_2'])
Pipeline(id=ad1015b4dbd14503acb9b445828a7c7f, tasks=['data_loader', 'processor_1', 'classifier_1'])
Pipeline(id=ccd1883f11df49f59172cdfa292bed5b, tasks=['data_loader', 'processor_1', 'classifier_2'])
Pipeline(id=372722bf8bbf47329c2ecc8653360d13, tasks=['data_loader', 'processor_2', 'processor_1', 'classifier_1'])
Pipeline(id=cde99c5c84c44b588ad9efd0d65cafbe, tasks=['data_loader', 'processor_2', 'processor_1', 'classifier_2'])
Pipeline(id=ab981b9f25054cb4b3705efc10812c65, tasks=['data_loader', 'processor_2', 'classifier_1'])
Pipeline(id=fc6f9ae930114ba2aaf425393a81ddc1, tasks=['data_loader', 'processor_2', 'classifier_2'])


In [8]:
# Execute every pipeline built above. With verbose=True the run reports a
# "Ran pipeline ..." line per pipeline (see the output below).
# NOTE(review): in that output DataLoader runs only once although all eight
# pipelines start with it, so shared leading steps appear to be cached.
executor = CachedExecutor(pipeline_dict, verbose=True)
# Keep the results; the next cell iterates over them with .values().
results = executor.run()

Running DataLoader
Running Processor
Running Processor
Running Classifier
Ran pipeline b59c5d9098aa49439d6407c031e22b66. Runtime: 0.00s. 1/8 pipelines completed.
Running Classifier
Ran pipeline 111065e9b0f842039de94f8a0a0139e0. Runtime: 0.00s. 2/8 pipelines completed.
Running Classifier
Ran pipeline ad1015b4dbd14503acb9b445828a7c7f. Runtime: 0.00s. 3/8 pipelines completed.
Running Classifier
Ran pipeline ccd1883f11df49f59172cdfa292bed5b. Runtime: 0.00s. 4/8 pipelines completed.
Running Processor
Running Processor
Running Classifier
Ran pipeline 372722bf8bbf47329c2ecc8653360d13. Runtime: 0.00s. 5/8 pipelines completed.
Running Classifier
Ran pipeline cde99c5c84c44b588ad9efd0d65cafbe. Runtime: 0.00s. 6/8 pipelines completed.
Running Classifier
Ran pipeline ab981b9f25054cb4b3705efc10812c65. Runtime: 0.00s. 7/8 pipelines completed.
Running Classifier
Ran pipeline fc6f9ae930114ba2aaf425393a81ddc1. Runtime: 0.00s. 8/8 pipelines completed.


In [9]:
# Show one result record per executed pipeline.
for result in results.values():
    print(result)

{'pipeline_id': 'b59c5d9098aa49439d6407c031e22b66', 'output': 'output', 'runtime': 9.860000000000424e-05, 'tasks': ['data_loader', 'processor_1', 'processor_2', 'classifier_1']}
{'pipeline_id': '111065e9b0f842039de94f8a0a0139e0', 'output': 'output', 'runtime': 9.839999999999849e-05, 'tasks': ['data_loader', 'processor_1', 'processor_2', 'classifier_2']}
{'pipeline_id': 'ad1015b4dbd14503acb9b445828a7c7f', 'output': 'output', 'runtime': 9.159999999996948e-05, 'tasks': ['data_loader', 'processor_1', 'classifier_1']}
{'pipeline_id': 'ccd1883f11df49f59172cdfa292bed5b', 'output': 'output', 'runtime': 9.209999999998386e-05, 'tasks': ['data_loader', 'processor_1', 'classifier_2']}
{'pipeline_id': '372722bf8bbf47329c2ecc8653360d13', 'output': 'output', 'runtime': 9.26999999999456e-05, 'tasks': ['data_loader', 'processor_2', 'processor_1', 'classifier_1']}
{'pipeline_id': 'cde99c5c84c44b588ad9efd0d65cafbe', 'output': 'output', 'runtime': 9.329999999996286e-05, 'tasks': ['data_loader', 'processor

# Reusing Cache

In [10]:
# Second executor over the same pipelines, handed the first executor's cache.
# The output below contains no "Running ..." task messages, i.e. no task's
# run() was invoked again.
new_executor = CachedExecutor(pipeline_dict, cache=executor.cache, verbose=True)
# Trailing semicolon keeps the notebook from echoing the returned results.
new_executor.run();

Ran pipeline b59c5d9098aa49439d6407c031e22b66. Runtime: 0.00s. 1/8 pipelines completed.
Ran pipeline 111065e9b0f842039de94f8a0a0139e0. Runtime: 0.00s. 2/8 pipelines completed.
Ran pipeline ad1015b4dbd14503acb9b445828a7c7f. Runtime: 0.00s. 3/8 pipelines completed.
Ran pipeline ccd1883f11df49f59172cdfa292bed5b. Runtime: 0.00s. 4/8 pipelines completed.
Ran pipeline 372722bf8bbf47329c2ecc8653360d13. Runtime: 0.00s. 5/8 pipelines completed.
Ran pipeline cde99c5c84c44b588ad9efd0d65cafbe. Runtime: 0.00s. 6/8 pipelines completed.
Ran pipeline ab981b9f25054cb4b3705efc10812c65. Runtime: 0.00s. 7/8 pipelines completed.
Ran pipeline fc6f9ae930114ba2aaf425393a81ddc1. Runtime: 0.00s. 8/8 pipelines completed.


# Custom Pipelines

In [11]:
# Assemble a pipeline by hand from (name, task) pairs instead of letting a
# Repository build it.
pipeline = Pipeline([
    ("processor_1", Processor()),
    ("processor_2", Processor()),
])
# Bare last expression so the notebook displays the pipeline's repr.
pipeline

Pipeline(id=289fabf2824444d8be358f2e1d5762bf, tasks=['processor_1', 'processor_2'])

# Pipelines as Tasks

In [12]:
# Register the hand-built two-step pipeline as a single task named "pipeline",
# alongside the plain tasks.
repository = Repository([
    ("data_loader", DataLoader()),
    ("processor_1", Processor()),
    ("pipeline", pipeline),
    ("classifier_1", Classifier()),
    ("classifier_2", Classifier()),
])
pipeline_dict = repository.build_pipelines()
# The loop variable must not be called `pipeline`: a `for` target stays bound
# after the loop ends, so reusing that name would replace the custom pipeline
# registered above and change what this cell does when it is run again.
for built_pipeline in pipeline_dict.values():
    print(built_pipeline)


Pipeline(id=e97c3a761a8f46c18ea06dfeb08219db, tasks=['data_loader', 'processor_1', 'pipeline', 'classifier_1'])
Pipeline(id=616045114efc43dfa2c6549382e1012b, tasks=['data_loader', 'processor_1', 'pipeline', 'classifier_2'])
Pipeline(id=053a6c19b4c04a9da2fb98f4eba35b77, tasks=['data_loader', 'processor_1', 'classifier_1'])
Pipeline(id=5688d17fddde4496ba3f0131368f25dc, tasks=['data_loader', 'processor_1', 'classifier_2'])
Pipeline(id=f801f9c39bfd4a8eac4c4ce452f2c9bf, tasks=['data_loader', 'pipeline', 'processor_1', 'classifier_1'])
Pipeline(id=bd6622e7acb448fc8959b339fb64b93f, tasks=['data_loader', 'pipeline', 'processor_1', 'classifier_2'])
Pipeline(id=e002c0e61a2d485daa82f3958f0e6253, tasks=['data_loader', 'pipeline', 'classifier_1'])
Pipeline(id=c4a4fafca6de48db8f02916e45cf8518, tasks=['data_loader', 'pipeline', 'classifier_2'])
