# Demo KFP pipeline

Install requirements:

In [1]:
# `%pip` installs into the environment of the running kernel; `pip` under `%%bash`
# may resolve to a different interpreter.
%pip install "kfp~=1.8.14"

Collecting kfp~=1.8.14
  Downloading kfp-1.8.22.tar.gz (304 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 304.9/304.9 kB 5.3 MB/s eta 0:00:00
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Collecting absl-py<2,>=0.9 (from kfp~=1.8.14)
  Downloading absl_py-1.4.0-py3-none-any.whl.metadata (2.3 kB)
Collecting google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5 (from kfp~=1.8.14)
  Downloading google_api_core-2.24.1-py3-none-any.whl.metadata (3.0 kB)
Collecting google-cloud-storage<3,>=1.20.0 (from kfp~=1.8.14)
  Downloading google_cloud_storage-2.19.0-py2.py3-none-any.whl.metadata (9.1 kB)
Collecting kubernetes<26,>=8.0.0 (from kfp~=1.8.14)
  Downloading

Imports:

In [2]:
import warnings
warnings.filterwarnings("ignore")

import kfp
import kfp.dsl as dsl
from kfp.aws import use_aws_secret
from kfp.v2.dsl import (
    component,
    Input,
    Output,
    Dataset,
    Metrics,
    Artifact,
    Model
)

## 1. Connect to client

Run the following to port-forward to the KFP UI:

```sh
kubectl port-forward svc/ml-pipeline-ui -n kubeflow 8080:80
```

Now the KFP UI should be reachable at [`http://localhost:8080`](http://localhost:8080).

In [3]:
import kfp

# Address of the KFP UI/API exposed by the port-forward command above.
KFP_ENDPOINT = "http://localhost:8080"
client = kfp.Client(host=KFP_ENDPOINT)

## 2. Components

There are different ways to define components in KFP. Here, we use the **@component** decorator to define the components as Python function-based components.

The **@component** decorator converts the function into a factory function that creates pipeline steps that execute this function. This example also specifies the base container image to run your component in.

Pull data component:

In [4]:
@component(
    base_image="python:3.10",
    packages_to_install=["numpy~=1.26.4", "pandas~=1.4.2"],
    output_component_file='components/pull_data_component.yaml',
)
def pull_data(url: str, data: Output[Dataset]):
    """
    Pull data component.

    Reads the semicolon-separated CSV at ``url`` and writes it, without the
    pandas index, to the ``data`` dataset artifact.
    """
    from pandas import read_csv

    raw = read_csv(url, sep=";")
    raw.to_csv(data.path, index=None)

Preprocess component:

In [5]:
@component(
    base_image="python:3.10",
    packages_to_install=["numpy~=1.26.4", "pandas~=1.4.2", "scikit-learn~=1.0.2"],
    output_component_file='components/preprocess_component.yaml',
)
def preprocess(
    data: Input[Dataset],
    scaler_out: Output[Artifact],
    train_set: Output[Dataset],
    test_set: Output[Dataset],
    target: str = "quality",
    random_state: int = 42,
):
    """
    Preprocess component.

    Splits the input CSV into training and test sets (default 75% / 25%).
    Fits a ``StandardScaler`` on the training features only and applies it to
    both splits. The ``target`` column is left unscaled.

    Args:
        data: Input CSV dataset.
        scaler_out: Output artifact receiving the pickled, fitted scaler.
        train_set: Output CSV with the scaled training split.
        test_set: Output CSV with the scaled test split.
        target: Name of the label column, excluded from scaling.
        random_state: Seed for the train/test split, so repeated runs produce
            the same split (and therefore comparable metrics).
    """
    import pandas as pd
    import pickle
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    # Separate name so the input artifact ``data`` is not shadowed.
    df = pd.read_csv(data.path)

    # Split the data into training and test sets. (0.75, 0.25) split.
    train, test = train_test_split(df, random_state=random_state)
    # The splits are row subsets of ``df``. Copy them so the column assignments
    # below do not trigger pandas' SettingWithCopyWarning.
    train = train.copy()
    test = test.copy()

    feature_cols = train.columns.drop(target)

    scaler = StandardScaler()
    # Fit on the training split only, so no test-set statistics leak into training.
    train[feature_cols] = scaler.fit_transform(train[feature_cols])
    test[feature_cols] = scaler.transform(test[feature_cols])

    with open(scaler_out.path, 'wb') as fp:
        pickle.dump(scaler, fp, pickle.HIGHEST_PROTOCOL)

    train.to_csv(train_set.path, index=None)
    test.to_csv(test_set.path, index=None)

Train component:

In [6]:
from typing import NamedTuple

@component(
    base_image="python:3.10",
    packages_to_install=["numpy~=1.26.4", "pandas~=1.4.2", "scikit-learn~=1.0.2", "mlflow~=2.4.1", "boto3~=1.21.0"],
    output_component_file='components/train_component.yaml',
)
def train(
    train_set: Input[Dataset],
    test_set: Input[Dataset],
    saved_model: Output[Model],
    mlflow_experiment_name: str,
    mlflow_tracking_uri: str,
    mlflow_s3_endpoint_url: str,
    model_name: str,
    alpha: float,
    l1_ratio: float,
    target: str = "quality",
    registered_model_name: str = "ElasticnetWineModel",
) -> NamedTuple("Output", [('storage_uri', str), ('run_id', str),]):
    """
    Train component.

    Fits ``ElasticNet(alpha, l1_ratio)`` on the training split and evaluates
    RMSE, MAE and R2 on the test split. Inside a new MLflow run it logs the
    parameters, metrics, model (registered as ``registered_model_name``) and
    test-set predictions. The fitted model is also pickled to the
    ``saved_model`` KFP artifact.

    Args:
        train_set: CSV with the training split.
        test_set: CSV with the test split.
        saved_model: Output artifact receiving the pickled model.
        mlflow_experiment_name: MLflow experiment to log the run under.
        mlflow_tracking_uri: MLflow tracking server URI.
        mlflow_s3_endpoint_url: S3-compatible endpoint for MLflow artifacts;
            exported as ``MLFLOW_S3_ENDPOINT_URL``.
        model_name: Artifact path of the logged model inside the run.
        alpha: ElasticNet regularization strength.
        l1_ratio: ElasticNet L1/L2 mixing ratio.
        target: Name of the label column.
        registered_model_name: Name under which the model is registered in the
            MLflow Model Registry.

    Returns:
        ``storage_uri``: artifact root URI of the MLflow run;
        ``run_id``: the MLflow run ID.
    """
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import ElasticNet
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    import mlflow
    import mlflow.sklearn
    import os
    import logging
    import pickle
    import tempfile
    from collections import namedtuple

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    def eval_metrics(actual, pred):
        """Return (rmse, mae, r2) of ``pred`` against ``actual``."""
        rmse = np.sqrt(mean_squared_error(actual, pred))
        mae = mean_absolute_error(actual, pred)
        r2 = r2_score(actual, pred)
        return rmse, mae, r2

    # MLflow's S3 artifact client reads the endpoint from this env variable.
    os.environ['MLFLOW_S3_ENDPOINT_URL'] = mlflow_s3_endpoint_url

    # load data
    train = pd.read_csv(train_set.path)
    test = pd.read_csv(test_set.path)

    # Features are every column except the `target` label column.
    train_x = train.drop([target], axis=1)
    test_x = test.drop([target], axis=1)
    train_y = train[[target]]
    test_y = test[[target]]

    logger.info(f"Using MLflow tracking URI: {mlflow_tracking_uri}")
    mlflow.set_tracking_uri(mlflow_tracking_uri)

    logger.info(f"Using MLflow experiment: {mlflow_experiment_name}")
    mlflow.set_experiment(mlflow_experiment_name)

    with mlflow.start_run() as run:

        run_id = run.info.run_id
        logger.info(f"Run ID: {run_id}")

        model = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)

        logger.info("Fitting model...")
        model.fit(train_x, train_y)

        logger.info("Predicting...")
        predicted_qualities = model.predict(test_x)

        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

        logger.info("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
        logger.info("  RMSE: %s" % rmse)
        logger.info("  MAE: %s" % mae)
        logger.info("  R2: %s" % r2)

        logger.info("Logging parameters and metrics to MLflow")
        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        # save model to mlflow
        logger.info("Logging trained model")
        mlflow.sklearn.log_model(
            model,
            model_name,
            registered_model_name=registered_model_name,
            serialization_format="pickle"
        )

        logger.info("Logging predictions artifact to MLflow")
        # Write to a temporary directory instead of the working directory; the
        # artifact keeps its file name "predictions.npy".
        with tempfile.TemporaryDirectory() as tmp_dir:
            predictions_path = os.path.join(tmp_dir, "predictions.npy")
            np.save(predictions_path, predicted_qualities)
            mlflow.log_artifact(
                local_path=predictions_path, artifact_path="predicted_qualities/"
            )

        # save model as KFP artifact
        logger.info(f"Saving model to: {saved_model.path}")
        with open(saved_model.path, 'wb') as fp:
            pickle.dump(model, fp, pickle.HIGHEST_PROTOCOL)

        # Field names must match the NamedTuple declared in the return annotation.
        output = namedtuple('Output', ['storage_uri', 'run_id'])

        return output(mlflow.get_artifact_uri(), run_id)

Evaluate component:

In [7]:
@component(
    base_image="python:3.10",
    packages_to_install=["numpy", "mlflow~=2.4.1"],
    output_component_file='components/evaluate_component.yaml',
)
def evaluate(
    run_id: str,
    mlflow_tracking_uri: str,
    threshold_metrics: dict
) -> bool:
    """
    Evaluate component: Compares metrics from training with given thresholds.

    How a metric passes depends on its direction:
    - Error metrics (e.g. ``rmse``, ``mae``) pass when the logged value is at
      or below the threshold.
    - Score metrics where higher is better (``r2``) pass when the logged value
      is at or above the threshold.
    - A metric listed in ``threshold_metrics`` but missing from the run fails
      the evaluation.

    Args:
        run_id (string):  MLflow run ID
        mlflow_tracking_uri (string): MLflow tracking URI
        threshold_metrics (dict): Threshold value for each metric name
    Returns:
        Bool indicating whether evaluation passed or failed.
    """
    from mlflow.tracking import MlflowClient
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Metrics where a larger value means a better model; their threshold is a
    # lower bound. Every other metric is treated as an error (upper bound).
    higher_is_better = {"r2"}

    client = MlflowClient(tracking_uri=mlflow_tracking_uri)
    info = client.get_run(run_id)
    training_metrics = info.data.metrics

    logger.info(f"Training metrics: {training_metrics}")

    # compare the evaluation metrics with the defined thresholds
    for key, threshold in threshold_metrics.items():
        if key not in training_metrics:
            logger.error(f"Metric {key} not found in run {run_id}. Evaluation not passed!")
            return False
        value = training_metrics[key]
        if key in higher_is_better:
            passed = value >= threshold
        else:
            passed = value <= threshold
        if not passed:
            logger.error(
                f"Metric {key} failed ({value} vs threshold {threshold}). Evaluation not passed!"
            )
            return False
    return True

Deploy model component:

In [8]:
@component(
    base_image="python:3.9",
    packages_to_install=["kserve==0.11.0"],
    output_component_file='components/deploy_model_component.yaml',
)
def deploy_model(model_name: str, storage_uri: str):
    """
    Deploy the model as an inference service with Kserve.

    Builds a v1beta1 ``InferenceService`` named ``model_name`` in the
    ``kserve-inference`` namespace. Its scikit-learn predictor loads the model
    from ``{storage_uri}/{model_name}``. If an InferenceService with that name
    already exists, it is replaced. Re-running the pipeline therefore rolls out
    the newly trained model instead of failing on ``create``.

    Args:
        model_name: InferenceService name; also the sub-path of the logged
            model under ``storage_uri``.
        storage_uri: Base URI under which the model was logged (the training
            run's artifact URI in this notebook's pipeline).
    """
    import logging
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1SKLearnSpec
    from kubernetes.client import V1ResourceRequirements

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    model_uri = f"{storage_uri}/{model_name}"
    logger.info(f"MODEL URI: {model_uri}")

    namespace = 'kserve-inference'
    kserve_version='v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version

    isvc = V1beta1InferenceService(
        api_version = api_version,
        kind = constants.KSERVE_KIND,
        metadata = client.V1ObjectMeta(
            name = model_name,
            namespace = namespace,
            annotations = {'sidecar.istio.io/inject':'false'}
        ),
        spec = V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                service_account_name="kserve-sa",
                min_replicas=1,
                max_replicas = 1,
                sklearn=V1beta1SKLearnSpec(
                    storage_uri=model_uri,
                    resources=V1ResourceRequirements(
                        requests={"cpu": "100m", "memory": "512Mi"},
                        limits={"cpu": "300m", "memory": "512Mi"}
                    )
                ),
            )
        )
    )
    KServe = KServeClient()

    # KServeClient.get raises RuntimeError when the lookup fails (e.g. not
    # found). In that case fall through to create(), which surfaces any other
    # API error on its own.
    try:
        KServe.get(model_name, namespace=namespace)
        already_exists = True
    except RuntimeError:
        already_exists = False

    if already_exists:
        logger.info(f"InferenceService '{model_name}' already exists in '{namespace}'; replacing it")
        KServe.replace(model_name, isvc, namespace=namespace)
    else:
        logger.info(f"Creating InferenceService '{model_name}' in '{namespace}'")
        KServe.create(isvc)

Inference component:

In [9]:
@component(
    base_image="python:3.9",  # kserve on python 3.10 comes with a dependency that fails to get installed
    packages_to_install=["kserve==0.11.0", "scikit-learn~=1.0.2"],
    output_component_file='components/inference_component.yaml',
)
def inference(
    model_name: str,
    scaler_in: Input[Artifact]
):
    """
    Test inference.

    Steps:
    1. Wait for the ``model_name`` InferenceService in the ``kserve-inference``
       namespace.
    2. Standardize two hard-coded samples with the pickled scaler from
       ``scaler_in``.
    3. POST them to the model's v1 ``:predict`` endpoint through the Istio
       ingress gateway, then log the response.

    Args:
        model_name: Name of the deployed InferenceService.
        scaler_in: Artifact holding the pickled, fitted scaler.

    Raises:
        RuntimeError: if the prediction request returns a non-200 status.
    """
    from kserve import KServeClient
    import requests
    import pickle
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    namespace = 'kserve-inference'
    # Seconds to wait for the prediction call. Without a timeout, a stuck
    # ingress would hang this pipeline step indefinitely.
    request_timeout = 60

    # Two raw (unscaled) rows of 11 feature values each.
    # NOTE(review): presumably in the training CSV's feature column order — confirm.
    input_sample = [[5.6, 0.54, 0.04, 1.7, 0.049, 5, 13, 0.9942, 3.72, 0.58, 11.4],
                    [11.3, 0.34, 0.45, 2, 0.082, 6, 15, 0.9988, 2.94, 0.66, 9.2]]

    logger.info(f"Loading standard scaler from: {scaler_in.path}")
    # pickle.load can execute arbitrary code. Only load scalers produced by a
    # trusted upstream step.
    with open(scaler_in.path, 'rb') as fp:
        scaler = pickle.load(fp)

    logger.info(f"Standardizing sample: {input_sample}")
    input_sample = scaler.transform(input_sample)

    # get inference service
    KServe = KServeClient()

    # wait for deployment to be ready
    KServe.get(model_name, namespace=namespace, watch=True, timeout_seconds=120)

    inference_service = KServe.get(model_name, namespace=namespace)
    # The ingress gateway routes on the Host header to reach this InferenceService.
    header = {"Host": f"{model_name}.{namespace}.example.com"}
    is_url = f"http://istio-ingressgateway.istio-system.svc.cluster.local:80/v1/models/{model_name}:predict"

    logger.info(f"\nInference service status:\n{inference_service['status']}")
    logger.info(f"\nInference service URL:\n{is_url}\n")

    inference_input = {
        'instances': input_sample.tolist()
    }
    response = requests.post(
        is_url,
        json=inference_input,
        headers=header,
        timeout=request_timeout,
    )
    if response.status_code != 200:
        # Use the raw body: error responses are not guaranteed to be JSON, and
        # a decode error here would hide the actual HTTP failure.
        raise RuntimeError(f"HTTP status code '{response.status_code}': {response.text}")

    logger.info(f"\nPrediction response:\n{response.json()}\n")

## 3. Pipeline

Pipeline definition:

In [10]:
@dsl.pipeline(
    name='demo-pipeline',
    description='Pulls the wine-quality data, trains an ElasticNet model tracked in MLflow, '
                'and deploys it with KServe if its metrics pass the thresholds.',
)
def pipeline(
    url: str,
    target: str,
    mlflow_experiment_name: str,
    mlflow_tracking_uri: str,
    mlflow_s3_endpoint_url: str,
    model_name: str,
    alpha: float,
    l1_ratio: float,
    threshold_metrics: dict,
):
    """
    Wire the components together.

    The steps run in this order: pull -> preprocess -> train -> evaluate.
    Only if evaluation passes does the pipeline run deploy, followed by a test
    inference against the deployed model.
    """
    pull_task = pull_data(url=url)

    # Pass the pipeline's target through, so preprocess leaves the same label
    # column unscaled that train uses as y.
    preprocess_task = preprocess(data=pull_task.outputs["data"], target=target)

    train_task = train(
        train_set=preprocess_task.outputs["train_set"],
        test_set=preprocess_task.outputs["test_set"],
        target=target,
        mlflow_experiment_name=mlflow_experiment_name,
        mlflow_tracking_uri=mlflow_tracking_uri,
        mlflow_s3_endpoint_url=mlflow_s3_endpoint_url,
        model_name=model_name,
        alpha=alpha,
        l1_ratio=l1_ratio
    )
    # Injects the credentials from the "aws-secret" Kubernetes secret into the train step.
    train_task.apply(use_aws_secret(secret_name="aws-secret"))

    evaluate_task = evaluate(
        run_id=train_task.outputs["run_id"],
        mlflow_tracking_uri=mlflow_tracking_uri,
        threshold_metrics=threshold_metrics
    )

    eval_passed = evaluate_task.output

    # NOTE(review): the bool output is compared as the string "true";
    # confirm this serialization in V2_COMPATIBLE mode.
    with dsl.Condition(eval_passed == "true"):
        deploy_model_task = deploy_model(
            model_name=model_name,
            storage_uri=train_task.outputs["storage_uri"],
        )

        inference_task = inference(
            model_name=model_name,
            scaler_in=preprocess_task.outputs["scaler_out"]
        )
        # No data dependency links these two steps, so order them explicitly.
        inference_task.after(deploy_model_task)

Pipeline arguments:

In [11]:
# Specify pipeline argument values

# Per-metric thresholds checked by the `evaluate` component against the
# metrics logged by the training run.
eval_threshold_metrics = {'rmse': 0.9, 'r2': 0.3, 'mae': 0.8}

arguments = {
    # HTTPS, so the training data cannot be tampered with in transit.
    "url": "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv",
    "target": "quality",
    "mlflow_tracking_uri": "http://mlflow.mlflow.svc.cluster.local:5000",
    "mlflow_s3_endpoint_url": "http://mlflow-minio-service.mlflow.svc.cluster.local:9000",
    "mlflow_experiment_name": "demo-notebook",
    "model_name": "wine-quality",
    "alpha": 0.5,
    "l1_ratio": 0.5,
    "threshold_metrics": eval_threshold_metrics
}

## 4. Submit run

In [12]:
# Submit the pipeline under a named experiment. Caching is disabled so every
# step re-executes on each submission.
experiment_name = "demo-experiment"
run_name = "demo-run"

client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    arguments=arguments,
    run_name=run_name,
    experiment_name=experiment_name,
    enable_caching=False,
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
)

RunPipelineResult(run_id=c5728692-2488-4a11-b4a9-e5d53ec7b62b)

## 5. Check run

### Kubeflow Pipelines UI

The default way of accessing KFP UI is via port-forward. This enables you to get started quickly without imposing any requirements on your environment. Run the following to port-forward KFP UI to local port `8080`:

```sh
kubectl port-forward svc/ml-pipeline-ui -n kubeflow 8080:80
```

Now the KFP UI should be reachable at [`http://localhost:8080`](http://localhost:8080).

### MLFlow UI

To access the MLflow UI, open a terminal and forward a local port to the MLflow server:

<br>

```bash
$ kubectl -n mlflow port-forward svc/mlflow 5000:5000
```

<br>

Now the MLflow UI should be reachable at [`http://localhost:5000`](http://localhost:5000).

## 6. Check deployed model

```bash
# get inference services
kubectl -n kserve-inference get inferenceservice

# get deployed model pods
kubectl -n kserve-inference get pods

# delete inference service
kubectl -n kserve-inference delete inferenceservice wine-quality
```
<br>

If something goes wrong, check the logs with:

<br>

```bash
kubectl logs -n kserve-inference <pod-name> kserve-container

kubectl logs -n kserve-inference <pod-name> queue-proxy

kubectl logs -n kserve-inference <pod-name> storage-initializer
```
