# Orchestrating AutoML Tables training workflow

In [23]:
import kfp
from kfp.components import func_to_container_op
from typing import NamedTuple

## Prepare lab environment


### Set lab settings

In [203]:
PROJECT_ID = 'jk-caip'
DATASET_ID = 'lab_301'
DATASET_LOCATION = 'US'
TRANSACTIONS_TABLE_ID = 'transactions'
TRANSACTIONS_TABLE_SCHEMA = 'customer_id:STRING,order_date:DATE,quantity:INTEGER,unit_price:FLOAT'
TRANSACTIONS_SOURCE_FILE = '../datasets/clv/transactions.csv'
CLUSTER_NAME = 'mlops5-cluster'
CLUSTER_ZONE = 'us-central1-a'
COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/0.1.33/components/gcp/'

### Create a BigQuery dataset

In [204]:
!bq --location=$DATASET_LOCATION --project_id=$PROJECT_ID mk --dataset $DATASET_ID

BigQuery error in mk operation: Dataset 'jk-caip:lab_301' already exists.


### Load sale transactions data to BigQuery

In [205]:
!bq --project_id=$PROJECT_ID --dataset_id=$DATASET_ID load \
--source_format=CSV \
--skip_leading_rows=1 \
--replace \
$TRANSACTIONS_TABLE_ID \
$TRANSACTIONS_SOURCE_FILE \
$TRANSACTIONS_TABLE_SCHEMA

Upload complete.
Waiting on bqjob_r42ef5cbf2ac46b0f_0000016e588a182c_1 ... (4s) Current status: DONE   


## Create the KFP training pipeline

### Create component factories for the pre-defined GCP components

In [95]:
component_store = kfp.components.ComponentStore(
    local_search_paths=None,
    url_search_prefixes=[COMPONENT_URL_SEARCH_PREFIX])
    
automl_create_dataset_op = component_store.load_component('automl/create_dataset_for_tables')
automl_import_data_from_bq_op = component_store.load_component('automl/import_data_from_bigquery')
automl_create_model__op = component_store.load_component('automl/create_model_for_tables')
automl_split_dataset_table_column_names_op = component_store.load_component('automl/split_dataset_table_column_names')

### Create custom components

#### Create a base docker image for the custom components

In [155]:
%%writefile Dockerfile
FROM python:3.7
RUN pip3 install --upgrade google-cloud-bigquery google-api-core

Overwriting Dockerfile


In [156]:
IMAGE_NAME="lab_301_components"
IMAGE_URI="gcr.io/{}/{}:latest".format(PROJECT_ID, IMAGE_NAME)
!gcloud builds submit --timeout 15m --tag $IMAGE_URI

Creating temporary tarball archive of 10 file(s) totalling 229.3 KiB before compression.
Uploading tarball of [.] to [gs://jk-caip_cloudbuild/source/1573442503.13-cdf51009c2e1441abf89702e072a1d28.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/jk-caip/builds/dca423db-12bf-4ba9-8920-48264b037d5e].
Logs are available at [https://console.cloud.google.com/gcr/builds/dca423db-12bf-4ba9-8920-48264b037d5e?project=919618504923].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "dca423db-12bf-4ba9-8920-48264b037d5e"

FETCHSOURCE
Fetching storage object: gs://jk-caip_cloudbuild/source/1573442503.13-cdf51009c2e1441abf89702e072a1d28.tgz#1573442503567740
Copying gs://jk-caip_cloudbuild/source/1573442503.13-cdf51009c2e1441abf89702e072a1d28.tgz#1573442503567740...
/ [1 files][ 33.6 KiB/ 33.6 KiB]                                                
Operation completed over 1 objects/33.6 KiB.                                     
BUILD
Already have

#### Create BQ query component

In [196]:
def bq_query(query: str, 
             project_id:str, 
             dataset_id: str, 
             table_id: str, 
             location: str) -> NamedTuple('Outputs', [('table_reference', str), ('job_id', str)]):

    from google.cloud import bigquery
    from google.api_core import exceptions
    import logging
    import os
    import uuid
    
    KFP_POD_ENV_NAME = 'KFP_POD_NAME'
    DEFAULT_DATASET_ID = 'lab_301'
    
    def _prepare_dataset_ref(client, dataset_id, location):
        if not dataset_id:
            dataset_id = DEFAULT_DATASET_ID
        dataset_ref = client.dataset(dataset_id)
        try:
            dataset = client.get_dataset(dataset_ref)
        except exceptions.NotFound:
            dataset = bigquery.Dataset(dataset_ref)
            dataset.location = location
            logging.info('Creating dataset {}'.format(dataset_id))
            client.create_dataset(dataset)
        
        return dataset_ref

    def _get_job(client, job_id):
        try:
            return client.get_job(job_id)
        except exceptions.NotFound:
            return None
    
    client = bigquery.Client(project=project_id, location=location)
    job_config = bigquery.QueryJobConfig()
    job_config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
    job_config.write_disposition = bigquery.job.WriteDisposition.WRITE_TRUNCATE
    job_id = 'query_' + os.environ.get(KFP_POD_ENV_NAME, uuid.uuid1().hex)
     
    if not _get_job(client, job_id):
        dataset_ref = _prepare_dataset_ref(client, dataset_id, location)
        if not table_id:
            table_id = job_id
        table_ref = dataset_ref.table(table_id)
        job_config.destination = table_ref
        logging.info('Submitting the jobL {}'.format(job_id))
        query_job = client.query(query, job_config, job_id=job_id)
        query_job.result() # Wait for query to finish
            
    
    return (table_ref.path, job_id)
    
bq_query_op = func_to_container_op(bq_query, base_image='gcr.io/jk-caip/lab_301_components:latest')

### Define the pipeline
#### Set default values for pipeline parameters

In [197]:
_project_id = PROJECT_ID
_features_dataset_id = DATASET_ID
_features_dataset_location = DATASET_LOCATION
_features_table_id = 'features'
_aml_compute_region = 'us-central1'
_aml_dataset_name = 'clv_features'
_aml_model_name = 'clv_regression'
_target_column_name = 'target_monetary'
_train_budget = 1000
_optimization_objective = 'MINIMIZE_MAE'
_primary_metric = 'mean_absolute_error'
_deployment_threshold = 900

##### Set a default feature engineering query

In [198]:
from jinja2 import Template

_query_template = '''
WITH
  order_summaries as (
    SELECT
      a.customer_id,
      a.order_date,
      a.order_value,
      a.order_qty_articles
    FROM
    (
      SELECT
        customer_id,
        order_date,
        ROUND(SUM(unit_price * quantity), 2) AS order_value,
        SUM(quantity) AS order_qty_articles,
        (
          SELECT
            MAX(order_date)
          FROM
            `{{ data_source_id }}` tl
          WHERE
            tl.customer_id = t.customer_id
        ) latest_order
      FROM
        `{{ data_source_id }}` t
      GROUP BY
          customer_id,
          order_date
    ) a

    INNER JOIN (
      -- Only customers with more than one positive order values before threshold.
      SELECT
        customer_id
      FROM (
        -- Customers and how many positive order values  before threshold.
        SELECT
          customer_id,
          SUM(positive_value) cnt_positive_value
        FROM (
          -- Customer with whether order was positive or not at each date.
          SELECT
            customer_id,
            (
              CASE
                WHEN SUM(unit_price * quantity) > 0 THEN 1
                ELSE 0
              END ) positive_value
          FROM
            `{{ data_source_id }}`
          WHERE
            order_date < DATE("{{ threshold_date }}")
          GROUP BY
            customer_id,
            order_date)
        GROUP BY
          customer_id )
      WHERE
        cnt_positive_value > 1
      ) b
    ON
      a.customer_id = b.customer_id
    --[START common_clean]
    WHERE
      -- Bought in the past 3 months
      DATE_DIFF(DATE("{{ predict_end }}"), latest_order, DAY) <= 90
      -- Make sure returns are consistent.
      AND (
        (order_qty_articles > 0 and order_Value > 0) OR
        (order_qty_articles < 0 and order_Value < 0)
      ))
          
SELECT
  tf.customer_id,
  -- For training period
  -- Copying the calculations from Lifetimes where first orders are ignored
  -- See https://github.com/CamDavidsonPilon/lifetimes/blob/master/lifetimes/utils.py#L246
--[START features_target]
  ROUND(tf.monetary, 2) as monetary,
  tf.cnt_orders AS frequency,
  tf.recency,
  tf.T,
  ROUND(tf.recency/cnt_orders, 2) AS time_between,
  ROUND(tf.avg_basket_value, 2) AS avg_basket_value,
  ROUND(tf.avg_basket_size, 2) AS avg_basket_size,
  tf.cnt_returns,
  -- Target calculated for overall period
  ROUND(tt.target_monetary, 2) as target_monetary
--[END features_target]
FROM
  -- This SELECT uses only data before threshold to make features.
  (
    SELECT
      customer_id,
      SUM(order_value) AS monetary,
      DATE_DIFF(MAX(order_date), MIN(order_date), DAY) AS recency,
      DATE_DIFF(DATE('{{ threshold_date }}'), MIN(order_date), DAY) AS T,
      COUNT(DISTINCT order_date) AS cnt_orders,
      AVG(order_qty_articles) avg_basket_size,
      AVG(order_value) avg_basket_value,
      SUM(CASE
          WHEN order_value < 1 THEN 1
          ELSE 0 END) AS cnt_returns
    FROM
      order_summaries a
    WHERE
      order_date <= DATE('{{ threshold_date }}')
    GROUP BY
      customer_id) tf,

  -- This SELECT uses data after threshold to calculate the target )
  (
    SELECT
      customer_id,
      SUM(order_value) target_monetary
    FROM
      order_summaries
      WHERE order_date > DATE('{{ threshold_date }}')
    GROUP BY
      customer_id) tt
WHERE
  tf.customer_id = tt.customer_id
  AND tf.monetary > 0
  AND tf.monetary <= {{ max_monetary }}
'''

_query = Template(_query_template).render(
    data_source_id='{}.{}.{}'.format(PROJECT_ID, DATASET_ID, TRANSACTIONS_TABLE_ID),
    threshold_date='2011-08-08',
    predict_end='2011-12-12',
    max_monetary=15000)

#### Define the pipeline function

In [199]:
@kfp.dsl.pipeline(
    name='CLV Training',
    description='CLV Training Pipeline using BigQuery for feature engineering and Automl Tables for model training'
)
def clv_train(
    project_id:str =_project_id,
    feature_engineering_query:str =_query,
    aml_compute_region:str =_aml_compute_region,
    features_table_id:str =_features_table_id,
    features_dataset_id:str =_features_dataset_id,
    features_dataset_location:str =_features_dataset_location,
    aml_dataset_name:str =_aml_dataset_name,
    target_column_name:str =_target_column_name,
    aml_model_name:str =_aml_model_name,
    train_budget:int =_train_budget,
    primary_metric:float =_primary_metric,
    deployment_threshold:float =_deployment_threshold
    ):
    
    engineer_features = bq_query_op(
        query=feature_engineering_query,
        project_id=project_id,
        dataset_id=features_dataset_id,
        table_id=features_table_id,
        location=features_dataset_location)
    
    from kfp.gcp import use_gcp_secret
    kfp.dsl.get_pipeline_conf().add_op_transformer(use_gcp_secret('user-gcp-sa'))


### Compile the pipeline

In [200]:
_pipeline_yaml = 'clv_training.yaml'
kfp.compiler.Compiler().compile(clv_train, _pipeline_yaml)

## Upload the pipeline to KFP
Get GKE cluster credentials.

In [134]:
!gcloud config set project $PROJECT_ID
!gcloud container clusters get-credentials $CLUSTER_NAME --zone $CLUSTER_ZONE

Updated property [core/project].
Fetching cluster endpoint and auth data.
kubeconfig entry generated for mlops5-cluster.


Use `kfp.Client()` to upload the pipeline.

In [201]:
_pipeline_name = 'clv_training_pipeline'
_client = kfp.Client()


pipelines = [pipeline for pipeline in _client.list_pipelines(page_size=100).pipelines if pipeline.name == _pipeline_name]

if pipelines:
    print("Pipeline with this name already exists")
    _pipeline_ref = pipelines[0]
    
else:
    _pipeline_ref = _client.upload_pipeline(_pipeline_yaml, _pipeline_name)

## Trigger a pipeline run

In [202]:
_experiment_name = 'CLV Training'
_run_name = 'Run 01'
_params = dict()

try:
    _experiment_ref = _client.get_experiment(_experiment_name)
except:
    _experiment_ref = _client.create_experiment(_experiment_name)

_client.run_pipeline(
    _experiment_ref.id,
    _run_name,
    pipeline_package_path=None,
    params=_params,
    pipeline_id = _pipeline_ref.id)

{'created_at': datetime.datetime(2019, 11, 11, 3, 36, 56, tzinfo=tzlocal()),
 'description': None,
 'error': None,
 'finished_at': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=tzlocal()),
 'id': '033bde69-3689-4131-94aa-f1089ef8dcfb',
 'metrics': None,
 'name': 'Run 01',
 'pipeline_spec': {'parameters': None,
                   'pipeline_id': '98af5f51-51a5-41b0-bd77-747b233a8fc8',
                   'pipeline_manifest': None,
                   'workflow_manifest': '{"kind":"Workflow","apiVersion":"argoproj.io/v1alpha1","metadata":{"generateName":"clv-training-","creationTimestamp":null,"annotations":{"pipelines.kubeflow.org/pipeline_spec":"{\\"description\\": '
                                        '\\"CLV Training Pipeline using '
                                        'BigQuery for feature engineering and '
                                        'Automl Tables for model training\\", '
                                        '\\"inputs\\": [{\\"default\\": '
                      

## Clean up

In [None]:
!bq --project_id=$PROJECT_ID rm -r -f -d $DATASET_NAME