In [1]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#            http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Orchestrating AutoML Tables training and deployment with Kubeflow Pipelines

In this lab you develop a continuous training and deployment pipeline using Kubeflow Pipelines, BigQuery and AutoML Tables.

The scenario used in the lab is predicting customer lifetime value (CLV).

The goal of CLV modeling is to identify the most valuable customers, that is, the customers who are going to generate the highest value in a given future time range. CLV models are built from a variety of data sources, with historical sales data being the most important one and, in many cases, the only one.

Predicting CLV is a representative example of a use case where you may need to fine-tune and re-train a predictive model on a frequent basis. Because a constant flow of new sales transactions forms the core of the training data, models have to be kept up to date with evolving purchase patterns. Automation of model training and deployment is critical.

In the CLV model developed in this lab, the historical sales transactions are preprocessed and aggregated to engineer a set of latent features representing the so-called RFM characteristics of your customers:
- Recency: How active have they been recently?
- Frequency: How often do they buy?
- Monetary: What amount do they spend?

The following diagram shows a succession of past sales for a set of four customers.

![clv_timeline](../../images/clv-timeline.png)

The diagram illustrates the RFM values for the customers, showing for each customer:
- Recency: The time between the last purchase and today, represented by the distance between the rightmost circle and the vertical dotted line that's labeled **Now**.
- Frequency: The time between purchases, represented by the distance between the circles on a single line.
- Monetary: The amount of money spent on each purchase, represented by the size of the circle.

As demonstrated in the lab, you usually create multiple features for each characteristic. For example, Recency is captured by two features: *recency* and *T*.

The RFM input features and the target label are engineered using the following process:
- A time series of historical sales transactions for a given customer is divided into two time periods: *the features period* and *the predict period*. The point in time used to divide the time series is referred to as *the threshold date*.
- The transactions in *the features period* are aggregated to create the latent RFM input features.
- The transactions in *the predict period* are aggregated to calculate the target label representing the expected total value of the customer.

This process results in a single example per customer and a set of examples across a customer population constitutes a training set.
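
The split-and-aggregate process described above can be sketched in plain Python. This is only an illustration on made-up transactions, not the BigQuery query used by the lab. The function name `build_example`, the sample rows, and the choice to place transactions dated exactly on the threshold date in the features period are all assumptions.

```python
from datetime import date


def build_example(transactions, threshold_date):
    """Aggregates one customer's transactions into one feature and one label.

    transactions: list of (order_date, quantity, unit_price) tuples.
    """
    # Assumption: the threshold date itself belongs to the features period
    features_period = [t for t in transactions if t[0] <= threshold_date]
    predict_period = [t for t in transactions if t[0] > threshold_date]

    monetary = sum(q * p for _, q, p in features_period)
    target_monetary = sum(q * p for _, q, p in predict_period)
    return {'monetary': monetary, 'target_monetary': target_monetary}


# Hypothetical transactions for a single customer
transactions = [
    (date(2011, 5, 18), 10, 2.0),
    (date(2011, 7, 1), 5, 4.0),
    (date(2011, 9, 15), 3, 10.0),
]

example = build_example(transactions, threshold_date=date(2011, 8, 8))
print(example)  # {'monetary': 40.0, 'target_monetary': 30.0}
```

Running the same aggregation for every customer would yield one row per customer, which is the shape of the training set described above.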

The pipeline implemented in the lab uses BigQuery as the source of historical sales transactions. BigQuery is also used to engineer the RFM features. The model is then trained and deployed using AutoML Tables. The following diagram represents the control and data flow implemented by the pipeline.


![Training pipeline](../images/clv_train.png)

1. A BigQuery query processes the sales transactions in the *transactions* table into RFM features in the *features* table.
1. The data from the *features* table is imported into the AutoML dataset.
1. The AutoML model is trained on the imported dataset.
1. After the training completes, the evaluation metrics are retrieved and compared against the performance threshold.
1. If the model performs better than the threshold, the model is deployed in AutoML Tables.

The sample dataset used in the lab is based on the publicly available [Online Retail Data Set](http://archive.ics.uci.edu/ml/datasets/Online+Retail) from the UCI Machine Learning Repository. The original dataset was preprocessed to conform to the following schema:

| Field | Type | Description |
|-------|------|-------------|
| customer_id | string | A unique customer ID |
| order_date | date (yyyy-MM-dd) | The date of a transaction. Transactions (potentially from multiple invoices) are grouped by day |
| quantity | integer | A number of items of a single SKU in a transaction |
| unit_price | float | A unit price of a SKU |

The feature engineering query generates the features table with the following schema.


| Field | Type | Description |
|-------|------|-------------|
| monetary | Float | The total spend by a customer in the features period |
| frequency | Integer | The number of transactions placed by a customer in the features period |
| recency | Integer | The time (in days) between the first and the last orders in the features period |
| T | Integer | The time between the first order placed and the threshold date |
| time_between | Float | The average time between orders in the features period |
| avg_basket_value | Float | The average monetary value of the customer's basket in the features period |
| avg_basket_size | Float | The average number of items in a basket in the features period |
| cnt_returns | Integer | The number of returns in the features period |
| target_monetary | Float | The total amount spent in the predict period. This is the label for predictions |
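
To make the definitions in the table concrete, here is a minimal sketch that computes several of the features for one customer in plain Python. It is not the lab's feature engineering query. The function name `rfm_features`, the sample orders, and the choice to count each distinct order date as one transaction are assumptions made for illustration.

```python
from datetime import date


def rfm_features(orders, threshold_date):
    """Computes a subset of the features for one customer.

    orders: list of (order_date, quantity, unit_price) tuples that all
    fall in the features period.
    """
    order_dates = sorted({d for d, _, _ in orders})
    first, last = order_dates[0], order_dates[-1]

    # Assumption: each distinct order date counts as one transaction
    frequency = len(order_dates)
    monetary = sum(q * p for _, q, p in orders)

    return {
        'monetary': monetary,
        'frequency': frequency,
        'recency': (last - first).days,        # first order to last order
        'T': (threshold_date - first).days,    # first order to threshold date
        'avg_basket_value': monetary / frequency,
        'cnt_returns': sum(1 for _, q, _ in orders if q < 0),
    }


# Hypothetical orders; the negative quantity represents a return
orders = [
    (date(2011, 2, 8), 4, 12.75),
    (date(2011, 3, 10), 2, 12.75),
    (date(2011, 4, 12), -4, 12.75),
]

features = rfm_features(orders, threshold_date=date(2011, 8, 8))
print(features['recency'], features['T'])  # 63 181
```

Note that *recency* and *T* share the same starting point (the first order) and differ only in where the interval ends.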





## Prepare lab environment
Let's start with configuring your GCP environment settings and uploading the sales transactions into BigQuery.

In [1]:
import kfp
from kfp.components import func_to_container_op
from typing import NamedTuple
from google.cloud import bigquery
from jinja2 import Template

### Set lab settings
Make sure to update the following values with your environment settings.

In [2]:
PROJECT_ID = 'mlops-workshop'
DATASET_LOCATION = 'US'
CLUSTER_NAME = 'mlops-workshop-cluster'
CLUSTER_ZONE = 'us-central1-a'

You don't need to change the below values.

In [3]:
DATASET_ID = 'lab_11'
TRANSACTIONS_TABLE_ID = 'transactions'
TRANSACTIONS_TABLE_SCHEMA = 'customer_id:STRING,order_date:DATE,quantity:INTEGER,unit_price:FLOAT'
TRANSACTIONS_SOURCE_FILE = '../datasets/clv/transactions.csv'
COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/0.1.36/components/gcp/'
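
`TRANSACTIONS_TABLE_SCHEMA` is written in the inline `name:TYPE` form that `bq load` accepts as its schema argument. As a minimal sketch, you could check that a CSV header matches the schema before loading the file. The helper `parse_inline_schema` is hypothetical, and the in-memory CSV below is a stand-in for the real file, whose exact header is an assumption.

```python
import csv
import io


def parse_inline_schema(schema):
    """Splits 'name:TYPE,name:TYPE' into a list of (name, type) pairs."""
    return [tuple(field.split(':')) for field in schema.split(',')]


schema = 'customer_id:STRING,order_date:DATE,quantity:INTEGER,unit_price:FLOAT'
fields = parse_inline_schema(schema)

# Stand-in for the first lines of the transactions CSV file
sample_csv = io.StringIO(
    'customer_id,order_date,quantity,unit_price\n'
    '14646,2011-05-12,256,1.65\n')
header = next(csv.reader(sample_csv))

# True when the column names and their order agree with the schema
header_matches = header == [name for name, _ in fields]
print(header_matches)  # True
```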

### Create a BigQuery dataset

In [4]:
!bq --location=$DATASET_LOCATION --project_id=$PROJECT_ID mk --dataset $DATASET_ID

Dataset 'mlops-workshop:lab_11' successfully created.


### Load sales transactions data to BigQuery

In [5]:
!bq --project_id=$PROJECT_ID --dataset_id=$DATASET_ID load \
--source_format=CSV \
--skip_leading_rows=1 \
--replace \
$TRANSACTIONS_TABLE_ID \
$TRANSACTIONS_SOURCE_FILE \
$TRANSACTIONS_TABLE_SCHEMA

Upload complete.
Waiting on bqjob_r1476da991c727455_0000016ef83c5b9c_1 ... (4s) Current status: DONE   


### Explore the dataset
To query data in BigQuery, you can use the BigQuery Python client library ...

In [6]:
client = bigquery.Client()

In [7]:
query_template = """
SELECT *
FROM `{{ source_table }}`
LIMIT 100
"""

query = Template(query_template).render(
    source_table='{}.{}.{}'.format(PROJECT_ID, DATASET_ID, TRANSACTIONS_TABLE_ID))

df = client.query(query).to_dataframe()
df.head(10)

Unnamed: 0,customer_id,order_date,quantity,unit_price
0,14646,2011-05-12,256,1.65
1,16553,2011-05-18,256,0.36
2,16701,2011-05-18,256,0.36
3,13941,2011-06-21,256,0.36
4,14258,2011-07-11,256,0.36
5,18102,2011-07-28,256,5.88
6,14646,2011-08-09,256,0.72
7,14646,2011-08-09,256,0.72
8,14646,2011-08-09,256,0.72
9,14646,2011-08-11,256,2.55


... or the Jupyter `%%bigquery` magic.

In [9]:
%%bigquery --project $PROJECT_ID
SELECT *
FROM `lab_11.transactions`
WHERE customer_id='16553'

Unnamed: 0,customer_id,order_date,quantity,unit_price
0,16553,2011-05-18,256,0.36
1,16553,2011-03-10,2,12.75
2,16553,2011-02-08,4,12.75
3,16553,2011-03-10,4,5.95
4,16553,2011-05-18,6,2.95
...,...,...,...,...
81,16553,2011-03-10,120,0.85
82,16553,2011-06-29,144,0.53
83,16553,2010-12-14,192,0.85
84,16553,2011-04-12,-4,12.75


As you can see, there are multiple sales transactions per customer. They represent the purchasing history and behavior of a given customer. For example, the customer identified by 16553 has 85 transaction records. Most of them are new purchases. Some of them are returns, which are the records with a negative quantity.

The feature engineering query converts these 85 records into a single record representing the RFM characteristics of this customer.

## Create the KFP training pipeline

### Create component factories for the pre-defined GCP components

In [10]:
component_store = kfp.components.ComponentStore(
    local_search_paths=None,
    url_search_prefixes=[COMPONENT_URL_SEARCH_PREFIX])
    
automl_create_dataset_op = component_store.load_component('automl/create_dataset_for_tables')
automl_import_data_from_bq_op = component_store.load_component('automl/import_data_from_bigquery')
automl_create_model__op = component_store.load_component('automl/create_model_for_tables')
automl_split_dataset_table_column_names_op = component_store.load_component('automl/split_dataset_table_column_names')

### Create custom components

#### Create a base docker image for the custom components

In [11]:
%%writefile Dockerfile
FROM python:3.7
RUN pip3 install --upgrade google-cloud-bigquery google-api-core google-cloud-automl grpcio

Overwriting Dockerfile


In [12]:
IMAGE_NAME="lab_11_components"
IMAGE_URI="gcr.io/{}/{}:latest".format(PROJECT_ID, IMAGE_NAME)
!gcloud builds submit --timeout 15m --tag $IMAGE_URI

Creating temporary tarball archive of 6 file(s) totalling 139.5 KiB before compression.
Uploading tarball of [.] to [gs://mlops-workshop_cloudbuild/source/1576122789.78-90692af3fd994ee8b105fa40c9329d27.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/mlops-workshop/builds/0aa087e2-6fce-4236-8488-41d9aa443a59].
Logs are available at [https://console.cloud.google.com/gcr/builds/0aa087e2-6fce-4236-8488-41d9aa443a59?project=745302968357].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "0aa087e2-6fce-4236-8488-41d9aa443a59"

FETCHSOURCE
Fetching storage object: gs://mlops-workshop_cloudbuild/source/1576122789.78-90692af3fd994ee8b105fa40c9329d27.tgz#1576122790087837
Copying gs://mlops-workshop_cloudbuild/source/1576122789.78-90692af3fd994ee8b105fa40c9329d27.tgz#1576122790087837...
/ [1 files][ 29.5 KiB/ 29.5 KiB]                                                
Operation completed over 1 objects/29.5 KiB.                             

#### Create BQ query component

In [22]:
def bq_query(query: str, 
             project_id:str, 
             dataset_id: str, 
             table_id: str, 
             location: str) -> NamedTuple('Outputs', [('table_uri', str), ('job_id', str)]):

    from google.cloud import bigquery
    from google.api_core import exceptions
    import logging
    import os
    import uuid
    
    logging.basicConfig(level=logging.INFO)
    
    client = bigquery.Client(project=project_id, location=location)
    
    job_config = bigquery.QueryJobConfig()
    job_config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
    job_config.write_disposition = bigquery.job.WriteDisposition.WRITE_TRUNCATE
    job_id = 'query_' + os.environ.get('KFP_POD_NAME', uuid.uuid1().hex)
    
    dataset_ref = client.dataset(dataset_id)
    try:
        dataset = client.get_dataset(dataset_ref)
    except exceptions.NotFound:
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = location
        logging.info('Creating dataset {}'.format(dataset_id))
        client.create_dataset(dataset)
     
    table_id = table_id if table_id else job_id
    table_ref = dataset_ref.table(table_id)
    job_config.destination = table_ref
    logging.info('Submitting the job {}'.format(job_id))
    query_job = client.query(query, job_config=job_config, job_id=job_id)
    query_job.result() # Wait for query to finish
            
    table_uri = 'bq://{}.{}.{}'.format(project_id, dataset_id, table_id)
    
    return (table_uri, job_id)
    
# Use the image built above so that the component follows your PROJECT_ID
bq_query_op = func_to_container_op(bq_query, base_image=IMAGE_URI)
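
The component names the BigQuery job after the `KFP_POD_NAME` environment variable when that variable is set, and falls back to a random UUID otherwise. It then returns the destination table as a `bq://` URI. The naming logic can be tried outside the pipeline with the sketch below. The helpers `make_job_id` and `make_table_uri` are hypothetical wrappers around the expressions used in `bq_query`, and the pod name is made up.

```python
import os
import uuid


def make_job_id(environ=os.environ):
    """Returns 'query_<pod name>' if KFP_POD_NAME is set, else 'query_<uuid hex>'."""
    return 'query_' + environ.get('KFP_POD_NAME', uuid.uuid1().hex)


def make_table_uri(project_id, dataset_id, table_id):
    """Formats the destination table as a bq:// URI, as bq_query does."""
    return 'bq://{}.{}.{}'.format(project_id, dataset_id, table_id)


# Simulate the environment of a pipeline step with a made-up pod name
job_id = make_job_id({'KFP_POD_NAME': 'clv-train-abc12'})
table_uri = make_table_uri('mlops-workshop', 'lab_11', 'features')

print(job_id)     # query_clv-train-abc12
print(table_uri)  # bq://mlops-workshop.lab_11.features
```

Because an empty `table_id` is replaced by the job ID in `bq_query`, the same naming scheme also determines the table name when no table is specified.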

#### Create a component that retrieves and logs AutoML evaluation metrics

In [23]:
def automl_log_regression_metrics(
    model_path: str,
    primary_metric: str) -> NamedTuple('Outputs', [('primary_metric', str), ('primary_metric_value', float)]):
    
    import logging
    import json
    from google.cloud import automl_v1beta1 as automl

    logging.basicConfig(level=logging.INFO)
    client = automl.TablesClient()

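    # Retrieve evaluation metrics. Only an evaluation that carries
    # regression metrics is used.
    evaluation_metrics = None
    for evaluation in client.list_model_evaluations(model_name=model_path):
        if evaluation.regression_evaluation_metrics.ListFields():
            evaluation_metrics = evaluation.regression_evaluation_metrics
    if evaluation_metrics is None:
        raise RuntimeError('No regression evaluation metrics found for {}'.format(model_path))
    primary_metric_value = getattr(evaluation_metrics, primary_metric)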
    
    # Write the primary metric as a KFP pipeline metric
    metrics = {
        'metrics': [{
            'name': primary_metric.replace('_', '-'),
            'numberValue': primary_metric_value
        }]
    }
    with open('/mlpipeline-metrics.json', 'w') as f:
        json.dump(metrics, f)
    
    return (primary_metric, primary_metric_value)
    
    
# Use the image built above so that the component follows your PROJECT_ID
log_regression_metrics_op = func_to_container_op(automl_log_regression_metrics, base_image=IMAGE_URI)
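
The component reports the primary metric by writing a small JSON document to `/mlpipeline-metrics.json`, with underscores in the metric name replaced by hyphens. The sketch below reproduces that layout in a temporary file so that you can inspect it locally. The helper `write_kfp_metrics` is hypothetical, and the metric value is made up.

```python
import json
import os
import tempfile


def write_kfp_metrics(path, metric_name, metric_value):
    """Writes a single metric in the JSON layout used by the component."""
    metrics = {
        'metrics': [{
            'name': metric_name.replace('_', '-'),
            'numberValue': metric_value,
        }]
    }
    with open(path, 'w') as f:
        json.dump(metrics, f)
    return metrics


# Write to a temporary file instead of /mlpipeline-metrics.json
path = os.path.join(tempfile.mkdtemp(), 'mlpipeline-metrics.json')
write_kfp_metrics(path, 'mean_absolute_error', 850.5)

with open(path) as f:
    loaded = json.load(f)
print(loaded)
# {'metrics': [{'name': 'mean-absolute-error', 'numberValue': 850.5}]}
```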

#### Create a component that deploys an AutoML model

In [24]:
def automl_deploy_model(model_path: str):
    
    import logging
    from google.cloud import automl_v1beta1 as automl
    from google.cloud.automl_v1beta1 import enums
    
    logging.basicConfig(level=logging.INFO)
    client = automl.TablesClient()
    
    model = client.get_model(model_name=model_path)
    if model.deployment_state != enums.Model.DeploymentState.DEPLOYED:
        logging.info("Starting model deployment: {}".format(model_path))
        response = client.deploy_model(model_name=model_path)
        response.result() # Wait for operation to complete
        logging.info("Deployment completed")
    else:
        logging.info("Model already deployed")
    
    
# Use the image built above so that the component follows your PROJECT_ID
deploy_model_op = func_to_container_op(automl_deploy_model, base_image=IMAGE_URI)

### Define the pipeline
#### Set default values for pipeline parameters

In [25]:
project_id = PROJECT_ID
features_dataset_id = DATASET_ID
features_dataset_location = DATASET_LOCATION
features_table_id = 'features'
aml_compute_region = 'us-central1'
aml_dataset_name = 'clv_features'
aml_model_name = 'clv_regression'
target_column_name = 'target_monetary'
train_budget = 1000
optimization_objective = 'MINIMIZE_MAE'
primary_metric = 'mean_absolute_error'
deployment_threshold = 900

# Read and render the query template
query_template_file = 'query_template.sql.jinja'
with open(query_template_file, 'r') as file:
    query_template = file.read()

query = Template(query_template).render(
    data_source_id='{}.{}.{}'.format(PROJECT_ID, DATASET_ID, TRANSACTIONS_TABLE_ID),
    threshold_date='2011-08-08',
    predict_end='2011-12-12',
    max_monetary=15000)
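
The rendered query only makes sense if `threshold_date` falls before `predict_end`; otherwise the predict period would be empty. A small sanity check such as the following could run before rendering the template. The helper `check_periods` is hypothetical, and the assumption that the predict period runs from the threshold date to `predict_end` is based on the parameter names rather than on the query itself.

```python
from datetime import date


def check_periods(threshold_date, predict_end):
    """Parses ISO dates and verifies that the predict period is not empty.

    Returns the assumed length of the predict period in days.
    """
    threshold = date.fromisoformat(threshold_date)
    end = date.fromisoformat(predict_end)
    if end <= threshold:
        raise ValueError('predict_end must be later than threshold_date')
    return (end - threshold).days


predict_days = check_periods('2011-08-08', '2011-12-12')
print(predict_days)  # 126
```

`date.fromisoformat` requires Python 3.7 or later, which matches the `python:3.7` base image used for the custom components.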

#### Define the pipeline function

In [26]:
@kfp.dsl.pipeline(
    name='CLV Training',
    description='CLV Training Pipeline using BigQuery for feature engineering and AutoML Tables for model training'
)
def clv_train(
    project_id: str,
    feature_engineering_query: str = query,
    aml_compute_region: str = aml_compute_region,
    features_table_id: str = features_table_id,
    features_dataset_id: str = features_dataset_id,
    features_dataset_location: str = features_dataset_location,
    aml_dataset_name: str = aml_dataset_name,
    target_column_name: str = target_column_name,
    aml_model_name: str = aml_model_name,
    train_budget: 'Integer' = train_budget,
    optimization_objective: str = optimization_objective,
    primary_metric: str = primary_metric,
    deployment_threshold: 'Float' = deployment_threshold
    ):
    """Trains a Customer Lifetime Value model"""
    
    # Use BigQuery to engineer features from transaction data
    engineer_features = bq_query_op(
        query=feature_engineering_query,
        project_id=project_id,
        dataset_id=features_dataset_id,
        table_id=features_table_id,
        location=features_dataset_location)
    
    # Create an AML Dataset
    create_dataset = automl_create_dataset_op(
        gcp_project_id=project_id,
        gcp_region=aml_compute_region,
        display_name=aml_dataset_name
    )
    
    # Import the features from BigQuery to AML Dataset
    import_data = automl_import_data_from_bq_op(
        dataset_path=create_dataset.outputs['dataset_path'],
        input_uri=engineer_features.outputs['table_uri']
    )
    
    # Set the target and feature columns
    split_column_specs = automl_split_dataset_table_column_names_op(
        dataset_path=import_data.outputs['dataset_path'],
        table_index=0,
        target_column_name=target_column_name
    )
    
    # Create a model
    create_model = automl_create_model__op(
        gcp_project_id=project_id,
        gcp_region=aml_compute_region,
        display_name=aml_model_name,
        dataset_id=create_dataset.outputs['dataset_id'],
        target_column_path=split_column_specs.outputs['target_column_path'],
        input_feature_column_paths=split_column_specs.outputs['feature_column_paths'],
        optimization_objective=optimization_objective,
        train_budget_milli_node_hours=train_budget
    )
    
    # Retrieve the primary metric from the model evaluations
    log_regression_metrics = log_regression_metrics_op(create_model.outputs['model_path'], primary_metric)
    
    # Deploy the model if the primary metric is better than threshold
    with kfp.dsl.Condition(log_regression_metrics.outputs['primary_metric_value'] < deployment_threshold):
        deploy_model = deploy_model_op(create_model.outputs['model_path'])
    
    from kfp.gcp import use_gcp_secret
    kfp.dsl.get_pipeline_conf().add_op_transformer(use_gcp_secret('user-gcp-sa'))
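
The `kfp.dsl.Condition` in the pipeline deploys the model only when the primary metric is below `deployment_threshold`. That comparison suits error metrics such as `mean_absolute_error`, where lower is better; for a metric where higher is better, the comparison would have to be reversed. A minimal sketch of the decision in plain Python follows. The helper `should_deploy` and the `HIGHER_IS_BETTER` set are hypothetical and are not part of the pipeline.

```python
# Metrics for which a higher value means a better model (assumed list)
HIGHER_IS_BETTER = {'r_squared'}


def should_deploy(metric_name, metric_value, threshold):
    """Returns True when the metric is better than the threshold."""
    if metric_name in HIGHER_IS_BETTER:
        return metric_value > threshold
    # Error metrics: lower is better, as in the pipeline's condition
    return metric_value < threshold


print(should_deploy('mean_absolute_error', 850.0, 900))  # True
print(should_deploy('mean_absolute_error', 950.0, 900))  # False
```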


### Compile the pipeline

In [27]:
pipeline_yaml = 'clv_training.yaml'
kfp.compiler.Compiler().compile(clv_train, pipeline_yaml)

## Upload the pipeline to KFP
Get GKE cluster credentials.

In [28]:
!gcloud config set project $PROJECT_ID
!gcloud container clusters get-credentials $CLUSTER_NAME --zone $CLUSTER_ZONE

Updated property [core/project].
Fetching cluster endpoint and auth data.
kubeconfig entry generated for mlops-workshop-cluster.


Use `kfp.Client()` to upload the pipeline.

In [29]:
pipeline_name = 'clv_training_pipeline'
client = kfp.Client()

pipelines = [pipeline for pipeline in client.list_pipelines(page_size=100).pipelines if pipeline.name == pipeline_name]

if pipelines:
    print("Pipeline with this name already exists")
    pipeline_ref = pipelines[0]
    
else:
    pipeline_ref = client.upload_pipeline(pipeline_yaml, pipeline_name)
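
The lookup above inspects only the first page of results, so a pipeline could be missed on a cluster that hosts more than 100 pipelines. The sketch below follows `next_page_token` until the pipeline is found or the pages run out. The function `find_pipeline` is a hypothetical helper, and `FakeClient` is a stand-in that mimics only the `list_pipelines` call so that the loop can be exercised without a cluster.

```python
from types import SimpleNamespace


def find_pipeline(client, name, page_size=100):
    """Returns the first pipeline whose name matches, or None."""
    page_token = ''
    while True:
        response = client.list_pipelines(page_token=page_token, page_size=page_size)
        # The pipelines attribute may be None when nothing is returned
        for pipeline in response.pipelines or []:
            if pipeline.name == name:
                return pipeline
        page_token = response.next_page_token
        if not page_token:
            return None


class FakeClient:
    """Stand-in client that serves two pages of made-up pipelines."""
    pages = {
        '': SimpleNamespace(
            pipelines=[SimpleNamespace(name='other', id='1')],
            next_page_token='p2'),
        'p2': SimpleNamespace(
            pipelines=[SimpleNamespace(name='clv_training_pipeline', id='2')],
            next_page_token=None),
    }

    def list_pipelines(self, page_token='', page_size=10):
        return self.pages[page_token]


found = find_pipeline(FakeClient(), 'clv_training_pipeline')
print(found.id)  # 2
```

With a real `kfp.Client()`, you would pass the client in place of `FakeClient()` and upload the pipeline only when the function returns `None`.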

## Trigger a pipeline run

In [30]:
experiment_name = 'CLV Training'
run_name = 'Run 01'
params = dict(
    project_id=PROJECT_ID
)

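try:
    # Look the experiment up by name; the first positional argument is the ID
    experiment_ref = client.get_experiment(experiment_name=experiment_name)
except Exception:
    # The experiment does not exist yet
    experiment_ref = client.create_experiment(experiment_name)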

run_ref = client.run_pipeline(
    experiment_ref.id,
    run_name,
    pipeline_package_path=None,
    params=params,
    pipeline_id=pipeline_ref.id)

(run_ref.name, run_ref.id)

('Run 01', 'a7c96154-e7d7-4e9e-a8de-5bc5bd1f456e')

## Clean up

In [None]:
!bq --project_id=$PROJECT_ID rm -r -f -d $DATASET_ID