In [1]:
import pandas as pd
import numpy as np
import time
import os
from pandas.api import types as ptypes
from pandas import CategoricalDtype
from pandera import Column, DataFrameSchema, Check
from pandera.typing.common import DateTime
from joblib import Parallel, delayed
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

In [2]:
# 1. Build a synthetic DataFrame with n rows and 19 columns
#    (1 id + 4 int + 4 float + 4 categorical + 2 datetime + 4 bool).
# NOTE(review): this heading previously announced "1 million rows and 20 columns",
# but the code uses n = 10_000_000 and produces 19 columns — confirm the intended size.
n = 10_000_000
# Seed the global NumPy RNG. Every np.random call below draws from it, so the
# generated data depends on the order of the statements in this cell.
np.random.seed(42)

# ID column: consecutive integers 1..n
id1 = np.arange(1, n + 1)

# Integer columns: random integers in [0, 100) (upper bound exclusive)
int_cols = {f"int_col{i+1}": np.random.randint(0, 100, size=n) for i in range(4)}

# Float columns: random floats in [0.0, 100.0)
float_cols = {f"float_col{i+1}": np.random.random(size=n) * 100 for i in range(4)}

# Categorical columns: labels drawn from `categories`, stored as pandas Categorical
categories = ["A", "B", "C", "D"]
cat_cols = {f"cat_col{i+1}": pd.Categorical(np.random.choice(categories, size=n)) for i in range(4)}

# Datetime columns
start = pd.Timestamp("2020-01-01")
datetime_cols = {
    # One timestamp per minute, starting at `start`
    "datetime_col1": pd.date_range(start, periods=n, freq="min"),
    # Random whole-second timestamps within the 365 days following `start`:
    # Timestamp.value is in nanoseconds, so // 10**9 converts it to epoch seconds.
    "datetime_col2": pd.to_datetime(np.random.randint(start.value//10**9, 
                                                     (start + pd.Timedelta(days=365)).value//10**9, 
                                                     size=n), unit="s")
}

# Boolean columns: random True/False values
bool_cols = {f"bool_col{i+1}": np.random.choice([True, False], size=n) for i in range(4)}

# Assemble the DataFrame; column order follows the order of the entries below
df = pd.DataFrame({
    "id": id1,
    **int_cols,
    **float_cols,
    **cat_cols,
    **datetime_cols,
    **bool_cols
})


# 2. Dtype schema validation
# Every column is paired with a predicate that receives the Series and returns
# True when its dtype belongs to the expected family.
def _has_categorical_dtype(series):
    """Return True when the Series carries a pandas CategoricalDtype."""
    return isinstance(series.dtype, CategoricalDtype)


_col_numbers = range(1, 5)

# Built incrementally so the insertion order (and therefore the order in which
# problems are reported) is: id, int, float, categorical, datetime, bool.
expected_checks = {"id": ptypes.is_integer_dtype}
expected_checks.update((f"int_col{k}", ptypes.is_integer_dtype) for k in _col_numbers)
expected_checks.update((f"float_col{k}", ptypes.is_float_dtype) for k in _col_numbers)
expected_checks.update((f"cat_col{k}", _has_categorical_dtype) for k in _col_numbers)
expected_checks["datetime_col1"] = ptypes.is_datetime64_any_dtype
expected_checks["datetime_col2"] = ptypes.is_datetime64_any_dtype
expected_checks.update((f"bool_col{k}", ptypes.is_bool_dtype) for k in _col_numbers)

# One message per column whose predicate rejects the actual dtype
errors = [
    f"Columna '{name}' tiene dtype {df[name].dtype}"
    for name, predicate in expected_checks.items()
    if not predicate(df[name])
]

if not errors:
    print("Todos los dtypes coinciden con el esquema esperado.")
else:
    print("Errores de validación:")
    for message in errors:
        print(" -", message)

Todos los dtypes coinciden con el esquema esperado.


In [3]:
# Take the datetime bounds from the data itself. The range checks built from
# them below therefore hold by construction when validating `df`.
min_dt1 = df["datetime_col1"].min()
max_dt1 = df["datetime_col1"].max()
min_dt2 = df["datetime_col2"].min()
max_dt2 = df["datetime_col2"].max()

_col_numbers = range(1, 5)

# 1. id: values >= 1, unique and monotonically increasing.
#    The two lambda checks receive the whole Series (element_wise=False).
_id_column = Column(
    int,
    checks=[
        Check.ge(1),
        Check(lambda s: s.is_unique, element_wise=False, error="IDs no únicos"),
        Check(lambda s: s.is_monotonic_increasing, element_wise=False, error="IDs no ordenados"),
    ],
)

# 2. Integers within [0, 99], no nulls
_int_columns = {
    f"int_col{k}": Column(int, checks=[Check.in_range(0, 99)], nullable=False)
    for k in _col_numbers
}

# 3. Floats within [0.0, 100.0], no nulls.
#    Only the range is enforced; there is no separate outlier check.
_float_columns = {
    f"float_col{k}": Column(float, checks=[Check.in_range(0.0, 100.0)], nullable=False)
    for k in _col_numbers
}

# 4. Category labels restricted to A-D, no nulls.
#    The dtype is declared as `object`, and the schema is built with coerce=True.
#    NOTE(review): if the input columns are pandas Categorical, coercion will
#    cast them to object during validation — confirm this is intended.
_cat_columns = {
    f"cat_col{k}": Column(
        object,
        checks=[
            Check(
                lambda s: set(s.unique()) <= {"A", "B", "C", "D"},
                element_wise=False,
                error="Categoría inválida",
            ),
        ],
        nullable=False,
    )
    for k in _col_numbers
}

# 5. Datetimes constrained to the range observed in the data, no nulls
_datetime_columns = {
    "datetime_col1": Column(
        DateTime,
        checks=[Check.in_range(min_dt1, max_dt1)],
        nullable=False,
    ),
    "datetime_col2": Column(
        DateTime,
        checks=[Check.in_range(min_dt2, max_dt2)],
        nullable=False,
    ),
}

# 6. Booleans, no nulls
_bool_columns = {f"bool_col{k}": Column(bool, nullable=False) for k in _col_numbers}

# Assemble the schema; the column groups are merged in the same order as the
# DataFrame columns (id, int, float, categorical, datetime, bool).
schema = DataFrameSchema(
    {
        "id": _id_column,
        **_int_columns,
        **_float_columns,
        **_cat_columns,
        **_datetime_columns,
        **_bool_columns,
    },
    strict=True,
    coerce=True,
)

In [6]:
# Number of chunks (and joblib `n_jobs`) used by the chunked benchmarks:
# the value of os.cpu_count(), falling back to 1 when it returns None.
N_THREADS: int = os.cpu_count() or 1

# Decorator that measures and reports wall-clock execution time
def timeit(func):
    """Wrap ``func`` so that each call prints how long it took.

    The elapsed time is measured with ``time.perf_counter`` and printed as
    ``"<label>: <seconds> s"`` with three decimals, where the label is the
    wrapped function's docstring (or its name when it has no docstring).

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same call signature that forwards all arguments to
        ``func`` and returns its result unchanged. Nothing is printed when
        ``func`` raises; the exception propagates as-is.
    """
    import functools  # local import: keeps this cell independent of the import cell

    # functools.wraps copies __name__, __doc__, etc. onto the wrapper, so the
    # decorated function remains introspectable (help(), repr, tracebacks).
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        t1 = time.perf_counter()
        # Fall back to the function name so an undocumented function is not
        # reported under the literal label "None".
        label = func.__doc__ or func.__name__
        print(f"{label}: {t1 - t0:.3f} s")
        return result
    return wrapper

# Validation benchmarks using Pandera (they rely on the module-level `schema` and `df`)
@timeit
def validate_full():
    """Validación del DataFrame completo de una vez"""
    # The docstring above is printed by `timeit` as the benchmark label, so it
    # is runtime output rather than plain documentation.
    # Validate the whole DataFrame in a single call; the returned object is
    # discarded. lazy=True asks pandera to collect all failures and report them
    # together instead of stopping at the first one.
    schema.validate(df, lazy=True)

@timeit
def validate_sequential():
    """Validación por chunks de forma secuencial"""
    # The docstring above is printed by `timeit` as the benchmark label, so its
    # text is left untouched.
    #
    # Validate the DataFrame in N_THREADS contiguous row chunks, one after the
    # other. Whole-Series checks (e.g. uniqueness) are evaluated per chunk.
    #
    # The chunks are taken with .iloc over near-equal row boundaries. This
    # replaces np.array_split(df, ...), which on a DataFrame relies on the
    # deprecated DataFrame.swapaxes (FutureWarning on recent pandas, depending
    # on the NumPy version). An .iloc slice is already a DataFrame, so no
    # re-wrapping in pd.DataFrame(...) is needed.
    bounds = np.linspace(0, len(df), N_THREADS + 1, dtype=int)
    for lo, hi in zip(bounds[:-1], bounds[1:]):
        schema.validate(df.iloc[lo:hi])

@timeit
def validate_concurrent():
    """Validación por chunks usando Joblib en paralelo (CPU-bound)"""
    # The docstring above is printed by `timeit` as the benchmark label, so its
    # text is left untouched.
    #
    # Validate the DataFrame in N_THREADS contiguous row chunks, dispatching one
    # schema.validate call per chunk to joblib's 'loky' backend. Whole-Series
    # checks (e.g. uniqueness) are evaluated per chunk.
    #
    # The chunks are taken with .iloc over near-equal row boundaries. This
    # replaces np.array_split(df, ...), which on a DataFrame relies on the
    # deprecated DataFrame.swapaxes (FutureWarning on recent pandas, depending
    # on the NumPy version). An .iloc slice is already a DataFrame, so no
    # re-wrapping in pd.DataFrame(...) is needed.
    bounds = np.linspace(0, len(df), N_THREADS + 1, dtype=int)
    # NOTE(review): each call's return value is sent back to the parent and then
    # discarded; that transfer is included in the measured time.
    Parallel(n_jobs=N_THREADS, backend='loky')(
        delayed(schema.validate)(df.iloc[lo:hi])
        for lo, hi in zip(bounds[:-1], bounds[1:])
    )

In [7]:
# Run the benchmarks in order: whole frame, sequential chunks, parallel chunks.
# Each function prints its own timing line.
for benchmark in (validate_full, validate_sequential, validate_concurrent):
    benchmark()

Validación del DataFrame completo de una vez: 2.157 s
Validación por chunks de forma secuencial: 2.443 s
Validación por chunks usando Joblib en paralelo (CPU-bound): 1.261 s
