In [3]:
from functools import wraps, cached_property
from dataclasses import dataclass
import networkx as nx
from typing import Iterable, Callable
from itertools import product

In [4]:
class Pipeline():
    def __init__(self,):
        self.graph = nx.DiGraph()
        self.line_graph = None
        self.node_order = None
        self.funcs = {}
        self.executed = {}
        
    def register(self, inputs: str | Iterable[str], outputs: str | Iterable[str]):
        inputs = list(inputs)
        outputs = list(outputs)
        def inner(func):
            for edge in product(inputs, outputs):
                func_id = id(func)
                self.graph.add_edge(*edge, func=func_id)
                self.funcs[func_id] = func
                self.executed[func_id] = False

        return inner
    
    def _sort(self):
        H = nx.line_graph(self.graph)
        H.add_nodes_from((node, self.graph.edges[node]) for node in H)
        self.line_graph = H
        self.node_order = nx.topological_sort(H)
        
    def _clear(self):
        for func_id in self:
            self.executed[func_id] = False
        
    def execute(self):
        if self.line_graph is None:
            self._sort()
            
        for func_id in self:
            print(func_id)
            if not self.executed[func_id]:
                self.executed[func_id] = True
                print(self.funcs[func_id]) 
        
        self._clear()
                
    def __iter__(self):
        if self.line_graph is None:
            self._sort()
        for node in self.node_order:
             yield self.line_graph.nodes[node]['func']

In [5]:
# Shared Pipeline instance that the decorated functions below register on.
pipeline = Pipeline()

In [6]:
@pipeline.register(inputs=['D'], outputs=['G'])
def aze():
    """Placeholder step registered on the edge D -> G; it does nothing."""

@pipeline.register(inputs=['A'], outputs=['B'])
def foo():
    """Placeholder step registered on the edge A -> B; it does nothing."""

@pipeline.register(inputs=['B', 'C'], outputs=['E'])
def clk():
    """Placeholder step registered on the edges B -> E and C -> E; it does nothing."""

@pipeline.register(inputs=['B'], outputs=['D'])
def fun():
    """Placeholder step registered on the edge B -> D; it does nothing."""


@pipeline.register(inputs=['A'], outputs=['C'])
def bar():
    """Placeholder step registered on the edge A -> C; it does nothing."""

In [7]:
# Walk the registered edges in topological order: prints the function id for
# every edge and the function object the first time each id is encountered.
pipeline.execute()

140149517280464
<function foo at 0x7f771a3164d0>
140149517280896
<function bar at 0x7f771a316680>
140149517280608
<function clk at 0x7f771a316560>
140149517280752
<function fun at 0x7f771a3165f0>
140149517280608
140149517280320
<function aze at 0x7f771a316440>


In [8]:
# Scratch check: attach a "tot" attribute (sum of the two endpoints) to every
# edge of a 4-node path graph, then display the edges with their data.
G = nx.path_graph(4)
for u, v in list(G.edges):
    G.add_edge(u, v, tot=u + v)
G.edges(data=True)

EdgeDataView([(0, 1, {'tot': 1}), (1, 2, {'tot': 3}), (2, 3, {'tot': 5})])

In [9]:
# Build the line graph of the pipeline graph and copy each edge's attributes
# (the ``func`` id) onto the matching line-graph node, then display the nodes.
H = nx.line_graph(pipeline.graph)
for edge in list(H):
    H.add_node(edge, **pipeline.graph.edges[edge])
H.nodes(data=True)

NodeDataView({('D', 'G'): {'func': 140149517280320}, ('A', 'B'): {'func': 140149517280464}, ('B', 'E'): {'func': 140149517280608}, ('B', 'D'): {'func': 140149517280752}, ('A', 'C'): {'func': 140149517280896}, ('C', 'E'): {'func': 140149517280608}})

In [10]:
# Topological order of the pipeline's edges, from a freshly built line graph.
[*nx.topological_sort(nx.line_graph(pipeline.graph))]

[('A', 'B'), ('A', 'C'), ('B', 'E'), ('B', 'D'), ('C', 'E'), ('D', 'G')]

In [11]:
# Same ordering computed on H, the line graph that carries the ``func`` attributes.
[*nx.topological_sort(H)]

[('A', 'B'), ('A', 'C'), ('B', 'E'), ('B', 'D'), ('C', 'E'), ('D', 'G')]

In [12]:
# Attribute dict stored on the line-graph node for the edge A -> B.
H.nodes['A', 'B']

{'func': 140149517280464}