# MNIST end-to-end example with Kubeflow components

This pipeline contains 5 steps: it finds the best hyperparameters using Katib, creates a PVC for storing models, processes the hyperparameter results, trains the model in a distributed fashion on TFJob with the best hyperparameters using more iterations, and finally serves the model using KFServing. You can visit this [medium blog](https://medium.com/@liuhgxa/an-end-to-end-use-case-by-kubeflow-b2f72b0b587) for more details on this pipeline.

### Define the pipeline name and Kubeflow user namespace

Edit the cell below to set the name you want to use for this pipeline and the Kubeflow namespace you want to run it in.

In [1]:
# Default for the pipeline's `name` parameter; the pipeline uses it to name the
# Katib experiment, the TFJob and the InferenceService it creates.
pipeline_name = "mnist-demo"
# Default for the pipeline's `namespace` parameter; also the namespace the
# kubectl/tkn commands later in this notebook operate on.
user_namespace = "anonymous"

Import the DSL package and define the Kubeflow pipeline

In [2]:
import json
from string import Template

import kfp
from kfp import components
from kfp.components import func_to_container_op
import kfp.dsl as dsl

In [3]:
def convert_mnist_experiment_result(experiment_result) -> str:
    """Turn a JSON list of hyperparameters into a command-line argument string.

    Args:
        experiment_result: JSON text encoding a list of objects, each with a
            "name" and a "value" key.

    Returns:
        The entries rendered as "name=value" and joined by single spaces
        (an empty string for an empty list).
    """
    # Imported locally so the function stays self-contained when it is
    # wrapped into a container op.
    import json

    hyperparameters = json.loads(experiment_result)
    flags = []
    for entry in hyperparameters:
        # Echo each entry to stdout as it is processed.
        print(entry)
        flags.append("{}={}".format(entry["name"], entry["value"]))

    return " ".join(flags)

def add_istio_annotation(op):
    """Annotate `op`'s pod with 'sidecar.istio.io/inject' = 'false'.

    Args:
        op: An object exposing `add_pod_annotation(name=..., value=...)`.

    Returns:
        The same `op` object, so the function can be used as an op transformer.
    """
    injection_key = 'sidecar.istio.io/inject'
    op.add_pod_annotation(name=injection_key, value='false')
    return op

@dsl.pipeline(
    name="End to end pipeline",
    description="An end to end example including hyperparameter tuning, train and inference."
)
def mnist_pipeline(
        name=pipeline_name,
        namespace=user_namespace,
        step=4000):
    """Define the MNIST pipeline: Katib tuning, TFJob training, KFServing inference.

    The function builds, in order: a Katib experiment launcher op, an op that
    converts the launcher output into command-line arguments, a volume op for
    the exported model, a TFJob resource op ("train") and an InferenceService
    resource op ("inference") that runs after training. Finally it registers
    `add_istio_annotation` as an op transformer on the pipeline configuration.

    Args:
        name: Used as the Katib experiment name and as the metadata name of
            the TFJob and the InferenceService.
        namespace: Namespace used for the Katib experiment, the TFJob and the
            InferenceService.
        step: Value substituted into `--tf-train-steps` of the final TFJob.
    """
    # step 1: create a Katib experiment to tune hyperparameters
    # Objective: minimize the "loss" metric, with a goal of 0.001.
    objectiveConfig = {
      "type": "minimize",
      "goal": 0.001,
      "objectiveMetricName": "loss",
    }
    algorithmConfig = {"algorithmName" : "random"}
    # Search space: a learning rate in [0.01, 0.03] and a batch size of 16, 32 or 64.
    parameters = [
      {"name": "--tf-learning-rate", "parameterType": "double", "feasibleSpace": {"min": "0.01","max": "0.03"}},
      {"name": "--tf-batch-size", "parameterType": "discrete", "feasibleSpace": {"list": ["16", "32", "64"]}},
    ]
    # TFJob used for each trial: 1 Chief and 3 Workers, each running 2000
    # training steps. The {{...}} markers are template placeholders
    # (presumably expanded by Katib per trial -- confirm against the Katib docs).
    rawTemplate = {
      "apiVersion": "kubeflow.org/v1",
      "kind": "TFJob",
      "metadata": {
         "name": "{{.Trial}}",
         "namespace": "{{.NameSpace}}"
      },
      "spec": {
        "tfReplicaSpecs": {
          "Chief": {
            "replicas": 1,
            "restartPolicy": "OnFailure",
            "template": {
              "spec": {
                "containers": [
                {
                  "command": [
                    "sh",
                    "-c"
                  ],
                  "args": [
                    "python /opt/model.py --tf-train-steps=2000 {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}"
                  ],
                  "image": "liuhougangxa/tf-estimator-mnist",
                  "name": "tensorflow"
                }
                ]
              }
            }
          },
          "Worker": {
            "replicas": 3,
            "restartPolicy": "OnFailure",
            "template": {
              "spec": {
                "containers": [
                {
                  "command": [
                    "sh",
                    "-c"
                  ],
                  "args": [ 
                    "python /opt/model.py --tf-train-steps=2000 {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}"
                  ],
                  "image": "liuhougangxa/tf-estimator-mnist",
                  "name": "tensorflow"
                }
                ]
              }
            }
          }
        }
      }
    }
    
    # The trial template embeds the TFJob above as a JSON string.
    trialTemplate = {
      "goTemplate": {
        "rawTemplate": json.dumps(rawTemplate)
      }
    }

    # Metrics are collected with the "TensorFlowEvent" collector from the
    # /tmp/tf directory.
    metricsCollectorSpec = {
      "source": {
        "fileSystemPath": {
          "path": "/tmp/tf",
          "kind": "Directory"
        }
      },
      "collector": {
        "kind": "TensorFlowEvent"
      }
    }

    # NOTE(review): the component is loaded from the `master` branch, so its
    # interface can change underneath this notebook -- consider pinning the URL
    # to a specific commit.
    katib_experiment_launcher_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml')
    # NOTE(review): the configs are passed as str(...) of Python objects (Python
    # repr, not JSON) -- confirm the launcher accepts that format.
    op1 = katib_experiment_launcher_op(
            experiment_name=name,
            experiment_namespace=namespace,
            parallel_trial_count=3,
            max_trial_count=12,
            objective=str(objectiveConfig),
            algorithm=str(algorithmConfig),
            trial_template=str(trialTemplate),
            parameters=str(parameters),
            metrics_collector=str(metricsCollectorSpec),
            # experiment_timeout_minutes=experimentTimeoutMinutes,
            delete_finished_experiment=False)

    # step 2: create a TFJob to train the model with the best hyperparameters found by Katib
    # string.Template with the placeholders $name, $namespace, $modelpvc, $step
    # and $args; both replicas mount the model PVC at /mnt/export.
    tfjobjson_template = Template("""
{
  "apiVersion": "kubeflow.org/v1",
  "kind": "TFJob",
  "metadata": {
    "name": "$name",
    "namespace": "$namespace",
    "annotations": {
        "sidecar.istio.io/inject": "false"
    }
  },
  "spec": {
    "tfReplicaSpecs": {
      "Chief": {
        "replicas": 1,
        "restartPolicy": "OnFailure",
        "template": {
          "metadata": {
            "annotations": {
              "sidecar.istio.io/inject": "false"
            }
          },
          "spec": {
            "volumes": [
              {
                "name": "export-model",
                "persistentVolumeClaim": {
                  "claimName": "$modelpvc"
                }
              }
            ],
            "containers": [
              {
                "command": [
                  "sh",
                  "-c"
                ],
                "args": [
                  "python /opt/model.py --tf-train-steps=$step --tf-export-dir=/mnt/export $args"
                ],
                "image": "liuhougangxa/tf-estimator-mnist",
                "name": "tensorflow",
                "volumeMounts": [
                  {
                    "mountPath": "/mnt/export",
                    "name": "export-model"
                  }
                ]
              }
            ]
          }
        }
      },
      "Worker": {
        "replicas": 3,
        "restartPolicy": "OnFailure",
        "template": {
          "metadata": {
            "annotations": {
              "sidecar.istio.io/inject": "false"
            }
          },
          "spec": {
            "volumes": [
              {
                "name": "export-model",
                "persistentVolumeClaim": {
                  "claimName": "$modelpvc"
                }
              }
            ],
            "containers": [
              {
                "command": [
                  "sh",
                  "-c"
                ],
                "args": [
                  "python /opt/model.py --tf-train-steps=$step --tf-export-dir=/mnt/export $args"
                ],
                "image": "liuhougangxa/tf-estimator-mnist",
                "name": "tensorflow",
                "volumeMounts": [
                  {
                    "mountPath": "/mnt/export",
                    "name": "export-model"
                  }
                ]
              }
            ]
          }
        }
      }
    }
  }
}
""")

    # Wrap the converter function as a container op and feed it the Katib
    # launcher output; its output becomes $args of the TFJob.
    convert_op = func_to_container_op(convert_mnist_experiment_result)
    op2 = convert_op(op1.output)

    # Volume op requesting 1Gi with VOLUME_MODE_RWM; its "name" output is used
    # as the PVC claim name by both the TFJob and the InferenceService.
    modelvolop = dsl.VolumeOp(
        name="modelpvc",
        resource_name="modelpvc",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWM
    )

    # The substituted values are pipeline parameters / op outputs rather than
    # plain strings (presumably rendered as placeholders that are resolved when
    # the pipeline runs -- confirm against the KFP DSL documentation).
    tfjobjson = tfjobjson_template.substitute(
            {'args': op2.output,
             'name': name,
             'namespace': namespace,
             'step': step,
             'modelpvc': modelvolop.outputs["name"]
            })

    tfjob = json.loads(tfjobjson)

    # The success condition requires all 3 Workers and the Chief to succeed.
    train = dsl.ResourceOp(
        name="train",
        k8s_resource=tfjob,
        success_condition='status.replicaStatuses.Worker.succeeded==3,status.replicaStatuses.Chief.succeeded==1'
    )

    # step 3: serve the trained model with a KFServing InferenceService
    inferenceservice_template = Template("""
{
  "apiVersion": "serving.kubeflow.org/v1alpha2",
  "kind": "InferenceService",
  "metadata": {
    "name": "$name",
    "namespace": "$namespace"
  },
  "spec": {
    "default": {
      "predictor": {
        "tensorflow": {
          "storageUri": "pvc://$modelpvc/"
        }
      }
    }
  }
}
""")
    inferenceservicejson = inferenceservice_template.substitute({'modelpvc': modelvolop.outputs["name"],
                                                                 'name': name,
                                                                 'namespace': namespace})
    inferenceservice =  json.loads(inferenceservicejson)
    # Explicitly ordered after training; the success condition is 'status.url'.
    inference = dsl.ResourceOp(
      name="inference",
      k8s_resource=inferenceservice,
      success_condition='status.url').after(train)
    
    # Register add_istio_annotation as an op transformer on the pipeline configuration.
    dsl.get_pipeline_conf().add_op_transformer(add_istio_annotation)

Compile the pipeline into Tekton yaml using the kfp-tekton SDK

In [4]:
from kfp_tekton.compiler import TektonCompiler
# Compile mnist_pipeline and write the result to mnist.yaml, the file that the
# `kubectl apply` command later in this notebook applies.
TektonCompiler().compile(mnist_pipeline, 'mnist.yaml')

Assign permission to Tekton's default service account and run this pipeline

In [5]:
!kubectl create clusterrolebinding $user_namespace-edit --clusterrole kubeflow-edit --serviceaccount=$user_namespace:default
!kubectl apply -f mnist.yaml -n $user_namespace

clusterrolebinding.rbac.authorization.k8s.io/default-edit created
task.tekton.dev/kubeflow-launch-experiment configured
task.tekton.dev/convert-mnist-experiment-result configured
task.tekton.dev/modelpvc unchanged
task.tekton.dev/train configured
task.tekton.dev/inference configured
pipeline.tekton.dev/end-to-end-pipeline configured
pipelinerun.tekton.dev/end-to-end-pipeline-run created


Stream the pipeline logs

In [6]:
!tkn pipelinerun logs end-to-end-pipeline-run -n $user_namespace -f

[92m[92;1m[modelpvc : modelpvc] [0mtime="2020-04-24T16:48:17Z" level=error msg="Initialize script failed: exit status 2:"
[92m[92;1m[modelpvc : modelpvc] [0mtime="2020-04-24T16:48:17Z" level=info msg="kubectl create -f /tmp/manifest.yaml -o json"
[92m[92;1m[modelpvc : modelpvc] [0mtime="2020-04-24T16:48:18Z" level=info msg=anonymous/PersistentVolumeClaim./end-to-end-pipeline-run-modelpvc
[92m[92;1m[modelpvc : modelpvc] [0mtime="2020-04-24T16:48:18Z" level=info msg="Saving resource output parameters"
[93m[93;1m[kubeflow-launch-experiment : kubeflow-launch-experiment] [0mINFO:root:Generating experiment template.
[92m[92;1m[modelpvc : modelpvc] [0mtime="2020-04-24T16:48:18Z" level=info msg="[kubectl get PersistentVolumeClaim./end-to-end-pipeline-run-modelpvc -o jsonpath={} -n anonymous]"
[93m[93;1m[kubeflow-launch-experiment : kubeflow-launch-experiment] [0mINFO:root:Creating kubeflow.org/experiments mnist-demo in namespace anonymous.
[92m[92;1m[modelpvc : modelpvc]

[95m[95;1m[train : train] [0mtime="2020-04-24T16:57:04Z" level=info msg="{\"apiVersion\": \"kubeflow.org/v1\",\"kind\": \"TFJob\",\"metadata\": {\"annotations\": {\"sidecar.istio.io/inject\": \"false\"},\"creationTimestamp\": \"2020-04-24T16:57:02Z\",\"generation\": 1,\"name\": \"mnist-demo\",\"namespace\": \"anonymous\",\"resourceVersion\": \"31482534\",\"selfLink\": \"/apis/kubeflow.org/v1/namespaces/anonymous/tfjobs/mnist-demo\",\"uid\": \"4e4f7e8d-562a-4f1d-a6cc-235c17cc2a38\"},\"spec\": {\"tfReplicaSpecs\": {\"Chief\": {\"replicas\": 1,\"restartPolicy\": \"OnFailure\",\"template\": {\"metadata\": {\"annotations\": {\"sidecar.istio.io/inject\": \"false\"}},\"spec\": {\"containers\": [{\"args\": [\"python /opt/model.py --tf-train-steps=2000 --tf-export-dir=/mnt/export --tf-learning-rate=0.020268038798600188 --tf-batch-size=16\"],\"command\": [\"sh\",\"-c\"],\"image\": \"liuhougangxa/tf-estimator-mnist\",\"name\": \"tensorflow\",\"volumeMounts\": [{\"mountPath\": \"/mnt/export\",\

[95m[95;1m[train : train] [0mtime="2020-04-24T16:57:08Z" level=info msg="{\"apiVersion\": \"kubeflow.org/v1\",\"kind\": \"TFJob\",\"metadata\": {\"annotations\": {\"sidecar.istio.io/inject\": \"false\"},\"creationTimestamp\": \"2020-04-24T16:57:02Z\",\"generation\": 1,\"name\": \"mnist-demo\",\"namespace\": \"anonymous\",\"resourceVersion\": \"31482611\",\"selfLink\": \"/apis/kubeflow.org/v1/namespaces/anonymous/tfjobs/mnist-demo\",\"uid\": \"4e4f7e8d-562a-4f1d-a6cc-235c17cc2a38\"},\"spec\": {\"tfReplicaSpecs\": {\"Chief\": {\"replicas\": 1,\"restartPolicy\": \"OnFailure\",\"template\": {\"metadata\": {\"annotations\": {\"sidecar.istio.io/inject\": \"false\"}},\"spec\": {\"containers\": [{\"args\": [\"python /opt/model.py --tf-train-steps=2000 --tf-export-dir=/mnt/export --tf-learning-rate=0.020268038798600188 --tf-batch-size=16\"],\"command\": [\"sh\",\"-c\"],\"image\": \"liuhougangxa/tf-estimator-mnist\",\"name\": \"tensorflow\",\"volumeMounts\": [{\"mountPath\": \"/mnt/export\",\

[95m[95;1m[train : train] [0mtime="2020-04-24T16:57:44Z" level=info msg="{\"apiVersion\": \"kubeflow.org/v1\",\"kind\": \"TFJob\",\"metadata\": {\"annotations\": {\"sidecar.istio.io/inject\": \"false\"},\"creationTimestamp\": \"2020-04-24T16:57:02Z\",\"generation\": 1,\"name\": \"mnist-demo\",\"namespace\": \"anonymous\",\"resourceVersion\": \"31482952\",\"selfLink\": \"/apis/kubeflow.org/v1/namespaces/anonymous/tfjobs/mnist-demo\",\"uid\": \"4e4f7e8d-562a-4f1d-a6cc-235c17cc2a38\"},\"spec\": {\"tfReplicaSpecs\": {\"Chief\": {\"replicas\": 1,\"restartPolicy\": \"OnFailure\",\"template\": {\"metadata\": {\"annotations\": {\"sidecar.istio.io/inject\": \"false\"}},\"spec\": {\"containers\": [{\"args\": [\"python /opt/model.py --tf-train-steps=2000 --tf-export-dir=/mnt/export --tf-learning-rate=0.020268038798600188 --tf-batch-size=16\"],\"command\": [\"sh\",\"-c\"],\"image\": \"liuhougangxa/tf-estimator-mnist\",\"name\": \"tensorflow\",\"volumeMounts\": [{\"mountPath\": \"/mnt/export\",\

[95m[95;1m[train : train] [0mtime="2020-04-24T16:57:46Z" level=info msg="Saved output parameter: name, value: mnist-demo"

[96m[96;1m[inference : inference] [0mtime="2020-04-24T16:57:53Z" level=error msg="Initialize script failed: exit status 2:"
[96m[96;1m[inference : inference] [0mtime="2020-04-24T16:57:53Z" level=info msg="kubectl create -f /tmp/manifest.yaml -o json"
[96m[96;1m[inference : inference] [0mtime="2020-04-24T16:57:53Z" level=info msg=anonymous/InferenceService.serving.kubeflow.org/mnist-demo
[96m[96;1m[inference : inference] [0mtime="2020-04-24T16:57:53Z" level=info msg="Waiting for conditions: status.url"
[96m[96;1m[inference : inference] [0mtime="2020-04-24T16:57:53Z" level=info msg="kubectl get InferenceService.serving.kubeflow.org/mnist-demo -w -o json -n anonymous"
[96m[96;1m[inference : inference] [0mtime="2020-04-24T16:57:54Z" level=info msg="{\"apiVersion\": \"serving.kubeflow.org/v1alpha2\",\"kind\": \"InferenceService\",\"metadata\": {\"cr

[96m[96;1m[inference : inference] [0mtime="2020-04-24T16:58:14Z" level=info msg="Saved output parameter: name, value: mnist-demo"



When the pipeline is done, you can get the `inferenceservice` name using the command below. For example, in my cluster the `inference-name` is `mnist-demo`.

In [7]:
!kubectl get inferenceservice -n $user_namespace

NAME         URL                                                            READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
mnist-demo   http://mnist-demo.anonymous.example.com/v1/models/mnist-demo   True    100                                8m19s


Download an MNIST image for the inference test if it is not already in this directory, such as 9.bmp from [here](https://raw.githubusercontent.com/hougangliu/pipelines/e2e-pipeline-sample/samples/contrib/e2e-mnist/9.bmp). Then upload it to the notebook.

Update the **cluster_ip** below with your kfserving ingress endpoint. Then, execute the below cell to send a sample payload to the deployed model.

In [8]:
import json

import numpy as np
from PIL import Image
import requests

# Get cluster_ip by "kubectl -n istio-system get service kfserving-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}'"
cluster_ip = "xxx.xx.xxx.xx"

# The pipeline names the InferenceService after the pipeline name and creates
# it in the user namespace (see the `kubectl get inferenceservice` output above).
inference_name = pipeline_name
inference_namespace = user_namespace
# image_file is the MNIST picture uploaded to the notebook directory
image_file = '9.bmp'

# Convert to 8-bit grayscale, resize to 28x28 and reshape to (1, 28, 28, 1).
# np.float64 replaces the np.float alias, which was removed in NumPy 1.24.
# The context manager closes the image file once the pixels are loaded.
with Image.open(image_file) as image:
    data = np.array(image.convert('L').resize((28, 28))).astype(np.float64).reshape(-1, 28, 28, 1)

# Serialize with the json module instead of hand-formatting the array text;
# this also avoids changing NumPy's global print options.
json_request = json.dumps({"instances": data.tolist()})
# Requests go to the ingress IP, so the Host header carries the service's
# host name (matching the URL reported by `kubectl get inferenceservice`).
headers = {"Host": "%s.%s.example.com" % (inference_name, inference_namespace)}

response = requests.post("http://%s/v1/models/%s:predict" % (cluster_ip, inference_name), data=json_request, headers=headers)
# Surface HTTP errors directly instead of failing later while decoding a
# non-JSON error body.
response.raise_for_status()
print(response.json())

{'predictions': [{'predictions': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0], 'classes': 9}]}
