In [1]:
import pandas as pd
import os
from pathlib import Path

from sklearn.preprocessing import RobustScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.metrics import f1_score, precision_score, recall_score

A continuaci√≥n, se crea la clase TrainModel, cuya funci√≥n principal es entrenar un modelo de machine learning. Para ello, carga el conjunto de datos de entrenamiento y el conjunto de evaluaci√≥n del desempe√±o, empleando un pipeline de scikit-learn para escalar los datos y entrenar un √°rbol de decisi√≥n. Los m√©todos encargados de realizar estos procesos son los siguientes:

- **load_dataset**: Se encarga de leer el dataset utilizado para entrenamiento.
- **load_test_dataset** Se encarga de leer el dataset utilizado para evaluar el modelo.
- **train_pipeline**: Se encarga de escalar los datos y entrenar el modelo utilizando pipeline de scikit-learn.
- **test_pipeline** Se encarga de evaluar el pipeline.

In [2]:
class TrainModel:
    """
    Clase encargada de entrenar y evaluar modelos de ml
    """
    def __init__(self,train_file_path,test_file_path=None):
        """
        Funci√≥n que inicializa la clase con la ruta el archivo y verifica
        que exista.

        Attributes
        ----------
        file_path: str
            Ruta del archivo csv

        Raise
        FileNotFoundError
            Si el archivo no existe.
        """
        self.train_file_path = Path(train_file_path)
        self.test_file_path = Path(test_file_path) if test_file_path else None
        if not self.train_file_path.exists():
            raise FileNotFoundError(f"No existe la ruta {self.train_file_path}")

        if self.test_file_path and not self.test_file_path.exists():
            raise FileNotFoundError(f"No existe la ruta {self.test_file_path}")
        
        self.data = None
        self.test_data =None
        self.pipeline = None
    
    def load_dataset(self):
        """
        Lee el archivo csv de entrenamiento indicado en el self.train_file_path y lo carga como un
        dataframe de pandas.

        return
        ------
        pd.DataFrame
            DataFrame que contiene los datos leidos del archivo csv.

        Raises
        ------
        pandas.errors.EmptyDataError
            Si el archivo est√° vac√≠o

        pandas.errors.ParserError
            Si el archivo CSV tiene errores de formato o no puede ser parseado correctamente.
        """

        self.data = pd.read_csv(self.train_file_path)
        return self.data
    
    def load_test_dataset(self):

        """
        Carga el dataset de test (si se proporcion√≥).

        return
        ------
        pd.DataFrame
            DataFrame que contiene los datos leidos del archivo csv.

        Raises
        ------
        ValueError
            Si no se proporciona la ruta del dataset de prueba
        
        pandas.errors.EmptyDataError
            Si el archivo est√° vac√≠o

        pandas.errors.ParserError
            Si el archivo CSV tiene errores de formato o no puede ser parseado correctamente.
        """

        if self.test_file_path is None:
            raise ValueError("No se proporcion√≥ la ruta del dataset de test.")
        self.test_data = pd.read_csv(self.test_file_path)

        return self.test_data
    
    def train_pipeline(self, target ='default_12m',parameters = None):
        """
        Entrena un pipeline con RobustScaler y DecisionTreeClassifier

        Attributes
        ----------
        target: str
            Nombre de la columna objetivo, por decfecto es 'defaul_12m'

        parameters: dict
            Diccionario con los mejores par√°metros para entrenar el modelo

        Raise
        -----
        ValueError
            Si la variable target no existe en el dataset

        Return
        ------
        sklearn.pipeline
            pipeline de skalearn entrenada

        """
        if target not in self.data.columns:
            raise ValueError(f"La variable objetivo '{target}' no se encuentra en los datos.")
        
        independientes = self.data.drop(columns=[target],axis=1)
        objetivo = self.data[target]

        parameters = parameters or {}
        pipeline = Pipeline([ 
            ('scaler',RobustScaler()), 
            ('classifier',DecisionTreeClassifier(**parameters)) ])

        pipeline.fit(independientes, objetivo)
        self.pipeline = pipeline
    
        return pipeline
    
    def test_pipeline(self,target = 'default_12m',pos_label=1,print_metrics=True):
        """
        Se encarga de evaluar el modelo entrenado

        Attributes
        ----------
        target: str
            Nombre de la columna objetivo, por decfecto es 'defaul_12m'

        pos_label: int
            Clase que se considera positiva, por defecto es 1

        print_metrics: Booleano
            Indica si desea imprimir m√©tricas de rendimiento
        
        Raises
        ------
        ValueError
            Si los datos de test no est√°n cargados, si el pipeline no est√° entrenado y si 
            la variable objetivo no est√° en los datos de test.

        Return
        ------
        dict
            Diccionario con las m√©tricas de evaluaci√≥n del modelo

        """
        if self.pipeline is None:
            raise ValueError("El pipeline no est√° entrenado. Ejecuta primero 'train_pipeline()'.")
        if self.test_data is None:
            raise ValueError("Los datos de test no est√°n cargados. Ejecuta 'load_test_dataset()'.")
        if target not in self.test_data.columns:
            raise ValueError(f"La variable objetivo '{target}' no se encuentra en los datos de test.")
        
        X_test = self.test_data.drop(columns=[target])
        y_test = self.test_data[target]
        
        y_pred = self.pipeline.predict(X_test)
        f1 = f1_score(y_test, y_pred, pos_label=pos_label)
        precision = precision_score(y_test, y_pred, pos_label=pos_label)
        recall = recall_score(y_test, y_pred, pos_label=pos_label)

        metrics = {
            'f1': f1,
            'precision': precision,
            'recall': recall
            }
        
        if print_metrics:
            print("=== M√âTRICAS DEL MODELO ===")
            print(f"F1-Score: {f1:.4f}")
            print(f"Precisi√≥n: {precision:.4f}")
            print(f"Recall: {recall:.4f}")
        return metrics


Veamos el funcionamieno de la clase TrainModel.

In [3]:
project_root = next(p for p in Path.cwd().parents if (p / 'data').exists())
file_path = lambda file: os.path.join(project_root,'data/processed',file)

train_model = TrainModel(
    train_file_path=file_path('covalto_sme_credit_train.csv'),
    test_file_path=file_path('covalto_sme_credit_test.csv')
)
train_model.load_dataset()
train_model.load_test_dataset()

modelo = train_model.train_pipeline(
    target = 'default_12m',
    parameters = {
        'criterion': 'gini', 
        'max_depth': 2, 
        'min_samples_split': 8, 
        'min_samples_leaf': 19, 
        'max_features': None, 
        'class_weight': None}
)

metrics = train_model.test_pipeline()

=== M√âTRICAS DEL MODELO ===
F1-Score: 0.8000
Precisi√≥n: 0.8000
Recall: 0.8000


In [None]:
import joblib, os

joblib.dump(modelo, os.path.join(project_root, "test_pipeline.pkl"))
print("‚úÖ Pipeline serializado correctamente")


Perfecto, este modelo ofrece un rendimiento satisfactorio. El siguiente paso consiste en crear la clase encargada de generar el bucket llamado mlflow en MinIO, el cual se utilizar√° para almacenar los artefactos generados por MLflow.

In [4]:
import json
import boto3
class MinioMlflowBucketCreator:
    """
    Se conecta a minio y puede crear buckest
    """
    def __init__(self,credential_path):
        """
        """
        self.credential_path = Path(credential_path)
        if not self.credential_path.exists():
            raise FileNotFoundError(f"No existe la ruta {self.credential_path}")
        self.credentials = None
        self.client = None

    def load_minio_credentials(self):
        """
        """
        with open(self.credential_path, 'r') as file:
            self.credentials = json.load(file)
        return self.credentials
    
    def conection_minio(self):
        """
        """
        s3 = boto3.client(
            "s3",
            endpoint_url = self.credentials['endpoint_url'],
            aws_access_key_id = self.credentials['aws_access_key_id'],
            aws_secret_access_key = self.credentials['aws_secret_access_key']
        )
        self.client = s3
        return self.client
    
    def create_bucket(self, bucket_name ='mlflow'):

        """
        """
        if self.client is None:
            raise ConnectionError("Primero debes conectarte a MinIO con conection_minio().")
        try:
            existing_buckets = [b['Name'] for b in self.client.list_buckets().get('Buckets', [])]
            if bucket_name not in existing_buckets:
                self.client.create_bucket(Bucket=bucket_name)
            else:
                pass
        except:
            raise RuntimeError(f"Error al crear el bucket {bucket_name}")



Porbemos el c√≥digo para ver si tenemos exito en la conexi√≥n con minio

In [5]:
project_secrets = next(p for p in Path.cwd().parents if (p / 'secrets').exists())
credential_path = lambda file: os.path.join(project_root,'secrets',file)
minio = MinioMlflowBucketCreator(
    credential_path = credential_path('credentials_minio.json')
)
minio.load_minio_credentials()
minio.conection_minio()
minio.create_bucket(bucket_name='mlflow')

Perfecto, ahora que ya contamos con la clase encargada de crear los buckets en MinIO, debemos proceder a implementar la clase responsable de cargar los modelos entrenados en MLflow y registrarlos.

In [6]:
import mlflow
from mlflow.client import MlflowClient
from datetime import datetime

class MLflowModelRegister:
    """
    """
    def __init__(self,tracking_uri_path):
        """
        """
        self.tracking_uri_path = Path(tracking_uri_path)
        if not self.tracking_uri_path.exists():
            raise FileNotFoundError(f"No existe la ruta {self.tracking_uri_path}")
        self.tracking_uri = None
    
    def load_tracking_uri_mlflow(self):
        """
        """
        with open(self.tracking_uri_path, 'r') as file:
            self.tracking_uri = json.load(file)
        return self
        
    def create_mlflow_experiment(self, experiment_name = "Experimento_1"):
        """
        """
        tracking_uri = self.tracking_uri['tracking_uri']
        mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(experiment_name)
        return self

    def log_pipeline(
        self,
        pipeline,
        metrics: dict,
        params: dict,
        model_name: str = "sklearn-pipeline",
        framework: str = "scikit-learn",
        type: str = "classification",
        tags: dict = None
    ):
        run_name = f"{model_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        with mlflow.start_run(run_name=run_name):
            if params:
                mlflow.log_params(params)                           #Registrar par√°metros

            if metrics:
                mlflow.log_metrics(metrics)                         # Registrar m√©tricas
            
            mlflow.sklearn.log_model(pipeline, artifact_path=model_name)     # Registrar pipeline

            # Registrar tags descriptivos
            mlflow.set_tag("model_name", model_name)
            mlflow.set_tag("framework", framework)
            mlflow.set_tag("type", type)

            # Para tags en diccionarios
            if tags:
                for k, v in tags.items():
                    mlflow.set_tag(k, v)


Veamos si se carga el modelo a mlflow y a minio.

In [11]:
tracking_uri_path = lambda file: os.path.join(project_root,'secrets',file)

mlflow_register = MLflowModelRegister(
    tracking_uri_path = tracking_uri_path ('tracking_uri_mlflow.json')
)


mlflow_register = mlflow_register.load_tracking_uri_mlflow()
mlflow_register = mlflow_register.create_mlflow_experiment(
    experiment_name = "riesgo_crediticio"
)

In [12]:
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://localhost:6000"
os.environ["AWS_ACCESS_KEY_ID"] = "minio"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minio_123"

mlflow_register = mlflow_register.log_pipeline(
    pipeline = modelo,
    metrics = metrics,
    params={
    'criterion': 'gini', 
    'max_depth': 2, 
    'min_samples_split': 8, 
    'min_samples_leaf': 19, 
    'max_features': None, 
    'class_weight': None},
    model_name = "DecisionTree_CreditRiskModel",
    tags={"author": "Luis Garcia", "use_case": "credit-risk"}
)



üèÉ View run DecisionTree_CreditRiskModel_20251101_185049 at: http://localhost:5000/#/experiments/1/runs/e3c6ceecb6574beb9fb8a0f19acc7f6e
üß™ View experiment at: http://localhost:5000/#/experiments/1


In [None]:
import mlflow
from mlflow.client import MlflowClient
from datetime import datetime

class MLflowModelRegister:
    """
    Configura un cliente para interacturar con MLflow. El prop√≥sito general
    es inicializar un objeto para gestionar experimentos y modelos de MLflow.
    """
    def __init__(self, tracking_uri=None, experiment_name="default_experiment"):

        self.tracking_uri = tracking_uri or os.getenv("MLFLOW_TRACKING_URI", "http://mlflow:5000")
        self.experiment_name = experiment_name

        # Configuraci√≥n de MLflow
        mlflow.set_tracking_uri(self.tracking_uri)
        mlflow.set_experiment(self.experiment_name)
        self.client = MlflowClient(tracking_uri=self.tracking_uri)
        

    def log_pipeline(
        self,
        pipeline,
        metrics: dict,
        params: dict,
        model_name: str = "sklearn-pipeline",
        tags: dict = None
    ):
        run_name = f"{model_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        with mlflow.start_run(run_name=run_name):
            if params:
                mlflow.log_params(params)

            if metrics:
                mlflow.log_metrics(metrics)
            
            # === Registrar el pipeline ===
            mlflow.set_tag("model_name", model_name)
            mlflow.set_tag("framework", "scikit-learn")
            mlflow.set_tag("type", "classification")

            if tags:
                for k, v in tags.items():
                    mlflow.set_tag(k, v)
            
            mlflow.sklearn.log_model(pipeline, artifact_path="model")
            run_id = mlflow.active_run().info.run_id
        return run_id

    def register_models_in_registry(self, model_names: list):
        """
        Registra los modelos en el Model Registry y los promueve a Production.
        Busca el run m√°s reciente asociado a cada 'model_name'.
        """
        for model_name in model_names:
            try:
                # Buscar el run m√°s reciente por tag
                runs = mlflow.search_runs(
                    filter_string=f"tags.model_name = '{model_name}'",
                    order_by=["start_time DESC"]
                )

                if runs.empty:
                    print(f"No se encontr√≥ ning√∫n run con tag model_name = '{model_name}'")
                    continue

                run_id = runs.iloc[0]["run_id"]
                model_uri = f"runs:/{run_id}/model"

                # Registrar modelo en el Model Registry
                registered_model = mlflow.register_model(
                    model_uri=model_uri,
                    name=model_name
                )

                version = registered_model.version
                print(f"Modelo {model_name} registrado como versi√≥n {version}")

                # Transicionar la versi√≥n a Production
                self.client.transition_model_version_stage(
                    name=model_name,
                    version=version,
                    stage="Production",
                    archive_existing_versions=True
                )

                print(f"Modelo {model_name} versi√≥n {version} promovido a 'Production'")

            except Exception as e:
                print(f"Error registrando o promoviendo {model_name}: {e}")

Probando el c√≥digo

In [None]:
# Registrar el modelo en MLflow
mlflow_logger = MLflowModelRegister(
    tracking_uri="http://localhost:5000",
    experiment_name="riesgo_crediticio"
)

In [None]:
run_id = mlflow_logger.log_pipeline(
    pipeline=modelo ,
    metrics=metrics,
    params={
        'criterion': 'gini', 
        'max_depth': 2, 
        'min_samples_split': 8, 
        'min_samples_leaf': 19, 
        'max_features': None, 
        'class_weight': None},
    model_name="DecisionTreeCreditModel",
    tags={"author": "Luis Garcia", "use_case": "credit-risk"}
)

In [None]:
mlflow_logger.register_models_in_registry(["DecisionTreeCreditModel"])