# Demo KFP pipeline

Install requirements:

In [5]:
%%bash

pip install kfp~=1.8.14

Imports:

In [6]:
import warnings
warnings.filterwarnings("ignore")

import kfp
import kfp.dsl as dsl
from kfp.aws import use_aws_secret
from kfp.v2.dsl import (
    component,
    Input,
    Output,
    Dataset,
    Metrics,
    Artifact,
    Model
)

## 1. Connect to client

In [7]:
# KFP API endpoint. None makes kfp.Client fall back to the current kubectl
# context (see below); set a URL string to target another deployment.
KFP_HOST = None
client = kfp.Client(host=KFP_HOST)

If "**host**" is left as "**None**", the client connects to the cluster of the current kubectl context. E.g.

<br>

```bash
# see current context
kubectl config current-context

# see all contexts
kubectl config get-contexts

# set context
kubectl config use-context kind-ep
```

<br>

## 2. Components

There are different ways to define components in KFP. Here, we use the **@component** decorator to define the components as Python function-based components.

The **@component** decorator converts the function into a factory function that creates pipeline steps that execute this function. This example also specifies the base container image to run your component in.

Pull data component:

In [8]:
@component(
    base_image="python:3.9",
    packages_to_install=["pandas"],
    output_component_file='pull_data_component.yaml',
)
def pull_data(url: str, data: Output[Dataset], sep: str = ";"):
    """
    Pull data component.

    Reads a delimited text file from ``url`` and writes it to the ``data``
    output artifact as a comma-separated CSV without an index column.

    Args:
        url: Location of the source file (anything ``pandas.read_csv`` accepts).
        data: Output dataset artifact; the CSV is written to ``data.path``.
        sep: Field delimiter of the source file. Defaults to ``";"``, the
            delimiter of the wine-quality CSV used by this demo.
    """
    # Imports live inside the function: KFP lightweight components run only
    # this function's source inside the component container.
    import pandas as pd

    df = pd.read_csv(url, sep=sep)
    # `index` is a bool in pandas' to_csv; False drops the row index column.
    df.to_csv(data.path, index=False)

Preprocess component:

In [9]:
@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn"],
    output_component_file='preprocess_component.yaml',
)
def preprocess(
    data: Input[Dataset],
    scaler_out: Output[Artifact],
    train_set: Output[Dataset],
    test_set: Output[Dataset],
    target: str = "quality",
    test_size: float = 0.25,
    random_state: int = 42,
):
    """
    Preprocess component.

    Splits the input dataset into train/test sets and standardizes every
    feature column (all columns except ``target``) with a ``StandardScaler``
    fitted on the training split only.

    Args:
        data: Input CSV dataset.
        scaler_out: Output artifact receiving the pickled, fitted scaler.
        train_set: Output CSV with the scaled training split.
        test_set: Output CSV with the scaled test split.
        target: Name of the label column; it is left unscaled.
        test_size: Fraction of rows held out for the test split.
        random_state: Seed for the split so pipeline runs are reproducible.
    """
    import pickle

    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    # Separate name so the `data` input artifact is not shadowed.
    df = pd.read_csv(data.path)

    # Seeded split: repeated runs produce the same train/test partition.
    train, test = train_test_split(df, test_size=test_size, random_state=random_state)
    # Explicit copies before assigning columns, so we never write into a view
    # of `df` (which may raise SettingWithCopyWarning).
    train = train.copy()
    test = test.copy()

    features = train.columns.drop(target)
    scaler = StandardScaler()
    # Fit on the training split only so test statistics don't leak in.
    train[features] = scaler.fit_transform(train[features])
    test[features] = scaler.transform(test[features])

    with open(scaler_out.path, 'wb') as fp:
        pickle.dump(scaler, fp, pickle.HIGHEST_PROTOCOL)

    train.to_csv(train_set.path, index=False)
    test.to_csv(test_set.path, index=False)

Train component:

In [10]:
@component(
    base_image="python:3.9",
    packages_to_install=["numpy", "pandas", "scikit-learn", "mlflow~=1.22.0", "boto3"],
    output_component_file='train_component.yaml',
)
def train(
    train_set: Input[Dataset],
    test_set: Input[Dataset],
    saved_model: Output[Model],
    alpha: float,
    l1_ratio: float,
    target: str = "quality",
    mlflow_tracking_uri: str = "http://mlflow.mlflow.svc.cluster.local:5000",
    mlflow_experiment_name: str = "demo-notebook",
    mlflow_s3_endpoint_url: str = "http://mlflow-minio-service.mlflow.svc.cluster.local:9000",
    registered_model_name: str = "ElasticnetWineModel",
):
    """
    Train component.

    Fits an ElasticNet regressor on the training split and evaluates it on the
    test split. It logs params, metrics, the model and the predictions to
    MLflow, and writes the pickled model to the ``saved_model`` KFP artifact.

    Args:
        train_set: Input CSV with the training split.
        test_set: Input CSV with the test split.
        saved_model: Output artifact receiving the pickled, fitted model.
        alpha: ElasticNet ``alpha`` (penalty multiplier).
        l1_ratio: ElasticNet ``l1_ratio`` (L1/L2 mixing).
        target: Name of the label column to predict.
        mlflow_tracking_uri: MLflow tracking server URI.
        mlflow_experiment_name: MLflow experiment the run is logged under.
        mlflow_s3_endpoint_url: S3-compatible endpoint for MLflow artifacts,
            exported as ``MLFLOW_S3_ENDPOINT_URL``.
        registered_model_name: Name the model is registered under in MLflow.
    """
    import logging
    import os
    import pickle
    import tempfile

    import mlflow
    import mlflow.sklearn
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import ElasticNet
    from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    def eval_metrics(actual, pred):
        """Return (RMSE, MAE, R^2) of ``pred`` against ``actual``."""
        rmse = np.sqrt(mean_squared_error(actual, pred))
        mae = mean_absolute_error(actual, pred)
        r2 = r2_score(actual, pred)
        return rmse, mae, r2

    # MLflow's S3 artifact client reads the endpoint from this env var.
    os.environ['MLFLOW_S3_ENDPOINT_URL'] = mlflow_s3_endpoint_url

    # load data
    train = pd.read_csv(train_set.path)
    test = pd.read_csv(test_set.path)

    # Split features from the label column named by `target`.
    train_x = train.drop([target], axis=1)
    test_x = test.drop([target], axis=1)
    train_y = train[[target]]
    test_y = test[[target]]

    logger.info(f"Using MLflow tracking URI: {mlflow_tracking_uri}")
    mlflow.set_tracking_uri(mlflow_tracking_uri)

    logger.info(f"Using MLflow experiment: {mlflow_experiment_name}")
    mlflow.set_experiment(mlflow_experiment_name)

    with mlflow.start_run():
        model = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)

        logger.info("Fitting model...")
        model.fit(train_x, train_y)

        logger.info("Predicting...")
        predicted_qualities = model.predict(test_x)

        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

        logger.info("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
        logger.info("  RMSE: %s" % rmse)
        logger.info("  MAE: %s" % mae)
        logger.info("  R2: %s" % r2)

        logger.info("Logging parameters to MLflow")
        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        # save model to mlflow
        logger.info("Logging trained model")
        mlflow.sklearn.log_model(model, "model", registered_model_name=registered_model_name)

        logger.info("Logging predictions artifact to MLflow")
        # Stage the file in a temp dir rather than the container's working
        # directory; the artifact keeps the basename "predictions.npy".
        with tempfile.TemporaryDirectory() as tmp_dir:
            predictions_path = os.path.join(tmp_dir, "predictions.npy")
            np.save(predictions_path, predicted_qualities)
            mlflow.log_artifact(
                local_path=predictions_path, artifact_path="predicted_qualities/"
            )

        # save model as KFP artifact (same logger as the rest of the component)
        logger.info(f"Saving model to: {saved_model.path}")
        with open(saved_model.path, 'wb') as fp:
            pickle.dump(model, fp, pickle.HIGHEST_PROTOCOL)

## 3. Pipeline

Pipeline definition:

In [11]:
@dsl.pipeline(
    name='demo-pipeline',
    description='Pulls a CSV dataset, standardizes its features, and trains an ElasticNet model logged to MLflow.',
)
def pipeline(url: str, target: str, alpha: float, l1_ratio: float):
    """Wire the pull -> preprocess -> train components into one pipeline.

    Args:
        url: Dataset location passed to ``pull_data``.
        target: Label column; forwarded to both ``preprocess`` and ``train``.
        alpha: ElasticNet ``alpha`` forwarded to ``train``.
        l1_ratio: ElasticNet ``l1_ratio`` forwarded to ``train``.
    """
    pull_task = pull_data(url=url)

    # Forward `target` so preprocessing leaves unscaled exactly the column
    # that `train` uses as its label.
    preprocess_task = preprocess(
        data=pull_task.outputs["data"],
        target=target,
    )

    train_task = train(
        train_set=preprocess_task.outputs["train_set"],
        test_set=preprocess_task.outputs["test_set"],
        target=target,
        alpha=alpha,
        l1_ratio=l1_ratio,
    )
    # Expose credentials from the "aws-secret" Kubernetes secret to the train
    # step (presumably for MLflow's S3/MinIO artifact uploads — confirm).
    train_task.apply(use_aws_secret(secret_name="aws-secret"))

Pipeline arguments:

In [12]:
# Specify pipeline argument values
arguments = {
    # Red-wine subset of the UCI Wine Quality dataset (";"-delimited CSV),
    # fetched over HTTPS rather than plain HTTP.
    "url": "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv",
    # Label column the model predicts.
    "target": "quality",
    # ElasticNet hyperparameters forwarded to the train component.
    "alpha": 0.5,
    "l1_ratio": 0.5,
}

## 4. Submit run

In [13]:
# Run and experiment names shown in the KFP UI.
run_name, experiment_name = "demo-run", "demo-experiment"

# Compile `pipeline` in V2-compatible mode and submit it with `arguments`;
# the returned RunPipelineResult is displayed as the cell output.
run_result = client.create_run_from_pipeline_func(
    pipeline,
    arguments=arguments,
    run_name=run_name,
    experiment_name=experiment_name,
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
)
run_result

RunPipelineResult(run_id=7cc490cb-587a-4b19-b57d-95d19742d2fe)

## 5. Check run

### Kubeflow Pipelines UI

To access the KFP UI, open a terminal and forward a local port to the KFP UI server:

<br>

```bash
$ kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
```

<br>

Now KFP UI should be reachable at [http://localhost:8080/](http://localhost:8080/).

### MLFlow UI

To access the MLflow UI, open a terminal and forward a local port to the MLflow server:

<br>

```bash
$ kubectl -n mlflow port-forward svc/mlflow 5000:5000
```

<br>

Now MLFlow's UI should be reachable at [`http://localhost:5000`](http://localhost:5000).