In [1]:
# General Imports

import numpy as np
from numpy import random as rnd
from matplotlib import pyplot as plt
import os,sys,datetime,time,math, warnings,itertools

import pandas as pd
from contextlib import contextmanager

# Snowpark imports
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from snowflake.snowpark.window import Window

  warn_incompatible_dep(


In [2]:
def read_single_lineage_extract(excel_path):
    """
    Read and process an Informatica Lineage Excel extract for a single lineage.

    This function reads an Informatica Lineage Excel extract file, performs post-processing
    specific to this file type, and returns a DataFrame containing relevant lineage information.

    Args:
        excel_path (str): The file path to the Informatica Lineage Excel extract.

    Returns:
        pandas.DataFrame: A DataFrame containing lineage information with the following columns:
            - 'Accounts'
            - 'Martlayer.CRMCLOUD_OSC.Accounts'
            - 'SF_CRM_DataMarts/MARTLAYER/CRMCLOUD_OSC/Accounts/'
            - 'Source File'

    Note:
        The function expects the input Excel file to have a specific structure as defined by Informatica Lineage.
        The function will skip the first two rows and use the third row as the header row for reading the DataFrame.
        Post-processing is applied to filter the DataFrame based on the 'SF_CRM_DataMarts' column and select specific
        columns related to the 'Accounts' lineage. The 'Source File' column will be added to the DataFrame with the value
        of the 'excel_path' argument.

    Example:
        excel_path = 'path/to/lineage_extract.xlsx'
        lineage_df = read_single_lineage_extract(excel_path)
    """
    # Expects file structure as defined by Informatica Lineage output
    df = pd.read_excel(excel_path,skiprows=2,header=1)
    # Postprocessing defined for this specific file type
    df = df[df['Resource Name']=='SF_CRM_DataMarts'].filter(items=[
        'Asset Name',
        'Business_Terms',
        'Path'
    ],axis=1)
    df['DP Table'] = excel_path.replace('.xls','').replace('.xlsx','')
    return df

In [3]:
@contextmanager
def cwd(path):
    """
    Context manager to temporarily change the current working directory.

    This context manager changes the current working directory to the specified 'path'
    while the context is active. After the context exits, the original working directory
    is restored.

    Args:
        path (str): The path to the directory to which the current working directory should be changed.

    Usage:
        with cwd('/path/to/new/directory'):
            # Code executed within this block will have the current working directory changed.
            # After the block exits, the original working directory is restored.

    Example:
        with cwd('/home/user/documents'):
            file_list = os.listdir()  # List files in the '/home/user/documents' directory
        # 
    """
    oldpwd = os.getcwd()
    os.chdir(path)
    try: yield
    finally: os.chdir(oldpwd)

In [15]:
def read_single_lineage_extract(excel_path):
    """
    Read and process an Informatica Lineage Excel extract for a single lineage.

    This function reads an Informatica Lineage Excel extract file, performs post-processing
    specific to this file type, and returns a DataFrame containing relevant lineage information.

    Args:
        excel_path (str): The file path to the Informatica Lineage Excel extract.

    Returns:
        pandas.DataFrame: A DataFrame containing lineage information with the following columns:
            - 'Accounts'
            - 'Martlayer.CRMCLOUD_OSC.Accounts'
            - 'SF_CRM_DataMarts/MARTLAYER/CRMCLOUD_OSC/Accounts/'
            - 'Source File'

    Note:
        The function expects the input Excel file to have a specific structure as defined by Informatica Lineage.
        The function will skip the first two rows and use the third row as the header row for reading the DataFrame.
        Post-processing is applied to filter the DataFrame based on the 'SF_CRM_DataMarts' column and select specific
        columns related to the 'Accounts' lineage. The 'Source File' column will be added to the DataFrame with the value
        of the 'excel_path' argument.

    Example:
        excel_path = 'path/to/lineage_extract.xlsx'
        lineage_df = read_single_lineage_extract(excel_path)
    """
    # Expects file structure as defined by Informatica Lineage output
    df = pd.read_excel(excel_path,skiprows=2,header=1)
    # Postprocessing defined for this specific file type
    df = df[df['Resource Name']=='SF_CRM_DataMarts'].filter(items=[
        'Asset Name',
        'Business_Terms',
        'Path'
    ],axis=1)
    df['DP Table'] = excel_path.replace('.xls','').replace('.xlsx','')
    return df

In [4]:
# Path settings
if sys.platform.startswith('win'):
    excel_input_directory = r'../../TF_Data/Dropbox/PhD Prep/DQ Framework - Clustering/Data Products/Informatica Lineage Status Funnel Management 11.08.2023/'
    excel_output_directory = r'../../TF_Data/Dropbox/PhD Prep/DQ Framework - Clustering/Data Products/'
    shsdq_directory = r'../../TF_Data/Dropbox/PhD Prep/SHS DQ/'

In [13]:
# Load Data Products and associated customer tables
with cwd(excel_output_directory):
    products_file = pd.read_excel('Preliminary List of Data Products.xlsx',sheet_name='Data Products')
    products_file = products_file.merge(
        pd.read_excel('Preliminary List of Data Products.xlsx',sheet_name='DP Layer Tables'),
        on='Data Product Name',how='inner'
    )

In [37]:
# Read in individual Excel files to construct columns in data product
with cwd(excel_input_directory):
    lineage_file = pd.DataFrame()
    # Iterate tables in Data Product
    for dp_member in os.listdir():
        lineage_file = pd.concat([lineage_file,read_single_lineage_extract(dp_member)],axis=0)
    lineage_file = lineage_file.reset_index(drop=True)

# Post-processing of lineage extract
lineage_file['Path_tmp'] = lineage_file['Path'].str.split('/')
lineage_file[['TABLE_CATALOG','TABLE_SCHEMA','TABLE_NAME']] = pd.DataFrame(lineage_file.Path_tmp.tolist(),index=lineage_file.index).drop(columns=[0,4])
lineage_file = lineage_file.drop(columns=['Path_tmp','Path'])

In [60]:
# Determine composition of Data products on source table level
dp_composition = products_file.add_prefix('Customer Tables.').merge(
    lineage_file[['TABLE_CATALOG','TABLE_SCHEMA','TABLE_NAME','DP Table']].add_prefix('Source Table.'),
    left_on='Customer Tables.Relevant Data Table in Products Layer',right_on='Source Table.DP Table',how='left'
).drop(columns=['Customer Tables.Technical Location','Source Table.DP Table']).drop_duplicates().reset_index(drop=True)

In [42]:
# Connection Parameters
connection_parameters = {
    'account':'shsitdl.west-europe.azure',
    'user':'jan-lucas.deinhard@siemens-healthineers.com',
    'authenticator':'externalbrowser',
    'role':'FR_CRMCLOUD_DEV',
    'database':'MARTLAYER',
    'schema':'INFORMATION_SCHEMA',
    'warehouse':'W_CRMCLOUD_P'
}

# Establish Connection
session = Session.builder.configs(connection_parameters).create()

# Fetch column table from Snowflake
cC = session.table("COLUMNS")
C = pd.DataFrame(cC.collect())

session.close()

Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...


In [46]:
# Read file with DQ Checks
with cwd(shsdq_directory):
    dqf_flag = pd.read_excel('DQ Flags in CDC.xlsx',sheet_name='DQ Checks')

In [76]:
# Contract columns file to relevant DQ flags
C_contracted = C[['TABLE_CATALOG','TABLE_SCHEMA','TABLE_NAME','COLUMN_NAME']].merge(
    dqf_flag[['Name','Dimension']],
    left_on='COLUMN_NAME',right_on='Name',
    how='inner'
).drop_duplicates()

In [173]:
# Data Quality Flags per data product
dp_dqfs = dp_composition.merge(
    C_contracted,
    left_on=['Source Table.TABLE_CATALOG','Source Table.TABLE_SCHEMA','Source Table.TABLE_NAME'],right_on=['TABLE_CATALOG','TABLE_SCHEMA','TABLE_NAME'],
    how='inner'
).drop(columns=['TABLE_CATALOG','TABLE_SCHEMA','TABLE_NAME'])

In [174]:
# Create the SQL Select Statement
dp_dqfs['SQL Select'] = dp_dqfs[['Source Table.TABLE_CATALOG','Source Table.TABLE_SCHEMA','Source Table.TABLE_NAME','COLUMN_NAME']].apply(lambda x:
    r'''SELECT "'Y'" AS "Y","'N'" AS "N","NULL" FROM (SELECT "{3}" as "Status",SUM(1) AS "ct" FROM "{0}"."{1}"."{2}" GROUP BY ALL) PIVOT(SUM("ct") FOR "Status" IN ('N','Y',NULL))'''.format(x[0],x[1],x[2],x[3])                                                                                                           
,axis=1)

In [126]:
# Connection Parameters
connection_parameters = {
    'account':'shsitdl.west-europe.azure',
    'user':'jan-lucas.deinhard@siemens-healthineers.com',
    'authenticator':'externalbrowser',
    'role':'FR_CRMCLOUD_DEV',
    'warehouse':'W_CRMCLOUD_P'
}

# Establish Connection
session = Session.builder.configs(connection_parameters).create()

In [159]:
results = []

ctr = 1
totalrows = dp_dqfs.shape[0]
for idx,k in dp_dqfs.iterrows():
    print('({1}/{2}) Working on {0}...'.format(k['Name'],ctr,totalrows))
    ctr += 1
    query_result = session.sql(k['SQL Select'])
    results.append(pd.DataFrame(query_result.collect()).iloc[0].tolist())


(1/302) Working on Opportunity Expired (Flag)...
(2/302) Working on Data Quality Issue (Flag)...
(3/302) Working on Data Quality Issue (Flag)...
(4/302) Working on Implausible Revenue (Flag)...
(5/302) Working on Possible Test Quote (Flag)...
(6/302) Working on Implausible Cost (Flag)...
(7/302) Working on Opportunity Expired (Flag)...
(8/302) Working on Data Quality Issue (Flag)...
(9/302) Working on Data Quality Issue (Flag)...
(10/302) Working on Implausible Revenue (Flag)...
(11/302) Working on Possible Test Quote (Flag)...
(12/302) Working on Implausible Cost (Flag)...
(13/302) Working on Expiration Date Missing (Flag)...
(14/302) Working on Expiration Date Missing (Flag)...
(15/302) Working on Potential Test Account (Flag)...
(16/302) Working on Potential Test Account (Flag)...
(17/302) Working on Potential Test Account (Flag)...
(18/302) Working on Potential Test Account (Flag)...
(19/302) Working on Potential Test Account (Flag)...
(20/302) Working on Potential Test Account (Fl

In [180]:
session.close()

In [175]:
dp_dqfs = pd.concat(
    [dp_dqfs,pd.DataFrame(results).rename(columns={0:'Y Count',1:'N Count',2:'NULL Count'})]
,axis=1)

dp_dqfs[['Y Count','N Count','NULL Count']] = dp_dqfs[['Y Count','N Count','NULL Count']].fillna(0)

In [192]:
source_dqfs = dp_dqfs.groupby(['Source Table.TABLE_CATALOG','Source Table.TABLE_SCHEMA','Source Table.TABLE_NAME','Dimension'])['COLUMN_NAME'].apply(list).reset_index(name='Flags')

In [193]:
source_dqfs[['Source Table.TABLE_CATALOG','Source Table.TABLE_SCHEMA','Source Table.TABLE_NAME','Flags']]

Unnamed: 0,Source Table.TABLE_CATALOG,Source Table.TABLE_SCHEMA,Source Table.TABLE_NAME,Flags
0,MARTLAYER,CRMCLOUD_CPQ,ACTIVE_QUOTES_HEADER,"[Opportunity Expired (Flag), Opportunity Expir..."
1,MARTLAYER,CRMCLOUD_CPQ,ACTIVE_QUOTES_HEADER,"[Data Quality Issue (Flag), Implausible Revenu..."
2,MARTLAYER,CRMCLOUD_CPQ,ACTIVE_QUOTES_HEADER,"[Data Quality Issue (Flag), Data Quality Issue..."
3,MARTLAYER,CRMCLOUD_CPQ,ACTIVE_QUOTES_HEADER,"[Possible Test Quote (Flag), Possible Test Quo..."
4,MARTLAYER,CRMCLOUD_CPQ,IN_VITRO_Contract_KPIs,"[Implausible Revenue (Flag), Implausible Cost ..."
5,MARTLAYER,CRMCLOUD_CPQ,Proposals,"[Expiration Date Missing (Flag), Expiration Da..."
6,MARTLAYER,CRMCLOUD_OSC,Accounts,"[Potential Test Account (Flag), Potential Test..."
7,MARTLAYER,CRMCLOUD_OSC,OLIs,[Primary Incumbent Vendor Product Missing (Fla...
8,MARTLAYER,CRMCLOUD_OSC,OLIs,"[DQ Issue Funnel Inconsistencies (Flag), Sale ..."
9,MARTLAYER,CRMCLOUD_OSC,OLIs,"[Decision Date Wrong (Flag), Shipment Date Wro..."
