In [10]:
import numpy as np
import pandas as pd
import re


from metabasepy import Client, MetabaseTableParser

In [2]:
##### LINEAGE #####

def remove_comments(view_definition):
    # split text into lines
    lines = view_definition.split('\n')
    # remove comment
    cleaned_lines = [line.split('--')[0] for line in lines]
    cleaned_lines = [line.split('#')[0] for line in cleaned_lines]
    # return cleaned view definition
    return '\n'.join(cleaned_lines)

def extract_sources(text):
    # define pattern to search
    pattern = re.compile(r'\b(from|join)\b', re.IGNORECASE)
    # find matches
    matches = pattern.finditer(text)
    # iterate through matches to find sources
    sources = []
    for match in matches:
        # extract text
        start_index = match.end()
        source = text[start_index:].strip().split()[0]
        # clean #1: remove symbols
        if '`' in source:
            parts = source.split('`')
            if len(parts) > 2:
                source = parts[1]
        source = re.sub(r'[^a-zA-Z0-9._]', '', source)
        # clean #2: remove project name 
        parts = source.split('.')
        if len(parts) > 2:
            source = '.'.join(parts[-2:])
        # clean #3: append only if it's not cte
        parts = source.split('.')
        if len(parts) == 2:
            sources.append(source)
    # return sources
    return sources

def get_edges(df_raw):
    df_raw_ = df_raw.copy()
    # remove comments and extract sources
    df_raw_['view_definition_wo_comments'] = df_raw_['view_definition'].apply(remove_comments)
    df_raw_['sources'] = df_raw_['view_definition_wo_comments'].apply(extract_sources)
    df_raw_ = df_raw_.explode('sources')[['table_name', 'sources']].rename(columns={'table_name': 'table', 'sources': 'source'})
    # add scheme in front of table
    df_raw_['table'] = df_raw_['table'].apply(lambda x: 'defi_bi.' + x)
    # remove _v and _t
    df_raw_['table'] = df_raw_['table'].apply(lambda x: x[:-2] if x.endswith('_v') or x.endswith('_t') else x)
    df_raw_['source'] = df_raw_['source'].apply(lambda x: x[:-2] if x.endswith('_v') or x.endswith('_t') else x)
    df_raw_.drop_duplicates(inplace=True)
    # return processed data
    return df_raw_

def get_nodes(df_edges):
    df_edges_ = df_edges.copy()
    all_tables = set(df_edges_['table'])
    all_sources = set(df_edges_['source'])
    all_tables_sources = all_tables.union(all_sources)
    # classify table type
    results = []
    for ts in all_tables_sources:
        if ts in all_tables and ts in all_sources:
            results.append({'table': ts, 'type': 'default'})
        elif ts in all_sources:
            results.append({'table': ts, 'type': 'input'})
        else:
            results.append({'table': ts, 'type': 'output'})
    df_result = pd.DataFrame(results)
    # return node
    return df_result

def get_lineage(df_edges, selected_table):
    # initiate process
    prev_left = df_edges.copy()
    prev_right = df_edges.copy()
    left = df_edges[(df_edges.table==selected_table)].copy()
    right = df_edges[(df_edges.source==selected_table)].copy()
    # iterate to find left lineage
    while prev_left.shape != left.shape:
        prev_left = left.copy()
        left = pd.concat([left, df_edges[(df_edges.table.isin(prev_left.source))].copy()]).drop_duplicates()
    # iterate to find right lineage
    while prev_right.shape != right.shape:
        prev_right = right.copy()
        right = pd.concat([right, df_edges[(df_edges.source.isin(prev_right.table))].copy()]).drop_duplicates()
    # combine left and right
    df_lineage = pd.concat([left,right]).drop_duplicates()
    return df_lineage

In [62]:
##### METABASE #####

def initiate_metabase():
    cli = Client('nicolas.gunawan@moladin.com', 'MoladinOpsPlanning123', 'https://metabase.moladin.com/')
    cli.authenticate()

def run_metabase(card_id):
    # get preview table
    response = cli.cards.query(str(card_id))
    preview_table = MetabaseTableParser.get_table(response)
    column_names = [col['display_name'] for col in preview_table.cols]

    # get full table if rows = 2000
    if preview_table.row_count == 2000:
        json_result = cli.cards.download(card_id=str(card_id), format='json')
        df = pd.DataFrame(json_result)
        # arrange column orders
        df = df[column_names]
    else:
        df = pd.DataFrame(preview_table.rows, columns=column_names)
    
    return df

In [3]:
df_raw = pd.read_csv('data/defi_bi.csv')
df_edges = get_edges(df_raw)
df_nodes = get_nodes(df_edges)
df_lineage = get_lineage(df_edges, 'defi_bi.31_credit_cost')

In [63]:
initiate_metabase()
df = run_metabase(2866)

In [64]:
df

Unnamed: 0,table_name,view_definition
0,10_col_resp_dcm_assignment_v,CREATE VIEW `moladin-mof-data-prod.defi_bi.10_...
1,11_employee_phone_book_v,CREATE VIEW `moladin-mof-data-prod.defi_bi.11_...
2,11_marketplace_defi_v,CREATE VIEW `moladin-mof-data-prod.defi_bi.11_...
3,11_marketplace_seed_v,CREATE VIEW `moladin-mof-data-prod.defi_bi.11_...
4,11_purging_data_history_v,CREATE VIEW `moladin-mof-data-prod.defi_bi.11_...
...,...,...
452,temp_sample_tools_enhancement,CREATE VIEW `moladin-mof-data-prod.defi_bi.tem...
453,temp_tr_offline_gathering,CREATE VIEW `moladin-mof-data-prod.defi_bi.tem...
454,temp_tr_pricing_history_checker,CREATE VIEW `moladin-mof-data-prod.defi_bi.tem...
455,temp_warehouse_process,CREATE VIEW `moladin-mof-data-prod.defi_bi.tem...
