#============================================
#Great Expectations - DATA QUALITY
#Grupo 5
#============================================
Integrantes:
 - Karina R. Romero Flores
 - Juan Marcos Miranda Nina
 - Ever Soto
 - Marcelo De La Quintana
 - Sharon Calcina
 - Roni Oyardo


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import Imputer, StandardScaler, VectorAssembler
from pyspark.ml.stat import Correlation

# Build (or reuse) the Spark session with adaptive query execution enabled.
spark = (
    SparkSession.builder
    .appName("DataQualityExample")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

# Load the CSV: first row is the header, column types are inferred by Spark.
df_spark = (
    spark.read
    .option("header", "true")
    .csv("ecommerce_dirty.csv", inferSchema=True)
)
df_spark.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows



In [None]:
# analisis de calidad

def spark_quality_analysis(df):
    """Print a basic data-quality summary for a Spark DataFrame.

    The summary contains the row and column counts, the number (and
    percentage) of missing values for every column that has any, and the
    number of fully duplicated rows.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        The same DataFrame, unchanged, so the call can be chained.
    """
    # Explicit module alias: the function does not depend on star-imported
    # names being present (or not rebound) in the notebook namespace.
    from pyspark.sql import functions as F

    print("=== ANÁLISIS DE CALIDAD SPARK ===")

    # Every count() is a full Spark job, so compute it once and reuse it.
    total_rows = df.count()
    print(f"Total registros: {total_rows}")
    print(f"Total columnas: {len(df.columns)}")

    # NaN only exists for floating-point columns; calling isnan() on
    # timestamp/date/boolean columns fails analysis, so it is applied
    # selectively and every other column is checked for NULL only.
    floating_cols = {name for name, dtype in df.dtypes if dtype in ("float", "double")}
    missing_exprs = []
    for name in df.columns:
        is_missing = F.col(name).isNull()
        if name in floating_cols:
            is_missing = is_missing | F.isnan(F.col(name))
        missing_exprs.append(F.count(F.when(is_missing, F.lit(1))).alias(name))
    null_counts = df.select(missing_exprs).collect()[0]

    print("\nVALORES NULOS POR COLUMNA:")
    for col_name, null_count in null_counts.asDict().items():
        if null_count > 0:
            # total_rows is necessarily > 0 here because null_count > 0.
            percentage = (null_count / total_rows) * 100
            print(f"{col_name}: {null_count} ({percentage:.2f}%)")

    # Duplicates: rows that disappear when only distinct rows are kept.
    unique_rows = df.distinct().count()
    duplicates = total_rows - unique_rows
    print(f"\nRegistros duplicados: {duplicates}")

    return df

spark_quality_analysis(df_spark)

=== ANÁLISIS DE CALIDAD SPARK ===
Total registros: 541909
Total columnas: 8

VALORES NULOS POR COLUMNA:
Description: 1454 (0.27%)
CustomerID: 135080 (24.93%)

Registros duplicados: 5268


DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

In [None]:
# Limpieza Distribuida

def spark_data_cleaning(df):
    """Run the cleaning pipeline on a Spark DataFrame and return the result.

    Steps, in order:
      1. Drop rows in which more than half of the columns are null.
      2. Fill nulls in numeric columns with the approximate median and nulls
         in string columns with the most frequent non-null value.
      3. Drop fully duplicated rows.
      4. For each numeric column, keep only rows inside the
         [Q1 - 1.5*IQR, Q3 + 1.5*IQR] interval.

    Args:
        df: Spark DataFrame to clean.

    Returns:
        A new, cleaned Spark DataFrame.
    """
    import math

    # Explicit imports so the function does not rely on star-imported names.
    from pyspark.sql import functions as F
    from pyspark.sql.types import NumericType, StringType

    # 1. Drop rows with more than 50% nulls. ceil() (rather than int()) keeps
    #    the rule exact when the number of columns is odd.
    threshold = 0.5
    min_non_null = math.ceil(threshold * len(df.columns))
    df_clean = df.dropna(thresh=min_non_null)

    # 2. Per-column imputation. isinstance() also picks up bigint, smallint,
    #    decimal, etc., not only int/double/float.
    numeric_cols = [field.name for field in df_clean.schema.fields
                    if isinstance(field.dataType, NumericType)]

    # Numeric columns: approximate median (1% relative error).
    for col_name in numeric_cols:
        median = df_clean.approxQuantile(col_name, [0.5], 0.01)
        if not median:
            # No non-null values in this column: nothing to impute with.
            continue
        df_clean = df_clean.na.fill({col_name: median[0]})

    # String columns: mode, computed over non-null values only. Without the
    # filter the most frequent "value" can be NULL itself, which na.fill
    # cannot use as a replacement.
    string_cols = [field.name for field in df_clean.schema.fields
                   if isinstance(field.dataType, StringType)]

    for col_name in string_cols:
        top_row = (
            df_clean.filter(F.col(col_name).isNotNull())
            .groupBy(col_name)
            .count()
            .orderBy(F.desc("count"))
            .first()
        )
        if top_row is None:
            # Column is entirely null (or the frame is empty).
            continue
        df_clean = df_clean.na.fill({col_name: top_row[0]})

    # 3. Remove exact duplicate rows.
    df_clean = df_clean.dropDuplicates()

    # 4. IQR-based outlier removal, one numeric column at a time.
    for col_name in numeric_cols:
        quantiles = df_clean.approxQuantile(col_name, [0.25, 0.75], 0.01)
        if len(quantiles) < 2:
            # No data to derive bounds from; leave the column untouched.
            continue
        q1, q3 = quantiles[0], quantiles[1]
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        df_clean = df_clean.filter(
            (F.col(col_name) >= lower_bound) &
            (F.col(col_name) <= upper_bound)
        )

    return df_clean

df_spark_clean = spark_data_cleaning(df_spark)
print(f"Registros después de limpieza: {df_spark_clean.count()}")

Registros después de limpieza: 468260


In [None]:
# 2) Imports
import pandas as pd
import great_expectations as ge

# 3) Make sure df_clean exists. If it is already defined it is left untouched;
#    otherwise it is built from the cleaned Spark DataFrame.
try:
    df_clean
except NameError:
    # df_clean is undefined in this branch, so it cannot be the source of the
    # conversion; the pandas copy comes from the Spark cleaning result.
    df_clean = df_spark_clean.toPandas()
# 4) Wrap the pandas DataFrame as a Great Expectations dataset (legacy API).
ge_df = ge.from_pandas(df_clean)


In [None]:
df_clean.head()

Unnamed: 0,col1,col2
0,1.0,a
1,2.0,b
2,3.0,c
3,,d


  return datetime.utcnow().replace(tzinfo=utc)


In [None]:
ge_df.info()

<class 'great_expectations.dataset.pandas_dataset.PandasDataset'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   col1    3 non-null      float64
 1   col2    4 non-null      object 
dtypes: float64(1), object(1)
memory usage: 196.0+ bytes


  return datetime.utcnow().replace(tzinfo=utc)


In [None]:
# Definir Expectativas

def create_data_expectations(ge_df):
    """Register the quality expectations on the dataset and return it.

    The expectations are declared as (method name, keyword arguments) pairs
    and applied in order: table structure, completeness, uniqueness and
    value range.
    """
    expectation_plan = (
        # 1. Table structure
        ("expect_table_row_count_to_be_between", {"min_value": 1000, "max_value": 100000}),
        ("expect_table_column_count_to_equal", {"value": 10}),
        # 2. Completeness
        ("expect_column_values_to_not_be_null", {"column": "col2"}),
        ("expect_column_values_to_not_be_null", {"column": "col1"}),
        # 3. Uniqueness
        ("expect_column_values_to_be_unique", {"column": "col2"}),
        # 4. Value range
        ("expect_column_values_to_be_between", {"column": "col1", "min_value": 0, "max_value": 10000}),
    )

    for method_name, arguments in expectation_plan:
        register = getattr(ge_df, method_name)
        register(**arguments)

    return ge_df

# Aplicar expectativas
ge_df_with_expectations = create_data_expectations(ge_df)

In [None]:
# Validar y Generar Reporte

# Ejecutar validación
# Evaluate every expectation registered on the dataset.
validation_result = ge_df_with_expectations.validate()

# Overall outcome.
print("=== RESULTADOS DE VALIDACIÓN ===")
print(f"Validación exitosa: {validation_result.success}")
print(f"Total expectativas: {len(validation_result.results)}")

# Keep only the expectations that did not pass.
failed_expectations = [
    outcome for outcome in validation_result.results if not outcome.success
]

print(f"Expectativas fallidas: {len(failed_expectations)}")

# One three-line entry per failed expectation: type, column and raw result.
for failed in failed_expectations:
    print(
        f"❌ {failed.expectation_config.expectation_type}",
        f"   Columna: {failed.expectation_config.kwargs.get('column', 'N/A')}",
        f"   Detalle: {failed.result}",
        sep="\n",
    )

=== RESULTADOS DE VALIDACIÓN ===
Validación exitosa: False
Total expectativas: 6
Expectativas fallidas: 3
❌ expect_table_row_count_to_be_between
   Columna: N/A
   Detalle: {'observed_value': 4}
❌ expect_table_column_count_to_equal
   Columna: N/A
   Detalle: {'observed_value': 2}
❌ expect_column_values_to_not_be_null
   Columna: col1
   Detalle: {'element_count': 4, 'unexpected_count': 1, 'unexpected_percent': 25.0, 'unexpected_percent_total': 25.0, 'partial_unexpected_list': []}


In [None]:
# ========== DATA QUALITY DASHBOARD (forzando pandas) ==========
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

def _to_pandas_force(df, max_rows=200_000):
    """Return a pandas DataFrame for the given input.

    Resolution order:
    - a pandas DataFrame is returned as is;
    - a pandas-on-Spark DataFrame is converted with .to_pandas();
    - an object exposing toPandas/select/schema is treated as a PySpark
      DataFrame and converted with .toPandas(), after limit(max_rows) when
      its row count is known to exceed max_rows;
    - anything else is passed to pd.DataFrame().
    """
    if isinstance(df, pd.DataFrame):
        return df

    # pandas-on-Spark: best effort, any failure falls through to the next case.
    try:
        import pyspark.pandas as ps  # noqa: F401
        from pyspark.pandas.frame import DataFrame as PsDataFrame  # type: ignore
        if isinstance(df, PsDataFrame):
            return df.to_pandas()
    except Exception:
        pass

    # Duck-typed PySpark detection (attributes instead of isinstance).
    if not (hasattr(df, "toPandas") and hasattr(df, "select") and hasattr(df, "schema")):
        # Last resort.
        return pd.DataFrame(df)

    try:
        rows = df.count()
    except Exception:
        rows = None

    # Truncate with limit() (not sample()) only when the size is known to be too large.
    too_large = rows is not None and rows > max_rows
    source = df.limit(max_rows) if too_large else df
    return source.toPandas()

def create_quality_dashboard(df, validation_result, title_suffix=""):
    """Build and display an interactive data-quality dashboard.

    Four Plotly figures are created: missing-value percentage per column,
    passed/failed expectation split, box plots for up to the first four
    numeric columns, and the correlation matrix of the numeric columns.

    Args:
        df: pandas DataFrame, or anything _to_pandas_force() can convert.
        validation_result: object exposing a ``results`` iterable whose items
            have a ``success`` attribute.
        title_suffix: optional text appended to each figure title.

    Returns:
        Tuple ``(fig1, fig2, fig3, fig4)`` with the four figures.

    Raises:
        TypeError: if the input could not be converted to a pandas DataFrame.
    """
    df = _to_pandas_force(df)
    if not isinstance(df, pd.DataFrame):
        raise TypeError("No se pudo convertir el DataFrame a pandas. Revisa el tipo de entrada.")

    suffix = f" - {title_suffix}" if title_suffix else ""

    # --- 1) Missing values per column ---
    missing_data = df.isnull().sum().reset_index()
    missing_data.columns = ['Column', 'Missing_Count']
    total_rows = len(df)
    missing_data['Missing_Percentage'] = (
        (missing_data['Missing_Count'] / total_rows) * 100.0 if total_rows > 0 else 0.0
    )

    fig1 = px.bar(
        missing_data,
        x='Column',
        y='Missing_Percentage',
        title=f'Completitud por Columna (%){suffix}'
    )
    fig1.update_layout(xaxis_tickangle=-45, yaxis_title="Porcentaje (%)")

    # --- 2) Passed vs. failed expectations ---
    # Counted with len() on purpose: `from pyspark.sql.functions import *`
    # earlier in this file rebinds sum() and max() to Spark column functions,
    # which reject a generator argument.
    results_list = list(getattr(validation_result, "results", []))
    passed = [r for r in results_list if getattr(r, "success", False)]
    success_count = len(passed)
    fail_count = len(results_list) - success_count

    fig2 = go.Figure(
        data=[go.Pie(labels=['Pasadas', 'Fallidas'], values=[success_count, fail_count], hole=0.3)]
    )
    fig2.update_layout(title=f'Distribución de Expectativas{suffix}')

    # --- 3) Box plots for up to the first four numeric columns (2x2 grid) ---
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    plotted_cols = numeric_cols[:4]
    fig3 = make_subplots(
        rows=2, cols=2,
        subplot_titles=[f'Outliers en {name}' for name in plotted_cols]
    )
    for i, name in enumerate(plotted_cols):
        grid_row, grid_col = divmod(i, 2)
        fig3.add_trace(go.Box(y=df[name], name=str(name)), row=grid_row + 1, col=grid_col + 1)
    fig3.update_layout(title_text=f'Outliers por Columna Numérica{suffix}')

    # --- 4) Correlation between numeric columns ---
    if len(numeric_cols) >= 2:
        corr_matrix = df[numeric_cols].corr()
        fig4 = px.imshow(
            corr_matrix,
            title=f'Matriz de Correlación{suffix}',
            color_continuous_scale='RdBu_r',
            zmin=-1, zmax=1
        )
    else:
        fig4 = go.Figure()
        fig4.update_layout(title='Matriz de Correlación (no hay suficientes columnas numéricas)')

    # Render the figures in the notebook.
    for figure in (fig1, fig2, fig3, fig4):
        figure.show()
    return fig1, fig2, fig3, fig4


In [None]:
# Convert once (without risky Spark operations) and reuse the pandas copy.
# NOTE(review): to_pandas_no_gen and generate_html_report are defined in later
# cells of this notebook; those cells must have been executed before this one.
pdf_clean = to_pandas_no_gen(df_spark_clean)
dashboard_figs = create_quality_dashboard(pdf_clean, validation_result, title_suffix="DF Clean")
generate_html_report(pdf_clean, validation_result, "quality_report.html")



distutils Version classes are deprecated. Use packaging.version instead.




datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.now(datetime.UTC).



Reporte generado: quality_report.html


In [None]:
# ===== Reporte HTML Automatizado (extendido: Spark/Pandas) =====
import pandas as pd
from datetime import datetime, timezone

# ---- Detecciones de tipo ----
def _is_spark_df(df):
    """Return True when df is a pyspark.sql.DataFrame.

    Any failure (for instance PySpark not being importable) yields False.
    """
    verdict = False
    try:
        import pyspark.sql as spark_sql  # noqa
        verdict = isinstance(df, spark_sql.DataFrame)
    except Exception:
        verdict = False
    return verdict

def _is_pandas_df(df):
    """Return True when df is a pandas DataFrame (or a subclass of it)."""
    pandas_frame_type = pd.DataFrame
    return isinstance(df, pandas_frame_type)

# ---- Métricas base ----
def _spark_metrics(df_spark):
    """Compute the base metrics of a Spark DataFrame.

    Returns a tuple ``(total_rows, total_columns, duplicates, missing_counts)``
    where ``missing_counts`` is a list of ``(column, null_count, null_pct)``.
    """
    from pyspark.sql import functions as F

    column_names = list(df_spark.columns)
    total_rows = df_spark.count()

    # Duplicates = all rows minus rows that are distinct across every column.
    distinct_rows = df_spark.dropDuplicates(column_names).count()

    # One aggregation pass: a 0/1 null flag per column, summed.
    null_counters = []
    for name in column_names:
        null_flag = F.when(F.col(name).isNull(), 1).otherwise(0)
        null_counters.append(F.sum(null_flag).alias(name))
    nulls_by_column = df_spark.agg(*null_counters).collect()[0].asDict()

    def _missing_entry(name):
        miss = int(nulls_by_column.get(name, 0) or 0)
        pct = (miss / total_rows * 100.0) if total_rows > 0 else 0.0
        return (name, miss, pct)

    missing_counts = [_missing_entry(name) for name in column_names]
    return total_rows, len(column_names), total_rows - distinct_rows, missing_counts

def _pandas_metrics(df_pd):
    """Compute the base metrics of a pandas DataFrame.

    Returns a tuple ``(total_rows, total_columns, duplicates, missing_counts)``
    where ``missing_counts`` is a list of ``(column, null_count, null_pct)``.
    The duplicate count falls back to 0 when it cannot be computed.
    """
    n_rows, n_cols = len(df_pd), len(df_pd.columns)

    try:
        duplicates = df_pd.duplicated().sum()
    except Exception:
        duplicates = 0

    def _share(count):
        # Percentage of rows; 0.0 for an empty frame.
        return (count / n_rows * 100.0) if n_rows > 0 else 0.0

    missing_counts = [
        (name, int(count), _share(int(count)))
        for name, count in df_pd.isnull().sum().items()
    ]
    return n_rows, n_cols, duplicates, missing_counts

# ---- Cardinalidad (unique) y tipos ----
def _spark_uniques_and_dtypes(df_spark):
    """Return ``(uniques_map, dtypes_map)`` for a Spark DataFrame.

    ``uniques_map`` maps each column to its approximate distinct count and
    ``dtypes_map`` maps each column to its Spark dtype string.
    """
    from pyspark.sql import functions as F

    column_names = list(df_spark.columns)

    # Single aggregation with one approx_count_distinct per column.
    distinct_exprs = []
    for name in column_names:
        distinct_exprs.append(F.approx_count_distinct(F.col(name)).alias(name))
    distinct_by_column = df_spark.agg(*distinct_exprs).collect()[0].asDict()

    # df.dtypes is a list of (column, dtype_string) pairs.
    dtypes_map = dict(df_spark.dtypes)

    uniques_map = {}
    for name in column_names:
        uniques_map[name] = int(distinct_by_column.get(name, 0) or 0)
    return uniques_map, dtypes_map

def _pandas_uniques_and_dtypes(df_pd):
    """Return ``(uniques_map, dtypes_map)`` for a pandas DataFrame.

    ``uniques_map`` maps each column to its number of distinct non-null
    values and ``dtypes_map`` maps each column to its dtype as a string.
    """
    uniques_map = {}
    for name in df_pd.columns:
        distinct_non_null = df_pd[name].nunique(dropna=True)
        uniques_map[name] = int(distinct_non_null)

    dtypes_map = {}
    for name, dtype in df_pd.dtypes.items():
        dtypes_map[name] = str(dtype)

    return uniques_map, dtypes_map

# ---- Resumen numérico ----
def _spark_numeric_summary(df_spark, max_cols=20):
    """Summarize the numeric columns of a Spark DataFrame.

    Args:
        df_spark: Spark DataFrame to summarize.
        max_cols: maximum number of numeric columns included in the summary.

    Returns:
        A pandas DataFrame with the columns ``Column``, ``Mean``, ``Std``,
        ``Min`` and ``Max`` (one row per numeric column), or ``None`` when
        the DataFrame has no numeric columns.
    """
    from pyspark.sql import functions as F

    # Match the base type name exactly. A substring test would also select
    # types such as "interval", "array<int>" or "map<string,int>", on which
    # the aggregations below are not defined. "decimal(p,s)" is reduced to
    # "decimal" before the comparison.
    numeric_type_names = {"tinyint", "smallint", "int", "bigint", "float", "double", "decimal"}
    num_cols = [
        name for name, dtype in df_spark.dtypes
        if dtype.lower().split("(", 1)[0].strip() in numeric_type_names
    ]
    num_cols = num_cols[:max_cols]
    if not num_cols:
        return None  # no numeric columns

    # All statistics are computed in a single aggregation; each result is
    # aliased "<column>__<stat>" so it can be looked up afterwards.
    aggs = []
    for c in num_cols:
        aggs.extend([
            F.mean(F.col(c)).alias(f"{c}__mean"),
            F.stddev(F.col(c)).alias(f"{c}__std"),
            F.min(F.col(c)).alias(f"{c}__min"),
            F.max(F.col(c)).alias(f"{c}__max"),
        ])
    row = df_spark.agg(*aggs).collect()[0].asDict()

    # Flatten into one record per column.
    data = [
        {
            "Column": c,
            "Mean": row.get(f"{c}__mean"),
            "Std": row.get(f"{c}__std"),
            "Min": row.get(f"{c}__min"),
            "Max": row.get(f"{c}__max"),
        }
        for c in num_cols
    ]
    return pd.DataFrame(data)

def _pandas_numeric_summary(df_pd):
    """Summarize the numeric columns of a pandas DataFrame.

    Returns the transposed ``describe()`` table with the statistic columns
    renamed (Mean, Std, Min, Max) and the column name exposed as ``Column``,
    or ``None`` when there are no numeric columns.
    """
    numeric_part = df_pd.select_dtypes(include="number")
    if len(numeric_part.columns) == 0:
        return None

    # describe() yields: count, mean, std, min, 25%, 50%, 75%, max
    stats = numeric_part.describe().transpose()
    stat_labels = {"std": "Std", "mean": "Mean", "min": "Min", "max": "Max"}
    stats = stats.rename(columns=stat_labels)
    stats = stats.reset_index()
    return stats.rename(columns={"index": "Column"})

# ---- Vista previa ----
def _preview_html(df, is_spark, n=10):
    """Render the first n rows of df as an HTML table.

    Spark input is sampled with limit(n).toPandas(); pandas input with
    head(n). When rendering fails, a short placeholder paragraph is returned
    instead of raising.
    """
    try:
        sample = df.limit(n).toPandas() if is_spark else df.head(n)
        return sample.to_html(index=False)
    except Exception:
        if is_spark:
            return "<p>(No se pudo generar la vista previa desde Spark)</p>"
        return "<p>(No se pudo generar la vista previa)</p>"

# ---- GE results ----
def _ge_results(validation_result):
    """Summarize a Great Expectations validation result.

    Accepts either a result object (attributes ``results``, ``success``,
    ``expectation_config``, ``result``) or the equivalent plain-dict layout.

    Args:
        validation_result: validation result object or dict.

    Returns:
        Tuple ``(success, fail, failed)``: the number of passed expectations,
        the number of failed ones, and a list of
        ``(expectation_type, kwargs, details)`` tuples for the failures.
    """
    def _field(source, name, default=None):
        # dict-shaped results need .get(); getattr() with a default never
        # raises on a dict and would silently return the default instead.
        if isinstance(source, dict):
            return source.get(name, default)
        return getattr(source, name, default)

    try:
        results_list = list(_field(validation_result, "results", None) or [])
    except Exception:
        results_list = []

    def _ok(item):
        return bool(_field(item, "success", False))

    # Counted with len() on purpose: `from pyspark.sql.functions import *`
    # earlier in this file rebinds sum() and max() to Spark column functions,
    # which reject a generator argument.
    failed_items = [item for item in results_list if not _ok(item)]
    fail = len(failed_items)
    success = len(results_list) - fail

    failed = []
    for item in failed_items:
        config = _field(item, "expectation_config", None)
        exp_type = _field(config, "expectation_type", "expectation")
        kwargs = _field(config, "kwargs", {}) or {}
        res = _field(item, "result", {}) or {}
        # Most specific piece of evidence available, else a generic marker.
        details = (
            res.get("partial_unexpected_list")
            or res.get("unexpected_list")
            or res.get("unexpected_count")
            or "Ver detalles"
        )
        failed.append((exp_type, kwargs, details))
    return success, fail, failed

# ---- HTML principal ----
def generate_html_report(df, validation_result, output_path="quality_report.html"):
    """Write a standalone HTML data-quality report for a Spark or pandas DataFrame.

    The report contains general metrics, a ten-row preview, a per-column
    overview (type, missing values, distinct values), a completeness table,
    a numeric summary and the list of failed expectations.

    Args:
        df: Spark DataFrame, pandas DataFrame, or anything accepted by
            ``pd.DataFrame()``.
        validation_result: Great Expectations validation result (object or
            dict) as understood by ``_ge_results``.
        output_path: destination file; it is overwritten if it exists.

    Returns:
        None. The report is written to ``output_path`` and a confirmation
        line is printed.
    """
    # Text taken from the data (column names, dtypes, expectation kwargs and
    # details) is escaped before being interpolated into hand-built markup,
    # so characters such as "<" or "&" cannot break the page. Tables produced
    # by pandas' to_html() are escaped by pandas itself.
    from html import escape

    is_spark = _is_spark_df(df)

    # Base metrics
    if is_spark:
        total_rows, total_columns, duplicates, missing_counts = _spark_metrics(df)
        uniques_map, dtypes_map = _spark_uniques_and_dtypes(df)
        numeric_summary = _spark_numeric_summary(df)
    else:
        if not _is_pandas_df(df):
            df = pd.DataFrame(df)
        total_rows, total_columns, duplicates, missing_counts = _pandas_metrics(df)
        uniques_map, dtypes_map = _pandas_uniques_and_dtypes(df)
        numeric_summary = _pandas_numeric_summary(df)

    # Great Expectations outcome
    success_count, fail_count, failed = _ge_results(validation_result)

    # Column overview (type, missing count, missing %, distinct values)
    miss_map = {c: (m, p) for c, m, p in missing_counts}
    overview_columns = list(dtypes_map.keys()) if is_spark else df.columns
    overview_rows = []
    for c in overview_columns:
        miss, pct = miss_map.get(c, (0, 0.0))
        overview_rows.append(
            f"<tr><td>{escape(str(c))}</td><td>{escape(str(dtypes_map.get(c, 'n/a')))}</td>"
            f"<td>{miss}</td><td>{pct:.2f}%</td><td>{uniques_map.get(c, 0)}</td></tr>"
        )
    overview_table = "".join(overview_rows)

    # Completeness table (only columns with at least one missing value)
    completeness_rows = [
        f"<tr><td>{escape(str(c))}</td><td>{m}</td><td>{p:.2f}%</td></tr>"
        for c, m, p in missing_counts if m > 0
    ]
    completeness_table = "".join(completeness_rows) or "<tr><td colspan='3'>Sin valores faltantes</td></tr>"

    # Preview
    preview_html = _preview_html(df, is_spark=is_spark, n=10)

    # Failed expectations
    if failed:
        failed_items = []
        for exp, kwargs, details in failed:
            # Only the first six kwargs are shown to keep the row short.
            short_kwargs = ", ".join(f"{k}={v}" for k, v in list(kwargs.items())[:6])
            failed_items.append(
                f"<tr><td>{escape(str(exp))}</td><td>{escape(short_kwargs)}</td>"
                f"<td>{escape(str(details))}</td></tr>"
            )
        failed_expectations_html = "<table><tr><th>Expectation</th><th>Kwargs</th><th>Detalles</th></tr>" + "".join(failed_items) + "</table>"
    else:
        failed_expectations_html = "<p>No hay expectativas fallidas.</p>"

    # Numeric summary (when available)
    if numeric_summary is not None and len(numeric_summary) > 0:
        numeric_summary_html = numeric_summary.to_html(index=False)
    else:
        numeric_summary_html = "<p>No hay columnas numéricas para resumir.</p>"

    # Template; literal CSS braces are doubled ({{ }}) because of str.format().
    html_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="utf-8">
        <title>Data Quality Report</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 20px; }}
            .header {{ background-color: #f0f0f0; padding: 20px; border-radius: 6px; }}
            .metrics {{ display: flex; flex-wrap: wrap; gap: 12px; margin: 12px 0; }}
            .metric {{ min-width: 220px; display: inline-block; padding: 12px 16px; background-color: #e8f4fd; border-radius: 6px; }}
            .success {{ color: #0a7f34; }}
            .error {{ color: #b00020; }}
            table {{ border-collapse: collapse; width: 100%; margin-top: 8px; }}
            th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
            th {{ background-color: #f2f2f2; }}
            h2 {{ margin-top: 24px; }}
            .note {{ color: #666; font-size: 12px; }}
        </style>
    </head>
    <body>
        <div class="header">
            <h1>Data Quality Report</h1>
            <p>Generado: {timestamp}</p>
        </div>

        <h2>Métricas Generales</h2>
        <div class="metrics">
            <div class="metric"><strong>Total Registros:</strong> {total_rows}</div>
            <div class="metric"><strong>Total Columnas:</strong> {total_columns}</div>
            <div class="metric"><strong>Registros Duplicados:</strong> {duplicates}</div>
            <div class="metric"><strong>Expectativas Pasadas:</strong> <span class="success">{success_count}</span></div>
            <div class="metric"><strong>Expectativas Fallidas:</strong> <span class="error">{fail_count}</span></div>
        </div>

        <h2>Vista previa (primeras 10 filas)</h2>
        {preview_html}

        <h2>Overview de Columnas</h2>
        <table>
            <tr><th>Columna</th><th>Tipo</th><th>Faltantes</th><th>% Faltantes</th><th>Únicos (aprox en Spark)</th></tr>
            {overview_table}
        </table>

        <h2>Completitud por Columna</h2>
        <table>
            <tr><th>Columna</th><th>Valores Faltantes</th><th>Porcentaje</th></tr>
            {completeness_table}
        </table>

        <h2>Resumen Numérico</h2>
        {numeric_summary_html}

        <h2>Expectativas Fallidas</h2>
        {failed_expectations_html}

        {sampling_note}
    </body>
    </html>
    """

    sampling_note = ""
    if is_spark:
        sampling_note = "<p class='note'>Nota: En Spark, la vista previa usa limit(10). Los conteos únicos usan approx_count_distinct.</p>"

    html_content = html_template.format(
        timestamp=datetime.now(timezone.utc).astimezone().strftime('%Y-%m-%d %H:%M:%S %Z'),
        total_rows=total_rows,
        total_columns=total_columns,
        duplicates=duplicates,
        success_count=success_count,
        fail_count=fail_count,
        preview_html=preview_html,
        overview_table=overview_table,
        completeness_table=completeness_table,
        numeric_summary_html=numeric_summary_html,
        failed_expectations_html=failed_expectations_html,
        sampling_note=sampling_note
    )

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(html_content)

    print(f"Reporte generado: {output_path}")


In [None]:
# Either a Spark or a pandas DataFrame is accepted:
#generate_html_report(df_clean, validation_result, "quality_report.html")
# or
generate_html_report(df_spark_clean, validation_result, "quality_report.html")


PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got generator.

In [None]:
# ========= Helpers "a prueba de Spark" =========
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

def to_pandas_no_gen(df, max_rows=200_000):
    """Return a pandas DataFrame without calling count() or sample() on Spark.

    - A pandas DataFrame is returned unchanged.
    - A pandas-on-Spark DataFrame is converted with .to_pandas().
    - An object exposing toPandas/limit/schema is treated as a Spark
      DataFrame: at most max_rows rows are fetched via limit().toPandas().
    - Anything else is handed to pd.DataFrame().
    """
    if isinstance(df, pd.DataFrame):
        return df

    # pandas-on-Spark: best effort, failures fall through to the next case.
    try:
        import pyspark.pandas as ps  # noqa: F401
        from pyspark.pandas.frame import DataFrame as PsDataFrame  # type: ignore
        if isinstance(df, PsDataFrame):
            return df.to_pandas()
    except Exception:
        pass

    # Duck-typed Spark detection: every marker attribute must be present.
    for marker in ("toPandas", "limit", "schema"):
        if not hasattr(df, marker):
            return pd.DataFrame(df)

    truncated = df.limit(max_rows)
    return truncated.toPandas()


# ========= DASHBOARD (siempre con pandas) =========
def create_quality_dashboard(df, validation_result, title_suffix=""):
    """Build and display the four-figure data-quality dashboard.

    Figures: missing-value percentage per column, passed/failed expectation
    split, box plots for up to the first four numeric columns, and the
    correlation matrix of the numeric columns.

    Args:
        df: pandas DataFrame, or anything to_pandas_no_gen() can convert.
        validation_result: object exposing a ``results`` iterable whose items
            have a ``success`` attribute.
        title_suffix: optional text appended to each figure title.

    Returns:
        Tuple ``(fig1, fig2, fig3, fig4)`` with the four Plotly figures.
    """
    # Always work on a pandas copy.
    df = to_pandas_no_gen(df)

    suffix = f" - {title_suffix}" if title_suffix else ""

    # 1) Missing values per column
    missing_data = df.isnull().sum().reset_index()
    missing_data.columns = ['Column', 'Missing_Count']
    total_rows = len(df)
    missing_data['Missing_Percentage'] = (
        (missing_data['Missing_Count'] / total_rows) * 100.0 if total_rows > 0 else 0.0
    )

    fig1 = px.bar(
        missing_data, x='Column', y='Missing_Percentage',
        title=f'Completitud por Columna (%){suffix}'
    )
    fig1.update_layout(xaxis_tickangle=-45, yaxis_title="Porcentaje (%)")

    # 2) Passed vs. failed expectations.
    # len() is used instead of sum()/max(): `from pyspark.sql.functions import *`
    # earlier in this file rebinds those names to Spark column functions,
    # which reject a generator argument.
    results_list = list(getattr(validation_result, "results", []))
    success_count = len([r for r in results_list if getattr(r, "success", False)])
    fail_count = len(results_list) - success_count
    fig2 = go.Figure([go.Pie(labels=['Pasadas','Fallidas'], values=[success_count, fail_count], hole=0.3)])
    fig2.update_layout(title=f'Distribución de Expectativas{suffix}')

    # 3) Box plots for up to four numeric columns, laid out on a 2x2 grid
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    plotted_cols = numeric_cols[:4]
    fig3 = make_subplots(rows=2, cols=2, subplot_titles=[f'Outliers en {c}' for c in plotted_cols])
    for i, c in enumerate(plotted_cols):
        grid_row, grid_col = divmod(i, 2)
        fig3.add_trace(go.Box(y=df[c], name=str(c)), row=grid_row + 1, col=grid_col + 1)
    fig3.update_layout(title_text=f'Outliers por Columna Numérica{suffix}')

    # 4) Correlation matrix (needs at least two numeric columns)
    if len(numeric_cols) >= 2:
        corr = df[numeric_cols].corr()
        fig4 = px.imshow(corr, title=f'Matriz de Correlación{suffix}',
                         color_continuous_scale='RdBu_r', zmin=-1, zmax=1)
    else:
        fig4 = go.Figure()
        fig4.update_layout(title='Matriz de Correlación (insuficientes columnas numéricas)')

    # Render the figures in the notebook.
    for figure in (fig1, fig2, fig3, fig4):
        figure.show()
    return fig1, fig2, fig3, fig4


# ========= REPORTE HTML (siempre con pandas) =========
from datetime import datetime, timezone

def _ge_results(validation_result):
    """Count passed/failed expectations and collect failure details.

    Works with a Great Expectations result object (attribute access) as well
    as with the same structure expressed as plain dicts.

    Args:
        validation_result: validation result object or dict with ``results``.

    Returns:
        Tuple ``(success, fail, failed)`` where ``failed`` is a list of
        ``(expectation_type, kwargs, details)`` tuples, one per failure.
    """
    def _read(source, key, default=None):
        # getattr() with a default never raises on a dict, it just returns
        # the default; dict-shaped input therefore needs an explicit .get().
        if isinstance(source, dict):
            return source.get(key, default)
        return getattr(source, key, default)

    try:
        results_list = list(_read(validation_result, "results", None) or [])
    except Exception:
        results_list = []

    success = 0
    failed = []
    # Plain counting loop: sum()/max() are rebound to Spark column functions
    # by `from pyspark.sql.functions import *` earlier in this file.
    for item in results_list:
        if bool(_read(item, "success", False)):
            success += 1
            continue
        config = _read(item, "expectation_config", None)
        exp_type = _read(config, "expectation_type", "expectation")
        kwargs = _read(config, "kwargs", {}) or {}
        res = _read(item, "result", {}) or {}
        # Most specific piece of evidence available, else a generic marker.
        details = (
            res.get("partial_unexpected_list")
            or res.get("unexpected_list")
            or res.get("unexpected_count")
            or "Ver detalles"
        )
        failed.append((exp_type, kwargs, details))

    fail = len(results_list) - success
    return success, fail, failed

def generate_html_report(df, validation_result, output_path="quality_report.html"):
    """Write a standalone HTML data-quality report computed with pandas.

    The input is converted with to_pandas_no_gen() first, so all metrics are
    computed on the pandas copy.
    NOTE(review): to_pandas_no_gen() fetches at most ``max_rows`` rows from a
    Spark DataFrame, so for larger inputs the figures describe that subset.

    Args:
        df: pandas DataFrame, or anything to_pandas_no_gen() can convert.
        validation_result: Great Expectations validation result (object or
            dict) as understood by ``_ge_results``.
        output_path: destination file; it is overwritten if it exists.

    Returns:
        None. The report is written to ``output_path`` and a confirmation
        line is printed.
    """
    # Text taken from the data (column names, expectation kwargs and details)
    # is escaped before being interpolated into the markup, so characters
    # such as "<" or "&" cannot break the page.
    from html import escape

    # Always work on a pandas copy (no Spark operations below this point).
    df = to_pandas_no_gen(df)
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)

    # Base metrics
    total_rows = len(df)
    total_columns = len(df.columns)
    try:
        duplicates = df.duplicated().sum()
    except Exception:
        # Best effort: the duplicate count is reported as 0 when it cannot be computed.
        duplicates = 0

    # Completeness: one row per column that has missing values
    completeness_rows = []
    for column_name, miss in df.isnull().sum().items():
        if miss > 0:
            pct = (miss / total_rows * 100.0) if total_rows > 0 else 0.0
            completeness_rows.append(
                f"<tr><td>{escape(str(column_name))}</td><td>{int(miss)}</td><td>{pct:.2f}%</td></tr>"
            )
    completeness_table = "".join(completeness_rows) or "<tr><td colspan='3'>Sin valores faltantes</td></tr>"

    # Great Expectations outcome
    success_count, fail_count, failed = _ge_results(validation_result)
    if failed:
        failed_items = []
        for exp, kwargs, details in failed:
            # Only the first six kwargs are shown to keep the row short.
            short_kwargs = ", ".join(f"{k}={v}" for k, v in list(kwargs.items())[:6])
            failed_items.append(
                f"<tr><td>{escape(str(exp))}</td><td>{escape(short_kwargs)}</td>"
                f"<td>{escape(str(details))}</td></tr>"
            )
        failed_expectations_html = "<table><tr><th>Expectation</th><th>Kwargs</th><th>Detalles</th></tr>" + "".join(failed_items) + "</table>"
    else:
        failed_expectations_html = "<p>No hay expectativas fallidas.</p>"

    # Template; literal CSS braces are doubled ({{ }}) because of str.format().
    html_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="utf-8">
        <title>Data Quality Report</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 20px; }}
            .header {{ background-color: #f0f0f0; padding: 20px; border-radius: 6px; }}
            .metrics {{ display: flex; flex-wrap: wrap; gap: 12px; margin: 12px 0; }}
            .metric {{ min-width: 220px; display: inline-block; padding: 12px 16px; background-color: #e8f4fd; border-radius: 6px; }}
            .success {{ color: #0a7f34; }}
            .error {{ color: #b00020; }}
            table {{ border-collapse: collapse; width: 100%; margin-top: 8px; }}
            th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
            th {{ background-color: #f2f2f2; }}
            h2 {{ margin-top: 24px; }}
        </style>
    </head>
    <body>
        <div class="header">
            <h1>Data Quality Report</h1>
            <p>Generado: {timestamp}</p>
        </div>

        <h2>Métricas Generales</h2>
        <div class="metrics">
            <div class="metric"><strong>Total Registros:</strong> {total_rows}</div>
            <div class="metric"><strong>Total Columnas:</strong> {total_columns}</div>
            <div class="metric"><strong>Registros Duplicados:</strong> {duplicates}</div>
            <div class="metric"><strong>Expectativas Pasadas:</strong> <span class="success">{success_count}</span></div>
            <div class="metric"><strong>Expectativas Fallidas:</strong> <span class="error">{fail_count}</span></div>
        </div>

        <h2>Completitud por Columna</h2>
        <table>
            <tr><th>Columna</th><th>Valores Faltantes</th><th>Porcentaje</th></tr>
            {completeness_table}
        </table>

        <h2>Expectativas Fallidas</h2>
        {failed_expectations_html}
    </body>
    </html>
    """

    html_content = html_template.format(
        timestamp=datetime.now(timezone.utc).astimezone().strftime('%Y-%m-%d %H:%M:%S %Z'),
        total_rows=total_rows,
        total_columns=total_columns,
        duplicates=duplicates,
        success_count=success_count,
        fail_count=fail_count,
        completeness_table=completeness_table,
        failed_expectations_html=failed_expectations_html
    )

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(html_content)

    print(f"Reporte generado: {output_path}")


In [None]:
# ======= FIX: forzar builtins para evitar choque con PySpark =======
import builtins
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

def to_pandas_no_gen(df, max_rows=200_000):
    """Return ``df`` as a pandas DataFrame, collecting at most ``max_rows`` rows.

    Args:
        df: A pandas DataFrame (returned unchanged), a pandas-on-Spark
            DataFrame, an object that looks like a PySpark DataFrame (it has
            ``toPandas``, ``limit`` and ``schema``), or anything accepted by
            the ``pd.DataFrame`` constructor.
        max_rows: Upper bound on the rows pulled out of Spark. It is applied to
            both Spark-backed inputs; plain pandas input is never truncated.

    Returns:
        A pandas DataFrame.
    """
    if isinstance(df, pd.DataFrame):
        return df

    # pandas-on-Spark is optional. Only the import is best-effort: a failure
    # while converting a real frame must surface instead of being swallowed.
    try:
        from pyspark.pandas.frame import DataFrame as PsDataFrame  # type: ignore
    except Exception:
        PsDataFrame = None
    if PsDataFrame is not None and isinstance(df, PsDataFrame):
        # Honour max_rows here too, otherwise the whole dataset is collected.
        return df.head(max_rows).to_pandas()

    # PySpark is detected by attributes so its classes need not be imported.
    if hasattr(df, "toPandas") and hasattr(df, "limit") and hasattr(df, "schema"):
        return df.limit(max_rows).toPandas()  # no sample() and no count()
    return pd.DataFrame(df)

def _ge_results_safe(validation_result):
    """Summarise a validation result that may be made of objects or plain dicts.

    Args:
        validation_result: Either an object exposing a ``results`` attribute or
            a dict with a ``"results"`` key. Each entry may itself be an object
            (``success``, ``expectation_config``, ``result`` attributes) or a
            dict with the same keys.

    Returns:
        A tuple ``(success, fail, failed)`` where ``success`` and ``fail`` are
        counts and ``failed`` is a list of ``(expectation_type, kwargs,
        details)`` tuples, one per failed entry, in input order.
    """
    missing = object()

    def _field(obj, name, default=None):
        # Attribute first, then mapping key. getattr() with a default never
        # raises, so a plain dict has to be probed explicitly with .get().
        value = getattr(obj, name, missing)
        if value is missing and isinstance(obj, dict):
            value = obj.get(name, missing)
        return default if value is missing else value

    def _ok(item):
        # Best effort: an entry whose flag cannot be read counts as failed.
        try:
            return bool(_field(item, "success", False))
        except Exception:
            return False

    try:
        results_list = list(getattr(validation_result, "results", None) or [])
    except Exception:
        results_list = []
    if not results_list and isinstance(validation_result, dict):
        # `or []` guards against an explicit {"results": None}.
        results_list = list(validation_result.get("results") or [])

    flags = [_ok(item) for item in results_list]
    # builtins.* on purpose: a star-import of pyspark.sql.functions shadows sum/max.
    success = builtins.sum(1 for flag in flags if flag)
    fail = builtins.max(len(results_list) - success, 0)

    failed = []
    for item, flag in zip(results_list, flags):
        if flag:
            continue
        config = _field(item, "expectation_config")
        # NOTE(review): newer Great Expectations releases appear to name this
        # field `type` instead of `expectation_type` - confirm against the
        # installed version.
        exp_type = (
            _field(config, "expectation_type")
            or _field(config, "type")
            or "expectation"
        )
        kwargs = _field(config, "kwargs") or {}
        res = _field(item, "result") or {}
        details = (
            res.get("partial_unexpected_list")
            or res.get("unexpected_list")
            or res.get("unexpected_count")
            or "Ver detalles"
        )
        failed.append((exp_type, kwargs, details))
    return success, fail, failed

def create_quality_dashboard(df, validation_result, title_suffix=""):
    """Build and display four Plotly figures summarising data quality.

    Args:
        df: Input data; converted with ``to_pandas_no_gen`` before use.
        validation_result: Validation result passed to ``_ge_results_safe`` to
            obtain the passed / failed expectation counts.
        title_suffix: Optional text appended to the figure titles after
            `` - ``.

    Returns:
        ``(fig1, fig2, fig3, fig4)``: missing-value bar chart, passed/failed
        pie chart, box plots for up to four numeric columns, and the
        correlation heatmap (an empty figure when fewer than two numeric
        columns exist). Every figure is also shown via ``.show()``.
    """
    df = to_pandas_no_gen(df)
    suffix = f" - {title_suffix}" if title_suffix else ""

    # 1) Completeness: percentage of missing values per column.
    missing_data = df.isnull().sum().reset_index()
    missing_data.columns = ['Column', 'Missing_Count']
    total_rows = len(df)
    # Guard the division so an empty frame yields 0% instead of NaN.
    missing_data['Missing_Percentage'] = (
        (missing_data['Missing_Count'] / total_rows) * 100.0 if total_rows > 0 else 0.0
    )

    fig1 = px.bar(
        missing_data, x='Column', y='Missing_Percentage',
        title=f'Completitud por Columna (%){suffix}'
    )
    fig1.update_layout(xaxis_tickangle=-45, yaxis_title="Porcentaje (%)")

    # 2) Expectation outcome split. Use the shared helper so the counts match
    #    the HTML report and dict-shaped validation results are handled too.
    success_count, fail_count, _ = _ge_results_safe(validation_result)

    fig2 = go.Figure([
        go.Pie(labels=['Pasadas', 'Fallidas'], values=[success_count, fail_count], hole=0.3)
    ])
    fig2.update_layout(title=f'Distribución de Expectativas{suffix}')

    # 3) Outliers: one box plot per numeric column, at most four on a 2x2 grid.
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    plotted_cols = numeric_cols[:4]
    fig3 = make_subplots(
        rows=2, cols=2,
        subplot_titles=[f'Outliers en {c}' for c in plotted_cols]
    )
    for i, c in enumerate(plotted_cols):
        # Fill the grid row by row: positions 0,1 -> row 1; 2,3 -> row 2.
        fig3.add_trace(go.Box(y=df[c], name=str(c)), row=(i // 2) + 1, col=(i % 2) + 1)
    fig3.update_layout(title_text=f'Outliers por Columna Numérica{suffix}')

    # 4) Correlation heatmap needs at least two numeric columns.
    if len(numeric_cols) >= 2:
        corr = df[numeric_cols].corr()
        fig4 = px.imshow(
            corr, title=f'Matriz de Correlación{suffix}',
            color_continuous_scale='RdBu_r', zmin=-1, zmax=1
        )
    else:
        fig4 = go.Figure()
        fig4.update_layout(title='Matriz de Correlación (insuficientes columnas numéricas)')

    for fig in (fig1, fig2, fig3, fig4):
        fig.show()
    return fig1, fig2, fig3, fig4

def generate_html_report(df, validation_result, output_path="quality_report.html"):
    """Write a standalone HTML data-quality report to ``output_path``.

    The report contains general metrics (row, column and duplicate counts,
    passed / failed expectations), a table of columns with missing values, and
    a table of failed expectations.

    Args:
        df: Input data; converted with ``to_pandas_no_gen`` before use.
        validation_result: Validation result summarised by
            ``_ge_results_safe``.
        output_path: Destination file, written as UTF-8.

    Returns:
        None. The file is written and a confirmation line is printed.
    """
    # Local imports keep the function usable regardless of what earlier
    # notebook cells imported.
    from datetime import datetime, timezone
    from html import escape

    df = to_pandas_no_gen(df)
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)

    total_rows = len(df)
    total_columns = len(df.columns)
    try:
        duplicates = df.duplicated().sum()
    except Exception:
        # Best effort: the duplicate count is reported as 0 when it cannot
        # be computed.
        duplicates = 0

    # Completeness: only columns that actually have missing values are listed.
    miss_ser = df.isnull().sum()
    rows_html = []
    for col, miss in miss_ser.items():
        if miss > 0:
            pct = (miss / total_rows * 100.0) if total_rows > 0 else 0.0
            # Column names come from the data file, so escape them for HTML.
            rows_html.append(
                f"<tr><td>{escape(str(col))}</td><td>{int(miss)}</td><td>{pct:.2f}%</td></tr>"
            )
    completeness_table = "".join(rows_html) or "<tr><td colspan='3'>Sin valores faltantes</td></tr>"

    # Expectation summary.
    success_count, fail_count, failed = _ge_results_safe(validation_result)
    if failed:
        items = []
        for exp, kwargs, details in failed:
            # Show at most six kwargs to keep the row readable.
            short_kwargs = ", ".join(f"{k}={v}" for k, v in list(kwargs.items())[:6])
            # kwargs and details can carry raw data values; escape them so
            # characters such as '<' or '&' cannot break or inject markup.
            items.append(
                f"<tr><td>{escape(str(exp))}</td>"
                f"<td>{escape(short_kwargs)}</td>"
                f"<td>{escape(str(details))}</td></tr>"
            )
        failed_expectations_html = "<table><tr><th>Expectation</th><th>Kwargs</th><th>Detalles</th></tr>" + "".join(items) + "</table>"
    else:
        failed_expectations_html = "<p>No hay expectativas fallidas.</p>"

    # Template: CSS braces are doubled so str.format leaves them alone.
    html_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="utf-8">
        <title>Data Quality Report</title>
        <style>
            body {{ font-family: Arial, sans-serif; margin: 20px; }}
            .header {{ background-color: #f0f0f0; padding: 20px; border-radius: 6px; }}
            .metrics {{ display: flex; flex-wrap: wrap; gap: 12px; margin: 12px 0; }}
            .metric {{ min-width: 220px; display: inline-block; padding: 12px 16px; background-color: #e8f4fd; border-radius: 6px; }}
            .success {{ color: #0a7f34; }}
            .error {{ color: #b00020; }}
            table {{ border-collapse: collapse; width: 100%; margin-top: 8px; }}
            th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
            th {{ background-color: #f2f2f2; }}
            h2 {{ margin-top: 24px; }}
        </style>
    </head>
    <body>
        <div class="header">
            <h1>Data Quality Report</h1>
            <p>Generado: {timestamp}</p>
        </div>

        <h2>Métricas Generales</h2>
        <div class="metrics">
            <div class="metric"><strong>Total Registros:</strong> {total_rows}</div>
            <div class="metric"><strong>Total Columnas:</strong> {total_columns}</div>
            <div class="metric"><strong>Registros Duplicados:</strong> {duplicates}</div>
            <div class="metric"><strong>Expectativas Pasadas:</strong> <span class="success">{success_count}</span></div>
            <div class="metric"><strong>Expectativas Fallidas:</strong> <span class="error">{fail_count}</span></div>
        </div>

        <h2>Completitud por Columna</h2>
        <table>
            <tr><th>Columna</th><th>Valores Faltantes</th><th>Porcentaje</th></tr>
            {completeness_table}
        </table>

        <h2>Expectativas Fallidas</h2>
        {failed_expectations_html}
    </body>
    </html>
    """

    html_content = html_template.format(
        # Timezone-aware "now", rendered in the machine's local zone.
        timestamp=datetime.now(timezone.utc).astimezone().strftime('%Y-%m-%d %H:%M:%S %Z'),
        total_rows=total_rows,
        total_columns=total_columns,
        duplicates=duplicates,
        success_count=success_count,
        fail_count=fail_count,
        completeness_table=completeness_table,
        failed_expectations_html=failed_expectations_html
    )

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(html_content)

    print(f"Reporte generado: {output_path}")
