In [None]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

# Train a TensorFlow or Keras model on GCP or Kubeflow from a notebook

This notebook shows how to use Kubeflow Fairing to train a model on Kubeflow running on Google Kubernetes Engine (GKE) and on Google Cloud AI Platform Training. It demonstrates how to:
 
* Train a Keras model in a local notebook,
* Use Kubeflow Fairing to train a Keras model remotely on a Kubeflow cluster,
* Use Kubeflow Fairing to train a Keras model remotely on AI Platform Training,
* Use Kubeflow Fairing to deploy a trained model to Kubeflow and call the deployed endpoint for predictions.

**You need Python 3.6 to use Kubeflow Fairing.**

## Setups

* Pre-conditions
    - Deployed a kubeflow cluster through https://deploy.kubeflow.cloud/
    - Have the following environment variable ready: 
        - PROJECT_ID # the project that hosts the Kubeflow cluster or runs AI Platform training
        - DEPLOYMENT_NAME # Kubeflow deployment name; this is also the cluster name once deployed
        - GCP_BUCKET # google cloud storage bucket

* Create service account
```bash
export SA_NAME=[service-account-name]
gcloud iam service-accounts create ${SA_NAME}
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member serviceAccount:${SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
    --role 'roles/editor'
gcloud iam service-accounts keys create ~/key.json \
    --iam-account ${SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com
```

* Authorize Docker to push to and pull from Container Registry
```bash
gcloud auth configure-docker
```

* Update the local kubeconfig (for submitting jobs to the Kubeflow cluster)
```bash
export CLUSTER_NAME=${DEPLOYMENT_NAME} # the deployment name, which is also the Kubernetes cluster name
export ZONE=us-central1-c
gcloud container clusters get-credentials ${CLUSTER_NAME} --zone ${ZONE}
```

* Set the environment variable GOOGLE_APPLICATION_CREDENTIALS
```bash
export GOOGLE_APPLICATION_CREDENTIALS=~/key.json  # path to the service account key created above
```
```python
os.environ['GOOGLE_APPLICATION_CREDENTIALS']=...
```

* Install the latest version of fairing
```bash
pip install git+https://github.com/kubeflow/fairing@master
```

**Please note that the above configuration is required when the notebook service runs outside the Kubeflow environment. The examples in this notebook have also been fully tested on a notebook service outside the Kubeflow cluster.**

**When running inside the Kubeflow cluster, the environment variables (e.g. service account, project, etc.) should already have been configured while setting up the cluster.**

In [1]:
import os
import logging
import tensorflow as tf
import fairing
import numpy as np
from datetime import datetime
from fairing.cloud import gcp

In [3]:
# Google Container Registry (GCR) stores the images Fairing builds;
# any other Docker registry can be used instead.
# In a local notebook, set GCP_PROJECT explicitly rather than guessing it.
GCP_PROJECT = gcp.guess_project_name()
# Cloud Storage bucket URI for the model and logs, e.g. 'gs://kubeflow-demo-g/'
GCP_Bucket = os.environ['GCP_BUCKET']

# Only needed for a local notebook (not one running inside the Kubeflow cluster):
# os.environ['GOOGLE_APPLICATION_CREDENTIALS']=

## Define the model logic

In [4]:
def gcs_copy(src_path, dst_path):
    """Copy one object with ``gsutil cp`` and print whatever gsutil wrote to stdout.

    Args:
        src_path: Source path or URI handed to ``gsutil cp``.
        dst_path: Destination path or URI handed to ``gsutil cp``.

    Raises:
        subprocess.CalledProcessError: If ``gsutil`` exits with a non-zero
            status, so a failed copy is no longer silently ignored.
    """
    # Kept as a function-local import, as before; presumably so the function
    # stays self-contained when Fairing serializes it -- TODO confirm.
    import subprocess
    result = subprocess.run(['gsutil', 'cp', src_path, dst_path],
                            stdout=subprocess.PIPE, check=True)
    # Strip only trailing newlines instead of blindly dropping the last byte.
    print(result.stdout.decode('utf-8').rstrip('\n'))
    
def gcs_download(src_path, file_name):
    """Download ``src_path`` to the local file ``file_name`` via ``gsutil cp``.

    This is the same ``gsutil cp`` invocation as ``gcs_copy``, so it delegates
    to that function instead of duplicating its body.

    Args:
        src_path: Source path or URI handed to ``gsutil cp``.
        file_name: Destination file name handed to ``gsutil cp``.
    """
    gcs_copy(src_path, file_name)

In [9]:
class TensorflowModel(object):
    """Keras MNIST digit classifier with the ``train``/``predict`` methods used in this notebook.

    ``train`` builds the network, fits it on MNIST and uploads the saved model
    to ``GCP_Bucket``; ``predict`` loads the saved model on first use and
    returns its predictions.
    """

    def __init__(self):
        # Local file name the trained Keras model is saved to / loaded from.
        self.model_file = "mnist_model.h5"
        # Set by build(), or loaded lazily from model_file by predict().
        self.model = None

    def build(self):
        """Create and compile the Sequential network, then print its summary."""
        self.model = tf.keras.models.Sequential([
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(512, activation=tf.nn.relu),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(10, activation=tf.nn.softmax)
        ])
        self.model.compile(optimizer='adam',
                           loss='sparse_categorical_crossentropy',
                           metrics=['accuracy'])
        # summary() prints the table itself and returns None, so it is not
        # wrapped in print() (that would emit a stray "None" line).
        self.model.summary()

    def save_model(self):
        """Save the model to ``self.model_file`` and copy that file to ``GCP_Bucket``."""
        self.model.save(self.model_file)
        # Plain concatenation: GCP_Bucket is expected to end with '/'.
        gcs_copy(self.model_file, GCP_Bucket + self.model_file)

    def train(self):
        """Build the model, fit it on MNIST and save/upload the result.

        Pixel values are scaled by 1/255. Training runs for up to 5 epochs with
        batch size 32, validating on the MNIST test split.
        """
        self.build()

        mnist = tf.keras.datasets.mnist
        (x_train, y_train), (x_test, y_test) = mnist.load_data()
        x_train, x_test = x_train / 255.0, x_test / 255.0

        callbacks = [
            # Interrupt training if `val_loss` stops improving for over 2 epochs
            tf.keras.callbacks.EarlyStopping(patience=2, monitor='val_loss'),
            # Write TensorBoard logs under `<GCP_Bucket>logs/<today's date>`
            tf.keras.callbacks.TensorBoard(log_dir=GCP_Bucket + 'logs/'
                                           + str(datetime.now().date()))
        ]
        self.model.fit(x_train, y_train, batch_size=32, epochs=5, callbacks=callbacks,
                       validation_data=(x_test, y_test))
        self.save_model()

    def predict(self, X):
        """Return the model's predictions for ``X``.

        If no model is in memory yet, it is loaded from ``self.model_file``.

        Args:
            X: Input batch passed straight to the Keras model's ``predict``.

        Returns:
            Whatever ``self.model.predict(X)`` returns.
        """
        if not self.model:
            self.model = tf.keras.models.load_model(self.model_file)
        # Do any preprocessing
        # Keras names this parameter `x`; pass it positionally and return the
        # result so callers actually receive the predictions.
        return self.model.predict(X)

## Train a Keras model in a notebook

In [10]:
# Run the full train() pipeline directly in this notebook: build, fit, save and upload.
TensorflowModel().train()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
flatten_2 (Flatten)          (None, 784)               0         
_________________________________________________________________
dense_4 (Dense)              (None, 512)               401920    
_________________________________________________________________
dropout_2 (Dropout)          (None, 512)               0         
_________________________________________________________________
dense_5 (Dense)              (None, 10)                5130      
Total params: 407,050
Trainable params: 407,050
Non-trainable params: 0
_________________________________________________________________
None
Train on 60000 samples, validate on 10000 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5



## Specify an image registry that will hold the image built by fairing

In [None]:
# In this demo, I use gsutil, therefore i compile a special image to install GoogleCloudSDK as based image
base_image = 'gcr.io/{}/fairing-predict-example:latest'.format(GCP_PROJECT)
!docker build --build-arg PY_VERSION=3.6.4 . -t {base_image}
!docker push {base_image}

In [11]:
# Resolve the GCP project, then derive both image locations from it.
GCP_PROJECT = gcp.guess_project_name()
# Base image that the Fairing-built images start from.
BASE_IMAGE = f'gcr.io/{GCP_PROJECT}/fairing-predict-example:latest'
# Registry path passed to Fairing as `docker_registry`.
DOCKER_REGISTRY = f'gcr.io/{GCP_PROJECT}/fairing-job-tf'

## Deploy the training job to kubeflow cluster

In [19]:
from fairing import TrainJob
from fairing.backends import GKEBackend

# Submit TensorflowModel as a training job through the GKE backend.
train_job = TrainJob(
    TensorflowModel,
    BASE_IMAGE,
    input_files=["requirements.txt"],
    docker_registry=DOCKER_REGISTRY,
    backend=GKEBackend(),
)
train_job.submit()

Using preprocessor: <class 'fairing.preprocessors.function.FunctionPreProcessor'>
Using docker registry: gcr.io/gojek-kubeflow/fairing-job-tf
Using builder: <class 'fairing.builders.docker.docker.DockerBuilder'>
Building the docker image.
Building image using docker
Docker command: ['python', '/app/function_shim.py', '--serialized_fn_file', '/app/pickled_fn.p']
/Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Creating docker context: /tmp/fairing_context_2n45lxud
/Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Context: /tmp/fairing_context_2n45lxud, Adding /Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py at /app/fairing/__init__.py
Context: /tmp/fairing_context_2n45lxud, Adding /Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/runtime_config.py

From /usr/local/lib/python3.6/site-packages/tensorflow/python/ops/resource_variable_ops.py:435: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Colocations handled automatically by placer.
From /usr/local/lib/python3.6/site-packages/tensorflow/python/keras/layers/core.py:143: calling dropout (from tensorflow.python.ops.nn_ops) with keep_prob is deprecated and will be removed in a future version.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
2019-05-10 05:43:28.440970: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2019-05-10 05:43:28.448538: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2200000000 Hz
2019-05-10 05:43:2

   32/60000 [..............................] - ETA: 20s - loss: 0.1088 - acc: 0.9688  256/60000 [..............................] - ETA: 14s - loss: 0.0768 - acc: 0.9766  480/60000 [..............................] - ETA: 14s - loss: 0.0793 - acc: 0.9771  704/60000 [..............................] - ETA: 14s - loss: 0.0809 - acc: 0.9759  928/60000 [..............................] - ETA: 14s - loss: 0.0716 - acc: 0.9795 1152/60000 [..............................] - ETA: 14s - loss: 0.0649 - acc: 0.9826

   32/60000 [..............................] - ETA: 19s - loss: 0.0316 - acc: 0.9688  288/60000 [..............................] - ETA: 13s - loss: 0.0620 - acc: 0.9757  544/60000 [..............................] - ETA: 13s - loss: 0.0514 - acc: 0.9798  800/60000 [..............................] - ETA: 13s - loss: 0.0532 - acc: 0.9825 1056/60000 [..............................] - ETA: 12s - loss: 0.0506 - acc: 0.9839 1312/60000 [..............................] - ETA: 12s - loss: 0.0454 - acc: 0.9870

   32/60000 [..............................] - ETA: 19s - loss: 0.0076 - acc: 1.0000  288/60000 [..............................] - ETA: 13s - loss: 0.0248 - acc: 0.9965  544/60000 [..............................] - ETA: 13s - loss: 0.0290 - acc: 0.9926  800/60000 [..............................] - ETA: 12s - loss: 0.0329 - acc: 0.9900 1024/60000 [..............................] - ETA: 12s - loss: 0.0389 - acc: 0.9883 1248/60000 [..............................] - ETA: 13s - loss: 0.0381 - acc: 0.9888

Cleaning up job fairing-job-2xzs8...


## Deploy distributed training job to kubeflow cluster

In [20]:
# Builder: the 'docker' builder, starting from BASE_IMAGE and pushing to DOCKER_REGISTRY.
fairing.config.set_builder(
    name='docker',
    registry=DOCKER_REGISTRY,
    base_image=BASE_IMAGE,
    push=True,
)
# Deployer: 'tfjob' with worker_count=1 and ps_count=1.
fairing.config.set_deployer(
    name='tfjob',
    worker_count=1,
    ps_count=1,
)
# Wrap TensorflowModel with the builder/deployer configured above.
run_fn = fairing.config.fn(TensorflowModel)

In [21]:
# Invoke the callable returned by fairing.config.fn(TensorflowModel).
run_fn()

Using preprocessor: <fairing.preprocessors.function.FunctionPreProcessor object at 0x13ec8b8d0>
Using builder: <fairing.builders.docker.docker.DockerBuilder object at 0x13db86be0>
Building image using docker
Docker command: ['python', '/app/function_shim.py', '--serialized_fn_file', '/app/pickled_fn.p']
/Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Creating docker context: /tmp/fairing_context_umpxhanz
/Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Context: /tmp/fairing_context_umpxhanz, Adding /Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py at /app/fairing/__init__.py
Context: /tmp/fairing_context_umpxhanz, Adding /Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/runtime_config.py at /app/fairing/runtime_config.py
Context: /tmp/fairing_co

From /usr/local/lib/python3.6/site-packages/tensorflow/python/ops/resource_variable_ops.py:435: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Colocations handled automatically by placer.
From /usr/local/lib/python3.6/site-packages/tensorflow/python/keras/layers/core.py:143: calling dropout (from tensorflow.python.ops.nn_ops) with keep_prob is deprecated and will be removed in a future version.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
2019-05-10 05:47:42.392693: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2019-05-10 05:47:42.399617: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2200000000 Hz
2019-05-10 05:47:4

Epoch 3/5


   32/60000 [..............................] - ETA: 25s - loss: 0.0568 - acc: 0.9688  224/60000 [..............................] - ETA: 18s - loss: 0.0452 - acc: 0.9777  416/60000 [..............................] - ETA: 17s - loss: 0.0442 - acc: 0.9808  608/60000 [..............................] - ETA: 17s - loss: 0.0615 - acc: 0.9770  800/60000 [..............................] - ETA: 16s - loss: 0.0576 - acc: 0.9787  992/60000 [..............................] - ETA: 16s - loss: 0.0571 - acc: 0.9808

   32/60000 [..............................] - ETA: 28s - loss: 0.0101 - acc: 1.0000  192/60000 [..............................] - ETA: 22s - loss: 0.0162 - acc: 1.0000  352/60000 [..............................] - ETA: 21s - loss: 0.0239 - acc: 0.9972  512/60000 [..............................] - ETA: 20s - loss: 0.0273 - acc: 0.9941  672/60000 [..............................] - ETA: 20s - loss: 0.0308 - acc: 0.9926  832/60000 [..............................] - ETA: 20s - loss: 0.0309 - acc: 0.9928

   32/60000 [..............................] - ETA: 29s - loss: 0.0380 - acc: 1.0000  224/60000 [..............................] - ETA: 17s - loss: 0.0555 - acc: 0.9821  416/60000 [..............................] - ETA: 16s - loss: 0.0464 - acc: 0.9832  608/60000 [..............................] - ETA: 16s - loss: 0.0352 - acc: 0.9885  832/60000 [..............................] - ETA: 16s - loss: 0.0359 - acc: 0.9892 1024/60000 [..............................] - ETA: 16s - loss: 0.0424 - acc: 0.9873

rpc error: code = Unknown desc = Error: No such container: 902d4d41f1f6986dbd1383869a361f63c4bd57ec3a4b4f45f6d22e86bb841162


## Deploy the training job as CMLE training job

Note: distributed training on CMLE is not supported.

In [22]:
from fairing import TrainJob
from fairing.backends import GCPManagedBackend
# Same job definition as the GKE run, submitted through the GCP managed backend instead.
train_job = TrainJob(
    TensorflowModel,
    BASE_IMAGE,
    input_files=["requirements.txt"],
    docker_registry=DOCKER_REGISTRY,
    backend=GCPManagedBackend(),
)
train_job.submit()

Using preprocessor: <class 'fairing.preprocessors.function.FunctionPreProcessor'>
Using docker registry: gcr.io/gojek-kubeflow/fairing-job-tf
Using builder: <class 'fairing.builders.docker.docker.DockerBuilder'>
Building the docker image.
Building image using docker
Docker command: ['python', '/app/function_shim.py', '--serialized_fn_file', '/app/pickled_fn.p']
/Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Creating docker context: /tmp/fairing_context_ql6o52sy
/Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Context: /tmp/fairing_context_ql6o52sy, Adding /Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py at /app/fairing/__init__.py
Context: /tmp/fairing_context_ql6o52sy, Adding /Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/runtime_config.py

Creating training job with the following options: {'jobId': 'fairing_job_64b00bf8', 'trainingInput': {'scaleTier': 'BASIC', 'masterConfig': {'imageUri': 'gcr.io/gojek-kubeflow/fairing-job-tf/fairing-job:258D8D01'}, 'region': 'us-central1'}}
Job submitted successfully.
Access job logs at the following URL:
https://console.cloud.google.com/mlengine/jobs/fairing_job_64b00bf8?project=gojek-kubeflow


## Inspect training process with tensorboard

In [22]:
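# Uncomment to launch TensorBoard on the training logs (replace the bucket with your own GCP_BUCKET):
# ! tensorboard --logdir=gs://kubeflow-demo-g/logs --host=localhost --port=8777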

## Deploy the trained model to Kubeflow for predictions

In [13]:
from fairing import PredictionEndpoint
from fairing.backends import KubeflowGKEBackend
# mnist_model.h5 is written by TensorflowModel.save_model() during the local training run above
endpoint = PredictionEndpoint(TensorflowModel, BASE_IMAGE, input_files=['mnist_model.h5', "requirements.txt"],
                              docker_registry=DOCKER_REGISTRY, backend=KubeflowGKEBackend())
endpoint.create()

Using preprocessor: <class 'fairing.preprocessors.function.FunctionPreProcessor'>
Using docker registry: gcr.io/gojek-kubeflow/fairing-job-tf
Using builder: <class 'fairing.builders.docker.docker.DockerBuilder'>
Building the docker image.
Building image using docker
Docker command: ['python', '/app/function_shim.py', '--serialized_fn_file', '/app/pickled_fn.p']
/Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Creating docker context: /tmp/fairing_context_ftqfzvuc
/Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Context: /tmp/fairing_context_ftqfzvuc, Adding /Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/__init__.py at /app/fairing/__init__.py
Context: /tmp/fairing_context_ftqfzvuc, Adding /Users/luoshixin/LocalSim/virtualPython36/lib/python3.6/site-packages/fairing/runtime_config.py

Waiting for prediction endpoint to come up...


Cluster endpoint: http://35.184.251.118:5000/predict
Prediction endpoint: http://35.184.251.118:5000/predict


In [14]:
# Tear down the prediction endpoint created by endpoint.create().
endpoint.delete()

Deleted service: kubeflow/fairing-service-vrhnq
Deleted deployment: kubeflow/fairing-deployer-fd2bz
