### Write the pipeline definition file

In [129]:
%%writefile ./pipeline/sensor_training_pipeline.py
import os
from func_components import load_raw_data
from func_components import split_data
from func_components import disp_loss
from jinja2 import Template
import kfp
from kfp.components import func_to_container_op
from kfp.dsl.types import Dict
from kfp.dsl.types import GCPProjectID
from kfp.dsl.types import GCPRegion
from kfp.dsl.types import GCSPath
from kfp.dsl.types import String
from kfp.gcp import use_gcp_secret

# Compile-time configuration supplied through environment variables.
# Each constant holds the raw string from the environment, or None when
# the variable is not set (no type conversion is applied here).
BASE_IMAGE = os.environ.get('BASE_IMAGE')        # image for the lightweight Python-function components
TRAINER_IMAGE = os.environ.get('TRAINER_IMAGE')  # container image for the AI Platform training job
DD_IMAGE = os.environ.get('DD_IMAGE')            # Data Describe image
RUNTIME_VERSION = os.environ.get('RUNTIME_VERSION')
PYTHON_VERSION = os.environ.get('PYTHON_VERSION')
COMPONENT_URL_SEARCH_PREFIX = os.environ.get('COMPONENT_URL_SEARCH_PREFIX')
# A string such as 'True' / 'False', not a bool.
USE_KFP_SA = os.environ.get('USE_KFP_SA')

# Component store for the prebuilt GCP components, which are looked up
# under the URL prefix supplied via COMPONENT_URL_SEARCH_PREFIX (no local
# search paths).
component_store = kfp.components.ComponentStore(
    local_search_paths=None,
    url_search_prefixes=[COMPONENT_URL_SEARCH_PREFIX],
)

# Prebuilt AI Platform training component.
caip_train_op = component_store.load_component('ml_engine/train')

# Lightweight components built from the functions in func_components.
# The data-handling steps run on BASE_IMAGE; disp_loss is built without an
# explicit base_image, so kfp's default base image is used for it.
# NOTE(review): confirm disp_loss's dependencies exist in that default image.
retrieve_raw_data_op = func_to_container_op(load_raw_data, base_image=BASE_IMAGE)
split_preprocess_data_op = func_to_container_op(split_data, base_image=BASE_IMAGE)
disp_loss_op = func_to_container_op(disp_loss)

def datadescribe_op(gcs_root, filepath):
    """Build the pipeline step that runs the Data Describe container.

    Args:
        gcs_root: GCS root passed to the container as ``--gcs_root``.
        filepath: File name passed to the container as ``--file``.

    Returns:
        The ``kfp.dsl.ContainerOp`` for the step.
    """
    # Use the image configured through the DD_IMAGE environment variable when
    # it is set; otherwise fall back to the previously hard-coded image so the
    # pipeline still compiles without DD_IMAGE.
    image = DD_IMAGE or 'gcr.io/mwpmltr/rrusson_kubeflow_datadescribe:v1'
    return kfp.dsl.ContainerOp(
        name='Run_Data_Describe',
        image=image,
        arguments=[
            '--gcs_root', gcs_root,
            '--file', filepath,
        ],
    )


# Define the pipeline
@kfp.dsl.pipeline(
    name='Bearing Sensor Data Training',
    description='The pipeline for training and deploying an anomaly detector based on an autoencoder')
def pipeline_run(project_id,
                 region,
                 source_bucket_name,
                 prefix,
                 dest_bucket_name,
                 dest_file_name,
                 gcs_root="gs://rrusson-kubeflow-test",
                 dataset_location='US'):
    """Load, describe, split and train on the bearing sensor data.

    Args:
        project_id: GCP project passed to the AI Platform training step.
        region: Region passed to the AI Platform training step.
        source_bucket_name: Bucket holding the raw sensor data.
        prefix: Object prefix of the raw sensor data in the source bucket.
        dest_bucket_name: Bucket the raw data is copied into.
        dest_file_name: File name for the copied raw data.
        gcs_root: GCS root passed to Data Describe and used to build the
            training job directory (``<gcs_root>/jobdir/<run id>``).
        dataset_location: Not used by any step; kept so existing callers
            that pass it keep working.
    """
    # Apply the 'user-gcp-sa' secret to every step when requested.
    # USE_KFP_SA is read from the environment, so it is a string, not a bool.
    if USE_KFP_SA == 'True':
        kfp.dsl.get_pipeline_conf().add_op_transformer(
            use_gcp_secret('user-gcp-sa'))

    # Read in the raw sensor data from the public dataset and load it into the project bucket
    raw_data = retrieve_raw_data_op(source_bucket_name,
                                    prefix,
                                    dest_bucket_name,
                                    dest_file_name)

    # Prepare some output from Data Describe
    dd_out = datadescribe_op(gcs_root,
                             raw_data.outputs['dest_file_name'])

    # Preprocess and split the raw data by time. The task is named split_task
    # so it does not shadow the split_data function imported at module level.
    split_task = split_preprocess_data_op(raw_data.outputs['dest_bucket_name'],
                                          raw_data.outputs['dest_file_name'],
                                          '2004-02-15 12:52:39',
                                          True)

    # Set up the training args
    train_args = ["--bucket", split_task.outputs['bucket_name'],
                  "--train_file", split_task.outputs['train_dest_file'],
                  "--test_file", split_task.outputs['test_dest_file']]

    # Per-run job directory so runs do not overwrite each other's output.
    job_dir = "{0}/{1}/{2}".format(gcs_root, 'jobdir', kfp.dsl.RUN_ID_PLACEHOLDER)

    # Train the model on AI Platform
    train_model = caip_train_op(project_id,
                                region=region,
                                master_image_uri=TRAINER_IMAGE,
                                job_id_prefix='anomaly-detection_',
                                job_dir=job_dir,
                                args=train_args)

    # Expose artifacts to the Kubeflow UI.
    # NOTE(review): both steps call disp_loss_op with the same job_id, so they
    # presumably produce the same output; the second was likely meant to show
    # a loss distribution via a different function — confirm.
    disp_loss_img = disp_loss_op(train_model.outputs['job_id'])
    disp_loss_dist_img = disp_loss_op(train_model.outputs['job_id'])
    


Overwriting ./pipeline/sensor_training_pipeline.py


### Set up the environment

In [130]:
# Region passed to the pipeline's AI Platform training step at run submission.
REGION = 'us-central1'
# Host of the KFP (AI Platform Pipelines) instance targeted by the `kfp` CLI cells.
ENDPOINT = '629f42fe6886c9d5-dot-us-central2.pipelines.googleusercontent.com'
# GCS root; the run's gcs_root parameter is derived from it (<root>/staging).
ARTIFACT_STORE_URI = 'gs://rrusson-kubeflow-test'
# Active gcloud project; the `!` capture returns a list of output lines, so take the first.
PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]

### Build the base image and push it to gcr.io

In [131]:
# Fully-qualified tag of the shared base image in this project's registry.
IMAGE_NAME = 'rrusson_kubeflow_base'
TAG = 'v1'
BASE_IMAGE = 'gcr.io/{project}/{image}:{tag}'.format(
    project=PROJECT_ID, image=IMAGE_NAME, tag=TAG)

In [132]:
# DON'T RUN THIS IF THE IMAGE EXISTS!
#!gcloud builds submit --timeout 15m --tag $BASE_IMAGE base_image

### Build the training image from the base image and push it to gcr.io (could the base and training images be merged into one?)

In [133]:
# Fully-qualified tag of the training container used by AI Platform.
IMAGE_NAME = 'rrusson_kubeflow_tf2_trainer'
TAG = 'v5'
TRAINER_IMAGE = 'gcr.io/{project}/{image}:{tag}'.format(
    project=PROJECT_ID, image=IMAGE_NAME, tag=TAG)

In [134]:
# DON'T RUN THIS IF THE IMAGE EXISTS!
#!gcloud builds submit --timeout 15m --tag $TRAINER_IMAGE train_image

### Build the Data Describe image from the base image and push it to gcr.io

In [135]:
# Fully-qualified tag of the Data Describe image.
IMAGE_NAME = 'rrusson_kubeflow_datadescribe'
TAG = 'v1'
DD_IMAGE = 'gcr.io/{project}/{image}:{tag}'.format(
    project=PROJECT_ID, image=IMAGE_NAME, tag=TAG)

In [136]:
# DON'T RUN THIS IF THE IMAGE EXISTS!
#!gcloud builds submit --timeout 15m --tag $DD_IMAGE dd_image

### Compile the Pipeline

In [137]:
USE_KFP_SA = False

COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/components/gcp/'
RUNTIME_VERSION = '1.15'
PYTHON_VERSION = '3.7'

%env USE_KFP_SA={USE_KFP_SA}
%env BASE_IMAGE={BASE_IMAGE}
%env TRAINER_IMAGE={TRAINER_IMAGE}
%env COMPONENT_URL_SEARCH_PREFIX={COMPONENT_URL_SEARCH_PREFIX}
%env RUNTIME_VERSION={RUNTIME_VERSION}
%env PYTHON_VERSION={PYTHON_VERSION}

env: USE_KFP_SA=False
env: BASE_IMAGE=gcr.io/mwpmltr/rrusson_kubeflow_base:v1
env: TRAINER_IMAGE=gcr.io/mwpmltr/rrusson_kubeflow_tf2_trainer:v5
env: COMPONENT_URL_SEARCH_PREFIX=https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/components/gcp/
env: RUNTIME_VERSION=1.15
env: PYTHON_VERSION=3.7


In [138]:
# Compile the pipeline module into a YAML package that the upload cell sends to the endpoint.
!dsl-compile --py pipeline/sensor_training_pipeline.py --output sensor_training_pipeline.yaml

### Upload the Pipeline to AI Platform Pipelines

In [139]:
PIPELINE_NAME='bearing_sensor_anomaly_v1.0'

# Upload the compiled package under PIPELINE_NAME. The pipeline ID printed by
# this command is the value the run-submission cell needs as PIPELINE_ID.
!kfp --endpoint $ENDPOINT pipeline upload \
-p $PIPELINE_NAME \
sensor_training_pipeline.yaml

Pipeline aa671d0b-b5e8-4137-b5fa-329fc35557d7 has been submitted

Pipeline Details
------------------
ID           aa671d0b-b5e8-4137-b5fa-329fc35557d7
Name         bearing_sensor_anomaly_v1.0
Description
Uploaded at  2020-12-04T17:05:52+00:00
+--------------------+----------------------------+
| Parameter Name     | Default Value              |
| project_id         |                            |
+--------------------+----------------------------+
| region             |                            |
+--------------------+----------------------------+
| source_bucket_name |                            |
+--------------------+----------------------------+
| prefix             |                            |
+--------------------+----------------------------+
| dest_bucket_name   |                            |
+--------------------+----------------------------+
| dest_file_name     |                            |
+--------------------+----------------------------+
| gcs_root           | gs://

In [101]:
# List the pipelines registered on the endpoint (useful for looking up pipeline IDs).
!kfp --endpoint $ENDPOINT pipeline list

+--------------------------------------+-------------------------------------------------+---------------------------+
| Pipeline ID                          | Name                                            | Uploaded at               |
| 8f258509-2c2b-4179-8950-38d6fa5280e4 | bearing_sensor_anomaly_v8                       | 2020-12-04T04:36:41+00:00 |
+--------------------------------------+-------------------------------------------------+---------------------------+
| fe571e93-1965-4c1f-8afc-a7f5059cc03b | bearing_sensor_anomaly_v7                       | 2020-12-04T03:56:55+00:00 |
+--------------------------------------+-------------------------------------------------+---------------------------+
| b9d3a470-891f-4b5e-b2bb-d02fbf929816 | bearing_sensor_anomaly_v6                       | 2020-12-03T20:34:57+00:00 |
+--------------------------------------+-------------------------------------------------+---------------------------+
| 5c85e06e-0a59-44c1-b730-9690fd270bc6 | bearing

### Submit a Run

In [35]:
# ID of the pipeline to run.
# NOTE(review): this does not match the ID printed when bearing_sensor_anomaly_v1.0
# was uploaded (aa671d0b-b5e8-4137-b5fa-329fc35557d7) — confirm which pipeline should run.
PIPELINE_ID='c9cda641-e37b-422f-a195-57e77d504f91'

In [28]:
# Experiment / run naming. EXPERIMENT_NAME contains spaces, so it must be
# quoted when substituted into a `!` shell command.
EXPERIMENT_NAME = 'Train Anomaly Detector'
RUN_ID = 'Run_001'

# Source location of the raw sensor data.
SOURCE_BUCKET_NAME = 'amazing-public-data'
PREFIX = 'bearing_sensor_data/bearing_sensor_data/'

# Destination bucket and file name for the raw data copy.
DEST_BUCKET_NAME = 'rrusson-kubeflow-test'
DEST_FILE_NAME = 'raw_data_v3.csv'

# Staging root under the artifact store; passed to the run as gcs_root.
GCS_STAGING_PATH = '/'.join([ARTIFACT_STORE_URI, 'staging'])

In [29]:
!kfp --endpoint $ENDPOINT run submit \
-e $EXPERIMENT_NAME \
-r $RUN_ID \
-p $PIPELINE_ID \
project_id=$PROJECT_ID \
gcs_root=$GCS_STAGING_PATH \
region=$REGION \
source_bucket_name=$SOURCE_BUCKET_NAME \
prefix=$PREFIX \
dest_bucket_name=$DEST_BUCKET_NAME \
DEST_FILE_NAME=$DEST_FILE_NAME

dictionary update sequence element #0 has length 1; 2 is required


In [27]:
# Display the project ID resolved from gcloud (sanity check).
PROJECT_ID

'mwpmltr'