In [1]:
import kfp
from kfp import dsl
from kfp.components import (
    InputPath,
    InputTextFile,
    OutputPath,
    OutputTextFile,
    func_to_container_op,
)

import pandas as pd
from typing import NamedTuple

import sys

sys.path.insert(0, "..")
from constants import NAMESPACE, HOST, NAMESPACE
from utils import get_session_cookie, get_or_create_experiment, get_or_create_pipeline

In [2]:
# Name of the Kubeflow experiment under which the runs launched from this
# notebook are grouped
EXPERIMENT_NAME = "mle-3-house-price-training"

## Define pipeline components

In [3]:
# The first component to download data, train-test split
# and then dump all the data for downstream components to use
def prepare_data(
    url: str,
    X_train_path: OutputPath("PKL"),
    y_train_path: OutputPath("PKL"),
    X_val_path: OutputPath("PKL"),
    y_val_path: OutputPath("PKL"),
    X_test_path: OutputPath("PKL"),
    y_test_path: OutputPath("PKL"),
):
    """Download the housing CSV, split it into train/val/test and persist each split.

    Args:
        url: Location of the CSV file to download. The file must contain a
            ``price`` column, which is used as the regression target.
        X_train_path: Destination of the pickled training features.
        y_train_path: Destination of the pickled training target.
        X_val_path: Destination of the pickled validation features.
        y_val_path: Destination of the pickled validation target.
        X_test_path: Destination of the pickled test features.
        y_test_path: Destination of the pickled test target.
    """
    import pandas as pd
    import wget
    from sklearn.model_selection import train_test_split
    import joblib

    # wget.download returns the name of the file it actually wrote. Reading
    # that path (instead of a hard-coded "housing.csv") keeps the component
    # working when the URL's basename is different or when wget has to pick
    # another name because the file already exists.
    csv_path = wget.download(url)

    # Create X and y
    df = pd.read_csv(csv_path)
    X = df.drop(columns=["price"])
    y = df["price"]

    # Hold out 10% of the rows as the test set
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.1, random_state=42
    )

    # Hold out 10% of the remaining rows as the validation set
    X_train, X_val, y_train, y_val = train_test_split(
        X_train, y_train, test_size=0.1, random_state=42
    )

    # Dump every split to pkl for downstream components to use
    joblib.dump(X_train, X_train_path)
    joblib.dump(y_train, y_train_path)
    joblib.dump(X_val, X_val_path)
    joblib.dump(y_val, y_val_path)
    joblib.dump(X_test, X_test_path)
    joblib.dump(y_test, y_test_path)


# Turn prepare_data into a pipeline op. func_to_container_op is used here as
# an alternative to create_component_from_func. packages_to_install pins the
# versions of the third-party packages the function imports.
prepare_data_op = func_to_container_op(
    func=prepare_data,
    packages_to_install=[
        "scikit-learn==1.0.2",
        "joblib==1.1.0",
        "pandas==1.3.5",
        "wget==3.2",
    ],
)

In [4]:
# The 2nd component receives outputs from the 1st component
# and train
def train(
    X_train_path: InputPath("PKL"),
    y_train_path: InputPath("PKL"),
    X_val_path: InputPath("PKL"),
    y_val_path: InputPath("PKL"),
    clf_path: OutputPath("Model"),
):
    """Fit a one-hot + linear-regression pipeline and save it with joblib.

    The R2 score on the validation split is printed to stdout; the fitted
    scikit-learn pipeline is written to ``clf_path``.

    Args:
        X_train_path: Pickled training features.
        y_train_path: Pickled training target.
        X_val_path: Pickled validation features.
        y_val_path: Pickled validation target.
        clf_path: Destination of the fitted pipeline.
    """
    from sklearn.preprocessing import OneHotEncoder
    from sklearn.pipeline import Pipeline
    from sklearn.linear_model import LinearRegression
    from sklearn.compose import ColumnTransformer
    from sklearn.metrics import r2_score
    import joblib

    # Load data from the previous steps
    X_train = joblib.load(X_train_path)
    y_train = joblib.load(y_train_path)
    X_val = joblib.load(X_val_path)
    y_val = joblib.load(y_val_path)

    # Object-typed columns are treated as categorical and one-hot encoded;
    # every other column is passed through untouched.
    categorical_features = X_train.select_dtypes(include="object").columns

    # handle_unknown="ignore": a category that only shows up in the validation
    # or test split is encoded as all zeros instead of making predict() raise.
    categorical_transformer = OneHotEncoder(handle_unknown="ignore")

    preprocessor = ColumnTransformer(
        transformers=[
            ("cat", categorical_transformer, categorical_features),
        ],
        remainder="passthrough",
    )

    # Define the sklearn pipeline and fit it
    clf = Pipeline(
        steps=[("preprocessor", preprocessor), ("regressor", LinearRegression())]
    )

    clf.fit(X_train, y_train)

    # Make prediction on the val data
    y_val_pred = clf.predict(X_val)
    # Evaluate on the val data
    print("r2_score: ", r2_score(y_val, y_val_pred))

    joblib.dump(clf, clf_path)


# Turn train into a pipeline op; packages_to_install pins the versions of the
# packages needed to load the pickled splits and fit the model.
train_op = func_to_container_op(
    func=train,
    packages_to_install=["scikit-learn==1.0.2", "joblib==1.1.0", "pandas==1.3.5"],
)

In [5]:
# The 3rd component receives outputs from the 2nd component
# in combination with prediction data from the 1st component
# to evaluate the model
def evaluate(
    X_test_path: InputPath("PKL"),
    y_test_path: InputPath("PKL"),
    clf_path: InputPath("Model"),
    y_test_pred_path: OutputPath("PKL"),
) -> NamedTuple("Outputs", [("mlpipeline_metrics", "Metrics"),]):
    """Score the trained model on the test split.

    Predictions for the test features are pickled to ``y_test_pred_path``.
    The single returned item is the JSON document for the
    ``mlpipeline_metrics`` output; it carries one metric, ``r2_score``.

    Args:
        X_test_path: Pickled test features.
        y_test_path: Pickled test target.
        clf_path: Fitted model saved with joblib.
        y_test_pred_path: Destination of the pickled test predictions.

    Returns:
        A one-element list holding the metrics JSON string.
    """
    import joblib
    from sklearn.metrics import r2_score
    import json

    # Inputs produced by the upstream components
    features = joblib.load(X_test_path)
    targets = joblib.load(y_test_path)
    model = joblib.load(clf_path)

    # Predict, then persist the predictions for downstream components
    predictions = model.predict(features)
    joblib.dump(predictions, y_test_pred_path)

    # One entry per metric:
    #   name        -> shown as the column name in the runs table
    #   numberValue -> must be numeric
    #   format      -> optional, "RAW" or "PERCENTAGE"
    r2_entry = {
        "name": "r2_score",
        "numberValue": r2_score(targets, predictions),
        "format": "RAW",
    }
    return [json.dumps({"metrics": [r2_entry]})]


# Turn evaluate into a pipeline op; packages_to_install pins the versions of
# the packages needed to load the pickled inputs and score the model.
evaluate_op = func_to_container_op(
    func=evaluate,
    packages_to_install=["scikit-learn==1.0.2", "joblib==1.1.0", "pandas==1.3.5"],
)

In [6]:
# The final component visualize evaluation metrics
def visualize(
    X_test_path: InputPath("PKL"),
    y_test_path: InputPath("PKL"),
    y_test_pred_path: InputPath("PKL"),
    mlpipeline_ui_metadata_path: kfp.components.OutputPath(),
):
    """Plot actual vs. predicted targets per feature and emit it as UI metadata.

    A 3x4 grid of scatter plots is drawn, one panel per feature column of the
    test set: actual targets in red, predictions in blue. The figure is
    embedded as a base64 PNG in an HTML snippet, which is written as an
    inline ``web-app`` output to ``mlpipeline_ui_metadata_path``.

    Args:
        X_test_path: Pickled test features.
        y_test_path: Pickled test target.
        y_test_pred_path: Pickled predictions for the test features.
        mlpipeline_ui_metadata_path: Destination of the UI metadata JSON.
    """
    import joblib
    import matplotlib.pyplot as plt
    import base64
    from io import BytesIO
    import json

    # Load data from the previous step
    X_test = joblib.load(X_test_path)
    y_test = joblib.load(y_test_path)
    y_test_pred = joblib.load(y_test_pred_path)

    ncols = 4
    nrows = 3
    n_features = X_test.shape[1]

    fig, axs = plt.subplots(
        ncols=ncols, nrows=nrows, figsize=(10, 5), constrained_layout=True
    )

    for row in range(nrows):
        for col in range(ncols):
            ax = axs[row, col]
            # Row-major position in the grid: each row holds `ncols` panels,
            # so the stride must be ncols (using nrows would repeat some
            # features and never reach the last ones).
            feature_index = row * ncols + col
            if feature_index >= n_features:
                # Fewer features than panels: leave the spare panel blank
                ax.axis("off")
                continue
            feature_values = X_test.iloc[:, feature_index]
            ax.scatter(feature_values, y_test, color="red")
            ax.scatter(feature_values, y_test_pred, color="blue")
            ax.set_title(X_test.columns[feature_index])

    fig.suptitle("Test data")

    # Ref: https://stackoverflow.com/questions/48717794/matplotlib-embed-figures-in-auto-generated-html
    tmpfile = BytesIO()
    fig.savefig(tmpfile, format="png")
    encoded = base64.b64encode(tmpfile.getvalue()).decode("utf-8")
    html = "<img src='data:image/png;base64,{}'>".format(encoded)

    # Standalone copy of the snippet in the working directory; nothing in this
    # function reads it back.
    with open("test.html", "w") as f:
        f.write(html)

    metadata = {
        "outputs": [
            {
                "type": "web-app",
                "storage": "inline",
                "source": html,
            }
        ]
    }

    with open(mlpipeline_ui_metadata_path, "w") as metadata_file:
        json.dump(metadata, metadata_file)


# Turn visualize into a pipeline op; packages_to_install pins the versions of
# the packages needed to load the pickled inputs and draw the figure.
visualize_op = func_to_container_op(
    func=visualize,
    packages_to_install=["matplotlib==3.5.1", "joblib==1.1.0", "pandas==1.3.5"],
)

## Define some pipelines

In [7]:
@dsl.pipeline(
    name="House price training", description="Training models to predict house price."
)
def house_price_training_pipeline(url):
    """Chain prepare_data -> train -> evaluate -> visualize for the CSV at ``url``."""
    # Step 1: download the data and split it
    prepare_data_task = prepare_data_op(url=url)
    splits = prepare_data_task.outputs
    print("***", "prepare_data_task.outputs:", splits, sep="\n")

    # Step 2: fit the model on the train split (validation split is for reporting)
    train_task = train_op(
        x_train=splits["X_train"],
        y_train=splits["y_train"],
        x_val=splits["X_val"],
        y_val=splits["y_val"],
    )
    print("***", "train_task.outputs:", train_task.outputs, sep="\n")

    # Step 3: score the fitted model on the held-out test split
    test_features = splits["X_test"]
    test_target = splits["y_test"]
    evaluate_task = evaluate_op(
        x_test=test_features,
        y_test=test_target,
        clf=train_task.outputs["clf"],
    )

    # Step 4: plot actual vs. predicted values for the test split
    visualize_task = visualize_op(
        x_test=test_features,
        y_test=test_target,
        y_test_pred=evaluate_task.outputs["y_test_pred"],
    )

## Run the pipelines

In [8]:
# Get the session cookie used to authenticate to the `ml-pipeline` service
session_cookie = get_session_cookie()

# Initialize the KFP client against the pipeline endpoint of HOST,
# authenticating with the session cookie and scoping it to NAMESPACE
client = kfp.Client(
    host=f"{HOST}/pipeline",
    cookies=f"authservice_session={session_cookie}",
    namespace=NAMESPACE,
)

In [9]:
# Submit a one-off run directly from the Python pipeline function, passing the
# dataset URL as the pipeline's `url` argument
client.create_run_from_pipeline_func(
    house_price_training_pipeline,
    arguments={
        "url": "https://raw.githubusercontent.com/quan-dang/kubeflow-tutorials/master/data/housing.csv"
    },
    experiment_name=EXPERIMENT_NAME,
    namespace=NAMESPACE,
)

***
prepare_data_task.outputs:
{'X_train': {{pipelineparam:op=prepare-data;name=X_train}}, 'y_train': {{pipelineparam:op=prepare-data;name=y_train}}, 'X_val': {{pipelineparam:op=prepare-data;name=X_val}}, 'y_val': {{pipelineparam:op=prepare-data;name=y_val}}, 'X_test': {{pipelineparam:op=prepare-data;name=X_test}}, 'y_test': {{pipelineparam:op=prepare-data;name=y_test}}, 'x_train': {{pipelineparam:op=prepare-data;name=X_train}}, 'x_val': {{pipelineparam:op=prepare-data;name=X_val}}, 'x_test': {{pipelineparam:op=prepare-data;name=X_test}}}
***
train_task.outputs:
{'clf': {{pipelineparam:op=train;name=clf}}}


RunPipelineResult(run_id=2dd4a45d-8f12-4869-a565-b7b5cbc349c5)

## Compile the pipelines

In [10]:
# Version of the compiled pipeline: bump it every time
# the pipeline definition changes
PIPELINE_VERSION = "0.0.5"
PIPELINE_NAME = "house_price_training_pipeline"
PIPELINE_DESCRIPTION = "A pipeline to train models for predicting house price"

# Output file of the compiled pipeline; name and version are part of the
# file name, so each version gets its own YAML file
pipeline_package_path = (
    f"../../compiled_pipelines/{PIPELINE_NAME}_{PIPELINE_VERSION}.yaml"
)

# Compile the pipeline into a YAML file; the result
# is an Argo Workflow object
kfp.compiler.Compiler().compile(
    pipeline_func=house_price_training_pipeline,
    package_path=pipeline_package_path,
)

***
prepare_data_task.outputs:
{'X_train': {{pipelineparam:op=prepare-data;name=X_train}}, 'y_train': {{pipelineparam:op=prepare-data;name=y_train}}, 'X_val': {{pipelineparam:op=prepare-data;name=X_val}}, 'y_val': {{pipelineparam:op=prepare-data;name=y_val}}, 'X_test': {{pipelineparam:op=prepare-data;name=X_test}}, 'y_test': {{pipelineparam:op=prepare-data;name=y_test}}, 'x_train': {{pipelineparam:op=prepare-data;name=X_train}}, 'x_val': {{pipelineparam:op=prepare-data;name=X_val}}, 'x_test': {{pipelineparam:op=prepare-data;name=X_test}}}
***
train_task.outputs:
{'clf': {{pipelineparam:op=train;name=clf}}}


## Run the pre-compiled pipelines

In [11]:
# Get the existing experiment, or create it if it does not exist yet
experiment = get_or_create_experiment(client, name=EXPERIMENT_NAME, namespace=NAMESPACE)

# Get the pipeline registered for the compiled package, or create it if it
# does not exist yet
pipeline = get_or_create_pipeline(
    client,
    pipeline_name=PIPELINE_NAME,
    pipeline_package_path=pipeline_package_path,
    version=PIPELINE_VERSION,
    pipeline_description=PIPELINE_DESCRIPTION,
)

In [12]:
from datetime import datetime

# Run from the compiled pipeline
now = datetime.now().strftime("%Y%m%d%H%M%S")  # Local-time timestamp (to the second) that makes each job name distinct

# Read the docs here for all possible args
# https://github.com/kubeflow/pipelines/blob/1.8.0/sdk/python/kfp/_client.py
# NOTE(review): version_id is fed from pipeline.id; confirm that
# get_or_create_pipeline returns a pipeline *version* object here.
client.run_pipeline(
    experiment_id=experiment.id,
    job_name=f"{PIPELINE_NAME}-{PIPELINE_VERSION}-{now}",
    version_id=pipeline.id,
    params={
        "url": "https://raw.githubusercontent.com/quan-dang/kubeflow-tutorials/master/data/housing.csv"
    },
)

{'created_at': datetime.datetime(2024, 8, 28, 14, 23, 22, tzinfo=tzutc()),
 'description': None,
 'error': None,
 'finished_at': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=tzutc()),
 'id': 'cb0352de-6219-4721-b697-cb7bed60452b',
 'metrics': None,
 'name': 'house_price_training_pipeline-0.0.5-20240828212322',
 'pipeline_spec': {'parameters': [{'name': 'url',
                                   'value': 'https://raw.githubusercontent.com/quan-dang/kubeflow-tutorials/master/data/housing.csv'}],
                   'pipeline_id': None,
                   'pipeline_manifest': None,
                   'pipeline_name': None,
                   'runtime_config': None,
                   'workflow_manifest': '{"kind":"Workflow","apiVersion":"argoproj.io/v1alpha1","metadata":{"generateName":"house-price-training-","creationTimestamp":null,"labels":{"pipelines.kubeflow.org/kfp_sdk_version":"1.8.22"},"annotations":{"pipelines.kubeflow.org/kfp_sdk_version":"1.8.22","pipelines.kubeflow.org/pipeline_co

## Create a recurring run

In [13]:
# Don't forget to disable the recurring run once you no longer need it.
# The job name reuses `now`, the timestamp computed in the previous cell.
client.create_recurring_run(
    experiment_id=experiment.id,
    job_name=f"{PIPELINE_NAME}-{PIPELINE_VERSION}-{now}",
    cron_expression="0 0 * * * *",  # hourly (six-field cron; the leading field is seconds)
    version_id=pipeline.id,
    params={
        "url": "https://raw.githubusercontent.com/quan-dang/kubeflow-tutorials/master/data/housing.csv"
    },
)

{'created_at': datetime.datetime(2024, 8, 28, 14, 23, 22, tzinfo=tzutc()),
 'description': None,
 'enabled': True,
 'error': None,
 'id': 'd312da6d-3b10-4c6c-aad0-5edd972835b5',
 'max_concurrency': '1',
 'mode': None,
 'name': 'house_price_training_pipeline-0.0.5-20240828212322',
 'no_catchup': None,
 'pipeline_spec': {'parameters': [{'name': 'url',
                                   'value': 'https://raw.githubusercontent.com/quan-dang/kubeflow-tutorials/master/data/housing.csv'}],
                   'pipeline_id': None,
                   'pipeline_manifest': None,
                   'pipeline_name': None,
                   'runtime_config': None,
                   'workflow_manifest': '{"kind":"Workflow","apiVersion":"argoproj.io/v1alpha1","metadata":{"generateName":"house-price-training-","creationTimestamp":null,"labels":{"pipelines.kubeflow.org/kfp_sdk_version":"1.8.22"},"annotations":{"pipelines.kubeflow.org/kfp_sdk_version":"1.8.22","pipelines.kubeflow.org/pipeline_compilatio