# End-to-end experiment: EfficientNet Image Classification

In this notebook, we will show how to:

* Interactively define a Kubeflow Pipeline using the Kubeflow Pipelines Python SDK
* Implement transfer learning with EfficientNet in Keras
* Submit and run the pipeline
* Add a step to the pipeline

This example pipeline trains an [EfficientNet](https://github.com/SemanticMD/efficientnet_keras_transfer_learning) model on medical X-ray image data, using transfer learning to classify new image categories (here, `normal` and `pneumothorax`). It then exports the trained model and deploys it to [Seldon Core](https://github.com/SeldonIO/seldon-core).
The final step in the pipeline launches a web app that calls the EfficientNet model through a REST API to get model predictions.

## Environment Setup

Before running any experiment, we need to set up and initialize the environment: import and configure all required Python modules and load the notebook variables.

### Imports
Import and set up the Python modules.

In [102]:
%reload_ext autoreload
%autoreload 2

%reload_ext nbextensions
%load_nbvars

import kfp.dsl as dsl
import kfp.gcp as gcp
import pandas as pd
from ipython_secrets import get_secret
from kfp.compiler import Compiler
from kfp.components import load_component_from_file
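import glob  # used to list dataset images in a later cell
import os    # os.path.join is used in later cells
from os import environ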
import boto3, kfp

from nbextensions.pv import use_pvc
from nbextensions.kubernetes import dockerjson_pv
from nbextensions.aws import upload_to_s3

import nbextensions.utils as utils
from datetime import datetime
from urllib.parse import urlparse

import warnings
warnings.filterwarnings('ignore')

Loading notebook variables from configmap: `kubeflow/efficientnet1-nb-vars`

All notebook variables has been already loaded

In [115]:
!pip install pixiedust

You should consider upgrading via the 'pip install --upgrade pip' command.


In [116]:
import pixiedust

Pixiedust database opened successfully


### Define global variables

Initialize global namespace variables. It is good practice to place all global namespace variables in one cell, so that the notebook can be configured all at once.

To improve readability, we advise capitalizing such variables.

NOTE: Change the `TAG` variable to force a rebuild of container images, as a workaround for caching issues.
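To see why changing `TAG` helps, the hypothetical helper below collects every tag-dependent name from the next cell in one function. Because each image reference and build path embeds the tag, a new tag yields names that nothing has cached yet. The registry and mount path are placeholder values, not this deployment's.

```python
def tagged_names(registry: str, mount_path: str, tag: str) -> dict:
    """Hypothetical helper: every name in the next cell that depends on TAG."""
    return {
        "BUILD_CONTEXT": f"{tag}/buildcontext",
        "TRAINING_IMAGE": f"{registry}/training:{tag}",
        "SERVING_IMAGE": f"{registry}/seldon:{tag}",
        "FLASK_APP_IMAGE": f"{registry}/flask:{tag}",
        "TRAINING_ROOT": f"{mount_path}/{tag}/training",
    }


# Placeholder registry and mount path, for illustration only
v24 = tagged_names("registry.example.com/ml", "/mnt/s3", "v24")
v25 = tagged_names("registry.example.com/ml", "/mnt/s3", "v25")

# Bumping the tag changes every image reference and build path at once
changed = [key for key in v24 if v24[key] != v25[key]]
print(v25["TRAINING_IMAGE"])  # registry.example.com/ml/training:v25
```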

In [332]:
USER = environ.get('NB_USER', 'John Doe')
# TAG = 'latest'
TAG = 'v24'

BUILD_CONTEXT = f"{TAG}/buildcontext"
TRAINING_IMAGE = f"{DOCKER_REGISTRY}/training:{TAG}"
SERVING_IMAGE = f"{DOCKER_REGISTRY}/seldon:{TAG}"
FLASK_APP_IMAGE = f"{DOCKER_REGISTRY}/flask:{TAG}"
TRAINING_ROOT = f"{MOUNT_PATH}/{TAG}/training"

s3 = boto3.session.Session().client('s3', endpoint_url=BUCKET_ENDPOINT)

client = kfp.Client()
# Reuse the experiment if it already exists; otherwise create it
try:
    exp = client.get_experiment(experiment_name=APPLICATION_NAME)
except Exception:
    exp = client.create_experiment(APPLICATION_NAME)

### Define variables for the experiment
At the beginning of the script we import the required libraries and define all necessary variables. A single cell holds the whole experiment configuration, so it lives in one place.

In [333]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import models, layers, optimizers
from tensorflow.keras.utils import get_custom_objects
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import TensorBoard

#from minio import Minio
#from minio.error import ResponseError

# Options: EfficientNetB0, EfficientNetB1, EfficientNetB2, EfficientNetB3
# The higher the number, the larger and more complex the model.
from efficientnet import EfficientNetB0 as Net
from efficientnet import center_crop_and_resize, preprocess_input
from efficientnet.layers import Swish, DropConnect
from efficientnet.model import ConvKernalInitializer

In [334]:
## Globals
REMOTE_MINIO_SERVER = get_secret('REMOTE_MINIO_SERVER')
ACCESS_KEY = get_secret('ACCESS_KEY')
SECRET_KEY = get_secret('SECRET_KEY')

BASEDIR = f"{ARTIFACTS_ROOT}/santosh-test"
DATASET_DIR = os.path.join(BASEDIR, 'datasets')

FILETYPES = ('*.jpg', '*.jpeg', '*.png')
MODEL_VERSION='1'
MODEL_FNAME='pneumothorax_'  + datetime.now().strftime("%m_%d_%S") + '.h5'
DATASET_NAME='normal_pneumothorax'
LABELS='normal,pneumothorax'

In [335]:
#%%pixie_debugger
aws_access_key_id=get_secret('aws_access_key_id')
aws_secret_access_key=get_secret('aws_secret_access_key')
print(f"aws key_id: {aws_access_key_id} secret: {'*' * 8}")  # never print secrets
ext_access_key_id=get_secret('ext_access_key_id')
ext_secret_access_key=get_secret('ext_secret_access_key')
print(f"ext key_id: {ext_access_key_id} secret: {'*' * 8}")



aws key_id: admin secret: ********
ext key_id: admin secret: ********


In [336]:
ARTIFACTS_ROOT = f"{MOUNT_PATH}"
TEST_IMAGE_FILE = f"{ARTIFACTS_ROOT}/panda.jpg"
MODEL_FILE = f"{MOUNT_PATH}/models/{MODEL_VERSION}/{MODEL_FNAME}"
MODEL_DIR = f"{MOUNT_PATH}/models/"

In [340]:
print (f"Local dataset: {DATASET_DIR}")
print (f"Remote dataset: REMOTE_MINIO_SERVER {REMOTE_MINIO_SERVER}")
print (f"Test image: {TEST_IMAGE_FILE}")
print (f"Model file: {MODEL_FILE}")

Local dataset: /mnt/s3/santosh-test/datasets
Remote dataset: REMOTE_MINIO_SERVER 206.189.86.150:32782
Test image: /mnt/s3/panda.jpg
Model file: /mnt/s3/models/1/pneumothorax_03_04_08.h5


### Define the Docker image build pipeline

Define the build pipeline. Admittedly, we are using KFP itself to build the images that the final pipeline will use.

We use [Kaniko](https://github.com/GoogleContainerTools/kaniko) and Kubernetes to handle build operations. Build status can be tracked in the KFP pipeline dashboard.

In fact, the image build job could even be combined with the primary pipeline, since it would physically run in separate Kubernetes pods. However, for the sake of general-purpose efficiency, we schedule the build process as a separate pipeline.

In [341]:
kaniko_op = load_component_from_file('components/kaniko/deploy.yaml')

@dsl.pipeline(
  name='Pipeline images',
  description='Build images that will be used by the pipeline'
)
def build_image(
        image, 
        context=None, 
        dockerfile: dsl.PipelineParam=dsl.PipelineParam(name='dockerfile', value='Dockerfile')):
    dockerjson = dockerjson_pv(pull_secret=DOCKER_REGISTRY_PULL_SECRET)
    kaniko_op(
        image=image,
        dockerfile=dockerfile,
        context=context
    ).add_pvolumes({
        '/mnt/s3': dsl.PipelineVolume(pvc=BUCKET_PVC),
        '/kaniko/.docker': dsl.PipelineVolume(volume=dockerjson),
    })
        
Compiler().compile(build_image, 'argo-kaniko.yaml')
utils.patch_pvolumes('argo-kaniko.yaml')

The compiler transforms the Python DSL into an [Argo Workflow](https://argoproj.github.io/docs/argo/readme.html) and stores the generated workflow in [`argo-kaniko.yaml`](argo-kaniko.yaml), so it can be executed multiple times, possibly with different parameters.

## Distributed Training
Training is an integral part of our experiment. Distributed training means that training runs outside of the Jupyter Notebook, on the Kubernetes cluster, where it can use the cluster's full capacity. To achieve this, we need to perform the following steps:
* Build a docker image for training
* Define a training pipeline
* Run the experiment

### Building a training image
Once the pipeline has been defined, we can reuse it multiple times by supplying different input parameters.

The next cell uploads all workspace files to S3 so that the pipeline can access them. Files to exclude can be customized in [ignorefile.txt](components/kaniko/ignorefile.txt). To understand the upload logic, you can review and modify [aws.py](nbextensions/aws.py).
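The implementation of `upload_to_s3` is not shown here, so the following is only a sketch of how an ignore file might filter the workspace before upload. It assumes one glob pattern per line with `#` comments; it is not full `.gitignore` semantics (for example, there is no `!` negation) and it makes no S3 calls.

```python
import fnmatch
from pathlib import PurePosixPath


def load_patterns(text: str) -> list:
    """One glob pattern per line; blank lines and '#' comments are skipped."""
    return [
        line.strip()
        for line in text.splitlines()
        if line.strip() and not line.strip().startswith("#")
    ]


def is_ignored(path: str, patterns: list) -> bool:
    """True if the path, any parent prefix, or any single component matches."""
    parts = PurePosixPath(path).parts
    prefixes = ["/".join(parts[: i + 1]) for i in range(len(parts))]
    candidates = set(parts) | set(prefixes)
    return any(fnmatch.fnmatch(c, p) for c in candidates for p in patterns)


def files_to_upload(paths, patterns) -> list:
    """Keep only the paths that no pattern excludes."""
    return [p for p in paths if not is_ignored(p, patterns)]


patterns = load_patterns("# build junk\n.git\n*.pyc\n\n__pycache__\n")
workspace = [
    "components/training/Dockerfile",
    ".git/config",
    "src/model.pyc",
    "src/__pycache__/app.cpython-37.pyc",
    "components/flaskapp/src/app.py",
]
print(files_to_upload(workspace, patterns))
# ['components/training/Dockerfile', 'components/flaskapp/src/app.py']
```

Checking path components as well as prefixes is what lets a bare `__pycache__` pattern exclude nested cache directories.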

In [342]:
upload_to_s3(
    destination=f"s3://{BUCKET_NAME}/{BUILD_CONTEXT}",
    ignorefile='components/kaniko/ignorefile.txt',
    workspace='.',
    s3_client=s3
)

run = client.run_pipeline(
    exp.id, f'Build image: training:{TAG}', 'argo-kaniko.yaml', 
    params={
        'image': TRAINING_IMAGE,
        'context': f"{MOUNT_PATH}/{BUILD_CONTEXT}/components/training"
    })

The build may take more than 12 minutes because Docker images used for machine learning can be very large. The wait below gives up after `timeout=720` seconds (12 minutes); if your images are very large, you may need to increase the `timeout` parameter.
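`client.wait_for_run_completion` does its own polling inside the KFP SDK. To illustrate what `timeout` bounds, here is a generic polling loop in which a hypothetical `get_status` callable stands in for a run-status lookup. The set of terminal state names is an assumption based on Argo workflow phases.

```python
import time

# Assumed terminal run states (Argo workflow phases); adjust for your KFP version
TERMINAL_STATES = ("Succeeded", "Failed", "Error", "Skipped")


def wait_for_status(get_status, timeout=720.0, interval=5.0):
    """Poll get_status() until it returns a terminal state or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still {status!r} after {timeout} seconds")
        time.sleep(interval)


# Simulated run that finishes on the third poll
states = iter(["Pending", "Running", "Succeeded"])
print(wait_for_status(lambda: next(states), timeout=10, interval=0))  # Succeeded
```

Note that a timeout only stops the waiting; the build itself keeps running in the cluster and can still be followed in the KFP dashboard.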

In [300]:
%%time
# block until job completion
print(f"Waiting for run: {run.id}...")
result = client.wait_for_run_completion(run.id, timeout=720).run.status
print(f"Finished with: {result}")

Waiting for run: 1829b069-5dc9-11ea-9586-122862a16a39...
Finished with: Succeeded
CPU times: user 5.73 ms, sys: 891 µs, total: 6.62 ms
Wall time: 5.02 s


### Define Pipeline
We have extracted the training pipeline code into a [component](components/training). The Python code that defines `training_op`, as well as `http_download_op`, can be found [here](components/training/component.py).

Below we define a pipeline that runs training as an experiment. Each operation is encapsulated in a Python script executed by `training_op`. You can change the scripts at will; however, you will need to rebuild the training image afterwards. The pipeline does the following:

* Download the dataset from the remote MinIO server (`download.py`)
* Train the EfficientNet model with Keras (`train.py`). When training completes, the model is written to `modelDir` on the S3 bucket mounted at `/mnt/s3`
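The pipeline spells out each script flag by hand, following a visible convention: the camelCase pipeline parameter `datasetName` becomes the flag `--dataset_name`. The hypothetical helper below captures that naming rule; it is not part of `components/training`.

```python
import re


def to_flag(name: str) -> str:
    """Pipeline parameter name -> CLI flag, e.g. 'datasetName' -> '--dataset_name'."""
    # Insert '_' before each capital letter (except at the start), then lowercase
    return "--" + re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def to_cli_args(params: dict) -> list:
    """Flatten {'batchSize': 32} into ['--batch_size', '32'] (values become strings)."""
    args = []
    for name, value in params.items():
        args += [to_flag(name), str(value)]
    return args


print(to_cli_args({"datasetName": "normal_pneumothorax", "batchSize": 32}))
# ['--dataset_name', 'normal_pneumothorax', '--batch_size', '32']
```

The explicit argument lists in the pipeline remain easier to read and review, so treat this as a description of the convention rather than a replacement for them.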

In [349]:
from components.training import (http_download_op, training_op)

@dsl.pipeline(
  name='Training',
  description="""
  Download dataset, 
  Split data set for training and validation, 
  Clean and preprocess data, 
  Train the model
  """
)
def training_pipeline(
    datasetDir: dsl.PipelineParam, 
    datasetName: dsl.PipelineParam,
    labels: dsl.PipelineParam,
    remoteMinioServer: dsl.PipelineParam,
    accessKey: dsl.PipelineParam,
    secretKey: dsl.PipelineParam,
    batchSize: dsl.PipelineParam,
    width: dsl.PipelineParam,    
    height: dsl.PipelineParam,
    epochs: dsl.PipelineParam,
    dropoutRate: dsl.PipelineParam,
    learningRate: dsl.PipelineParam,
    trainInput: dsl.PipelineParam,
    modelVersion: dsl.PipelineParam,
    modelDir: dsl.PipelineParam,
    modelFname: dsl.PipelineParam
):  

    download = training_op(
        script='download.py',
        arguments=[
            '--dataset_dir', datasetDir,
            '--dataset_name', datasetName,
            '--labels', labels,
            '--remote_minio_server', remoteMinioServer,
            '--access_key', accessKey,
            '--secret_key', secretKey,
        ]
    ).add_pvolumes({
        '/mnt/s3': dsl.PipelineVolume(pvc=BUCKET_PVC)
    })

    # Training
    training = training_op(
        script='train.py',
        arguments=[
            '--batch_size', batchSize,
            '--width', width,
            '--height', height,
            '--epochs', epochs,
            '--dropout_rate', dropoutRate,
            '--learning_rate', learningRate,
            '--dataset_name', datasetName,
            '--train_input', trainInput,
            '--model_version', modelVersion,
            '--model_dir', modelDir,
            '--model_fname', modelFname,
            '--labels', labels,
            '--tempfile', "True",
        ],
        file_outputs={'train': '/tmp/seq2seq.log'},
    ).add_pvolumes({
        '/mnt/s3': dsl.PipelineVolume(pvc=BUCKET_PVC)
    }).after(download)

Compiler().compile(training_pipeline, 'argo-distr-training.yaml')
utils.patch_pvolumes('argo-distr-training.yaml')

### Run the pipeline

The code below runs the pipeline and injects the pipeline parameters: the dataset location, name, and labels; the MinIO server credentials; and the training hyperparameters (batch size, input image width and height, epochs, dropout rate, and learning rate).

Keeping `epochs` at 1 gives fast feedback during development, but it is not enough for a well-trained model; increase it for a real training run.
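Every pipeline parameter reaches the container as a command-line string, so the training script has to convert types itself. The actual `train.py` is not shown in this notebook; the sketch below assumes it uses `argparse` and covers only some of the flags passed by `training_pipeline`, with defaults mirroring the values used in the run below.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser for a subset of the flags the pipeline passes to train.py."""
    p = argparse.ArgumentParser(description="EfficientNet training (sketch)")
    p.add_argument("--batch_size", type=int, default=32)
    p.add_argument("--width", type=int, default=150)
    p.add_argument("--height", type=int, default=150)
    p.add_argument("--epochs", type=int, default=1)
    p.add_argument("--dropout_rate", type=float, default=0.2)
    p.add_argument("--learning_rate", type=float, default=2e-5)
    # LABELS arrives as one comma-separated string, e.g. "normal,pneumothorax"
    p.add_argument("--labels", type=lambda s: s.split(","), default=[])
    p.add_argument("--model_dir", required=True)
    p.add_argument("--model_fname", required=True)
    # "--tempfile True" arrives as the string "True", so convert it explicitly
    p.add_argument("--tempfile", type=lambda s: s.lower() == "true", default=False)
    return p


args = build_parser().parse_args([
    "--batch_size", "32", "--epochs", "1", "--learning_rate", "2e-05",
    "--labels", "normal,pneumothorax",
    "--model_dir", "/mnt/s3/models/", "--model_fname", "model.h5",
    "--tempfile", "True",
])
print(args.batch_size, args.labels, args.tempfile)  # 32 ['normal', 'pneumothorax'] True
```

Avoid `type=bool` for flags like `--tempfile`: `bool("False")` is `True`, so any non-empty string would switch the option on.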

In [350]:

run = client.run_pipeline(exp.id, f'Training model {TAG}: {datetime.now():%m%d-%H%M}', 'argo-distr-training.yaml',
                          params={
                              'datasetDir': DATASET_DIR,
                              'datasetName': DATASET_NAME,
                              'labels': LABELS,
                              'remoteMinioServer': REMOTE_MINIO_SERVER,
                              'accessKey': ACCESS_KEY,
                              'secretKey': SECRET_KEY,
                              'batchSize': 32,
                              'width': 150,
                              'height': 150,
                              'epochs': 1,
                              'dropoutRate': 0.2,
                              'learningRate': 0.00002,
                              'trainInput': os.path.join(DATASET_DIR, DATASET_NAME),
                              'modelVersion': MODEL_VERSION,
                              'modelDir': MODEL_DIR,
                              'modelFname': MODEL_FNAME,
                          })

In [314]:
%%time
# block until job completion
print(f"Waiting for run: {run.id}...")
result = client.wait_for_run_completion(run.id, timeout=720).run.status
print(f"Finished with: {result}")

Waiting for run: e0411abb-5dcc-11ea-9586-122862a16a39...
Finished with: Failed
CPU times: user 29.2 ms, sys: 8.46 ms, total: 37.7 ms
Wall time: 35.1 s


In [356]:
NB_MODEL_FILE = f"/home/jovyan/data/models/{MODEL_VERSION}/{MODEL_FNAME}"
!ls $NB_MODEL_FILE

/home/jovyan/data/models/1/pneumothorax_03_04_08.h5


In [363]:
from efficientnet import load_model
get_custom_objects().update({
            'ConvKernalInitializer': ConvKernalInitializer,
            'Swish': Swish,
            'DropConnect':DropConnect
})

mymodel = load_model(NB_MODEL_FILE)

TypeError: __init__() got an unexpected keyword argument 'ragged'

In [360]:
# eval model
get_custom_objects().update({
            'ConvKernalInitializer': ConvKernalInitializer,
            'Swish': Swish,
            'DropConnect':DropConnect
})

#model = load_model(NB_MODEL_FILE)
# Do any preprocessing
#prediction = model.predict(X)

## Serving with Seldon
Prepare a container for serving.

Here we define all the variables needed by our Dockerfile template.

- `MODEL_WRAPPER`: name of the Python class that adapts the Keras model for serving
- `MODEL_NAME`: model name used in the Seldon deployment
- `MODEL_VERSION`: one model can be served in multiple versions simultaneously
- `SELDON_DEPLOYMENT`: name of the Kubernetes resource
- `SELDON_OAUTH_KEY`: part of the shared secret between `SeldonDeployment` and a client application
- `SELDON_OAUTH_SECRET`: part of the shared secret between `SeldonDeployment` and a client application
- `SELDON_APISERVER_ADDR`: address of the Seldon API server
- `SELDON_DEPLOYMENT_REPLICAS`: number of replicas for the `SeldonDeployment` pod

In [None]:
import re

MODEL_WRAPPER = 'EfficientNetModel'
MODEL_NAME = re.sub(r'\W+', '-', MODEL_WRAPPER).lower()
MODEL_VERSION = TAG
SELDON_DEPLOYMENT = f"{MODEL_NAME}-{MODEL_VERSION}"
# hash information about the model, so the key is predictable
SELDON_OAUTH_KEY = utils.sha1(MODEL_NAME, MODEL_VERSION, NAMESPACE)
# for a secure secret, hash the user-defined shared secret salted with the OAuth key
SELDON_OAUTH_SECRET = utils.sha1(SELDON_OAUTH_KEY, get_secret('USER_SECRET_FOR_MODEL'))
SELDON_APISERVER_ADDR=f"seldon-seldon-apiserver.{NAMESPACE}:8080"

SELDON_DEPLOYMENT_REPLICAS = 1
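`utils.sha1` lives in the notebook's `nbextensions` package and its implementation is not shown. Assuming it hashes its string arguments together, the sketch below reproduces the idea with `hashlib`: the same model name, version, and namespace always produce the same key, so a client can recompute it, while the secret additionally depends on a user-defined value. The `':'` separator, the `kubeflow` namespace, and the secret string are illustrative assumptions.

```python
import hashlib
import re


def model_name(wrapper: str) -> str:
    """Same rule as MODEL_NAME above: runs of non-word characters -> '-', lowercase."""
    return re.sub(r"\W+", "-", wrapper).lower()


def sha1_of(*parts: str) -> str:
    """Deterministic SHA-1 hex digest of several strings.

    Parts are joined with ':' so that ('ab', 'c') and ('a', 'bc') hash differently.
    """
    return hashlib.sha1(":".join(parts).encode("utf-8")).hexdigest()


name = model_name("EfficientNetModel")                   # 'efficientnetmodel'
oauth_key = sha1_of(name, "v24", "kubeflow")              # example version/namespace
oauth_secret = sha1_of(oauth_key, "example-user-secret")  # placeholder secret
```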

### Building a serving image

`SeldonDeployment` needs a Docker image that contains a model wrapper written in Python (though Seldon is not limited to Python).

The following steps build the serving container and serve the model.
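The `%%template` magic comes from the notebook's `nbextensions` package; it fills `{{...}}` placeholders such as `{{TRAINING_IMAGE}}` and `{{MODEL_WRAPPER}}` from notebook globals before writing the Dockerfile. Its implementation is not shown and may well use a full template engine such as Jinja2; the regex-based stand-in below only illustrates the substitution step.

```python
import re

# Matches {{NAME}} placeholders, optionally with spaces inside the braces
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: str, variables: dict) -> str:
    """Replace each {{NAME}} with variables['NAME']; fail loudly on unknown names."""
    def substitute(match):
        key = match.group(1)
        if key not in variables:
            raise KeyError(f"template variable {key!r} is not defined")
        return str(variables[key])
    return PLACEHOLDER.sub(substitute, template)


dockerfile = 'FROM {{TRAINING_IMAGE}}\nCMD ["{{MODEL_WRAPPER}}", "REST"]'
print(render(dockerfile, {
    "TRAINING_IMAGE": "registry.example.com/ml/training:v24",  # placeholder
    "MODEL_WRAPPER": "EfficientNetModel",
}))
```

Raising on an unknown name surfaces a missing notebook variable at render time instead of producing a Dockerfile with a literal `{{...}}` in it.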

In [361]:
%%template components/serving/Dockerfile
FROM seldonio/seldon-core-s2i-python3

FROM {{TRAINING_IMAGE}}
RUN pip3 install --no-cache-dir -U 'seldon-core'

COPY --from=0 /microservice /microservice
COPY src/serving.py /microservice/{{MODEL_WRAPPER}}.py

WORKDIR /microservice
ENTRYPOINT ["python","-u","microservice.py"]
CMD ["{{MODEL_WRAPPER}}", "REST"]

To serve the trained model, we build an image containing our serving microservice. To achieve this, we reuse the Kaniko pipeline defined above.

In [None]:
upload_to_s3(
    destination=f"s3://{BUCKET_NAME}/{BUILD_CONTEXT}",
    ignorefile='components/kaniko/ignorefile.txt',
    workspace='.',
    s3_client=s3,
)

run = client.run_pipeline(exp.id, f'Build image: serving:{TAG}', 'argo-kaniko.yaml', 
                          params={
                              'image': SERVING_IMAGE,
                              'context': f"{MOUNT_PATH}/{BUILD_CONTEXT}/components/serving"
                          })

In [None]:
%%time
# block until job completion
print(f"Waiting for run: {run.id}...")
result = client.wait_for_run_completion(run.id, timeout=720).run.status
print(f"Finished with: {result}")

### Serve model

Next, we render our `SeldonDeployment` template and deploy it with `kubectl`, as we did before with the `pvc` definition. The template holds the reference to the model that will be used for serving.

In [198]:
!ls -lrt /home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/*.png

-rw-r--r--. 1 jovyan users 181732 Feb 24 21:06 /home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR100_IM-0002-1001.png
-rw-r--r--. 1 jovyan users 153936 Feb 24 21:06 /home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR100_IM-0002-2001.png
-rw-r--r--. 1 jovyan users 202821 Feb 24 21:06 /home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR106_IM-0042-1001.png
-rw-r--r--. 1 jovyan users 154013 Feb 24 21:06 /home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR106_IM-0042-2001.png
-rw-r--r--. 1 jovyan users 141700 Feb 24 21:06 /home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR107_IM-0049-1001.png
-rw-r--r--. 1 jovyan users 119313 Feb 24 21:06 /home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR107_IM-0049-2001.png
-rw-r--r--. 1 jovyan users 195200 Feb 24 21:06 /home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR10_IM-0002-1001.png
-rw-r--r--. 1 jovyan users 175008 F

In [199]:
print(glob.glob("/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/*.png"))

['/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR100_IM-0002-1001.png', '/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR100_IM-0002-2001.png', '/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR106_IM-0042-1001.png', '/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR106_IM-0042-2001.png', '/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR107_IM-0049-1001.png', '/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR107_IM-0049-2001.png', '/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR10_IM-0002-1001.png', '/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR10_IM-0002-2001.png', '/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR110_IM-0067-1001.png', '/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/normal/CXR110_IM-0067-2001.png', '/home/jovyan/data/santosh-test/datasets/normal_pneumothorax/

In [None]:
%templatefile components/serving/templates/seldon.yaml -o seldon.yaml
!kubectl apply -f seldon.yaml --wait
!kubectl get -f seldon.yaml -o jsonpath='{.status.state}'

### Validate

It can take a few minutes for the Seldon application to be deployed. Once it is deployed, we can send a test prediction.

Test model serving by accessing the Seldon API server. Because the Seldon API server is protected by OAuth, we first need to obtain a temporary bearer token. We receive this token by providing the OAuth key and secret used in our `SeldonDeployment`.
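`nbextensions.seldon` hides the HTTP calls. As a sketch of the underlying OAuth2 client-credentials flow, the code below only builds the two requests with the standard library and does not send them. The endpoint paths `/oauth/token` and `/api/v0.1/predictions` are assumptions based on the Seldon Core 0.x API gateway; check them against your Seldon version.

```python
import base64
import json
import urllib.parse
import urllib.request


def token_request(server: str, key: str, secret: str) -> urllib.request.Request:
    """OAuth2 client-credentials request; key and secret travel as HTTP Basic auth."""
    basic = base64.b64encode(f"{key}:{secret}".encode()).decode()
    body = urllib.parse.urlencode({"grant_type": "client_credentials"}).encode()
    return urllib.request.Request(
        f"http://{server}/oauth/token",
        data=body,
        headers={"Authorization": f"Basic {basic}",
                 "Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def prediction_request(server: str, token: str, payload: dict) -> urllib.request.Request:
    """Prediction request carrying the bearer token and a JSON body like test_payload."""
    return urllib.request.Request(
        f"http://{server}/api/v0.1/predictions",
        data=json.dumps(payload).encode(),
        headers={"Authorization": f"Bearer {token}",
                 "Content-Type": "application/json"},
        method="POST",
    )


server = "seldon-seldon-apiserver.kubeflow:8080"  # example value in the SELDON_APISERVER_ADDR format
req = token_request(server, "example-key", "example-secret")
# Sending is left out; inside the cluster it would be roughly:
#   with urllib.request.urlopen(req) as resp:
#       token = json.load(resp)["access_token"]
```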

In [None]:
from IPython.display import Code
import nbextensions.seldon as seldon

test_payload = {
    "data":{"ndarray": [["try to stop flask from using multiple threads"]]},
}
                         
t = seldon.get_token(
    server=SELDON_APISERVER_ADDR,
    oauth_key=SELDON_OAUTH_KEY,
    oauth_secret=SELDON_OAUTH_SECRET,
)
result = seldon.prediction(
    server=SELDON_APISERVER_ADDR,
    payload=test_payload,
    token=t,
)
if result.get('status') == 'FAILURE':
    print("Error connecting to seldon core.", 
          "This may happen when Seldon has not been up and running yet.",
          "Try again later")
    display(Code(f"{result.get('reason')}: {result.get('info')}"))
else:
    display(
        pd.DataFrame.from_dict({
            'test': test_payload['data']['ndarray'][0],
            'prediction': result['data']['ndarray'][0],
    }))


## Deploy a client application

This section focuses on application deployment routines.
- `FLASK_APP`: name of the Kubernetes deployment associated with the application
- `FLASK_REPLICAS`: number of replicas for the application deployment pod
- `GITHUB_TOKEN`: GitHub token to access the GitHub API. This lets the application fetch a random GitHub issue

In [None]:
FLASK_APP=APPLICATION_NAME
FLASK_REPLICAS = 1
GITHUB_TOKEN=get_secret('github_token')

### Building an application container

The user application is implemented in [app.py](components/flaskapp/src/app.py). We bake this application into a Docker container and then deploy it.

In [None]:
%%template components/flaskapp/Dockerfile -v
FROM {{TRAINING_IMAGE}}
RUN pip3 install --no-cache-dir -U 'flask>=0.12.3'
COPY src/ /app
WORKDIR /app
ENTRYPOINT ["python","-u","app.py"]

In [None]:
upload_to_s3(
    destination=f"s3://{BUCKET_NAME}/{BUILD_CONTEXT}",
    ignorefile='components/kaniko/ignorefile.txt',
    workspace='.',
    s3_client=s3,
)

run = client.run_pipeline(
    exp.id, f'Build image: application:{TAG}', 'argo-kaniko.yaml', 
    params={
      'image': FLASK_APP_IMAGE,
      'context': f"{MOUNT_PATH}/{BUILD_CONTEXT}/components/flaskapp"
    }
)

In [None]:
%%time
# block until job completion
print(f"Waiting for run: {run.id}...")
result = client.wait_for_run_completion(run.id, timeout=720).run.status
print(f"Finished with: {result}")

### Deploy a web application

The client web application is a simple Python [flask](http://flask.pocoo.org) application. Its deployment manifest is defined in a Kubernetes deployment [template file](components/flaskapp/templates/application.yaml). We render this template with the current notebook global variables and then deploy it with `kubectl`.

For web access, the application deployment uses the Ambassador HTTP router, which is part of the Kubeflow stack.
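The contents of `application.yaml` are not shown. If it routes traffic through a `getambassador.io/config` annotation on the application's Service, as Kubeflow deployments using Ambassador commonly did, the mapping would look roughly like the text generated below; the `/{FLASK_APP}/` prefix matches the link printed after deployment. The `ambassador/v0` Mapping format, mapping name, namespace, and port are assumptions.

```python
def ambassador_mapping(app: str, namespace: str, port: int = 80) -> str:
    """Legacy Ambassador Mapping that routes /<app>/ to the <app> Service."""
    return "\n".join([
        "---",
        "apiVersion: ambassador/v0",
        "kind: Mapping",
        f"name: {app}-mapping",
        f"prefix: /{app}/",
        f"service: {app}.{namespace}:{port}",
    ])


# Would sit under the Service's `getambassador.io/config: |` annotation
print(ambassador_mapping("efficientnet", "kubeflow"))
```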

In [None]:
%templatefile components/flaskapp/templates/application.yaml -o application.yaml
!kubectl apply -f application.yaml --wait

from IPython.display import HTML
display(HTML(f'The application is accessible <a href="/{FLASK_APP}/" target="_blank">here</a>'))

## Tear down

Upon completion, let's tear everything down.

In [None]:
# !kubectl delete -f seldon.yaml
!kubectl delete -f application.yaml