## Define Kubeflow Pipelines to train a model

## Create an entry point using Fairing
Kubeflow [Fairing](https://www.kubeflow.org/docs/fairing/) is a Python package that makes training and deploying machine learning models on Kubeflow easier.

Here, we use a Kubeflow Fairing preprocessor to convert a notebook into a Python script and to create a command-line entry point for that script. Once the notebook has been preprocessed, the script can be run from the command line, for example:
```bash
$ python repo_mlp.py train
```
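
As the class name `ConvertNotebookPreprocessorWithFire` suggests, the generated script exposes class methods as subcommands through Python Fire. To illustrate only the dispatch pattern, the sketch below uses the standard library's `argparse` in place of Fire. The `RepoMLP` body, the `main` helper, and the flag defaults are hypothetical stand-ins, not the code that Fairing generates.

```python
import argparse


class RepoMLP:
    """Hypothetical stand-in for the class defined in repo_mlp.ipynb."""

    def train(self, owner, repo):
        # The real method would load embeddings and fit the model.
        return f"training model for {owner}/{repo}"


def main(argv):
    """Dispatch `<method> --owner=... --repo=...` to a RepoMLP method."""
    parser = argparse.ArgumentParser(prog="repo_mlp.py")
    parser.add_argument("method", choices=["train"])
    parser.add_argument("--owner", default="kubeflow")
    parser.add_argument("--repo", default="kubeflow")
    args = parser.parse_args(argv)
    # Look up the requested method by name and call it with the parsed flags.
    return getattr(RepoMLP(), args.method)(owner=args.owner, repo=args.repo)


print(main(["train", "--owner=kubeflow", "--repo=kubeflow"]))
```

Passing the argument list explicitly, rather than reading `sys.argv` directly, keeps the sketch easy to call from a notebook cell.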

In [1]:
import os
from fairing.preprocessors.converted_notebook import ConvertNotebookPreprocessorWithFire

In [2]:
preprocessor = ConvertNotebookPreprocessorWithFire('IssuesLoader', notebook_file='issues_loader.ipynb')

if not preprocessor.input_files:
    preprocessor.input_files = set()
input_files = ['embeddings.py', 'inference.py', 'repo_config.py']
preprocessor.input_files =  set([os.path.normpath(f) for f in input_files])
preprocessor.preprocess()

Converting issues_loader.ipynb to issues_loader.py
Creating entry point for the class name IssuesLoader


[PosixPath('issues_loader.py'),
 'inference.py',
 'embeddings.py',
 'repo_config.py']
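
The cells above pass the extra files through `os.path.normpath` and a `set` before handing them to the preprocessor. A small sketch of why that helps: equivalent spellings of the same path collapse into one entry. The `raw_files` list is a made-up example, not taken from the notebook.

```python
import os

# Hypothetical inputs: the same file written in two equivalent ways.
raw_files = ["embeddings.py", "./embeddings.py", "inference.py", "repo_config.py"]

# normpath collapses "./" and redundant separators; the set removes duplicates.
input_files = {os.path.normpath(f) for f in raw_files}

print(sorted(input_files))
```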

In [3]:
preprocessor = ConvertNotebookPreprocessorWithFire('RepoMLP', notebook_file='repo_mlp.ipynb')

if not preprocessor.input_files:
    preprocessor.input_files = set()
input_files = ['mlp.py', 'repo_config.py']
preprocessor.input_files =  set([os.path.normpath(f) for f in input_files])
preprocessor.preprocess()

Converting repo_mlp.ipynb to repo_mlp.py
Creating entry point for the class name RepoMLP


[PosixPath('repo_mlp.py'), 'mlp.py', 'repo_config.py']

## Use Fairing to build a Docker image

In [4]:
import os
import sys
import fairing
from fairing.builders import append
from fairing.builders import cluster
from fairing.deployers import job
from fairing.preprocessors.converted_notebook import ConvertNotebookPreprocessorWithFire

In [5]:
# Set up Google Container Registry (GCR) for storing the output containers.
# You can use any Docker container registry instead of GCR.
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
print(GCP_PROJECT)
DOCKER_REGISTRY = 'gcr.io/{}/training'.format(GCP_PROJECT)
print(DOCKER_REGISTRY)
PY_VERSION = ".".join([str(x) for x in sys.version_info[0:3]])
BASE_IMAGE = 'python:{}'.format(PY_VERSION)
# You can use the Dockerfile in this repo to build the base image used below
base_image = 'gcr.io/issue-label-bot-dev/ml-gpu-lite-py3.6'

issue-label-bot-dev
gcr.io/issue-label-bot-dev/training
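
The string formatting in the cell above can be checked without a GCP project. This sketch wraps the same two expressions in helpers; `registry_for` and `python_base_image` are hypothetical names introduced here, and the project name is passed in rather than guessed by Fairing.

```python
import sys


def registry_for(project, repository="training"):
    """Build a GCR registry path in the same way as DOCKER_REGISTRY."""
    return "gcr.io/{}/{}".format(project, repository)


def python_base_image(version_info=sys.version_info):
    """Build a 'python:<major>.<minor>.<micro>' image name, as BASE_IMAGE does."""
    version = ".".join(str(x) for x in version_info[0:3])
    return "python:{}".format(version)


print(registry_for("issue-label-bot-dev"))
print(python_base_image((3, 6, 8)))
```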


### Build Docker image
We use the builders in Kubeflow Fairing to build Docker images. `ClusterBuilder` builds a Docker image inside a Kubernetes cluster, and `AppendBuilder` appends a new layer tarball on top of an existing image. We also pass `preprocessor` as a parameter so that the processed inputs are sent to the Docker build.

In [6]:
preprocessor = ConvertNotebookPreprocessorWithFire('RepoMLP', notebook_file='repo_mlp.ipynb')

if not preprocessor.input_files:
    preprocessor.input_files = set()
input_files = ['mlp.py', 'repo_config.py', 'embeddings.py', 'inference.py', 'issues_loader.py']
preprocessor.input_files =  set([os.path.normpath(f) for f in input_files])
preprocessor.preprocess()

Converting repo_mlp.ipynb to repo_mlp.py
Creating entry point for the class name RepoMLP


[PosixPath('repo_mlp.py'),
 'mlp.py',
 'embeddings.py',
 'issues_loader.py',
 'inference.py',
 'repo_config.py']

In [7]:
cluster_builder = cluster.cluster.ClusterBuilder(registry=DOCKER_REGISTRY,
                                                 base_image=base_image,
                                                 namespace='chunhsiang',
                                                 preprocessor=preprocessor,
                                                 pod_spec_mutators=[fairing.cloud.gcp.add_gcp_credentials_if_exists],
                                                 context_source=cluster.gcs_context.GCSContextSource())
cluster_builder.build()

Building image using cluster builder.
Creating docker context: /tmp/fairing_context_bw4xs2lr
Converting repo_mlp.ipynb to repo_mlp.py
Creating entry point for the class name RepoMLP
Waiting for fairing-builder-d8qgk to start...
Waiting for fairing-builder-d8qgk to start...
Waiting for fairing-builder-d8qgk to start...
Pod started running True


[36mINFO[0m[0006] Downloading base image gcr.io/issue-label-bot-dev/ml-gpu-lite-py3.6
[36mINFO[0m[0006] Downloading base image gcr.io/issue-label-bot-dev/ml-gpu-lite-py3.6
[33mWARN[0m[0006] Error while retrieving image from cache: getting image from path: open /cache/sha256:288cc7a9e121d6ff2b3053858146d7d0449c70fed521723df1d537247fe4d0d2: no such file or directory
[36mINFO[0m[0007] Checking for cached layer gcr.io/issue-label-bot-dev/training/fairing-job/cache:52b097ec9e1752ae25a961099a14457c753f759898c8727ea0fbc730a6ba877c...
[36mINFO[0m[0007] No cached layer found for cmd RUN if [ -e requirements.txt ];then pip install --no-cache -r requirements.txt; fi
[36mINFO[0m[0007] Unpacking rootfs as cmd RUN if [ -e requirements.txt ];then pip install --no-cache -r requirements.txt; fi requires it.
[36mINFO[0m[0155] Taking snapshot of full filesystem...
[36mINFO[0m[0170] Skipping paths under /dev, as it is a whitelisted directory
[36mINFO[0m[0170] Skipping paths under /etc/se

In [8]:
builder = append.append.AppendBuilder(registry=DOCKER_REGISTRY,
                                      base_image=cluster_builder.image_tag,
                                      preprocessor=preprocessor)
builder.build()

Building image using Append builder...
Creating docker context: /tmp/fairing_context_os7mmd1e
Converting repo_mlp.ipynb to repo_mlp.py
Creating entry point for the class name RepoMLP
repo_mlp.py already exists in Fairing context, skipping...
Loading Docker credentials for repository 'gcr.io/issue-label-bot-dev/training/fairing-job:E9222071'
Invoking 'docker-credential-gcr' to obtain Docker credentials.
Successfully obtained Docker credentials.
Image successfully built in 1.9546294669999043s.
Pushing image gcr.io/issue-label-bot-dev/training/fairing-job:50BEEED1...
Loading Docker credentials for repository 'gcr.io/issue-label-bot-dev/training/fairing-job:50BEEED1'
Invoking 'docker-credential-gcr' to obtain Docker credentials.
Successfully obtained Docker credentials.
Uploading gcr.io/issue-label-bot-dev/training/fairing-job:50BEEED1
Layer sha256:dd81a869f87509229a9e346025abad411e2fd874fdbdcd29ea3b3dd5547a0b81 exists, skipping
Layer sha256:f401bdaa92adf9a46956b65a2d86021969297b5a5ed17b98

## Build pipeline
Kubeflow [Pipelines](https://www.kubeflow.org/docs/pipelines/) is a platform for building and running reusable end-to-end machine learning workflows.

Define the pipeline as a Python function. The function must be decorated with `@kfp.dsl.pipeline`, which takes the pipeline's `name` and `description`.

Our training pipeline has two steps: scraping issues and training the model. Both steps run in the image we built with Kubeflow Fairing. Each step also requests a GPU and mounts GCP credentials.

In [9]:
import kfp
import kfp.components as comp
import kfp.gcp as gcp
import kfp.dsl as dsl
import kfp.compiler as compiler

In [10]:
target_image = 'gcr.io/issue-label-bot-dev/training/fairing-job:50BEEED1'
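
The image reference above is pinned to the tag that the append builder pushed. As a small sanity check before using such a string in a pipeline, the sketch below splits a reference into repository and tag; `split_image_reference` is a hypothetical helper, not part of Fairing or the Pipelines SDK.

```python
def split_image_reference(image):
    """Split 'registry/path/name:tag' into (repository, tag).

    Returns (image, None) when no tag is present. A ':' that appears before
    the last '/' belongs to a registry port, not to a tag.
    """
    last_slash = image.rfind("/")
    last_colon = image.rfind(":")
    if last_colon == -1 or last_colon < last_slash:
        return image, None
    return image[:last_colon], image[last_colon + 1:]


repository, tag = split_image_reference(
    "gcr.io/issue-label-bot-dev/training/fairing-job:50BEEED1"
)
print(repository)
print(tag)
```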

In [11]:
@dsl.pipeline(
   name='Training pipeline',
   description='A pipeline that loads embeddings and trains a model for a github repo.'
)
def train_pipeline(owner, repo):
    scrape_op = dsl.ContainerOp(
            name='scrape issues',
            image=target_image,
            command=['python', 'issues_loader.py', 'save_issue_embeddings', f'--owner={owner}', f'--repo={repo}'],
            ).set_gpu_limit(1).apply(
                gcp.use_gcp_secret('user-gcp-sa'),
            )
    scrape_op.container.working_dir = '/app'

    train_op = dsl.ContainerOp(
            name='train',
            image=target_image,
            command=['python', 'repo_mlp.py', 'train', f'--owner={owner}', f'--repo={repo}'],
            ).set_gpu_limit(1).apply(
                gcp.use_gcp_secret('user-gcp-sa'),
            )
    train_op.container.working_dir = '/app'
    train_op.after(scrape_op)
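
To make the two container commands easier to read, the sketch below builds the same argument lists with concrete values. `step_command` is a hypothetical helper. Inside the real pipeline function, `owner` and `repo` are placeholders that Kubeflow Pipelines substitutes at run time, not plain strings.

```python
def step_command(script, method, owner, repo):
    """Return the argv list a pipeline step would run inside the container."""
    return ["python", script, method, f"--owner={owner}", f"--repo={repo}"]


scrape_cmd = step_command("issues_loader.py", "save_issue_embeddings", "kubeflow", "kubeflow")
train_cmd = step_command("repo_mlp.py", "train", "kubeflow", "kubeflow")

# The train step is ordered after the scrape step in the pipeline.
print(" ".join(scrape_cmd))
print(" ".join(train_cmd))
```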

### Compile the pipeline
We compile our pipeline to an intermediate representation, which is a YAML file compressed in a zip file.

In [12]:
pipeline_func = train_pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)
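
To confirm what the compiler wrote, the archive can be opened with the standard `zipfile` module. The sketch below builds a stand-in archive in memory so that it runs without `kfp` installed; the member name `pipeline.yaml` and its contents are assumptions for illustration, and `list_package_files` is a hypothetical helper.

```python
import io
import zipfile


def list_package_files(package):
    """Return the file names stored in a compiled pipeline archive."""
    with zipfile.ZipFile(package) as archive:
        return archive.namelist()


# Stand-in archive (assumed layout): one YAML workflow file inside a zip.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("pipeline.yaml", "apiVersion: argoproj.io/v1alpha1\nkind: Workflow\n")
buffer.seek(0)

print(list_package_files(buffer))
```

With a real compiled package, the same helper could be called as `list_package_files(pipeline_filename)`.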

### Submit the pipeline for execution
We upload the compiled pipeline (the zip file) and start a run. The pipeline and its experiment are then visible in the Kubeflow Pipelines UI.

In [13]:
EXPERIMENT_NAME = 'TrainModel'

client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

In [14]:
# Specify pipeline argument values
arguments = {'owner': 'kubeflow', 'repo': 'kubeflow'}

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
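
The cell above gives every submission the same run name. One optional refinement, not part of the original notebook, is to append a timestamp so that repeated runs are easier to tell apart. `unique_run_name` is a hypothetical helper and the timestamp format is an arbitrary choice.

```python
from datetime import datetime, timezone


def unique_run_name(pipeline_name, now=None):
    """Return '<pipeline_name> run <UTC timestamp>'."""
    now = now or datetime.now(timezone.utc)
    return "{} run {}".format(pipeline_name, now.strftime("%Y%m%d-%H%M%S"))


# A fixed timestamp is used here so the printed value is reproducible.
print(unique_run_name("train_pipeline", datetime(2019, 8, 1, 12, 0, 0, tzinfo=timezone.utc)))
```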