# Simple pipeline with MLflow model tracking

An experiment with the Iris dataset. In general, to use MLflow in a Kubeflow pipeline, the necessary environment variables must be passed to every container that uses MLflow. This is implemented in the `add_env_vars_to_tasks` function.
  
MLflow also needs to know the local URI of the MLflow server, which is available in the notebook (and, through the function mentioned above, in the containers) as `os.environ["MLFLOW_URI"]`.

This notebook defines a simple pipeline that preprocesses data, trains and logs a model, and makes predictions on test data. It also shows one way to handle MLflow run information inside the notebook and pass it between pipeline steps: by saving the run as a dictionary.

In [None]:
!pip install -q kfp[all]==2.0.1

In [None]:
import os

import kfp.dsl as dsl
from kfp.client import Client
from kfp.dsl import Dataset, Input, Model, Output, Artifact
from kfp.kubernetes import use_secret_as_env

In [None]:
def add_env_vars_to_tasks(task_list: list[dsl.PipelineTask]) -> None:
    """Give every task the MLflow and object-storage (MinIO) settings it needs.

    For each task in ``task_list``:

    * ``MLFLOW_URI`` and ``AWS_ENDPOINT_URL`` are copied from the environment
      of the process that builds the pipeline;
    * the ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` keys of the
      ``s3creds`` secret are exposed as environment variables of the same name.

    Raises:
        KeyError: if ``MLFLOW_URI`` or ``AWS_ENDPOINT_URL`` is missing from the
            local environment (they are only looked up when a task is processed).
    """
    forwarded_vars = ("MLFLOW_URI", "AWS_ENDPOINT_URL")
    credential_keys = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")

    for pipeline_task in task_list:
        # Plain values are copied from the local environment as-is.
        for var_name in forwarded_vars:
            pipeline_task.set_env_variable(var_name, os.environ[var_name])

        # Credentials come from the secret; each key maps to an env var
        # with the same name.
        use_secret_as_env(
            pipeline_task,
            secret_name="s3creds",
            secret_key_to_env={key: key for key in credential_keys},
        )

## Preprocess data

In [None]:
@dsl.component(
    packages_to_install=["pandas", "pyarrow", "scikit-learn"],
    base_image="python:3.11",
)
def preprocess_data(
    x_train_df: Output[Dataset],
    y_train_df: Output[Dataset],
    x_test_df: Output[Dataset],
    y_test_df: Output[Dataset],
    test_size: float = 0.2,
    seed: int = 42,
):
    """Split the iris dataset and store the four parts as parquet artifacts.

    Args:
        x_train_df: Output artifact receiving the training features.
        y_train_df: Output artifact receiving the training targets.
        x_test_df: Output artifact receiving the test features.
        y_test_df: Output artifact receiving the test targets.
        test_size: Passed to ``train_test_split`` as ``test_size``.
        seed: Passed to ``train_test_split`` as ``random_state``.
    """
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split

    iris = load_iris(as_frame=True)
    features = iris.data
    # The target is turned into a one-column DataFrame before splitting.
    labels = iris.target.to_frame()

    # Result order: train features, test features, train targets, test targets.
    splits = train_test_split(
        features, labels, test_size=test_size, random_state=seed
    )
    destinations = (x_train_df, x_test_df, y_train_df, y_test_df)

    for frame, artifact in zip(splits, destinations):
        frame.to_parquet(artifact.path)

## Train and log model

As an example of how to use MLflow with pipelines, this component returns the MLflow run information (`run.to_dictionary()`) as a dict. Other KFP tasks can take this dict as an input.

In [None]:
@dsl.component(
    packages_to_install=["pandas", "pyarrow", "scikit-learn", "mlflow", "boto3"],
    base_image="python:3.11",
)
def train_and_log_model(
    x_train: Input[Dataset],
    y_train: Input[Dataset],
    seed: int = 42,
) -> dict:
    """Fit a logistic-regression model and log it to MLflow.

    The model is trained on the parquet artifacts, then logged (together with
    its hyperparameters, a tag and an inferred signature) in a new run of the
    "MLflow Quickstart with KFP" experiment and registered as
    "tracking-quickstart-pipeline".

    Args:
        x_train: Parquet artifact holding the training features.
        y_train: Parquet artifact holding the training targets.
        seed: Used as ``random_state`` of the classifier.

    Returns:
        The MLflow run as produced by ``run.to_dictionary()``.

    Raises:
        KeyError: if ``MLFLOW_URI`` is not set in the container environment.
    """
    import os

    import mlflow
    import pandas as pd
    from mlflow.models import infer_signature
    from sklearn.linear_model import LogisticRegression

    features = pd.read_parquet(x_train.path)
    # Collapse the stored target frame's column axis: a single-column target
    # becomes a 1-D Series, so ``fit`` does not warn about a column-vector y.
    labels = pd.read_parquet(y_train.path).squeeze("columns")

    # Define the model hyperparameters.
    # ``multi_class`` is intentionally not set: "auto" is the default and the
    # parameter is deprecated in recent scikit-learn releases.
    params = {
        "solver": "lbfgs",
        "max_iter": 1000,
        "random_state": seed,
    }

    # Train the model
    lr = LogisticRegression(**params)
    lr.fit(features, labels)

    # Set tracking server URI for logging
    mlflow.set_tracking_uri(uri=os.environ["MLFLOW_URI"])

    # Select the MLflow experiment the run is logged under
    mlflow.set_experiment("MLflow Quickstart with KFP")

    # Start an MLflow run
    with mlflow.start_run() as run:
        # Log the hyperparameters
        mlflow.log_params(params)

        # Set a tag that we can use to remind ourselves what this run was for
        mlflow.set_tag("Training Info", "Basic LR model for iris data, KFP")

        # Infer the model signature from the training inputs and predictions
        signature = infer_signature(features, lr.predict(features))

        # Log and register the model
        mlflow.sklearn.log_model(
            sk_model=lr,
            artifact_path="iris-model",
            signature=signature,
            input_example=features,
            registered_model_name="tracking-quickstart-pipeline",
        )

    # Hand the run over to downstream tasks as a plain dict
    return run.to_dictionary()

## Load the model from MLflow and make predictions

This component loads the model saved to MLflow based on the run ID. It requires the dictionary with MLflow run information as an input.

In [None]:
@dsl.component(
    packages_to_install=["pandas", "pyarrow", "scikit-learn", "mlflow", "boto3"],
    base_image="python:3.11",
)
def predict(
    x_test: Input[Dataset],
    y_test: Input[Dataset],
    mlflow_run: dict,
):
    """Score the logged model on the test split and record its accuracy.

    Args:
        x_test: Parquet artifact holding the test features.
        y_test: Parquet artifact holding the test targets.
        mlflow_run: MLflow run dictionary; ``mlflow_run["info"]["run_id"]``
            identifies the run whose model is loaded and to which the
            ``accuracy`` metric is logged.

    Raises:
        KeyError: if ``MLFLOW_URI`` is not set in the container environment.
    """
    import os

    import mlflow
    import pandas as pd
    from sklearn.metrics import accuracy_score

    mlflow.set_tracking_uri(uri=os.environ["MLFLOW_URI"])

    # Fetch the trained model; "iris-model" is the artifact path inside the run.
    run_id = mlflow_run["info"]["run_id"]
    model = mlflow.sklearn.load_model(f"runs:/{run_id}/iris-model")

    # Read the held-out data
    features = pd.read_parquet(x_test.path)
    expected = pd.read_parquet(y_test.path)

    # Compare the model's predictions with the expected targets
    accuracy = accuracy_score(expected, model.predict(features))

    # Re-open the existing run so the metric is stored next to the model
    with mlflow.start_run(run_id=run_id):
        mlflow.log_metric("accuracy", accuracy)

## Build and run pipeline

In [None]:
@dsl.pipeline
def simple_pipeline():
    """Wire the preprocessing, training and prediction components together."""
    # Preprocessing produces the four train/test artifacts.
    prep = preprocess_data()
    splits = prep.outputs

    # Training consumes the training split and returns the MLflow run dict.
    trainer = train_and_log_model(
        x_train=splits["x_train_df"],
        y_train=splits["y_train_df"],
    )

    # Prediction consumes the test split plus the run dict from training.
    scorer = predict(
        x_test=splits["x_test_df"],
        y_test=splits["y_test_df"],
        mlflow_run=trainer.output,
    )

    # Only the two tasks that use MLflow get the extra environment variables.
    add_env_vars_to_tasks([trainer, scorer])


# Connect to Kubeflow Pipelines using the client's default configuration
client = Client()

# Submit simple_pipeline as a new run in the named experiment
client.create_run_from_pipeline_func(
    pipeline_func=simple_pipeline,
    experiment_name="iris-dataset-classification",
    enable_caching=True,
)

# Alternative: compile the pipeline to a YAML file instead of submitting a run.
# NOTE: needs `import kfp` first; the imports shown above only bind `dsl` and `Client`.
# kfp.compiler.Compiler().compile(simple_pipeline, 'simple_pipeline.yaml')