# From Notebook to clothing images classification pipeline using Fashion MNIST

This notebook is based on the GitHub project [From Notebook to Kubeflow Pipeline using Fashion MNIST](https://github.com/manceps/fashion-mnist-kfp-lab/blob/master/KF_Fashion_MNIST.ipynb) under the [MIT License](https://github.com/manceps/fashion-mnist-kfp-lab/blob/master/LICENSE).

# From notebook to Kubeflow Pipeline using Fashion MNIST

In this notebook, we will walk you through the steps of converting a machine learning model, which you may already have in a Jupyter notebook, into a Kubeflow pipeline. As an example, we will make use of the Fashion MNIST dataset and the [Basic classification with Tensorflow](https://www.tensorflow.org/tutorials/keras/classification) example.

In this example we use:

* **Kubeflow pipelines** - [Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/) is a machine learning workflow platform that helps data scientists and ML engineers experiment with and productionize ML workloads. It lets users orchestrate scalable workloads through an SDK directly from a Jupyter notebook.

**Note:** This notebook is to be run on a notebook server inside the Kubeflow environment. 

## Section 1: Data exploration (as in [here](https://www.tensorflow.org/tutorials/keras/classification))

The [Fashion MNIST](https://github.com/zalandoresearch/fashion-mnist) dataset contains 70,000 grayscale images in 10 clothing categories. Each image is 28x28 pixels in size. We chose this dataset to demonstrate the functionality of Kubeflow Pipelines without introducing too much complexity in the implementation of the ML model.

To familiarize you with the dataset we will do a short exploration. It is always a good idea to understand your data before you begin any kind of analysis.

<table>
  <tr><td>
    <img src="https://tensorflow.org/images/fashion-mnist-sprite.png"
         alt="Fashion MNIST sprite"  width="600">
  </td></tr>
  <tr><td align="center">
    <b>Figure 1.</b> <a href="https://github.com/zalandoresearch/fashion-mnist">Fashion-MNIST samples</a> (by Zalando, MIT License).<br/>&nbsp;
  </td></tr>
</table>


### 1.1 Install packages:

In [None]:
!python -m pip install --user --upgrade pip
!pip install --user --upgrade pandas matplotlib numpy

After the installation, we need to restart the kernel for the changes to take effect:

In [None]:
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

### 1.2 Import libraries

In [None]:
# TensorFlow and tf.keras
import tensorflow as tf
from tensorflow import keras

# Data exploration libraries
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

### 1.2 Import the Fashion MNIST dataset

In [None]:
fashion_mnist = keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

Each image is mapped to a single label. Since the *class names* are not included with the dataset, store them here to use later when plotting the images:

In [None]:
class_names = ['Top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

### 1.3 Explore the data

Let's look at the format of each dataset split. The training set contains 60,000 images and the test set contains 10,000 images, each 28x28 pixels.

In [None]:
print(f'Number of training images: {train_images.shape[0]}\n')
print(f'Number of test images: {test_images.shape[0]}\n')

print(f'Image size: {train_images.shape[1:]}')


Accordingly, there are 60,000 training labels and 10,000 test labels.

In [None]:
print(f'Number of training labels: {len(train_labels)}\n')
print(f'Number of test labels: {len(test_labels)}')

Each label is an integer between 0 and 9 corresponding to the 10 class names.

In [None]:
unique_train_labels = np.unique(train_labels)

for label in zip(class_names, unique_train_labels):
    label_name, label_num = label
    print(f'{label_name}: {label_num}')
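
Beyond listing the label ids, it can help to check how many images fall into each class. The following is a minimal sketch, assuming a hypothetical balanced `labels` array as a stand-in for `train_labels`; in the notebook you would pass `train_labels` instead.

```python
import numpy as np

# Hypothetical stand-in for train_labels: 6,000 copies of each class id 0..9.
labels = np.repeat(np.arange(10), 6000)

# return_counts=True yields each distinct label together with its frequency.
values, counts = np.unique(labels, return_counts=True)
label_counts = dict(zip(values.tolist(), counts.tolist()))

for label_num, count in label_counts.items():
    print(f'{label_num}: {count}')
```

A strongly uneven distribution would be a reason to look at per-class metrics rather than overall accuracy alone.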

### 1.4 Preprocess the data

To properly train the model, the data must be normalized so that each value falls between 0 and 1. Later on, this step is done inside the training script, but we show what the process looks like here.

The first image shows that the values fall in a range between 0 and 255.

In [None]:
plt.figure()
plt.imshow(train_images[0])
plt.colorbar()
plt.grid(False)
plt.show()

To scale the data we divide the training and test values by 255.

In [None]:
train_images = train_images / 255.0

test_images = test_images / 255.0
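
To see what the division does to the array, here is a small sketch on synthetic data. The `raw` array is a hypothetical stand-in for a batch of images, not the real dataset.

```python
import numpy as np

# Hypothetical stand-in for a batch of two 28x28 grayscale images (uint8, 0..255).
rng = np.random.default_rng(0)
raw = rng.integers(0, 256, size=(2, 28, 28), dtype=np.uint8)

# Dividing by a Python float promotes the uint8 array to a floating-point array.
scaled = raw / 255.0

print(raw.dtype, '->', scaled.dtype)
print('min:', scaled.min(), 'max:', scaled.max())
```

The shape is unchanged; only the dtype and the value range differ after scaling.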

We plot the first 25 images from the training set to show that the data is in fact in the form we expect.

In [None]:
plt.figure(figsize=(10,10))
for i in range(25):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(train_images[i], cmap=plt.cm.binary)
    plt.xlabel(class_names[train_labels[i]])
plt.show()

## Section 2: Kubeflow pipeline building

Up to this point, all our steps are similar to those in the [Basic classification with Tensorflow](https://www.tensorflow.org/tutorials/keras/classification) example. The next step given in [From Notebook to Kubeflow Pipeline using Fashion MNIST](https://github.com/manceps/fashion-mnist-kfp-lab/blob/master/KF_Fashion_MNIST.ipynb) is to build the containerized approach provided by Kubeflow so that our model can run on Kubernetes. We adapt the provided code to generate some metrics and to change some connections between pipeline components.

### 2.1 Install Kubeflow pipelines SDK

The first step is to install the Kubeflow Pipelines SDK package.



In [None]:
!pip install --user --upgrade kfp

After the installation, we need to restart the kernel for the changes to take effect:

In [None]:
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

Check if the install was successful:

In [None]:
!which dsl-compile

You should see a path such as **/usr/local/bin/dsl-compile** above.
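
The same check can be done from Python, which is convenient when the result should drive later logic. This is a minimal sketch using only the standard library; `find_executable` is a hypothetical helper name, not part of the Kubeflow SDK.

```python
import shutil

def find_executable(name: str):
    """Return the absolute path of `name` if it is on PATH, otherwise None."""
    return shutil.which(name)

path = find_executable('dsl-compile')
if path is None:
    # With `pip install --user` on Linux, scripts usually land in ~/.local/bin,
    # which may not be on PATH.
    print('dsl-compile not found on PATH')
else:
    print(path)
```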

### 2.2 Build Container Components

The following cells define functions that will be transformed into lightweight container components. It is recommended to look at the corresponding Fashion MNIST notebook to match what you see here to the original code.

<table>
  <tr><td>
    <img src="https://www.kubeflow.org/docs/images/pipelines-sdk-lightweight.svg"
         alt="Kubeflow Pipelines SDK lightweight components"  width="600">
  </td></tr>
  <tr><td align="center">
  </td></tr>
</table>
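
A function that is turned into a lightweight component has to be self-contained: its imports live inside the function body, and it cannot rely on names defined elsewhere in the notebook. Because it is still a plain Python function, it can be called locally before it is wrapped. The sketch below illustrates this with a hypothetical `pixel_range` function that is not part of this pipeline.

```python
import typing

def pixel_range(max_pixel: int, divisor: float) -> typing.NamedTuple(
        'Outputs', [('low', float), ('high', float)]):
    # Imports go inside the body so the function carries everything it needs.
    from collections import namedtuple

    outputs = namedtuple('Outputs', ['low', 'high'])
    return outputs(0 / divisor, max_pixel / divisor)

# Call the function locally as a quick sanity check before containerizing it.
result = pixel_range(255, 255.0)
print(result.low, result.high)
```

The `train`, `test`, and `confusion_matrix` functions below follow the same pattern, which is why each one repeats its own imports.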


In [None]:
# Import Kubeflow SDK
import kfp
import kfp.dsl as dsl
import kfp.components as comp
import typing

In [None]:
def train(compile_optimizer, epochs, validation_split, data_path: comp.OutputPath(), lossplot_path: comp.OutputPath(str)) -> typing.NamedTuple('loss_plot', [('mlpipeline_ui_metadata', 'UI_metadata')]):

    from tensorflow import keras
    import os
    import matplotlib.pyplot as plt
    import base64
    import json
    from collections import namedtuple
    
    # Parse pipeline parameters
    epochs = int(epochs)
    validation_split = float(validation_split)

    def save_loss_plot(history, plot_path):
        """
        history: History object from keras. Its History.history attribute is a record of training loss and validation loss values.
        plot_path: path where plot image will be saved.
        """
        # Creation of the plot
        loss, val_loss = history.history["loss"], history.history["val_loss"]
        fig = plt.figure(figsize=(30, 10))
        plt.subplots_adjust(wspace=0.3, hspace=0.3)
        ax = fig.add_subplot(1, 1, 1)
        ax.cla()
        ax.plot(loss)
        ax.plot(val_loss)
        ax.set_title("Model loss")
        ax.set_xlabel("Epoch")
        ax.set_ylabel("Loss")
        ax.legend(["Train", "Validation"], loc="upper right")

        # Saving plot in specified path
        with open(plot_path, "wb") as fd:
            plt.savefig(fd)

    def get_web_app_from_loss_plot(plot_path):
        """
        plot_path: path where plot image is saved.
        return: JSON object representing kubeflow output viewer for web-app.
        """
        # Retrieve encoded bytes of the specified image path
        with open(plot_path, "rb") as fd:
            encoded = base64.b64encode(fd.read()).decode('latin1')

        web_app_json = {
            'type': 'web-app',
            'storage': 'inline',
            'source': f"""<img width="100%" src="data:image/png;base64,{encoded}"/>"""
        }
        return web_app_json

    # Download the dataset and split into training and test data.
    fashion_mnist = keras.datasets.fashion_mnist

    (train_images, train_labels), _ = fashion_mnist.load_data()

    # Normalize the data so that the values all fall between 0 and 1.
    train_images = train_images / 255.0

    # Define the model using Keras.
    model = keras.Sequential([
        keras.layers.Flatten(input_shape=(28, 28)),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dense(10)
    ])

    model.compile(optimizer=compile_optimizer,
                  loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    # Run a training job with specified number of epochs
    history = model.fit(train_images, train_labels, epochs=epochs, validation_split=validation_split)

    # Save loss plot
    save_loss_plot(history, lossplot_path)

    loss_plot = [get_web_app_from_loss_plot(lossplot_path)]

    if not os.path.exists(data_path):
        os.makedirs(data_path)

    # Save the model to the specified output path
    model.save(f'{data_path}/mnist_model.h5')

    print("============== END TRAINING ==============")

    # Return specified loss_plot
    metadata = {
        'outputs' : loss_plot
    }

    loss_plot = namedtuple('loss_plot', ['mlpipeline_ui_metadata'])
    return loss_plot(json.dumps(metadata))
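
The `mlpipeline_ui_metadata` output returned by `train` is a JSON string that wraps the loss plot as an inline image. The following standalone sketch reproduces that structure using only the standard library. The helper name `inline_image_metadata` and the placeholder bytes are assumptions for illustration; they are not a real PNG.

```python
import base64
import json

def inline_image_metadata(png_bytes: bytes) -> str:
    """Build the JSON for a 'web-app' viewer that shows an inline image."""
    encoded = base64.b64encode(png_bytes).decode('ascii')
    metadata = {
        'outputs': [{
            'type': 'web-app',
            'storage': 'inline',
            'source': f'<img width="100%" src="data:image/png;base64,{encoded}"/>',
        }]
    }
    return json.dumps(metadata)

# Placeholder bytes standing in for the saved plot file.
payload = inline_image_metadata(b'not-a-real-png')
decoded = json.loads(payload)
print(decoded['outputs'][0]['type'])
```

Embedding the image inline avoids the need for external storage, at the cost of a larger metadata payload.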

In [None]:
def test(data_path: comp.InputPath(), results_path: comp.OutputPath(), labels_dir: comp.OutputPath()) -> typing.NamedTuple('Outputs', [('mlpipeline_metrics', 'Metrics')]):
    import tensorflow as tf
    import numpy as np
    from tensorflow import keras
    import os
    import json

    # Download the dataset and split into training and test data.
    fashion_mnist = keras.datasets.fashion_mnist

    _ , (test_images, test_labels) = fashion_mnist.load_data()

    test_images = test_images / 255.0

    # Load the saved Keras model
    model = keras.models.load_model(f'{data_path}/mnist_model.h5')

    # Evaluate the model and print the results
    _, test_acc = model.evaluate(test_images, test_labels, verbose=2)
    print('Test accuracy:', test_acc)

    # Define the class names.
    class_names = ['Top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

    # Define a Softmax layer to define outputs as probabilities
    probability_model = tf.keras.Sequential([model, tf.keras.layers.Softmax()])

    # Classify all the images from the test dataset
    pred_labels = [0 for k in test_images]
    for image_number in range(len(test_images)):
        # Grab an image from the test dataset.
        img = test_images[image_number]

        # Add the image to a batch where it is the only member.
        img = (np.expand_dims(img, 0))

        # Predict the label of the image.
        predictions = probability_model.predict(img)

        # Take the prediction with the highest probability
        prediction = int(np.argmax(predictions[0]))
        pred_labels[image_number] = prediction

        # Retrieve the true label of the image from the test labels.
        true_label = test_labels[image_number]

        class_prediction = class_names[prediction]
        confidence = 100 * np.max(predictions)
        actual = class_names[true_label]

        # Save results
        with open(results_path, 'a+') as result:
            result.write(" Image #:" + str(image_number) + " | Prediction: {} | Confidence: {:2.0f}% | Actual: {}\n".format(class_prediction, confidence, actual))

    if not os.path.exists(labels_dir):
        os.makedirs(labels_dir)

    # Save true labels and predicted labels and class names for confusion matrix
    with open(f'{labels_dir}/true_labels.txt', 'w') as ft:
        ft.write(str(test_labels.tolist()))

    with open(f'{labels_dir}/pred_labels.txt', 'w') as fp:
        fp.write(str(pred_labels))

    with open(f'{labels_dir}/class_names.txt', 'w') as fp:
        fp.write(str(class_names))

    # Save metrics
    metrics = {
        'metrics': [{
            'name': 'accuracy',  # The name of the metric. Visualized as the column name in the runs table.
            'numberValue': float(test_acc),  # The value of the metric. Must be a numeric value.
            'format': "RAW",   # The optional format of the metric. Supported values are "RAW" (displayed in raw format) and "PERCENTAGE" (displayed in percentage format).
        }]
    }
    print('Prediction has been saved successfully!')
    
    return [json.dumps(metrics)]
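
The loop in `test` classifies one image per `predict` call. A vectorized alternative computes the predicted label and confidence for the whole batch at once. The sketch below shows the idea with NumPy on hypothetical logits; in the component itself this would correspond to calling `predict` once on the full `test_images` array, which is an adaptation and not what the original code does.

```python
import numpy as np

def softmax(logits: np.ndarray) -> np.ndarray:
    """Row-wise softmax, shifted by the row maximum for numerical stability."""
    shifted = logits - logits.max(axis=1, keepdims=True)
    exp = np.exp(shifted)
    return exp / exp.sum(axis=1, keepdims=True)

# Hypothetical logits for 3 images and 10 classes.
logits = np.zeros((3, 10))
logits[0, 9] = 5.0
logits[1, 2] = 3.0
logits[2, 0] = 1.0

probabilities = softmax(logits)

# One argmax / max per row replaces the per-image loop.
pred_labels = probabilities.argmax(axis=1).tolist()
confidences = (100 * probabilities.max(axis=1)).tolist()

print(pred_labels)
```

Because `tolist()` returns plain Python integers, the resulting labels can be written to a text file and parsed back without NumPy.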


In [None]:
def confusion_matrix(labels_dir: comp.InputPath()) -> typing.NamedTuple('conf_m_result', [('mlpipeline_ui_metadata', 'UI_metadata')]):
    from sklearn.metrics import confusion_matrix as sk_confusion_matrix
    import ast
    import json
    from collections import namedtuple

    # Load class names (ast.literal_eval only parses Python literals, unlike eval)
    with open(f'{labels_dir}/class_names.txt', 'r') as fc:
        class_names = ast.literal_eval(fc.read())

    # Load test labels and predicted labels
    with open(f'{labels_dir}/true_labels.txt', 'r') as ft:
        test_labels = ast.literal_eval(ft.read())

    with open(f'{labels_dir}/pred_labels.txt', 'r') as fp:
        pred_labels = ast.literal_eval(fp.read())

    # Build confusion matrix (rows: true labels, columns: predicted labels)
    matrix = sk_confusion_matrix(test_labels, pred_labels)

    csv_literal_confusion_matrix = ""
    for i in range(len(class_names)):
        for j in range(len(class_names)):
            csv_literal_confusion_matrix += "{target},{predicted},{count}\n".format(target=class_names[i], predicted=class_names[j], count=matrix[i][j])

    kf_literal_confusion_matrix = {
        'outputs' : [{
            'type': 'confusion_matrix',
            'format': 'csv',
            'schema': [
                {'name': 'target', 'type': 'CATEGORY'},
                {'name': 'predicted', 'type': 'CATEGORY'},
                {'name': 'count', 'type': 'NUMBER'},
            ],
            'storage': 'inline',
            'source': csv_literal_confusion_matrix,
            'labels': class_names,
        }]
    }

    confusion_matrix_result = namedtuple('conf_m_result', ['mlpipeline_ui_metadata'])
    return confusion_matrix_result(json.dumps(kf_literal_confusion_matrix))
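
To make the `target,predicted,count` CSV format concrete, here is a small standalone sketch. It counts the matrix with NumPy instead of scikit-learn so that it runs without extra packages; the helper names and the three-class toy data are assumptions for illustration only.

```python
import numpy as np

def confusion_counts(true_labels, pred_labels, num_classes):
    """Count (true, predicted) pairs; rows are true labels, columns are predictions."""
    matrix = np.zeros((num_classes, num_classes), dtype=int)
    # np.add.at accumulates correctly when the same (row, col) pair repeats.
    np.add.at(matrix, (np.asarray(true_labels), np.asarray(pred_labels)), 1)
    return matrix

def to_csv(matrix, class_names):
    """Flatten the matrix into one 'target,predicted,count' row per cell."""
    rows = []
    for i, target in enumerate(class_names):
        for j, predicted in enumerate(class_names):
            rows.append(f'{target},{predicted},{matrix[i][j]}')
    return '\n'.join(rows) + '\n'

names = ['Top', 'Trouser', 'Pullover']
true = [0, 0, 1, 2, 2]
pred = [0, 1, 1, 2, 0]

cm = confusion_counts(true, pred, len(names))
csv_text = to_csv(cm, names)
print(csv_text)
```

Every cell is emitted, including zero counts, so the viewer receives one row for each pair of classes.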

In [None]:
# Create the train, test, and confusion matrix lightweight components.
train_op = comp.func_to_container_op(train, base_image='tensorflow/tensorflow:latest-gpu-py3', packages_to_install=['matplotlib'])
test_op = comp.func_to_container_op(test, base_image='tensorflow/tensorflow:latest-gpu-py3', packages_to_install=['scikit-learn'])
confusion_matrix_op = comp.func_to_container_op(confusion_matrix, packages_to_install=['scikit-learn'])

### 2.3 Build Kubeflow Pipeline

Our next step will be to create the various components that will make up the pipeline. Define the pipeline using the *@dsl.pipeline* decorator.

The pipeline function is defined with a number of parameters that are fed into the components during execution. Kubeflow Pipelines are created declaratively: calling a component inside the pipeline function only describes a step and its connections, and the component code does not run until the compiled pipeline is executed.

In [None]:
# Define the pipeline and the parameters to be fed into it
@dsl.pipeline(
   name='MNIST Pipeline',
   description='A toy pipeline that performs mnist model training and prediction.'
)
def mnist_container_pipeline(
    compile_optimizer: str = "adam",
    epochs: int = 50,
    validation_split: float = 0.15,
):
    # Create MNIST training component.
    mnist_training_container = train_op(compile_optimizer, epochs, validation_split)
    mnist_training_container.execution_options.caching_strategy.max_cache_staleness = "P0D"


    # Create MNIST prediction component.
    mnist_predict_container = test_op(mnist_training_container.outputs['data'])
    
    confusion_matrix_op(mnist_predict_container.outputs['labels_dir'])

### 2.4 Compile pipeline

Finally, we feed our pipeline definition into the compiler. This creates a zip file that can be uploaded as a pipeline in the Kubeflow UI.

In [None]:
experiment_name = 'fashion_mnist_kubeflow'
pipeline_func = mnist_container_pipeline

# Compile pipeline to generate compressed YAML definition of the pipeline.
kfp.compiler.Compiler().compile(pipeline_func, '{}.zip'.format(experiment_name))
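
Before uploading the package, it can be useful to check what the archive contains. The sketch below uses the standard `zipfile` module. The helper `list_package_files` is hypothetical, and the demo archive with a single `pipeline.yaml` entry is an assumption about what the SDK version used here produces, not a guarantee.

```python
import os
import tempfile
import zipfile

def list_package_files(package_path: str):
    """Return the names of the files stored in a compiled pipeline archive."""
    with zipfile.ZipFile(package_path) as package:
        return package.namelist()

# Build a tiny demo archive so the helper can be tried without compiling anything.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, 'demo.zip')
    with zipfile.ZipFile(demo_path, 'w') as package:
        package.writestr('pipeline.yaml', 'apiVersion: argoproj.io/v1alpha1\n')
    files = list_package_files(demo_path)

print(files)
```

In the notebook, the helper would be called with the path of the compiled archive, for example `'fashion_mnist_kubeflow.zip'`.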


### 2.5 Run pipeline
Create a client to enable communication with the Pipelines API server. Provide your security token and your authservice_session so that the notebook can communicate with the Pipelines API.

In [None]:
COOKIE = 'YOUR_COOKIE'

import urllib3
from kfp_server_api.configuration import Configuration

# Silence the warnings triggered by unverified HTTPS requests
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Disable SSL verification by patching Configuration.__init__;
# the guard keeps the original __init__ if this cell is run more than once
if 'old_init' not in globals():
    old_init = Configuration.__init__

def new_init(self, *k, **kw):
    old_init(self, *k, **kw)
    self.verify_ssl = False

Configuration.__init__ = new_init

client = kfp.Client(host='http://istio-ingressgateway.istio-system.svc/pipeline', namespace='admin', cookies=COOKIE)
client.list_experiments(namespace="admin")

Finally, we run it as an experiment. This gives us two links at the bottom that lead to the Kubeflow Pipelines UI, where you can check logs, artifacts, and inputs/outputs, and follow the progress of your pipeline visually.

Define the arguments that are used as inputs to the pipeline, and run the experiment from the notebook!

In [None]:
experiment_name = 'fashion_mnist_kubeflow'
run_name = pipeline_func.__name__ + ' run'

arguments = {
    "compile_optimizer" : "adam",
    "epochs" : 1,
    "validation_split" : 0.15,
}
# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func, 
                                                  experiment_name=experiment_name,
                                                  namespace='admin',
                                                  run_name=run_name, 
                                                  arguments=arguments)
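
Once the run is submitted, the notebook can also wait for it to finish instead of polling the UI. The following is a sketch under the assumption that the v1 SDK client is in use, whose `wait_for_run_completion(run_id, timeout)` returns an object exposing `.run.status`. The helper `wait_for_status` is hypothetical.

```python
def wait_for_status(client, run_id: str, timeout_seconds: int = 3600) -> str:
    """Block until the run finishes (or the timeout expires) and return its status."""
    # Assumption: `client` behaves like a kfp v1 Client, so the returned
    # object carries the final state in `.run.status`.
    detail = client.wait_for_run_completion(run_id, timeout_seconds)
    return detail.run.status

# Usage in this notebook (requires the `client` and `run_result` defined above):
# status = wait_for_status(client, run_result.run_id)
# print(status)
```

Returning the status string makes it straightforward to stop a notebook early when the run did not succeed.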