In [1]:
import networkx as nx
import os
import json

In [6]:
dirs = [
    "dataflow",
    "dataset",
    "integrationRuntime",
    "linkedService",
    "pipeline"
]

In [9]:
keep_pipelines = [
    "pl_history_load_Legacy_FactExternalMPLUsageLog",
    "pl_history_load_Legacy_FactMPLInternalProductUsageLog",
    "pl_history_load_Legacy_FactPCMPLUsageLog",
    "pl_synapse_load_fact_es_usage",
    "pl_synapse_load_fact_pc_mpl",
    "pl_synapse_load_fact_usage_log"
]

In [10]:
def getReferenced(j, referenced):
    key = "referenceName"
    if isinstance(j, list):
        for i in j:
            getReferenced(i, referenced)
    elif isinstance(j, dict):
        if key in j.keys():
            referenced.append(j[key])
        else:
            for i in j.values():
                getReferenced(i, referenced)

In [11]:
def getJsonContent(filename):
    with open(filename, "r") as f:
        contents = f.read()
    contentstring = "".join(contents)
    contentjson = json.loads(contentstring)
    return contentjson

In [12]:
G = nx.DiGraph()
for d in dirs:
    with os.scandir(os.path.join('CIADatalake', d)) as de:
        for entry in de:
            if entry.name.endswith('.json') and entry.is_file():
                filepath = entry.path
                print(filepath)
                
                # read file as json
                j = getJsonContent(filepath)
                name = j["name"]
                
                # add all objects as nodes
                #G.add_node(name)
                
                # get list of referenced names
                referenced = []
                getReferenced(j, referenced)
                
                # construct list of edges
                num_referenced = len(referenced)
                uvedges = list(zip([name] * num_referenced, referenced))
                
                # add to graph
                G.add_edges_from(uvedges)

CIADatalake/dataflow/df_copy_curated_final_lookups.json
CIADatalake/dataflow/df_csv_to_parquet.json
CIADatalake/dataflow/df_final_csv_parquet.json
CIADatalake/dataflow/df_history_load_transform.json
CIADatalake/dataflow/df_history_load_transform_businessarea.json
CIADatalake/dataflow/df_history_load_transform_date.json
CIADatalake/dataflow/df_history_load_transform_dimdepartmenthierarchy.json
CIADatalake/dataflow/df_history_load_transform_individual.json
CIADatalake/dataflow/df_history_load_transform_location.json
CIADatalake/dataflow/df_history_load_transform_lookup_individual.json
CIADatalake/dataflow/df_history_load_transform_lookup_masterproduct.json
CIADatalake/dataflow/df_history_load_transform_lookup_user.json
CIADatalake/dataflow/df_history_load_transform_masterproduct.json
CIADatalake/dataflow/df_history_load_transform_user.json
CIADatalake/dataflow/df_history_lookups_csv_parquet.json
CIADatalake/dataflow/df_processing_final_data_copy.json
CIADatalake/dataflow/df_scd1_departme

In [13]:
from networkx.algorithms.traversal import breadth_first_search as bfs

keep = []
for p in keep_pipelines:
    print(p)
    for s in bfs.bfs_successors(G, p):
        keep.extend(nx.utils.flatten(s))

pl_history_load_Legacy_FactExternalMPLUsageLog
pl_history_load_Legacy_FactMPLInternalProductUsageLog
pl_history_load_Legacy_FactPCMPLUsageLog
pl_synapse_load_fact_es_usage
pl_synapse_load_fact_pc_mpl
pl_synapse_load_fact_usage_log


In [14]:
delete = set(G)-set(keep)

In [15]:
delete.intersection(set(keep))

set()

In [17]:
set(keep)

{'AzureDataLakeStorage_AutoResolve',
 'AzureKeyVault_01',
 'CIADatalake_Synapse_Analytics',
 'ciadatalake-dev-eastus-ir-01',
 'ciadatalake-eastus-autoresolve-ir-01',
 'ciadw_sql_ciadatalake_db',
 'ciadw_sql_db',
 'ds_ciadatalake_parquet_file',
 'ds_ciadw_FactExternalMPLUsageLog',
 'ds_ciadw_FactInternalMPLUsageLog',
 'ds_ciadw_FactPCMPLUsageLog',
 'ds_ciadw_sql_query',
 'ds_factload_parquet',
 'ds_synapse_load_fact_es_usage',
 'ds_synapse_load_fact_pc_mpl',
 'ds_synapse_load_fact_usage_log',
 'pl_history_load_Legacy_FactExternalMPLUsageLog',
 'pl_history_load_Legacy_FactMPLInternalProductUsageLog',
 'pl_history_load_Legacy_FactPCMPLUsageLog',
 'pl_synapse_load_fact_es_usage',
 'pl_synapse_load_fact_pc_mpl',
 'pl_synapse_load_fact_usage_log'}

In [45]:
import pprint; pprint.pprint([f"rm pipeline/{i}" for i in filter(lambda x: x.startswith("pl_") , delete)])

['rm pipeline/pl_tranform_landing_curated_location',
 'rm pipeline/pl_scd_curated_finish_location',
 'rm pipeline/pl_tranform_landing_curated_user',
 'rm pipeline/pl_landing_finish_location_lookups',
 'rm pipeline/pl_tranform_landing_curated_individual',
 'rm pipeline/pl_scd_curated_finish_individual',
 'rm pipeline/pl_scd_curated_finish_masterproduct',
 'rm pipeline/pl_copy_curated_finish_depthierarchy_lookups',
 'rm pipeline/pl_scd_curated_finish_user',
 'rm pipeline/pl_tranform_landing_curated_masterproduct',
 'rm pipeline/pl_scd_curated_finish_departmenthierarchy',
 'rm pipeline/pl_copy_curated_finish_user_lookups',
 'rm pipeline/pl_tranform_landing_curated_departmenthierarchy',
 'rm pipeline/pl_landing_finish_individual_lookups']


In [32]:
new_pipelines = [
    "dim_user_master",
    "common_archive_files"
]
copy = []
for p in new_pipelines:
    for s in bfs.bfs_successors(G, p):
        copy.extend(nx.utils.flatten(s))

In [33]:
set(copy)

{'AzureDataLakeStorage_AutoResolve',
 'AzureKeyVault_01',
 'CIADatalake_Synapse_Analytics',
 'ciadatalake-dev-eastus-ir-01',
 'ciadatalake-eastus-autoresolve-ir-01',
 'ciadw_sql_db',
 'common_archive_files',
 'common_load_transform',
 'dim_user_extract',
 'dim_user_master',
 'ds_ciadw_sql_query',
 'ds_issql_sql_query',
 'ds_parquet_file',
 'ds_synapse_table',
 'issql_sql_db'}

In [19]:
set(delete)

{'AzureDataLakeStorage_SelfHosted',
 'df_copy_curated_final_lookups',
 'df_csv_to_parquet',
 'df_final_csv_parquet',
 'df_history_load_transform',
 'df_history_load_transform_businessarea',
 'df_history_load_transform_date',
 'df_history_load_transform_dimdepartmenthierarchy',
 'df_history_load_transform_individual',
 'df_history_load_transform_location',
 'df_history_load_transform_lookup_individual',
 'df_history_load_transform_lookup_masterproduct',
 'df_history_load_transform_lookup_user',
 'df_history_load_transform_masterproduct',
 'df_history_load_transform_user',
 'df_history_lookups_csv_parquet',
 'df_processing_final_data_copy',
 'df_scd1_departmenthierarchy',
 'df_scd1_individual',
 'df_scd1_individual_lookups',
 'df_scd1_location',
 'df_scd1_location_lookups',
 'df_scd1_masterproduct',
 'df_scd1_user',
 'df_scd2_final_department_hierarchy',
 'df_scd2_final_user',
 'df_scd2_individual',
 'df_scd2_location',
 'df_scd2_masterproduct',
 'df_transform_curated_departmenthierarchy