In [1]:
import sys
from pathlib import Path

sys.path.append(str(Path.cwd().parent))
import itertools

import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline

# 1) Define el directorio raíz del proyecto y el subdirectorio 'data'
root_dir = Path().cwd().parent
data_dir = root_dir / 'data'

# Cargar los conjuntos de datos de entrenamiento y prueba
train_set = pd.read_parquet(data_dir / 'train.parquet')
test_set = pd.read_parquet(data_dir / 'test.parquet')

# 2️⃣ Definir grupos de variables
continuous_vars = ['cont1', 'cont2', 'cont3', 'cont4']
discrete_vars = ['disc1', 'disc2']
categorical_vars = ['cat1', 'cat2']

# 3️⃣ Convertir tipos correctamente
for dataset in [train_set, test_set]:
    dataset[discrete_vars] = dataset[discrete_vars].astype('object')
    dataset[categorical_vars] = dataset[categorical_vars].astype('object')

# Separar las características y la variable objetivo
X_train = train_set.loc[
    :, [var for var in train_set.columns if var not in ['id', 'target']]
]
y_train = train_set['target']
X_test = test_set.loc[
    :, [var for var in test_set.columns if var not in ['id', 'target']]
]
y_test = test_set['target']

In [2]:
import hashlib
import warnings

from feature_engine.discretisation import EqualWidthDiscretiser
from feature_engine.encoding import OneHotEncoder, OrdinalEncoder
from feature_engine.wrappers import SklearnTransformerWrapper
from joblib import Parallel, delayed
from sklearn.base import clone
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import make_scorer, roc_auc_score
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import PolynomialFeatures

warnings.simplefilter('ignore')

In [3]:
# Diccionario de transformaciones posibles para cada tipo de variable
transformaciones = {
    'continuous': [
        {'type': 'raw'},
        {'type': 'discretize', 'bins': 3, 'encode': False},
        {'type': 'discretize', 'bins': 3, 'encode': True},
        {'type': 'discretize', 'bins': 5, 'encode': False},
        {'type': 'discretize', 'bins': 5, 'encode': True},
        {'type': 'discretize', 'bins': 7, 'encode': False},
        {'type': 'discretize', 'bins': 7, 'encode': True},
        {'type': 'poly', 'degree': 2},
        {'type': 'poly', 'degree': 3},
        {'type': 'poly', 'degree': 4},
    ],
    'discrete': [{'strategy': 'onehot'}, {'strategy': 'ordinal'}],
    'categorical': [{'strategy': 'onehot'}, {'strategy': 'ordinal'}],
}


# Generador de combinaciones de transformaciones
def generar_combinaciones():
    """Genera todas las combinaciones posibles de transformaciones para variables continuas, discretas y categóricas.
    """
    for combo in itertools.product(
        transformaciones['continuous'],
        transformaciones['discrete'],
        transformaciones['categorical'],
    ):
        yield combo


# Construcción de un pipeline a partir de una combinación
def construir_pipeline(
    combo: tuple[dict[str, object], dict[str, object], dict[str, object]],
) -> Pipeline:
    """Construye un pipeline de preprocesamiento según la combinación de transformaciones seleccionada.

    Args:
        combo (tuple): Tupla con las transformaciones para cada tipo de variable.

    Returns:
        sklearn.pipeline.Pipeline: Pipeline configurado.

    """
    cont, disc, cat = combo
    steps = list()
    # Transformación para variables continuas
    if cont['type'] == 'discretize':
        steps.append(
            (
                'discretizer',
                EqualWidthDiscretiser(
                    variables=continuous_vars,
                    bins=cont['bins'],
                    return_object=cont['encode'],
                ),
            )
        )
        if cont['encode']:
            # Si se requiere codificación tras discretizar
            steps.append(
                (
                    'encoder_cont',
                    OneHotEncoder(variables=continuous_vars)
                    if cont.get('encode_strategy', 'onehot') == 'onehot'
                    else OrdinalEncoder(variables=continuous_vars),
                )
            )
    elif cont['type'] == 'poly':
        steps.append(
            (
                'poly',
                SklearnTransformerWrapper(
                    PolynomialFeatures(degree=cont['degree']), variables=continuous_vars
                ),
            )
        )
    # Transformación para variables discretas
    steps.append(
        (
            'encoder_disc',
            OneHotEncoder(variables=discrete_vars)
            if disc['strategy'] == 'onehot'
            else OrdinalEncoder(variables=discrete_vars),
        )
    )
    # Transformación para variables categóricas
    steps.append(
        (
            'encoder_cat',
            OneHotEncoder(variables=categorical_vars)
            if cat['strategy'] == 'onehot'
            else OrdinalEncoder(variables=categorical_vars),
        )
    )
    return Pipeline(steps)

In [4]:
# Diccionario para almacenar los pipelines generados
pipelines_combinatoria = dict()
for combo in generar_combinaciones():
    try:
        pipe = construir_pipeline(combo)
        # Generar hash único para la configuración
        config_str = str(combo).encode('utf-8')
        hash_id = hashlib.md5(config_str).hexdigest()[:12]
        pipelines_combinatoria[f'pipe_{hash_id}'] = pipe
    except Exception as e:
        print(f'Error en combinación {combo}: {str(e)}')
        continue

# Asegurar que todos los pipelines tengan un clasificador al final
for pipe_id in pipelines_combinatoria:
    if 'classifier' not in pipelines_combinatoria[pipe_id].named_steps:
        pipelines_combinatoria[pipe_id].steps.append(
            ('classifier', RandomForestClassifier(random_state=42, n_jobs=-1))
        )

In [5]:
# Definición de métricas de evaluación
scorers = {'roc_auc': make_scorer(roc_auc_score, greater_is_better=True)}


# Función para evaluar un pipeline individual
def evaluar_pipeline(
    pipe_id: str, pipeline: Pipeline, X: pd.DataFrame, y: np.ndarray
) -> dict:
    """Evalúa un pipeline usando validación cruzada y retorna métricas principales.

    Args:
        pipe_id (str): Identificador del pipeline.
        pipeline (Pipeline): Pipeline a evaluar.
        X (DataFrame): Datos de entrada.
        y (array): Etiquetas.

    Returns:
        dict: Resultados de evaluación.

    """
    try:
        current_pipe = clone(pipeline)
        # Evaluar ROC-AUC
        roc_scores = cross_val_score(
            current_pipe, X, y, cv=5, scoring=scorers['roc_auc'], n_jobs=-1
        )
        return {
            'hash_id': pipe_id,
            'roc_auc_mean': np.mean(roc_scores),
            'roc_auc_std': np.std(roc_scores),
        }
    except Exception as e:
        print(f'Error en {pipe_id}: {str(e)}')
        return {'hash_id': pipe_id, 'roc_auc_mean': np.nan, 'roc_auc_std': np.nan}

In [8]:
# Ejemplo conceptual de procesamiento por lotes
batch_size = 13
resultados_totales = list()
items_a_procesar = list(pipelines_combinatoria.items())

for i in range(0, len(items_a_procesar), batch_size):
    lote_actual = items_a_procesar[i : i + batch_size]
    print(f'Procesando lote {i // batch_size + 1}...')
    resultados_lote = Parallel(n_jobs=-1, verbose=1, backend='loky')(
        delayed(evaluar_pipeline)(pipe_id, pipeline, X_train, y_train)
        for pipe_id, pipeline in lote_actual
    )
    resultados_totales.extend(resultados_lote)

Procesando lote 1...


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 32 concurrent workers.
[Parallel(n_jobs=-1)]: Done   6 out of  13 | elapsed:   31.1s remaining:   36.3s
[Parallel(n_jobs=-1)]: Done  13 out of  13 | elapsed:   37.9s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 32 concurrent workers.


Procesando lote 2...


[Parallel(n_jobs=-1)]: Done   6 out of  13 | elapsed:   37.6s remaining:   43.9s
[Parallel(n_jobs=-1)]: Done  13 out of  13 | elapsed:   43.0s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 32 concurrent workers.


Procesando lote 3...


[Parallel(n_jobs=-1)]: Done   6 out of  13 | elapsed:  2.0min remaining:  2.3min


Procesando lote 4...


[Parallel(n_jobs=-1)]: Done  13 out of  13 | elapsed:  3.1min finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 32 concurrent workers.


In [9]:
# Crear pipelines de solo preprocesamiento (sin clasificador)
pipelines_preprocesamiento = {
    pipe_id: Pipeline(pipeline.steps[:-1])  # Excluir el clasificador
    for pipe_id, pipeline in pipelines_combinatoria.items()
}

# Evaluar resultados y seleccionar el mejor pipeline
resultados_df = pd.DataFrame(resultados_totales).sort_values(
    'roc_auc_mean', ascending=False
)
mejor_pipeline_id = resultados_df.iloc[0]['hash_id']
mejor_pipeline = pipelines_preprocesamiento[mejor_pipeline_id]
mejor_pipeline

In [21]:
# Transformar los datos con el mejor preprocesamiento
X_train_transformed = mejor_pipeline.fit_transform(X_train, y_train)
X_test_transformed = mejor_pipeline.transform(X_test)

# Entrenar modelo final
modelo_final = RandomForestClassifier(random_state=42, n_jobs=-1)
modelo_final.fit(X_train_transformed, y_train)

# Evaluar desempeño final
print(
    f'ROC-AUC en train: {roc_auc_score(y_train, modelo_final.predict_proba(X_train_transformed)[:, 1]):.4f}'
)
print(
    f'ROC-AUC en test: {roc_auc_score(y_test, modelo_final.predict_proba(X_test_transformed)[:, 1]):.4f}'
)

ROC-AUC en train: 1.0000
ROC-AUC en test: 0.9088


In [27]:
# Guardar el pipeline y el modelo final
import joblib

pipeline_dir = root_dir / 'pipelines'

if not pipeline_dir.exists():
    pipeline_dir.mkdir(parents=True, exist_ok=True)

joblib.dump(value=mejor_pipeline, filename=pipeline_dir / 'best_pipeline.pkl')

['/home/lynn/Documentos/development/scripts-notebooks/feast-aws-deepchecks/pipelines/best_pipeline.pkl']