In [0]:
%load_ext autoreload
%autoreload 2
%reload_ext autoreload

import sys
import inspect
import configparser
import json
import logging
import uuid
import os

from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    IntegerType,
)
from pyspark.sql.functions import length, lit


def input_values():
    """Declare the notebook widgets and read back their current values.

    Each widget is registered as a text widget with an empty default and
    then read.  The result is treated as a failure when any value is blank
    after stripping whitespace, or when any widget call raises.

    Returns:
        tuple: Fourteen strings.  On success, the thirteen widget values in
        declaration order followed by the status flag "1".  On failure,
        fourteen "0" strings (the last one is the status flag).
    """
    widget_names = (
        "sr_proceso",
        "sr_subproceso",
        "sr_subetapa",
        "sr_origen_arc",
        "sr_dt_org_arc",
        "sr_folio",
        "sr_id_archivo",
        "sr_tipo_layout",
        "sr_instancia_proceso",
        "sr_usuario",
        "sr_etapa",
        "sr_id_snapshot",
        "sr_paso",
    )
    # One "0" per widget plus one for the trailing status flag.
    failure = ("0",) * (len(widget_names) + 1)

    try:
        # Register every widget first, then read them all, in the same order.
        for widget in widget_names:
            dbutils.widgets.text(widget, "")
        values = [dbutils.widgets.get(widget) for widget in widget_names]

        for value in values:
            if not str(value).strip():
                logger.error("Function: %s", inspect.stack()[0][3])
                logger.error("Some of the input values are empty or null")
                return failure
    except Exception as e:
        logger.error("Function: %s", inspect.stack()[0][3])
        logger.error("An error was raised: %s", str(e))
        return failure

    return (*values, "1")


def conf_process_values(config_file, process_name):
    """Read this process's settings from one section of a config file.

    Args:
        config_file (str): Path to the configuration file.
        process_name (str): Section of the file to read the options from.

    Returns:
        tuple: Eleven items.  On success: sql_conf_file, conn_schema_001,
        table_001, conn_schema_002, table_006, global_temp_view_001,
        global_temp_view_002, debug (bool, True only when the option reads
        "true" case-insensitively), catalog_name, schema_name and the
        status flag "1".  On any error: eleven "0" strings.
    """
    failure = ("0",) * 11
    try:
        parser = configparser.ConfigParser()
        parser.read(config_file)

        def option(key):
            # Every option lives in the same section.
            return parser.get(process_name, key)

        # Subprocess configuration, read in the order it is returned.
        subprocess_values = [
            option(key)
            for key in (
                "sql_conf_file",
                "conn_schema_001",
                "table_001",
                "conn_schema_002",
                "table_006",
                "global_temp_view_001",
                "global_temp_view_002",
            )
        ]
        # Unity settings.
        debug_enabled = option("debug").lower() == "true"
        catalog = option("catalog_name")
        schema = option("schema_name")
    except Exception as e:
        logger.error("Function: %s", inspect.stack()[0][3])
        logger.error("An error was raised: " + str(e))
        return failure

    return (*subprocess_values, debug_enabled, catalog, schema, "1")


if __name__ == "__main__":
    # Driver: set up logging, locate the repository root, load the shared
    # helper notebooks and read every configuration source this notebook
    # needs.  Each loader returns a status flag as its last element; a flag
    # of "0" aborts the run.
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    notebook_name = (
        dbutils.notebook.entry_point.getDbutils()
        .notebook()
        .getContext()
        .notebookPath()
        .get()
    )
    message = "NB Error: " + notebook_name
    source = "ETL"

    # The repository root is the working-directory prefix that ends at the
    # "MITAFO" folder.  str.find returns -1 when the marker is absent, which
    # would otherwise slice a meaningless 5-character "root" and make every
    # later path silently wrong, so fail here with a clear message instead.
    current_dir = os.getcwd()
    repo_marker = "MITAFO"
    marker_pos = current_dir.find(repo_marker)
    if marker_pos == -1:
        logger.error(
            "Repository marker %s not found in working directory %s",
            repo_marker,
            current_dir,
        )
        raise Exception("An error ocurred, check out log messages")
    root_repo = current_dir[: marker_pos + len(repo_marker)]

    try:
        sys.path.append(root_repo + "/" + "CGRLS_0010/Notebooks")
        from NB_GRLS_DML_FUNCTIONS import *
        from NB_GRLS_SIMPLE_FUNCTIONS import *
    except Exception as e:
        logger.error("Error at the beggining of the process")
        logger.error("An error was raised: " + str(e))
        # The helpers called below (conf_init_values, notification_raised,
        # conf_conn_values) are not defined in this notebook and presumably
        # come from these wildcard imports; continuing without them would
        # only fail later with a less helpful NameError.
        raise

    config_file = root_repo + "/" + "CGRLS_0010/Conf/CF_GRLS_PROCESS.py.properties"
    config_conn_file = root_repo + "/" + "CGRLS_0010/Conf/CF_GRLS_CONN.py.properties"
    config_process_file = (
        root_repo
        + "/"
        + "ANCIN_0030/Jobs/01_IDENTIFICACION_DE_CLIENTE/01_DISPERSIONES/Conf/CF_PART_PROC_UNITY.py.properties"
    )

    # 1) Widget parameters.
    (
        sr_proceso,
        sr_subproceso,
        sr_subetapa,
        sr_origen_arc,
        sr_dt_org_arc,
        sr_folio,
        sr_id_archivo,
        sr_tipo_layout,
        sr_instancia_proceso,
        sr_usuario,
        sr_etapa,
        sr_id_snapshot,
        sr_paso,
        failed_task,
    ) = input_values()
    if failed_task == "0":
        logger.error("Please review log messages")
        # No notification here: webhook_url is only loaded in the next step.
        # notification_raised(webhook_url, -1, message, source, input_parameters)
        raise Exception("An error ocurred, check out log messages")

    # Widget name/value pairs, passed along with every failure notification.
    input_parameters = dbutils.widgets.getAll().items()

    # 2) Notification settings.
    process_name = "root"
    webhook_url, channel, failed_task = conf_init_values(
        config_file, process_name, "IDC"
    )
    if failed_task == "0":
        logger.error("Please review log messages")
        notification_raised(webhook_url, -1, message, source, input_parameters)
        raise Exception("Process ends")

    # 3) Process-specific settings (tables, views, Unity options).
    process_name = "root"
    (
        sql_conf_file,
        conn_schema_001,
        table_001,
        conn_schema_002,
        table_006,
        global_temp_view_001,
        global_temp_view_002,
        # unity
        debug,
        catalog_name,
        schema_name,
        failed_task,
    ) = conf_process_values(config_process_file, process_name)
    if failed_task == "0":
        logger.error("Please review log messages")
        notification_raised(webhook_url, -1, message, source, input_parameters)
        raise Exception("Process ends")

    # 4) Oracle connection settings.
    conn_name_ora = "jdbc_oracle"
    (
        conn_options,
        conn_aditional_options,
        conn_user,
        conn_key,
        conn_url,
        scope,
        failed_task
    ) = conf_conn_values(config_conn_file, conn_name_ora)
    if failed_task == "0":
        logger.error("Please review log messages")
        notification_raised(webhook_url, -1, message, source, input_parameters)
        raise Exception("Process ends")

    # Turn the configured file name into a full path to the SQL step
    # definitions.  The doubled "/" in the joined path is kept as written.
    sql_conf_file = (
        root_repo
        + "/"
        + "/ANCIN_0030/Jobs/01_IDENTIFICACION_DE_CLIENTE/01_DISPERSIONES/JSON/"
        + sql_conf_file
    )

In [0]:
# Load the SQL step definitions.  JSON text is UTF-8 by specification, so the
# encoding is stated explicitly instead of depending on the platform default.
with open(sql_conf_file, encoding="utf-8") as f:
    file_config_sql = json.load(f)

# One (step_id, statement) pair per entry under the top-level "steps" key;
# the items of each entry's "value" are joined with newlines.  A file without
# a "steps" key yields an empty list.
conf_values = [
    (step["step_id"], "\n".join(step["value"]))
    for step in file_config_sql.get("steps", [])
]

In [0]:
# Reference values noted by the original author:
#   conn_schema_001 : CIERREN_ETL
#   table_001       : TTSISGRAL_ETL_PRE_DISPERSION
#   conn_schema_002 : CIERREN
#   table_006       : TTCRXGRAL_FOLIO

# Schema-qualified table names.
table_name_001 = f"{conn_schema_001}.{table_001}"
table_name_002 = f"{conn_schema_002}.{table_006}"

# Ask the shared helper for statement '001', with the folio as its only
# parameter.  It returns the statement and a status flag.
query_statement = '001'
params = [sr_folio]
statement, failed_task = getting_statement(conf_values, query_statement, params)

print(statement)

# A status flag of '0' aborts the run after notifying.
if failed_task == '0':
    logger.error("No value %s found", statement)
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, input_parameters)
    raise Exception("Process ends")

In [0]:
# Run the statement against Oracle through the shared helper, using the
# connection settings loaded from the connection config file.
#
# SECURITY: this cell previously held a temporary test block that opened the
# JDBC connection directly, with the database user and password written in
# plain text (and repeated inside the JDBC URL).  Credentials must never live
# in the notebook source; the password that was committed here should be
# treated as exposed and rotated.  For ad-hoc tests against another database,
# change the connection configuration instead of editing this cell.
df, failed_task = query_table(
    conn_name_ora, spark, statement, conn_options, conn_user, conn_key
)

# A status flag of '0' from the query aborts the run after notifying.
if failed_task == '0':
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, input_parameters)
    raise Exception("An error raised")

# Show the result only when the process config enables debug.
if debug:
    display(df)