# Kubeflow Pipelines demo using the Chicago taxi trips problem

## Setup

In [1]:
# Authenticate gcloud/gsutil as the service account whose key file is named by
# the GOOGLE_APPLICATION_CREDENTIALS environment variable.
!gcloud auth activate-service-account --key-file=${GOOGLE_APPLICATION_CREDENTIALS}

Activated service account credentials for: [kubeflow-user@kubeflow-demo-256908.iam.gserviceaccount.com]


In [2]:
%%capture pip_install_out

!pip install pip --upgrade
!pip install -r mlpipeline_utils/requirements.txt --upgrade

In [3]:
# Pick up edits to imported modules (e.g. mlpipeline_utils) without restarting
# the kernel.
%load_ext autoreload
%autoreload 2

In [4]:
import json
import logging
import os
import sys
sys.path.append('../mlpipeline_utils')
import uuid
import tempfile

import kfp
import kfp.gcp

from mlpipeline_utils.kfp_components import *


logging.getLogger().setLevel(logging.INFO)

In [5]:
PROJECT_ID = !gcloud config get-value project
PROJECT_ID = PROJECT_ID[0]
REGION = 'us-central1'
DEV_BUCKET = f'gs://{PROJECT_ID}-dev'
!gsutil mb -b on {DEV_BUCKET}
BASE_GCS_PATH = os.path.join(DEV_BUCKET, 'demo', 'chicago_taxi_tips')
MODEL_MODULE_PATH = os.path.join(BASE_GCS_PATH, 'model.py')

Creating gs://kubeflow-demo-256908-dev/...
ServiceException: 409 Bucket kubeflow-demo-256908-dev already exists.


In [None]:
# Build the container image used by the pipeline components below from the
# local mlpipeline_utils sources, on top of the project's kfp-base image.
# NOTE(review): this cell has no execution count in the saved notebook, yet later
# cells reference UTILS_IMAGE -- run it before defining the pipelines.
_utils_src_dir = '../mlpipeline_utils'
_utils_base_image = f'gcr.io/{PROJECT_ID}/kfp-base'
UTILS_IMAGE = kfp.containers.build_image_from_working_dir(
    working_dir=_utils_src_dir,
    base_image=_utils_base_image,
)
UTILS_IMAGE

In [8]:
# KFP API client (lists/uploads pipelines and submits runs) and a pipeline compiler.
kfp_client, kfp_compiler = kfp.Client(), kfp.compiler.Compiler()


def _get_pipeline_id_by_name(pipeline_name):
    next_page_token = ''
    while next_page_token is not None:
        list_result = kfp_client.list_pipelines(next_page_token).to_dict()
        next_page_token = list_result['next_page_token']
        for l in list_result['pipelines']:
            if l['name'] == pipeline_name:
                return l['id']


def _delete_pipeline_by_name(pipeline_name):
    """Delete the uploaded pipeline called `pipeline_name`; do nothing if none exists."""
    found_id = _get_pipeline_id_by_name(pipeline_name)
    if found_id is None:
        return
    kfp_client.pipelines.delete_pipeline(found_id)


def _force_upload_pipeline(pipeline_func):
    """Compile `pipeline_func` and upload it, replacing any same-named pipeline.

    The pipeline name is read from `pipeline_func._pipeline_name` (presumably
    set by `@kfp.dsl.pipeline(name=...)`). Any existing pipeline with that name
    is deleted before the new package is uploaded under the same name.

    Args:
        pipeline_func: A `@kfp.dsl.pipeline`-decorated function.
    """
    pipeline_name = pipeline_func._pipeline_name
    with tempfile.NamedTemporaryFile(suffix='.zip') as package_file:
        # Compile before deleting so a compilation error leaves the existing
        # pipeline untouched. Reuses the module-level `kfp_compiler` instead of
        # creating a new Compiler on every call.
        kfp_compiler.compile(pipeline_func, package_file.name)
        # NOTE(review): if the upload below fails, the old pipeline is already
        # gone; fix the problem and re-run to restore it.
        _delete_pipeline_by_name(pipeline_name)
        kfp_client.upload_pipeline(package_file.name, pipeline_name=pipeline_name)

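## Upload the model module (IMPORTANT!)

The model pipeline reads its preprocessing/training module from `MODEL_MODULE_PATH` on GCS, so re-run the next cell after every change to `chicago_taxi_trips_model.py`.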

In [9]:
# Copy the model/preprocessing module to GCS. The model pipeline's default
# `module_path` is MODEL_MODULE_PATH, so re-run this cell after every edit to
# chicago_taxi_trips_model.py or the pipeline will use a stale copy.
!gsutil cp chicago_taxi_trips_model.py {MODEL_MODULE_PATH}

Copying file://chicago_taxi_trips_model.py [Content-Type=text/x-python]...
/ [1 files][ 12.4 KiB/ 12.4 KiB]                                                
Operation completed over 1 objects/12.4 KiB.                                     


## Pipeline for dataset generation

In [10]:
@kfp.dsl.pipeline(
    name='Generate chicago taxi trips dataset',
    description='Pipeline to generate and analyze train/eval splits for the Chicago taxi trips problem.'
)
def chicago_taxi_trips_dataset_pipeline(
    examples_output_dir: str,
    stats_output_dir: str,
    base_sample_rate: float=0.0001,
    train_samples_fraction: float=0.75,
    temp_dir: str=os.path.join(BASE_GCS_PATH, 'temp'),
    project_id: str=PROJECT_ID,
    region: str=REGION,
    runner: str='DirectRunner',
):
    """Generate train/eval examples from the BigQuery taxi trips table, then compute stats.

    Args:
        examples_output_dir: Output dir of the example generator; the stats
            steps read `<examples_output_dir>/<split>/*.tfrecord.gz`.
        stats_output_dir: Statistics for each split go to `<stats_output_dir>/<split>`.
        base_sample_rate: Passed to the example generator (presumably the
            fraction of BigQuery rows sampled).
        train_samples_fraction: Passed to the example generator (presumably the
            share of sampled rows put in the train split).
        temp_dir: Scratch root; each run uses `<temp_dir>/{{workflow.name}}`.
        project_id: GCP project passed to every step.
        region: GCP region passed to every step.
        runner: Beam runner name, e.g. 'DirectRunner' or 'DataflowRunner'.
    """
    # Per-workflow scratch dir so concurrent runs don't share temp files.
    workflow_temp_dir = os.path.join(str(temp_dir), '{{workflow.name}}')

    base_query = """
    SELECT
        unique_key AS row_id,
        pickup_community_area,
        fare,
        EXTRACT(MONTH FROM trip_start_timestamp) AS trip_start_month,
        EXTRACT(HOUR FROM trip_start_timestamp) AS trip_start_hour,
        EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS trip_start_day,
        UNIX_SECONDS(trip_start_timestamp) AS trip_start_timestamp,
        pickup_latitude,
        pickup_longitude,
        dropoff_latitude,
        dropoff_longitude,
        trip_miles,
        pickup_census_tract,
        dropoff_census_tract,
        payment_type,
        company,
        trip_seconds,
        dropoff_community_area,
        tips
    FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
    """

    train_eval_examples_op = generate_random_train_eval_examples_from_bq_comp(
        name='generate_train_eval_examples',
        utils_image=UTILS_IMAGE,
        project_id=project_id,
        region=region,
        temp_dir=workflow_temp_dir,
        runner=runner,
        base_query=base_query,
        base_sample_rate=base_sample_rate,
        train_samples_fraction=train_samples_fraction,
        output_dir=examples_output_dir,
    ).apply(kfp.gcp.use_gcp_secret('user-gcp-sa'))

    # One stats step per split. The steps locate the examples through GCS paths
    # rather than through an op output, so the dependency on the generator has
    # to be declared explicitly with `.after()`.
    for split in ('train', 'eval'):
        split_stats_op = tfrecord_stats_gen_comp(
            name=f'{split}_dataset_stats',
            utils_image=UTILS_IMAGE,
            project_id=project_id,
            region=region,
            temp_dir=workflow_temp_dir,
            runner=runner,
            data_location=os.path.join(str(examples_output_dir), split, '*.tfrecord.gz'),
            output_dir=os.path.join(str(stats_output_dir), split),
        ).apply(kfp.gcp.use_gcp_secret('user-gcp-sa'))
        split_stats_op.after(train_eval_examples_op)

### Submit an ad hoc pipeline run (useful for debugging)

#### Local runner (heavily undersampled)

In [11]:
# Submit the dataset pipeline with the local DirectRunner on a tiny sample.
dataset_id = uuid.uuid4().hex
dataset_dir = os.path.join(BASE_GCS_PATH, 'datasets', dataset_id)
examples_output_dir = os.path.join(dataset_dir, 'examples')
stats_output_dir = os.path.join(dataset_dir, 'stats')

local_run_args = {
    'project_id': PROJECT_ID,
    'region': REGION,
    'base_sample_rate': 0.0001,  # heavily undersampled to keep the local run small
    'train_samples_fraction': 0.75,
    'runner': 'DirectRunner',
    'examples_output_dir': examples_output_dir,
    'stats_output_dir': stats_output_dir,
}

kfp_client.create_run_from_pipeline_func(
    chicago_taxi_trips_dataset_pipeline,
    arguments=local_run_args,
    experiment_name='Chicago taxi trips dataset',
    run_name=f'train_eval_examples_local_{dataset_id}',
)



<kfp._client.Client.create_run_from_pipeline_package.<locals>.RunPipelineResult at 0x7f03ac5d1c18>

#### Dataflow runner (for when we need to scale)

In [10]:
# Submit the dataset pipeline on Dataflow with a much larger sample.
dataset_id = uuid.uuid4().hex
dataset_dir = os.path.join(BASE_GCS_PATH, 'datasets', dataset_id)
examples_output_dir = os.path.join(dataset_dir, 'examples')
stats_output_dir = os.path.join(dataset_dir, 'stats')

dataflow_run_args = {
    'project_id': PROJECT_ID,
    'region': REGION,
    'base_sample_rate': 0.1,
    'train_samples_fraction': 0.75,
    'runner': 'DataflowRunner',
    'examples_output_dir': examples_output_dir,
    'stats_output_dir': stats_output_dir,
}

kfp_client.create_run_from_pipeline_func(
    chicago_taxi_trips_dataset_pipeline,
    arguments=dataflow_run_args,
    experiment_name='Chicago taxi trips dataset',
    run_name=f'train_eval_examples_dataflow_{dataset_id}',
)



<kfp._client.Client.create_run_from_pipeline_package.<locals>.RunPipelineResult at 0x7fcfd747f898>

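### Upload the final pipeline version

Compiles the pipeline and uploads it, replacing any previously uploaded pipeline with the same name.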

In [11]:
# Replace the uploaded dataset pipeline with the current definition.
_force_upload_pipeline(pipeline_func=chicago_taxi_trips_dataset_pipeline)

## Pipeline for training and evaluating the model

In [9]:
def run_tfma(
    eval_savedmodel_dir: str,
    data_location: str,
    output_dir: str
):
    """Run TensorFlow Model Analysis on an EvalSavedModel and write the results.

    Computes metrics over the whole dataset and per `trip_start_hour` slice,
    with AUC plots and calibration plot / prediction histogram callbacks.

    This function is packaged as its own container step by
    `func_to_container_op`, which is why its imports live inside the body.

    Args:
        eval_savedmodel_dir: EvalSavedModel directory produced by the trainer.
        data_location: File pattern of the TFRecord evaluation examples.
        output_dir: Where TFMA writes its evaluation output.
    """
    import logging
    import tensorflow_model_analysis as tfma

    logging.getLogger().setLevel(logging.INFO)

    eval_shared_model = tfma.default_eval_shared_model(
        eval_saved_model_path=eval_savedmodel_dir,
        add_metrics_callbacks=[
            tfma.post_export_metrics.auc_plots(),
            tfma.post_export_metrics.calibration_plot_and_prediction_histogram()
        ]
    )
    # Results are persisted under `output_path`; the returned EvalResult is not
    # needed because this step declares no outputs.
    tfma.run_model_analysis(
        eval_shared_model=eval_shared_model,
        data_location=data_location,
        file_format='tfrecords',
        slice_spec=[
            tfma.slicer.SingleSliceSpec(),
            tfma.slicer.SingleSliceSpec(columns=['trip_start_hour']),
        ],
        output_path=output_dir
    )


run_tfma_comp = kfp.components.func_to_container_op(run_tfma, base_image=UTILS_IMAGE)

In [10]:
@kfp.dsl.pipeline(
    name='Build Chicago taxi trips model',
    description='Build a model for the Chicago taxi trips problem.'
)
def chicago_taxi_trips_model_pipeline(
    train_examples_dir: str,
    eval_examples_dir: str,
    examples_schema_path: str,
    model_dir: str,
    tfma_output_dir: str='',
    module_path: str=MODEL_MODULE_PATH,
    train_steps: int=10000,
    eval_steps: int=1000,
    temp_dir: str=os.path.join(BASE_GCS_PATH, 'temp'),
    project_id: str=PROJECT_ID,
    region: str=REGION,
    runner: str='DirectRunner'
):
    """Transform the examples, train/evaluate the model and optionally run TFMA.

    Steps:
      1. Analyze and transform the train examples (also writes the transform_fn).
      2. Transform the eval examples with that transform_fn.
      3. Train and evaluate the estimator on the transformed examples.
      4. Only if `tfma_output_dir` is non-empty: run TFMA on the raw eval
         examples against the trainer's `eval_savedmodel_dir` output.

    Intermediate artifacts live under `<temp_dir>/{{workflow.name}}`.
    """
    def _with_gcp_credentials(op):
        # Attach the 'user-gcp-sa' secret to the step via kfp.gcp.use_gcp_secret.
        return op.apply(kfp.gcp.use_gcp_secret('user-gcp-sa'))

    def _tfrecord_glob(directory):
        # Glob matching the gzipped TFRecord shards inside `directory`.
        return os.path.join(str(directory), '*.tfrecord.gz')

    workflow_temp_dir = os.path.join(str(temp_dir), '{{workflow.name}}')
    transformed_dirs = {
        split: os.path.join(workflow_temp_dir, 'transformed_examples', split)
        for split in ('train', 'eval')
    }
    transform_fn_dir = os.path.join(workflow_temp_dir, 'transform_fn')

    analyze_op = _with_gcp_credentials(tft_analyze_and_transform_comp(
        name='train-tft-analyze-and-transform',
        utils_image=UTILS_IMAGE,
        project_id=project_id,
        region=region,
        temp_dir=workflow_temp_dir,
        raw_examples_path=_tfrecord_glob(train_examples_dir),
        raw_examples_schema_path=examples_schema_path,
        preprocessing_module_path=module_path,
        transformed_examples_path_prefix=os.path.join(transformed_dirs['train'], 'part'),
        transform_fn_dir=transform_fn_dir,
        runner=runner
    ))

    eval_transform_op = _with_gcp_credentials(tft_transform_comp(
        name='eval-tft-transform',
        utils_image=UTILS_IMAGE,
        project_id=project_id,
        region=region,
        temp_dir=workflow_temp_dir,
        raw_examples_path=_tfrecord_glob(eval_examples_dir),
        raw_examples_schema_path=examples_schema_path,
        transformed_examples_path_prefix=os.path.join(transformed_dirs['eval'], 'part'),
        transform_fn_dir=transform_fn_dir,
        runner=runner
    ))
    # The eval transform reads the transform_fn written by the train step via GCS.
    eval_transform_op.after(analyze_op)

    trainer_hparams = json.dumps({
        'train_files': _tfrecord_glob(transformed_dirs['train']),
        'eval_files': _tfrecord_glob(transformed_dirs['eval']),
        'transform_output': str(transform_fn_dir),
        'train_steps': str(train_steps),
        'eval_steps': str(eval_steps),
        'schema_path': str(examples_schema_path),
        'warm_start_from': None,
    })

    trainer_op = _with_gcp_credentials(tf_estimator_trainer_comp(
        name='tf-train-and-evaluate',
        utils_image=UTILS_IMAGE,
        module_path=str(module_path),
        model_dir=model_dir,
        hparams=trainer_hparams,
        metrics_to_export='auc auc_precision_recall loss'
    ))
    for upstream_op in (analyze_op, eval_transform_op):
        trainer_op.after(upstream_op)

    with kfp.dsl.Condition(tfma_output_dir != ''):
        _with_gcp_credentials(run_tfma_comp(
            eval_savedmodel_dir=trainer_op.outputs['eval_savedmodel_dir'],
            data_location=_tfrecord_glob(eval_examples_dir),
            output_dir=str(tfma_output_dir)
        ))

### Submit an ad hoc pipeline run (useful for debugging)

In [11]:
# Previously generated datasets to train on.
BIG_DATASET_ID = '16d4f29d89c240a19e4bf61fed07505f'
SMALL_DATASET_ID = 'e500ca33e46846f1ac4367444e314657'
dataset_id = BIG_DATASET_ID  # or SMALL_DATASET_ID for quick iterations
model_runner = 'DataflowRunner'  # or 'DirectRunner'

examples_output_dir = os.path.join(BASE_GCS_PATH, 'datasets', dataset_id, 'examples')
stats_output_dir = os.path.join(BASE_GCS_PATH, 'datasets', dataset_id, 'stats')
model_id = uuid.uuid4().hex

model_run_args = {
    'project_id': PROJECT_ID,
    'region': REGION,
    'runner': model_runner,
    'train_examples_dir': os.path.join(examples_output_dir, 'train'),
    'eval_examples_dir': os.path.join(examples_output_dir, 'eval'),
    # Presumably written by the dataset pipeline's train-split stats step.
    'examples_schema_path': os.path.join(stats_output_dir, 'train', 'inferred_schema.pb2'),
    'model_dir': os.path.join(BASE_GCS_PATH, 'models', model_id),
    'train_steps': 1000000,
    'eval_steps': 1000,
    'tfma_output_dir': os.path.join(BASE_GCS_PATH, 'tfma', model_id),
}

kfp_client.create_run_from_pipeline_func(
    chicago_taxi_trips_model_pipeline,
    arguments=model_run_args,
    experiment_name='Chicago taxi trips modeling',
    run_name=f'train_chicago_taxi_trips_model_{model_id}',
)



<kfp._client.Client.create_run_from_pipeline_package.<locals>.RunPipelineResult at 0x7f3fd5a51898>

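### Upload the final pipeline version

Compiles the pipeline and uploads it, replacing any previously uploaded pipeline with the same name.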

In [18]:
# Replace the uploaded model pipeline with the current definition.
_force_upload_pipeline(pipeline_func=chicago_taxi_trips_model_pipeline)