### Industrialización del modelo

#### Imports

In [2]:
import kfp
from kfp.components import func_to_container_op
import kfp.components as comp

Primero se definen las dependencias que se instalarán en el contenedor de cada paso del pipeline.

In [3]:
# Packages pip-installed into every component container (passed below as
# `packages_to_install`). scikit-learn must be requested under its real
# distribution name: the `sklearn` name on PyPI is a deprecated placeholder
# whose installation fails.
import_packages = ['pandas', 'scikit-learn', 'mlflow', 'codecarbon', 'numpy', 'lightgbm']

El pipeline se divide en 3 pasos:
1. Preprocesamiento de datos
2. Entrenamiento
3. Evaluación

#### Preprocesamiento de datos

In [5]:
def load_and_preprocess_data(file_path : comp.InputPath() , train_output_csv: comp.OutputPath()):
    """Load the raw data, generate the model features and save them as CSV.

    Args:
        file_path: Location handed to ``DataStorage`` to read the raw data.
        train_output_csv: Destination path where the generated features are
            written in CSV format.
    """
    # NOTE(review): `src.data_process` is a project module; confirm it is
    # importable inside the container image this component runs in.
    from src.data_process import DataStorage, FeaturesGenerator

    # Read the raw data and derive the training features from it.
    storage = DataStorage(file_path)
    generator = FeaturesGenerator(data_storage=storage)
    # NOTE(review): presumably a pandas DataFrame; with the default arguments
    # `to_csv` also writes the index as the first column, which downstream
    # `read_csv` calls will see as an extra column - confirm this is intended.
    generator.generate_features(storage.df_data).to_csv(train_output_csv)


In [6]:
# Wrap the preprocessing function as a pipeline component whose container
# installs the shared dependency list before running it.
load_and_preprocess_data_op = func_to_container_op(
    load_and_preprocess_data,
    packages_to_install=import_packages,
)

#### Entrenamiento

In [9]:
def train_data(train_path: comp.InputPath(),  model_id : comp.OutputPath('GBDTModel')):
    """Train the LightGBM regressor, log it to MLflow and emit the MLflow run id.

    The function is turned into a container component, so it must be
    self-contained: every name it uses is imported or resolved inside the body.

    Args:
        train_path: Path of the CSV file holding the features plus the
            ``target`` column.
        model_id: Path of the output file. The id of the MLflow run created
            here is written to it so a later step can resume that same run.

    Raises:
        RuntimeError: If no MLflow tracking server is configured through the
            ``MLFLOW_SERVER_URL`` or ``MLFLOW_TRACKING_URI`` environment
            variables.
    """
    import os

    import lightgbm as lgb
    import mlflow
    import mlflow.lightgbm
    import pandas as pd
    from codecarbon import EmissionsTracker

    # Notebook globals do not exist inside the component container, so the
    # tracking server address is read from the environment. Resolve it before
    # training so a misconfiguration fails fast.
    tracking_uri = os.environ.get("MLFLOW_SERVER_URL") or os.environ.get("MLFLOW_TRACKING_URI")
    if not tracking_uri:
        raise RuntimeError(
            "MLflow tracking server is not configured: set the MLFLOW_SERVER_URL "
            "or MLFLOW_TRACKING_URI environment variable in the component container."
        )

    train_df = pd.read_csv(train_path)
    features = train_df.drop(columns=["target"])
    target = train_df["target"]

    # Model hyperparameters
    params = {
        "learning_rate" : .1,
        "max_depth" : 10,
        "n_estimators" : 500,
        "num_leaves" : 31
    }

    # Build the model and measure the emissions of the training itself
    tracker = EmissionsTracker()
    my_model = lgb.LGBMRegressor(**params)
    tracker.start()
    my_model.fit(features, target)
    emissions = tracker.stop()

    # Connect to the MLflow server and select (or create) the experiment
    mlflow.set_tracking_uri(uri=tracking_uri)
    mlflow.set_experiment("Enefit-GBDT")

    # Record the training run in MLflow
    with mlflow.start_run() as run:
        mlflow.log_params(params)
        # The tracker may return no measurement; only log a real value
        if emissions is not None:
            mlflow.log_metric("emissions", emissions)
        mlflow.set_tag("GBDT experiment", "First experiment")
        mlflow.lightgbm.log_model(
            lgb_model=my_model,
            artifact_path="enefit_model",
            # A handful of rows is enough to document the input schema;
            # logging the whole training set would store it as an artifact.
            input_example=features.head(5),
            registered_model_name="enefit-lgbt-experiment"
        )
        run_id = run.info.run_id

    # `model_id` is the path of the output file, not a variable to rebind:
    # the value has to be written to that file to reach downstream steps.
    # The run id is emitted because the evaluation step resumes the run by id.
    os.makedirs(os.path.dirname(model_id) or ".", exist_ok=True)
    with open(model_id, "w") as id_file:
        id_file.write(run_id)


     

In [10]:
# Wrap the training function as a pipeline component whose container
# installs the shared dependency list before running it.
train_data_op = func_to_container_op(
    train_data,
    packages_to_install=import_packages,
)

#### Evaluación

In [11]:
def eval_data(train_path: comp.InputPath(), model_id: comp.InputPath()):
    """Cross-validate the model and log the mean absolute error to the MLflow run.

    The function is turned into a container component, so it must be
    self-contained: every name it uses is imported or resolved inside the body.

    Args:
        train_path: Path of the CSV file holding the features plus the
            ``target`` column.
        model_id: Path of the file that contains the id of the MLflow run the
            metric is attached to.

    Raises:
        RuntimeError: If no MLflow tracking server is configured through the
            ``MLFLOW_SERVER_URL`` or ``MLFLOW_TRACKING_URI`` environment
            variables.
        ValueError: If the ``model_id`` file is empty.
    """
    import os

    import lightgbm as lgb
    import mlflow
    import numpy as np
    import pandas as pd
    from sklearn.model_selection import TimeSeriesSplit, cross_val_score

    # This step runs in its own container: without the tracking server address
    # the run created by the training step cannot be found.
    tracking_uri = os.environ.get("MLFLOW_SERVER_URL") or os.environ.get("MLFLOW_TRACKING_URI")
    if not tracking_uri:
        raise RuntimeError(
            "MLflow tracking server is not configured: set the MLFLOW_SERVER_URL "
            "or MLFLOW_TRACKING_URI environment variable in the component container."
        )

    # `model_id` is a file path; the actual run id is the file's content.
    with open(model_id) as id_file:
        run_id = id_file.read().strip()
    if not run_id:
        raise ValueError("The model_id input file is empty; no MLflow run id to resume.")

    train_df = pd.read_csv(train_path)
    X, y = train_df.drop(columns=["target"]), train_df["target"]

    # Splitter for time series
    # NOTE(review): assumes the rows are in chronological order - confirm.
    tsvc = TimeSeriesSplit(n_splits=6)
    # Model hyperparameters
    params = {
        "learning_rate" : .1,
        "max_depth" : 10,
        "n_estimators" : 500,
        "num_leaves" : 31
    }
    # Run the time-series cross-validation
    scores = cross_val_score(lgb.LGBMRegressor(**params), X, y, cv=tsvc, scoring="neg_mean_absolute_error")

    # The scorer reports the negated metric (neg_mean_absolute_error), so the
    # sign of the mean is flipped to obtain the mean absolute error.
    mean_score = float(-np.mean(scores))

    mlflow.set_tracking_uri(uri=tracking_uri)
    # Resume the existing run so the metric sits next to the logged model
    with mlflow.start_run(run_id=run_id):
        mlflow.log_metric("mean_absolute_error", mean_score)
    print(f"Mean_score: {mean_score}")
    


In [12]:
# Wrap the evaluation function as a pipeline component whose container
# installs the shared dependency list before running it.
eval_data_op = func_to_container_op(
    eval_data,
    packages_to_install=import_packages,
)

####  Se implementa el pipeline

In [13]:
def prod_pipeline(url):
    """Chain the three steps: preprocessing, training and evaluation.

    Component argument names follow the function parameter names with a
    trailing ``_path`` removed for path-typed parameters: ``file_path`` becomes
    ``file`` and ``train_path`` becomes ``train``, while ``model_id`` keeps its
    name.

    Args:
        url: Pipeline parameter forwarded to the preprocessing step as its
            ``file`` input.
    """
    load_and_preprocess_data_task = load_and_preprocess_data_op(file=url)
    train_csv = load_and_preprocess_data_task.outputs['train_output_csv']

    train_eval_task = train_data_op(train=train_csv)

    # The evaluation input is named `model_id`; passing it as `model` is
    # rejected as an unexpected keyword argument.
    eval_data_op(train=train_csv, model_id=train_eval_task.output)
    

In [None]:

# Client for the Kubeflow Pipelines API
client = kfp.Client() # change arguments accordingly

# Compile the pipeline function and submit it as a run.
# NOTE(review): the `url` value carries literal double quotes inside the
# string - confirm that is intended.
client.create_run_from_pipeline_func(
    prod_pipeline,
    arguments=dict(url='"../../data"'),
)