# TFX SDK examples

This notebook contains a series of related examples that show how to use the TFX SDK, ending with the "Chicago Taxi Pipeline". It also includes examples of how to use custom Python functions and custom containers.



## Setup

Before you run this notebook, ensure that your Google Cloud user account and project are granted access to the Managed Pipelines Experimental. To be granted access to the Managed Pipelines Experimental, fill out this [form](http://go/cloud-mlpipelines-signup) and let your account representative know you have requested access. 

This notebook is intended to be run on [AI Platform Notebooks](https://cloud.google.com/ai-platform-notebooks). See the "AI Platform Notebooks" section in the Experimental [User Guide](https://docs.google.com/document/d/1JXtowHwppgyghnj1N1CT73hwD1caKtWkLcm2_0qGBoI/edit?usp=sharing) for more detail on creating a notebook server instance.

**To run this notebook on AI Platform Notebooks**, click the **File** menu, then select "Download .ipynb". Then upload that notebook from your local machine to AI Platform Notebooks (to upload, look for the up-arrow icon in the AI Platform Notebooks left panel).

The notebook will probably run on [Google Colab](https://colab.research.google.com/notebooks/intro.ipynb) as well, though currently you may see some ignorable warnings.



We'll first install some libraries and set some variables.

Make sure that Python 3 is being used.

In [None]:
import sys
sys.version

'3.6.9 (default, Oct  8 2020, 12:12:24) \n[GCC 8.4.0]'

Define your project ID. **Edit the following cell before running it**.

In [None]:
PROJECT_ID = 'your-project-id'  # <---CHANGE THIS

Set `gcloud` to use your project.

In [None]:
!gcloud config set project {PROJECT_ID}

If you're running this notebook on Colab, authenticate with your user account:

In [None]:
import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

-----------------

**If you're on AI Platform Notebooks**, authenticate with Google Cloud before running the next section, by running
```sh
gcloud auth login
```
**in the Terminal window** (which you can open via **File** > **New** in the menu). You only need to do this once per notebook instance.

In [None]:
!gsutil cp gs://cloud-aiplatform-pipelines/releases/20201123/aiplatform_pipelines_client-0.1.0.caip20201123-py3-none-any.whl .

In [None]:
if 'google.colab' in sys.modules:
  USER_FLAG = ''
else:
  USER_FLAG = '--user'

In [None]:
!pip install {USER_FLAG} pip==20.2.4 --upgrade
!pip install {USER_FLAG} kfp tfx==0.26.0 aiplatform_pipelines_client-0.1.0.caip20201123-py3-none-any.whl --upgrade

If you're on Colab and you were prompted to restart the runtime after the TFX installation, rerun some setup now before proceeding. As before, **edit the following cell to define your project ID** before you run it.

In [None]:
import sys
if 'google.colab' in sys.modules:
  PROJECT_ID = 'your-project-id'  # <---CHANGE THIS
  !gcloud config set project {PROJECT_ID}
  from google.colab import auth
  auth.authenticate_user()
  USER_FLAG = ''

In [None]:
if 'google.colab' not in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Check the TFX version. It should be 0.26.0.

In [None]:
# Check version
!pip show tfx

### Setup variables

Let's set up some variables used to customize the pipelines below.  **Edit the values in the cell below before running it**.

In [None]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

# Required Parameters
USER = 'YOUR_LDAP' # <---CHANGE THIS
BUCKET_NAME = 'YOUR_BUCKET_NAME'  # <---CHANGE THIS
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(BUCKET_NAME, USER)

PROJECT_ID = 'YOUR_PROJECT_ID'  # <---CHANGE THIS
REGION = 'us-central1'
API_KEY = 'YOUR_API_KEY'  # <---CHANGE THIS

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
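The first two lines of the cell above use the IPython `%env` magic to append `/home/jupyter/.local/bin` (where `pip install --user` puts console scripts for the `jupyter` user) to `PATH`, so that later `!` shell commands can find them. If you prefer plain Python, a rough equivalent is sketched below; `append_to_path` is a hypothetical helper, not part of any library.

```python
import os


def append_to_path(directory: str) -> str:
    """Append `directory` to this process's PATH once (hypothetical helper)."""
    current = os.environ.get('PATH', '')
    parts = current.split(os.pathsep) if current else []
    # Skip the append if the directory is already present, e.g. on a rerun.
    if directory not in parts:
        parts.append(directory)
    os.environ['PATH'] = os.pathsep.join(parts)
    return os.environ['PATH']


append_to_path('/home/jupyter/.local/bin')
```

Unlike rerunning the `%env` line, this version is idempotent, so running the cell twice does not keep growing `PATH`. Shell commands launched with `!` inherit `os.environ`, so they see the updated value.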

## Part 2: Custom Python functions

In this section, we will create components from Python functions. We won't be doing any real ML; these simple functions are just used to illustrate the process.

We'll use [Cloud Build](https://cloud.google.com/cloud-build) to build the container image that runs the functions.



In [None]:
!mkdir -p custom_fns

In [None]:
%cd custom_fns

We begin by writing a preprocessing component from a Python function. It writes a dummy training data file to its output artifact and attaches some custom properties to that artifact's metadata.

In [None]:
%%writefile my_preprocess.py

import os
import tensorflow as tf  # Used for writing files.

from tfx.types.experimental.simple_artifacts import Dataset
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import OutputArtifact

@component
def MyPreprocess(training_data: OutputArtifact[Dataset]):
  with tf.io.gfile.GFile(os.path.join(training_data.uri, 'training_data_file.txt'), 'w') as f:
    f.write('Dummy training data')
    
  # We'll modify metadata and ensure that it gets passed to downstream components.  
  training_data.set_string_custom_property('my_custom_field', 'my_custom_value')
  training_data.set_string_custom_property('uri_for_output', training_data.uri)

Let's write a second component that consumes the training data produced by `MyPreprocess`.

In [None]:
%%writefile my_trainer.py

import os
import tensorflow as tf

from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import Model
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import InputArtifact, OutputArtifact, Parameter


@component
def MyTraining(training_data: InputArtifact[Dataset],
               model: OutputArtifact[Model],
               num_iterations: Parameter[int] = 100):
  # Let's read the contents of training data and write to the metadata.
  with tf.io.gfile.GFile(
      os.path.join(training_data.uri, 'training_data_file.txt'), 'r') as f:
    contents = f.read()
    model.set_string_custom_property('contents_of_training_data', contents)
    model.set_int_custom_property('num_iterations_used', num_iterations)

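Let's write a finalizer component that collects the metadata of both upstream artifacts and dumps it to a JSON file. Because the template below is an f-string, the value of `PIPELINE_ROOT` is baked into the generated file, so make sure `PIPELINE_ROOT` is correctly defined before creating the template.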

In [None]:
PIPELINE_ROOT

In [None]:
collector_template = f"""
import os
import tensorflow as tf
import json

from google.protobuf import json_format

from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import Model
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import InputArtifact

OUTPUT_LOCATION = '{PIPELINE_ROOT}/python_function_pipeline/metadata.json'

@component
def MetadataCollector(training_data: InputArtifact[Dataset],
                      model: InputArtifact[Model]):
  artifacts = [
      json_format.MessageToDict(x)
      for x in [training_data.mlmd_artifact, model.mlmd_artifact]
  ]
  with tf.io.gfile.GFile(OUTPUT_LOCATION, 'w') as f:
    f.write(json.dumps(artifacts, indent=4))
"""
with open('metadata_collector.py', 'w') as f:
    f.write(collector_template)
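Because `collector_template` is an f-string, `{PIPELINE_ROOT}` is substituted when the cell runs, so the generated `metadata_collector.py` contains a literal path. One consequence worth knowing: any literal `{` or `}` you later add to such a template (for example, a dict literal) must be doubled as `{{` and `}}`. The small sketch below uses a made-up root path and a made-up `JSON_KWARGS` line to show both behaviors.

```python
# Illustrative values only; this is not the notebook's real PIPELINE_ROOT.
pipeline_root = 'gs://my-bucket/pipeline_root/me'

template = f"""
OUTPUT_LOCATION = '{pipeline_root}/python_function_pipeline/metadata.json'
JSON_KWARGS = {{'indent': 4}}  # doubled braces produce a literal dict
"""

# The f-string was evaluated above, so the path is already baked in.
print(template)
# Confirm the generated source is valid Python before writing it to disk.
compile(template, 'metadata_collector.py', 'exec')
```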

Next, let's package the above into a container. We'll do this manually using a Dockerfile.

In [None]:
%%writefile Dockerfile
FROM gcr.io/tfx-oss-public/tfx:0.26.0
WORKDIR /pipeline
COPY ./ ./
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"

Next, we'll use Cloud Build to build the container image and push it to Google Container Registry (GCR).

In [None]:
!echo $PROJECT_ID

In [None]:
!gcloud builds submit --tag gcr.io/{PROJECT_ID}/caip-tfx-custom:{USER} .
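The same `gcr.io/<project>/<image-name>:<tag>` naming pattern appears several times in this notebook: here, in the runner config below, and again in Parts 3 and 5. If you want to keep those names consistent, a small helper like the hypothetical `image_uri` below could build them in one place. It only formats strings; it does not talk to GCR.

```python
def image_uri(project_id: str, image_name: str, tag: str) -> str:
    """Build a Container Registry image URI of the form gcr.io/PROJECT/NAME:TAG."""
    if not all([project_id, image_name, tag]):
        raise ValueError('project_id, image_name and tag must all be non-empty.')
    return 'gcr.io/{}/{}:{}'.format(project_id, image_name, tag)


print(image_uri('my-project', 'caip-tfx-custom', 'alice'))
# gcr.io/my-project/caip-tfx-custom:alice
```

In the notebook you could, for example, compute `image_uri(PROJECT_ID, 'caip-tfx-custom', USER)` once and pass the result to both the `gcloud builds submit --tag` command and `default_image`.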

Next, let's author a pipeline using these components.

In [None]:
import os

# Only required for local run.
from tfx.orchestration.metadata import sqlite_metadata_connection_config

from tfx.orchestration.pipeline import Pipeline
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner

from my_preprocess import MyPreprocess
from my_trainer import MyTraining
from metadata_collector import MetadataCollector

PIPELINE_NAME = "function-based-pipeline-{}".format(USER)

def function_based_pipeline(pipeline_root):
    preprocess = MyPreprocess()

    training = MyTraining(
        training_data=preprocess.outputs['training_data'],
        num_iterations=10000)
    
    collect = MetadataCollector(
        training_data=preprocess.outputs['training_data'],
        model=training.outputs['model'])
    
    return Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        # Only needed for local runs.
        metadata_connection_config=sqlite_metadata_connection_config('metadata.sqlite'),
        components=[preprocess, training, collect])

Let's make sure this pipeline works locally, using the local runner:

In [None]:
from tfx.orchestration.local.local_dag_runner import LocalDagRunner

LocalDagRunner().run(function_based_pipeline(pipeline_root='/tmp/pipeline_root'))

Check that the metadata was produced locally.

In [None]:
from ml_metadata import metadata_store
from ml_metadata.proto import metadata_store_pb2

connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = 'metadata.sqlite'
connection_config.sqlite.connection_mode = (
    metadata_store_pb2.SqliteMetadataSourceConfig.READWRITE_OPENCREATE)
store = metadata_store.MetadataStore(connection_config)
store.get_artifacts()

### Run the pipeline with Managed Pipelines

Now we're ready to run the pipeline!  We're constructing the client using the API_KEY that you created during setup.

In [None]:
from aiplatform.pipelines import client

my_client = client.Client(
    project_id=PROJECT_ID,
    region=REGION,
    api_key=API_KEY
)

In [None]:
config = kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
    project_id=PROJECT_ID,
    display_name='function-based-pipeline-{}'.format(USER),
    default_image='gcr.io/{}/caip-tfx-custom:{}'.format(PROJECT_ID, USER))
runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=config,
    output_filename='pipeline.json')
runner.compile(
    function_based_pipeline(
        pipeline_root=os.path.join(PIPELINE_ROOT, PIPELINE_NAME)),
    write_out=True)
my_client.create_run_from_job_spec('pipeline.json')

When the pipeline is complete, we can check the final output file to see the metadata produced.

In [None]:
# Same location as OUTPUT_LOCATION in metadata_collector.py.
MD_URI = '{}/python_function_pipeline/metadata.json'.format(PIPELINE_ROOT)
print(MD_URI)

!gsutil cat {MD_URI}
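`MetadataCollector` writes a JSON list produced by `json_format.MessageToDict` on each MLMD artifact proto, so field names appear in camelCase (for example `customProperties`, `stringValue`, `intValue`), and the proto3 JSON mapping encodes 64-bit integers such as `intValue` as strings. The sketch below shows one way to pull the custom properties out of that file's contents. The `custom_properties` helper is hypothetical, and the sample input is hand-written to mirror that structure; it is not real pipeline output.

```python
import json
from typing import Any, Dict, List


def custom_properties(artifacts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten the customProperties of each artifact dict into plain Python values."""
    result = []
    for artifact in artifacts:
        props = {}
        for name, value in artifact.get('customProperties', {}).items():
            if 'stringValue' in value:
                props[name] = value['stringValue']
            elif 'intValue' in value:
                # The proto3 JSON mapping encodes int64 values as strings.
                props[name] = int(value['intValue'])
            elif 'doubleValue' in value:
                props[name] = float(value['doubleValue'])
        result.append(props)
    return result


# Hand-written sample mirroring MessageToDict output (not real pipeline output).
sample = json.loads('''[
  {"uri": "gs://my-bucket/training_data",
   "customProperties": {"my_custom_field": {"stringValue": "my_custom_value"}}},
  {"uri": "gs://my-bucket/model",
   "customProperties": {"num_iterations_used": {"intValue": "10000"}}}
]''')
print(custom_properties(sample))
# [{'my_custom_field': 'my_custom_value'}, {'num_iterations_used': 10000}]
```

With the real file, you could pass in `json.loads(...)` of the contents read from `MD_URI` (for example via `tf.io.gfile.GFile`).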

In [None]:
%cd ..

## Part 3: Custom containers


In this section, we will build custom containers and chain them together as a pipeline.

This illustrates how we can pass data to custom containers using URIs.
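Concretely, each container below receives its input and output locations as command-line flags, treats each URI as a directory, and reads or writes files beneath it. The stdlib-only sketch below imitates that contract using local temporary directories in place of GCS paths. The `--input_uri`/`--output_uri` flags and the `data.txt`/`result.txt` file names are illustrative and are not part of the containers defined in this section; the real scripts use `tf.io.gfile` so that the same code also works with `gs://` paths.

```python
import argparse
import os
import tempfile


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_uri', type=str, required=True)
    parser.add_argument('--output_uri', type=str, required=True)
    args = parser.parse_args(argv)

    # Read a file that the upstream step wrote under its output URI...
    with open(os.path.join(args.input_uri, 'data.txt')) as f:
        contents = f.read()
    # ...and write this step's result under our own output URI.
    os.makedirs(args.output_uri, exist_ok=True)
    with open(os.path.join(args.output_uri, 'result.txt'), 'w') as f:
        f.write(contents.upper())


# Simulate the orchestrator handing this step two directory URIs.
with tempfile.TemporaryDirectory() as root:
    upstream = os.path.join(root, 'upstream')
    downstream = os.path.join(root, 'downstream')
    os.makedirs(upstream)
    with open(os.path.join(upstream, 'data.txt'), 'w') as f:
        f.write('hello')
    main(['--input_uri', upstream, '--output_uri', downstream])
    with open(os.path.join(downstream, 'result.txt')) as f:
        print(f.read())  # HELLO
```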



### Container 1: Generate examples

First, we'll define and write out the `generate_examples.py` code:

In [None]:
!mkdir -p generate

In [None]:
%%writefile generate/generate_examples.py

import argparse
import json
import os

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds


def _serialize_example(example, label):
  example_value = tf.io.serialize_tensor(example).numpy()
  label_value = tf.io.serialize_tensor(label).numpy()
  feature = {
      'examples':
          tf.train.Feature(
              bytes_list=tf.train.BytesList(value=[example_value])),
      'labels':
          tf.train.Feature(bytes_list=tf.train.BytesList(value=[label_value])),
  }
  return tf.train.Example(features=tf.train.Features(
      feature=feature)).SerializeToString()


def _tf_serialize_example(example, label):
  serialized_tensor = tf.py_function(_serialize_example, (example, label),
                                     tf.string)
  return tf.reshape(serialized_tensor, ())


def generate_examples(training_data_uri, test_data_uri, config_file_uri):
  (train_data, test_data), info = tfds.load(
      # Use the version pre-encoded with an ~8k vocabulary.
      'imdb_reviews/subwords8k',
      # Return the train/test datasets as a tuple.
      split=(tfds.Split.TRAIN, tfds.Split.TEST),
      # Return (example, label) pairs from the dataset (instead of a dictionary).
      as_supervised=True,
      with_info=True)

  serialized_train_examples = train_data.map(_tf_serialize_example)
  serialized_test_examples = test_data.map(_tf_serialize_example)

  filename = os.path.join(training_data_uri, "train.tfrecord")
  writer = tf.data.experimental.TFRecordWriter(filename)
  writer.write(serialized_train_examples)

  filename = os.path.join(test_data_uri, "test.tfrecord")
  writer = tf.data.experimental.TFRecordWriter(filename)
  writer.write(serialized_test_examples)

  encoder = info.features['text'].encoder
  config = {
      'vocab_size': encoder.vocab_size,
  }
  config_file = os.path.join(config_file_uri, "config")
  with tf.io.gfile.GFile(config_file, 'w') as f:
    f.write(json.dumps(config))


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument('--training_data_uri', type=str)
  parser.add_argument('--test_data_uri', type=str)
  parser.add_argument('--config_file_uri', type=str)

  args = parser.parse_args()
  generate_examples(args.training_data_uri, args.test_data_uri,
                    args.config_file_uri)

Next, we'll create a Dockerfile that builds a container to run `generate_examples.py`. Here we use a Google Deep Learning VM (DLVM) container image as our base, but you could use your own base image instead. Note that we're also installing the `tensorflow_datasets` library.

In [None]:
%%writefile generate/Dockerfile

FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest
WORKDIR /pipeline
COPY generate_examples.py generate_examples.py
RUN pip install tensorflow_datasets
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"

In [None]:
!gcloud builds submit --tag gcr.io/{PROJECT_ID}/caip-tfx-custom-container-generate:{USER} generate

### Container 2: Train Examples
Next, we'll do the same for the 'Train Examples' custom container. We'll first write out a `train_examples.py` file, then build a container that runs it.

In [None]:
!mkdir -p train

In [None]:
%%writefile train/train_examples.py

import argparse
import json
import os

import numpy as np
import tensorflow as tf


def _parse_example(record):
  f = {
      'examples': tf.io.FixedLenFeature((), tf.string, default_value=''),
      'labels': tf.io.FixedLenFeature((), tf.string, default_value='')
  }
  return tf.io.parse_single_example(record, f)


def _to_tensor(record):
  examples = tf.io.parse_tensor(record['examples'], tf.int64)
  labels = tf.io.parse_tensor(record['labels'], tf.int64)
  return (examples, labels)


def train_examples(training_data_uri, test_data_uri, config_file_uri,
                   output_model_uri, output_metrics_uri):
  train_examples = tf.data.TFRecordDataset(
      [os.path.join(training_data_uri, 'train.tfrecord')])
  test_examples = tf.data.TFRecordDataset(
      [os.path.join(test_data_uri, 'test.tfrecord')])

  train_batches = train_examples.map(_parse_example).map(_to_tensor)
  test_batches = test_examples.map(_parse_example).map(_to_tensor)

  with tf.io.gfile.GFile(os.path.join(config_file_uri, 'config')) as f:
    config = json.loads(f.read())

  model = tf.keras.Sequential([
      tf.keras.layers.Embedding(config['vocab_size'], 16),
      tf.keras.layers.GlobalAveragePooling1D(),
      tf.keras.layers.Dense(1, activation='sigmoid')
  ])

  model.summary()

  model.compile(
      optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

  train_batches = train_batches.shuffle(1000).padded_batch(
      32, (tf.TensorShape([None]), tf.TensorShape([])))

  test_batches = test_batches.padded_batch(
      32, (tf.TensorShape([None]), tf.TensorShape([])))

  history = model.fit(
      train_batches,
      epochs=10,
      validation_data=test_batches,
      validation_steps=30)

  loss, accuracy = model.evaluate(test_batches)

  metrics = {
      'loss': str(loss),
      'accuracy': str(accuracy),
  }

  model_json = model.to_json()
  with tf.io.gfile.GFile(os.path.join(output_model_uri, 'model.json'),
                         'w') as f:
    f.write(model_json)

  with tf.io.gfile.GFile(os.path.join(output_metrics_uri, 'metrics.json'),
                         'w') as f:
    f.write(json.dumps(metrics))


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument('--training_data_uri', type=str)
  parser.add_argument('--test_data_uri', type=str)
  parser.add_argument('--config_file_uri', type=str)
  parser.add_argument('--output_model_uri', type=str)
  parser.add_argument('--output_metrics_uri', type=str)

  args = parser.parse_args()

  train_examples(args.training_data_uri, args.test_data_uri,
                 args.config_file_uri, args.output_model_uri,
                 args.output_metrics_uri)
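Note that `train_examples.py` stores `loss` and `accuracy` in `metrics.json` as strings (`str(loss)`), so any downstream consumer has to convert them back to numbers. A minimal sketch of such a consumer follows; `load_metrics` is a hypothetical helper, a local temporary directory stands in for the `--output_metrics_uri` location, and the metric values are made up.

```python
import json
import os
import tempfile


def load_metrics(metrics_dir: str) -> dict:
    """Read metrics.json from a directory and convert its string values to floats."""
    with open(os.path.join(metrics_dir, 'metrics.json')) as f:
        raw = json.load(f)
    return {name: float(value) for name, value in raw.items()}


with tempfile.TemporaryDirectory() as metrics_dir:
    # Made-up values in the same string-valued format that train_examples.py writes.
    with open(os.path.join(metrics_dir, 'metrics.json'), 'w') as f:
        f.write(json.dumps({'loss': '0.31', 'accuracy': '0.87'}))
    metrics = load_metrics(metrics_dir)
    print(metrics)  # {'loss': 0.31, 'accuracy': 0.87}
```

Writing the values as numbers in the first place (for example `float(loss)`) would avoid this conversion step.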


Next, we'll create a Dockerfile that builds a container to run `train_examples.py`:

In [None]:
%%writefile train/Dockerfile

FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest
WORKDIR /pipeline
COPY train_examples.py train_examples.py
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"

In [None]:
!gcloud builds submit --tag gcr.io/{PROJECT_ID}/caip-tfx-custom-container-train:{USER} train

### Define a container-based pipeline

Now we're ready to define a pipeline that uses these containers.

In [None]:
import os

from tfx.orchestration.pipeline import Pipeline
from tfx.types.standard_artifacts import Model
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.experimental.simple_artifacts import File
from tfx.types.experimental.simple_artifacts import Metrics
from tfx.dsl.component.experimental.container_component import create_container_component
from tfx.dsl.component.experimental.placeholders import InputUriPlaceholder
from tfx.dsl.component.experimental.placeholders import OutputUriPlaceholder


def container_based_pipeline():
  generate = create_container_component(
      name='GenerateExamples',
      outputs={
          'training_data': Dataset,
          'test_data': Dataset,
          'config_file': File,
      },
      image='gcr.io/{}/caip-tfx-custom-container-generate:{}'.format(PROJECT_ID, USER),
      command=[
        'python', '/pipeline/generate_examples.py',
        '--training_data_uri', OutputUriPlaceholder('training_data'),
        '--test_data_uri', OutputUriPlaceholder('test_data'),
        '--config_file_uri', OutputUriPlaceholder('config_file'),
      ])

  train = create_container_component(
      name='Train',
      inputs={
          'training_data': Dataset,
          'test_data': Dataset,
          'config_file': File,
      },
      outputs={
          'model': Model,
          'metrics': Metrics,
      },
      image='gcr.io/{}/caip-tfx-custom-container-train:{}'.format(PROJECT_ID, USER),
      command=[
        'python', '/pipeline/train_examples.py',
        '--training_data_uri', InputUriPlaceholder('training_data'),
        '--test_data_uri', InputUriPlaceholder('test_data'),
        '--config_file_uri', InputUriPlaceholder('config_file'),
        '--output_model_uri', OutputUriPlaceholder('model'),
        '--output_metrics_uri', OutputUriPlaceholder('metrics'),
      ])

  generate_component = generate()
  train_component = train(
    training_data=generate_component.outputs['training_data'],
    test_data=generate_component.outputs['test_data'],
    config_file=generate_component.outputs['config_file'])

  pipeline_name = "container-based-pipeline-{}".format(USER)
  return Pipeline(
        pipeline_name=pipeline_name,
        enable_cache=True,
        pipeline_root=os.path.join(PIPELINE_ROOT, pipeline_name),
        components=[generate_component, train_component])

container_based_pipeline = container_based_pipeline()
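Conceptually, `InputUriPlaceholder` and `OutputUriPlaceholder` are markers in the `command` list that get replaced with real artifact URIs before the container is launched. The sketch below imitates that substitution with a hypothetical `UriPlaceholder` class and `resolve_command` function, using made-up URIs. It is a conceptual illustration only, not TFX's implementation.

```python
from typing import Dict, List, NamedTuple, Union


class UriPlaceholder(NamedTuple):
    """Hypothetical stand-in for TFX's InputUriPlaceholder/OutputUriPlaceholder."""
    kind: str  # 'input' or 'output'
    key: str   # artifact name, e.g. 'training_data'


def resolve_command(command: List[Union[str, UriPlaceholder]],
                    input_uris: Dict[str, str],
                    output_uris: Dict[str, str]) -> List[str]:
    """Replace each placeholder with the URI of the matching artifact."""
    resolved = []
    for arg in command:
        if isinstance(arg, UriPlaceholder):
            uris = input_uris if arg.kind == 'input' else output_uris
            resolved.append(uris[arg.key])
        else:
            resolved.append(arg)
    return resolved


command = [
    'python', '/pipeline/train_examples.py',
    '--training_data_uri', UriPlaceholder('input', 'training_data'),
    '--output_model_uri', UriPlaceholder('output', 'model'),
]
print(resolve_command(
    command,
    input_uris={'training_data': 'gs://my-bucket/training_data'},
    output_uris={'model': 'gs://my-bucket/model'}))
# ['python', '/pipeline/train_examples.py', '--training_data_uri',
#  'gs://my-bucket/training_data', '--output_model_uri', 'gs://my-bucket/model']
```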

*Now* let's run the pipeline!

In [None]:
config = kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
    project_id=PROJECT_ID,
    display_name='container-based-pipeline-{}'.format(USER))
runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=config,
    output_filename='pipeline.json')
runner.compile(
    container_based_pipeline,
    write_out=True)
my_client.create_run_from_job_spec('pipeline.json')

## Part 4: Caching
In Part 3, the pipeline was run with caching enabled (`enable_cache=True`). After that run has finished, let's run the same pipeline again. It should complete almost immediately, since every step can reuse its cached result; components that used the cache are marked with a green arrow:


In [None]:
my_client.create_run_from_job_spec('pipeline.json')

<a href="https://storage.googleapis.com/amy-jo/images/kf-pls/tfx_cached.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/kf-pls/tfx_cached.png" width="40%"/></a>

Now, let's disable the cache and run it again. This time, it should re-run all steps:

In [None]:
container_based_pipeline.enable_cache = False
runner.compile(container_based_pipeline, write_out=True)
my_client.create_run_from_job_spec('pipeline.json')
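Why does the cached run finish so quickly? A common way to implement step caching is to hash everything that determines a step's result (roughly: container image, command, input artifacts and parameters) into a key, and to reuse a previous execution's outputs when the key matches. This notebook doesn't describe how Managed Pipelines actually computes its cache keys, so treat the sketch below, with its hypothetical `cache_key` and `run_step` functions, as a conceptual illustration only.

```python
import hashlib
import json
from typing import Any, Dict, List


def cache_key(image: str, command: List[str], inputs: Dict[str, str],
              parameters: Dict[str, Any]) -> str:
    """Hash everything that determines a step's result into one stable key."""
    payload = json.dumps(
        {'image': image, 'command': command,
         'inputs': inputs, 'parameters': parameters},
        sort_keys=True)  # sort_keys makes the key independent of dict order
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()


cache: Dict[str, str] = {}


def run_step(image, command, inputs, parameters, enable_cache=True):
    """Return a cached result when possible; otherwise 'execute' and store it."""
    key = cache_key(image, command, inputs, parameters)
    if enable_cache and key in cache:
        return 'cached: ' + cache[key]
    result = 'executed {}'.format(image)  # stand-in for actually running the step
    cache[key] = result
    return result


args = ('gcr.io/my-project/train:me', ['python', 'train.py'],
        {'training_data': 'gs://my-bucket/data'}, {'epochs': 10})
print(run_step(*args))                      # executed gcr.io/my-project/train:me
print(run_step(*args))                      # cached: executed gcr.io/my-project/train:me
print(run_step(*args, enable_cache=False))  # executed gcr.io/my-project/train:me
```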

## Part 5: Specifying Task-based dependencies

In this section, we will run two steps of a pipeline using task-based dependencies (rather than I/O dependencies) to schedule them. We'll build and use the same container for both steps.

In [None]:
!mkdir -p task_based

In [None]:
%%writefile task_based/task_based_step.py

import os
import tensorflow as tf

from tfx.types.experimental.simple_artifacts import File
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import OutputArtifact, Parameter


@component
def MyTaskBasedStep(
    output_file: OutputArtifact[File], 
    step_number: Parameter[int] = 0, 
    contents: Parameter[str] = ''):
  # Write out whatever string was passed in to the file.
  
  with tf.io.gfile.GFile(os.path.join(output_file.uri, 'output.txt'), 'w') as f:
    f.write('Step {}: Contents: {}'.format(step_number, contents))

Write out the Dockerfile.

In [None]:
%%writefile task_based/Dockerfile

FROM gcr.io/tfx-oss-public/tfx:0.26.0
WORKDIR /pipeline
COPY task_based_step.py task_based_step.py
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"

Build a container image for the new component.

In [None]:
!gcloud builds submit --tag gcr.io/{PROJECT_ID}/caip-tfx-custom-task-based:{USER} task_based

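Let's create a pipeline with this simple component. Note the `add_upstream_node` call: it makes `step_2` run after `step_1` even though no artifact is passed between them. Because both steps are instances of the same component, the second one is given its own `instance_name`.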

In [None]:
%cd task_based

In [None]:
import os

from tfx.orchestration.pipeline import Pipeline
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner

from task_based_step import MyTaskBasedStep

def task_based_pipeline():
    step_1 = MyTaskBasedStep(step_number=1, contents="This is step 1")
    step_2 = MyTaskBasedStep(
        step_number=2, 
        contents="This is step 2", 
        instance_name='MyTaskBasedStep2')
    step_2.add_upstream_node(step_1)
     
    pipeline_name = "task-dependency-based-pipeline-{}".format(USER)
    return Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=os.path.join(PIPELINE_ROOT, pipeline_name),
        components=[step_1, step_2])

task_based_pipeline = task_based_pipeline()
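`add_upstream_node` adds an edge to the pipeline DAG without any artifact flowing along it, and the scheduler must run every node after all of its upstream nodes. That ordering is a topological sort of the DAG. The stdlib-only sketch below (Kahn's algorithm, with the hypothetical node labels `step_1` and `step_2`) illustrates the idea; it is not TFX's or Managed Pipelines' scheduler.

```python
from collections import deque
from typing import Dict, List


def topological_order(upstream: Dict[str, List[str]]) -> List[str]:
    """Order nodes so each follows its upstream nodes (Kahn's algorithm).

    `upstream` maps every node to the list of nodes it depends on;
    every node must appear as a key.
    """
    indegree = {node: len(deps) for node, deps in upstream.items()}
    downstream: Dict[str, List[str]] = {node: [] for node in upstream}
    for node, deps in upstream.items():
        for dep in deps:
            downstream[dep].append(node)

    ready = deque(sorted(n for n, d in indegree.items() if d == 0))
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in downstream[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(upstream):
        raise ValueError('The dependency graph contains a cycle.')
    return order


# step_2.add_upstream_node(step_1) corresponds to this task-only edge:
print(topological_order({'step_1': [], 'step_2': ['step_1']}))
# ['step_1', 'step_2']
```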

Run the pipeline:

In [None]:
config = kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
    project_id=PROJECT_ID,
    default_image='gcr.io/{}/caip-tfx-custom-task-based:{}'.format(PROJECT_ID, USER))
runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=config,
    output_filename='pipeline.json')
runner.compile(
    task_based_pipeline,
    write_out=True)
my_client.create_run_from_job_spec('pipeline.json')

In [None]:
%cd ..

## Part 6: Chicago Taxi Pipeline

In this section, we'll run the canonical Chicago Taxi Pipeline.

### What's new

If you're familiar with the previous version, here's what's new:

- Support for resolvers: `LatestArtifactsResolver` and `LatestBlessedModelResolver`
- `FileBasedExampleGen` (used below via `CsvExampleGen`)
- A Python client SDK to talk to the Alpha service
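In the pipeline below, one resolver fetches the latest trained model (for warm-starting the Trainer) and the other fetches the latest blessed model (as the Evaluator's baseline). The plain-Python sketch below mimics that selection logic over hypothetical in-memory records: the dict fields and the "larger `id` means more recent" convention are assumptions for illustration, whereas the real resolvers query ML Metadata.

```python
from typing import Dict, List, Optional


def latest_artifact(models: List[Dict]) -> Optional[Dict]:
    """Pick the most recent model, treating a larger id as more recent."""
    return max(models, key=lambda m: m['id'], default=None)


def latest_blessed_model(models: List[Dict],
                         blessings: List[Dict]) -> Optional[Dict]:
    """Pick the most recent model that has a positive blessing."""
    blessed_ids = {b['model_id'] for b in blessings if b['blessed']}
    return latest_artifact([m for m in models if m['id'] in blessed_ids])


models = [{'id': 1, 'uri': 'gs://my-bucket/model/1'},
          {'id': 2, 'uri': 'gs://my-bucket/model/2'},
          {'id': 3, 'uri': 'gs://my-bucket/model/3'}]
blessings = [{'model_id': 1, 'blessed': True},
             {'model_id': 2, 'blessed': True},
             {'model_id': 3, 'blessed': False}]

print(latest_artifact(models)['id'])                  # 3
print(latest_blessed_model(models, blessings)['id'])  # 2
```

On a pipeline's first run there is no blessed model yet; the sketch models that case by returning `None`.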

We'll first do some imports:

In [None]:
from typing import Any, Dict, List, Optional, Text

import absl
import os
import tensorflow as tf
import tensorflow_model_analysis as tfma

from tfx.extensions.google_cloud_big_query.example_gen.component import BigQueryExampleGen
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import InfraValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.dsl.experimental import latest_artifacts_resolver
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import pipeline as tfx_pipeline
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import standard_artifacts
from tfx.utils import dsl_utils
from tfx.types import channel

Next, we'll define the [BigQuery](https://cloud.google.com/bigquery/docs) query that we want to use with the `BigQueryExampleGen` component.

In [None]:
# Define the query used for BigQueryExampleGen.
QUERY = """
        SELECT
          pickup_community_area,
          fare,
          EXTRACT(MONTH FROM trip_start_timestamp) AS trip_start_month,
          EXTRACT(HOUR FROM trip_start_timestamp) AS trip_start_hour,
          EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS trip_start_day,
          UNIX_SECONDS(trip_start_timestamp) AS trip_start_timestamp,
          pickup_latitude,
          pickup_longitude,
          dropoff_latitude,
          dropoff_longitude,
          trip_miles,
          pickup_census_tract,
          dropoff_census_tract,
          payment_type,
          company,
          trip_seconds,
          dropoff_community_area,
          tips
        FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
        WHERE (ABS(FARM_FINGERPRINT(unique_key)) / 0x7FFFFFFFFFFFFFFF)
          < 0.000001"""

# Data location for the CsvExampleGen.
CSV_INPUT_PATH = 'gs://ml-pipeline/sample-data/chicago-taxi/data'
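The query's `WHERE` clause keeps a deterministic pseudo-random sample of the table: `FARM_FINGERPRINT(unique_key)` hashes each key to a signed 64-bit integer, dividing its absolute value by `0x7FFFFFFFFFFFFFFF` (the largest INT64 value) maps it into roughly [0, 1], and keeping values below `0.000001` retains about one row in a million. Because the hash depends only on the key, the same rows are selected on every run. The sketch below reproduces that logic in Python but substitutes SHA-256 for FarmHash (which is not in the standard library), so it will not select the same rows as BigQuery.

```python
import hashlib

INT64_MAX = 0x7FFFFFFFFFFFFFFF


def fingerprint64(key: str) -> int:
    """Stand-in for FARM_FINGERPRINT: a signed 64-bit hash (SHA-256 based, NOT FarmHash)."""
    digest = hashlib.sha256(key.encode('utf-8')).digest()
    return int.from_bytes(digest[:8], byteorder='big', signed=True)


def in_sample(key: str, fraction: float) -> bool:
    """Mirror `ABS(FARM_FINGERPRINT(key)) / 0x7FFFFFFFFFFFFFFF < fraction`."""
    return abs(fingerprint64(key)) / INT64_MAX < fraction


keys = ['trip-{}'.format(i) for i in range(100000)]
sampled = [k for k in keys if in_sample(k, 0.01)]
# Roughly 1% of the keys, and the same ones every time this runs.
print(len(sampled))
```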

### Create a helper function to construct the TFX pipeline



In [None]:
# Create a helper function to construct a TFX pipeline.
def create_tfx_pipeline(
    query: Optional[Text] = None,
    input_path: Optional[Text] = None,
):
  """Creates an end-to-end Chicago Taxi pipeline in TFX."""
  if bool(query) == bool(input_path):
    raise ValueError('Exactly one of query or input_path is expected.')

  if query:
    example_gen = BigQueryExampleGen(query=query)
  else:
    example_gen = CsvExampleGen(input_base=input_path)

  beam_pipeline_args = [
      # Uncomment to use Dataflow.
      # '--runner=DataflowRunner',
      # '--experiments=shuffle_mode=auto',
      # '--region=us-central1',
      # '--disk_size_gb=100',
      '--temp_location=' + os.path.join(PIPELINE_ROOT, 'dataflow', 'temp'),
      '--project={}'.format(PROJECT_ID)  # Always needed for BigQueryExampleGen.
  ]

  module_file = 'gs://ml-pipeline-playground/tfx_taxi_simple/modules/taxi_utils.py'

  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
  schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=False)
  example_validator = ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_gen.outputs['schema'])
  transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=module_file)
  # Fetch the latest trained model under the same context for warm-starting.
  latest_model_resolver = ResolverNode(
      instance_name='latest_model_resolver',
      resolver_class=latest_artifacts_resolver.LatestArtifactsResolver,
      model=channel.Channel(type=standard_artifacts.Model))
  trainer = Trainer(
      transformed_examples=transform.outputs['transformed_examples'],
      schema=schema_gen.outputs['schema'],
      base_model=latest_model_resolver.outputs['model'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=trainer_pb2.TrainArgs(num_steps=10),
      eval_args=trainer_pb2.EvalArgs(num_steps=5),
      module_file=module_file,
  )
  # Get the latest blessed model for model validation.
  model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=channel.Channel(type=standard_artifacts.Model),
      model_blessing=channel.Channel(type=standard_artifacts.ModelBlessing))

  # Alternative TFMA config for real model evaluation and validation
  # (kept for reference; not used in this example):
  # eval_config = tfma.EvalConfig(
  #     model_specs=[tfma.ModelSpec(signature_name='eval')],
  #     metrics_specs=[
  #         tfma.MetricsSpec(
  #             metrics=[tfma.MetricConfig(class_name='ExampleCount')],
  #             thresholds={
  #                 'binary_accuracy':
  #                     tfma.MetricThreshold(
  #                         value_threshold=tfma.GenericValueThreshold(
  #                             lower_bound={'value': 0.5}),
  #                         change_threshold=tfma.GenericChangeThreshold(
  #                             direction=tfma.MetricDirection.HIGHER_IS_BETTER,
  #                             absolute={'value': -1e-10}))
  #             })
  #     ],
  #     slicing_specs=[
  #         tfma.SlicingSpec(),
  #         tfma.SlicingSpec(feature_keys=['trip_start_hour'])
  #     ])

  # This eval_config is for blindly blessing the model.
  eval_config = tfma.EvalConfig(
      model_specs=[
          tfma.ModelSpec(signature_name='eval'),
      ],
      metrics_specs=[
          tfma.MetricsSpec(metrics=[
              tfma.config.MetricConfig(
                  class_name='ExampleCount',
                  # Bless the model as long as there is at least one example.
                  threshold=tfma.config.MetricThreshold(
                      value_threshold=tfma.GenericValueThreshold(
                          lower_bound={'value': 0}))),
          ]),
      ],
      slicing_specs=[tfma.SlicingSpec()])

  evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      eval_config=eval_config)

  pusher = Pusher(
      model=trainer.outputs['model'],
      model_blessing=evaluator.outputs['blessing'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=os.path.join(PIPELINE_ROOT, 'model_serving'))))

  components = [
      example_gen, statistics_gen, schema_gen, example_validator, transform,
      latest_model_resolver, trainer, model_resolver, evaluator, pusher
  ]

  return tfx_pipeline.Pipeline(
      pipeline_name='taxi-pipeline-{}'.format(USER),
      pipeline_root=PIPELINE_ROOT,
      components=components,
      beam_pipeline_args=beam_pipeline_args
  )
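The guard at the top of `create_tfx_pipeline`, `bool(query) == bool(input_path)`, is an exclusive-or check: it rejects calls that pass both arguments or neither, and because it relies on truthiness, an empty string counts as "not provided". The standalone sketch below isolates that check in a hypothetical `choose_source` function so you can see which combinations pass and which ExampleGen would be chosen.

```python
from typing import Optional


def choose_source(query: Optional[str] = None,
                  input_path: Optional[str] = None) -> str:
    """Return which ExampleGen the helper would pick, or raise if ambiguous."""
    if bool(query) == bool(input_path):
        raise ValueError('Exactly one of query or input_path is expected.')
    return 'BigQueryExampleGen' if query else 'CsvExampleGen'


print(choose_source(query='SELECT 1'))                   # BigQueryExampleGen
print(choose_source(input_path='gs://my-bucket/data'))   # CsvExampleGen
```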

### Compile and run the pipeline

We'll first use the helper function to create the pipeline, passing it the BigQuery query we defined.

In [None]:
bigquery_taxi_pipeline = create_tfx_pipeline(query=QUERY)

Compile the pipeline.  This creates the `pipeline.json` job spec.

In [None]:
runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
        project_id=PROJECT_ID
    ),
    output_filename='pipeline.json')
_ = runner.compile(bigquery_taxi_pipeline, write_out=True)

Now we're ready to run the pipeline! We construct the client again here, using the `API_KEY` that you created during setup, so that this part doesn't depend on the client created in Part 2.

In [None]:
from aiplatform.pipelines import client

my_client = client.Client(
    project_id=PROJECT_ID,
    region=REGION,
    api_key=API_KEY
)

Run the pipeline using the client's `create_run_from_job_spec` method.

In [None]:
my_client.create_run_from_job_spec('pipeline.json')

### Monitor the pipeline run in the Cloud Console

Once you've deployed the pipeline run, you can monitor it in the [Cloud Console](https://console.cloud.google.com/ai/platform/pipelines) under **AI Platform (Unified)** > **Pipelines**. 

Click into the pipeline run to see the run graph, and click a step to view the job details and the logs for that step.

As you look at the pipeline graph, you'll see that you can inspect the artifacts passed between the pipeline steps.

In the pipeline run's UI, which shows the DAG, you should be able to see that the model is 'blessed' by the evaluator.


Now let's see whether the 'blessed' model can be retrieved by another pipeline job and used as the baseline model for model validation.

First, to expedite the process, let's turn on the cache. Then we'll submit the pipeline for execution.

In [None]:
import time

bigquery_taxi_pipeline.enable_cache = True
runner.compile(bigquery_taxi_pipeline, write_out=True)
my_client.create_run_from_job_spec('pipeline.json', name='big-query-taxi-pipeline-{}-2nd-{}'.format(USER, str(int(time.time()))))

You should see that all the steps before Trainer are cached, and that a 'blessed' model is used as the baseline model for the Evaluator component.

<a href="https://storage.googleapis.com/amy-jo/images/kf-pls/tfx_bq.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/kf-pls/tfx_bq.png" width="90%"/></a>



Next, let's try out a taxi pipeline that uses a file location (the CSV data at `CSV_INPUT_PATH`) as the data source.

In [None]:
fbeg_taxi_pipeline = create_tfx_pipeline(input_path=CSV_INPUT_PATH)
runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
        project_id=PROJECT_ID),
    output_filename='pipeline.json')
runner.compile(fbeg_taxi_pipeline, write_out=True)
my_client.create_run_from_job_spec('pipeline.json', name='file-based-taxi-pipeline-{}-3rd-{}'.format(USER, str(int(time.time()))))

-----------------------------
Copyright 2020 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.