# Scaling Jupyter Notebook with Kubeflow Pipelines

In this notebook, you will write a Machine Learning (ML) model and scale it with a [Kubeflow pipeline](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/).

**The focus of the notebook is not on how to write an ML classifier. This notebook explains how notebooks can be integrated with Kubeflow pipelines.**

You will use the [basic classification with Fashion MNIST and TensorFlow](https://www.tensorflow.org/tutorials/keras/classification) tutorial as the example model.

_Please note this notebook should be run on a [notebook server](https://www.kubeflow.org/docs/notebooks/) within Kubeflow._

## Classify Clothing

The [Fashion MNIST](https://github.com/zalandoresearch/fashion-mnist) dataset contains 70,000 grayscale images in 10 clothing categories. Each image is 28x28 pixels in size.

_Can you write a model that, given an image, can tell what kind of clothing it shows?_

That's exactly what you will build when you complete this notebook.

You might have [seen this example before](https://www.tensorflow.org/tutorials/keras/classification), as it's a popular classifier that is well suited to learning TensorFlow and machine learning.

<table>
  <tr><td>
    <img src="https://tensorflow.org/images/fashion-mnist-sprite.png"
         alt="Fashion MNIST sprite"  width="600">
  </td></tr>
  <tr><td align="center">
    <b>Figure 1.</b> <a href="https://github.com/zalandoresearch/fashion-mnist">Fashion-MNIST samples</a> (by Zalando, MIT License).<br/>&nbsp;
  </td></tr>
</table>


### Before you start

You should install a few packages to explore the dataset:

In [None]:
!python -m pip install --user --upgrade pip
!pip install --user --upgrade pandas matplotlib numpy

You should restart the kernel for the changes to take effect:

In [None]:
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

It's time to write the model. You should import the following libraries.

In [None]:
# TensorFlow and tf.keras
import tensorflow as tf
from tensorflow import keras

# Data exploration libraries
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

You should load the Fashion MNIST dataset:

In [None]:
fashion_mnist = keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

In the dataset, each image is mapped to a label.

You can imagine having a picture of a shirt and the string "shirt" next to it.

In other words, someone else did the hard work of mapping images and labels (at least part of it).

_But there's a gotcha._

Instead of expressing the labels as words, the dataset stores each label as an integer from 0 to 9.

That integer is the index of the clothing type in the following list:

In [None]:
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
pd.DataFrame(class_names, columns=["Cloth type"])
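Because the labels are stored as integers, turning a label back into a name is just a list lookup, with the integer used as the index. A minimal sketch in plain Python, using a few hypothetical label values rather than entries read from the dataset:

```python
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

# Hypothetical integer labels, standing in for entries of train_labels.
example_labels = [9, 0, 0, 3]

# Each integer label is an index into class_names.
names = [class_names[label] for label in example_labels]
print(names)  # ['Ankle boot', 'T-shirt/top', 'T-shirt/top', 'Dress']
```

In the notebook, `class_names[train_labels[i]]` in the plotting cell does exactly this lookup.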


_How many images are available as training data?_


In [None]:
print(f'Number of training images: {train_images.shape[0]}\n')
print(f'Number of labels: {len(train_labels)}\n')

_How many images are available as testing data?_

In [None]:
print(f'Number of test images: {test_images.shape[0]}\n')
print(f'Number of test labels: {len(test_labels)}')

_What's the size of the images?_

In [None]:
print(f'Image size: {train_images.shape[1:]}')


The images are in grayscale:

In [None]:
plt.figure(figsize=(10,10))
for i in range(25):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(train_images[i], cmap=plt.cm.binary)
    plt.xlabel(class_names[train_labels[i]])
plt.show()

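Each pixel is an integer from 0 to 255. In standard grayscale, 0 is black and 255 is white; the `binary` colormap used above inverts this, so 0 is drawn as white.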

> Please note that, before training the model, you will have to normalise the values so that they fall between 0 and 1.
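Normalising here means dividing every pixel by 255, the maximum grayscale value, so 0 maps to 0.0 and 255 maps to 1.0. The sketch below does this in plain Python on a tiny hypothetical 2x2 image; the `train` function later does the same thing on the whole NumPy array with `train_images / 255.0`.

```python
# A tiny hypothetical 2x2 grayscale image with raw pixel values in [0, 255].
image = [[0, 51],
         [204, 255]]

# Divide each pixel by the maximum value so the result lies in [0, 1].
normalised = [[pixel / 255.0 for pixel in row] for row in image]
print(normalised)  # [[0.0, 0.2], [0.8, 1.0]]
```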

Now that you have familiarised yourself with the data, let's have a look at how to train the model.

## Building a Kubeflow pipeline

The model code is split into two functions:

- a function that ingests (picture, label) pairs and trains a model
- a function that, given an image, predicts the label

[You can learn how the model works in this detailed tutorial](https://www.tensorflow.org/tutorials/keras/classification). For now, knowing that there are two functions is enough to move on to the next step.
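To make the split concrete without TensorFlow, here is a deliberately trivial stand-in with the same two-function shape: one function learns from (image, label) pairs, the other predicts a label for a single image. The model is a nearest-centroid classifier over flattened pixel lists, not the Keras model used later, and the names `train_stub` and `predict_stub` are hypothetical.

```python
from collections import defaultdict


def train_stub(pairs):
    """Learn one mean image (centroid) per label from (image, label) pairs."""
    sums = {}
    counts = defaultdict(int)
    for image, label in pairs:
        if label not in sums:
            sums[label] = [0.0] * len(image)
        sums[label] = [s + p for s, p in zip(sums[label], image)]
        counts[label] += 1
    return {label: [s / counts[label] for s in total]
            for label, total in sums.items()}


def predict_stub(model, image):
    """Return the label whose centroid is closest (squared Euclidean distance)."""
    def distance(centroid):
        return sum((c - p) ** 2 for c, p in zip(centroid, image))
    return min(model, key=lambda label: distance(model[label]))


# Hypothetical 4-pixel "images": label 0 is dark, label 1 is bright.
model = train_stub([([0.0, 0.1, 0.0, 0.1], 0), ([0.9, 1.0, 0.9, 1.0], 1)])
print(predict_stub(model, [0.8, 0.9, 1.0, 0.9]))  # 1
```

Keeping training and prediction in separate functions is what later allows each one to run as its own pipeline step.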

Since you will run the model on Kubeflow, you should install the [Kubeflow Pipelines SDK](https://www.kubeflow.org/docs/pipelines/sdk/sdk-overview/) package.



In [None]:
!pip install --user --upgrade kfp

You should restart the kernel for the changes to take effect:

In [None]:
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

If the installation was successful, the `dsl-compile` command should be on your `PATH`, for example at:

**/usr/local/bin/dsl-compile**

With a `--user` install, pip may place it under `~/.local/bin` instead.

In [None]:
!which dsl-compile
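The same check can be done from Python with `shutil.which`, which returns the full path of an executable found on `PATH`, or `None` if there is none. A small sketch:

```python
import shutil

# Look up the dsl-compile executable installed by the kfp package.
dsl_compile_path = shutil.which("dsl-compile")
if dsl_compile_path is None:
    print("dsl-compile not found on PATH; check where pip installed it")
else:
    print(f"dsl-compile found at {dsl_compile_path}")
```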

You should import the Kubeflow Pipelines SDK in your project:

In [None]:
import kfp
import kfp.dsl as dsl
import kfp.components as comp
from typing import NamedTuple

The first function to be run in the pipeline is the `train` function.

As you can imagine, this is where the model is trained on the training data and then evaluated on the test data.

Let's focus on the Kubeflow-specific features.

In [None]:
def train(data_path, model_file)-> NamedTuple('output', [('mlpipeline_ui_metadata', 'UI_metadata'), ('mlpipeline_metrics', 'Metrics')]):
    
    import pickle
    import json
    import tensorflow as tf
    from tensorflow import keras
    
    # Download the dataset and split into training and test data. 
    fashion_mnist = keras.datasets.fashion_mnist

    (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

    # Normalize the data so that the values all fall between 0 and 1.
    train_images = train_images / 255.0
    test_images = test_images / 255.0

    # Define the model using Keras.
    model = keras.Sequential([
        keras.layers.Flatten(input_shape=(28, 28)),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dense(10)
    ])

    model.compile(optimizer='adam',
                  loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    # Run a training job with specified number of epochs
    model.fit(train_images, train_labels, epochs=10)

    # Evaluate the model and print the results
    test_loss, test_acc = model.evaluate(test_images,  test_labels, verbose=2)
    print('Test accuracy:', test_acc)

    # Save the model to the designated data path (the shared volume).
    model.save(f'{data_path}/{model_file}')

    # Save the test_data as a pickle file to be used by the predict component.
    with open(f'{data_path}/test_data', 'wb') as f:
        pickle.dump((test_images,test_labels), f)
    
    metadata = {
        'outputs' : [{
          'type': 'web-app',
          'storage': 'inline',
          'source': "<div>Done</div>",
        }]
      }

    metrics = {
      'metrics': [{
          'name': 'Accuracy',
          'numberValue':  float(test_acc),
        }, {
          'name': 'Loss',
          'numberValue':  float(test_loss),
        }]}
    
    from collections import namedtuple
    print_output = namedtuple('output', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return print_output(json.dumps(metadata), json.dumps(metrics))

You should notice that the return value of the function is annotated with a NamedTuple.

The NamedTuple is used to return multiple outputs.

In this case, the two outputs are:

1. [Metadata](https://www.kubeflow.org/docs/components/metadata/) which is used to [visualise the output](https://www.kubeflow.org/docs/pipelines/sdk/output-viewer/) of the current step.
1. [Metrics](https://www.kubeflow.org/docs/pipelines/sdk/pipelines-metrics/) which collects the metrics for the current step.

You can have more outputs. They are convenient when you want a quick visual clue of what a step did. An example is shown below:

![Output view in Kubeflow pipelines](https://www.kubeflow.org/docs/images/taxi-tip-run-output.png)
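Both outputs are plain JSON strings. The metrics output holds a `metrics` list of `name`/`numberValue` entries, exactly as built in `train`. One detail worth keeping in mind: `json.dumps` cannot serialise NumPy scalar types such as `numpy.float32`, which is why `train` wraps the values in `float(...)`. A minimal sketch with a hypothetical helper `make_metrics` and made-up accuracy and loss values:

```python
import json
from collections import namedtuple


def make_metrics(**values):
    """Build the JSON string for an mlpipeline_metrics output.

    float() turns numeric values (including NumPy scalars) into plain
    Python floats so that json.dumps can serialise them.
    """
    return json.dumps({
        'metrics': [
            {'name': name, 'numberValue': float(value)}
            for name, value in values.items()
        ]
    })


# Same inline web-app metadata as in train().
metadata = json.dumps({'outputs': [{
    'type': 'web-app',
    'storage': 'inline',
    'source': '<div>Done</div>',
}]})

# Return both outputs as a named tuple, the same way train() does.
output = namedtuple('output', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
result = output(metadata, make_metrics(Accuracy=0.88, Loss=0.33))
print(result.mlpipeline_metrics)
```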

The `predict` function is similar:

In [None]:
def predict(data_path, model_file, image_number)-> NamedTuple('output', [('mlpipeline_ui_metadata', 'UI_metadata'), ('mlpipeline_metrics', 'Metrics')]):
    
    import pickle
    import json
    
    import sys
    import subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'Pillow'])
    import base64
    from PIL import Image
    from io import BytesIO

    import tensorflow as tf
    from tensorflow import keras

    import numpy as np
    
    # Load the saved Keras model
    model = keras.models.load_model(f'{data_path}/{model_file}')

    # Load and unpack the test_data
    with open(f'{data_path}/test_data','rb') as f:
        test_data = pickle.load(f)
    # Separate the test_images from the test_labels.
    test_images, test_labels = test_data
    # Define the class names.
    class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
                   'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

    # Define a Softmax layer to define outputs as probabilities
    probability_model = tf.keras.Sequential([model, 
                                            tf.keras.layers.Softmax()])

    # See https://github.com/kubeflow/pipelines/issues/2320 for explanation on this line.
    image_number = int(image_number)

    # Grab an image from the test dataset.
    selected_image = test_images[image_number]

    # Add the image to a batch where it is the only member.
    img = (np.expand_dims(selected_image,0))

    # Predict the label of the image.
    predictions = probability_model.predict(img)

    # Take the prediction with the highest probability
    prediction = np.argmax(predictions[0])

    # Retrieve the true label of the image from the test labels.
    true_label = test_labels[image_number]
    
    class_prediction = class_names[prediction]
    confidence = 100*np.max(predictions)
    actual = class_names[true_label]
    
    
    with open(f'{data_path}/result.txt', 'w') as result:
      result.write(" Prediction: {} | Confidence: {:2.0f}% | Actual: {}".format(class_prediction,
                                                                                confidence,
                                                                                actual))

    PIL_image = Image.fromarray(np.uint8(selected_image * 255)).convert('RGB')                                                                            
    buffered = BytesIO()
    PIL_image.save(buffered, format="JPEG")
    img_str = base64.b64encode(buffered.getvalue()).decode()
    metadata = {
        'outputs' : [{
          'type': 'web-app',
          'storage': 'inline',
          'source': "<div>Input: <img width=\"200\" src=\"data:image/jpeg;base64,{}\"/></div><p>Prediction: {}</p>".format(img_str, class_prediction),
        }]
      }

    metrics = {
      'metrics': [{
          'name': 'Confidence',
          'numberValue': float(confidence),
        }]}

    from collections import namedtuple
    print_output = namedtuple('output', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return print_output(json.dumps(metadata), json.dumps(metrics))

It uses the same NamedTuple to export multiple outputs: Metadata and Metrics.

Notice how the output includes the image used for prediction and the actual prediction.
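The image is embedded directly in the HTML as a base64 `data:` URI, so the viewer needs no separate file. A stdlib-only sketch of that step, using a few placeholder bytes instead of a real JPEG (in `predict`, the bytes come from `PIL_image.save(buffered, format="JPEG")`); the helper name `image_html` is hypothetical:

```python
import base64
import html


def image_html(jpeg_bytes, prediction):
    """Build inline HTML for a web-app output that embeds the image."""
    # Encode the raw image bytes as base64 text and wrap them in a data URI.
    img_str = base64.b64encode(jpeg_bytes).decode()
    src = f"data:image/jpeg;base64,{img_str}"
    # Escape the label so characters such as '<' or '&' cannot break the markup.
    return (f'<div>Input: <img width="200" src="{src}"/></div>'
            f'<p>Prediction: {html.escape(prediction)}</p>')


# Placeholder bytes (a JPEG start marker only), not a complete image.
snippet = image_html(b"\xff\xd8\xff", "T-shirt/top")
print(snippet)
```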

Apart from the NamedTuple, the two functions are just two regular Python functions.

However, in a Kubeflow Pipeline, each function runs in a separate container.

_How can you pass arguments to a function that lives in another container?_

_Also, how do you specify which container image it should use?_

You can wrap your functions with `func_to_container_op` to:

1. Tell Kubeflow that this is a function that can be run as part of Kubeflow Pipelines.
1. Specify in which container the function should run.

`func_to_container_op` also takes care of function arguments and return values: when the [Pod](https://kubernetes.io/docs/concepts/workloads/pods/pod/) starts, the arguments are passed to the function on the command line, and the return values are written to output files that Kubeflow Pipelines collects and hands to later steps.

This step is crucial since the two functions run in different Python runtime environments.

> If you're interested in exploring how this works, you should know that the arguments and output are serialised/deserialised for every invocation. You can [learn more about the process here.](https://towardsdatascience.com/a-dummies-guide-to-build-a-kubeflow-pipeline-c1f61160cba6)
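As a rough illustration of what serialising and deserialising means here, the sketch below mimics a component entrypoint by hand: arguments arrive as command-line strings, are converted back to the declared types, and the return value is written to an output file as text. This is a simplified, hypothetical stand-in, not the code `func_to_container_op` actually generates; the function `add`, the flag names and the file layout are all assumptions.

```python
import argparse
import os
import tempfile


def add(a: int, b: int) -> int:
    """A hypothetical component function."""
    return a + b


def run_as_component(argv):
    """Simulate a container entrypoint: parse string args, call, write output."""
    parser = argparse.ArgumentParser()
    # Command-line values are strings; type=int deserialises them.
    parser.add_argument("--a", type=int, required=True)
    parser.add_argument("--b", type=int, required=True)
    parser.add_argument("--output-path", required=True)
    args = parser.parse_args(argv)

    result = add(args.a, args.b)
    # Serialise the return value back to text for the next step to read.
    with open(args.output_path, "w") as f:
        f.write(str(result))


with tempfile.TemporaryDirectory() as tmp:
    out = os.path.join(tmp, "output")
    run_as_component(["--a", "2", "--b", "3", "--output-path", out])
    with open(out) as f:
        output_text = f.read()

print(output_text)  # 5
```

The string round-trip is also why `predict` calls `int(image_number)` before using the value as an index.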

And here's `func_to_container_op` in action:

In [None]:
train_op = comp.func_to_container_op(train, base_image='tensorflow/tensorflow:2.2.0rc2-py3')
predict_op = comp.func_to_container_op(predict, base_image='tensorflow/tensorflow:2.2.0rc2-py3')

You have now wrapped the two functions to run in their respective containers. However, there is still no information on the order in which the two steps should be executed.

_Sounds like a job for a pipeline._

In [None]:
DATA_PATH = '/mnt'
MODEL_PATH='mnist_model.h5'
# An integer representing an image from the test set that the model will attempt to predict the label for.
IMAGE_NUMBER = 0

# Define the pipeline and the parameters to be fed into it.
@dsl.pipeline(
    name='Fashion MNIST Pipeline',
    description='A pipeline that performs fashion MNIST model training and prediction.'
)
def mnist_container_pipeline(
    data_path: str = DATA_PATH,
    model_file: str = MODEL_PATH,
    image_number: int = IMAGE_NUMBER
):
    
    # 1. Define volume to share data between components.
    vop = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO)
    
    # 2. Create MNIST training component.
    mnist_training_container = train_op(data_path, model_file) \
                                    .add_pvolumes({data_path: vop.volume})

    # 3. Create MNIST prediction component.
    mnist_predict_container = predict_op(data_path,
                                         model_file,
                                         image_number
                                         ) \
                                    .add_pvolumes({data_path: mnist_training_container.pvolume})
    
    # 4. Print the result of the prediction
    mnist_result_container = dsl.ContainerOp(
        name="print_prediction",
        image='library/bash:4.4.23',
        pvolumes={data_path: mnist_predict_container.pvolume},
        arguments=['cat', f'{data_path}/result.txt']
    )

    vop.delete().after(mnist_result_container)

The pipeline is just another function which is annotated with [`@dsl.pipeline`](https://www.kubeflow.org/docs/pipelines/sdk/sdk-overview/#sdk-packages).

Inside the function there are 4 main blocks:

1. In the first block, a [VolumeOp](https://www.kubeflow.org/docs/pipelines/sdk/manipulate-resources/#persistent-volume-claims-pvcs) is created to persist the data. In the background, Kubeflow creates a Persistent Volume Claim (PVC) and Persistent Volume (PV).
1. The second block is the training function. Notice how it uses the wrapped function. Also, the volume is attached to it.
1. The predict block comes third and it is similar to the previous one. You still have to attach the volume to pass the data to it.
1. The last block is a container that displays the result. Since the result is stored in the volume, the last block has to mount the volume.
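The volume is what lets one step's files become the next step's inputs. The sketch below imitates that hand-off locally, with a temporary directory standing in for the volume mounted at `/mnt` and three hypothetical functions standing in for the training, prediction and printing steps. It also shows why the order matters: each step reads a file written by the step before it.

```python
import os
import pickle
import tempfile


def train_step(data_path):
    # Stand-in for train(): persist (test_images, test_labels) for the next step.
    with open(os.path.join(data_path, "test_data"), "wb") as f:
        pickle.dump(([[0.0, 1.0]], [3]), f)


def predict_step(data_path):
    # Stand-in for predict(): read what train_step wrote, write a result file.
    with open(os.path.join(data_path, "test_data"), "rb") as f:
        test_images, test_labels = pickle.load(f)
    with open(os.path.join(data_path, "result.txt"), "w") as f:
        f.write(f"Actual label: {test_labels[0]}")


def print_step(data_path):
    # Stand-in for the bash container that runs `cat result.txt`.
    with open(os.path.join(data_path, "result.txt")) as f:
        return f.read()


with tempfile.TemporaryDirectory() as volume:
    train_step(volume)
    predict_step(volume)
    result = print_step(volume)

print(result)  # Actual label: 3
```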

_How do you submit the pipeline to the cluster?_

You could put all of this pipeline code in `pipeline.py`, compile it using [`dsl-compile`](https://www.kubeflow.org/docs/pipelines/sdk/build-component/#compile-the-pipeline) and upload the compiled artefact to Kubeflow Pipelines via the dashboard.

But there's a better way.

You can connect to the Kubeflow Pipelines API server with:

In [None]:
client = kfp.Client(host='ml-pipeline.kubeflow.svc.cluster.local:8888')

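The host is the DNS name and port of the Kubernetes Service that exposes the Kubeflow Pipelines API server. It is reachable only from inside the cluster, which is why this notebook has to run on a notebook server within Kubeflow.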
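The host follows the usual Kubernetes in-cluster DNS pattern, `<service>.<namespace>.svc.<cluster-domain>`, followed by the Service port. A small sketch that assembles it; the helper name `in_cluster_host` is hypothetical, and `cluster.local` is only the default cluster domain, which a cluster administrator can change:

```python
def in_cluster_host(service: str, namespace: str, port: int,
                    cluster_domain: str = "cluster.local") -> str:
    """Build the in-cluster DNS address of a Kubernetes Service."""
    return f"{service}.{namespace}.svc.{cluster_domain}:{port}"


host = in_cluster_host("ml-pipeline", "kubeflow", 8888)
print(host)  # ml-pipeline.kubeflow.svc.cluster.local:8888
```

The resulting string is the same value passed to `kfp.Client(host=...)` above.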

The last step is to use the client to submit the pipeline:

In [None]:
pipeline_func = mnist_container_pipeline
experiment_name = 'fashion_mnist_kubeflow'
run_name = pipeline_func.__name__ + ' run'

arguments = {"data_path":DATA_PATH,
             "model_file":MODEL_PATH,
             "image_number": IMAGE_NUMBER}

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func, 
                                                  experiment_name=experiment_name, 
                                                  run_name=run_name, 
                                                  arguments=arguments)

You should see two links, one to the experiment and one to the run, which you can use to inspect the pipeline.

Congratulations, you just built a Kubeflow Pipeline!