##### Copyright 2021 The TensorFlow Authors.

In [None]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Simple TFX Pipeline for Vertex Pipelines


This notebook-based tutorial will create a simple TFX pipeline and run it using
Google Cloud Vertex Pipelines.  This notebook is based on the TFX pipeline
built in
[Simple TFX Pipeline Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple).

Google Cloud Vertex Pipelines helps you to automate, monitor, and govern
your ML systems by orchestrating your ML workflow in a serverless manner. You
can define your ML pipelines using Python with TFX, and then execute your
pipelines on Google Cloud. See
[Vertex Pipelines introduction](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)
to learn more about Vertex Pipelines.

## Setup
### Install Python packages

We will install required Python packages including TFX and KFP to author ML
pipelines and submit jobs to Vertex Pipelines.

In [57]:
# Use the latest version of pip.
!pip install -q --upgrade pip
!pip install -q --upgrade "tfx[kfp]<2"

#### Restart the runtime

Restart the runtime to ensure the following cells use the updated versions.

You can restart the runtime with the following cell:

In [58]:
# docs_infra: no_execute
import sys
import IPython
# if not 'google.colab' in sys.modules:
#   # Automatically restart kernel after installs
#   import IPython
#   app = IPython.Application.instance()
#   app.kernel.do_shutdown(True)

Check the package versions.

In [59]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))

TensorFlow version: 2.8.2
TFX version: 1.8.0
KFP version: 1.8.12


### Set up variables

We will set up some variables used to customize the pipelines below. The
following information is required:

* GCP Project id. You can find your Project ID in the panel with your lab instructions.
* GCP Region to run pipelines. For more information about the regions that
Vertex Pipelines is available in, see the
[Vertex AI locations guide](https://cloud.google.com/vertex-ai/docs/general/locations#feature-availability).
* Google Cloud Storage Bucket to store pipeline outputs.

**Enter required values in the cell below before running it**.


In [60]:
GOOGLE_CLOUD_PROJECT = 'licheng-test-06'     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = 'us-central1'     
# GCS_BUCKET_NAME = GOOGLE_CLOUD_PROJECT + '-gcs'
GCS_BUCKET_NAME = GOOGLE_CLOUD_PROJECT

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
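The check above only reports that something is missing. As a minimal sketch, the hypothetical helper below (`missing_settings` is not part of TFX or this notebook) names the empty settings so the message points at the exact variable to fix. The values shown are placeholders.

```python
def missing_settings(settings):
    """Returns the names of settings that are empty or unset, sorted by name."""
    return sorted(name for name, value in settings.items() if not value)


# Placeholder values for illustration only; substitute your own.
settings = {
    'GOOGLE_CLOUD_PROJECT': 'my-project-id',
    'GOOGLE_CLOUD_REGION': 'us-central1',
    'GCS_BUCKET_NAME': '',
}

missing = missing_settings(settings)
if missing:
    print('Please set: ' + ', '.join(missing))
```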

Set `gcloud` to use your project.

In [61]:
!gcloud config set project {GOOGLE_CLOUD_PROJECT}

Updated property [core/project].


In [62]:
PIPELINE_NAME = 'chicago-vertex-pipelines'

# Path to the pipeline artifacts.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)
print(PIPELINE_ROOT)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)
print(MODULE_ROOT)

# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
print(DATA_ROOT)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME
print(ENDPOINT_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)
print(SERVING_MODEL_DIR)

# print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

gs://licheng-test-06/pipeline_root/chicago-vertex-pipelines
gs://licheng-test-06/pipeline_module/chicago-vertex-pipelines
gs://licheng-test-06/data/chicago-vertex-pipelines
prediction-chicago-vertex-pipelines
gs://licheng-test-06/serving_model/chicago-vertex-pipelines
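All of the paths above follow the same `gs://<bucket>/<purpose>/<pipeline name>` layout. One way to keep that layout in a single place is a small helper like the sketch below. `gcs_path` and the bucket and pipeline names are hypothetical and are used here only for illustration.

```python
import posixpath


def gcs_path(bucket, *parts):
    """Joins a bucket name and path segments into a gs:// URI."""
    return 'gs://' + posixpath.join(bucket, *parts)


# Hypothetical names, for illustration only.
bucket = 'my-bucket'
pipeline_name = 'my-pipeline'

paths = {
    purpose: gcs_path(bucket, purpose, pipeline_name)
    for purpose in ('pipeline_root', 'pipeline_module', 'data', 'serving_model')
}
for purpose, uri in paths.items():
    print(purpose, uri)
```

`posixpath.join` is used rather than `os.path.join` so the separator is always `/`, regardless of the operating system the notebook runs on.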


### Prepare example data
The dataset we are using is the
[Palmer Penguins dataset](https://allisonhorst.github.io/palmerpenguins/articles/intro.html).

There are four numeric features in this dataset:

* culmen_length_mm
* culmen_depth_mm
* flipper_length_mm
* body_mass_g

All features were already normalized
to have range [0,1]. We will build a classification model which predicts the
`species` of penguins.

We need to make our own copy of the dataset. Because TFX ExampleGen reads
inputs from a directory, we need to create a directory on GCS and copy the
dataset to it.

In [None]:
# !gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/

Take a quick look at the CSV file.

In [None]:
# !gsutil cat {DATA_ROOT}/penguins_processed.csv | head

You should be able to see five values. `species` is one of 0, 1 or 2, and all other features should have values between 0 and 1.
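The range check described above can also be done programmatically. The sketch below runs on a few made-up rows (not real dataset values) and assumes the header uses the column names listed earlier. `find_invalid_rows` is a hypothetical helper, not part of TFX.

```python
import csv
import io

# Made-up rows in the layout described above (not real dataset values).
SAMPLE_CSV = """species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g
0,0.25,0.67,0.15,0.29
2,0.53,0.21,0.76,0.85
1,0.71,0.83,0.40,1.20
"""


def find_invalid_rows(csv_text):
    """Returns 1-based data-row numbers whose values violate the expected ranges."""
    bad = []
    reader = csv.DictReader(io.StringIO(csv_text))
    for row_number, row in enumerate(reader, start=1):
        species_ok = row['species'] in {'0', '1', '2'}
        features_ok = all(
            0.0 <= float(value) <= 1.0
            for key, value in row.items() if key != 'species')
        if not (species_ok and features_ok):
            bad.append(row_number)
    return bad


# The third sample row has body_mass_g outside [0, 1].
print(find_invalid_rows(SAMPLE_CSV))
```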

In [None]:
# we make our own bucket 
# !gsutil mb -c regional -l us-central1 gs://licheng-test-06

In [6]:
# !gsutil cat {DATA_ROOT}/chicago_taxi.csv | head

CommandException: No URLs matched: gs://licheng-test-06/data/chicago-vertex-pipelines/chicago_taxi.csv


## Create a pipeline

TFX pipelines are defined using Python APIs. We will define a pipeline built
around three core components:

* CsvExampleGen: Reads in data files and converts them to the TFX internal format for further processing. There are multiple ExampleGens for various formats. In this tutorial, we will use CsvExampleGen, which takes CSV file input.
* Trainer: Trains an ML model. The Trainer component requires model definition code from users. You can use TensorFlow APIs to specify how to train a model and save it in the SavedModel format.
* Pusher: Copies the trained model outside of the TFX pipeline. The Pusher component can be thought of as the deployment process of the trained ML model.

The pipeline definition below also includes StatisticsGen, SchemaGen, and ExampleValidator, which compute dataset statistics, infer a schema, and check the examples for anomalies.

Our pipeline will be almost identical to a basic [TFX pipeline](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple).

The only difference is that we don't need to set `metadata_connection_config`,
which is used to locate the
[ML Metadata](https://www.tensorflow.org/tfx/guide/mlmd) database. Because
Vertex Pipelines uses a managed metadata service, users don't need to manage
it, and we don't need to specify the parameter.

Before defining the pipeline, we need to write the model code for the
Trainer component.

### Write model code.

We will create a simple DNN regression model using the TensorFlow Keras API. This model training code will be saved to a separate file.

In this tutorial we will use the __Generic Trainer__ of TFX, which supports Keras-based models. You need to write a Python file containing a `run_fn` function, which is the entrypoint for the `Trainer` component.
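To make the `run_fn` contract concrete before reading the full module, here is a minimal skeleton that runs without TFX installed. It uses a `SimpleNamespace` with hypothetical values as a stand-in for `tfx.components.FnArgs`, and it returns a summary purely for illustration. The `run_fn` in the module below returns nothing and trains a model instead.

```python
from types import SimpleNamespace


def run_fn(fn_args):
    """Skeleton entry point: reads the arguments a real trainer would use."""
    summary = {
        'train_files': list(fn_args.train_files),
        'train_steps': fn_args.train_steps,
        'eval_steps': fn_args.eval_steps,
        'export_to': fn_args.serving_model_dir,
    }
    # A real implementation would build datasets from the file patterns,
    # fit a Keras model, and save it under fn_args.serving_model_dir.
    return summary


# Stand-in for tfx.components.FnArgs, with hypothetical values.
fake_args = SimpleNamespace(
    train_files=['/tmp/train/*'],
    eval_files=['/tmp/eval/*'],
    train_steps=100,
    eval_steps=5,
    serving_model_dir='/tmp/serving_model',
)
print(run_fn(fake_args))
```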

In [49]:
_trainer_module_file = 'penguin_trainer.py'

In [50]:
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

# _FEATURE_KEYS = [
#     'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
# ]
# _LABEL_KEY = 'species'


_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude', 'euclidean'
]
_LABEL_KEY = 'trip_total'


_TRAIN_BATCH_SIZE = 40
_EVAL_BATCH_SIZE = 20

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
# _FEATURE_SPEC = {
#     **{
#         feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
#            for feature in _FEATURE_KEYS
#        },
#     _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
# }

_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, label) tuple where features is a
      dictionary of Tensors, and label is a single Tensor of label values.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    # d = keras.layers.Dense(8, activation='relu')(d)
    d = keras.layers.Dense(64, activation='relu')(d)
    d = keras.layers.Dense(32, activation='relu')(d)
    d = keras.layers.Dense(16, activation='relu')(d)
  # outputs = keras.layers.Dense(3)(d)
  # Single linear unit for regression, built once after the hidden layers.
  outputs = keras.layers.Dense(1, activation='linear')(d)

  model = keras.Model(inputs=inputs, outputs=outputs)

  # model.compile(
  #     optimizer=keras.optimizers.Adam(1e-2),
  #     loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
  #     metrics=[keras.metrics.SparseCategoricalAccuracy()])
    
  model.compile(
      optimizer=keras.optimizers.Adam(0.00001),
      loss=tf.keras.losses.MeanSquaredError(),
      metrics=[keras.metrics.MeanSquaredError(),keras.metrics.MeanAbsoluteError()])

  model.summary(print_fn=logging.info)
  return model

# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also be derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  logging.info('Building schema from the feature spec.')
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)
    
  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)

  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)
    
  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=fn_args.model_run_dir, update_freq='batch')
    
  model.fit(
      train_dataset,
      epochs = 50,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

Overwriting penguin_trainer.py
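Because `_input_fn` ends with `.repeat()`, the dataset never runs out, so the amount of data seen during training is set by the step counts rather than by the dataset size. The arithmetic can be sketched as follows. The batch size and epoch count come from the module above, while `steps_per_epoch=1500` assumes the Trainer is configured with `num_steps=1500`, as in the pipeline definition later in this notebook.

```python
def training_volume(batch_size, steps_per_epoch, epochs):
    """Returns (total training steps, total examples drawn) for one fit call."""
    total_steps = steps_per_epoch * epochs
    return total_steps, total_steps * batch_size


# batch_size and epochs match the module above; steps_per_epoch is an
# assumption taken from the Trainer's num_steps setting.
steps, examples = training_volume(batch_size=40, steps_per_epoch=1500, epochs=50)
print('training steps:', steps)       # 75000
print('examples drawn:', examples)    # 3000000
```

If training takes longer than expected, these are the three values to reduce first.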


Copy the module file to GCS so that the pipeline components can access it.
Because model training happens on GCP, we need to upload this model definition.

Alternatively, you could build a container image that includes the module file
and use that image to run the pipeline.

In [51]:
!gsutil cp {_trainer_module_file} {MODULE_ROOT}/

Copying file://penguin_trainer.py [Content-Type=text/x-python]...
/ [1 files][  5.4 KiB/  5.4 KiB]                                                
Operation completed over 1 objects/5.4 KiB.                                      


### Write a pipeline definition

We will define a function to create a TFX pipeline.

In [52]:
# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     endpoint_name: str, project_id: str, region: str,
                     use_gpu: bool) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline that trains and deploys the model on Vertex AI."""

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }

  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available accelerator types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

    
  """Creates a five pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  statistics_gen = tfx.components.StatisticsGen(examples=example_gen.outputs['examples'])

  schema_gen = tfx.components.SchemaGen(statistics=statistics_gen.outputs['statistics'],infer_feature_shape=False)

  example_validator = tfx.components.ExampleValidator(statistics=statistics_gen.outputs['statistics'],schema=schema_gen.outputs['schema'])

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=1500), #100
      eval_args=tfx.proto.EvalArgs(num_steps=1500), #5
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

    
  # # Uses user-provided Python function that trains a model.
  # trainer = tfx.components.Trainer(
  #     module_file=module_file,
  #     examples=example_gen.outputs['examples'],
  #     train_args=tfx.proto.TrainArgs(num_steps=1500), #100
  #     eval_args=tfx.proto.EvalArgs(num_steps=1500)) #5
    
  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining arguments are passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for details.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and accelerators.
      'machine_type': 'n1-standard-4',
  }
    
  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'
    
  # NEW: Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  # # Pushes the model to a filesystem destination.
  # pusher = tfx.components.Pusher(
  #     model=trainer.outputs['model'],
  #     push_destination=tfx.proto.PushDestination(
  #         filesystem=tfx.proto.PushDestination.Filesystem(
  #             base_directory=serving_model_dir)))

  # The following six components will be included in the pipeline.
  components = [
      example_gen,
      statistics_gen,
      schema_gen,
      example_validator,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      enable_cache=True)
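The training spec in `_create_pipeline` is a plain dictionary, so the effect of `use_gpu` can be checked without building a pipeline. The sketch below factors that logic into a hypothetical helper, `build_vertex_job_spec`, which is not part of TFX. The machine type, accelerator type, and image URI pattern are copied from the function above.

```python
def build_vertex_job_spec(project_id, image_uri, use_gpu=False):
    """Builds the CustomJobSpec-style dict passed to the Vertex AI Trainer."""
    machine_spec = {'machine_type': 'n1-standard-4'}
    if use_gpu:
        # Attach a single accelerator, as in the pipeline definition above.
        machine_spec.update({
            'accelerator_type': 'NVIDIA_TESLA_K80',
            'accelerator_count': 1,
        })
    return {
        'project': project_id,
        'worker_pool_specs': [{
            'machine_spec': machine_spec,
            'replica_count': 1,
            'container_spec': {'image_uri': image_uri},
        }],
    }


# Hypothetical project ID; the image tag matches the TFX version shown earlier.
cpu_spec = build_vertex_job_spec('my-project-id', 'gcr.io/tfx-oss-public/tfx:1.8.0')
gpu_spec = build_vertex_job_spec(
    'my-project-id', 'gcr.io/tfx-oss-public/tfx:1.8.0', use_gpu=True)
print(cpu_spec['worker_pool_specs'][0]['machine_spec'])
print(gpu_spec['worker_pool_specs'][0]['machine_spec'])
```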

## Run the pipeline on Vertex Pipelines.

TFX provides multiple orchestrators to run your pipeline. In this tutorial we
will use Vertex Pipelines together with the Kubeflow V2 DAG runner.

We need to define a runner to actually run the pipeline. You will compile
your pipeline into the pipeline definition format using TFX APIs.

In [53]:
import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# The following call writes the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        use_gpu=False,
        serving_model_dir=SERVING_MODEL_DIR))

running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying penguin_trainer.py -> build/lib
installing to /tmp/tmpb38ccgbn
running install
running install_lib
copying build/lib/penguin_trainer.py -> /tmp/tmpb38ccgbn
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /tmp/tmpb38ccgbn/tfx_user_code_Trainer-0.0+a15f8fd939386b8143f73f12969d1eb5efa19b404a328ccd845af77f24053619-py3.7.egg-info
running install_scripts
creating /tmp/tmpb38ccgbn/tfx_user_code_Trainer-0.0+a15f8fd939386b8143f73f12969



The generated definition file can be copied to GCS and then submitted using the Vertex AI client in the `google-cloud-aiplatform` package.

In [54]:
!gsutil cp {PIPELINE_DEFINITION_FILE} {MODULE_ROOT}/

Copying file://chicago-vertex-pipelines_pipeline.json [Content-Type=application/json]...
/ [1 files][ 10.9 KiB/ 10.9 KiB]                                                
Operation completed over 1 objects/10.9 KiB.                                     


In [55]:
PIPELINE_DEFINITION_FILE = 'gs://licheng-test-06/pipeline_module/chicago-vertex-pipelines/chicago-vertex-pipelines_pipeline.json'

In [56]:
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.run(sync=False)

Creating PipelineJob

PipelineJob created. Resource name: projects/295968001506/locations/us-central1/pipelineJobs/chicago-vertex-pipelines-20220611051353

To use this PipelineJob in another session:

pipeline_job = aiplatform.PipelineJob.get('projects/295968001506/locations/us-central1/pipelineJobs/chicago-vertex-pipelines-20220611051353')

View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/chicago-vertex-pipelines-20220611051353?project=295968001506

PipelineJob projects/295968001506/locations/us-central1/pipelineJobs/chicago-vertex-pipelines-20220611051353 current state:
PipelineState.PIPELINE_STATE_RUNNING

PipelineJob run completed. Resource name: projects/295968001506/locations/us-central1/pipelineJobs/chicago-vertex-pipelines-20220611051353


Visit __Vertex AI > Pipelines__ in your Google Cloud Console page to see the progress.

Click on your `chicago-vertex-pipelines-xxx` run:

![pipeline_start](01_pipeline_start.png)

Explore the information displayed in each step while you wait for the job to progress.

On completion, your pipeline UI should look similar to this:

![pipeline_end](02_pipeline_completed.png)

This job will take about 15 minutes in total to complete. Once complete, return to the lab to check your progress.
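The resource name printed in the job log above encodes the project number, region, and job ID. As a small sketch, the hypothetical helpers below split such a name and rebuild the console link. The URL pattern is copied from the "View Pipeline Job" line in the log and is an observation about that output, not a documented API.

```python
def parse_pipeline_job_name(resource_name):
    """Splits 'projects/P/locations/L/pipelineJobs/J' into its three parts."""
    parts = resource_name.split('/')
    if len(parts) != 6 or parts[0::2] != ['projects', 'locations', 'pipelineJobs']:
        raise ValueError('Unexpected resource name: ' + resource_name)
    return {'project': parts[1], 'location': parts[3], 'job_id': parts[5]}


def console_url(resource_name):
    """Rebuilds the console link using the pattern seen in the job log."""
    info = parse_pipeline_job_name(resource_name)
    return ('https://console.cloud.google.com/vertex-ai/locations/{location}'
            '/pipelines/runs/{job_id}?project={project}').format(**info)


name = ('projects/295968001506/locations/us-central1/pipelineJobs/'
        'chicago-vertex-pipelines-20220611051353')
print(parse_pipeline_job_name(name))
print(console_url(name))
```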

In [63]:
ENDPOINT_ID='625323049041788928'     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')

In [64]:
# docs_infra: no_execute
import numpy as np
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
# Our model expects 5 feature inputs and produces a single regression output,
# the predicted `trip_total`.
# See the model code to understand input / output structure.
instances = [{
    'pickup_latitude':[41.946294536],
    'pickup_longitude':[-87.654298084],
    'dropoff_latitude':[41.949829346],
    'dropoff_longitude': [-87.64396537],
    'euclidean': [940.592272121818],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Print the predicted value for the first (and only) instance.
print('trip_total:', response.predictions[0])

trip_total: [7.83638]
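Each feature in the request is wrapped in a one-element list because the model's inputs are declared with `shape=(1,)`. A minimal sketch of building such an instance from a flat record follows. `to_instance` is a hypothetical helper, and the feature names and values are the ones used in the request above.

```python
FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude',
    'dropoff_latitude', 'dropoff_longitude', 'euclidean',
]


def to_instance(trip):
    """Wraps each scalar feature in a one-element list, matching shape=(1,)."""
    missing = [key for key in FEATURE_KEYS if key not in trip]
    if missing:
        raise KeyError('Missing features: ' + ', '.join(missing))
    return {key: [float(trip[key])] for key in FEATURE_KEYS}


trip = {
    'pickup_latitude': 41.946294536,
    'pickup_longitude': -87.654298084,
    'dropoff_latitude': 41.949829346,
    'dropoff_longitude': -87.64396537,
    'euclidean': 940.592272121818,
}
instances = [to_instance(trip)]
print(instances)
```

Failing early on a missing feature gives a clearer error than a rejected prediction request.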


In [None]:
# https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_vertex_training