In [1]:
from pypekit import Task, Repository, CachedExecutor, Pipeline

class DataLoader(Task):
    input_types = ["source"]
    output_types = ["raw"]

    def run(self, _):
        print(f"Running DataLoader")
        return "output"

class Processor(Task):
    input_types = ["raw", "processed"]
    output_types = ["processed"]
        
    def run(self, _):
        print(f"Running Processor")
        return "output"

class Classifier(Task):
    input_types = ["raw", "processed"]
    output_types = ["sink", "processed"]
        
    def run(self, _):
        print(f"Running Classifier")
        return "output"
    
repository = Repository([
    ("data_loader", DataLoader()),
    ("processor_1", Processor()),
    ("processor_2", Processor()),
    ("classifier_1", Classifier()),
    ("classifier_2", Classifier()),
])
pipeline_dict = repository.build_pipelines()
for pipeline in pipeline_dict.values():
    print(pipeline)

executor = CachedExecutor(pipeline_dict, verbose=True)
executor.run()

executor.results

Pipeline(id=2674897949024, tasks=['data_loader', 'processor_1', 'processor_2', 'classifier_1'])
Pipeline(id=2674897292816, tasks=['data_loader', 'processor_1', 'processor_2', 'classifier_1', 'classifier_2'])
Pipeline(id=2674897293136, tasks=['data_loader', 'processor_1', 'processor_2', 'classifier_2'])
Pipeline(id=2674897309936, tasks=['data_loader', 'processor_1', 'processor_2', 'classifier_2', 'classifier_1'])
Pipeline(id=2674897310240, tasks=['data_loader', 'processor_1', 'classifier_1'])
Pipeline(id=2674897899248, tasks=['data_loader', 'processor_1', 'classifier_1', 'processor_2', 'classifier_2'])
Pipeline(id=2674895868240, tasks=['data_loader', 'processor_1', 'classifier_1', 'classifier_2'])
Pipeline(id=2674897423632, tasks=['data_loader', 'processor_1', 'classifier_2'])
Pipeline(id=2674896764496, tasks=['data_loader', 'processor_1', 'classifier_2', 'processor_2', 'classifier_1'])
Pipeline(id=2674897929296, tasks=['data_loader', 'processor_1', 'classifier_2', 'classifier_1'])
Pipe

{'2674897949024': {'pipeline_id': '2674897949024',
  'output': 'output',
  'tasks': ['data_loader', 'processor_1', 'processor_2', 'classifier_1']},
 '2674897292816': {'pipeline_id': '2674897292816',
  'output': 'output',
  'tasks': ['data_loader',
   'processor_1',
   'processor_2',
   'classifier_1',
   'classifier_2']},
 '2674897293136': {'pipeline_id': '2674897293136',
  'output': 'output',
  'tasks': ['data_loader', 'processor_1', 'processor_2', 'classifier_2']},
 '2674897309936': {'pipeline_id': '2674897309936',
  'output': 'output',
  'tasks': ['data_loader',
   'processor_1',
   'processor_2',
   'classifier_2',
   'classifier_1']},
 '2674897310240': {'pipeline_id': '2674897310240',
  'output': 'output',
  'tasks': ['data_loader', 'processor_1', 'classifier_1']},
 '2674897899248': {'pipeline_id': '2674897899248',
  'output': 'output',
  'tasks': ['data_loader',
   'processor_1',
   'classifier_1',
   'processor_2',
   'classifier_2']},
 '2674895868240': {'pipeline_id': '26748958

In [2]:
n = 4

repository = Repository(
    [(f"data_loader_{i}", DataLoader()) for i in range(n)] +
    [(f"processor_{i}", Processor()) for i in range(n)] +
    [(f"classifier_{i}", Classifier()) for i in range(n)] 
)

from time import time
start = time()
pipelines = repository.build_pipelines()
print(f"Built {len(pipelines)} pipelines in {time() - start:.2f} seconds")

Built 219200 pipelines in 4.09 seconds


In [3]:
pipeline = Pipeline([
    ("data_loader", DataLoader()),
    ("processor_1", Processor()),
    ("processor_2", Processor()),
    ("classifier_1", Classifier()),
    ("classifier_2", Classifier()),
])
pipeline

Pipeline(id=2675002013456, tasks=['data_loader', 'processor_1', 'processor_2', 'classifier_1', 'classifier_2'])

In [4]:
print(repository)
print(executor)

Repository(tasks=['data_loader_0', 'data_loader_1', 'data_loader_2', 'data_loader_3', 'processor_0', 'processor_1', 'processor_2', 'processor_3', 'classifier_0', 'classifier_1', 'classifier_2', 'classifier_3'], pipelines=219200)
CachedExecutor(pipelines=32)


In [9]:
repository = Repository([
    ("data_loader", DataLoader()),
    ("processor_1", Processor()),
    ("pipeline", pipeline),
    ("classifier_1", Classifier()),
    ("classifier_2", Classifier()),
])
pipelines = repository.build_pipelines()
for pipeline in pipeline_dict.values():
    print(pipeline)


Pipeline(id=2674897949024, tasks=['data_loader', 'processor_1', 'processor_2', 'classifier_1'])
Pipeline(id=2674897292816, tasks=['data_loader', 'processor_1', 'processor_2', 'classifier_1', 'classifier_2'])
Pipeline(id=2674897293136, tasks=['data_loader', 'processor_1', 'processor_2', 'classifier_2'])
Pipeline(id=2674897309936, tasks=['data_loader', 'processor_1', 'processor_2', 'classifier_2', 'classifier_1'])
Pipeline(id=2674897310240, tasks=['data_loader', 'processor_1', 'classifier_1'])
Pipeline(id=2674897899248, tasks=['data_loader', 'processor_1', 'classifier_1', 'processor_2', 'classifier_2'])
Pipeline(id=2674895868240, tasks=['data_loader', 'processor_1', 'classifier_1', 'classifier_2'])
Pipeline(id=2674897423632, tasks=['data_loader', 'processor_1', 'classifier_2'])
Pipeline(id=2674896764496, tasks=['data_loader', 'processor_1', 'classifier_2', 'processor_2', 'classifier_1'])
Pipeline(id=2674897929296, tasks=['data_loader', 'processor_1', 'classifier_2', 'classifier_1'])
Pipe