## Great Expectations - Calidad de Datos
Descripción: Permite aplicar validaciones sobre un conjunto de datos usando el Framework GX<br />

https://legacy.016.docs.greatexpectations.io/

##### Autor: jhasaren | 24/Oct/2024

#### Instalación de Dependencias

In [None]:
%pip install great-expectations==0.18.21
%pip install pyspark

### Importación de librerias

In [None]:
# Librerias Generales
# =======================================================
import pandas as pd # type: ignore
import numpy as np # type: ignore
from datetime import date, datetime, timedelta
import pytz # type: ignore
import os
import warnings
import json
warnings.filterwarnings("ignore")

# Librerías PySpark
#=========================
from pyspark.sql import SparkSession # type: ignore
from pyspark.sql.functions import * # type: ignore
from pyspark.sql.types import * # type: ignore

# Crea sesión de Spark
spark = SparkSession \
    .builder \
    .master("local")\
    .appName("Python Spark") \
    .getOrCreate()


# Librerías GX - QA
#=========================
import great_expectations as gx # type: ignore
from great_expectations.checkpoint import SimpleCheckpoint # type: ignore
from great_expectations.render.renderer import ValidationResultsPageRenderer, ProfilingResultsPageRenderer # type: ignore
from great_expectations.render.view import DefaultJinjaPageView # type: ignore
from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler # type: ignore
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset # type: ignore


In [None]:
# Revisión de Versiones
print('GX Version :'+gx.__version__)
print('PySpark Version :'+spark.sparkContext.version)

### Definición de Funciones y Parámetros

In [None]:
# Fecha Actual
# ==================================================

# Timezone (UTC-5)
tzInfo = pytz.timezone('America/Bogota')

timestamp = datetime.now(tz=tzInfo).strftime('%Y-%m-%d %H:%M:%S')
today = datetime.now(tz=tzInfo).date().strftime('%Y%m%d')
year = datetime.now(tz=tzInfo).date().strftime('%Y')
month = datetime.now(tz=tzInfo).date().strftime('%m')
hour = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')

today


In [7]:
# Configura directorio de GX
# ==================================================

# Asigna carpeta para el contexto
context_root_dir = "./qa_gx/clientes"
context = gx.get_context(context_root_dir=context_root_dir)

# Configura el ID de ejecución
run_id = {
    "run_name": "Ejecución QA - Clientes CRM",
    "run_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}


## Ingesta de datos

#### Clientes CRM

In [8]:
# Lee BD Clientes

clientes_crm = (spark.read.format("csv")
                .option("delimiter", ",")
                .option("header", "True")
                .load("./qa_gx/clientes/CRM/BD_Clientes_CRM.csv"))\
                .withColumn("TELEFONO", col("TELEFONO").cast(StringType()))

In [None]:
# Previsualización
clientes_crm.show(50)

#### Clientes ERP

In [10]:
# Lee BD Clientes ERP

clientes_erp = (spark.read.format("csv")
                .option("delimiter", ",")
                .option("header", "True")
                .load("./qa_gx/clientes/ERP/BD_Clientes_ERP.csv"))\
                .withColumn("TELEFONO_ERP", col("TELEFONO_ERP").cast(StringType()))

In [None]:
# Previsualización
clientes_erp.show(50)

## Procesamiento

In [None]:
# Cruza las BD de Clientes por el ID del Cliente
# =============================================================

# Realiza un left join
df_crm_erp = clientes_crm.join(clientes_erp, clientes_crm['ID_CLIENTE_CRM'] == clientes_erp['ID_UNICO_ERP'], how="left")

# Seleccionar y renombrar algunas columnas
df_crm_erp = df_crm_erp.select(
    df_crm_erp.ID_CLIENTE_CRM.alias("ID_CLIENTE_CRM"),
    df_crm_erp.NOMBRE_CLIENTE.alias("NOMBRE_CLIENTE_CRM"),
    df_crm_erp.TELEFONO.alias("TELEFONO_CRM"),
    df_crm_erp.EMAIL.alias("EMAIL_CRM"),
    df_crm_erp.NOMBRE_CLIENTE_ERP.alias("NOMBRE_CLIENTE_ERP"),
    df_crm_erp.TELEFONO_ERP.alias("TELEFONO_ERP")
)

# Previsualización
df_crm_erp.show(50)

In [None]:
df_crm_erp.count()

In [None]:
# Convertir el DataFrame de Spark a un DataFrame de Great Expectations
# =============================================================

df_ge = gx.dataset.SparkDFDataset(df_crm_erp)
print(type(df_ge))


#### Completitud

Se evaluará si los campos obligatorios (nombre, dirección de correo electrónico, número de teléfono) están completos. Se espera que el 80% de los datos cumplan la característica.

In [15]:
# Expectativas para el conjunto de datos (Reglas de validación)
# URL: https://greatexpectations.io/expectations/
# =============================================================

# Columnas no nulas, se tolera que máximo el 20% de los registros esten vacíos
result_1 = df_ge.expect_column_values_to_not_be_null(
    column="NOMBRE_CLIENTE_CRM",
    mostly=0.20,
    meta={"notes": "Tipo de Caracteristica: completitud"}
)

# Columnas no nulas, se tolera que máximo el 20% de los registros esten vacíos
result_2 = df_ge.expect_column_values_to_not_be_null(
    column="TELEFONO_CRM",
    mostly=0.20,
    meta={"notes": "Tipo de Caracteristica: completitud"}
)

# Columnas no nulas, se tolera que máximo el 20% de los registros esten vacíos
result_3 = df_ge.expect_column_values_to_not_be_null(
    column="EMAIL_CRM",
    mostly=0.20,
    meta={"notes": "Tipo de Caracteristica: completitud"}
)


#### Exactitud (sintáctica)

Se evaluará si las direcciones de correo electrónico y los números de teléfono siguen el formato adecuado. Se espera que el 90% de los datos cumplan la característica.

<B>Cálculo para correos electrónicos:</B> Verificar que el formato del correo electrónico sea nombre@dominio.extensión<br>
<B>Cálculo para teléfonos:</B> Verificar que los números de teléfono sigan el formato nacional (por ejemplo, 10 números sin espacios ni caracteres especiales y que inicien por 3).

In [16]:
# Expectativas para el conjunto de datos (Reglas de validación)
# URL: https://greatexpectations.io/expectations/
# =============================================================

# Se espera valores en la columna EMAIL tengan el formato nombre@dominio.extensión. 
# Se tolera que máximo el 10% no cumplan
result_4 = df_ge.expect_column_values_to_match_regex(
    column="EMAIL_CRM", 
    regex=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
    mostly=0.10,
    meta={"notes": "Tipo de Caracteristica: exactitud sintáctica"}
)

# Se espera valores en la columna TELEFONO tengan 10 números 
# sin espacios ni caracteres especiales y que inicien por 3. 
# Se tolera que máximo el 10% no cumplan
result_5 = df_ge.expect_column_values_to_match_regex(
    column="TELEFONO_CRM", 
    regex=r'^3\d{9}$',
    mostly=0.10,
    meta={"notes": "Tipo de Caracteristica: exactitud sintáctica"}
)


#### Consistencia

Se evaluará si los números de teléfono almacenados en el sistema CRM coinciden con los números almacenados en el sistema ERP. Se espera que el 85% de los datos cumplan la característica.

In [17]:
# Expectativas para el conjunto de datos (Reglas de validación)
# URL: https://greatexpectations.io/expectations/
# =============================================================

# Se espera que el TELEFONO sea igual en ambos sistemas. 
# Se espera que el 85% sean consistentes
result_6 = df_ge.expect_column_pair_values_to_be_equal(
    column_A="TELEFONO_CRM",
    column_B="TELEFONO_ERP",
    mostly=0.80,
    meta={"notes": "Tipo de Caracteristica: consistencia"}
)


## Resultados

In [None]:
# Guardar las expectativas en la suite
# =============================================================

# Nombre de la Suite de Reglas
suite_name = run_id['run_name']

# Guardar las expectativas en un archivo JSON
expectation_suite = df_ge.get_expectation_suite(suite_name)

output_file_path = "./qa_gx/clientes/Resultado/"+today+"_"+expectation_suite['expectation_suite_name']+".json"

# Abrir el archivo en modo escritura
with open(output_file_path, 'w') as json_file:
    # Escribir los datos en el archivo en formato JSON
    json.dump(expectation_suite.to_json_dict(), json_file, indent=4)  # 'indent' para formatear con sangrías

# Copiar el archivo JSON al proyecto gx para usos futuros
os.popen("cp ./qa_gx/clientes/Resultado/"+today+"_"+expectation_suite['expectation_suite_name']+".json ./qa_gx/gx/expectations/"+expectation_suite['expectation_suite_name']+".json")


In [19]:
# Consolidación de Resultados
# =============================================================

# Validar el DataFrame completo
validation_results = df_ge.validate(
    run_id=run_id,  # Nombre del run o ejecución 
)

# Imprimir resultados
# print(validation_results)

In [None]:
# Renderizar el reporte
renderer = ValidationResultsPageRenderer()
rendered_content = renderer.render(validation_results)

# Convertirlo en HTML
html_content = DefaultJinjaPageView().render(rendered_content)

# Guardar el archivo HTML
output_path = "./qa_gx/clientes/Resultado/"+today+"_qa_clientes_crm_1.html"
with open(output_path, "w") as file:
    file.write(html_content)

print(f"Reporte guardado en {output_path}")