In [1]:
import networkx as nx
import json
import os
import pathlib
import matplotlib.pyplot as plt

# Directory used to locate the sample template. The notebook resolves it from
# the process working directory; the __file__-based variant is kept below for
# reference (presumably for when this code runs as a plain script — confirm).
# cur_path = pathlib.Path(__file__).parent.resolve()
cur_path = os.path.abspath(os.curdir)


file_name = os.path.join(cur_path, "sample_flow.json")

# Read with an explicit encoding: without it open() falls back to the locale
# encoding, which is not UTF-8 on every platform and can mis-decode the JSON.
with open(file_name, 'r', encoding='utf-8') as f:
    data = f.read()


# Parsed flow template, kept at module level for interactive inspection.
flow_json = json.loads(data)

def load_template(file_path=None):
    """Load a flow template from a JSON file and return the parsed document.

    Args:
        file_path: Path of the JSON file to read. When omitted (or falsy),
            ``sample_flow.json`` inside the module-level ``cur_path``
            directory is used.

    Returns:
        The decoded JSON content (whatever ``json.load`` produces for the
        file, e.g. a dict for a JSON object).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    file_name = file_path or os.path.join(cur_path, "sample_flow.json")

    # Explicit UTF-8: the default text encoding of open() depends on the
    # platform locale, so non-ASCII characters could otherwise be mis-decoded.
    with open(file_name, 'r', encoding='utf-8') as f:
        return json.load(f)



In [2]:
class DAG:
    """Build a networkx directed graph from a flow-template JSON document.

    Each component of the template becomes a node (keyed by its ``id``, with
    the whole component dict stored as node attributes); each connection
    listed under a component's ``'0'`` connection key becomes an edge from
    that component to the connected one.
    """

    def __init__(self, file_path=None) -> None:
        """Load the template and create an empty directed graph.

        Args:
            file_path: Optional path of the template JSON file. When omitted,
                ``load_template`` falls back to its default sample file.
        """
        self._json_data = load_template(file_path)
        self.dg = nx.DiGraph()

    def create_dag(self,):
        """Populate ``self.dg`` from the loaded template and return it.

        Returns:
            The graph held in ``self.dg``, with one node per component and
            one edge per ``'0'`` connection.

        Raises:
            KeyError: If the template has no ``['components']['1']`` entry or
                a component/connection lacks an ``'id'``.
        """
        # NOTE(review): only the component list under key '1' is read —
        # confirm whether templates can carry further keys that should also
        # be included.
        comp_list = self._json_data['components']['1']
        node_list = [(x['id'], x) for x in comp_list]
        # Register all nodes first so every node carries its component dict.
        self.dg.add_nodes_from(node_list)

        for source_id, meta in node_list:
            # A missing or empty 'connections' entry means no outgoing edges.
            connections = meta.get('connections') or {}
            # Only the '0' key is followed, as before; a node whose
            # connections lack that key simply contributes no edges instead
            # of failing while iterating None.
            for con in connections.get('0') or []:
                self.dg.add_edge(source_id, con['id'])
        return self.dg

    
# Build the graph from the default template. `dg` is a notebook-level global
# that later cells (including process_pipeline) read directly.
dg = DAG().create_dag()


In [3]:
# Materialise one topological ordering of the graph's node ids so it can be
# indexed and reused by the cells below, then print it.
top_order = [*nx.topological_sort(dg)]
print(top_order)

['1688621680859', '1688623653352', '1688623054188']


In [4]:
# Display the attribute dict stored on the first node of the topological order.
dg.nodes[top_order[0]]

{'component': 'file_source',
 'state': {'text': '', 'color': ''},
 'x': 94,
 'y': 120,
 'tab': '1688621678511',
 'connections': {'0': [{'index': '0', 'id': '1688623653352'}]},
 'id': '1688621680859',
 'isnew': True,
 'disabledio': {'input': [], 'output': []},
 'name': 'Hive Source',
 'phase': 1,
 'options': {'sourceCSI': '146184',
  'ID': '1688621680859',
  'database': 'data',
  'tableName': 'test'}}

In [19]:
import sys

# NOTE(review): hard-coded absolute local path — the notebook only runs on a
# machine where this directory exists; consider a configurable/relative path.
sys.path.append(r'D:\work\code_related\my_codes\llm_implement')
# NOTE(review): wildcard imports hide which module provides which name. The
# names used later in this notebook (Select, Lit, FileSource, FileSink,
# DataSource) presumably come from these three modules — confirm and import
# them explicitly. 'tranforms' may be a misspelling of 'transforms'; verify
# against the actual module file name before changing it.
from tranforms import *
from data_sink import *
from data_source import *

# Registry translating a node's 'component' string into the class that
# implements it; process_pipeline instantiates the class with the node options.
# NOTE(review): 'hive_source' is bound to an empty-string placeholder, which
# is not callable — a node using that component cannot be instantiated until
# a real class is registered here.
func_mapping = dict(
    hive_source='',
    select=Select,
    lit=Lit,
    file_source=FileSource,
    file_sink=FileSink,
)

In [20]:
# Scratch check: resolve the class registered for the first node's component
# and run it. The class is instantiated without the node's options here, so
# this relies on its constructor working with no arguments — TODO confirm.
s = func_mapping.get(dg.nodes[top_order[0]].get('component'))
s().process()

DataFrame[value: string]

In [22]:
def process_pipeline(top_order):
    """Run every node of the global graph ``dg`` in the given order.

    For each node id the component class is looked up in the global
    ``func_mapping``, instantiated with the node's ``options`` and executed.
    ``DataSource`` instances are run without input; every other component
    receives the result of its upstream node.

    Args:
        top_order: Node ids of ``dg`` in an order where every node appears
            after the nodes it depends on (e.g. a topological sort).

    Returns:
        dict mapping each node id to the value returned by its ``process``
        call.

    Raises:
        TypeError: If no callable is registered in ``func_mapping`` for a
            node's component (unknown component or placeholder entry).
    """
    data_mapping = {}

    for i, node_id in enumerate(top_order):
        node_params = dg.nodes[node_id]
        component = node_params.get('component')
        # A node without options is instantiated with no keyword arguments.
        options = node_params.get('options') or {}

        factory = func_mapping.get(component)
        if not callable(factory):
            # Covers both unregistered components (None) and placeholder
            # entries such as ''; same exception type as before, but with a
            # message that names the offending node.
            raise TypeError(
                f"no callable registered in func_mapping for component "
                f"{component!r} (node {node_id!r})"
            )
        func_class = factory(**options)

        if isinstance(func_class, DataSource):
            df = func_class.process()
        else:
            # Feed the node from its real graph predecessor. The previous
            # entry of top_order is only the predecessor in a purely linear
            # flow; with branches it can be an unrelated sibling.
            # NOTE(review): a node with several inputs only receives the
            # first predecessor's result — confirm how multi-input
            # components should be fed.
            upstream_ids = list(dg.predecessors(node_id))
            if upstream_ids:
                pre_df = data_mapping.get(upstream_ids[0])
            elif i > 0:
                # No incoming edge: keep the earlier behaviour of passing
                # the result of the previous node in the order.
                pre_df = data_mapping.get(top_order[i - 1])
            else:
                # First node and not a source: nothing has been produced yet.
                pre_df = None
            df = func_class.process(pre_df)
        data_mapping[node_id] = df
    return data_mapping

# Execute the whole flow; the returned dict (node id -> process() result) is
# shown as the cell output.
process_pipeline(top_order=top_order)
    

{'1688621680859': DataFrame[value: string],
 '1688623653352': DataFrame[value: string],
 '1688623054188': None}