In [83]:
from collections import defaultdict
from typing import Iterable, Callable

def _is_iterable(arg):
    if not isinstance(arg, Iterable) or isinstance(arg, str):
        return (arg,)
    return arg

class PipelineItem:        
    def __init__(self, inputs, outputs, fun):
        self.inputs = _is_iterable(inputs)
        self.outputs = _is_iterable(outputs)
        self.fun = fun
        self.priority = 0
        # self.id = id(fun)
        self.id = fun.__name__
        
    def __repr__(self):
        return f'{self.priority}: {self.inputs} -- {self.fun} --> {self.outputs}'

In [121]:
from collections import defaultdict

class Pipeline:
    def __init__(self):
        self.items = {}
        self._function_per_outputs = defaultdict(list)
        self._function_per_inputs = defaultdict(list)
    
    def __repr__(self):
        return '\n'.join([item.__repr__() for item in self.items.values()])
    
    def _predecessors(self, item_id):
        # return item connected to the inputs (means input of item found in outputs)
        item = self.items[item_id]
        predecessors = []
        for input in item.inputs:
            predecessors.extend(self._function_per_outputs[input])
        
        return predecessors
    
    def _successors(self, item_id):
        # return item connected to the outputs (means output of item found in the inputs)
        item = self.items[item_id]
        successors = []
        for output in item.outputs:
            successors.extend(self._function_per_inputs[output])
            
        return successors

    def _adjust_ranking(self, item_id, priority, mode='forward'):
        self.items[item_id].priority = priority
        func = self._successors if mode == 'forward' else self._predecessors
        offset = 1 if mode == 'forward' else -1

        for neighbor_id in func(item_id):
            self._adjust_ranking(neighbor_id, priority + offset, mode=mode)

    def _sort(self):      
        for item_id in self.items.keys():
            current_priority = self.items[item_id].priority
            self._adjust_ranking(item_id, current_priority, mode='forward')
            self._adjust_ranking(item_id, current_priority, mode='backward')

        rank = list(self.items.items())
        rank.sort(key=lambda x: x[1].priority)
        self.items = dict(rank)
        
        
    def _register_internals(self, item):
        item_id = item.id
        self.items[item_id] = item

        for input in item.inputs:
            self._function_per_inputs[input].append(item_id)
            
        for output in item.outputs:
            self._function_per_outputs[output].append(item_id)
        
    def register(self, inputs: Iterable, outputs: Iterable):
        def inner(fun):
            item = PipelineItem(inputs=inputs, outputs=outputs, fun=fun)
            print(f'Registering {item}')
            self._register_internals(item)
            self._sort()
            print()
            
            return fun

        return inner
    
    def execute(self, start={'A'}):
        values_so_far = start
        print(f'Start with {values_so_far}')
        for item_id, item in self.items.items():
            missing_values = [input for input in item.inputs if input not in values_so_far]
            
            print(item)
            if not missing_values:
                values_so_far = values_so_far.union(item.outputs)
            else:
                print(f'{missing_values} are missing')
                raise 

In [122]:
pipeline = Pipeline()

@pipeline.register(inputs='D', outputs='G')
def aze():
    pass

@pipeline.register(inputs='A', outputs='B')
def foo():
    pass

@pipeline.register(inputs=('B', 'C'), outputs='E')
def clk():
    pass

@pipeline.register(inputs='B', outputs='D')
def fun():
    pass


@pipeline.register(inputs=('A',), outputs='C')
def bar():
    pass

Registering 0: ('D',) -- <function aze at 0x0000018971633250> --> ('G',)

Registering 0: ('A',) -- <function foo at 0x0000018971772B00> --> ('B',)

Registering 0: ('B', 'C') -- <function clk at 0x0000018971772C20> --> ('E',)

Registering 0: ('B',) -- <function fun at 0x0000018971772B90> --> ('D',)

Registering 0: ('A',) -- <function bar at 0x0000018971772CB0> --> ('C',)



In [123]:
pipeline

-2: ('A',) -- <function foo at 0x0000018971772B00> --> ('B',)
-2: ('A',) -- <function bar at 0x0000018971772CB0> --> ('C',)
-1: ('B', 'C') -- <function clk at 0x0000018971772C20> --> ('E',)
-1: ('B',) -- <function fun at 0x0000018971772B90> --> ('D',)
0: ('D',) -- <function aze at 0x0000018971633250> --> ('G',)

In [124]:
pipeline.execute()

Start with {'A'}
-2: ('A',) -- <function foo at 0x0000018971772B00> --> ('B',)
-2: ('A',) -- <function bar at 0x0000018971772CB0> --> ('C',)
-1: ('B', 'C') -- <function clk at 0x0000018971772C20> --> ('E',)
-1: ('B',) -- <function fun at 0x0000018971772B90> --> ('D',)
0: ('D',) -- <function aze at 0x0000018971633250> --> ('G',)


In [85]:
pipeline = Pipeline()

@pipeline.register(inputs='A', outputs='B')
def foo():
    pass

@pipeline.register(inputs=('A',), outputs='C')
def bar():
    pass

@pipeline.register(inputs=('B', 'C'), outputs='E')
def clk():
    pass

@pipeline.register(inputs='B', outputs='D')
def fun():
    pass

@pipeline.register(inputs='D', outputs='G')
def aze():
    pass

Registering 0: ('A',) -- <function foo at 0x00000189715CC4C0> --> ('B',)
foo False
Registering 0: ('A',) -- <function bar at 0x00000189715CE710> --> ('C',)
foo False
bar False
Registering 0: ('B', 'C') -- <function clk at 0x00000189715CE680> --> ('E',)
foo False
bar False
clk True
Registering 0: ('B',) -- <function fun at 0x00000189715CF250> --> ('D',)
foo False
bar False
clk True
fun True
Registering 0: ('D',) -- <function aze at 0x00000189715CF2E0> --> ('G',)
foo False
bar False
clk True
fun True
aze True


In [86]:
pipeline

0: ('A',) -- <function foo at 0x00000189715CC4C0> --> ('B',)
0: ('A',) -- <function bar at 0x00000189715CE710> --> ('C',)
1: ('B', 'C') -- <function clk at 0x00000189715CE680> --> ('E',)
1: ('B',) -- <function fun at 0x00000189715CF250> --> ('D',)
2: ('D',) -- <function aze at 0x00000189715CF2E0> --> ('G',)

In [74]:
pipeline.execute()

Start with {'A'}
0, ('A',) -- <function foo at 0x00000189714F9FC0> --> ('B',)
0, ('A',) -- <function bar at 0x00000189715CC670> --> ('C',)
1, ('B', 'C') -- <function clk at 0x00000189714FADD0> --> ('E',)
1, ('B',) -- <function fun at 0x00000189715CC430> --> ('D',)
2, ('D',) -- <function aze at 0x00000189715CC040> --> ('G',)


# test

In [136]:
import random


pipeline = Pipeline()

def foo():
    pass

def bar():
    pass

def clk():
    pass

def fun():
    pass

def aze():
    pass

register_collection = [(pipeline.register(inputs='A', outputs='B'), foo),
                       (pipeline.register(inputs=('A',), outputs='C'), bar),
                       (pipeline.register(inputs=('B', 'C'), outputs='E'), clk),
                       (pipeline.register(inputs='B', outputs='D'), fun),
                       (pipeline.register(inputs='D', outputs='G'), aze)]

random.shuffle(register_collection)
for item in register_collection:
    item[0](item[1])

pipeline.execute()

Registering 0: ('A',) -- <function foo at 0x0000018971632A70> --> ('B',)

Registering 0: ('A',) -- <function bar at 0x0000018971773400> --> ('C',)

Registering 0: ('B',) -- <function fun at 0x0000018971772D40> --> ('D',)

Registering 0: ('D',) -- <function aze at 0x00000189717732E0> --> ('G',)

Registering 0: ('B', 'C') -- <function clk at 0x0000018971770940> --> ('E',)

Start with {'A'}
0: ('A',) -- <function foo at 0x0000018971632A70> --> ('B',)
0: ('A',) -- <function bar at 0x0000018971773400> --> ('C',)
1: ('B',) -- <function fun at 0x0000018971772D40> --> ('D',)
1: ('B', 'C') -- <function clk at 0x0000018971770940> --> ('E',)
2: ('D',) -- <function aze at 0x00000189717732E0> --> ('G',)


In [62]:
pipeline

-1, ('A',) -- <function foo at 0x00000189714311B0> --> ('B',)
0, ('B', 'C') -- <function clk at 0x00000189714FA560> --> ('E',)
0, ('A',) -- <function bar at 0x00000189714F92D0> --> ('C',)
0, ('B',) -- <function fun at 0x00000189714FB010> --> ('D',)
1, ('D',) -- <function aze at 0x00000189714FB0A0> --> ('G',)

In [63]:
def adjust_neighbors(item_id, current_priority=0, mode='forward'):
    pipeline.items[item_id].priority = current_priority
    func = pipeline._successors if mode == 'forward' else pipeline._predecessors
    offset = 1 if mode == 'forward' else -1
        
    walked[item_id] = True
    print(item_id)
    for neighbor_id in func(item_id):
        if not walked[neighbor_id]:
            adjust_neighbors(neighbor_id, current_priority + offset, mode=mode)

        
walked = dict(zip(pipeline.items.keys(), [False]*len(pipeline.items)))        
for item_id, walk in walked.items():
    if not walk:
        current_priority = pipeline.items[item_id].priority
        adjust_neighbors(item_id, current_priority, mode='forward')
        adjust_neighbors(item_id, current_priority, mode='backward')

1689822368176
1689823192416
1689823195152
1689823195296
1689822368176
1689823187664
1689823187664


In [64]:
pipeline

-1, ('A',) -- <function foo at 0x00000189714311B0> --> ('B',)
0, ('B', 'C') -- <function clk at 0x00000189714FA560> --> ('E',)
0, ('A',) -- <function bar at 0x00000189714F92D0> --> ('C',)
0, ('B',) -- <function fun at 0x00000189714FB010> --> ('D',)
1, ('D',) -- <function aze at 0x00000189714FB0A0> --> ('G',)

In [30]:
rank = list(pipeline.items.items())
rank.sort(key=lambda x: x[1].priority)

In [32]:
dict(rank)

{1689787446560: 0, ('A',) -- <function bar at 0x000001896F2E3520> --> ('C',),
 1689787437776: 0, ('A',) -- <function foo at 0x000001896F2E12D0> --> ('B',),
 1689787445264: 1, ('B',) -- <function fun at 0x000001896F2E3010> --> ('D',),
 1689787441664: 1, ('B', 'C') -- <function clk at 0x000001896F2E2200> --> ('E',),
 1689787448000: 2, ('D',) -- <function aze at 0x000001896F2E3AC0> --> ('G',)}

In [None]:
ranks = dict(zip(pipeline.items.keys(), [0]*len(pipeline.items)))
for key, rank in ranks.items():
    current_priority = pipeline.items[key].priority
    for predecessor in pipeline._predecessors(key):
        pipeline.items[predecessor].priority = current_priority - 1
    for successor in pipeline._successors(key):
        pipeline.items[successor].priority = current_priority + 1