In [1]:
import kfp
import kfp.components as components
from kfp.components import OutputPath
from kfp.components import create_component_from_func
from kfp import dsl
from typing import NamedTuple

In [2]:
def get_data_batch(
    minio_url: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_data_bucket_name: str,
    mlpipeline_ui_metadata_path: OutputPath("Metadata")
):
    """
    Get data from Minio object store

    Downloads ``mnist.npz`` from the bucket, splits it into the ``x_train``,
    ``y_train``, ``x_test`` and ``y_test`` arrays, and uploads each one back to
    the same bucket as a separate ``.npy`` object named after the array. It also
    writes a markdown summary of the row counts to the Kubeflow UI metadata file.
    """
    import json
    import numpy as np
    from minio import Minio

    local_archive = "/tmp/mnist.npz"
    split_names = ("x_train", "y_train", "x_test", "y_test")

    # First, download the MNIST dataset locally
    minio_client = Minio(minio_url, access_key=minio_access_key, secret_key=minio_secret_key, secure=False)
    minio_client.fget_object(minio_data_bucket_name, "mnist.npz", local_archive)

    # Extract the MNIST archive into training and testing arrays.
    # NOTE(review): allow_pickle=True lets np.load unpickle object arrays, which can
    # execute arbitrary code if the object in the bucket is tampered with. Plain
    # numeric image/label arrays should not need it — consider allow_pickle=False.
    with np.load(local_archive, allow_pickle=True) as archive:
        splits = {name: archive[name] for name in split_names}

    # Store each array as its own .npy object, keyed by its bare name, so the
    # downstream steps can fetch only what they need.
    for name, array in splits.items():
        local_path = "/tmp/{}.npy".format(name)
        np.save(local_path, array)
        minio_client.fput_object(minio_data_bucket_name, name, local_path)

    for name in split_names:
        print(f"{name} shape: {splits[name].shape}")

    metadata_data = {
        "outputs": [
            {
                'storage': 'inline',
                'source': '''
**Number of training data rows:** {}; **Number of testing data rows:** {}
'''.format(splits["x_train"].shape[0], splits["x_test"].shape[0]),
                'type': 'markdown',
            }
        ]
    }
    with open(mlpipeline_ui_metadata_path, 'w') as metadata_file:
        json.dump(metadata_data, metadata_file)

In [3]:
def get_latest_data(
    minio_url: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_data_bucket_name: str
):
    """
    Placeholder step standing in for "fetch the newest data".

    It accepts the same Minio connection parameters as the real data steps, so
    it can be wired into the pipeline the same way. It does not use them; it only
    prints a message.
    """
    message = "Adding latest data"
    print(message)

In [4]:
def reshape_data(
    minio_url: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_data_bucket_name: str
):
    """
    Reshape the data for model building

    For each of the ``x_train`` and ``x_test`` objects in the bucket:
    downloads the array, reshapes it to ``(N, 28, 28, 1)`` (one greyscale
    channel, the input layout the Keras model uses), scales raw integer pixel
    values from 0-255 to 0-1, and uploads the result under the same object name.
    """
    import numpy as np
    from minio import Minio

    minio_client = Minio(minio_url, access_key=minio_access_key, secret_key=minio_secret_key, secure=False)

    for name in ("x_train", "x_test"):
        local_path = "/tmp/{}.npy".format(name)

        # Load data from minio
        minio_client.fget_object(minio_data_bucket_name, name, local_path)
        images = np.load(local_path)

        # Reshape each 28x28 greyscale image to carry an explicit channel axis (= 1).
        images = images.reshape(-1, 28, 28, 1)

        # The result overwrites the raw object in the bucket, so a retried step
        # would otherwise divide by 255 a second time. Only scale data that is
        # still raw integer pixels (0-255); float data is assumed already scaled.
        if np.issubdtype(images.dtype, np.integer):
            images = images / 255

        # Save reshaped data back to minio under the same object name
        np.save(local_path, images)
        minio_client.fput_object(minio_data_bucket_name, name, local_path)

In [5]:
def model_building(
    num_of_epochs: int,
    batch_size: int,
    optimizer: str,
    minio_url: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_data_bucket_name: str,
    run_id: str,
    mlpipeline_ui_metadata_path: OutputPath("Metadata"),
    mlpipeline_metrics_path: OutputPath("Metrics"),
    model_output_path: OutputPath("Trained Model")
):
    """
    Build the model with Keras API. Export model parameters.

    Downloads the prepared ``x_train``/``y_train``/``x_test``/``y_test`` arrays
    from Minio, trains a small CNN classifier and evaluates it on the test split.
    It then:

    1. writes Kubeflow UI metadata (model summary, accuracy/loss, confusion
       matrix) to ``mlpipeline_ui_metadata_path``;
    2. writes accuracy and loss to ``mlpipeline_metrics_path``;
    3. saves the model to ``model_output_path``;
    4. uploads another saved copy of the model to
       ``<minio_data_bucket_name>/models/detect-digits/<run_id>/``.
    """
    from tensorflow import keras
    from minio import Minio
    import tensorflow as tf
    import numpy as np
    import pandas as pd
    import json
    import os
    import posixpath

    # Digits 0-9: used both for the output layer width and the confusion matrix size.
    num_classes = 10

    # Load back the prepared training datasets
    minio_client = Minio(minio_url, access_key=minio_access_key, secret_key=minio_secret_key, secure=False)
    arrays = {}
    for name in ("x_train", "y_train", "x_test", "y_test"):
        local_path = "/tmp/{}.npy".format(name)
        minio_client.fget_object(minio_data_bucket_name, name, local_path)
        arrays[name] = np.load(local_path)
    x_train, y_train = arrays["x_train"], arrays["y_train"]
    x_test, y_test = arrays["x_test"], arrays["y_test"]

    # Build an artificial neural network using Keras
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28, 28, 1)))
    model.add(keras.layers.MaxPool2D(2, 2))
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(64, activation='relu'))
    model.add(keras.layers.Dense(32, activation='relu'))
    model.add(keras.layers.Dense(num_classes, activation='softmax'))  # one probability per digit 0-9

    # sparse_categorical_crossentropy takes integer class ids as labels (not one-hot).
    model.compile(optimizer=optimizer,
                  loss="sparse_categorical_crossentropy",
                  metrics=['accuracy'])

    model.fit(
        x=x_train,
        y=y_train,
        epochs=num_of_epochs,
        batch_size=batch_size,
    )

    # Evaluate the model
    model_loss, model_accuracy = model.evaluate(x=x_test, y=y_test)
    y_pred_probs = model.predict(x=x_test)
    # Take the index of the highest per-class probability as the predicted digit.
    y_pred_categorical = np.argmax(y_pred_probs, axis=1)

    # Save all training results as Kubeflow run metadata (shown in Visualization)
    metadata_data = {"outputs": []}

    def record_model_summary(_model, _metadata_data):
        """Append the Keras model summary as a markdown output."""
        stringlist = []
        _model.summary(print_fn=lambda x: stringlist.append(x))
        metric_model_summary = "\n".join(stringlist)
        _metadata_data["outputs"].append({
            "storage": "inline",
            "source": '''
# Model Summary
```
{}
```
        '''.format(metric_model_summary),
            'type': 'markdown'
        })

    def record_loss_and_accuracy(_model_loss, _model_accuracy, _metadata_data):
        """Append test accuracy and loss as a markdown output."""
        _metadata_data["outputs"].append({
            "storage": "inline",
            "source": '''
# Model Performance
- **Accuracy:** {}
- **Loss:** {}
        '''.format(_model_accuracy, _model_loss),
            'type': 'markdown'
        })

    def record_confusion_matrix(_y_test, _y_pred_categorical, _metadata_data):
        """Append a num_classes x num_classes confusion matrix as an inline CSV output."""
        # Pin num_classes so the matrix always has one row/column per label, even
        # when some digit is absent from y_test or the predictions. Otherwise the
        # row/column indices would not line up with the labels list below.
        confusion_matrix = tf.math.confusion_matrix(
            labels=_y_test, predictions=_y_pred_categorical, num_classes=num_classes).numpy()
        labels = list(range(num_classes))
        data = [
            (labels[target_index], labels[predicted_index], count)
            for target_index, target_row in enumerate(confusion_matrix)
            for predicted_index, count in enumerate(target_row)
        ]

        _metadata_data["outputs"].append({
            "type": "confusion_matrix",
            "format": "csv",
            "schema": [
                {'name': 'target', 'type': 'CATEGORY'},
                {'name': 'predicted', 'type': 'CATEGORY'},
                {'name': 'count', 'type': 'NUMBER'},
              ],
            # Column names must agree with the schema above.
            "target_col" : "target",
            "predicted_col" : "predicted",
            "source": pd.DataFrame(data, columns=['target', 'predicted', 'count']).to_csv(header=False, index=False),
            "storage": "inline",
            "labels": labels
        })

    record_model_summary(model, metadata_data)
    record_loss_and_accuracy(model_loss, model_accuracy, metadata_data)
    record_confusion_matrix(y_test, y_pred_categorical, metadata_data)

    # Result recording 1. With all the metadata, save and commit them into Kubeflow
    with open(mlpipeline_ui_metadata_path, 'w') as metadata_file:
        json.dump(metadata_data, metadata_file)

    # Result recording 2. Save training metrics to be shown in Kubeflow run output.
    # Accuracy is a fraction in [0, 1] and is shown as a percentage; loss is an
    # unbounded value, so it must be exported as RAW.
    with open(mlpipeline_metrics_path, 'w') as metrics_file:
        json.dump({
          'metrics': [{
              'name': 'model_accuracy',
              'numberValue':  float(model_accuracy),
              'format' : "PERCENTAGE"
            },{
              'name': 'model_loss',
              'numberValue':  float(model_loss),
              'format' : "RAW"
            }]
        }, metrics_file)

    # Result recording 3. Save the model to Kubeflow's artifact folder
    keras.models.save_model(model, model_output_path)

    # Finally, save all model files to Minio for future uses
    def upload_local_directory_to_minio(local_path, bucket_name, minio_path):
        """Upload every file under local_path to bucket_name, keyed under minio_path.

        Keys use '/' separators with no duplicated slashes, e.g.
        ``<minio_path>/saved_model.pb`` and ``<minio_path>/variables/variables.index``.
        """
        assert os.path.isdir(local_path)
        prefix = minio_path.strip("/")
        for dir_path, _subdirs, file_names in os.walk(local_path):
            for file_name in file_names:
                local_file = os.path.join(dir_path, file_name)
                relative_key = os.path.relpath(local_file, local_path).replace(os.sep, "/")
                remote_path = posixpath.join(prefix, relative_key)
                minio_client.fput_object(bucket_name, remote_path, local_file)

    keras.models.save_model(model, "/tmp/detect-digits")
    upload_local_directory_to_minio("/tmp/detect-digits", minio_data_bucket_name, "models/detect-digits/" + run_id + "/")

In [6]:
def model_serving_on_kubernetes(
    run_id: str
):
    """
    Create kserve instance

    Creates the ``digits-recognizer`` KServe InferenceService in the namespace
    returned by ``utils.get_default_target_namespace()``. If creation fails, it
    replaces the existing service instead. The TensorFlow predictor serves the
    model stored at ``s3://mlpipeline/models/detect-digits/<run_id>/``.
    """
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1TFServingSpec

    namespace = utils.get_default_target_namespace()
    name = 'digits-recognizer'
    kserve_version = 'v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version
    # NOTE(review): the bucket is hard-coded to "mlpipeline", while the training
    # step uploads to its minio_data_bucket_name parameter (default 'mlpipeline').
    storage_uri = "s3://mlpipeline/models/detect-digits/" + run_id + "/"

    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=client.V1ObjectMeta(
            name=name,
            namespace=namespace,
            annotations={'sidecar.istio.io/inject': 'false'}),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                # Presumably this service account carries the S3/Minio credentials — TODO confirm.
                service_account_name="sa-minio-kserve",
                tensorflow=V1beta1TFServingSpec(storage_uri=storage_uri)))
    )

    kserve_client = KServeClient()
    try:
        kserve_client.create(isvc, namespace=namespace)
    except (RuntimeError, client.rest.ApiException) as err:
        # Only Kubernetes API failures are caught here, e.g. the service already
        # existing from a previous run. KServeClient may wrap ApiException in
        # RuntimeError, so both are handled. Other errors propagate.
        print("The KServer service already exist, try to replace.")
        print(err)
        kserve_client.replace(name, isvc, namespace=namespace)

In [7]:
def verify_model_serving_inferernce_result():
    """
    Simulate to upload an image to let the model to infer

    Sends one hard-coded MNIST image of the digit 5 to the ``digits-recognizer``
    InferenceService and asserts that the predicted class is 5.

    Raises:
        AssertionError: if the served model does not predict 5.
        requests.HTTPError: if the prediction endpoint returns an error status.
    """
    from kserve import KServeClient
    from kserve import utils
    import numpy as np
    import requests

    name = "digits-recognizer"
    # Use the same namespace lookup as the serving step rather than a hard-coded
    # user namespace.
    namespace = utils.get_default_target_namespace()

    print("Actual Number: 5")
    x_number_five = np.array([[[[  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   3,
             18,  18,  18, 126, 136, 175,  26, 166, 255, 247, 127,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,  30,  36,  94, 154, 170,
            253, 253, 253, 253, 253, 225, 172, 253, 242, 195,  64,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,  49, 238, 253, 253, 253, 253,
            253, 253, 253, 253, 251,  93,  82,  82,  56,  39,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,  18, 219, 253, 253, 253, 253,
            253, 198, 182, 247, 241,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,  80, 156, 107, 253, 253,
            205,  11,   0,  43, 154,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,  14,   1, 154, 253,
             90,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0, 139, 253,
            190,   2,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,  11, 190,
            253,  70,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,  35,
            241, 225, 160, 108,   1,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
             81, 240, 253, 253, 119,  25,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,  45, 186, 253, 253, 150,  27,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0,  16,  93, 252, 253, 187,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0,   0,   0, 249, 253, 249,  64,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,  46, 130, 183, 253, 253, 207,   2,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,  39,
            148, 229, 253, 253, 253, 250, 182,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,  24, 114, 221,
            253, 253, 253, 253, 201,  78,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,  23,  66, 213, 253, 253,
            253, 253, 198,  81,   2,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,  18, 171, 219, 253, 253, 253, 253,
            195,  80,   9,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,  55, 172, 226, 253, 253, 253, 253, 244, 133,
             11,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0, 136, 253, 253, 253, 212, 135, 132,  16,   0,
              0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0],
           [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,
              0,   0]]]])

    kserve_client = KServeClient()
    # The serving step calls create() without watching, so the service may still
    # be rolling out. Block until it reports ready before sending a request.
    kserve_client.wait_isvc_ready(name, namespace=namespace)
    isvc_resp = kserve_client.get(name, namespace=namespace)
    isvc_url = isvc_resp['status']['address']['url']

    # Apply the same preprocessing as the training data: (N, 28, 28, 1) layout
    # and pixel values scaled from 0-255 to 0-1.
    image = x_number_five.reshape(-1, 28, 28, 1) / 255

    inference_input = {
      'instances': image.tolist()
    }

    response = requests.post(isvc_url, json=inference_input, timeout=60)
    response.raise_for_status()
    predictions = response.json()["predictions"]
    predicted_digit = int(np.argmax(predictions))
    print("Predicted: {}".format(predicted_digit))

    assert predicted_digit == 5

In [8]:
container_baseimage_url = "public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.5.0"


def _component_on_base_image(func, packages_to_install=None):
    """Wrap ``func`` as a lightweight KFP component running on the shared base image.

    ``packages_to_install`` is forwarded unchanged to
    ``create_component_from_func``.
    """
    return components.create_component_from_func(
        func,
        base_image=container_baseimage_url,
        packages_to_install=packages_to_install,
    )


# Data and training steps rely only on what the base image provides.
comp_get_data_batch = _component_on_base_image(get_data_batch)
comp_get_latest_data = _component_on_base_image(get_latest_data)
comp_reshape_data = _component_on_base_image(reshape_data)
comp_model_building = _component_on_base_image(model_building)

# The KServe steps additionally pip-install the pinned KServe SDK.
comp_model_serving_on_kubernetes = _component_on_base_image(
    model_serving_on_kubernetes, packages_to_install=['kserve==0.8.0.1'])
comp_verify_model_serving_inferernce_result = _component_on_base_image(
    verify_model_serving_inferernce_result, packages_to_install=['kserve==0.8.0.1'])

def disable_cache(step_list):
    """Turn off KFP step caching for every op in ``step_list``.

    Each op's ``max_cache_staleness`` is set to the ISO-8601 duration "P0D"
    (zero days), so no cached result is accepted and every step re-executes.
    """
    for op in step_list:
        caching = op.execution_options.caching_strategy
        caching.max_cache_staleness = "P0D"

@dsl.pipeline(
    name='digits-recognizer-pipeline',
    description='Detect digits'
)
def digit_recognizer_pipeline(
    num_of_epochs: int = 1,
    batch_size: int = 20,
    optimizer: str = 'adam',
    minio_url: str = 'minio-service.kubeflow:9000',
    minio_access_key: str = 'minio',
    minio_secret_key: str = 'minio123',
    minio_data_bucket_name: str = 'mlpipeline'
):
    """Wire the digit-recognizer steps into one KFP pipeline.

    The two data-fetch steps have no dependency on each other. Reshape waits for
    both, then training, KServe deployment and the inference check run in
    sequence. Caching is switched off for all six steps.

    NOTE(review): the Minio access/secret key defaults are hard-coded here and
    end up in the compiled pipeline spec. Consider supplying them per run or from
    a Kubernetes secret.
    """
    # Connection settings shared by every step that talks to Minio.
    minio_args = dict(
        minio_url=minio_url,
        minio_access_key=minio_access_key,
        minio_secret_key=minio_secret_key,
        minio_data_bucket_name=minio_data_bucket_name,
    )

    fetch_batch = comp_get_data_batch(**minio_args)
    fetch_latest = comp_get_latest_data(**minio_args)

    reshape = comp_reshape_data(**minio_args)
    reshape.after(fetch_batch, fetch_latest)

    train = comp_model_building(
        num_of_epochs=num_of_epochs,
        batch_size=batch_size,
        optimizer=optimizer,
        run_id=kfp.dsl.RUN_ID_PLACEHOLDER,
        **minio_args
    )
    train.after(reshape)

    serve = comp_model_serving_on_kubernetes(run_id=kfp.dsl.RUN_ID_PLACEHOLDER)
    serve.after(train)

    verify = comp_verify_model_serving_inferernce_result()
    verify.after(serve)

    disable_cache([fetch_batch, fetch_latest, reshape, train, serve, verify])

In [9]:
if __name__ == "__main__":
    # NOTE(review): presumably the ID of the pipeline named PIPELINE_NAME below —
    # the version count is read by ID but the upload targets the pipeline by
    # name, so the two must refer to the same pipeline. TODO confirm.
    PIPELINE_ID = '12f1e58c-a1ac-47f3-81d1-8a0ba5707cfe'
    PIPELINE_NAME = "Digit Recognizer Training and Serving Pipeline"
    PACKAGE_PATH = 'digit_recognizer_pipeline.yaml'

    # Reuse a single client for both the version lookup and the upload.
    kfp_client = kfp.Client()

    # Versions are named "1.<n>", where n is the number of versions the pipeline
    # already has. total_size may be unset (None) when nothing is listed.
    existing_version_count = kfp_client.list_pipeline_versions(pipeline_id=PIPELINE_ID).total_size or 0
    next_version_num = "1." + str(existing_version_count)
    print("The next pipeline version will be: " + next_version_num)
    next_version_description = input("Please give a version description: ")

    kfp.compiler.Compiler().compile(
        pipeline_func=digit_recognizer_pipeline,
        package_path=PACKAGE_PATH
    )

    kfp_client.upload_pipeline_version(
        pipeline_package_path=PACKAGE_PATH,
        pipeline_version_name=next_version_num,
        pipeline_name=PIPELINE_NAME,
        description=next_version_description
    )

The next pipeline version will be: 1.26


Please give a version description:  Fix a minor bug.
