# Kubeflow E2E MNIST Case: Building, Distributed Training and Serving

This example guides you through:
  1. Taking an example TensorFlow model and modifying it to support distributed training.
  1. Using `Kubeflow Fairing` to build a Docker image and launch a TFJob to train the model.
  1. Using `Kubeflow Fairing` to create an InferenceService (KFServing) for the trained model.
  1. Cleaning up the TFJob and InferenceService using the `kubeflow-tfjob` and `kfserving` SDK clients.

## Requirements

  * Kubeflow Fairing, TF-Operator and KFServing are installed in the Kubernetes cluster.

### Prepare Training Code

We modified the [examples](https://github.com/tensorflow/tensorflow/blob/9a24e8acfcd8c9046e1abaac9dbf5e146186f4c2/tensorflow/examples/learn/mnist.py) to be better suited for distributed training and model serving, because existing distributed MNIST examples need changes to run well as a TFJob. The updated training code is [mnist.py](mnist.py).
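
One part of running as a TFJob is that TF-Operator sets a `TF_CONFIG` environment variable in every replica, describing the cluster and the replica's own role. The following is a minimal sketch of how training code might read it. The helper name `parse_tf_config` is hypothetical and is not taken from `mnist.py`.

```python
import json
import os


def parse_tf_config(env=None):
    """Return (cluster, task_type, task_index) parsed from TF_CONFIG.

    When TF_CONFIG is missing, the process is treated as non-distributed
    and ({}, None, 0) is returned. This fallback is an assumption of the sketch.
    """
    env = os.environ if env is None else env
    raw = env.get("TF_CONFIG")
    if not raw:
        return {}, None, 0
    config = json.loads(raw)
    cluster = config.get("cluster", {})
    task = config.get("task", {})
    return cluster, task.get("type"), int(task.get("index", 0))


if __name__ == "__main__":
    # Illustrative value only; real addresses are assigned by the operator.
    sample_env = {
        "TF_CONFIG": json.dumps({
            "cluster": {
                "chief": ["chief-0:2222"],
                "ps": ["ps-0:2222"],
                "worker": ["worker-0:2222", "worker-1:2222"],
            },
            "task": {"type": "worker", "index": 1},
        })
    }
    print(parse_tf_config(sample_env))
```

Passing the environment as an argument keeps the function easy to try in a notebook without a running TFJob.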

### Install Required Libraries

In [1]:
!pip show kubeflow-fairing

Name: kubeflow-fairing
Version: 1.0.2
Summary: Kubeflow Fairing Python SDK.
Home-page: https://github.com/kubeflow/fairing
Author: Kubeflow Authors
Author-email: hejinchi@cn.ibm.com
License: Apache License Version 2.0
Location: /home/jupyter/anaconda3/envs/mlpipeline/lib/python3.7/site-packages
Requires: six, urllib3, grpcio, setuptools, notebook, oauth2client, cloudpickle, kubeflow-pytorchjob, kubernetes, httplib2, kubeflow-tfjob, azure-mgmt-storage, python-dateutil, ibm-cos-sdk, tornado, future, kfserving, retrying, google-cloud-logging, google-cloud-storage, numpy, nbconvert, google-auth, requests, azure-storage-file, google-api-python-client, boto3, docker
Required-by: 


### Configure the Docker Registry for Kubeflow Fairing

* To build Docker images from your notebook, you need a Docker registry where the images will be stored.

**Note:** Update the section below with your own values.

In [2]:
# Set the Docker registry where the image will be stored.
# Ensure you have permission to push images to this registry.
DOCKER_REGISTRY = 'index.docker.io/yiluxiangbei'

# Set the namespace. Note that the PVC created below must be in this namespace.
my_namespace = 'mnist'
# You can also get the default target namespace using the API below.
#from kubeflow.fairing import utils as fairing_utils
#my_namespace = fairing_utils.get_default_target_namespace()
print(my_namespace)

mnist


## Create PV/PVC to Store the Exported Model 

Create a Persistent Volume (PV) and a Persistent Volume Claim (PVC). The PVC is used by the training and serving pods for local mode in the steps below.

**Note:** Update the section below with your own values.

In [3]:
# For distributed training, the PVC must be accessible from all nodes in the cluster.
# This example creates an NFS PV to satisfy that requirement.
nfs_server = '172.17.56.182'
nfs_path = '/root/nfs_root/'
pv_name = 'mnist-e2e-pv'
pvc_name = 'mnist-e2e-pvc'

(Optional) Skip the PV/PVC creation step below if you set `pv_name` and `pvc_name` to an existing PV and PVC.

In [5]:
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubeflow.fairing.utils import is_running_in_k8s
import yaml

pv_yaml = f'''
apiVersion: v1
kind: PersistentVolume
metadata:
  name: {pv_name}
spec:
  capacity:
    storage: 10Gi
  accessModes:
  - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  nfs:
    path: {nfs_path}
    server: {nfs_server}
'''
pvc_yaml = f'''
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: {pvc_name}
  namespace: {my_namespace}
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ""
  resources:
    requests:
      storage: 10Gi
'''

if is_running_in_k8s():
    print('in k8s')
    k8s_config.load_incluster_config()
else:
    print('not in k8s')
    k8s_config.load_kube_config()

k8s_core_api = k8s_client.CoreV1Api()
k8s_core_api.create_persistent_volume(yaml.safe_load(pv_yaml))
k8s_core_api.create_namespaced_persistent_volume_claim(my_namespace, yaml.safe_load(pvc_yaml))

not in k8s


TypeError: argument of type 'NoneType' is not iterable
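
The same PV and PVC bodies can also be built as plain Python dictionaries instead of being parsed from YAML strings. The sketch below mirrors the fields of the manifests above. The helper names `build_pv_manifest` and `build_pvc_manifest` are hypothetical and introduced only for illustration.

```python
def build_pv_manifest(name, nfs_server, nfs_path, storage="10Gi"):
    """Build an NFS-backed PersistentVolume body as a dictionary."""
    return {
        "apiVersion": "v1",
        "kind": "PersistentVolume",
        "metadata": {"name": name},
        "spec": {
            "capacity": {"storage": storage},
            "accessModes": ["ReadWriteMany"],
            "persistentVolumeReclaimPolicy": "Retain",
            "nfs": {"path": nfs_path, "server": nfs_server},
        },
    }


def build_pvc_manifest(name, namespace, storage="10Gi"):
    """Build a PersistentVolumeClaim body as a dictionary."""
    return {
        "apiVersion": "v1",
        "kind": "PersistentVolumeClaim",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "accessModes": ["ReadWriteMany"],
            # An empty storage class, as in the YAML manifest above.
            "storageClassName": "",
            "resources": {"requests": {"storage": storage}},
        },
    }


if __name__ == "__main__":
    pv = build_pv_manifest("mnist-e2e-pv", "172.17.56.182", "/root/nfs_root/")
    pvc = build_pvc_manifest("mnist-e2e-pvc", "mnist")
    print(pv["spec"]["nfs"], pvc["metadata"])
```

These dictionaries have the same shape as the results of `yaml.safe_load(pv_yaml)` and `yaml.safe_load(pvc_yaml)`, so they could be passed to the same `create_persistent_volume` and `create_namespaced_persistent_volume_claim` calls.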

## Use Kubeflow Fairing to Build the Docker Image and Launch a TFJob for Training

* Use Kubeflow Fairing to build a Docker image that includes all your dependencies.
* Launch a TFJob in the on-premises cluster to train the model.

First, set some custom training parameters for the TFJob.

In [4]:
num_chief = 1  # number of chief replicas in the TFJob
num_ps = 1  # number of parameter server (PS) replicas in the TFJob
num_workers = 2  # number of worker replicas in the TFJob
model_dir = "/mnt"
export_path = "/mnt/export" 
train_steps = "1000"
batch_size = "100"
learning_rate = "0.01"

Use Kubeflow Fairing to build a Docker image and push it to the Docker registry, then launch a TFJob in the on-premises cluster to train the model in distributed mode.

In [5]:
import uuid
from kubeflow import fairing   
from kubeflow.fairing.kubernetes.utils import mounting_pvc

tfjob_name = f'mnist-training-{uuid.uuid4().hex[:4]}'

output_map =  {
    "Dockerfile": "Dockerfile",
    "mnist.py": "mnist.py"
}

command=["python",
         "/opt/mnist.py",
         "--tf-model-dir=" + model_dir,
         "--tf-export-dir=" + export_path,
         "--tf-train-steps=" + train_steps,
         "--tf-batch-size=" + batch_size,
         "--tf-learning-rate=" + learning_rate]

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)
fairing.config.set_builder(name='docker', registry=DOCKER_REGISTRY, base_image="",
                           image_name="mnist", dockerfile_path="Dockerfile")
fairing.config.set_deployer(name='tfjob', namespace=my_namespace, stream_log=False, job_name=tfjob_name,
                            chief_count=num_chief, worker_count=num_workers, ps_count=num_ps, 
                            pod_spec_mutators=[mounting_pvc(pvc_name=pvc_name, pvc_mount_path=model_dir)])
fairing.config.run()

[W 200902 09:23:25 utils:51] The function mounting_pvc has been deprecated,                     please use `volume_mounts`
[I 200902 09:23:25 config:134] Using preprocessor: <kubeflow.fairing.preprocessors.base.BasePreProcessor object at 0x7f1f713d6f60>
[I 200902 09:23:25 config:136] Using builder: <kubeflow.fairing.builders.docker.docker.DockerBuilder object at 0x7f1f713d6eb8>
[I 200902 09:23:25 config:138] Using deployer: <kubeflow.fairing.deployers.tfjob.tfjob.TfJob object at 0x7f1f476cca20>
[I 200902 09:23:25 docker:32] Building image using docker


DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

### Get the Created TFJob

In [6]:
from kubeflow.tfjob import TFJobClient
tfjob_client = TFJobClient()

tfjob_client.get(tfjob_name, namespace=my_namespace)

RuntimeError: Exception when calling CustomObjectsApi->get_namespaced_custom_object:            (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Date': 'Wed, 02 Sep 2020 09:25:37 GMT', 'Content-Length': '246'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"tfjobs.kubeflow.org \"mnist-training-dcde\" not found","reason":"NotFound","details":{"name":"mnist-training-dcde","group":"kubeflow.org","kind":"tfjobs"},"code":404}




### Wait for the Training Job to Finish

In [None]:
tfjob_client.wait_for_job(tfjob_name, namespace=my_namespace, watch=True)

### Check Whether the TFJob Succeeded

In [None]:
tfjob_client.is_job_succeeded(tfjob_name, namespace=my_namespace)
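
For more detail than a boolean, the TFJob object returned by `tfjob_client.get(...)` carries a list of conditions under `status.conditions`. The sketch below reads the type of the last condition, assuming the most recent condition is last in the list. The helper name `latest_condition` is hypothetical, and the sample object is illustrative rather than real cluster output.

```python
def latest_condition(tfjob):
    """Return the type of the last status condition, or None if there is none."""
    conditions = (tfjob.get("status") or {}).get("conditions") or []
    if not conditions:
        return None
    return conditions[-1].get("type")


if __name__ == "__main__":
    # Hand-written sample with the same shape as a TFJob status.
    sample_tfjob = {
        "status": {
            "conditions": [
                {"type": "Created", "status": "True"},
                {"type": "Running", "status": "True"},
            ]
        }
    }
    print(latest_condition(sample_tfjob))
```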

### Get the Training Logs

In [None]:
tfjob_client.get_logs(tfjob_name, namespace=my_namespace)

## Deploy Service using KFServing

In [None]:
from kubeflow.fairing.deployers.kfserving.kfserving import KFServing
isvc_name = f'mnist-service-{uuid.uuid4().hex[:4]}'
isvc = KFServing('tensorflow', namespace=my_namespace, isvc_name=isvc_name,
                 default_storage_uri='pvc://' + pvc_name + '/export')
isvc.deploy(isvc.generate_isvc())
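
The storage URI ends in `/export` because the PVC is mounted at `model_dir` (`/mnt`) during training and the model is exported to `/mnt/export`, so the export directory sits at `export` relative to the volume root. A minimal sketch of deriving the URI from those two paths follows. The helper name `pvc_storage_uri` is hypothetical.

```python
import posixpath


def pvc_storage_uri(pvc_name, mount_path, export_path):
    """Build a pvc:// URI for export_path, given where the PVC was mounted."""
    rel = posixpath.relpath(export_path, mount_path)
    if rel == ".." or rel.startswith("../"):
        raise ValueError(f"{export_path} is not inside {mount_path}")
    if rel == ".":
        # The export directory is the volume root itself.
        return f"pvc://{pvc_name}"
    return f"pvc://{pvc_name}/{rel}"


if __name__ == "__main__":
    print(pvc_storage_uri("mnist-e2e-pvc", "/mnt", "/mnt/export"))
```

Deriving the URI this way keeps it consistent if `model_dir` or `export_path` is changed later.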

### List the InferenceServices in the Namespace

In [None]:
from kfserving import KFServingClient
kfserving_client = KFServingClient()
kfserving_client.get(namespace=my_namespace)

### Get the InferenceService and Service Endpoint

In [None]:
mnist_isvc = kfserving_client.get(isvc_name, namespace=my_namespace)
mnist_isvc_name = mnist_isvc['metadata']['name']
mnist_isvc_endpoint = mnist_isvc['status'].get('url', '')
print("MNIST Service Endpoint: " + mnist_isvc_endpoint)

### Run a Prediction Against the InferenceService

In [None]:
ISTIO_CLUSTER_IP=!kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.clusterIP}'
CLUSTER_IP=ISTIO_CLUSTER_IP[0]
MODEL_HOST=f"Host: {mnist_isvc_name}.{my_namespace}.example.com"
!curl -v -H "{MODEL_HOST}" http://{CLUSTER_IP}/v1/models/{mnist_isvc_name}:predict -d @./input.json
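
The same request can be assembled in Python with the standard library. This is a sketch under assumptions: the payload is taken to follow the TensorFlow Serving REST `{"instances": [...]}` format (the contents of `input.json` are not shown here), and the helper name `build_predict_request` and its `domain` parameter are hypothetical.

```python
import json
import urllib.request


def build_predict_request(cluster_ip, isvc_name, namespace, instances,
                          domain="example.com"):
    """Build (but do not send) a POST request for the :predict endpoint."""
    url = f"http://{cluster_ip}/v1/models/{isvc_name}:predict"
    body = json.dumps({"instances": instances}).encode("utf-8")
    headers = {
        # The Host header selects the InferenceService behind the ingress gateway.
        "Host": f"{isvc_name}.{namespace}.{domain}",
        "Content-Type": "application/json",
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


if __name__ == "__main__":
    # Placeholder address and service name, for illustration only.
    request = build_predict_request("10.0.0.1", "mnist-service-ab12", "mnist", [[0.0]])
    print(request.get_method(), request.full_url)
    print(request.get_header("Host"))
    # To send it from inside the cluster: urllib.request.urlopen(request)
```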

## Clean Up

Delete the TFJob.

In [None]:
tfjob_client.delete(tfjob_name, namespace=my_namespace)

Delete the InferenceService.

In [None]:
kfserving_client.delete(isvc_name, namespace=my_namespace)