# Orchestrate BigQuery and AutoML tables with Kubeflow pipelines

In [1]:
import kfp
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.notebook

from typing import NamedTuple

## Create a base image to be used by lightweight components
The image is built by the Kaniko Kubernetes service. It contains the libraries required to interface with the BigQuery, Cloud Storage and AutoML Tables services.

In [2]:
# GCP project whose Container Registry (gcr.io) holds the images named below.
PROJECT_NAME = 'sandbox-235500'
# GCS path passed to the %%docker magic as the Kaniko staging directory.
STAGING_DIR = 'gs://jksandbox/staging'

# Fully-qualified image names, derived from the project name.
BASE_IMAGE = 'gcr.io/{}/automltablesbase:dev'.format(PROJECT_NAME)
TARGET_IMAGE = 'gcr.io/{}/automltablescreate:dev'.format(PROJECT_NAME)

The `%%docker` Jupyter cell magic is used to start a Kaniko build job. The magic uses the default Kubernetes config.

In [None]:
%%docker {BASE_IMAGE} {STAGING_DIR}
# Base image for the lightweight components: TensorFlow on Python 3 plus
# pandas and the Google Cloud client libraries for Storage, AutoML and BigQuery.
FROM tensorflow/tensorflow:latest-py3
# Install all packages in a single layer so pip resolves their dependencies
# together, and skip the pip download cache to keep the image smaller.
RUN pip3 install --no-cache-dir --upgrade \
    pandas \
    google-cloud-storage \
    google-cloud-automl \
    google-cloud-bigquery

In [3]:
# BigQuery project, dataset and table names used by the queries below.
project_id = 'sandbox-235500'
dataset_id = 'CLVDataset'
transactions_table_id = 'transactions'
order_summaries_table_id = 'order_summaries'

# Dates substituted into the SQL templates (YYYY-MM-DD strings).
threshold_date = '2011-08-08'
predict_end = '2011-12-12'

# Value substituted for the <<max_monetary>> placeholder; kept as a string here.
max_monetary = '15000'

model_name = 'CLVModel'


### Create Python lightweight components

In [4]:
# BigQuery SQL template that builds per-customer, per-order-date summaries
# from the transactions table. For each (customer_id, order_date) it computes
#   order_value        = ROUND(SUM(unit_price * quantity), 2)
#   order_qty_articles = SUM(quantity)
# and then keeps only:
#   * customers with more than one positive-valued order date before
#     <<threshold_date>> (the INNER JOIN against subquery `b`);
#   * customers for which DATE_DIFF(<<predict_end>>, latest order, DAY) <= 90;
#   * rows whose quantity and value are both positive or both negative.
# The <<...>> tokens are placeholders filled in by
# generate_data_preprocessing_query() via plain string replacement.
DATA_PREPROCESSING_QUERY_TEMPLATE = '''
SELECT
  a.customer_id,
  a.order_date,
  a.order_value,
  a.order_qty_articles
FROM
(
  SELECT
    customer_id,
    order_date,
    ROUND(SUM(unit_price * quantity), 2) AS order_value,
    SUM(quantity) AS order_qty_articles,
    (
      SELECT
        MAX(order_date)
      FROM
        `<<project_id>>.<<dataset_id>>.<<transactions_table_id>>` tl
      WHERE
        tl.customer_id = t.customer_id
    ) latest_order
  FROM
    `<<project_id>>.<<dataset_id>>.<<transactions_table_id>>` t
  GROUP BY
      customer_id,
      order_date
) a

INNER JOIN (
  -- Only customers with more than one positive order values before threshold.
  SELECT
    customer_id
  FROM (
    -- Customers and how many positive order values  before threshold.
    SELECT
      customer_id,
      SUM(positive_value) cnt_positive_value
    FROM (
      -- Customer with whether order was positive or not at each date.
      SELECT
        customer_id,
        (
          CASE
            WHEN SUM(unit_price * quantity) > 0 THEN 1
            ELSE 0
          END ) positive_value
      FROM
        `<<project_id>>.<<dataset_id>>.<<transactions_table_id>>`
      WHERE
        order_date < DATE("<<threshold_date>>")
      GROUP BY
        customer_id,
        order_date)
    GROUP BY
      customer_id )
  WHERE
    cnt_positive_value > 1
  ) b
ON
  a.customer_id = b.customer_id
--[START common_clean]
WHERE
  -- Bought in the past 3 months
  DATE_DIFF(DATE("<<predict_end>>"), latest_order, DAY) <= 90
  -- Make sure returns are consistent.
  AND (
    (order_qty_articles > 0 and order_Value > 0) OR
    (order_qty_articles < 0 and order_Value < 0)
  )
'''

# BigQuery SQL template that derives one feature row per customer from the
# order summaries table:
#   * subquery `tf` aggregates orders dated on or before <<threshold_date>>
#     into the features (monetary, frequency, recency, T, time_between,
#     avg_basket_value, avg_basket_size, cnt_returns);
#   * subquery `tt` sums order_value over ALL orders to form target_monetary
#     (the filter restricting it to orders after the threshold is commented
#     out inside the query).
# Only customers with 0 < monetary <= <<max_monetary>> are returned.
# NOTE(review): the innermost SELECT computes order_value_btyd (order value
# zeroed for a customer's first order) but no outer expression reads it —
# confirm whether `monetary` was meant to use it.
# The <<...>> tokens are placeholders filled in by
# generate_feature_engineering_query() via plain string replacement.
FEATURE_ENGINEERING_QUERY_TEMPLATE = '''
SELECT
  tf.customer_id,
  -- For training period
  -- Copying the calculations from Lifetimes where first orders are ignored
  -- See https://github.com/CamDavidsonPilon/lifetimes/blob/master/lifetimes/utils.py#L246
--[START features_target]
  ROUND(tf.monetary, 2) as monetary,
  tf.cnt_orders AS frequency,
  tf.recency,
  tf.T,
  ROUND(tf.recency/cnt_orders, 2) AS time_between,
  ROUND(tf.avg_basket_value, 2) AS avg_basket_value,
  ROUND(tf.avg_basket_size, 2) AS avg_basket_size,
  tf.cnt_returns,
  -- Target calculated for overall period
  ROUND(tt.target_monetary, 2) as target_monetary
--[END features_target]
FROM
  -- This SELECT uses only data before threshold to make features.
  (
    SELECT
      customer_id,
      SUM(order_value) AS monetary,
      DATE_DIFF(MAX(order_date), MIN(order_date), DAY) AS recency,
      DATE_DIFF(DATE('<<threshold_date>>'), MIN(order_date), DAY) AS T,
      COUNT(DISTINCT order_date) AS cnt_orders,
      AVG(order_qty_articles) avg_basket_size,
      AVG(order_value) avg_basket_value,
      SUM(CASE
          WHEN order_value < 1 THEN 1
          ELSE 0 END) AS cnt_returns
    FROM
      -- Makes the order value = 0 if it is the first one
      (
        SELECT
          a.*,
          (CASE
              WHEN a.order_date = c.order_date_min THEN 0
              ELSE a.order_value END) AS order_value_btyd
        FROM
          `<<project_id>>.<<dataset_id>>.<<order_summaries_table_id>>` a
        INNER JOIN (
          SELECT
            customer_id,
            MIN(order_date) AS order_date_min
          FROM
            `<<project_id>>.<<dataset_id>>.<<order_summaries_table_id>>`
          GROUP BY
            customer_id) c
        ON
          c.customer_id = a.customer_id
      )
    WHERE
      order_date <= DATE('<<threshold_date>>')
    GROUP BY
      customer_id) tf,

  -- This SELECT uses all records to calculate the target (could also use data after threshold )
  (
    SELECT
      customer_id,
      SUM(order_value) target_monetary
    FROM
      `<<project_id>>.<<dataset_id>>.<<order_summaries_table_id>>`
      --WHERE order_date > DATE('<<threshold_date>>')
    GROUP BY
      customer_id) tt
WHERE
  tf.customer_id = tt.customer_id
  AND tf.monetary > 0
  AND tf.monetary <= <<max_monetary>>

'''


### Import BigQuery table to AutoML Tables dataset component

In [8]:
@kfp.dsl.python_component(name='Import dataset', base_image=BASE_IMAGE,target_component_file='import_op.yaml')
def import_dataset(
    location: str,
    project_id: str,
    dataset_id: str,
    table_id: str,
    display_name: str
) -> NamedTuple('DatasetInfo', [('automl_dataset_name', str),
                                ('automl_dataset_id', str)]):
    """Create an AutoML Tables dataset and import a BigQuery table into it.

    Args:
        location: AutoML location; combined with ``project_id`` to build the
            parent path under which the dataset is created.
        project_id: Project that owns both the AutoML dataset and the
            BigQuery source table.
        dataset_id: BigQuery dataset containing the source table.
        table_id: BigQuery table to import.
        display_name: Display name given to the new AutoML dataset.

    Returns:
        A ``DatasetInfo`` namedtuple where ``automl_dataset_name`` is the
        ``display_name`` and ``automl_dataset_id`` is the ``name`` field of
        the create-dataset response.
    """
    # Imported locally rather than at module level — presumably because the
    # function body is packaged on its own as a lightweight component.
    from collections import namedtuple
    from google.cloud import automl_v1beta1 as automl

    automl_client = automl.AutoMlClient()

    # Step 1: create an empty Tables dataset under the project/location.
    dataset_spec = {
        'display_name': display_name,
        'tables_dataset_metadata': {}
    }
    parent = automl_client.location_path(project_id, location)
    dataset = automl_client.create_dataset(parent, dataset_spec)

    # Step 2: start importing the BigQuery table into the new dataset.
    source_uri = "bq://{}.{}.{}".format(project_id, dataset_id, table_id)
    import_operation = automl_client.import_data(
        dataset.name,
        {"bigquery_source": {"input_uri": source_uri}})
    print("Initiating import ...")
    # Synchronously wait on the operation before reporting its result.
    import_outcome = import_operation.result()
    print("Data imported. {}".format(import_outcome))

    # Package the component outputs.
    DatasetInfo = namedtuple('DatasetInfo', ['automl_dataset_name', 'automl_dataset_id'])
    return DatasetInfo(
        automl_dataset_name=dataset.display_name,
        automl_dataset_id=dataset.name)
    

### Generate SQL queries from templates
This is a workaround for a bug in the BigQuery components that prevents the use of parameterized queries.

In [5]:
@kfp.dsl.python_component(name='Preprocessing query', base_image=BASE_IMAGE,target_component_file='clean_op.yaml')
def generate_data_preprocessing_query(
    query_template: str,
    project_id: str,
    dataset_id: str,
    transactions_table_id: str,
    threshold_date: str,
    predict_end: str) -> str:
    """Fill the data-preprocessing SQL template with concrete values.

    Every occurrence of each ``<<name>>`` placeholder is replaced by the
    ``str()`` of the matching argument. Placeholders without a matching
    argument are left in the text unchanged.

    Args:
        query_template: SQL text containing ``<<...>>`` placeholders.
        project_id: Replaces ``<<project_id>>``.
        dataset_id: Replaces ``<<dataset_id>>``.
        transactions_table_id: Replaces ``<<transactions_table_id>>``.
        threshold_date: Replaces ``<<threshold_date>>``.
        predict_end: Replaces ``<<predict_end>>``.

    Returns:
        The template text with the five placeholders substituted.
    """
    # Applied one after another in exactly this order, so text inserted by an
    # earlier substitution is still visible to the later ones.
    substitutions = (
        ("<<project_id>>", project_id),
        ("<<dataset_id>>", dataset_id),
        ("<<threshold_date>>", threshold_date),
        ("<<predict_end>>", predict_end),
        ("<<transactions_table_id>>", transactions_table_id),
    )

    rendered = str(query_template)
    for placeholder, value in substitutions:
        rendered = rendered.replace(placeholder, str(value))
    return rendered


In [6]:
@kfp.dsl.python_component(name='Feature engineering query', base_image=BASE_IMAGE,target_component_file='features_op.yaml')
def generate_feature_engineering_query(
    query_template: str,
    project_id: str,
    dataset_id: str,
    order_summaries_table_id: str,
    threshold_date: str,
    max_monetary: int) -> str:
    """Fill the feature-engineering SQL template with concrete values.

    Every occurrence of each ``<<name>>`` placeholder is replaced by the
    ``str()`` of the matching argument. Placeholders without a matching
    argument are left in the text unchanged.

    Args:
        query_template: SQL text containing ``<<...>>`` placeholders.
        project_id: Replaces ``<<project_id>>``.
        dataset_id: Replaces ``<<dataset_id>>``.
        order_summaries_table_id: Replaces ``<<order_summaries_table_id>>``.
        threshold_date: Replaces ``<<threshold_date>>``.
        max_monetary: Replaces ``<<max_monetary>>`` (converted with ``str()``).

    Returns:
        The template text with the five placeholders substituted.
    """
    # Applied one after another in exactly this order, so text inserted by an
    # earlier substitution is still visible to the later ones.
    substitutions = (
        ("<<project_id>>", project_id),
        ("<<dataset_id>>", dataset_id),
        ("<<threshold_date>>", threshold_date),
        ("<<max_monetary>>", max_monetary),
        ("<<order_summaries_table_id>>", order_summaries_table_id),
    )

    rendered = str(query_template)
    for placeholder, value in substitutions:
        rendered = rendered.replace(placeholder, str(value))
    return rendered


In [None]:
# Render the preprocessing SQL locally from the notebook-level parameter values.
query = generate_data_preprocessing_query(
    query_template=DATA_PREPROCESSING_QUERY_TEMPLATE,
    project_id=project_id,
    dataset_id=dataset_id,
    transactions_table_id=transactions_table_id,
    threshold_date=threshold_date,
    predict_end=predict_end,
)

In [None]:
import itertools

from google.cloud import bigquery

client = bigquery.Client()

# Write the query result to <project_id>.<dataset_id>.<order_summaries_table_id>.
# The destination is resolved against project_id explicitly so that it matches
# the fully-qualified tables referenced inside the query even when the
# client's default project is a different one.
table_ref = client.dataset(dataset_id, project=project_id).table(order_summaries_table_id)

job_config = bigquery.QueryJobConfig()
job_config.destination = table_ref
# Create the table on first run and overwrite its contents on every re-run.
job_config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
job_config.write_disposition = bigquery.job.WriteDisposition.WRITE_TRUNCATE

query_job = client.query(query, location='US', job_config=job_config)  # API request
rows = iter(query_job.result())  # Waits for query to finish

# Preview at most five rows. islice simply stops when the result is shorter,
# whereas calling next() five times raised StopIteration on small results.
for row in itertools.islice(rows, 5):
    print(row)

In [None]:
# Render the feature-engineering SQL locally from the notebook-level parameter values.
query = generate_feature_engineering_query(
    query_template=FEATURE_ENGINEERING_QUERY_TEMPLATE,
    project_id=project_id,
    dataset_id=dataset_id,
    order_summaries_table_id=order_summaries_table_id,
    threshold_date=threshold_date,
    max_monetary=max_monetary,
)

In [None]:
import itertools

from google.cloud import bigquery

client = bigquery.Client()

# Write the query result to <project_id>.<dataset_id>.features.
# The destination is resolved against project_id explicitly so that it matches
# the fully-qualified tables referenced inside the query even when the
# client's default project is a different one.
table_ref = client.dataset(dataset_id, project=project_id).table('features')

job_config = bigquery.QueryJobConfig()
job_config.destination = table_ref
# Create the table on first run and overwrite its contents on every re-run.
job_config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
job_config.write_disposition = bigquery.job.WriteDisposition.WRITE_TRUNCATE

query_job = client.query(query, location='US', job_config=job_config)  # API request
rows = iter(query_job.result())  # Waits for query to finish

# Preview at most five rows. islice simply stops when the result is shorter,
# whereas calling next() five times raised StopIteration on small results.
for row in itertools.islice(rows, 5):
    print(row)

## Create and run a pipeline

### Define a pipeline

In [12]:
from google.cloud import bigquery
import json

# BigQuery tables written by the pipeline and the display name of the AutoML
# dataset it creates.
ORDER_SUMMARIES_TABLE_ID = 'order_summaries'
FEATURES_TABLE_ID = 'clv_features'
AUTOML_DATASET_NAME = 'CLVFeatures'

# Turn the Python functions defined earlier in the notebook into pipeline ops.
generate_data_preprocessing_query_op = kfp.components.func_to_container_op(
    generate_data_preprocessing_query)
generate_feature_engineering_query_op = kfp.components.func_to_container_op(
    generate_feature_engineering_query)
import_dataset_op = kfp.components.func_to_container_op(
    import_dataset)

# BigQuery query component, loaded from a definition pinned to a specific
# commit of the kubeflow/pipelines repository.
bq_query_component_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/e8524eefb138725fc06600d1956da0f4dd477178/components/gcp/bigquery/query/component.yaml'
bq_query_op = kfp.components.load_component_from_url(bq_query_component_url)


@dsl.pipeline(
    name='CLVPipeline',
    description='CLV Pipeline'
)
def clv_pipeline(
    project_id='',
    dataset_id='',
    transactions_table_id='',
    threshold_date='',
    predict_end='',
    model_name='',
    max_monetary=15000,
    bq_dataset_location='US',
    automl_dataset_location='us-central1'
):
    """Prepare CLV features in BigQuery and import them into AutoML Tables.

    Steps:
      1. Render the preprocessing and the feature-engineering SQL from their
         templates.
      2. ``clean_data``: run the preprocessing query into
         ``ORDER_SUMMARIES_TABLE_ID``.
      3. ``engineer_features``: run the feature query into
         ``FEATURES_TABLE_ID``; explicitly ordered after ``clean_data``.
      4. ``import-from-bq``: import the features table into an AutoML dataset
         named ``AUTOML_DATASET_NAME``; explicitly ordered after
         ``engineer_features``.

    Args:
        project_id: Project used by the queries, the BigQuery steps and the
            AutoML import.
        dataset_id: BigQuery dataset holding the source and derived tables.
        transactions_table_id: Source table substituted into the
            preprocessing query.
        threshold_date: Date substituted into both queries.
        predict_end: Date substituted into the preprocessing query.
        model_name: Accepted but not referenced by any step in this function.
        max_monetary: Upper bound substituted into the feature query.
        bq_dataset_location: Passed to the BigQuery steps as
            ``dataset_location``.
        automl_dataset_location: Passed to the import step as ``location``.
    """

    def run_bq_query(task_name, query, destination_table_id):
        """Add a BigQuery step that writes `query`'s result to the given table."""
        task = bq_query_op(
            query=query,
            project_id=project_id,
            dataset_id=dataset_id,
            table_id=destination_table_id,
            output_gcs_path='',
            dataset_location=bq_dataset_location,
            job_config='')
        task = task.apply(gcp.use_gcp_secret('user-gcp-sa'))
        task.name = task_name
        return task

    # Render both SQL statements from their templates.
    generate_data_preprocessing_query_task = generate_data_preprocessing_query_op(
        query_template=DATA_PREPROCESSING_QUERY_TEMPLATE,
        project_id=project_id,
        dataset_id=dataset_id,
        transactions_table_id=transactions_table_id,
        threshold_date=threshold_date,
        predict_end=predict_end)

    generate_feature_engineering_query_task = generate_feature_engineering_query_op(
        query_template=FEATURE_ENGINEERING_QUERY_TEMPLATE,
        project_id=project_id,
        dataset_id=dataset_id,
        order_summaries_table_id=ORDER_SUMMARIES_TABLE_ID,
        threshold_date=threshold_date,
        max_monetary=max_monetary)

    # Execute the rendered queries in BigQuery.
    clean_data_task = run_bq_query(
        "clean_data",
        generate_data_preprocessing_query_task.output,
        ORDER_SUMMARIES_TABLE_ID)

    engineer_features_task = run_bq_query(
        "engineer_features",
        generate_feature_engineering_query_task.output,
        FEATURES_TABLE_ID)

    # The feature query reads the order summaries table written by clean_data,
    # but no output is passed between the two steps, so order them explicitly.
    engineer_features_task.after(clean_data_task)

    # Import the features table into AutoML Tables once it has been written.
    import_dataset_task = import_dataset_op(
        location=automl_dataset_location,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=FEATURES_TABLE_ID,
        display_name=AUTOML_DATASET_NAME)
    import_dataset_task.name = "import-from-bq"

    import_dataset_task.after(engineer_features_task)
    


### Compile a pipeline

In [13]:
# Compile the pipeline into a package file named after the pipeline function.
pipeline_func = clv_pipeline
pipeline_filename = '{}.tar.gz'.format(pipeline_func.__name__)

kfp.compiler.Compiler().compile(
    pipeline_func,
    pipeline_filename)

### Submit the pipeline for execution

In [14]:
# Pipelines API endpoint and the experiment the run is filed under.
HOST = 'http://localhost:8082/api/v1/namespaces/kubeflow/services/ml-pipeline:8888/proxy'
EXPERIMENT_NAME = 'CLV_TRAINING'

# Values for the pipeline parameters of this run.
arguments = {
    'project_id': 'sandbox-235500',
    'dataset_id': 'CLVDataset',
    'transactions_table_id': 'transactions',
    'threshold_date': '2011-08-08',
    'predict_end': '2011-12-12',
    'max_monetary': '15000',
    'model_name': 'CLVModel'
}

client = kfp.Client(HOST)
experiment = client.create_experiment(EXPERIMENT_NAME)

# Submit the compiled package as a new run and show the API response.
run_name = '{} run'.format(pipeline_func.__name__)
run_result = client.run_pipeline(
    experiment.id,
    run_name,
    pipeline_filename,
    arguments)
print(run_result)

{'created_at': datetime.datetime(2019, 5, 8, 1, 2, tzinfo=tzlocal()),
 'description': None,
 'error': None,
 'finished_at': None,
 'id': 'e299fcf8-712c-11e9-bf64-42010a800073',
 'metrics': None,
 'name': 'clv_pipeline run',
 'pipeline_spec': {'parameters': [{'name': 'model-name', 'value': 'CLVModel'},
                                  {'name': 'threshold-date',
                                   'value': '2011-08-08'},
                                  {'name': 'predict-end',
                                   'value': '2011-12-12'},
                                  {'name': 'max-monetary', 'value': '15000'},
                                  {'name': 'project-id',
                                   'value': 'sandbox-235500'},
                                  {'name': 'dataset-id', 'value': 'CLVDataset'},
                                  {'name': 'transactions-table-id',
                                   'value': 'transactions'}],
                   'pipeline_id': None,
           