# Operationalizing training with Kubeflow Pipelines

Make sure that the version of the Kubeflow Pipelines (KFP) SDK this notebook was written against (0.1.26) is installed.

In [1]:
RELEASE = "0.1.26"
RELEASE_URL = "https://storage.googleapis.com/ml-pipeline/release/{}/kfp.tar.gz".format(RELEASE) 
%pip install $RELEASE_URL --upgrade

Collecting https://storage.googleapis.com/ml-pipeline/release/0.1.26/kfp.tar.gz
  Using cached https://storage.googleapis.com/ml-pipeline/release/0.1.26/kfp.tar.gz
Building wheels for collected packages: kfp
  Building wheel for kfp (setup.py) ... [?25ldone
[?25h  Stored in directory: /tmp/pip-ephem-wheel-cache-qqf_65p1/wheels/89/0c/5f/7bcd7449cc45156ddbdd7c288cbc8ecdfb6662e226e3927b21
Successfully built kfp
Installing collected packages: kfp
  Found existing installation: kfp 0.1.25
    Uninstalling kfp-0.1.25:
      Successfully uninstalled kfp-0.1.25
Successfully installed kfp-0.1.25
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import kfp
from kfp import gcp
import json
import time

## Create a training container image

### Create a Dockerfile

In [21]:
# Build context for the training container: the Dockerfile and train.py are
# written here by the cells below, and the whole folder is sent to Cloud Build.
TRAINING_IMAGE_FOLDER = '../training_image'
if not os.path.isdir(TRAINING_IMAGE_FOLDER):
    os.makedirs(TRAINING_IMAGE_FOLDER)

In [22]:
%%writefile $TRAINING_IMAGE_FOLDER/Dockerfile

# Base image. NOTE(review): presumably provides Python with scikit-learn, pandas,
# joblib, fire and gsutil (all used by train.py); it lives in the author's
# project, so replace it with an image your project can pull -- TODO confirm.
FROM gcr.io/jk-demo1/sklearn-cpu:latest

# train.py saves model.joblib to its working directory before uploading it.
WORKDIR /app
COPY train.py /app/train.py

# Exec-form entrypoint: arguments passed to the container are appended as
# command-line flags for train.py.
ENTRYPOINT ["python", "train.py"]

Overwriting ../training_image/Dockerfile


### Create a training script

In [23]:
%%writefile $TRAINING_IMAGE_FOLDER/train.py

import logging
import os
import subprocess
import sys
import joblib
import fire
import numpy as np
import pandas as pd

from sklearn.cross_decomposition import PLSRegression
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.linear_model import Ridge
from sklearn.manifold import TSNE 
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def _ensure_list(value):
  """Return `value` as a list; a lone scalar (what Fire yields for a single CLI value) becomes a one-element list."""
  if isinstance(value, (list, tuple)):
    return list(value)
  return [value]


def train(job_dir, data_path, n_features_options, l2_reg_options):
  """Tune a scale -> PCA -> Ridge regressor with grid search and upload the best model.

  Hyperparameters are chosen by 10-fold cross-validated grid search over the
  number of PCA components and the Ridge L2 penalty, scored by negative mean
  squared error. The winning configuration (refit on the full dataset by
  GridSearchCV) is saved as `model.joblib` and copied to `job_dir` with gsutil.

  Args:
    job_dir: GCS directory (e.g. ``gs://bucket/path``) to copy the model into.
      A trailing slash is tolerated.
    data_path: Path/URL of a CSV readable by ``pandas.read_csv``. It must
      contain an ``octane`` column (the target); every other column is used
      as a feature.
    n_features_options: Candidate value(s) for ``PCA(n_components=...)``.
      A single scalar is accepted and treated as a one-element list.
    l2_reg_options: Candidate value(s) for ``Ridge(alpha=...)``.
      A single scalar is accepted and treated as a one-element list.

  Raises:
    subprocess.CalledProcessError: If ``gsutil cp`` exits with a non-zero status.
  """
  # Load data. NOTE(review): reading a gs:// path with pandas presumably relies
  # on GCS filesystem support (e.g. gcsfs) in the base image -- confirm.
  df_train = pd.read_csv(data_path)

  y = df_train.octane
  X = df_train.drop('octane', axis=1)

  # Configure a training pipeline
  pipeline = Pipeline([
    ('scale', StandardScaler()),
    ('reduce_dim', PCA()),
    ('regress', Ridge())
  ])

  # Configure a parameter grid. GridSearchCV requires each value to be a
  # sequence, so normalize scalars coming from the command line.
  param_grid = [
    {
      'reduce_dim__n_components': _ensure_list(n_features_options),
      'regress__alpha': _ensure_list(l2_reg_options)
    }
  ]

  # Tune hyperparameters. With refit=True GridSearchCV retrains the best
  # configuration on the full dataset, so best_estimator_ is already the final
  # model and no second fit is needed.
  # NOTE: `iid` was deprecated in scikit-learn 0.22 and removed in 0.24; drop it
  # if the base image moves to a newer scikit-learn.
  grid = GridSearchCV(pipeline, cv=10, n_jobs=None, param_grid=param_grid,
                      scoring='neg_mean_squared_error', iid=False, refit=True)
  grid.fit(X, y)

  logging.info("Best estimator: {}".format(grid.best_params_))
  logging.info("Best score: {}".format(grid.best_score_))

  trained_pipeline = grid.best_estimator_

  # Save the model locally, then copy it to GCS. Strip any trailing slash so a
  # job_dir like "gs://bucket/" does not produce "gs://bucket//model.joblib".
  model_filename = 'model.joblib'
  joblib.dump(value=trained_pipeline, filename=model_filename)
  gcs_model_path = "{}/{}".format(str(job_dir).rstrip('/'), model_filename)
  subprocess.check_call(['gsutil', 'cp', model_filename, gcs_model_path], stderr=sys.stdout)
  logging.info("Saved model in: {}".format(gcs_model_path))
    
def _main():
  """Enable INFO logging and map command-line flags onto `train` via Fire."""
  logging.basicConfig(level=logging.INFO)
  fire.Fire(train)


if __name__ == "__main__":
  _main()

Overwriting ../training_image/train.py


### Build the image

In [24]:
PROJECT_ID = !gcloud config list project --format "value(core.project)"
PROJECT_ID = PROJECT_ID[0]
IMAGE_REPO_NAME="octane-regression-training"
IMAGE_TAG="latest"
IMAGE_URI="gcr.io/{}/{}:{}".format(PROJECT_ID, IMAGE_REPO_NAME, IMAGE_TAG)

!gcloud builds submit --tag $IMAGE_URI $TRAINING_IMAGE_FOLDER

Creating temporary tarball archive of 2 file(s) totalling 2.0 KiB before compression.
Uploading tarball of [../training_image] to [gs://jk-demo1_cloudbuild/source/1566938464.36-5002fe55cdc946a78fc9dad65b0fe1ef.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/jk-demo1/builds/aa196a47-bf0e-4428-abc5-0971c37ddafe].
Logs are available at [https://console.cloud.google.com/gcr/builds/aa196a47-bf0e-4428-abc5-0971c37ddafe?project=826865698127].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "aa196a47-bf0e-4428-abc5-0971c37ddafe"

FETCHSOURCE
Fetching storage object: gs://jk-demo1_cloudbuild/source/1566938464.36-5002fe55cdc946a78fc9dad65b0fe1ef.tgz#1566938464819369
Copying gs://jk-demo1_cloudbuild/source/1566938464.36-5002fe55cdc946a78fc9dad65b0fe1ef.tgz#1566938464819369...
/ [1 files][  1.1 KiB/  1.1 KiB]                                                
Operation completed over 1 objects/1.1 KiB.                                      
B

## Define the pipeline

In [25]:
PROJECT_ID = !gcloud config list project --format "value(core.project)"
PROJECT_ID = PROJECT_ID[0]

TRAIN_COMPONENT_SPEC="https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/ml_engine/train/component.yaml"
mlengine_train_op = kfp.components.load_component_from_url(TRAIN_COMPONENT_SPEC)

REGION = 'us-central1'
JOB_ID_PREFIX="OCTANE_PREDICTOR_TRAINING"


@kfp.dsl.pipeline(
    name="octane_predictor_training",
    description="Cloud AI Platform Training and Deployment")
def pipeline(
    project_id=PROJECT_ID,
    region=REGION,
    job_dir="",
    args=""):
    
  train_task = mlengine_train_op(
      project_id=project_id,
      region=region,
      args=args,
      job_dir=job_dir,
      job_id_prefix=JOB_ID_PREFIX,
      master_image_uri=IMAGE_URI,
   ).apply(gcp.use_gcp_secret('user-gcp-sa'))
  

## Compile the pipeline

In [26]:
# Compile the pipeline function into a package named after the pipeline,
# ready to be submitted to the KFP cluster.
pipeline_filename = '{}.zip'.format(pipeline._pipeline_name)
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(pipeline, pipeline_filename)

## Submit the run
### Fetch GKE credentials

In [27]:
CLUSTER_NAME = "kf-gke-cluster"
ZONE = "us-central1-a"

!gcloud container clusters get-credentials $CLUSTER_NAME --zone $ZONE

Fetching cluster endpoint and auth data.
kubeconfig entry generated for kf-gke-cluster.


### Create a KFP experiment

In [28]:
# Runs submitted below are grouped under this KFP experiment.
EXPERIMENT_NAME = "Octane Predictor Training"

# No host is given, so the client relies on the cluster credentials fetched above.
client = kfp.Client()
experiment = client.create_experiment(name=EXPERIMENT_NAME)

2019-08-27 20:42:10:INFO:Creating experiment Octane Predictor Training.


### Submit a run

In [29]:
# Timestamped run name, e.g. run_20190827_204210.
run_name = time.strftime("run_%Y%m%d_%H%M%S")
JOB_DIR = "gs://caip-demo-jobdir/"

# Flag/value pairs for train.py; they reach the training job as a
# JSON-encoded flat list of strings.
TRAINER_FLAGS = [
    ('--data-path', 'gs://caip-demo-datasets/gasdata/gasdata.csv'),
    ('--n-features-options', '[2,4,6]'),
    ('--l2-reg-options', '[0.1,0.2,0.3,0.5]'),
]
ARGS = json.dumps([token for flag_pair in TRAINER_FLAGS for token in flag_pair])

arguments = dict(job_dir=JOB_DIR, args=ARGS)
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)