In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Notebook Description: The Metadata Discovery is responsible to extract metadata                */
#/*                       from a query, such as columns, where, join, group by, subquery.          */
#/*                       Also, extract Function, Partition and Variables (using regex)            */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

In [None]:
pip install mo_sql_parsing



In [None]:
pip install sql_metadata



In [None]:
# Import Libraries
from mo_sql_parsing import parse
from mo_sql_parsing import format as format_json
from sql_metadata import Parser
import json
import re
import enum
import pandas as pd

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: List of Comparison Operators                                                      */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

class Operator(enum.Enum):
#/* <START > */
    eq = "="
    neq = "<>"
    gt = ">"
    gte = "=>"
    lt = "<"
    lte = "<="
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Get Target - Aux Functions for Metadata Output                                    */
#/*               Returns columns and name of target table                                         */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def get_target(parser):
#/* <START > */
    try:
        t_columns = parser['columns']
        t_table = parser['insert']
    except:
        t_columns = ''
        t_table = ''
    return t_columns, t_table
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Get Table and Alias - Aux Function for Get Source                                 */
#/*               Returns array with table_name and table_alias                                    */
#/*               OUTPUT [table_name, table_alias]                                                 */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def build_simple_from(parser):
#/* <START > */
    if type(parser) is str:
        return [parser, ""]
    else:
        return [parser['value'], parser['name']]
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Get List of Columns of the SELECT- Aux Function for Get Source                    */
#/*               Returns the select string from dict                                              */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def get_select_string(parser):
#/* <START > */
    s_expression = format_json({'select': parser}).replace('SELECT ','')
    regex = r',\s*(?![^()]*\))'
    return re.split(regex, s_expression)
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Get Where Condition - Aux Function                                                */
#/*               Returns the where string from dict                                               */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def get_where_string(parser):
#/* <START > */
    r = format_json({'where': parser}).replace('WHERE ', '',1)
    return r
 #/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Get Type of Query                                                                 */
#/*               Returns the type of query                                                        */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def type_query(parser):
#/* <START > */
    queries_types = ['create table', 'update', 'delete', 'insert', 'select', 'union_all', 'union', 'drop table', 'drop']
    for t in queries_types:
        if t in list(parser.keys()):
            return t
 #/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Create Header for Output                                                          */
#/*               Aggregates the important info about the query and returns as dict                */
#/*               -Type of query                                                                   */
#/*               -Number of tables in query                                                       */
#/*               -Name of the tables                                                              */
#/*               -Number of subqueries                                                            */
#/*               -If contains join condition                                                      */
#/*               -If contains where condition                                                     */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def get_header(query, type_of_query):
#/* <START > */

    parser_info = Parser(query)
    tables = parser_info.tables

    if 'TABLE' in tables or '' in tables:
        try:
            tables.remove('TABLE')
            [x for x in tables if x !='']
        except:
            pass
    number_of_tables = len(tables)
    number_of_subqueries = len(parser_info.subqueries)
    try:
        contains_join = 'join' in list(parser_info.columns_dict.keys()) if type_of_query != "create table" and type_of_query != "drop"  else []
    except:
        contains_join = []
    try:
        contains_where = 'where' in list(parser_info.columns_dict.keys()) if type_of_query != "create table" and type_of_query != "drop" else []
    except:
        contains_where = []
    return {
        "type_query": type_of_query,
        "number_tables": number_of_tables,
        "tables": tables,
        "number_subqueries": number_of_subqueries,
        "contains_join": contains_join,
        "contains_where": contains_where
    }
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Get Create Query Parameters                                                       */
#/*               Builds the final dict when query is a create table                               */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def extract_create_table(parser):
#/* <START > */

    #/* PARSER CREATE | Get Datatype */
    def get_c_type(parser_type):
        keys = list(parser_type.keys())
        type_value = parser_type[keys[0]]
        output_type_value = type_value if type(type_value) is not dict else ""
        return [keys[0], output_type_value]

    #/* PARSER CREATE | Mapping Column of Create Statement */
    def build_columns(parser_columns):
        array = []
        if isinstance(parser_columns, list): #if it's a list
            for c in parser_columns:
                c_type = get_c_type(c['type'])
                keys = list(c.keys())
                array.append({
                    'name': c['name'],
                    'type': c_type,
                    'null': c['nullable'] if 'nullable' in keys else False
                })
            return array
        else:
            parser_columns = [parser_columns]
            return build_columns(parser_columns)

    create_parser = parser['create table']
    c_table = create_parser['name']
    c_columns = build_columns(create_parser['columns'])
    return {
        "table": c_table,
        "columns": c_columns
    }
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Parser to get Delete Query Parameters                                             */
#/*               Builds the final dict when query is a delete                                     */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def extract_delete(parser):
#/* <START > */

    #/* PARSER DELETE | Get parameters for subquery if exists DELETE */
    def is_subquery_delete(parser, string=""):
        array = [e.name for e in Operator]
        array.append('select')
        array.append("")
        is_dict = type(parser) is dict
        first_key = list(parser.keys())[0] if is_dict else ""
        if first_key not in array:
            return is_subquery_delete(parser[first_key], string + " " + str(first_key))
        else:
            output = parser if is_dict else ""
            return is_dict, output, string

    has_subquery, subquery, type_where = is_subquery_delete(parser['where'])

    #/* PARSER DELETE | Can bet subquery or array of (column, value) */
    d_subquery = extract_query(subquery) if has_subquery else ""
    d_where = type_where if has_subquery else get_where_string(parser['where'])
    d_table = parser['delete']

    return {
        "table": d_table,
        "where": d_where.removeprefix(" "),
        "sub_table": d_subquery
    }
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Parser to get UPDATE Query Parameters                                             */
#/*               Builds the final dict when query is an update                                    */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def extract_update(parser):
#/* <START > */

    #/* PARSER UPDATE | Check Subquery */
    def is_subquery(set_parser):
        keys = list(set_parser.keys())
        return type(set_parser[keys[0]]) is dict, keys

    #/* PARSER UPDATE | List of arguments of the SET */
    def build_set(set_parser):
        array = []
        for x in set_parser['set']:
            obj = set_parser['set'][x]
            array.append([x, obj['literal']]) if type(obj) is dict else array.append([x, obj])
        return array

    keys = list(parser.keys())
    subquery_exists, set_keys = is_subquery(parser['set'])

    #/* PARSER UPDATE | Can bet subquery or array of (column, value) */
    c_set = extract_query(parser['set'][set_keys[0]]) if subquery_exists else build_set(parser)
    c_where = get_where_string(parser['where']) if 'where' in keys else []

    return {
        "set": c_set,
        "where": c_where
    }
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Get Query Parameters, this function returns metadata of the query such as:        */
#/*               columns, table, name, the join and where condition, group by, having and order   */
#/*               Only call this function when reached the simplest dict                           */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def get_query(parser, count, from_is_array, have_join):
#/* <START > */

    #/* GET QUERY | Get Parameters */
    dict_keys       = parser.keys()
    from_dict       = parser['from'][0] if from_is_array and have_join else parser['from']
    array_of_tables = True if from_is_array and not have_join else False

    #/* GET QUERY | Get columns */
    columns = get_select_string(parser['select'])

    #/* GET QUERY | Get source table */
    table = [[table['value'], table['name']] for table in parser['from']] if array_of_tables else build_simple_from(from_dict)

    #/* GET QUERY | Get where condition */
    try:
        where_condition = get_where_string(parser['where']) if 'where' in dict_keys else []

    #/* GET QUERY | Get where condition when where condition uses SELECT */
    except:
        where_condition = eval(str(parser['where']))

    #/* GET QUERY | Get JOIN, GROUP BY, HAVING and ORDER BY */
    join_condition = extract_join(parser['from'][1:], count+1) if have_join else []
    groupby = format_json({'groupby': parser['groupby']}).removeprefix("groupby") if 'groupby' in dict_keys else []
    having = format_json({'having': parser['having']}).removeprefix("having") if 'having' in dict_keys else []
    orderby = format_json({'orderby': parser['orderby']}).removeprefix("orderby") if 'orderby' in dict_keys else []

    #/* GET QUERY | Source Output */
    r = {
        "source_id": count,
        "source_columns": columns,
        "source_table": table,
        "source_join": join_condition,
        "source_condition": where_condition,
        "source_groupby": groupby,
        "source_having": having,
        "source_orderby": orderby
    }
    return r
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Get JOIN parameters                                                               */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def extract_join(array_dict_obj, counter):
#/* <START > */

    #/* GET JOIN | Check if is a subquery */
    def is_subquery(parser):
        return type(parser['value']) is dict

    array_join = []
    for join_dict in array_dict_obj:
        keys = list(join_dict.keys())

        #/* GET JOIN | Extract type of join */
        type_join = keys[0]

        #/* GET JOIN | Extract table name/subquery (JSON) */
        join_table = [extract_query(join_dict[type_join]['value'], counter+1), join_dict[type_join]['name']] if is_subquery(join_dict[type_join]) else [join_dict[type_join]['value'], join_dict[type_join]['name']]

        #/* GET JOIN | Extract join condition */
        join_condition = format_json({keys[1]: join_dict[keys[1]]})
        array_join.append({
            'type_join': type_join,
            'join_table': join_table,
            'join_condition': join_condition
        })
    return array_join
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Get Query and Subquery, Join, UNION Parameters Functions                          */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def extract_query(parser, count=0):
#/* <START > */

    #/* EXTRACT QUERY | Check if is a subquery */
    def subquery_exists(parser, is_list):
        if type(parser) is str:
            return False
        else:
            return type(parser[0]['value']) is dict if is_list else type(parser['value']) is dict

   #/* EXTRACT QUERY | Check join */
    def join_exists(parser):
        for e in parser:
            for e2 in list(e.keys()):
                if 'join' in e2:
                    return True
        return False

    #/* EXTRACT QUERY | Check if have subquery, join */
    def get_info(parser):

        #/* EXTRACT QUERY | Verify if from it's an array */
        if type(parser) is list:
            return True, subquery_exists(parser, True), join_exists(parser)

        #/* EXTRACT QUERY | A normal subquery without joins */
        elif subquery_exists(parser, False):
            return False, True, False

        #/* EXTRACT QUERY | A normal query without any subquery or joins */
        else:
            return False, False, False

    try:
        is_list, have_subquery, have_join = get_info(parser['from'])
    except:
        is_list, have_subquery, have_join = False, False, False

    keys = list(parser.keys())

    #/* EXTRACT QUERY | If exists a subquery */
    if have_subquery:

        #/* EXTRACT QUERY | Get columns */
        columns = get_select_string(parser['select'])

        #/* EXTRACT QUERY | Get source table for UNION case */
        if 'union' in str(parser['from']['value']):
            list_source=[]
            list_select=[]
            for select in parser['from']['value']['union_all']:

                #/* EXTRACT QUERY | Get parameters (Table, Where, Join, Group by, Having, Order by) */
                table = extract_query(select, count+1) if is_list else extract_query(select, count+1)
                where_condition = get_where_string(select) if 'where' in keys else []
                join_condition = extract_join(parser['from'][1:], count+1) if have_join else []
                groupby = format_json({'groupby': parser['groupby']}).removeprefix("groupby") if 'groupby' in keys else []
                having = format_json({'having': parser['having']}).removeprefix("having") if 'having' in keys else []
                orderby = format_json({'orderby': parser['orderby']}).removeprefix("orderby") if 'orderby' in keys else []

                list_select.append({
                    'source_id': count,
                    'source_columns': columns,
                    'source_table': table,
                    'source_join': join_condition,
                    'source_condition': where_condition,
                    "source_groupby": groupby,
                    "source_having": having,
                    "source_orderby": orderby
                    })
            d=list_select

        #/* EXTRACT QUERY | Get source table when does not have UNION */
        else:

            #/* EXTRACT QUERY | Get parameters (Table, Where, Join, Group by, Having, Order by) */
            table = extract_query(parser['from'][0]['value'], count+1) if is_list else extract_query(parser['from']['value'], count+1)
            where_condition = get_where_string(parser['where']) if 'where' in keys else []
            join_condition = extract_join(parser['from'][1:], count+1) if have_join else []
            groupby = format_json({'groupby': parser['groupby']}).removeprefix("groupby") if 'groupby' in keys else []
            having = format_json({'having': parser['having']}).removeprefix("having") if 'having' in keys else []
            orderby = format_json({'orderby': parser['orderby']}).removeprefix("orderby") if 'orderby' in keys else []

            d={
            'source_id': count,
            'source_columns': columns,
            'source_table': table,
            'source_join': join_condition,
            'source_condition': where_condition,
            "source_groupby": groupby,
            "source_having": having,
            "source_orderby": orderby
            }
        return d
    else:
        return get_query(parser, count, is_list, have_join)
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Metadata Output (JSON)                                                            */
#/*               Chooses how is going to build the json according to type of query                */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def get_output_json(parser, query_string):
#/* <START > */

    type_of_query = type_query(parser)
    output = {
        "header": get_header(query_string, type_of_query)
    }

    #/* JSON OUTPUT | Get metadata from create table statement */
    if type_of_query == "create table":
        output.update(extract_create_table(parser))
        return output
    #/* JSON OUTPUT | Get metadata from select statement */
    elif type_of_query == "select":
        output.update({"source": extract_query(parser)})
        return output
    #/* JSON OUTPUT | Get metadata from union statement */
    elif type_of_query == "union" or type_of_query == "union_all":
        list_source=[]
        for select in parser[type_of_query]:
            list_source.append(extract_query(select))
        output.update({"source": list_source})
        return output
    #/* JSON OUTPUT | Get metadata from insert statement */
    elif type_of_query == "insert":
        t_columns, t_table = get_target(parser)
        s_json = extract_query(parser['query'])
        output.update({
            'target_columns': t_columns,
            'target_table': t_table,
            'source': s_json
        })

        return output
    #/* JSON OUTPUT | Get metadata from update statement */
    elif type_of_query == "update":
        output.update(extract_update(parser))
        return output
    #/* JSON OUTPUT | Get metadata from delete statement */
    elif type_of_query == "delete":
        output.update(extract_delete(parser))
        return output
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Example of JSON Output - Create Table Statement                                   */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

query_create = '''
CREATE TABLE db_work_dev.ais_areata_tmp_et(
country_sid STRING,
areakey STRING,
areatype STRING,
areaname STRING,
sortname STRING,
isvalid STRING,
stateid STRING
)
'''

m_create = parse(query_create)
print(m_create)

{'create table': {'name': 'db_work_dev.ais_areata_tmp_et', 'columns': [{'name': 'country_sid', 'type': {'string': {}}}, {'name': 'areakey', 'type': {'string': {}}}, {'name': 'areatype', 'type': {'string': {}}}, {'name': 'areaname', 'type': {'string': {}}}, {'name': 'sortname', 'type': {'string': {}}}, {'name': 'isvalid', 'type': {'string': {}}}, {'name': 'stateid', 'type': {'string': {}}}]}}


In [None]:
get_output_json(m_create, query_create)

{'header': {'type_query': 'create table',
  'number_tables': 1,
  'tables': ['db_work_dev.ais_areata_tmp_et'],
  'number_subqueries': 0,
  'contains_join': [],
  'contains_where': []},
 'table': 'db_work_dev.ais_areata_tmp_et',
 'columns': [{'name': 'country_sid', 'type': ['string', ''], 'null': False},
  {'name': 'areakey', 'type': ['string', ''], 'null': False},
  {'name': 'areatype', 'type': ['string', ''], 'null': False},
  {'name': 'areaname', 'type': ['string', ''], 'null': False},
  {'name': 'sortname', 'type': ['string', ''], 'null': False},
  {'name': 'isvalid', 'type': ['string', ''], 'null': False},
  {'name': 'stateid', 'type': ['string', ''], 'null': False}]}

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Example of JSON Output - Insert Statement                                         */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

query_insert = '''
INSERT INTO TABLE db_ais_dev.la_ais_areata_et
SELECT
country_sid,
areakey,
areatype,
areaname,
sortname,
isvalid,
stateid,
regexp_replace(substr(CAST(current_timestamp() AS STRING), 1, 10), '-', '') AS business_date
FROM db_work_dev.ais_areata_tmp_et
'''

m_insert = parse(query_insert)
print(m_insert)

{'query': {'select': [{'value': 'country_sid'}, {'value': 'areakey'}, {'value': 'areatype'}, {'value': 'areaname'}, {'value': 'sortname'}, {'value': 'isvalid'}, {'value': 'stateid'}, {'value': {'regexp_replace': [{'substr': [{'cast': [{'current_timestamp': {}}, {'string': {}}]}, 1, 10]}, {'literal': '-'}, {'literal': ''}]}, 'name': 'business_date'}], 'from': 'db_work_dev.ais_areata_tmp_et'}, 'insert': 'db_ais_dev.la_ais_areata_et'}


In [None]:
get_output_json(m_insert, query_insert)

{'header': {'type_query': 'insert',
  'number_tables': 2,
  'tables': ['db_ais_dev.la_ais_areata_et', 'db_work_dev.ais_areata_tmp_et'],
  'number_subqueries': 0,
  'contains_join': False,
  'contains_where': False},
 'target_columns': '',
 'target_table': '',
 'source': {'source_id': 0,
  'source_columns': ['country_sid',
   'areakey',
   'areatype',
   'areaname',
   'sortname',
   'isvalid',
   'stateid',
   "REGEXP_REPLACE(SUBSTR(CAST(CURRENT_TIMESTAMP() AS STRING), 1, 10), '-', '') AS business_date"],
  'source_table': ['db_work_dev.ais_areata_tmp_et', ''],
  'source_join': [],
  'source_condition': [],
  'source_groupby': [],
  'source_having': [],
  'source_orderby': []}}

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Example of JSON Output - Select Statement                                         */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

query_select = '''
SELECT ID_AuditInformation,
  ID_Division,
  ID_AbsoluteMargin,
  Margin,
  TaxAmount,
  UsedTaxRate,
  UsedRelativeMargin,
  MarginNegotiatedAsGrossValue,
  regexp_replace(substr(current_timestamp() , 1, 10), '-', '') AS business_date,
  countrycode AS corporate,
  regexp_replace(substr(PartitionDate , 1, 10), '-', '') AS partitiondate
FROM db_work_dev.cpd_absolutemargin_tmp_mt
WHERE PartitionDate > '2023-01-01'
'''

m_select = parse(query_select)
print(m_insert)

{'query': {'select': [{'value': 'country_sid'}, {'value': 'areakey'}, {'value': 'areatype'}, {'value': 'areaname'}, {'value': 'sortname'}, {'value': 'isvalid'}, {'value': 'stateid'}, {'value': {'regexp_replace': [{'substr': [{'cast': [{'current_timestamp': {}}, {'string': {}}]}, 1, 10]}, {'literal': '-'}, {'literal': ''}]}, 'name': 'business_date'}], 'from': 'db_work_dev.ais_areata_tmp_et'}, 'insert': 'db_ais_dev.la_ais_areata_et'}


In [None]:
get_output_json(m_insert, query_select)

{'header': {'type_query': 'insert',
  'number_tables': 1,
  'tables': ['db_work_dev.cpd_absolutemargin_tmp_mt'],
  'number_subqueries': 0,
  'contains_join': False,
  'contains_where': True},
 'target_columns': '',
 'target_table': '',
 'source': {'source_id': 0,
  'source_columns': ['country_sid',
   'areakey',
   'areatype',
   'areaname',
   'sortname',
   'isvalid',
   'stateid',
   "REGEXP_REPLACE(SUBSTR(CAST(CURRENT_TIMESTAMP() AS STRING), 1, 10), '-', '') AS business_date"],
  'source_table': ['db_work_dev.ais_areata_tmp_et', ''],
  'source_join': [],
  'source_condition': [],
  'source_groupby': [],
  'source_having': [],
  'source_orderby': []}}

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Extract functions                                                                 */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def function_into_list(source_columns):
#/* <START > */

    function_list=[]
    if isinstance(source_columns, list):
        source_columns = source_columns
    else:
        source_columns = [source_columns]

    for i in source_columns:
        if '(' in i:
            i = i.split('(')
            for function in i:
                if "(" in function or ")" in function:
                    if 'THEN' in function:
                        function = function.split('THEN')[-1]
                        function_list.append(function)
                    pass
                else:
                    if ' AS ' not in function:
                        function_list.append(function)

    return function_list

#/* EXTRACT FUNCTIONS | Get function from source columns and where clause */
def function_output(json_output):
    try:
        json_query=eval(json_output)
    except:
        json_query=json_output
    list_table_function=[]
    if 'drop' not in str(json_query):
        try:
            if isinstance(json_query['source'], list):
                list_table_function = (function_into_list(json_query['source'][0]['source_columns']))
                if function_into_list(json_query['source']) != []:
                    list_table_function = list_table_function + (function_into_list(json_query['source']['source_table']['source_condition']))
            #/* EXTRACT FUNCTIONS | Get function from source columns and where clause in case of Subquery */
            elif isinstance(json_query['source']['source_table'], list) and json_query['source']['source_table'][1] != '':
                list_table_function = (function_into_list(json_query['source']['source_columns']))
                if function_into_list(json_query['source']) != []:
                    list_table_function = list_table_function + (function_into_list(json_query['source']['source_table']['source_condition']))
            else:
                list_table_function = (function_into_list(json_query['source']['source_columns']))
                if function_into_list(json_query['source']) != []:
                    list_table_function = list_table_function + (function_into_list(json_query['source']['source_condition']))
        except Exception as e:
            print(e)


    return list_table_function
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Extract partitions from the statement (Insert or Create Table)                    */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

import re
def partition_into_list(query):
#/* <START > */

    output_list=[]
    query = query.upper()
    query = query.replace('PARTITIONED BY','PARTITION').replace('string','').replace('PARTITION(','PARTITION (')
    variable = re.findall('PARTITION \(.*?\)',query)
    for var in variable:
        var = var.replace(')','')
        output_list = (var.split('(')[1:][0].split(','))
    return output_list
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Extract variables from the statement based on a regex                             */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def variables_into_list(query, replace, dict_variable, regex = '', var_dict = {}):
#/* <START > */

    if regex != '':

         #/* EXTRACT VARIABLES | Replace variables so parser is executable */
        if replace == "input variables":
            dict_variable = {}
            count = 0
            variable = re.findall(regex,query)

            #/* EXTRACT VARIABLES | To make the dictionary variable */
            for i in variable:
                if i not in dict_variable.keys():
                    count+=1
                    dict_variable[i] = 'var_query_{}'.format(count)
            for word, replacement in dict_variable.items():
                query = query.replace(word, replacement)
            return query,dict_variable

         #/* EXTRACT VARIABLES | Replace variables with a new format to be used */
        if replace == "output variables":
            query=query.replace('VAR_QUERY_','var_query_')
            new_dict = {}
            for value,key in dict_variable.items():
                for str_ini, str_var in var_dict.items():
                    value = value.replace(str_ini.lower(), str_var)
                new_dict.update({value:key})
            dict_variable = eval(str(new_dict))

            for word, replacement in dict_variable.items():
                query = query.replace(replacement, word)
            return query

    else:
        return query
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Functions to prepare the statement and parser the queries                         */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/


#/* PREPARE STATEMENT | Create Statement */
def create_query(query):
#/* <START > */
    create_list=[]

    #/* CLEAN STATEMENT | Specific Clean for the Create Statement */
    if "CREATE" in query:
        for i in query.split():
            if i != ')':
                create_list.append(i)
            else:
                create_list.append(')')
                break

        create_query = ' '.join(create_list)
        final_create_query = create_query.replace(' EXTERNAL ','').replace('IF NOT EXISTS ','')
        try:
            final_create_query = final_create_query.split(';')[1]
        except:
            pass

        #/* CLEAN STATEMENT | Apply Metadata Discovery in the Create Statement */
        final_parse = parse(final_create_query)
        query_json = get_output_json(final_parse, final_create_query)
        final_parse['full query'] = create_query.replace('\n','')

        return query_json

#/* CLEAN STATEMENT | Insert Statement */
def insert_query(query):

    #/* CLEAN STATEMENT | Specific Clean for the Insert Statement */
    query=query.upper().replace('\\n','kxyz').replace('kxyz', ' ')
    insert_list = [[i for i in query.split(';') if "INSERT" in i][0].partition("INSERT")[-1].replace('OVERWRITE','INTO').replace('\n','').replace("PARTITION(","(").replace('overwrite','into').replace('PARTITION ','')]
    insert_list.insert(0,'INSERT')
    final_insert_query = ''.join(insert_list)
    final_insert_query = re.sub(r"\([^()]*\)", ' ', final_insert_query,1)

    #/* CLEAN STATEMENT | Apply Metadata Discovery in the Insert Statement */
    final_parse = parse(final_insert_query)
    query_json = get_output_json(final_parse, final_insert_query)
    final_parse['full query'] = 'INSERT' + [i for i in query.split(';') if "INSERT" in i][0].replace('\n',' ').partition("INSERT")[-1]

    return query_json

#/* CLEAN STATEMENT | Drop Statement */
def drop_query(query):
    try:
        #/* CLEAN STATEMENT | Specific Clean for the Drop Statement */
        full_drop_list = [i for i in query.split(';') if "DROP" in i][0].replace('\n','')
        drop_list = full_drop_list
        #/* CLEAN STATEMENT | Apply Metadata Discovery in the Drop Statement */
        final_parse = parse(drop_list)
        final_parse['full query'] = full_drop_list
        return final_parse
    except:
        pass
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Function to read XML an SQL files to prepare a JSON Output                        */
#/*              This function can be used for other files types                                   */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

class metadata_discovery():
#/* <START > */

    #/* EXTRACT FROM FILE | Metadata Discovery for XML File string */
    def query_xml_file (self, xml_script, source, regex_variable, dict_var, str_var):

        sql_queries = []
        output = []
        source=source.upper()
        variable_list = []
        list_partition  =[]
        list_function = []
        xml = xml_script

        #/* EXTRACT FROM FILE | Preload */
        xml = xml.replace('\r\n',' ').replace('\\r','').replace('\\n','').replace('     ',' ').replace('    ',' ').replace("\"",'').strip().replace('</query>','<query>').replace('\\','').replace('T0 n','')

        #/* EXTRACT FROM FILE | Specific clean by AIS Source */
        if source == 'AIS':
            xml = xml.replace('WHERE $CONDITIONS', '').replace('<arg>', '<arg> ')


        list_xml = xml.split("</arg>")
        for i in list_xml:
            if "--" in i:
                i=re.sub('<!--[^>]+-->', '', i)

            #/* EXTRACT FROM FILE | Check for a SELECT Statement */
            if 'SELECT' in i.upper() and 'FROM' in i.upper():

                #/* EXTRACT FROM FILE | Replace variables with special characters with a specific string */
                i,dict_ = variables_into_list(i,"input variables",None, regex_variable)

                #/* EXTRACT FROM FILE | Preload */
                xml_query = i.replace('n <arg> ',' ').replace('<arg>"','').strip().replace('<arg>','')

                #/* EXTRACT FROM FILE | Specific clean by AIS Source */
                if source.lower() == 'ais':
                  xml_query = xml_query.replace('WHERE $CONDITIONS"','').replace('xc2xa0','').replace('WHERE$CONDITIONS', '')

                #/* EXTRACT FROM FILE | Execute Metadata Discovery in the query*/
                parser1 = parse(xml_query)
                xml_query_json = get_output_json(parser1, xml_query)

                #/* EXTRACT FROM FILE | Return the variables with a specific special characters "str_var" in the JSON output */
                xml_query = variables_into_list(xml_query,"output variables",dict_, regex_variable, dict_var)

                #/* EXTRACT FROM FILE | Get partitions */
                list_partition.append(partition_into_list(xml_query))

                #/* EXTRACT FROM FILE | Save the query */
                sql_queries.append(xml_query)

                #/* EXTRACT FROM FILE | Return the variables with a specific special characters "str_var" */
                json_output = variables_into_list(str(xml_query_json),"output variables",dict_, regex_variable, dict_var)
                output.append(eval(json_output))

                #/* EXTRACT FROM FILE | Get variables */
                variable = re.findall(f'\\{str_var}.*?\\{str_var}',xml_query)
                variable = [i.replace(str_var,'') for i in variable]
                variable = [*set(variable)]
                variable_list.append(variable)

                #/* EXTRACT FROM FILE | Get functions */
                list_function.append(function_output(json_output))

        list_df = {'Query': sql_queries, 'Output_XML': output,'Variables':variable_list, 'Partition':list_partition,'Function':list_function}

        #/* EXTRACT FROM FILE | To Dataframe */
        df_files = pd.DataFrame.from_dict(list_df)

        return df_files

    #/* EXTRACT FROM FILE | Metadata Discovery for SQL File string */
    def query_sql_file (self, sql_script, source, regex_variable, dict_var, str_var):
        sql_queries = []
        output_create_func = []
        output_insert_func = []
        output_query = []
        source=source.upper()
        variable_list = []
        list_partition = []
        list_function = []
        query = sql_script

        #/* EXTRACT FROM FILE | Preload */
        query = query.replace('kxyz', ' ').replace("b'",'').replace('\\t',' ').replace('b"','').replace('\\r','').replace('\\n',' ').replace("'\\\\001'","").replace('\\','').replace("b'",'').replace('\\t',' ').replace('b"','').replace('\\r','').replace('\\\\n',' ').replace("'\\\\001'","").replace('\\','')

        #/* EXTRACT FROM FILE | Specific clean by AIS Source */
        if source == 'AIS':
            query = query.replace('EXTERNAL ','')

        #/* EXTRACT FROM FILE | Replace variables with special characters with a specific string */
        query,dict_ = variables_into_list(query, "input variables", None, regex_variable, dict_var)

        #/* EXTRACT FROM FILE | Return the variables with a specific special characters "str_var" in the query */
        str_query = variables_into_list(query, "output variables", dict_, regex_variable, dict_var)
        output_query.append((str_query))

        str_create = str(create_query(query))

        #/* EXTRACT FROM FILE | Return the variables with a specific special characters "str_var" in the JSON output */
        str_create = variables_into_list(str_create, "output variables", dict_, regex_variable, dict_var)

         #/* EXTRACT FROM FILE | Get functions from CREATE statemente*/
        output_create_func.append(eval(str_create))

         #/* EXTRACT FROM FILE | Preload for Insert statement*/
        str_insert = str(insert_query(query.lower()))

        #/* EXTRACT FROM FILE | Replace variables with special characters with a specific string */
        str_insert = variables_into_list(str_insert,"output variables",dict_, regex_variable, dict_var)

        #/* EXTRACT FROM FILE | Return the variables with a specific special characters "str_var" */
        query = variables_into_list(query, "output variables", dict_, regex_variable, dict_var)

        #/* EXTRACT FROM FILE | Get partition*/
        list_partition.append(partition_into_list(query))

        #/* EXTRACT FROM FILE | Get functions*/
        output_insert_func.append(eval(str_insert))
        variable = re.findall(f'\\{str_var}.*?\\{str_var}',query)
        variable = [i.replace(str_var,'') for i in variable]
        variable = [*set(variable)]
        variable_list.append(variable)

        #/* EXTRACT FROM FILE | Get partition*/
        list_function.append(function_output(str_insert))

        list_df = {'Query': output_query, 'Output_Create_Table': output_create_func,'Output_Insert':output_insert_func,'Variables':variable_list, 'Partition': list_partition,'Function':list_function}

        #/* EXTRACT FROM FILE | To Dataframe */
        df_files = pd.DataFrame.from_dict(list_df)

        return df_files

    #/* EXTRACT FROM FILE | Create a dataframe with information for configure a feed, uses Input (XML and SQL file)*/
    def config_file(self, df):
        df_feed = pd.DataFrame()

        #/* EXTRACT FROM FILE | Name for the feed*/
        df_feed['string_1']  = 'Copy_' + df['Output_Insert'].str['header'].str['tables'].str[0].str.split('.').str[0].str.upper() + '_' + df['Output_Insert'].str['header'].str['tables'].str[0].str.split('.').str[1].str.replace('LA_','').str.replace('AIS_','').str.replace('_ET','') + '_999'

        #/* EXTRACT FROM FILE | Schema name*/
        df_feed['string_2'] = df['Output_XML'].str['header'].str['tables'].str[0].str.rsplit('.', n=1).str.get(0)

        #/* EXTRACT FROM FILE | Table Name*/
        df_feed['string_3']  = df['Output_XML'].str['header'].str['tables'].str[0].str.rsplit('.', n=1).str.get(1).str.upper()

        #/* EXTRACT FROM FILE | Filter*/
        df_feed['string_4'] = df['Output_Insert'].str['source'].str['source_condition'].astype(str).str.replace('\[','', regex=True).str.replace('\]','', regex=True)

        #/* EXTRACT FROM FILE | Partition*/
        df_feed['string_5'] = df['Partition_sql'].astype(str).str.lower()

        #/* EXTRACT FROM FILE | Format mapping output*/
        '''[

        ["target": column_a
        "source": [column_c, column_d]
        "mapping": CONCAT(column_c, column_d)],

        ["target": column_a],

        ...
        ]'''

        list_mapping=[]
        #/* EXTRACT FROM FILE | Loop by row (table) in the dataframe*/
        for index, row in df.iterrows():
            list_column_target  = row['Output_Insert']['source']['source_columns']
            list_column_source  = row['Output_XML']['source']['source_columns']
            column_mapping = []

            #/* EXTRACT FROM FILE | Loop by column in target select*/
            for c_target in list_column_target:
                target  = ''
                source  = []
                mapping = ''

                #/* EXTRACT FROM FILE | If have 'AS' in a column selected*/
                if ' AS ' in c_target:
                    target = c_target.rpartition(' AS ')[2].strip()

                    #Replace HIVE functions to SQL functions
                    mapping = c_target.rpartition(' AS ')[0].strip().replace('REGEXP_REPLACE','REPLACE').replace('SUBSTR', 'SUBSTRING')

                    #/* EXTRACT FROM FILE | Loop for source columns*/
                    for c_source in list_column_source:
                        if c_source in mapping:
                            source.append(c_source.lower())

                    d={"target": target.lower(), "source": source, "mapping": mapping}

                #/* EXTRACT FROM FILE | Else, just considered target*/
                else:
                    d = {"target": c_target.lower()}

                #/* EXTRACT FROM FILE | Mapping per column*/
                column_mapping.append(d)

            #/* EXTRACT FROM FILE | Mapping per file (row in dataframe)*/
            list_mapping.append(column_mapping)

        df_feed['string_6'] = list_mapping
        list_json = []
        for index, row in df_feed.iterrows():
            list_json.append(json.dumps(row['string_6'], ensure_ascii=False))
        df_feed['string_6'] = list_json

        #/* EXTRACT FROM FILE | Dependecies*/
        df_feed['string_7'] = df['Output_Insert'].str['header'].str['tables'].str[1:]

        return df_feed
#/* <END > */

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Auxiliar Parameters for the Example                                               */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

str_var = "§"
dict_var = {
    "${":str_var,
    "}":str_var
}
regex_variable = '\$\{.*?\}'

In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Import from colab to read files                                                   */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

from google.colab import drive
from google.colab import files
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Example - Read SQL and XML Files from a Path                                      */
#/*                      - This part needs to be adapt for each environment                        */
#/*                      - Get the list of paths to read                                           */
#/*                      - Loop to read each file and apply metadata discovery                     */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

md = metadata_discovery()
source = 'ais'

#/* EXTRACT FROM FILE | Example - Read SQL filess and apply metadata discovery*/
df_sql = pd.DataFrame()
import os
path = "SQL"
sql_in_path = os.listdir(path)
print('Reading SQL Files...')
for sql_file in sql_in_path:
    if 'ipynb_checkpoints' not in sql_file:
      print(sql_file)
      file = open(f'{path}/{sql_file}',"r")
      sql_query = file.read()
      df = md.query_sql_file(sql_query, source, regex_variable, dict_var, str_var)
      df['File_Location'] = f'{path}/{sql_file}'
      df['File_Name']     = sql_file.rsplit('/',1)[-1]
      df_sql = pd.concat([df_sql,df], ignore_index=True)

#/* EXTRACT FROM FILE | Example - Read XML files and apply metadata discovery*/
df_xml = pd.DataFrame()
path = "XML"
xml_in_path = os.listdir(path)
print('Reading XML Files...')
for xml_file in xml_in_path:
    if 'ipynb_checkpoints' not in xml_file:
      print(xml_file)
      file = open(f'{path}/{xml_file}',"r")
      xml_query = file.read()
      df = md.query_xml_file(xml_query, source, regex_variable, dict_var, str_var)
      df['File_Location'] = f'{path}/{xml_file}'
      df['File_Name']     = xml_file.rsplit('/',1)[-1]
      df_xml = pd.concat([df_xml,df], ignore_index=True)


Reading SQL Files...
process_extracted_ais_legalholidayspecialopenings.sql
process_extracted_ais_division.sql
process_extracted_ais_countries.sql
process_extracted_ais_objstatu.sql
process_extracted_ais_objdata.sql
process_extracted_ais_areata.sql
Reading XML Files...
wf_extract_ais_objdata.xml
wf_extract_ais_division.xml
wf_extract_ais_legalholidayspecialopenings.xml
wf_extract_ais_countries.xml
wf_extract_ais_objstatu.xml
wf_extract_ais_areata.xml


In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Example - In this case, the process works a SELECT query for the source e         */
#/*                      and then CREATE a temporary tabl                                          */
#/*                      - After that, uses the temporary table to INSERT into a table             */
#/*                      - However, this could change for each source, client and etc.             */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

#/* PREPARE CSV FILE | Create a column with table name*/
df_sql['Table_Name'] = df_sql['File_Name'].str.replace('process_extracted_','',regex=True).str.replace('ais_','').str.replace('.sql','',regex=True).str.upper()
df_xml['Table_Name'] = df_xml['File_Name'].str.replace('wf_extract_','',regex=True).str.replace('ais_','').str.replace('.xml','',regex=True).str.upper()

#/* PREPARE CSV FILE | Join dataframe of SQL Files an XML Files*/
df = pd.merge(df_sql, df_xml, how='left', on='Table_Name', suffixes = ('_sql', '_xml'))
df['Partition'] = df['Partition_sql'] + df['Partition_xml']
df['Function'] = df['Function_sql'] + df['Function_xml']
df['Variables'] = df['Variables_sql'] + df['Variables_xml']

In [None]:
#/* EXTRACT FROM FILE | File with information of output of Metadata Discovery*/

path = 'output_ais.csv'
with open(path, 'w', encoding = 'utf-8-sig') as f:
  df.to_csv(f)
df

Unnamed: 0,Query_sql,Output_Create_Table,Output_Insert,Variables_sql,Partition_sql,Function_sql,File_Location_sql,File_Name_sql,Table_Name,Query_xml,Output_XML,Variables_xml,Partition_xml,Function_xml,File_Location_xml,File_Name_xml,Partition,Function,Variables
0,CREATE TABLE IF NOT EXISTS db_work_§DATALAKE_E...,"{'header': {'type_query': 'create table', 'num...","{'header': {'type_query': 'insert', 'number_ta...",[DATALAKE_ENV],[BUSINESS_DATE],"[REGEXP_REPLACE, SUBSTR, CAST, CURRENT_TIMESTAMP]",SQL/process_extracted_ais_legalholidayspecialo...,process_extracted_ais_legalholidayspecialopeni...,LEGALHOLIDAYSPECIALOPENINGS,"SELECT country_sid, objno, legalholidaydate, ...","{'header': {'type_query': 'select', 'number_ta...",[aisSchema],[],[],XML/wf_extract_ais_legalholidayspecialopenings...,wf_extract_ais_legalholidayspecialopenings.xml,[BUSINESS_DATE],"[REGEXP_REPLACE, SUBSTR, CAST, CURRENT_TIMESTAMP]","[DATALAKE_ENV, aisSchema]"
1,CREATE TABLE IF NOT EXISTS db_work_§DATALAKE_E...,"{'header': {'type_query': 'create table', 'num...","{'header': {'type_query': 'insert', 'number_ta...",[DATALAKE_ENV],[BUSINESS_DATE],"[REGEXP_REPLACE, SUBSTR, CAST, CURRENT_TIMESTAMP]",SQL/process_extracted_ais_division.sql,process_extracted_ais_division.sql,DIVISION,"SELECT country_sid, divno, street1, street2, ...","{'header': {'type_query': 'select', 'number_ta...",[aisSchema],[],[],XML/wf_extract_ais_division.xml,wf_extract_ais_division.xml,[BUSINESS_DATE],"[REGEXP_REPLACE, SUBSTR, CAST, CURRENT_TIMESTAMP]","[DATALAKE_ENV, aisSchema]"
2,CREATE TABLE IF NOT EXISTS db_work_§DATALAKE_E...,"{'header': {'type_query': 'create table', 'num...","{'header': {'type_query': 'insert', 'number_ta...",[DATALAKE_ENV],[BUSINESS_DATE],"[REGEXP_REPLACE, SUBSTR, CAST, CURRENT_TIMESTAMP]",SQL/process_extracted_ais_countries.sql,process_extracted_ais_countries.sql,COUNTRIES,"SELECT country_sid, country, description, tel...","{'header': {'type_query': 'select', 'number_ta...",[aisSchema],[],[],XML/wf_extract_ais_countries.xml,wf_extract_ais_countries.xml,[BUSINESS_DATE],"[REGEXP_REPLACE, SUBSTR, CAST, CURRENT_TIMESTAMP]","[DATALAKE_ENV, aisSchema]"
3,CREATE TABLE IF NOT EXISTS db_work_§DATALAKE_E...,"{'header': {'type_query': 'create table', 'num...","{'header': {'type_query': 'insert', 'number_ta...",[DATALAKE_ENV],[BUSINESS_DATE],"[REGEXP_REPLACE, SUBSTR, CAST, CURRENT_TIMESTAMP]",SQL/process_extracted_ais_objstatu.sql,process_extracted_ais_objstatu.sql,OBJSTATU,"SELECT country_sid, objno, validfrom, validto...","{'header': {'type_query': 'select', 'number_ta...",[aisSchema],[],[],XML/wf_extract_ais_objstatu.xml,wf_extract_ais_objstatu.xml,[BUSINESS_DATE],"[REGEXP_REPLACE, SUBSTR, CAST, CURRENT_TIMESTAMP]","[DATALAKE_ENV, aisSchema]"
4,CREATE TABLE IF NOT EXISTS db_work_§DATALAKE_E...,"{'header': {'type_query': 'create table', 'num...","{'header': {'type_query': 'insert', 'number_ta...",[DATALAKE_ENV],[BUSINESS_DATE],"[CASE WHEN COUNTRY_SID NOT IN , MASK_SHOW_LAS...",SQL/process_extracted_ais_objdata.sql,process_extracted_ais_objdata.sql,OBJDATA,"SELECT country_sid, objno, validfrom, validto...","{'header': {'type_query': 'select', 'number_ta...",[aisSchema],[],[],XML/wf_extract_ais_objdata.xml,wf_extract_ais_objdata.xml,[BUSINESS_DATE],"[CASE WHEN COUNTRY_SID NOT IN , MASK_SHOW_LAS...","[DATALAKE_ENV, aisSchema]"
5,CREATE TABLE IF NOT EXISTS db_work_§DATALAKE_E...,"{'header': {'type_query': 'create table', 'num...","{'header': {'type_query': 'insert', 'number_ta...",[DATALAKE_ENV],[BUSINESS_DATE],"[REGEXP_REPLACE, SUBSTR, CAST, CURRENT_TIMESTAMP]",SQL/process_extracted_ais_areata.sql,process_extracted_ais_areata.sql,AREATA,"SELECT country_sid, areakey, areatype, areana...","{'header': {'type_query': 'select', 'number_ta...",[aisSchema],[],[],XML/wf_extract_ais_areata.xml,wf_extract_ais_areata.xml,[BUSINESS_DATE],"[REGEXP_REPLACE, SUBSTR, CAST, CURRENT_TIMESTAMP]","[DATALAKE_ENV, aisSchema]"


In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Example - Create the excel configuration file with DMaaP Columns for Azure        */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

#/* PREPARE EXCEL FILE | Prepare Excel File*/
df_config = md.config_file(df)

#/* PREPARE EXCEL FILE | This Dict could be a JSON or Excel file and Created example for AWS and GCP*/
dict_column_name = {
    "dmaap_azure" : {
        "string_1":"Feed_Name",
        "string_2":"Source_Location_Name",
        "string_3":"Source_Object_Name",
        "string_4":"Source_Filters",
        "string_5":"Partition_Level_Field",
        "string_6":"Map_Exceptions",
        "string_7":"Dependecies"
    },
    "dmaap_aws" : {
        "string_1":"Feed_Name",
        "string_2":"Location",
        "string_3":"Object",
        "string_4":"Filters",
        "string_5":"Partition",
        "string_6":"Mapping",
        "string_7":"Dep"
    },
    "dmaap_gcp" : {
        "string_1":"feed",
        "string_2":"loc_name",
        "string_3":"obj_name",
        "string_4":"filter",
        "string_5":"partition",
        "string_6":"mapping",
        "string_7":"depen"
    }
}

#/* PREPARE EXCEL FILE | Mapping columns for DMaaP Azure*/
df_config.rename(columns=dict_column_name['dmaap_azure'], inplace=True)

In [None]:
#/* PREPARE EXCEL FILE | Configuration file with columns of DMaaP Azure*/
df_config.to_excel('config_feed_ais.xlsx', sheet_name = 'AIS')
df_config

Unnamed: 0,Feed_Name,Source_Location_Name,Source_Object_Name,Source_Filters,Partition_Level_Field,Map_Exceptions,Dependecies
0,Copy_DB_AIS_§DATALAKE_ENV§_LEGALHOLIDAYSPECIAL...,§aisSchema§.dbo,LEGALHOLIDAYSPECIALOPENINGS,,['business_date'],"[{""target"": ""country_sid""}, {""target"": ""objno""...",[DB_WORK_§DATALAKE_ENV§.AIS_LEGALHOLIDAYSPECIA...
1,Copy_DB_AIS_§DATALAKE_ENV§_DIVISION_999,§aisSchema§.dbo,DIVISION,,['business_date'],"[{""target"": ""country_sid""}, {""target"": ""divno""...",[DB_WORK_§DATALAKE_ENV§.AIS_DIVISION_TMP_ET]
2,Copy_DB_AIS_§DATALAKE_ENV§_COUNTRIES_999,§aisSchema§.dbo,COUNTRIES,,['business_date'],"[{""target"": ""country_sid""}, {""target"": ""countr...",[DB_WORK_§DATALAKE_ENV§.AIS_COUNTRIES_TMP_ET]
3,Copy_DB_AIS_§DATALAKE_ENV§_OBJSTATU_999,§aisSchema§.dbo,OBJSTATU,,['business_date'],"[{""target"": ""country_sid""}, {""target"": ""objno""...",[DB_WORK_§DATALAKE_ENV§.AIS_OBJSTATU_TMP_ET]
4,Copy_DB_AIS_§DATALAKE_ENV§_OBJDATA_999,§aisSchema§.dbo,OBJDATA,,['business_date'],"[{""target"": ""country_sid""}, {""target"": ""objno""...",[DB_WORK_§DATALAKE_ENV§.AIS_OBJDATA_TMP_ET]
5,Copy_DB_AIS_§DATALAKE_ENV§_AREATA_999,§aisSchema§.dbo,AREATA,,['business_date'],"[{""target"": ""country_sid""}, {""target"": ""areake...",[DB_WORK_§DATALAKE_ENV§.AIS_AREATA_TMP_ET]


In [None]:
#/*------------------------------------------------------------------------------------------------*/
#/*                                           CF Metadata Discovery                                */
#/*------------------------------------------------------------------------------------------------*/
#/* Description: Read the Dataframe with partition, functions and variable list and                */
#/*              returns the percentage for each                                                   */
#/*                                                                                                */
#/* Creation User: Celfocus                                                                        */
#/* Creation Date: 01/06/2023                                                                      */
#/*                                                                                                */
#/* Last Update User: Celfocus                                                                     */
#/* Last Update Date: 03/07/2023                                                                   */
#/*                                                                                                */
#/*------------------------------ CELFOCUS | Proprietary & Confidential ---------------------------*/

def variables_analysis(df,statistics):
  frames=[]
  df['File_Name'] = df['File_Name_sql']

  #/* STATISTICS | Check if have more than 1 row*/
  if isinstance(df,list):
      for dataframes in df:
          frames.append(dataframes[[statistics,'File_Name']])
          result = pd.concat(frames)
  else:
      result = df
      source = df['File_Name'][0].split('_')[-2]

  #/* STATISTICS | Remove duplicates and explode the column*/
  result.loc[result.astype(str).drop_duplicates().index]
  result = result.explode(statistics).reset_index()

  result.reset_index(drop=True)
  table_name_list = []

  #/* STATISTICS | For each row*/
  for i in result['File_Name']:
      source = i.split('_')[-2]

      #/* STATISTICS | For each row remove the extension*/
      if re.findall('_{}.*?.xml'.format(source),i):
          table_name = re.findall('{}.*?.xml'.format(source),i)
          table_name = table_name[0].replace('.xml','')
          table_name_list.append(table_name)

      #/* STATISTICS | For each row remove the extension*/
      elif re.findall('_{}.*?.sql'.format(source),i):
          table_name = re.findall('{}.*?.sql'.format(source),i)
          table_name = table_name[0].replace('.sql','')
          table_name_list.append(table_name)

  #/* STATISTICS | Calculate the percentage and returns it in a column*/
  number_tables = len(list(dict.fromkeys(table_name_list)))
  result['File_Name'] = table_name_list
  result=result[['File_Name',statistics]]
  result = result.drop_duplicates()
  df=result.groupby(by=statistics, as_index = False).count()
  df['Percentage'] =  df['File_Name'].astype(int).div(number_tables) * 100
  df = df.drop('File_Name', axis=1)
  return df

In [None]:
variables_analysis(df,"Function")

Unnamed: 0,Function,Percentage
0,MASK_SHOW_LAST_N,16.666667
1,CASE WHEN COUNTRY_SID NOT IN,16.666667
2,CAST,100.0
3,CURRENT_TIMESTAMP,100.0
4,REGEXP_REPLACE,100.0
5,SUBSTR,100.0


In [None]:
variables_analysis(df,"Variables")

Unnamed: 0,Variables,Percentage
0,DATALAKE_ENV,100.0
1,aisSchema,100.0


In [None]:
variables_analysis(df,"Partition")

Unnamed: 0,Partition,Percentage
0,BUSINESS_DATE,100.0
