# Kubeflow Pipelines (KFP)
To learn how to use Kubeflow Pipelines, it is important to understand the following concepts:

*KFP pipeline*: A KFP pipeline is a directed acyclic graph of ML tasks. A KFP pipeline can consist of multiple tasks, such as data preprocessing, model training, evaluation, and deployment. The pipeline defines the dependencies between these tasks and the order in which they should be executed.

*KFP component*: A KFP component performs a specific task of a KFP pipeline. In other words, a KFP pipeline consists of one or more KFP components.

*KFP run*: A KFP run is a single execution of a KFP pipeline.

*KFP experiment*: A KFP experiment has one or more KFP runs. It can be seen as a workspace where you can try different configurations (e.g., different training datasets or hyperparameters) of a KFP pipeline.
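To make the "directed acyclic graph" idea concrete, here is a stdlib-only sketch (it needs Python 3.9+ for `graphlib`) that orders the four tasks of the demo pipeline built later in this notebook. It only illustrates dependency ordering and is not how KFP schedules tasks internally; `execution_order` is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks).
# The names mirror the components of the demo pipeline.
demo_pipeline = {
    "pull_data": set(),
    "preprocess": {"pull_data"},
    "train": {"preprocess"},
    "deploy_model": {"train"},
}


def execution_order(graph):
    """Return one valid execution order; raise CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


print(execution_order(demo_pipeline))
# ['pull_data', 'preprocess', 'train', 'deploy_model']

try:
    execution_order({"train": {"evaluate"}, "evaluate": {"train"}})
except CycleError:
    print("Not a DAG: the tasks depend on each other in a cycle")
```

Because each task here has exactly one upstream task, there is only one valid order; with parallel branches, several orders would be valid.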

Kubeflow Pipelines provides a Python SDK for building KFP components and pipelines. You will see a concrete example of using the SDK in the following sections. 

*More reading material: [Kubeflow Pipelines docs](https://www.kubeflow.org/docs/components/pipelines/v2/introduction/).*


# A demo pipeline
This demo shows how to use Kubeflow Pipelines to define KFP components and chain them into a KFP pipeline that can complete multiple tasks, from data preparation to model deployment.

In this example, we'll continue using the red wine quality dataset. By the end of this tutorial, we'll have built a KFP pipeline that performs the following tasks:

<img src="./images/kfp-demo.jpg" />

Let's get started!

In [1]:
# Import packages
import warnings
warnings.filterwarnings("ignore")
import kfp
import kfp.dsl as dsl
from kfp.aws import use_aws_secret
from kfp.v2.dsl import (
    component,
    Input,
    Output,
    Dataset,
)

## 1. Connect to kfp client


In [2]:
client = kfp.Client(host=None)

If **host** is left as **None**, it will use the cluster set in the current context of kubectl. The context of kubectl can be checked and configured using the following commands: 
```bash
# Check current context, you should see "kind-kind-ep" as the output
kubectl config current-context

# If the output is not "kind-kind-ep", set current context to "kind-kind-ep"
kubectl config use-context kind-kind-ep
```

## 2. Build KFP Components
There are two ways to define components in KFP: [defining the components through Python functions](https://www.kubeflow.org/docs/components/pipelines/v1/sdk/python-function-components/) and [directly defining the component specification](https://www.kubeflow.org/docs/components/pipelines/v1/sdk/component-development/#creating-a-component-specification). In this tutorial (and the assignments), we'll use the first approach as it's more intuitive, but feel free to explore the second approach, especially if you want to use languages other than Python.

A KFP component can accept inputs and create outputs. The outputs of a KFP component can be used as another KFP component's inputs. There will be a concrete example later. 

**Component for downloading the data**

In [3]:
@component(
    base_image="python:3.10",
    packages_to_install=["pandas~=1.5.3"],
    output_component_file="components/pull_data_component.yaml"
)
def pull_data(url: str, data: Output[Dataset]):
    """
    Download a dataset and save it to a file
    Args:
        url: Dataset URL
        data: Output of type Dataset where the downloaded dataset is saved
    """
    import pandas as pd
    df = pd.read_csv(url, sep=";")
    df.to_csv(data.path, index=None)

Let's take a deeper look at the code above.  

When building a component, it's good practice to start by considering its inputs and outputs. There are two categories of inputs and outputs in Kubeflow Pipelines: [Parameters](https://www.kubeflow.org/docs/components/pipelines/v2/data-types/parameters/), which pass small amounts of data such as strings, numbers, and booleans, and [Artifacts](https://www.kubeflow.org/docs/components/pipelines/v2/data-types/artifacts/), which pass large amounts of data, such as a dataset or a model, using files.

- The dataset we want to use is a CSV file hosted on a website. Instead of hardcoding the URL in the code, we pass the URL as an *input* to the component so that the component can be reused if the data location changes.
- As for the *output*, we want to pass the downloaded dataset to the next task, so we declare the component output as a *Dataset* artifact (`Output[Dataset]`). Kubeflow Pipelines saves all artifacts to a storage service, which is MinIO in our setup. (Note that the MinIO instance used by Kubeflow Pipelines is separate from the one used by MLflow: the former runs in the "kubeflow" namespace, while the latter runs in the "mlflow" namespace.) The *path* property of the Dataset class refers to the location where this artifact will be saved, so we save the dataset there using `df.to_csv(data.path, index=None)`.
- The Python function used to create a KFP component needs to be standalone, meaning the packages needed by the function need to be imported inside the function. For example, we import the Pandas package inside the `pull_data` function. 
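KFP puts the function's source code into the component specification (it shows up in the generated YAML file discussed below), so module-level imports from the notebook don't travel with it. The stdlib-only sketch below approximates that situation by defining a function from its source text in an empty namespace: the version that relies on a module-level `import statistics` fails with a `NameError`, while the standalone version works. It illustrates the rule rather than KFP's actual execution mechanism; `mean_of` and `run_in_fresh_namespace` are hypothetical names.

```python
import textwrap

# Relies on a module-level `import statistics`: NOT standalone.
NOT_STANDALONE_SRC = textwrap.dedent("""
    def mean_of(values):
        return statistics.mean(values)
""")

# Imports what it needs inside the function body: standalone.
STANDALONE_SRC = textwrap.dedent("""
    def mean_of(values):
        import statistics
        return statistics.mean(values)
""")


def run_in_fresh_namespace(source, values):
    """Define mean_of from its source text in an empty namespace and call it.

    The empty namespace stands in for a container that receives only the
    function's code, not the notebook's imports.
    """
    namespace = {}
    exec(source, namespace)
    return namespace["mean_of"](values)


print(run_in_fresh_namespace(STANDALONE_SRC, [1, 2, 3]))  # 2
try:
    run_in_fresh_namespace(NOT_STANDALONE_SRC, [1, 2, 3])
except NameError as err:
    print("NameError:", err)  # name 'statistics' is not defined
```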

The component inputs are always declared as parameters in the function definition. When an output is an Artifact, it's also declared as a function parameter.
```python
def pull_data(url: str, data: Output[Dataset]):
    ...
```

The `@component` decorator transforms the `pull_data` function into a KFP component. Under the hood, KFP creates a factory function, which then creates a pipeline task that executes the `pull_data` function. [Several parameters](https://github.com/kubeflow/pipelines/blob/sdk/release-1.8/sdk/python/kfp/v2/components/component_decorator.py) can be passed to the `@component` decorator. In our code, we use `base_image` to specify the Docker image in which the function runs, `packages_to_install` to specify the Python packages to install before the function runs, and `output_component_file` to write the specification of the newly built component to a file.

After running the code cell above, you'll see a file named "pull_data_component.yaml" in the "components" directory. It contains your code as well as the other configuration for creating the KFP component, so you can reuse the component in another project without writing the function again. For example, if you need this component to download other data, you can load it with the following code:
```python
pull_data_component = kfp.components.load_component_from_file("components/pull_data_component.yaml")
```

**Component for splitting data into training and testing datasets**

Again, we start by considering the inputs and outputs of the component. The input of this component will come from the output Artifact of the `pull_data()` function above, so we annotate the parameter with the `Input[Dataset]` type. We want to have two outputs: a training and a testing dataset. 

In [9]:
@component(
    base_image="python:3.10",
    packages_to_install=["pandas~=1.5.3", "scikit-learn~=1.3.2"],
    output_component_file="components/preprocess_component.yaml"
)
def preprocess(
    data: Input[Dataset],
    train_set: Output[Dataset],
    test_set: Output[Dataset],
):
    """
    Read a dataset from a file, split it into a training and test dataset, and save the training and test dataset
    into separate files
    Args:
        data: Input of type Dataset which the dataset is read from
        train_set: Output of type Dataset where the training dataset is saved
        test_set: Output of type Dataset where the test dataset is saved  
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split
    
    data = pd.read_csv(data.path)
    
    # Split the data into training and test sets. (0.75, 0.25) split
    train, test = train_test_split(data, random_state=42)
    
    # Save the training and test datasets into separate files
    train.to_csv(train_set.path, index=None)
    test.to_csv(test_set.path, index=None)
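
By default, `train_test_split` holds out 25% of the rows for testing, which is where the (0.75, 0.25) split in the comment comes from. As a quick sanity check on the resulting sizes, the sketch below reproduces scikit-learn's rounding rule for a float `test_size` (the test set gets the ceiling of `test_size * n_samples` rows and the training set gets the rest) in plain Python. The row count of 1,599 is an assumption based on the size of the UCI red wine dataset, and `split_sizes` is a hypothetical helper, not part of scikit-learn.

```python
import math


def split_sizes(n_samples, test_size=0.25):
    """Hypothetical helper mirroring scikit-learn's rule for a float test_size
    when train_size is not given: the test set gets ceil(test_size * n_samples)
    rows and the training set gets the remaining rows."""
    n_test = math.ceil(test_size * n_samples)
    n_train = n_samples - n_test
    return n_train, n_test


# Assumes the red wine CSV has 1,599 rows
print(split_sizes(1599))  # (1199, 400)
```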
    

**Component for training a model**

This component accepts multiple inputs, most of which are easy to understand from the docstring inside the function. The exception is an input named "model_artifact_path", which may be a bit confusing, so let's use an example to explain what it means. Recall that in our setup, the MLflow service uses a MinIO storage service as its artifact store. MinIO is an open-source, S3-compatible storage service.

When we register a model to the MLflow service, we actually upload a few artifacts to MinIO:

<img src="./images/mlflow-artifacts.png" width="800" />

<img src="./images/minio.png" width="1000" />

The model's S3 URI (e.g., s3://mlflow/9/fd05ffb7b4f541278966f292dd7dabba/artifacts/wine-quality in the example above) has the following structure:
- "s3://" indicates that this is an S3 URI;
- "mlflow" is the name of the bucket used by the MLflow service. A bucket can be seen as a top-level directory;
- "/9/fd05f.../artifacts/wine-quality" can be seen as a hierarchy of subdirectories: "9" is the MLflow Experiment ID, "fd05f..." is the MLflow Run ID, "artifacts" is a directory name MLflow uses by default, and "wine-quality" is the lowest-level subdirectory where the artifacts are stored. This last part is the value passed to the "model_artifact_path" input, which is why we say that "model_artifact_path" is relative to the MLflow Run that creates the artifacts.
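The URI structure described above can also be checked programmatically. The following sketch uses only the standard library to split an MLflow artifact URI into its parts. `parse_mlflow_model_uri` is a hypothetical helper, and it assumes the URI follows exactly the `s3://<bucket>/<experiment-id>/<run-id>/artifacts/<model_artifact_path>` layout.

```python
from urllib.parse import urlparse


def parse_mlflow_model_uri(uri):
    """Split s3://<bucket>/<experiment-id>/<run-id>/artifacts/<model_artifact_path>."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"not an S3 URI: {uri}")
    # maxsplit=3 keeps a nested model_artifact_path (e.g. "models/wine") in one piece
    parts = parsed.path.lstrip("/").split("/", 3)
    if len(parts) != 4:
        raise ValueError(f"unexpected MLflow artifact URI layout: {uri}")
    experiment_id, run_id, artifacts_dir, model_artifact_path = parts
    return {
        "bucket": parsed.netloc,
        "experiment_id": experiment_id,
        "run_id": run_id,
        "artifacts_dir": artifacts_dir,
        "model_artifact_path": model_artifact_path,
    }


uri = "s3://mlflow/9/fd05ffb7b4f541278966f292dd7dabba/artifacts/wine-quality"
for key, value in parse_mlflow_model_uri(uri).items():
    print(f"{key}: {value}")
```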

In [4]:
from typing import NamedTuple

@component(
    base_image="python:3.10",
    packages_to_install=["pandas~=1.5.3", "numpy", "scikit-learn~=1.3.2", "mlflow==2.3.2", "boto3~=1.28.85"],
    output_component_file="components/train_component.yaml"
)
def train(
    train_set: Input[Dataset],
    test_set: Input[Dataset],
    mlflow_experiment_name: str,
    mlflow_tracking_uri: str,
    mlflow_s3_endpoint_url: str,
    model_artifact_path: str,
    alpha: float,
    l1_ratio: float,
    target: str = "quality"
) -> NamedTuple("Output", [("storage_uri", str), ("run_id", str),]):
    """
    Train a model, save the training metadata and the resulting artifacts to MLflow
    Args:
        train_set: Input where the training dataset is saved
        test_set: Input where the test dataset is saved
        mlflow_experiment_name: Name of the MLflow experiment
        mlflow_tracking_uri: URI of MLflow's tracking server
        mlflow_s3_endpoint_url: URL of MLflow's artifact store (MinIO)
        model_artifact_path: The path where the artifacts of the model are stored in MLflow's artifact store (MinIO). It's relative to the MLflow Run. 
        alpha, l1_ratio: Hyperparameters that need to be configured
        target: Target column name
    
    Returns: 
        namedtuple("Output", ["storage_uri", "run_id"]) where storage_uri is the S3 URI of the saved model artifact 
        in MLflow's artifact store (MinIO) and run_id is the ID of the MLflow run that produces the model
    """
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import ElasticNet
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    import mlflow
    import mlflow.sklearn
    import os
    import logging
    from collections import namedtuple
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    def eval_metrics(actual, pred):
        rmse = np.sqrt(mean_squared_error(actual, pred))
        mae = mean_absolute_error(actual, pred)
        r2 = r2_score(actual, pred)
        return rmse, mae, r2
    
    os.environ["MLFLOW_S3_ENDPOINT_URL"] = mlflow_s3_endpoint_url
    
    # Load data
    train = pd.read_csv(train_set.path)
    test = pd.read_csv(test_set.path)
    
    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop([target], axis=1)
    test_x = test.drop([target], axis=1)
    train_y = train[[target]]
    test_y = test[[target]]
    
    logger.info(f"Using MLflow tracking URI: {mlflow_tracking_uri}")
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    
    logger.info(f"Using MLflow experiment: {mlflow_experiment_name}")
    mlflow.set_experiment(mlflow_experiment_name)
    
    with mlflow.start_run() as run:
        run_id = run.info.run_id
        logger.info(f"Run ID: {run_id}")
        model = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        
        logger.info("Fitting model...")
        model.fit(train_x, train_y)
        
        logger.info("Predicting...")
        predicted_qualities = model.predict(test_x)
        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)
        logger.info("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
        logger.info("RMSE: %s" % rmse)
        logger.info("MAE: %s" % mae)
        logger.info("R2: %s" % r2)
        
        logger.info("Logging parameters and metrics to MLflow")
        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)
        
        logger.info("Logging trained model")
        mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path=model_artifact_path,
            registered_model_name="ElasticnetWineModel",
            serialization_format="pickle"
        )
        
        logger.info("Logging predictions artifact to MLflow")
        np.save("predictions", predicted_qualities)
        mlflow.log_artifact(
            local_path="predictions.npy",
            artifact_path="predicted_qualities"
        )
        
        # Prepare output
        output = namedtuple("Output", ["storage_uri", "run_id"])

        # use get_artifact_uri to get the absolute S3 URI (s3://mlflow/<mlflow-experiment-id>/<mlflow-run-id>/artifacts/<model_artifact_path>)
        return output(mlflow.get_artifact_uri(artifact_path=model_artifact_path), run_id)
   

Unlike the *pull_data* and *preprocess* components, whose outputs are Artifacts, the outputs of the *train* component are Parameters. Parameter outputs are not declared as function parameters; instead, they are declared through the function's return annotation.

Since the *train* component has more than one Parameter output (`storage_uri` and `run_id`), we use a [namedtuple](https://docs.python.org/3/library/collections.html#collections.namedtuple) to return them.
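To see how the return annotation names the outputs, the stdlib-only sketch below builds the same `NamedTuple` type used in the *train* signature, plus a hypothetical `fake_train` stand-in that returns placeholder values. The annotation's field names (`storage_uri`, `run_id`) are the names used later as keys of `train_task.outputs`; returning a `namedtuple` with the same fields keeps each value paired with its name.

```python
from collections import namedtuple
from typing import NamedTuple

# Same form as the return annotation of the train component
TrainOutput = NamedTuple("Output", [("storage_uri", str), ("run_id", str)])


def fake_train() -> TrainOutput:
    """Hypothetical stand-in for train() that returns placeholder values."""
    output = namedtuple("Output", ["storage_uri", "run_id"])
    return output("s3://mlflow/9/<run-id>/artifacts/wine-quality", "<run-id>")


print(TrainOutput._fields)  # ('storage_uri', 'run_id')
result = fake_train()
print(result.storage_uri)
print(result.run_id)
```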

**Component for deploying a model**


In [5]:
@component(
    base_image="python:3.10",
    packages_to_install=["kserve==0.10.1"],
    output_component_file="components/deploy_model_component.yaml"
)
def deploy_model(model_name: str, storage_uri: str):
    """
    Deploy the model as an inference service to KServe
    Args:
        model_name: the name of the deployed inference service
        storage_uri: the S3 URI of the saved model in MLflow's artifact store (MinIO)
    """
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1SKLearnSpec
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    model_uri = storage_uri
    logger.info(f"MODEL URI: {model_uri}")
    namespace = "kserve-inference"
    kserve_version="v1beta1"
    api_version = constants.KSERVE_GROUP + '/' + kserve_version
    
    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=client.V1ObjectMeta(
            name=model_name,
            namespace=namespace,
        ),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                service_account_name='kserve-sa',
                sklearn=V1beta1SKLearnSpec(
                    storage_uri=model_uri
                )
            )
        )
    )
    kserve = KServeClient()
    try:
        kserve.create(inferenceservice=isvc)
    except RuntimeError:
        kserve.patch(name=model_name, inferenceservice=isvc, namespace=namespace)
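
For reference, the `V1beta1InferenceService` object built above corresponds roughly to the Kubernetes manifest sketched below as a plain Python dict. This assumes `constants.KSERVE_GROUP` is `serving.kserve.io` and `constants.KSERVE_KIND` is `InferenceService` (their values in the KServe SDK); `inference_service_manifest` is a hypothetical helper. Printing it as JSON is handy for comparing what the component creates with what `kubectl get isvc wine-quality -n kserve-inference -o yaml` reports.

```python
import json


def inference_service_manifest(model_name, storage_uri,
                               namespace="kserve-inference",
                               service_account="kserve-sa"):
    """Plain-dict sketch of the InferenceService that deploy_model builds with the KServe SDK."""
    return {
        "apiVersion": "serving.kserve.io/v1beta1",  # constants.KSERVE_GROUP + "/" + "v1beta1"
        "kind": "InferenceService",                 # constants.KSERVE_KIND
        "metadata": {"name": model_name, "namespace": namespace},
        "spec": {
            "predictor": {
                "serviceAccountName": service_account,
                # The sklearn predictor loads the model from this S3 URI
                "sklearn": {"storageUri": storage_uri},
            }
        },
    }


manifest = inference_service_manifest(
    "wine-quality", "s3://mlflow/<experiment-id>/<run-id>/artifacts/wine-quality"
)
print(json.dumps(manifest, indent=2))
```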
    

## 3. Define a KFP pipeline

To build a pipeline, we need to define a function with a sequence of steps and annotate the function using the `dsl.pipeline` decorator. 

In [6]:
@dsl.pipeline(
    name="redwine-pipeline",
    description="An example pipeline that deploys a redwine model"
)
def pipeline(
    url: str,
    target: str,
    mlflow_experiment_name: str,
    mlflow_tracking_uri: str,
    mlflow_s3_endpoint_url: str,
    model_name: str,
    alpha: float,
    l1_ratio: float
):
    """
    Define a pipeline
    Args:
        url: URL for downloading the dataset
        target: Target column name of the dataset
        mlflow_experiment_name: Name of the MLflow experiment
        mlflow_tracking_uri: URI of MLflow's tracking server
        mlflow_s3_endpoint_url: URL of MLflow's artifact store (MinIO)
        model_name: The name of the KServe inference service. It's also used as the model's artifact path relative to the MLflow Run
        alpha, l1_ratio: Hyperparameters that need to be configured
    """

    # When we call a component in a pipeline definition, it constructs a pipeline task (instantiated component)
    pull_task = pull_data(url=url)
    
    # The preprocess task uses the output Artifact of the pull_data task as the input
    preprocess_task = preprocess(data=pull_task.outputs["data"])
    
    train_task = train(
        train_set=preprocess_task.outputs["train_set"],
        test_set=preprocess_task.outputs["test_set"],
        target=target,
        mlflow_experiment_name=mlflow_experiment_name,
        mlflow_tracking_uri=mlflow_tracking_uri,
        mlflow_s3_endpoint_url=mlflow_s3_endpoint_url,
        model_artifact_path=model_name,
        alpha=alpha,
        l1_ratio=l1_ratio
    )
    
    # The train task uploads the trained model to the MLflow service
    # so it needs to access the required credentials of the MinIO storage service that MLflow uses.
    # These credentials (username and password) have been deployed as a secret object named "aws-secret" to the 
    # Kubernetes cluster during the setup.
    train_task.apply(use_aws_secret(secret_name="aws-secret"))
    
    deploy_model(
        model_name=model_name,
        storage_uri=train_task.outputs["storage_uri"]
    )

    

The preprocess task uses the output Artifact of the pull_data task as its input. To get an output Artifact of a previous task, we use the task's `outputs` dictionary with the output's parameter name as the key.
```python
preprocess_task = preprocess(data=pull_task.outputs["data"])
``` 
The parameter "data" is defined in the `pull_data` function definition:
```python
def pull_data(url: str, data: Output[Dataset]):
```

Similarly, for the model deployment task, we get an output Parameter from another task.
```python
deploy_model_task = deploy_model(
    ...
    storage_uri=train_task.outputs["storage_uri"]
)
```
The output name "storage_uri" is defined via the return annotation of the `train` function:
```python
def train(...) -> NamedTuple("Output", [("storage_uri", str), ("run_id", str)]):
    ...
    output = namedtuple("Output", ["storage_uri", "run_id"])
    return output(mlflow.get_artifact_uri(artifact_path=model_artifact_path), run_id)
```

If a component has a single Parameter output, e.g.,
```python
@component
def plus(a: int, b: int) -> int:
    return a + b

@dsl.pipeline(name="plus-pipeline")
def plus_pipeline():
    plus_task = plus(a=1, b=2)
```
its output can be accessed as `plus_task.output` inside the pipeline function.


Next, let's specify the arguments for the `pipeline` function.

In [7]:
# Specify pipeline argument values
arguments = {
    "url": "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv",
    "target": "quality",
    "mlflow_tracking_uri": "http://mlflow.mlflow.svc.cluster.local:5000", # this is how a service can be accessed inside the platform. The URL is in the format
                                                                          # of http://<service-object-name>.<namespace>.svc.cluster.local:<service-port>
    "mlflow_s3_endpoint_url": "http://mlflow-minio-service.mlflow.svc.cluster.local:9000",
    "mlflow_experiment_name": "demo-notebook",
    "model_name": "wine-quality",
    "alpha": 0.5,
    "l1_ratio": 0.5,
}
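
The in-cluster URLs in `arguments` follow the `http://<service-object-name>.<namespace>.svc.cluster.local:<service-port>` pattern mentioned in the comment above. A small hypothetical helper makes the pattern explicit; it assumes the cluster uses the default `cluster.local` DNS domain.

```python
def cluster_service_url(service, namespace, port, scheme="http"):
    """Build the in-cluster DNS URL of a Kubernetes Service:
    <scheme>://<service>.<namespace>.svc.cluster.local:<port>"""
    return f"{scheme}://{service}.{namespace}.svc.cluster.local:{port}"


print(cluster_service_url("mlflow", "mlflow", 5000))
print(cluster_service_url("mlflow-minio-service", "mlflow", 9000))
```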

## 4. Submit a KFP run
After defining the pipeline function, we can submit it for execution using the client's `create_run_from_pipeline_func` method.

In [11]:
run_name = "demo-run"
experiment_name = "demo-experiment"

client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    run_name=run_name,
    experiment_name=experiment_name,
    arguments=arguments, # These are the arguments passed to the pipeline function
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE, # Use the SDK's v2-compatible execution mode
    enable_caching=False # Disable caching for this pipeline, more details at https://www.kubeflow.org/docs/components/pipelines/v1/overview/caching-v2/
)

RunPipelineResult(run_id=5cd93fd6-17d9-4039-bf93-9aa5215ccd6e)

The cell output may also include two links. They don't work in this setup, so you can ignore them.

We can now open the Kubeflow Pipelines UI at [http://ml-pipeline-ui.local](http://ml-pipeline-ui.local) and navigate to the "Experiments" page. There we can see that a KFP experiment called "demo-experiment" has been created and that it contains a KFP run called "demo-run".

<img src="./images/kfp-experiment-overview.png" width=900/>



<img src="./images/kfp-run-overview.png" width=900/>


We can follow the progress of the "demo-run" KFP run by clicking the run name:

<img src="./images/kfp-run-progress.png" width=900/>
<img src="./images/kfp-run-finish.png" width=900/>

We can also check the logged hyperparameters and metrics, as well as the registered model, in MLflow at [http://mlflow-server.local](http://mlflow-server.local):

<img src="./images/demo-mlflow-logging.png" width=1200/>
<img src="./images/demo-mlflow-model.png" width=1200/>




## 5. Testing the deployed inference service

When the KFP run is completed, let's check if the inference service is ready.

In [13]:
!kubectl get isvc wine-quality -n kserve-inference



Example output:

```
NAME           URL                                                READY   PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION                    AGE
wine-quality   http://wine-quality.kserve-inference.example.com   True           100                              wine-quality-predictor-default-00001   2m7s
```
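
Instead of re-running the cell until READY shows `True`, you can block until the service reports its `Ready` condition. The sketch below wraps `kubectl wait` with Python's `subprocess` module. It assumes `kubectl` is on the PATH and points at the same cluster as before; `isvc_wait_command` and `wait_for_isvc` are hypothetical helper names.

```python
import subprocess


def isvc_wait_command(name, namespace, timeout_s=300):
    """kubectl command that blocks until the InferenceService has condition Ready=True."""
    return [
        "kubectl", "wait", f"isvc/{name}",
        "--for=condition=Ready",
        "-n", namespace,
        f"--timeout={timeout_s}s",
    ]


def wait_for_isvc(name, namespace, timeout_s=300):
    """Run the command; return True if the service became ready within the timeout."""
    result = subprocess.run(isvc_wait_command(name, namespace, timeout_s),
                            capture_output=True, text=True)
    return result.returncode == 0


# Usage in the notebook (requires access to the cluster):
# wait_for_isvc("wine-quality", "kserve-inference")
```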

When the inference service is ready, we can send some requests to it.

In [14]:
import requests

input_sample = [
    [7.8, 0.58, 0.02, 2, 0.073, 9, 18, 0.9968, 3.36, 0.57, 9.5],
    [8.9, 0.22, 0.48, 1.8, 0.077, 29, 60, 0.9968, 3.39, 0.53, 9.4],
]
model_name = arguments["model_name"]  # name of the inference service ("wine-quality")
req_data = {"instances": input_sample}
# The gateway uses the Host header to route the request to the inference service
headers = {"Host": f"{model_name}.kserve-inference.example.com"}
url = f"http://kserve-gateway.local:30200/v1/models/{model_name}:predict"
result = requests.post(url, json=req_data, headers=headers)
print(result.json())



Example output: 
```text
{'predictions': [5.655508704978511, 5.5175364387082615]}
```
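
Each row of `input_sample` holds 11 feature values in the column order of the red wine CSV file, since the *train* component only drops the target column before fitting. The sketch below pairs the values with their names, which makes it easier to build or check your own requests. The feature names are taken from the header of `winequality-red.csv`, and `label_rows` is a hypothetical helper.

```python
# Column order of winequality-red.csv, excluding the "quality" target
FEATURES = [
    "fixed acidity", "volatile acidity", "citric acid", "residual sugar",
    "chlorides", "free sulfur dioxide", "total sulfur dioxide", "density",
    "pH", "sulphates", "alcohol",
]


def label_rows(rows):
    """Pair each value of each input row with its feature name."""
    for row in rows:
        if len(row) != len(FEATURES):
            raise ValueError(f"expected {len(FEATURES)} values, got {len(row)}")
    return [dict(zip(FEATURES, row)) for row in rows]


input_sample = [
    [7.8, 0.58, 0.02, 2, 0.073, 9, 18, 0.9968, 3.36, 0.57, 9.5],
    [8.9, 0.22, 0.48, 1.8, 0.077, 29, 60, 0.9968, 3.39, 0.53, 9.4],
]
for labeled in label_rows(input_sample):
    print(f"alcohol={labeled['alcohol']}, pH={labeled['pH']}")
```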

Clean up by deleting the "wine-quality" inference service.

In [15]:
!kubectl -n kserve-inference delete isvc wine-quality

inferenceservice.serving.kserve.io "wine-quality" deleted


For deleting KFP Experiments and Runs, please check [this notebook](./delete_from_kfp.ipynb).