# Podium Dataflow Dependency Tree

## Introduction

This Jupyter Notebook demonstrates how to generate a QDC dataflow / source / entity dependency tree from the QDC metadata database (in this case PostgreSQL) using Python.

The starting point for the example output (below) is the dataflow named "prod_stg_compsych_member_t". The starting point should be the dataflow whose predecessor dataflows you want to determine.

The process will look at the LOADERs in the starting point dataflow and look back up the dependency tree to determine the source of each of the LOADERs. 

These sources can be a QDC Source / Entity or the output from a prior dataflow. 

Where the source is the output from a prior dataflow, the process recurses on itself until:

* There are only QDC Source / Entities remaining or
* The source dataflow is in a stop list of dataflow names provided to the process.
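
The two stopping conditions above can be sketched without a database. This is a minimal illustration only: the `LOADS` and `STORED_BY` dictionaries and the `walk()` helper are hypothetical stand-ins for the metadata queries, not part of the notebook.

```python
# Hypothetical in-memory stand-in for the metadata database:
# dataflow name -> entities it LOADs, and entity -> dataflows that STORE it.
LOADS = {
    "final_flow": ["entity_a", "entity_b"],
    "mid_flow": ["entity_c"],
    "early_flow": ["entity_d"],
}
STORED_BY = {
    "entity_a": ["mid_flow"],
    "entity_c": ["early_flow"],
    "entity_d": ["final_flow"],  # deliberate cycle back to the start
    # entity_b has no storing dataflow: it is a plain Source / Entity
}


def walk(dataflow, visited, stop_list=()):
    """Visit `dataflow` and, recursively, every dataflow that feeds it."""
    if dataflow in stop_list:
        return  # stop list reached: do not look further upstream
    for entity in LOADS.get(dataflow, []):
        for upstream in STORED_BY.get(entity, []):
            if upstream not in visited:  # guards against circular references
                visited.append(upstream)
                walk(upstream, visited, stop_list)


visited = ["final_flow"]
walk("final_flow", visited)
print(visited)
```

Passing `stop_list=("mid_flow",)` would record `mid_flow` as visited but not look any further upstream of it.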

The result of the process is a [NetworkX](https://networkx.github.io) graph in which the nodes are dataflows, sources and entities, and the edges reflect the relationships between them.

The final graph is converted to a [DOT](https://en.wikipedia.org/wiki/DOT_(graph_description_language)) file and rendered to the PNG shown below using the `dot.exe` command installed with [Graphviz](https://www.graphviz.org/).

```bash
dot.exe -Tpng -o prod_stg_compsych_member_t.png prod_stg_compsych_member_t.dot
```
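
The same render step could also be driven from Python. The sketch below is one possible approach, not something the notebook does: `dot_command()` and `render()` are hypothetical helpers, and it assumes the Graphviz executable is reachable on the PATH as `dot`.

```python
import shutil
import subprocess


def dot_command(name, fmt="png", exe="dot"):
    """Build the Graphviz command line that renders <name>.dot to <name>.<fmt>."""
    return [exe, f"-T{fmt}", "-o", f"{name}.{fmt}", f"{name}.dot"]


def render(name, fmt="png"):
    """Run dot on <name>.dot; return False if dot is not installed."""
    exe = shutil.which("dot")
    if exe is None:
        return False
    # check=True raises CalledProcessError if dot exits with an error
    subprocess.run(dot_command(name, fmt, exe), check=True)
    return True


print(dot_command("prod_stg_compsych_member_t"))
```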

## Sample output

![dataflow](./prod_stg_compsych_member_t.png)

## Python Notes

Access to the PostgreSQL database is through Python SQLAlchemy; the SQLAlchemy coding style used here was chosen for clarity and simplicity.

I am sure that there are many improvements and optimizations that could be made.

The Python version used here was 3.6.5, installed from the Anaconda distribution, which means that all the imported packages were pre-installed.

In [1]:
from sqlalchemy import create_engine
from sqlalchemy import MetaData , Table, Column, Integer, Numeric, String, DateTime, ForeignKey, Text
from sqlalchemy import select, desc
from sqlalchemy import and_, or_, not_
from sqlalchemy import text
from sqlalchemy.sql import func

In [2]:
import networkx as nx
import json
import datetime as dt
import os
import yaml
import sys

In [3]:
import pydot
from networkx.drawing.nx_pydot import write_dot

## QDC Database Connection and Styles

The QDC repository database connection details are specified in a yaml format and parsed by the `get_podium_cfg()` function below.

The input to the function is a stream, which can be a string or an open file handle.

This version of the config file contains style attributes that are used in the resulting `dot.exe` generated output file.

The expected yaml stream should be:

```yaml
---
pd_connect:
    dr:
        user: 'user'
        pwd: 'pwd'
        db: 'podium_md'
        host: 'host.host.com'
        port: 5436
    dev:
        user: 'user'
        pwd: 'pwd'
        db: 'podium_md'
        host: 'host.host'
        port: 5436

styles:
    meta:
        fontname: 'arial'
        fontsize: 10
    meta_edge:
        fontname: 'arial'
        fontsize: 10
    node:
        fontname: 'arial'
        fontsize: 12
        bgcolor: '"#ECF8B9"'
    source:
        shape: 'house'
        style: 'filled'
        fillcolor: '"#D4E4F7"'
    entity_source:
        shape: 'ellipse'
        style: 'filled'
        fillcolor: '"#3A84D9"'
    entity_target:
        shape: 'ellipse'
        style: 'filled'
        fillcolor: '"#DAC344"'
    bundle:
        shape: 'box'
        style: '"filled,rounded"'
        fillcolor: '"#E9446A"'
    edge:
        loader:
            color: '"#66B032"'
        store:
            color: '"#347B98"'
        source:
            color: '"#092834"'

```
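
Because a missing connection key only shows up later as a `KeyError` at connect time, it may be worth checking the parsed configuration up front. The sketch below is an assumption-laden illustration: `check_connection()` and `REQUIRED_KEYS` are hypothetical, and a plain dict stands in for the parsed YAML.

```python
# Keys that each pd_connect block is expected to provide (taken from the YAML above)
REQUIRED_KEYS = ("user", "pwd", "db", "host", "port")


def check_connection(cfg, env):
    """Return the connection block for `env`, raising KeyError if keys are missing."""
    block = cfg["pd_connect"][env]
    missing = [k for k in REQUIRED_KEYS if k not in block]
    if missing:
        raise KeyError(f"pd_connect.{env} is missing: {', '.join(missing)}")
    return block


# Plain dict standing in for the parsed YAML (values are placeholders)
example_cfg = {
    "pd_connect": {
        "dev": {"user": "user", "pwd": "pwd", "db": "podium_md",
                "host": "host.host", "port": 5436},
    },
}

con_block = check_connection(example_cfg, "dev")
print(con_block["host"], con_block["port"])
```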

## Helper Functions

Most of these functions are helpers for accessing the QDC metadata database; the actual work is done in the `get_bundle()` function.

In [25]:
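def get_podium_cfg(yaml_stream):
    """Loads the Podium connection parameters from the input stream"""
    
    try:
        # safe_load parses plain YAML without constructing arbitrary Python objects
        pd_cfg = yaml.safe_load(yaml_stream)
    except yaml.YAMLError as err:
        # Raising a plain string is a TypeError in Python 3, so raise a real exception
        raise ValueError("Unexpected error reading yaml stream") from err
        
    return pd_cfg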

In [26]:
def connect(user, password, db, host, port):
    '''Returns a connection and a metadata object'''
    
    url = f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}'

    # The return value of create_engine() is our connection object
    con = create_engine(url, client_encoding='utf8')

    # We then bind the connection to MetaData()
    meta = MetaData(bind=con)

    return con, meta
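
One caveat with building the URL by string formatting: a password containing characters such as `@` or `/` would be misread as part of the URL. A possible safeguard, sketched here with a hypothetical `build_url()` helper and a made-up password, is to percent-escape the credentials with the standard library's `urllib.parse.quote_plus`.

```python
from urllib.parse import quote_plus


def build_url(user, password, db, host, port):
    """Build a PostgreSQL URL with the user name and password percent-escaped."""
    return (f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{db}")


# The password here is invented purely to show the escaping
print(build_url("user", "p@ss/word", "podium_md", "host.host.com", 5436))
```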

In [27]:
def get_source(con, meta, source_nid):
    """Retrieve Podium Source row by nid
    
    Parameters
    ==========
    
    con : SQLAlchemy connection
    meta : SQL Alchemy Meta Object
    source_nid : Integer source nid
    
    Returns
    =======
    
    Uses ResultProxy first() to retrieve one row and close the result set.
    
    """
    
    assert isinstance(source_nid, int)
    
    pd_source = Table('podium_core.pd_source', meta)
    
    s = pd_source.select()
    s = s.where(pd_source.c.nid == source_nid)
    
    r = con.execute(s)
    
    assert r.rowcount == 1
    
    return r.first()

In [28]:
def get_source_byname(con, meta, source_name, source_type = 'PODIUM_INTERNAL'):
    """Returns the Podium source row for a named Source"""

    lc_source_name = source_name.lower()
    
    pd_source = Table('podium_core.pd_source', meta)
    
    s = select([pd_source.c.nid,
                pd_source.c.sname])
    
    s = s.where(and_(func.lower(pd_source.c.sname) == lc_source_name,
                     pd_source.c.source_type == source_type))

    rp = con.execute(s)
    
    return rp.first()

In [29]:
def get_entity(con, meta, entity_nid):
    """Fetches the entity record by the entity nid, returns 1 row at most"""
    
    assert isinstance(entity_nid, int)
    
    pd_entity = Table('podium_core.pd_entity', meta)
    
    s = select([pd_entity.c.sname,
                pd_entity.c.source_nid])
    
    s = s.where(pd_entity.c.nid == entity_nid)
    
    rp = con.execute(s)
    
    return rp.first()

In [30]:
def get_entity_byname(con, meta, source_nid, entity_name):
    """Fetches the entity record by entity name, returns 1 row at most"""
    
    assert isinstance(source_nid, int)
    
    lc_entity_name = entity_name.lower()
    
    pd_entity = Table('podium_core.pd_entity', meta)
    
    s = select([pd_entity.c.nid,
                pd_entity.c.sname])
    
    s = s.where(and_(func.lower(pd_entity.c.sname) == lc_entity_name,
                     pd_entity.c.source_nid == source_nid))
    
    rp = con.execute(s)
    
    return rp.first()

In [31]:
def get_entity_store(con, meta, entity_id):
    """Fetches the dataflows (if any) that STORE the passed entity id"""
    
    assert isinstance(entity_id, int)
    
    pd_prep_package = Table('podium_core.pd_prep_package', meta)
    pd_bundle = Table('podium_core.pd_bundle', meta)
    
    s = select([pd_bundle.c.nid, pd_bundle.c.sname])
         
    s = s.select_from(pd_prep_package.join(pd_bundle, pd_prep_package.c.bundle_nid == pd_bundle.c.nid))
    
    s = s.where(and_(pd_prep_package.c.entity_id == entity_id,
                     pd_prep_package.c.package_type == 'STORE'))
    
    # The statement has no :eid bind parameter, so none is passed
    rp = con.execute(s).fetchall()
    
    return rp
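
For readers less familiar with SQLAlchemy, the STORE lookup above amounts to a single join. The sketch below reproduces it against an in-memory SQLite database; the table and column names follow the function above, while the schema prefix is dropped, the rows are made up, and the `demo` connection is purely illustrative.

```python
import sqlite3

demo = sqlite3.connect(":memory:")
demo.executescript("""
    CREATE TABLE pd_bundle (nid INTEGER PRIMARY KEY, sname TEXT);
    CREATE TABLE pd_prep_package (
        nid INTEGER PRIMARY KEY, bundle_nid INTEGER,
        entity_id INTEGER, package_type TEXT);
    -- Made-up rows: dataflow 1 STOREs entity 100, dataflow 2 LOADs it
    INSERT INTO pd_bundle VALUES (1, 'upstream_flow'), (2, 'downstream_flow');
    INSERT INTO pd_prep_package VALUES
        (10, 1, 100, 'STORE'),
        (11, 2, 100, 'LOADER');
""")

# Dataflows whose STORE package writes the given entity
STORE_SQL = """
    SELECT b.nid, b.sname
    FROM pd_prep_package AS p
    JOIN pd_bundle AS b ON p.bundle_nid = b.nid
    WHERE p.entity_id = ? AND p.package_type = 'STORE'
"""

rows = demo.execute(STORE_SQL, (100,)).fetchall()
print(rows)
```

Only the dataflow that STOREs the entity comes back; the dataflow that merely LOADs it is filtered out by the `package_type` condition.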

In [32]:
def get_bundle_id(con, meta, sname):
    """Get the bundle id of the passed Prepare Workflow name.
    
    This is a case insensitive match and can only return a single row
    or None
    """
    
    pd_bundle = Table('podium_core.pd_bundle', meta)
    
    lc_sname = sname.lower()
    
    s = pd_bundle.select()
    s = s.where(func.lower(pd_bundle.c.sname) == lc_sname)
    
    rp = con.execute(s)
    
    r = rp.first()
    
    return r


In [33]:
def get_bundle_gui_state(con, meta, nid):
    """Get the bundle GUI state record for the given nid.
    
    """
    pd_bundle_gui_state = Table('podium_core.pd_bundle_gui_state', meta)
    
    gui_cols = [pd_bundle_gui_state.c.nid,
                pd_bundle_gui_state.c.created_ttz,
                pd_bundle_gui_state.c.modified_ttz,
                pd_bundle_gui_state.c.version,
                pd_bundle_gui_state.c.modifiedby,
                pd_bundle_gui_state.c.createdby]
    
    s = select(gui_cols)
    
    s = s.where(pd_bundle_gui_state.c.nid == nid)
    
    rp = con.execute(s)
    
    return rp.first()


In [34]:
def get_bundle_last_execution(con, meta, bundle_nid, count=10):
    """Get the last count execution details of the specified bundle.
    
    """
    pd_prepare_execution_workorder = Table('podium_core.pd_prepare_execution_workorder', meta)
    
    wo_cols = [pd_prepare_execution_workorder.c.nid,
               pd_prepare_execution_workorder.c.record_count,
               pd_prepare_execution_workorder.c.start_time,
               pd_prepare_execution_workorder.c.end_time]
    
    e = select(wo_cols)
    
    e = e.where(and_(pd_prepare_execution_workorder.c.bundle_nid == bundle_nid, 
                     pd_prepare_execution_workorder.c.end_time.isnot(None),
                     pd_prepare_execution_workorder.c.workorder_status == "FINISHED"))
    
    e = e.order_by(desc(pd_prepare_execution_workorder.c.end_time))
    
    e = e.limit(count)
    
    rp = con.execute(e)
    
    r = rp.fetchall()
    
    rp.close()
    
    return r


In [35]:
def get_entity_last_load(con, meta, source_nid, entity_name, n=1):
    """Get the most recent FINISHED load (workorder) details of the specified entity.
    """
    
    pd_source = Table('podium_core.pd_source', meta)
    pd_entity = Table('podium_core.pd_entity', meta)
    pd_workorder = Table('podium_core.pd_workorder', meta)
    
    #print entity_name
    
    parent_source = get_source(con, meta, source_nid)
    
    src = pd_source.select()
    src = src.where(pd_source.c.sname == parent_source.sname)
    
    srp = con.execute(src)
    
    orig_source_id = None
    
    for r in srp:
        print(f'Source: {r.sname}, Source Type: {r.source_type}, nid: {r.nid}')
        if r.source_type != 'PODIUM_INTERNAL':
            orig_source_id = r.nid
            break
    
    print(f'orig_source_id: {orig_source_id}')
    
    if orig_source_id is None:
        return None
    
    ety = pd_entity.select()
    ety = ety.where(and_(pd_entity.c.source_nid == orig_source_id, 
                         pd_entity.c.sname == entity_name))

    rp = con.execute(ety)
    
    orig_entity = rp.first()
    
    if orig_entity is not None:
        
        orig_entity_nid = orig_entity.nid

        wo = select([pd_workorder.c.nid,
                     pd_workorder.c.start_time,
                     pd_workorder.c.end_time,
                     pd_workorder.c.record_count,
                     pd_workorder.c.good_count,
                     pd_workorder.c.bad_count,
                     pd_workorder.c.ugly_count])

        wo = wo.where(and_(pd_workorder.c.entity_nid == orig_entity_nid,
                           pd_workorder.c.workorder_status == 'FINISHED'))
        wo = wo.order_by(desc(pd_workorder.c.end_time))
        wo = wo.limit(n) 

        rp = con.execute(wo)

        r = rp.first()
    
    else:
        r = None
    
    return r


In [36]:
def get_package_nodes(con, meta, bundle_nid):
    """Fetches all package rows (LOADER, STORE and others) of the passed dataflow"""
    
    pd_prep_package = Table('podium_core.pd_prep_package', meta)
    
    s = pd_prep_package.select()
    s = s.where(pd_prep_package.c.bundle_nid == bundle_nid)
    
    rp = con.execute(s)
    
    r = rp.fetchall()
    
    rp.close()
    
    return r

## podium_core Tables Used 

![ER](prep_dep_er.png)

In [37]:
prep_tables = ('pd_bundle',
               'pd_bundle_gui_state',
               'pd_prep_package',
               'pd_entity',
               'pd_source',
               'pd_prepare_execution_workorder',
               'pd_workorder'
               )

## Establish connection to podium_core and fetch used tables metadata

Enter the correct yaml file name (or stream) in the call to the `get_podium_cfg()` function.

In [38]:

cfg = get_podium_cfg(open('pd_cfg.yaml', 'r'))

con_cfg = cfg['pd_connect']['dev']



In [39]:
con, meta = connect(con_cfg['user'], con_cfg['pwd'], con_cfg['db'], con_cfg['host'], con_cfg['port'])

In [40]:
meta.reflect(bind=con, schema='podium_core', only=prep_tables)

## Main get_bundle() Function

This function is called with a single dataflow name (sname) and, for each LOADER in the dataflow, looks backwards for dataflows that STORE that entity.

If such a dataflow is found, the function calls itself on it.

To avoid being caught in circular references, each dataflow's name is stored in `wf_list` as it is visited, and this list is checked before each recursive call.

The stop list `STOPPERS` is a list of dataflow names that also stop the recursion. Names are compared in lower case, so entries should be given in lower case.

The NetworkX DiGraph is built up throughout the process, adding nodes for Sources, Entities and Dataflows as they are first encountered.

The node ids are the nids from the related `pd_bundle` (Dataflow), `pd_source` (Source) and `pd_entity` (Entity) tables, prefixed with `b_`, `s_` and `e_` respectively.

Edges are created between nodes to show the node relationships.
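
The key prefixes and edge directions described above can be summarised in a few lines. This is a simplified sketch: `node_key()` and `edges_for()` are hypothetical helpers, the source nid `7` is made up, and the check for nodes that already exist is ignored.

```python
def node_key(kind, nid):
    """Prefix a nid with b_, s_ or e_ so keys from different tables cannot clash."""
    prefix = {"bundle": "b_", "source": "s_", "entity": "e_"}[kind]
    return f"{prefix}{nid}"


def edges_for(package_type, bundle_nid, entity_nid, source_nid):
    """Return the (from, to) edges implied by one LOADER or STORE package."""
    b = node_key("bundle", bundle_nid)
    e = node_key("entity", entity_nid)
    s = node_key("source", source_nid)
    edges = [(s, e)]  # a source points at the entity that belongs to it
    if package_type == "LOADER":
        edges.append((e, b))  # the entity feeds the dataflow
    elif package_type == "STORE":
        edges.append((b, e))  # the dataflow writes the entity
    return edges


print(edges_for("LOADER", 52599, 18091, 7))
```

Because every edge runs in the direction the data moves, following edges backwards from the seed dataflow leads to its predecessors.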

In [41]:
def get_bundle(con, meta, sname: str, world_graph, wf_list, styles: dict, stop_wf=()):
    """Build bundle dependency digraph"""
    
    # Use the graph that was passed in rather than relying on a global named W
    W = world_graph
    
    # Check if dataflow is in stop list
    if sname.lower() in stop_wf:
        print(f'Dataflow {sname} is in stop list\n')
        return
    
    source_styles = styles['source']
    entity_styles = styles['entity_source']
    target_styles = styles['entity_target']
    bundle_styles = styles['bundle']
    edge_styles = styles['edge']
    
    
    print(bundle_styles)
    
    bundle = get_bundle_id(con, meta, sname)
    
    #import pdb; pdb.set_trace()
    
    if bundle:
        
        # Report the dataflow only once we know the lookup succeeded
        print(f'Current dataflow {bundle.sname} ({bundle.nid})')
        
        bundle_nid = bundle.nid
        bundle_description = bundle.description
        bundle_sname = bundle.sname
        
        bundle_gui_state = get_bundle_gui_state(con, meta, bundle.bundle_gui_state_nid)
        
        if bundle_gui_state:
            bundle_mod_dt = bundle_gui_state.modified_ttz
            bundle_mod_by = bundle_gui_state.modifiedby
            bundle_version = bundle_gui_state.version
        else:
            bundle_mod_dt = 'Unknown'
            bundle_mod_by = 'Unknown'
            bundle_version = 'Unknown'
        
        # To-do - check if output file for version already exists
        #         if so then bypass
        
        bundle_exec = get_bundle_last_execution(con, meta, bundle_nid)
        
        if bundle_exec:
            
            exec_stats = []
            
            for i, r in enumerate(bundle_exec):
                if i == 0:
                    last_record_count = r.record_count
                    last_start_time = r.start_time
                    last_end_time = r.end_time
                    
                exec_stats.append(({'start_time': str(r.start_time), 
                                    'end_time': str(r.end_time),
                                    'records': r.record_count}))
                
        else:
            last_record_count = 0
            last_start_time = ''
            last_end_time = ''
        
        print(f'\t{bundle_nid}, {bundle_description}, {bundle_sname} records {last_record_count}')
        print(f'\tModified by: {bundle_mod_by}, Modified Date: {bundle_mod_dt}, Version: {bundle_version}')
        print(f'\tLast Start: {last_start_time}, Last End: {last_end_time}')
        
    else:
        print(f'Package: {sname}, not found')
        return None
    
    # add bundle to "world" graph
    bundle_node_key = f'b_{bundle_nid}'
    
    W.add_node(bundle_node_key,
               nid=bundle_nid,
               # sname=bundle_sname, 
               label=bundle_sname, 
               n_type='bundle',
               **bundle_styles)
     
    # Add LOADER / STORE nodes
    p = get_package_nodes(con, meta, bundle_nid)
    
    # Add LOADER and STORE nodes to graph
    for n in p:
        
        id = n.nid
        n_type = n.package_type
     
        if n_type in ('LOADER','STORE'):
            
            entity_id = n.entity_id
            
            entity_node_key = f'e_{entity_id}'
            
            if n_type == 'LOADER':
                
                l = get_entity_store(con, meta, entity_id)
                
                if len(l) == 0:
                    print(f'No STORE found for {entity_id}')
                else:
                    for i, ldr in enumerate(l):
                        print(f'{entity_id} ({i}) STORE by {ldr.sname}')
                        
                        if not (ldr.sname.lower() in wf_list):
                            wf_list.append(ldr.sname.lower())
                            get_bundle(con, meta, ldr.sname, world_graph, wf_list, styles, stop_wf)
            
            if (not W.has_node(entity_node_key)):
            
                entity = get_entity(con, meta, entity_id)
                
                source_id = entity.source_nid
                source = get_source(con, meta, source_id)
                
                if n_type == 'LOADER':
                    W.add_node(entity_node_key,
                               n_type='entity',
                               nid=entity_id,
                               snid=source_id,
                               #sname=entity.sname,
                               label=entity.sname,
                               **entity_styles)
                
                if n_type == 'STORE':
                    W.add_node(entity_node_key,
                               n_type='entity',
                               nid=entity_id,
                               snid=source_id,
                               #sname=entity.sname,
                               label=entity.sname,
                               **target_styles)
                
                source_node_key = f's_{source_id}'
                
                if (not W.has_node(source_node_key)):
                    
                    W.add_node(source_node_key,
                           n_type='source',
                           nid=source_id,
                           #sname=source.sname,
                           label=source.sname,
                           **source_styles)
                    
                W.add_edge(source_node_key, 
                           entity_node_key,
                           **edge_styles['source'])
                
            else:
                # Graph.node was removed in NetworkX 2.4; Graph.nodes works in all 2.x releases
                source_nid = W.nodes[entity_node_key]['snid']
                source_node_key = f's_{source_nid}'
                print(f"Graph already has entity {entity_id}, {W.nodes[source_node_key]['label']}.{W.nodes[entity_node_key]['label']}")

            if (n_type == 'STORE'):                    
                W.add_edge(bundle_node_key, entity_node_key, **edge_styles['store'])
            elif (n_type == 'LOADER'):
                W.add_edge(entity_node_key, bundle_node_key, **edge_styles['loader'])
            else:
                print(f'ERROR {bundle_node_key}, {source_node_key}, {entity_node_key}')

In [42]:
def subg(g, sg, n_type):
    """Create a record type subgraph of the passed node type"""
    
    # Legend nodes carry no n_type attribute, so use .get() to skip them safely
    label_list = [d['label'] for _, d in g.nodes(data=True) if d.get('n_type') == n_type]
    
    label_list.sort(key=lambda x: x.lower())
    
    # Start subgraph and header record of number of lines
    print(f'subgraph {sg} {{')
    
    print(f'r_{sg} [shape=record,label="{{')
    print(f'{len(label_list)} {n_type}')
    
    for i, label in enumerate(label_list):
        print(f'| {label}')
    
    # Close subgraph
    print('}"];}')

In [43]:
def write_record_dot(g, output_file=None):
    """Print a DOT digraph of record type subgraphs to stdout (output_file is currently unused)"""
    
    print("digraph g {")

    subg(g, "s1", "bundle")
    subg(g, "s2", "source")
    subg(g, "s3", "entity")
    
    print('}')
    

## Main

In [44]:
# final dataflow name and "stop list"
SEED = "prod_stg_compsych_member_t" # "prod_stg_source_member_t" # 
#STOPPERS = ('je70_chess_history_cdc_v',
#               'je70_chess_init_member_history_v',
#               'je70_chess_copy_member_history_v')


# The stop list should be zero or more dataflow names that act as stoppers in the
# recursion. If a dataflow name in the STOPPERS list is hit, the recursion will
# stop at that point.
STOPPERS = ()

In [45]:
if __name__ == "__main__":

    W = nx.DiGraph()
    
    node_styles = cfg['styles']['node']
    edge_styles = cfg['styles']['meta_edge']
    meta_styles = cfg['styles']['meta']
    
    now = dt.datetime.now()
    
    
    meta_dict = {'shape': 'record', 'label': f'{{{now.strftime("%Y-%m-%d %H:%M")} | {{<f0> SEED | <f1> {SEED}}}}}', **meta_styles}
    
    W.add_node('node', **node_styles)
    
    
    W.add_node('meta', **meta_dict)
    W.add_node('source', **cfg['styles']['source'])
    W.add_node('entity', **cfg['styles']['entity_source'])
    W.add_node('dataflow', **cfg['styles']['bundle'])
    W.add_node('target', **cfg['styles']['entity_target'])
    
    W.add_edges_from((('entity','source', {'label': 'belongs_to', **edge_styles}),
                      ('entity','dataflow', {'label': 'LOADER', **edge_styles}),
                      ('dataflow','target', {'label': 'STORE', **edge_styles})))
        

                 
    wf_list = []

    wf_list.append(SEED.lower())
    print(f'Processing starting point, dataflow {SEED}')
    
    get_bundle(con, meta, SEED, W, wf_list, cfg['styles'], STOPPERS)
    

    print(f'{len(W.nodes)} nodes added to DiGraph')

    # Write output dot file
    write_dot(W, f'{SEED}.dot')
    
    # Write output GraphML file
    with open(f"{SEED}.graphml", "wb") as ofile:
        nx.write_graphml(W, ofile)
    
    print("Finished")

Processing starting point, dataflow prod_stg_compsych_member_t
{'shape': 'box', 'style': '"filled,rounded"', 'fillcolor': '"#E9446A"'}
Current dataflow prod_stg_Compsych_Member_T (52599)
	52599, , prod_stg_Compsych_Member_T records 692816
	Modified by: SP17, Modified Date: 2018-11-13 12:58:03.577000-05:00, Version: 4
	Last Start: 2018-12-03 12:36:07.258000-05:00, Last End: 2018-12-03 12:36:50.809000-05:00
18091 (0) STORE by init_stg_Compsych_Member_T
{'shape': 'box', 'style': '"filled,rounded"', 'fillcolor': '"#E9446A"'}
Current dataflow init_stg_Compsych_Member_T (54399)
	54399, , init_stg_Compsych_Member_T records 731560
	Modified by: SP17, Modified Date: 2018-12-04 13:05:33.594000-05:00, Version: 8
	Last Start: 2018-12-04 13:55:31.350000-05:00, Last End: 2018-12-04 14:01:00.242000-05:00
No STORE found for 18266
18091 (1) STORE by prod_copy_stg_compsych_sso_eligibility_t
{'shape': 'box', 'style': '"filled,rounded"', 'fillcolor': '"#E9446A"'}
Current dataflow prod_copy_stg_compsych_ss