# Kubeflow Pipelines Using Fashion MNIST

[Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/) is a machine learning workflow platform that helps data scientists and ML engineers tackle experimentation and productionization of ML workloads. It lets users orchestrate scalable workloads with a Python SDK, right from a Jupyter Notebook.

[MicroK8s](https://microk8s.io/docs) is a lightweight Kubernetes distribution that lets you spin up a cluster on your local machine. Kubeflow is available as a built-in MicroK8s add-on.

This notebook walks you through taking an existing ML workload and converting it into a pipeline that can be run with Kubeflow.

This notebook features code from a tutorial that can be found on the [TensorFlow website](https://www.tensorflow.org/tutorials/keras/classification).

## Fashion MNIST Dataset Exploration

The [Fashion MNIST](https://github.com/zalandoresearch/fashion-mnist) dataset contains 70,000 grayscale images in 10 clothing categories. Each image is 28x28 pixels. We chose this dataset to demonstrate the functionality of Kubeflow Pipelines without introducing too much complexity into the ML model itself.

To familiarize you with the dataset, we will do a short exploration. It is always a good idea to understand your data before you begin any kind of analysis.

<table>
  <tr><td>
    <img src="https://tensorflow.org/images/fashion-mnist-sprite.png"
         alt="Fashion MNIST sprite"  width="600">
  </td></tr>
  <tr><td align="center">
    <b>Figure 1.</b> <a href="https://github.com/zalandoresearch/fashion-mnist">Fashion-MNIST samples</a> (by Zalando, MIT License).<br/>&nbsp;
  </td></tr>
</table>


Install packages for data exploration:

In [9]:
!python -m pip install --user --upgrade pip
!python -m pip install --user --upgrade pandas matplotlib numpy

Requirement already up-to-date: pip in ./.local/lib/python3.6/site-packages (20.1.1)
Please see https://github.com/pypa/pip/issues/5599 for advice on fixing the underlying issue.
To avoid this problem you can invoke Python with '-m pip' instead of running pip directly.
Collecting pandas
  Using cached pandas-1.0.5-cp36-cp36m-manylinux1_x86_64.whl (10.1 MB)
Requirement already up-to-date: matplotlib in ./.local/lib/python3.6/site-packages (3.2.2)
Requirement already up-to-date: numpy in ./.local/lib/python3.6/site-packages (1.19.0)
ERROR: google-colab 1.0.0 has requirement pandas~=0.24.0, but you'll have pandas 1.0.5 which is incompatible.
Installing collected packages: pandas
  Attempting uninstall: pandas
    Found existing installation: pandas 0.24.2
    Uninstalling pandas-0.24.2:
      Successfully uninstalled pandas-0.24.2
Successfully installed pandas-1.0.5


Restart kernel to allow installs to take effect:

In [None]:
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [None]:
# TensorFlow and tf.keras
import tensorflow as tf
from tensorflow import keras

# Helper libraries
import os
import subprocess

# Data exploration
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

First, we download the dataset. `load_data()` returns it already split into training and test images.

In [None]:
fashion_mnist = keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

Each image is mapped to a single label. Since the *class names* are not included with the dataset, store them here to use later when plotting the images:

In [None]:
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

Let's look at the format of each dataset split. The training set contains 60,000 images and the test set contains 10,000 images, each 28x28 pixels.

In [None]:
print(f'Number of training images: {train_images.shape[0]}\n')
print(f'Number of test images: {test_images.shape[0]}\n')

print(f'Image size: {train_images.shape[1:]}')


Correspondingly, there are 60,000 training labels and 10,000 test labels.

In [None]:
print(f'Number of training labels: {len(train_labels)}\n')
print(f'Number of test labels: {len(test_labels)}')

Each label is an integer between 0 and 9 corresponding to the 10 class names.

In [None]:
unique_train_labels = np.unique(train_labels)

# Pair each class name with its integer label (0-9).
for label_name, label_num in zip(class_names, unique_train_labels):
    print(f'{label_name}: {label_num}')
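Because the labels are plain integers, indexing a NumPy array of the class names with an array of labels converts a whole batch of labels to names in one step. A minimal sketch, using a few hand-picked labels as a stand-in for `train_labels`:

```python
import numpy as np

class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

# Hand-picked example labels (integers 0-9), standing in for train_labels.
example_labels = np.array([0, 9, 3])

# Fancy indexing maps each integer label to its class name.
label_names = np.array(class_names)[example_labels]
print(list(label_names))
```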

To train the model properly, the data must be normalized so that each value falls between 0 and 1. Later, this step is performed inside the training script, but we show what the process looks like here.

The first image shows that the values fall in a range between 0 and 255.

In [None]:
plt.figure()
plt.imshow(train_images[0])
plt.colorbar()
plt.grid(False)
plt.show()

To scale the data, we divide the training and test values by 255.

In [None]:
train_images = train_images / 255.0

test_images = test_images / 255.0
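The same normalization can be checked on a tiny synthetic array, which shows both the value range and the dtype change without downloading the dataset. A minimal sketch, where `pixels` is a made-up stand-in for the image arrays:

```python
import numpy as np

# Synthetic stand-in for the image arrays: uint8 pixels in [0, 255].
pixels = np.array([[0, 128], [64, 255]], dtype=np.uint8)

# Dividing by a float promotes the array to float64 and rescales it to [0, 1].
scaled = pixels / 255.0

print(pixels.dtype, pixels.min(), pixels.max())
print(scaled.dtype, scaled.min(), scaled.max())
```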

We plot the first 25 images from the training set to show that the data is in fact in the form we expect.

In [None]:
plt.figure(figsize=(10,10))
for i in range(25):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(train_images[i], cmap=plt.cm.binary)
    plt.xlabel(class_names[train_labels[i]])
plt.show()

## Building a Kubeflow Pipeline

Now that we have a good handle on what the data looks like, the first step in building the pipeline is to install the Kubeflow Pipelines SDK package.



In [2]:
!python -m pip install --user --upgrade kfp

Please see https://github.com/pypa/pip/issues/5599 for advice on fixing the underlying issue.
To avoid this problem you can invoke Python with '-m pip' instead of running pip directly.
Requirement already up-to-date: kfp in ./.local/lib/python3.6/site-packages (0.5.1)


Restart the kernel for the installs to take effect.

In [None]:
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

If the install was successful, the following command prints the location of the `dsl-compile` executable that ships with the SDK.

In [1]:
!which dsl-compile

/usr/local/bin/dsl-compile


In [3]:
# Kubeflow SDK
import kfp
import kfp.dsl as dsl
import kfp.compiler as compiler
import kfp.components as comp

Create a client to communicate with the Pipelines API server.


In [4]:
client = kfp.Client(host='pipelines-api.kubeflow.svc.cluster.local:8888')

### Building Container Images using Google Cloud Build

Cloud Build is a Google Cloud Platform service that builds containers in the cloud, so you can build large images without being limited by local compute. The resulting images are stored in a private Google Container Registry. Cloud Build charges for build minutes beyond its free tier, and Container Registry images are billed as Google Cloud Storage usage.

In [None]:
PROJECT_ID = 'manceps-labs'  # Replace with your own GCP project ID.

In [None]:
# Enable the Cloud Build and Container Registry APIs
!gcloud services enable cloudbuild.googleapis.com containerregistry.googleapis.com

Since the only services we are using are Cloud Build and Container Registry, set the default credentials to use the default Cloud Build service account. This will narrow the scope of what the application is capable of doing on Google Cloud Platform.

In [None]:
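%%bash -s "$PROJECT_ID"

# Set the active project first so the lookup below runs against it.
gcloud config set project $1

# Find the service account whose email contains 'cloud-build'.
SERVICE_ACCOUNT_EMAIL=$(gcloud iam service-accounts list \
    --filter="email~cloud-build" --format="value(email)")

gcloud iam service-accounts keys create key.json \
    --iam-account=$SERVICE_ACCOUNT_EMAIL

# gcloud does not read GOOGLE_APPLICATION_CREDENTIALS, and an `export` in a
# %%bash cell ends with the cell, so activate the key for gcloud directly.
gcloud auth activate-service-account --key-file=key.json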

Define the local paths where the component files will be stored. We will create two components. The first is a training component that pulls the data, processes it, and trains a model.

The second is a prediction component that predicts the label of a specified image from the test data.

In [None]:
TRAINING_PATH = '/tmp/components/mnist_training/'
PREDICT_PATH = '/tmp/components/mnist_predict/'
DATA_PATH = '/mnt'
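The two components run in separate containers, so they only share data through the mounted volume: the training component pickles the normalized test split to `DATA_PATH`, and the prediction component unpickles it. A minimal sketch of that hand-off, using a temporary directory as a stand-in for `DATA_PATH` and small random arrays instead of the real test split:

```python
import os
import pickle
import tempfile

import numpy as np

# Temporary directory standing in for DATA_PATH (the mounted volume).
data_path = tempfile.mkdtemp()

# Small random arrays standing in for the normalized test split.
test_images = np.random.rand(5, 28, 28)
test_labels = np.random.randint(0, 10, size=5)

# Training component: write the test split to the shared volume.
with open(os.path.join(data_path, 'test_data'), 'wb') as f:
    pickle.dump((test_images, test_labels), f)

# Prediction component: read it back from the same path.
with open(os.path.join(data_path, 'test_data'), 'rb') as f:
    loaded_images, loaded_labels = pickle.load(f)

print(loaded_images.shape, loaded_labels.shape)
```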


The following cells define strings that will be written to the *app.py* files of the training and prediction components. It is worth comparing this code with the corresponding Fashion MNIST notebook to see what was carried over and what had to be added (command-line arguments and reading and writing files on the shared volume) so that it runs properly during pipeline execution.

In [None]:
train_file = '''import argparse
from datetime import datetime
import pickle

import tensorflow as tf
from tensorflow import keras

# Use the argparse package to define command line arguments.
parser = argparse.ArgumentParser()

parser.add_argument('--epochs', type=int, required=False, default=10, help='Number of epochs to train the model for.')
parser.add_argument('--data-path', type=str, required=False, default='/mnt', help='Absolute path where the persistent volume will be mounted.')
parser.add_argument('--model-file', type=str, required=True, help='Name of the model file (ex. model.h5).')

args = parser.parse_args()

# argparse stores '--data-path' as args.data_path, '--model-file' as args.model_file.
epochs = args.epochs
data_path = args.data_path
model_file = args.model_file

# Download the dataset and split into training and test data. 
fashion_mnist = keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Normalize the data so that the values all fall between 0 and 1.
train_images = train_images / 255.0

test_images = test_images / 255.0

# Define the model using Keras.
model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=(28, 28)),
    keras.layers.Dense(512, activation=tf.nn.relu),
    keras.layers.Dropout(0.25),
    keras.layers.Dense(10, activation=tf.nn.softmax)
])

model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

model.summary()

# Run a training job with specified number of epochs (Default = 10)
model.fit(train_images, train_labels, epochs=epochs)

# Evaluate the model and print the results
test_loss, test_acc = model.evaluate(test_images,  test_labels, verbose=2)
print('Test accuracy:', test_acc)

# Save the model to the designated path on the persistent volume.
model.save(model_file)

# Save the test_data as a pickle file to be used by the predict component.
with open(f'{data_path}/test_data', 'wb') as f:
    pickle.dump((test_images,test_labels), f)

'''
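The training script relies on `argparse` turning hyphenated flags into underscored attributes. A minimal sketch with a subset of the same flags shows the mapping, and that the flag must be spelled `--model-file` on the command line (an underscored `--model_file` is not recognized):

```python
import argparse

# Same flags as the training script's parser (a subset, for illustration).
parser = argparse.ArgumentParser()
parser.add_argument('--epochs', type=int, default=10)
parser.add_argument('--data-path', type=str, default='/mnt')
parser.add_argument('--model-file', type=str, required=True)

# Hyphenated flags become underscored attributes on the parsed namespace.
args = parser.parse_args(['--model-file', '/mnt/mnist_model.h5', '--epochs', '3'])
print(args.model_file, args.data_path, args.epochs)
```

Declaring `--epochs` with `type=int` matters here: with `type=str`, a value passed on the command line would reach `model.fit` as a string.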

In [None]:
predict_file = '''import argparse
import pickle

import tensorflow as tf
from tensorflow import keras

import numpy as np

# Use the argparse package to define command line arguments.
parser = argparse.ArgumentParser()

parser.add_argument('--data-path', type=str, required=False, default='/mnt', help='Absolute path where the persistent volume will be mounted.')
parser.add_argument('--image-number', type=int, required=False, default=0, help='Image to predict (0-9999).')
parser.add_argument('--model-path', type=str, required=True, help='Name of the saved Keras model file (ex. model.h5).')

args = parser.parse_args()

image_number = args.image_number
model_path = args.model_path
data_path = args.data_path

# Load the saved Keras model
model = keras.models.load_model(model_path)

# Load and unpack the test_data written by the training component.
with open(f'{data_path}/test_data', 'rb') as f:
    test_data = pickle.load(f)

# Separate the test_images from the test_labels.
test_images, test_labels = test_data

# Define the class names.
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

# Grab an image from the test dataset.
img = test_images[image_number]

# Add the image to a batch where it is the only member.
img = np.expand_dims(img, 0)

# Predict the label of the image. The model's final layer already applies
# softmax, so its outputs are class probabilities.
predictions = model.predict(img)

# Take the prediction with the highest probability
prediction = np.argmax(predictions[0])

# Retrieve the true label of the image from the test labels.
true_label = test_labels[image_number]

# Write the result to the shared volume so a later step can read it.
with open(f'{data_path}/result.txt', 'w') as f:
    f.write("Prediction: {} | Confidence: {:2.0f}% | Actual: {}".format(
        class_names[prediction],
        100 * np.max(predictions),
        class_names[true_label]))
'''
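Because the training script's last layer already applies softmax, the model's outputs are probabilities. Stacking another Softmax layer on top keeps the predicted class but shrinks the reported confidence. A small NumPy sketch illustrates this with a made-up probability vector (`softmax` here is a hand-written helper, not a Keras function):

```python
import numpy as np

def softmax(x):
    # Subtract the max for numerical stability before exponentiating.
    e = np.exp(x - np.max(x))
    return e / e.sum()

# Made-up model output that is already a probability distribution.
probs = np.array([0.9, 0.05, 0.05])

# Applying softmax a second time, as an extra Softmax layer would.
double = softmax(probs)

print(np.argmax(probs), np.argmax(double))  # the predicted class is unchanged
print(probs.max(), round(double.max(), 2))  # but the confidence drops to about 0.54
```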

In [None]:
dockerfile = '''FROM tensorflow/tensorflow:2.2.0-gpu
# Change the current working directory.
WORKDIR /opt

# Copy app.py (and this Dockerfile) into the image. NumPy is already
# installed as a TensorFlow dependency, so no extra packages are needed.
COPY . /opt
'''

In [None]:
def create_files(path, app_text, dockerfile=dockerfile):
    '''
    Creates app.py file and Dockerfile at specified path.

    Args:
      path (str): Destination path where app.py and Dockerfile will be created.
      app_text (str): A string to be used as the body of the app.py file.
      dockerfile (str): A string specifying the Dockerfile structure to be used.

    Returns:
      None
    '''
    # Create the destination folder if it does not exist yet.
    os.makedirs(path, exist_ok=True)

    with open(os.path.join(path, 'app.py'), 'w') as f:
        f.write(app_text)

    with open(os.path.join(path, 'Dockerfile'), 'w') as f:
        f.write(dockerfile)

    print('File path:', path)
    print('Files created:', os.listdir(path), '\n')

Create the files needed to build our component containers.

In [None]:
create_files(TRAINING_PATH, train_file)
create_files(PREDICT_PATH, predict_file)

In [None]:
def create_image_string(image_name, project_id):
    '''
    Creates Docker image string.
    
    Args:
      image_name (str): Name for Docker image.
      project_id (str): GCP Project ID.

    Returns:
      str: A GCR Docker image tag.
    '''
    gcr_image = f"us.gcr.io/{project_id}/{image_name}:latest"
    print(gcr_image)
    return gcr_image


def build_docker_image(gcr_image, app_folder):
    '''
    Builds a Docker image with Google Cloud Build and pushes it to Google Container Registry.

    Args:
      gcr_image (str): A Google Container Registry tag for the image.
      app_folder (str): Path to the folder containing app.py and the Dockerfile.

    Returns:
      None
    '''
    # `gcloud builds submit --tag` builds the Dockerfile in app_folder and
    # pushes the resulting image to gcr_image.
    cmd = ['gcloud', 'builds', 'submit', '--tag', gcr_image, app_folder]
    build_log = subprocess.run(cmd, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT).stdout.decode('utf-8')
    print(build_log)

Here we produce the Docker image tags we will use to store our images on Google Container Registry.

In [None]:
train_gcr_image = create_image_string('train', PROJECT_ID)
predict_gcr_image = create_image_string('predict', PROJECT_ID)

Now we build the train and predict containers that our Kubeflow Pipeline will use. Before moving on, make sure the output reports that both builds succeeded.

In [None]:
build_docker_image(train_gcr_image, TRAINING_PATH)
build_docker_image(predict_gcr_image, PREDICT_PATH)
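Rather than reading the build log by eye, you could let Python fail loudly when a command such as `gcloud builds submit` exits with an error. A minimal sketch, assuming a hypothetical helper `run_checked` that wraps `subprocess.run` (written to stay compatible with Python 3.6, so it avoids `capture_output`); the demo uses harmless Python one-liners in place of real builds:

```python
import subprocess
import sys

def run_checked(cmd):
    '''Runs cmd, prints its combined output, and raises if it exits non-zero.'''
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                            universal_newlines=True)
    print(result.stdout)
    # check_returncode() raises CalledProcessError for a non-zero exit status.
    result.check_returncode()
    return result.stdout

# Stand-in commands; a real call would pass the `gcloud builds submit` command list.
ok_output = run_checked([sys.executable, '-c', 'print("build ok")'])

try:
    run_checked([sys.executable, '-c', 'import sys; sys.exit(1)'])
except subprocess.CalledProcessError as err:
    print('Build failed with exit code', err.returncode)
```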

### Defining our Kubeflow Pipeline

Kubeflow Pipelines are defined declaratively. Calling the pipeline function only builds a description of the components and how they connect; nothing runs on the cluster until the compiled pipeline is submitted as a run.

We first define some constants that are used as inputs at various points in the pipeline.

In [None]:
# Name of the Keras model that will be saved.
MODEL_FILE = 'mnist_model.h5'

# An integer representing an image from the test set that the model will attempt to predict the label for.
IMAGE_NUMBER = 1

Our next step is to create the components that make up the pipeline. We define the pipeline with the *@dsl.pipeline* decorator at the top.

We then define the pipeline function, which takes a number of parameters that are fed into the components during execution.

Here we create a [Persistent Volume Claim](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) with `dsl.VolumeOp` so that data persists across components. While this works well locally, you could also use a cloud storage bucket for persistent storage.

In [None]:
# Define the pipeline
@dsl.pipeline(
   name='MNIST Pipeline',
   description='A toy pipeline that performs mnist model training and prediction.'
)
# Define parameters to be fed into pipeline. The volume mount path stays a
# fixed constant (DATA_PATH) because it must be known when the pods are created.
def mnist_container_pipeline(
    model_file: str = f'{DATA_PATH}/{MODEL_FILE}',
    project_id: str = PROJECT_ID,
    image_number: int = IMAGE_NUMBER
):

    # Define volume to share data between components.
    vop = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWM
    )

    # Create MNIST training component.
    mnist_training_container = dsl.ContainerOp(
        name="model_training",
        image='us.gcr.io/{}/train:latest'.format(project_id),
        command=['python', '/opt/app.py'],
        pvolumes={DATA_PATH: vop.volume}, # Mount the volume at DATA_PATH.
        arguments=['--model-file', model_file,
                   '--data-path', DATA_PATH],
    )

    # Create MNIST prediction component.
    mnist_predict_container = dsl.ContainerOp(
        name="prediction",
        image='us.gcr.io/{}/predict:latest'.format(project_id),
        command=['python', '/opt/app.py'],
        pvolumes={DATA_PATH: mnist_training_container.pvolume}, # Mount same volume from training component.
        arguments=['--image-number', image_number,
                   '--model-path', model_file,
                   '--data-path', DATA_PATH]
    )

    # Print the result written by the prediction component.
    mnist_result_container = dsl.ContainerOp(
        name="echo_result",
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        pvolumes={DATA_PATH: mnist_predict_container.pvolume}, # Mount the same volume from the training and predict components.
        arguments=[f'cat {DATA_PATH}/result.txt'] # sh -c takes the whole command as one string.
    )
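With `command=['sh', '-c']`, only the first argument is executed as the shell script; any further arguments are bound to `$0`, `$1`, and so on. A small local sketch with `subprocess` shows the difference, using `echo` in place of `cat` so it runs without the result file (the path is only an example string):

```python
import subprocess

# Split form: only 'echo' is executed; the path is bound to $0 and never printed.
split = subprocess.run(['sh', '-c', 'echo', '/mnt/result.txt'],
                       stdout=subprocess.PIPE, universal_newlines=True)

# Single-string form: the whole command line is the script, so the path is used.
joined = subprocess.run(['sh', '-c', 'echo /mnt/result.txt'],
                        stdout=subprocess.PIPE, universal_newlines=True)

print(repr(split.stdout))   # '\n': echo ran with no arguments
print(repr(joined.stdout))  # the path followed by a newline
```

This is why the `echo_result` step has to receive `cat` and its path together as a single argument.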

Finally, we feed our pipeline definition into the compiler and run it as an experiment. This gives us two links at the bottom that we can follow to the [Kubeflow Pipelines UI](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/), where you can check logs, artifacts, and inputs/outputs, and visually follow the progress of your pipeline.

In [None]:
pipeline_func = mnist_container_pipeline

In [None]:
experiment_name = 'mnist_kubeflow'

arguments = {"model_file": f"{DATA_PATH}/{MODEL_FILE}",
             "project_id": PROJECT_ID,
             "image_number": IMAGE_NUMBER}

run_name = pipeline_func.__name__ + ' run'

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func, 
                                                  experiment_name=experiment_name, 
                                                  run_name=run_name, 
                                                  arguments=arguments)