In [1]:
from sqlalchemy import create_engine
from sqlalchemy import MetaData , Table, Column, Integer, Numeric, String, DateTime, ForeignKey, Text
from sqlalchemy import select, desc
from sqlalchemy import and_, or_, not_
from sqlalchemy import text

# engine = create_engine('postgresql+psycopg2://@hdpedge001.ca.sunlife:5432/podium_md')

In [2]:
import datetime as dt
import getpass
import json
import os
from urllib.parse import quote

import networkx as nx
#import matplotlib.pyplot as plt

In [3]:
# Constants

# Directory the dataflow extract files are written to.  It can be overridden
# with the DATAFLOW_OUTPUT_DIR environment variable; the default is the
# original hard-coded location.
OUTPUT_DIR = os.environ.get('DATAFLOW_OUTPUT_DIR', r'C:\Projects\CEL\Dataflows\Extracts')

In [4]:
def connect(user, password, db, host='hdpedge001.ca.sunlife', port=5432):
    '''Create a PostgreSQL engine and a MetaData object bound to it.

    The user name and password are percent-encoded before they are placed
    in the connection URL, so credentials containing reserved characters
    such as ``@``, ``:``, ``/`` or ``;`` do not corrupt the URL.

    Parameters:
        user, password: database credentials.
        db: database name.
        host: database server host name.
        port: database server port.

    Returns:
        A ``(con, meta)`` tuple: the object returned by ``create_engine()``
        and a ``MetaData`` instance bound to it.
    '''
    url = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(
        quote(str(user), safe=''),
        quote(str(password), safe=''),
        host,
        port,
        db)

    # The return value of create_engine() is our connection object
    con = create_engine(url, client_encoding='utf8')

    # We then bind the connection to MetaData()
    meta = MetaData(bind=con)

    return con, meta

In [5]:
def get_join_condition(con, meta, joiner_nid):
    """Fetch the pd_prep_joiner_condition rows of one joiner.

    Rows are filtered on ``joiner_nid`` and returned in ``idx`` order as
    the list produced by ``fetchall()``.
    """
    conditions = Table('podium_core.pd_prep_joiner_condition', meta)

    query = (
        conditions.select()
        .where(conditions.c.joiner_nid == joiner_nid)
        .order_by(conditions.c.idx)
    )

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows

In [6]:
def get_join_master(con, meta, joiner_nid):
    """Fetch the pd_prep_joiner_master_field rows of one joiner.

    Rows are filtered on ``joiner_nid`` and returned in ``idx`` order as
    the list produced by ``fetchall()``.
    """
    master_fields = Table('podium_core.pd_prep_joiner_master_field', meta)

    query = (
        master_fields.select()
        .where(master_fields.c.joiner_nid == joiner_nid)
        .order_by(master_fields.c.idx)
    )

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows

In [7]:
def get_join_detail(con, meta, joiner_nid):
    """Fetch the pd_prep_joiner_detail_field rows of one joiner.

    Rows are filtered on ``joiner_nid`` and returned in ``idx`` order as
    the list produced by ``fetchall()``.
    """
    detail_fields = Table('podium_core.pd_prep_joiner_detail_field', meta)

    query = (
        detail_fields.select()
        .where(detail_fields.c.joiner_nid == joiner_nid)
        .order_by(detail_fields.c.idx)
    )

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows

In [8]:
def get_trans_expression(con, meta, transformer_nid):
    """Fetch the pd_prep_transformer_expression rows of one transformer.

    Rows are filtered on ``transformer_nid`` and returned in ``idx`` order
    as the list produced by ``fetchall()``.
    """
    expressions = Table('podium_core.pd_prep_transformer_expression', meta)

    query = (
        expressions.select()
        .where(expressions.c.transformer_nid == transformer_nid)
        .order_by(expressions.c.idx)
    )

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows

In [9]:
def get_aggregator_expression(con, meta, aggregator_nid):
    """Fetch the pd_prep_aggregator_expression rows of one aggregator.

    Rows are filtered on ``aggregator_nid`` and returned in ``idx`` order
    as the list produced by ``fetchall()``.
    """
    expressions = Table('podium_core.pd_prep_aggregator_expression', meta)

    query = (
        expressions.select()
        .where(expressions.c.aggregator_nid == aggregator_nid)
        .order_by(expressions.c.idx)
    )

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows

In [10]:
def get_aggregator_group(con, meta, aggregator_nid):
    """Fetch the pd_prep_aggregator_group rows of one aggregator.

    Rows are filtered on ``aggregator_nid`` and returned in ``idx`` order
    as the list produced by ``fetchall()``.
    """
    groups = Table('podium_core.pd_prep_aggregator_group', meta)

    query = (
        groups.select()
        .where(groups.c.aggregator_nid == aggregator_nid)
        .order_by(groups.c.idx)
    )

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows

In [11]:
def get_sort_condition(con, meta, sorter_nid):
    """Fetch the pd_prep_sort_condition rows of one sorter.

    Rows are filtered on ``sorter_nid`` and returned in ``idx`` order as
    the list produced by ``fetchall()``.
    """
    sort_conditions = Table('podium_core.pd_prep_sort_condition', meta)

    query = (
        sort_conditions.select()
        .where(sort_conditions.c.sorter_nid == sorter_nid)
        .order_by(sort_conditions.c.idx)
    )

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows

In [12]:
def get_router_expressions(con, meta, router_nid):
    """Fetch the pd_prep_router_expression rows of one router.

    Rows are filtered on ``router_nid`` and returned in ``idx`` order as
    the list produced by ``fetchall()``.
    """
    expressions = Table('podium_core.pd_prep_router_expression', meta)

    query = (
        expressions.select()
        .where(expressions.c.router_nid == router_nid)
        .order_by(expressions.c.idx)
    )

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows

In [13]:
def get_custom_expression(con, meta, custom_nid):
    """Fetch the pd_prep_package row(s) whose ``nid`` equals ``custom_nid``.

    Returns the list produced by ``fetchall()``.
    """
    pd_custom_expression = Table('podium_core.pd_prep_package', meta)

    cust = pd_custom_expression.select()
    # Columns live on the table's ``.c`` collection; the previous
    # ``pd_custom_expression.nid`` raised AttributeError.
    cust = cust.where(pd_custom_expression.c.nid == custom_nid)

    rp = con.execute(cust)

    r = rp.fetchall()

    rp.close()

    return r

In [14]:
def get_compare_condition(con, meta, compare_nid):
    """Fetch the pd_prep_compare_condition rows of one compare package.

    Rows are filtered on ``compare_nid`` and returned in ``idx`` order as
    the list produced by ``fetchall()``.
    """
    conditions = Table('podium_core.pd_prep_compare_condition', meta)

    query = (
        conditions.select()
        .where(conditions.c.compare_nid == compare_nid)
        .order_by(conditions.c.idx)
    )

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows

In [15]:
def get_compare_join(con, meta, compare_nid):
    """Fetch the pd_prep_compare_join rows of one compare package.

    Rows are filtered on ``compare_nid`` and returned in ``idx`` order as
    the list produced by ``fetchall()``.
    """
    joins = Table('podium_core.pd_prep_compare_join', meta)

    query = (
        joins.select()
        .where(joins.c.compare_nid == compare_nid)
        .order_by(joins.c.idx)
    )

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows

In [16]:
def get_source(con, meta, source_nid):
    """Fetch the pd_source row whose ``nid`` equals ``source_nid``.

    Returns the first row of the result, or None when there is no match.
    """
    sources = Table('podium_core.pd_source', meta)

    query = sources.select().where(sources.c.nid == source_nid)

    return con.execute(query).first()

In [17]:
def get_entity(con, meta, entity_nid):
    """Fetch ``sname`` and ``source_nid`` of the pd_entity row with this nid.

    Returns the first row of the result, or None when there is no match.
    """
    entities = Table('podium_core.pd_entity', meta)

    wanted_columns = [entities.c.sname, entities.c.source_nid]
    query = select(wanted_columns).where(entities.c.nid == entity_nid)

    return con.execute(query).first()

In [18]:
def get_bundle_id(con, meta, sname):
    """Look up the pd_bundle row of the named Prepare workflow.

    The match is case-insensitive: the name is lower-cased here and
    compared against ``lower(sname)`` in SQL.  Returns the first matching
    row, or None when nothing matches.
    """
    bundles = Table('podium_core.pd_bundle', meta)

    name_filter = text("lower(sname) = :lc_sname")
    query = bundles.select().where(name_filter)

    return con.execute(query, lc_sname=sname.lower()).first()


In [19]:
def get_bundle_gui_state(con, meta, nid):
    """Fetch the pd_bundle_gui_state row whose ``nid`` equals ``nid``.

    Returns the first row of the result, or None when there is no match.
    """
    gui_states = Table('podium_core.pd_bundle_gui_state', meta)

    query = gui_states.select().where(gui_states.c.nid == nid)

    return con.execute(query).first()


In [20]:
def get_bundle_last_execution(con, meta, bundle_nid, count=10):
    """Fetch the most recent finished executions of a bundle.

    Reads ``nid``, ``record_count``, ``start_time`` and ``end_time`` from
    pd_prepare_execution_workorder for rows of this bundle that have a
    non-null ``end_time`` and ``workorder_status`` 'FINISHED', newest
    ``end_time`` first, limited to ``count`` rows.  Returns the list
    produced by ``fetchall()``.
    """
    workorders = Table('podium_core.pd_prepare_execution_workorder', meta)

    wanted_columns = [
        workorders.c.nid,
        workorders.c.record_count,
        workorders.c.start_time,
        workorders.c.end_time,
    ]
    finished_runs = and_(
        workorders.c.bundle_nid == bundle_nid,
        workorders.c.end_time.isnot(None),
        workorders.c.workorder_status == "FINISHED",
    )

    query = (
        select(wanted_columns)
        .where(finished_runs)
        .order_by(desc(workorders.c.end_time))
        .limit(count)
    )

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows


In [21]:
def get_entity_last_load(con, meta, source_nid, entity_name, n=1):
    """Get the latest FINISHED load work order of an entity's original source.

    Steps:
      1. Read the pd_source row for ``source_nid`` and take its ``sname``.
      2. Among pd_source rows with that same ``sname``, pick the first whose
         ``source_type`` is not 'PODIUM_INTERNAL'.
      3. Find the pd_entity row named ``entity_name`` under that source.
      4. Return the newest (by ``end_time``) pd_workorder row of that entity
         whose ``workorder_status`` is 'FINISHED'.

    Parameters:
        con, meta: engine and bound MetaData as returned by ``connect``.
        source_nid: nid of the source the workflow entity belongs to.
        entity_name: ``sname`` of the entity.
        n: LIMIT applied to the work order query.  Only the first row is
           returned regardless of ``n``.

    Returns:
        A row with nid, start_time, end_time, record_count, good_count,
        bad_count and ugly_count, or None when the source, the
        non-internal source, the entity or a finished work order cannot be
        found.
    """
    pd_source = Table('podium_core.pd_source', meta)
    pd_entity = Table('podium_core.pd_entity', meta)
    pd_workorder = Table('podium_core.pd_workorder', meta)

    parent_source = get_source(con, meta, source_nid)

    if parent_source is None:
        # Unknown source nid: nothing to look up (this used to raise
        # AttributeError on ``parent_source.sname``).
        return None

    src = pd_source.select()
    src = src.where(pd_source.c.sname == parent_source.sname)

    srp = con.execute(src)

    orig_source_id = None

    try:
        for r in srp:
            print("Source: {}, Source Type: {}, nid: {}".format(r.sname, r.source_type, r.nid ))
            if r.source_type != 'PODIUM_INTERNAL':
                orig_source_id = r.nid
                break
    finally:
        # The loop may stop early, so release the result explicitly.
        srp.close()

    print("orig_source_id: {}".format(orig_source_id))

    if orig_source_id is None:
        return None

    ety = pd_entity.select()
    ety = ety.where(and_(pd_entity.c.source_nid == orig_source_id, pd_entity.c.sname == entity_name))

    orig_entity = con.execute(ety).first()

    if orig_entity is None:
        return None

    wo = select([pd_workorder.c.nid,
                 pd_workorder.c.start_time,
                 pd_workorder.c.end_time,
                 pd_workorder.c.record_count,
                 pd_workorder.c.good_count,
                 pd_workorder.c.bad_count,
                 pd_workorder.c.ugly_count])

    wo = wo.where(and_(pd_workorder.c.entity_nid == orig_entity.nid, pd_workorder.c.workorder_status == 'FINISHED'))
    wo = wo.order_by(desc(pd_workorder.c.end_time))
    wo = wo.limit(n)

    return con.execute(wo).first()


In [22]:
def get_field_formats(con, meta, package_nid):
    """Collect the field formats attached to every port of a package.

    Each pd_prep_port row belonging to ``package_nid`` is visited; for each
    one, the pd_prep_field_format rows sharing its ``record_format_nid``
    are read in ``idx`` order and converted to plain lists.

    Returns a dictionary of the form

    {INBOUND:[[],[],...],
     OUTBOUND:[[],[],...]}

    with one inner list of field rows per port.  Ports whose ``port_type``
    is anything other than 'INBOUND' are filed under 'OUTBOUND'.
    """
    ports = Table('podium_core.pd_prep_port', meta)
    field_formats = Table('podium_core.pd_prep_field_format', meta)

    formats_by_direction = {'INBOUND': [], 'OUTBOUND': []}

    port_query = ports.select().where(ports.c.package_nid == package_nid)
    port_result = con.execute(port_query)

    for port in port_result:

        field_query = (
            field_formats.select()
            .where(field_formats.c.record_format_nid == port.record_format_nid)
            .order_by(field_formats.c.idx)
        )
        field_result = con.execute(field_query)

        # Convert each row to a plain list
        rows_as_lists = [list(row) for row in field_result.fetchall()]

        direction = 'INBOUND' if port.port_type == 'INBOUND' else 'OUTBOUND'
        formats_by_direction[direction].append(rows_as_lists)

        field_result.close()

    port_result.close()

    return formats_by_direction

In [23]:
def get_package_nodes(con, meta, bundle_nid):
    """Fetch every pd_prep_package row belonging to a bundle.

    Rows are filtered on ``bundle_nid`` and returned as the list produced
    by ``fetchall()``.
    """
    packages = Table('podium_core.pd_prep_package', meta)

    query = packages.select().where(packages.c.bundle_nid == bundle_nid)

    result = con.execute(query)
    rows = result.fetchall()
    result.close()

    return rows

In [24]:
def get_package_connectors(con, meta, bundle_nid, G):
    """Get all the connectors between the various transforms.

    There is a connector between a node and its source data (inbound data).

    For every pd_prep_connector row of the bundle, the two pd_prep_port rows
    it references are read and resolved to their owning packages:

        node outbound port -> connector outbound port
                           -> connector inbound port -> node inbound port

    Parameters:
        con, meta: engine and bound MetaData as returned by ``connect``.
        bundle_nid: nid of the bundle whose connectors are read.
        G: graph already populated with the bundle's package nodes; the
           ``n_type`` attribute of each node is read from ``G.node``.

    Returns:
        A list of tuples
        ``(from_nid, to_nid, ob_idx, outbound_type, ib_idx, inbound_type)``
        describing the DiGraph edges and their direction.  Connectors for
        which an OUTBOUND and an INBOUND port row cannot both be found are
        reported and skipped, instead of reusing values left over from the
        previous connector.
    """

    p_edges = []

    OP_TEMPLATE = "From: {}, To: {}, ob_idx: {}, Outbound: {}, ib_idx: {}, Inbound: {}"

    pd_prep_connector = Table('podium_core.pd_prep_connector', meta)
    pd_prep_port = Table('podium_core.pd_prep_port', meta)

    s = pd_prep_connector.select()
    s = s.where(pd_prep_connector.c.bundle_nid == bundle_nid)

    c_rp = con.execute(s)

    try:
        for c in c_rp:

            ob_port = c.outbound_port_nid
            ib_port = c.inbound_port_nid

            p = pd_prep_port.select()
            p = p.where(pd_prep_port.c.nid.in_([ob_port, ib_port]))

            p_rp = con.execute(p)
            p_rows = p_rp.fetchall()
            p_rp.close()

            # Reset for every connector so nothing leaks between iterations.
            outbound = None
            inbound = None

            for port in p_rows:

                package_type = G.node[port.package_nid]['n_type']
                endpoint = (port.package_nid, port.idx, package_type)

                if port.port_type == 'OUTBOUND':
                    outbound = endpoint
                elif port.port_type == 'INBOUND':
                    inbound = endpoint

            if outbound is None or inbound is None:
                print(">>> Connector skipped, port not found: outbound port {}, inbound port {}".format(ob_port, ib_port))
                continue

            e_from_nid, e_ob_idx, e_outbound_type = outbound
            e_to_nid, e_ib_idx, e_inbound_type = inbound

            print(OP_TEMPLATE.format(e_from_nid, e_to_nid, e_ob_idx, e_outbound_type, e_ib_idx, e_inbound_type))

            p_edges.append((e_from_nid, e_to_nid, e_ob_idx, e_outbound_type, e_ib_idx, e_inbound_type))
    finally:
        c_rp.close()

    return p_edges

In [25]:
# Factory for a closure that keeps a record of the
# max y value handed out so far at each level of the graph.
def make_free():
    allocated = {}

    def get_free(x):
        # First request for a level yields 1; each later one is 10 higher.
        allocated[x] = allocated[x] + 10 if x in allocated else 1
        return allocated[x]

    return get_free

In [26]:
def set_level(G, node_id, free, curr_level, curr_free):
    """Recursively assign visjs Network levels, walking towards predecessors.

    The node's ``level`` is raised to ``curr_level`` when that is higher
    than the stored value, and its ``level_free`` is set to ``curr_free``.
    Every predecessor is then visited one level further out, with a fresh
    value obtained from ``free(level)``.
    """
    attrs = G.node[node_id]

    if curr_level > attrs['level']:
        attrs['level'] = curr_level

    attrs['level_free'] = curr_free

    print("node_id: {}, curr_level: {}, curr_free: {}".format(node_id, curr_level, curr_free))

    next_level = curr_level + 1

    for predecessor in G.predecessors(node_id):
        next_free = free(next_level)
        set_level(G, predecessor, free, next_level, next_free)
    

In [27]:
def create_graph_nodes(con, meta, G, bundle_package):
    """Add one node per package row to the DiGraph ``G``.

    Parameters:
        con, meta: engine and bound MetaData as returned by ``connect``.
        G: graph to populate (mutated in place).
        bundle_package: rows of pd_prep_package for one bundle, as returned
            by ``get_package_nodes``.

    Returns:
        None.

    Every node receives ``n_type``, ``comments``, ``field_formats`` and
    ``level`` (initialised to 0).  Further attributes depend on the
    package type (n_type):

       LOADER / STORE
       --------------
       entity_id
       data_selection
       entity_name
       source_id
       start_time, end_time (only when a work order was found)
       record_count, good_count, bad_count, ugly_count
       source_name

       JOINER
       ------
       join_type
       join_condition
       master_fields
       detail_fields

       TRANSFORMER
       -----------
       transformer_expression

       AGGREGATOR
       ----------
       aggregator_expression
       aggregator_group

       SORT
       ----
       sort_condition

       FILTER
       ------
       filter_expression

       UNION
       -----
       union_type

       ROUTER
       ------
       mode
       router_expressions

       COMPARE
       -------
       join_condition
       compare_fields

       CUSTOM
       ------
       expression

    Any other package type is reported and gets only the common attributes.
    """

    node_count = len(bundle_package)

    print("{:d} nodes in ".format(node_count))

    # Cache of source nid -> {'sname': ...} so each source is read once.
    sources = {}

    for row in bundle_package:

        node_id = row.nid
        n_type = row.package_type

        print("{:4d} {}".format(node_id, n_type))

        G.add_node(node_id)

        # Attribute dictionary of the node that was just added.
        attrs = G.node[node_id]

        attrs['n_type'] = n_type
        attrs['comments'] = row.package_comments
        attrs['field_formats'] = get_field_formats(con, meta, node_id)
        attrs['level'] = 0

        if n_type in ('LOADER', 'STORE'):

            entity_id = row.entity_id
            attrs['entity_id'] = entity_id
            attrs['data_selection'] = row.data_selection

            entity = get_entity(con, meta, entity_id)

            entity_name = entity.sname
            source_id = entity.source_nid

            # to-do Tie workflow entity (INTERNAL) back to source
            if n_type == 'LOADER':
                workorder = get_entity_last_load(con, meta, source_id, entity_name)
            else:
                workorder = None

            attrs['entity_name'] = entity_name
            attrs['source_id'] = source_id

            if workorder:
                attrs['start_time'] = str(workorder.start_time)
                attrs['end_time'] = str(workorder.end_time)
                attrs['record_count'] = workorder.record_count
                attrs['good_count'] = workorder.good_count
                attrs['bad_count'] = workorder.bad_count
                attrs['ugly_count'] = workorder.ugly_count
            else:
                attrs['record_count'] = 0
                attrs['good_count'] = 0
                attrs['bad_count'] = 0
                attrs['ugly_count'] = 0

            if source_id not in sources:
                parent_source = get_source(con, meta, source_id)
                sources[source_id] = {'sname': parent_source.sname}

            attrs['source_name'] = sources[source_id]['sname']

        elif n_type == 'JOINER':

            attrs['join_type'] = row.mode

            attrs['join_condition'] = [
                (j.master_expression, j.join_operator, j.detail_expression)
                for j in get_join_condition(con, meta, node_id)
            ]

            attrs['master_fields'] = [
                (m.field_name, m.idx)
                for m in get_join_master(con, meta, node_id)
            ]

            attrs['detail_fields'] = [
                (d.field_name, d.idx)
                for d in get_join_detail(con, meta, node_id)
            ]

        elif n_type == 'TRANSFORMER':

            attrs['transformer_expression'] = [
                t.expression
                for t in get_trans_expression(con, meta, node_id)
            ]

        elif n_type == 'AGGREGATOR':

            attrs['aggregator_expression'] = [
                ae.expression
                for ae in get_aggregator_expression(con, meta, node_id)
            ]

            attrs['aggregator_group'] = [
                ag.expression
                for ag in get_aggregator_group(con, meta, node_id)
            ]

        # NOTE(review): earlier documentation called this type SORTER, but
        # the code has always matched 'SORT' - confirm against the values
        # stored in pd_prep_package.package_type.
        elif n_type == 'SORT':

            attrs['sort_condition'] = [
                [[s.sort_field], [s.sort_direction], [s.idx]]
                for s in get_sort_condition(con, meta, node_id)
            ]

        elif n_type == 'FILTER':

            attrs['filter_expression'] = row.expression

        elif n_type == 'UNION':

            attrs['union_type'] = row.mode

        # The next three comparisons used ``n_type in ('ROUTER')`` etc.,
        # which is a substring test against a plain string (so 'ROUTE' or
        # '' also matched).  Exact equality is what was intended.
        elif n_type == 'ROUTER':

            attrs['mode'] = row.mode

            attrs['router_expressions'] = [
                e.expression
                for e in get_router_expressions(con, meta, node_id)
            ]

        elif n_type == 'COMPARE':

            print(">>> CDC COMPARE found")

            attrs['join_condition'] = [
                (j.master_expression, j.join_operator, j.detail_expression)
                for j in get_compare_join(con, meta, node_id)
            ]

            attrs['compare_fields'] = [
                (c.master_expression, c.join_operator, c.detail_expression)
                for c in get_compare_condition(con, meta, node_id)
            ]

        elif n_type == 'CUSTOM':

            print('>>> CUSTOM Transform Found')

            attrs['expression'] = row.expression

        else:

            print(">>> Unexpected Package Tpe: {} - Found".format(n_type))

            


In [28]:
# Names of the podium_core tables that are passed to meta.reflect(only=...).
prep_tables = (
    # bundle
    'pd_bundle',
    'pd_bundle_gui_state',
    # packages, connectors and ports
    'pd_prep_package',
    'pd_prep_connector',
    'pd_prep_port',
    # per-package detail tables
    'pd_prep_aggregator_expression',
    'pd_prep_aggregator_group',
    'pd_prep_compare_condition',
    'pd_prep_compare_join',
    'pd_prep_field_format',
    'pd_prep_joiner_condition',
    'pd_prep_joiner_detail_field',
    'pd_prep_joiner_master_field',
    'pd_prep_parameter',
    'pd_prep_record_format',
    'pd_prep_router_expression',
    'pd_prep_sort_condition',
    'pd_prep_transformer_expression',
    # entities, sources and work orders
    'pd_entity',
    'pd_source',
    'pd_prepare_execution_workorder',
    'pd_workorder',
)

In [29]:
# con, meta = connect('je70', '<password redacted>', 'podium_md')

In [30]:
# host='cl11148.sunlifecorp.com'
# Credentials are taken from the environment (PODIUM_USER / PODIUM_PASSWORD).
# When no password is set it is prompted for, so it is never stored in the
# notebook.
db_user = os.environ.get('PODIUM_USER', 'je70')
db_password = os.environ.get('PODIUM_PASSWORD') or getpass.getpass('Password for {}: '.format(db_user))
con, meta = connect(db_user, db_password, 'podium_md', host='pg_dev5436.sunlifecorp.com', port=5436)

In [31]:
# Load the definitions of the tables named in prep_tables from the
# podium_core schema into meta.
meta.reflect(bind=con, schema='podium_core', only=prep_tables)

In [32]:
# Print the key of every table now registered in meta.
for t in meta.tables:
    print(t)

podium_core.pd_bundle
podium_core.pd_bundle_gui_state
podium_core.pd_prep_package
podium_core.pd_prep_connector
podium_core.pd_prep_port
podium_core.pd_prep_record_format
podium_core.pd_prep_aggregator_expression
podium_core.pd_prep_aggregator_group
podium_core.pd_prep_compare_condition
podium_core.pd_prep_compare_join
podium_core.pd_prep_field_format
podium_core.pd_prep_joiner_condition
podium_core.pd_prep_joiner_detail_field
podium_core.pd_prep_joiner_master_field
podium_core.pd_prep_parameter
podium_core.pd_prep_router_expression
podium_core.pd_prep_sort_condition
podium_core.pd_prep_transformer_expression
podium_core.pd_entity
podium_core.pd_source_conn
podium_core.pd_source
podium_core.pd_source_hier
podium_core.pd_prepare_execution_workorder
podium_core.pd_workorder


In [46]:
def get_bundle(con, meta, sname, world_graph):
    """Build the digraph of one bundle and write it to a file.

    The bundle (Prepare workflow) named ``sname`` is looked up, its
    packages and connectors are loaded into a networkx DiGraph, every node
    is given a ``level`` for the visjs Network hierarchy, and the result
    is dumped as JSON to ``<OUTPUT_DIR>/<sname>_<timestamp>.js``.

    Along the way the bundle, the entities of its LOADER / STORE packages
    and their sources are added to ``world_graph``:

        source -> entity
        entity -> bundle   (LOADER)
        bundle -> entity   (STORE)

    Parameters:
        con, meta: engine and bound MetaData as returned by ``connect``.
        sname: bundle name (matched by ``get_bundle_id``).
        world_graph: graph shared across bundles; mutated in place.

    Returns:
        None.  A message is printed and nothing is written when the bundle
        is not found or has no packages.
    """

    bundle = get_bundle_id(con, meta, sname)

    print(bundle)

    if not bundle:
        print("Package: {}, not found".format(sname))
        return None

    bundle_nid = bundle.nid
    bundle_description = bundle.description
    bundle_sname = bundle.sname

    bundle_gui_state = get_bundle_gui_state(con, meta, bundle.bundle_gui_state_nid)

    bundle_mod_dt = bundle_gui_state.modified_ttz
    bundle_mod_by = bundle_gui_state.modifiedby
    bundle_version = bundle_gui_state.version

    # To-do - check if output file for version already exists
    #         if so then bypass

    bundle_exec = get_bundle_last_execution(con, meta, bundle_nid)

    # Always defined, so a bundle that has never run no longer fails with
    # NameError when the dataflow object is assembled below.
    exec_stats = [{'start_time': str(r.start_time),
                   'end_time': str(r.end_time),
                   'records': r.record_count}
                  for r in bundle_exec]

    if bundle_exec:
        # Rows arrive newest first, so the first one is the latest run.
        latest = bundle_exec[0]
        last_record_count = latest.record_count
        last_start_time = latest.start_time
        last_end_time = latest.end_time
    else:
        last_record_count = 0
        last_start_time = ''
        last_end_time = ''

    print("{}, {}, {} records {}".format(bundle_nid, bundle_description, bundle_sname, last_record_count))
    print("Modified by: {}, Modified Date: {}, Version: {}".format(bundle_mod_by, bundle_mod_dt, bundle_version))
    print("{}, {}".format(last_start_time, last_end_time))

    # add bundle to "world" graph
    # (the body previously used a global ``W`` and ignored this parameter)
    bundle_node_key = f'b_{bundle_nid}'

    world_graph.add_node(bundle_node_key,
                         nid=bundle_nid,
                         sname=bundle_sname,
                         n_type='bundle')

    # Create Networkx DiGraph
    G = nx.DiGraph(name=bundle_sname,
                   id=bundle_nid)

    # Add nodes
    p = get_package_nodes(con, meta, bundle_nid)

    # Add LOADER and STORE nodes to world map
    for n in p:

        n_type = n.package_type

        if n_type not in ('LOADER', 'STORE'):
            continue

        entity_id = n.entity_id
        entity_node_key = f'e_{entity_id}'

        if not world_graph.has_node(entity_node_key):

            entity = get_entity(con, meta, entity_id)

            source_id = entity.source_nid
            source = get_source(con, meta, source_id)

            world_graph.add_node(entity_node_key,
                                 n_type='entity',
                                 nid=entity_id,
                                 snid=source_id,
                                 sname=entity.sname)

            source_node_key = f's_{source_id}'

            if not world_graph.has_node(source_node_key):

                world_graph.add_node(source_node_key,
                                     n_type='source',
                                     nid=source_id,
                                     sname=source.sname)

            world_graph.add_edge(source_node_key, entity_node_key)

        else:
            source_nid = world_graph.node[entity_node_key]['snid']
            source_node_key = f's_{source_nid}'
            print(f"World map already has entity {entity_id}, {world_graph.node[source_node_key]['sname']}.{world_graph.node[entity_node_key]['sname']}")

        # Link the bundle to the entity whether or not the entity was
        # already known.  Previously the edge was added only for new
        # entities, so an entity shared by several bundles was linked to
        # the first one only.
        if n_type == 'STORE':
            world_graph.add_edge(bundle_node_key, entity_node_key)
        else:
            world_graph.add_edge(entity_node_key, bundle_node_key)

    # Resume bundle graph

    print("{} nodes in bundle".format(len(p)))

    create_graph_nodes(con, meta, G, p)

    # Add Edges
    c = get_package_connectors(con, meta, bundle_nid, G)

    for from_nid, to_nid, ob_idx, from_type, ib_idx, to_type in c:
        # from_type / to_type are the package types of the edge's start and
        # end nodes; they are only read back further down in this function.
        G.add_edge(from_nid, to_nid,
                   ob_idx=ob_idx, ib_idx=ib_idx,
                   from_type=from_type, to_type=to_type)

    # Get the target STORE, this is where level assignment starts
    # level used by visjs Network for drawing a hierarchy
    target_node = None
    last_node = None

    for n in G.nodes():
        last_node = n
        if G.node[n]['n_type'] == 'STORE':
            target_node = n
            break

    if target_node is None:
        if last_node is None:
            print("Package: {}, has no nodes".format(sname))
            return None
        # No STORE in the bundle: start from the last node visited, as the
        # original loop did.
        target_node = last_node

    print("Target node is {}".format(target_node))

    # Set levels for all other nodes
    free = make_free()

    set_level(G, target_node, free, 0, 1)

    # At this point the levels are max -> 0
    # so now invert
    print("Longest path by dag_longest_path_length: {}".format(nx.algorithms.dag.dag_longest_path_length(G)))

    max_level = max([0] + [G.node[n]['level'] for n in G.nodes()])

    print("max level: {}".format(max_level))

    for n in G.nodes():
        G.node[n]['level'] = max_level - G.node[n]['level']

    # One clock reading feeds both the extract date and the file name.
    now = dt.datetime.today()
    time_stamp = '{:%Y-%m-%d %H:%M}'.format(now)
    file_time_stamp = '{:%Y_%m_%d_%H%M}'.format(now)

    # Construct dataflow object
    df_obj = {"workflow":sname,
              "description": bundle_description,
              "extractdate": time_stamp,
              "bundle_id": bundle_nid,
              "exec_stats": exec_stats,
              "last_record_count": last_record_count,
              "last_start_time": str(last_start_time),
              "last_end_time": str(last_end_time),
              "modified_dt": str(bundle_mod_dt),
              "modified_by": bundle_mod_by,
              "version": str(bundle_version),
              "nodes":[],
              "edges":[]}

    for n in G.nodes():
        node_record = {"id": n}
        node_record.update(G.node[n])
        df_obj['nodes'].append(node_record)

    # Joiner label, Master == left, Detail == right
    joiner_master_detail_lbl = ('Master','Detail')
    cdc_slave_master_lbl = ('Slave','Master')

    # Build the visjs edge labels
    for edge_id, (node_from, node_to, edge_metadata) in enumerate(G.edges(data=True)):

        # Could be multiple labels so build as a list
        label_txt = []

        ob_idx = edge_metadata['ob_idx']
        ib_idx = edge_metadata['ib_idx']

        to_type = edge_metadata['to_type']
        from_type = edge_metadata['from_type']

        if to_type == 'JOINER':
            label_txt.append(joiner_master_detail_lbl[ib_idx])

        if to_type == 'COMPARE':
            label_txt.append(cdc_slave_master_lbl[ib_idx])

        if from_type == 'ROUTER':
            # Label the edge with the router expression of its outbound port.
            router_exp = G.node[node_from]['router_expressions']
            label_txt.insert(0, router_exp[ob_idx])

        df_obj['edges'].append({"id":edge_id,
                                "from":node_from,
                                "to":node_to,
                                "label":', '.join(label_txt).strip(),
                                "arrows":"to"})

    filename = os.path.join(OUTPUT_DIR, "{}_{}.js".format(sname,file_time_stamp))

    # The context manager closes the file even if serialisation fails.
    with open(filename, 'w') as fo:
        json.dump(df_obj, fo)

    print("File written to {}".format(filename))



In [34]:
# Compass Workflows
# Workflow names iterated by the extraction loop; this assignment replaces
# whatever `sname` held before.
sname = [
    'prod_COMPASS_BILL_STMT_DETAIL_VIEW',
    'prod_COMPASS_MEMBER_ADDRESS_V',
    'prod_COMPASS_MEMBER_COVERAGE_GROUPS_V',
    'prod_COMPASS_MEMBER_COVERAGE_V',
    'prod_COMPASS_MEMBER_DEPENDENT_V',
    'prod_COMPASS_MEMBER_PROFILE_V',
    'prod_COMPASS_MEMBER_SALARY_V',
    'prod_Compass_Bill_Groups_v',
    'prod_Compass_Member_Groups_v',
]

In [35]:
# Policy billing-info workflows; replaces any earlier `sname` selection.
sname = [
    'prod_policy_billing_info_billing_location_benefit_view',
    'prod_policy_billing_info_billing_statments_view',
]

In [36]:
# Policy-info workflows; replaces any earlier `sname` selection.
sname = [
    'prod_Policy_Info_Policy_Benefits_View',
    'prod_Policy_Info_Policy_Profile_View',
]

In [37]:
# AEB workflows; replaces any earlier `sname` selection.
sname = [
    'prod_AEB_Member_Groups_v',
    'prod_AEB_Benefit_Options_Detail_Part_1_v',
    'prod_AEB_Benefit_Options_Detail_Part_2_v',
    'prod_AEB_Benefit_Options_Detail_Part_3_v',
    'prod_AEB_Benefit_Options_Detail_Part_4_v',
    'prod_AEB_Benefit_Options_Detail_Part_5_v',
    'prod_AEB_Benefit_Options_Detail_Part_6_v',
    'prod_AEB_Policy_Profile_v',
    'prod_AEB_Benefit_Options_v',
    'prod_AEB_Benefit_Options_Detail_v',
    'prod_AEB_Benefit_Details_v',
    'prod_AEB_Bill_Groups_v',
    'prod_AEB_Billing_Statements_v',
    'prod_AEB_Policy_Documents_v',
    'prod_AEB_Contacts_v',
    'prod_aeb_policy_profile_history_v',
    'prod_copy_aeb_policy_profile_history_v',
]

In [38]:
# Single-workflow selection; replaces any earlier `sname` selection.
sname = ['je70_oasis_delta_memberd_history_v']

In [39]:
# FDL Staging
# Replaces any earlier `sname` selection.
sname = [
    'prod_sponsor_dl_staging_t',
    'prod_sponsor_address_dl_staging_t',
    'prod_sponsor_contact_dl_staging_t',
    'prod_policy_dl_staging_t',
    'prod_policy_benefit_dl_staging_t',
    'prod_Policy_Conversion_Hist_DL_T',
]

In [40]:
# Data-layer (`*_dl_*`) workflows; replaces any earlier `sname` selection.
sname = [
    'init_copy_policy_benefit_dl_t',
    'init_copy_policy_dl_t',
    'init_copy_sponsor_address_dl_t',
    'init_copy_sponsor_contact_dl_t',
    'init_copy_sponsor_dl_t',
    'init_policy_benefit_dl_t',
    'init_policy_dl_t',
    'init_sponsor_address_dl_t',
    'init_sponsor_contact_dl_t',
    'init_sponsor_dl_t',
    'prod_bill_group_statement_benefit_dl_t',
    'prod_bill_group_statement_dl_t',
    'prod_bill_group_vers_dl_cdc_t',
    'prod_combined_bill_cell_summary_dl_t_part_1',
    'prod_combined_bill_cell_summary_dl_t_part_2',
    'prod_combined_bill_cell_summary_dl_t_part_3',
    'prod_combined_bill_detail_dl_t',
    'prod_combined_billing_statement_vers_dl_t',
    'prod_compass_bill_cell_summary_dl_t_part_1',
    'prod_compass_bill_cell_summary_dl_t_part_2',
    'prod_compass_bill_cell_summary_dl_t_part_3',
    'prod_compass_bill_detail_dl_t_part_1',
    'prod_compass_bill_detail_dl_t_part_2',
    'prod_compass_bill_detail_dl_t_part_3',
    'prod_compass_bill_detail_dl_t_part_4',
    'prod_compass_bill_detail_dl_t_part_5',
    'prod_compass_bill_detail_dl_t_part_6',
    'prod_compass_bill_detail_dl_t_part_7',
    'prod_compass_bill_group_vers_dl_staging_t',
    'prod_compass_billing_statement_vers_dl_t',
    'prod_copy_bill_group_vers_dl_t',
    'prod_copy_dwcore_bill_adjstmnt_detl_2015',
    'prod_copy_dwcore_bill_detl_2015',
    'prod_copy_policy_benefit_vers_dl_t',
    'prod_copy_sct108_bill_detl_2015',
    'prod_copy_sponsor_address_vers_dl_t',
    'prod_copy_sponsor_contact_vers_dl_t',
    'prod_copy_sponsor_vers_dl_t',
    'prod_oasis_bill_cell_summary_dl_t_part_1',
    'prod_oasis_bill_cell_summary_dl_t_part_2',
    'prod_oasis_bill_detail_dl_t_part_1',
    'prod_oasis_bill_detail_dl_t_part_2',
    'prod_oasis_bill_detail_dl_t_part_3',
    'prod_oasis_bill_detail_dl_t_part_4',
    'prod_oasis_bill_group_vers_dl_staging_t',
    'prod_oasis_billing_statement_vers_dl_t',
    'prod_oasis_sa_billing_statement_vers_dl_t',
    'prod_policy_benefit_dl_cdc_t',
    'prod_policy_benefit_dl_staging_t',
    'prod_policy_conversion_hist_dl_t',
    'prod_policy_dl_cdc_t',
    'prod_policy_dl_staging_t',
    'prod_sponsor_address_dl_cdc_t',
    'prod_sponsor_address_dl_staging_t',
    'prod_sponsor_contact_dl_cdc_t',
    'prod_sponsor_contact_dl_staging_t',
    'prod_sponsor_dl_cdc_t',
    'prod_sponsor_dl_staging_t',
]

In [41]:
# je70 OASIS member-history workflows; replaces any earlier `sname` selection.
sname = [
    'je70_oasis_delta_memberd_history_v',
    'je70_oasis_init_memberd_history_v',
    'je70_oasis_pre_match',
]

In [42]:
# Workflow names kept for reference. Every entry is commented out, so this
# cell assigns an empty list to `sname`; uncomment entries to select them.
sname = [ 
#'prod_AEB_Policy_Profile_v',
#'prod_AEB_Member_Groups_v',
#'prod_AEB_Benefit_Options_v',
#'prod_Billing_Location_Contact_v',
#'prod_CEL_Timeline_v',
#'prod_Compass_Bill_Groups_v',
#'prod_COMPASS_BILL_STMT_DETAIL_VIEW',
#'prod_COMPASS_MEMBER_ADDRESS_V',
#'prod_COMPASS_MEMBER_COVERAGE_GROUPS_V',
#'prod_COMPASS_MEMBER_COVERAGE_V',
#'prod_COMPASS_MEMBER_DEPENDENT_V',
#'prod_Compass_Member_Groups_v',
#'prod_COMPASS_MEMBER_PROFILE_V',
#'prod_COMPASS_MEMBER_SALARY_V',
#'prod_Member_Benefits_View',
#'prod_Member_Dependents_View',
#'prod_Member_Details_Member_Address_View',
#'prod_Member_Details_Member_Earnings_View',
#'prod_Member_Details_Member_Profile_View',
#'prod_Member_Search_View',
#'prod_Policy_Billing_Info_Billing_Class_View',
#'prod_Policy_Billing_Info_Billing_Location_Benefit_View',
#'prod_Policy_Billing_Info_Billing_Location_View',
#'prod_Policy_Billing_Info_Billing_Plan_View',
#'prod_Policy_Billing_Info_Billing_Statments_View',
#'prod_Policy_Billing_Info_Billing_Txn_Alloc_View',
#'prod_Policy_Billing_Info_Plan_Earning_Definition_Coverage_Options',
#'prod_Policy_Documents_Contracts',
#'prod_Policy_Documents_GWEW_Documents_Email_Association',
#'prod_Policy_Documents_Sold_Case_Documents',
#'prod_Policy_Events_GWEW_Events_View',
#'prod_Policy_GWEW_Document_v',
#'prod_Policy_GWEW_Note_v',
#'prod_Policy_GWEW_Work_Objects_View_Journey_style',
#'prod_Policy_Info_Policy_Benefits_View',
#'prod_Policy_Info_Policy_Profile_View',
#'prod_Policy_Info_Policy_Sales_Agreement_View',
#'prod_Policy_Info_Policy_Sales_Team_View',
#'prod_Policy_Info_Sponsor_Address_View',
#'prod_Policy_Info_Sponsor_Contact_View',
#'prod_Policy_Info_Sponsor_Profile_View',
#'prod_Policy_Plan_Design_Admin_Class_Provisions',
#'prod_Policy_Plan_Design_Benefit_Provisions',
#'prod_Policy_Plan_Design_Class_Provisions',
#'prod_Policy_QPS_Events_v',
#'prod_Policy_Search_View',
#'prod_Policy_Search_View_No_Benefit_Group',
#'prod_QPS_Reqt_Opt_Usage_Sort_v',
#'prod_QPS_Reqt_Opt_UsedBy_BFT_Sort_v',
#'prod_Sponsor_Contact_Role_V',
#'qar_CEL_Timeline_v',
#'qar_Compass_Bill_Groups_v',
#'qar_COMPASS_BILL_STMT_DETAIL_V',
#'qar_COMPASS_MEMBER_ADDRESS_V',
#'qar_COMPASS_MEMBER_COVERAGE_GROUPS_V',
#'qar_COMPASS_MEMBER_COVERAGE_V',
#'qar_COMPASS_MEMBER_DEPENDENT_V',
#'qar_Compass_Member_Groups_v',
#'qar_COMPASS_MEMBER_PROFILE_V',
#'qar_COMPASS_MEMBER_SALARY_V',
#'sq_prd_Billing_Location_Contact_v',
#'sq_prd_Member_Benefits_View',
#'sq_prd_Member_Dependents_View',
#'sq_prd_Member_Details_Member_Address_View',
#'sq_prd_Member_Details_Member_Earnings_View',
#'sq_prd_Member_Details_Member_Profile_View',
#'sq_prd_Member_Search_View',
#'sq_prd_Policy_Billing_Info_Billing_Class_View',
#'sq_prd_Policy_Billing_Info_Billing_Location_Benefit_View',
#'sq_prd_Policy_Billing_Info_Billing_Location_View',
#'sq_prd_Policy_Billing_Info_Billing_Plan_View',
#'sq_prd_Policy_Billing_Info_Billing_Statments_View',
#'sq_prd_Policy_Billing_Info_Billing_Txn_Alloc_View',
#'sq_prd_Policy_Billing_Info_Plan_Earning_Definition_Coverage_Options',
#'sq_prd_Policy_Documents_Contracts',
#'sq_prd_Policy_Documents_GWEW_Documents_Email_Association',
#'sq_prd_Policy_Documents_Sold_Case_Documents',
#'sq_prd_Policy_GWEW_Document_v',
#'sq_prd_Policy_Events_GWEW_Events_View',
#'sq_prd_Policy_GWEW_Note_v',
#'sq_prd_Policy_GWEW_Work_Objects_View_Journey_style',
#'sq_prd_Policy_Info_Policy_Benefits_View',
#'sq_prd_Policy_Info_Policy_Profile_View',
#'sq_prd_Policy_Info_Policy_Sales_Agreement_Views',
#'sq_prd_Policy_Info_Policy_Sales_Team_View',
#'sq_prd_Policy_Info_Sponsor_Address_View',
#'sq_prd_Policy_Info_Sponsor_Contact_View',
#'sq_prd_Policy_Info_Sponsor_Profile_View',
#'sq_prd_Policy_Plan_Design_Admin_Class_Provisions',
#'sq_prd_Policy_Plan_Design_Benefit_Provisions',
#'sq_prd_Policy_Plan_Design_Class_Provisions',
#'sq_prd_Policy_QPS_Events_v',
#'sq_prd_Policy_Search_View',
#'sq_prd_Policy_Search_View_No_Benefit_Group',
#'sq_prd_Sponsor_Contact_Role_V'
]

In [43]:
# Sponsor / policy workflows; replaces any earlier `sname` selection.
# Each live entry ends with a comma so that re-enabling the commented names
# below cannot trigger implicit string concatenation.
sname = [
    'prod_Sponsor_Contact_Role_V',
    'prod_Policy_v',
    'prod_Sponsor_v',
    #'qar_CEL_Timeline_v',
    #'qar_Compass_Bill_Groups_v',
    #'qar_COMPASS_BILL_STMT_DETAIL_V',
    #'qar_COMPASS_MEMBER_ADDRESS_V',
    #'qar_COMPASS_MEMBER_COVERAGE_GROUPS_V',
    #'qar_COMPASS_MEMBER_COVERAGE_V',
    #'qar_COMPASS_MEMBER_DEPENDENT_V',
    #'qar_Compass_Member_Groups_v',
    #'qar_COMPASS_MEMBER_PROFILE_V',
    #'qar_COMPASS_MEMBER_SALARY_V',
]

In [47]:
# je70 CHESS member-history workflows; replaces any earlier `sname` selection.
sname = [
    'je70_chess_init_member_history_v',
    'je70_chess_history_cdc_v',
    'je70_chess_copy_member_history_v',
]

In [48]:
# Shared graph handed to get_bundle for every selected workflow
W = nx.DiGraph()

bundle_count = len(sname)

# Report progress as "<position> / <total> - <name>" before each extraction
for position, bundle_name in enumerate(sname, start=1):
    print("{} / {} - {}".format(position, bundle_count, bundle_name))
    get_bundle(con, meta, bundle_name, W)

1 / 3 - je70_chess_init_member_history_v
(51421, '', 1905, 'je70_chess_init_member_history_v')
51421, , je70_chess_init_member_history_v records 854414
Modified by: je70, Modified Date: 2018-11-15 09:23:27.237000-05:00, Version: 10
2018-11-18 05:51:52.160000-05:00, 2018-11-18 05:55:34.568000-05:00
13 nodes in bundle
13 nodes in 
52713 STORE
51422 TRANSFORMER
51429 CUSTOM
>>> CUSTOM Transform Found
51438 TRANSFORMER
51437 TRANSFORMER
51436 ROUTER
51435 AGGREGATOR
51434 ROUTER
51433 JOINER
51431 FILTER
51430 STORE
51428 TRANSFORMER
51423 LOADER
Source: CHESS_FILE_PR, Source Type: FILE, nid: 325
orig_source_id: 325
From: 51428, To: 52713, ob_idx: 0, Outbound: TRANSFORMER, ib_idx: 0, Inbound: STORE
From: 51438, To: 51430, ob_idx: 0, Outbound: TRANSFORMER, ib_idx: 0, Inbound: STORE
From: 51433, To: 51428, ob_idx: 0, Outbound: JOINER, ib_idx: 0, Inbound: TRANSFORMER
From: 51436, To: 51433, ob_idx: 0, Outbound: ROUTER, ib_idx: 1, Inbound: JOINER
From: 51434, To: 51433, ob_idx: 0, Outbound: RO

In [42]:
# World object output
# Serialise the combined graph W into a nodes/edges document and write it
# to OUTPUT_DIR as world_<timestamp>.js.

# The timestamp is used both as the "extractdate" value and as the file suffix
world_time_stamp = '{:%Y_%m_%d_%H%M}'.format(dt.datetime.today())

# Construct world object
world_obj = {"extractdate": world_time_stamp,
             "nodes": [],
             "edges": []}

# One record per node; each node's attribute dict must provide
# 'n_type' (exported as "group") and 'sname' (exported as "label").
for node_id, data in W.nodes(data=True):
    world_obj['nodes'].append({"id": node_id,
                               "group": data['n_type'],
                               "label": data['sname']})

# Edge ids are the enumeration order; edge attributes are not exported.
for edge_id, (node_from, node_to, _edge_data) in enumerate(W.edges(data=True)):
    world_obj['edges'].append({"id": edge_id,
                               "from": node_from,
                               "to": node_to,
                               "arrows": "to"})

# Output World Network
world_filename = os.path.join(OUTPUT_DIR, "{}_{}.js".format('world', world_time_stamp))

# The context manager closes the handle even if json.dump raises
with open(world_filename, 'w') as fo:
    json.dump(world_obj, fo)

print("File written to {}".format(world_filename))
    

File written to C:\Projects\CEL\Dataflows\Extracts\world_2018_10_25_1819.js


In [None]:
print("Done")

## Scratch Code

In [None]:
for n in W.nodes(data=True):
    print(n)

In [None]:
world_obj

In [None]:
list(W.edges)

In [None]:
W.node['e_16614']

In [None]:
json.dumps(world_obj)

In [None]:
x = None
print(x is None)

In [None]:
a = []


In [None]:
# list.insert needs an explicit position: insert(index, item)
a.insert(0, 'b')

In [None]:
' abcdef  '.strip()

In [None]:
def make_free():
    '''Return a closure that keeps a private per-key tally.

    The returned function yields 1 the first time it sees a key and a value
    10 higher on every later call with that key. Each call to make_free()
    creates an independent tally.
    '''
    tally = {}

    def get_free(x):
        # First sighting starts at 1; every repeat adds 10 to the stored value
        tally[x] = tally[x] + 10 if x in tally else 1
        return tally[x]

    return get_free
    

In [None]:
c1 = make_free()

In [None]:
print (c1(1), c1(1), c1(1), c1(2), c1(2))

In [None]:
# make_counter is not defined in this notebook; the closure factory is make_free
c2 = make_free()

In [None]:
# The closures take the key to count as their single argument
print(c1(1), c2(1), c1(1))

In [None]:
del make_free

In [None]:
%%javascript

console.log("Hi number")

In [None]:
x = dt.datetime.today()

In [None]:
x

In [None]:
type(x)

In [None]:
dt.datetime.strftime??

In [None]:
# The strftime pattern is a format spec, so it must follow the colon;
# without it str.format treats '%Y' as a field name and raises KeyError.
'{:%Y:%m:%d %H:%M:%S}'.format(dt.datetime.today())

In [None]:
import urllib

In [None]:
def encpw(user, pwd):
    '''Return user and pwd URL-encoded as j_username / j_password fields.

    user -- value for the ``j_username`` field
    pwd  -- value for the ``j_password`` field

    The result is a single ``key=value&key=value`` string with both values
    percent-encoded by urlencode.
    '''
    # A bare `import urllib` does not load the `parse` submodule, so import
    # it here to keep the function working on a fresh kernel.
    from urllib.parse import urlencode

    return urlencode({'j_username': user, 'j_password': pwd})

In [None]:
if __name__ == '__main__':
    # Credentials are prompted for at run time instead of being stored in
    # the notebook source; getpass also keeps the password from being echoed.
    import getpass

    # NOTE: the printed string still contains the encoded password, so clear
    # this cell's output before sharing the notebook.
    print(encpw(input('User: '), getpass.getpass('Password: ')))

In [None]:
import string

In [None]:
'ABC'.lower()

In [None]:
import pygraphviz
import matplotlib.pyplot as plt
from networkx.drawing.nx_pydot import write_dot

In [None]:
# Compute a Graphviz layout and draw with it; previously `pos` was computed
# but never passed to nx.draw, so the layout was ignored.
pos = nx.nx_agraph.graphviz_layout(W)
nx.draw(W, pos=pos, with_labels=True)
# Also write the graph out in DOT format
write_dot(W, r'c:\projects\cel\w.dot')


In [None]:
plt.subplot(121)

In [None]:
%matplotlib inline