In [10]:
# Use the latest version of pip.
!pip install --upgrade pip
!pip install --upgrade "tfx[kfp]<2"



In [11]:
# Restart runtime
# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

In [2]:
# Login to google
import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

In [3]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))

TensorFlow version: 2.8.1
TFX version: 1.8.0
KFP version: 1.8.12


In [4]:
import os
import pprint
import tempfile
import urllib

import absl
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

In [5]:
GOOGLE_CLOUD_PROJECT = 'ml-eng-cs611-group-project'     
GOOGLE_CLOUD_REGION = 'asia-southeast1'      
GCS_BUCKET_NAME = 'taxi-final-mle'       

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')

!gcloud config set project {GOOGLE_CLOUD_PROJECT}

Updated property [core/project].


In [6]:
from datetime import date
today = date.today()

In [7]:
from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

PIPELINE_NAME = 'taxi-vertex-pipelines'
SCHEMA_PIPELINE_NAME = "taxi-tfdv-schema"

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)
SCHEMA_PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, SCHEMA_PIPELINE_NAME)

# Path to a SQLite DB file to use as an MLMD storage.
SCHEMA_METADATA_PATH = os.path.join('metadata', SCHEMA_PIPELINE_NAME,
                                    'metadata.db')
METADATA_PATH = 'gs://{}/metadata_root/{}/metadata.db'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + str(today) + '-' + PIPELINE_NAME

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

PIPELINE_ROOT: gs://taxi-final-mle/pipeline_root/taxi-vertex-pipelines


## Generate Prelim Schema

In [8]:
def _create_schema_pipeline(pipeline_name: str,
                            pipeline_root: str,
                            data_root: str,
                            metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a pipeline for schema generation."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Computes statistics over data for visualization and schema generation.
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])

  # NEW: Generates schema based on the generated statistics.
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

  components = [
      example_gen,
      statistics_gen,
      schema_gen,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

In [9]:
tfx.orchestration.LocalDagRunner().run(
  _create_schema_pipeline(
      pipeline_name=SCHEMA_PIPELINE_NAME,
      pipeline_root=SCHEMA_PIPELINE_ROOT,
      data_root=DATA_ROOT,
      metadata_path=SCHEMA_METADATA_PATH))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "metada

INFO:absl:Processing input csv data gs://taxi-final-mle/data/taxi-vertex-pipelines/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 21 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "gs://taxi-final-mle/pipeline_root/taxi-tfdv-schema/CsvExampleGen/examples/21"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:31,total_bytes:230561402,xor_checksum:1656348520,sum_checksum:51347819224"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "taxi-tfdv-schema:2022-06-28T08:41:07.998152:CsvExampleGen:examples:0"
  }
}
custom_p

## Review outputs of pipeline

In [10]:
from ml_metadata.proto import metadata_store_pb2

from tfx.orchestration.portable.mlmd import execution_lib

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_artifacts_dict(metadata, latest_execution.id,
                                          [metadata_store_pb2.Event.OUTPUT])

# Non-public APIs, just for showcase.
from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

### Examine outputs from pipeline execution

In [11]:
# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    SCHEMA_METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  stat_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME,
                                         'StatisticsGen')
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_latest_artifacts(metadata_handler,
                                           SCHEMA_PIPELINE_NAME, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]

INFO:absl:MetadataStore with DB connection initialized


In [12]:
visualize_artifacts(stats_artifacts)

In [13]:
visualize_artifacts(schema_artifacts)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'air_temperature',FLOAT,required,,-
'cos_day',FLOAT,required,,-
'cos_hour',FLOAT,required,,-
'cos_mth',FLOAT,required,,-
'rainfall',FLOAT,required,,-
'relative_humidity',FLOAT,required,,-
'sin_day',FLOAT,required,,-
'sin_hour',FLOAT,required,,-
'sin_mth',FLOAT,required,,-
'taxi_count',INT,required,,-


In [14]:
_schema_filename = 'schema.pbtxt'
SCHEMA_PATH = 'gs://{}/metadata_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

os.makedirs(SCHEMA_PATH, exist_ok=True)
_generated_path = os.path.join(schema_artifacts[0].uri, _schema_filename)

# Copy the 'schema.pbtxt' file from the artifact uri to a predefined path.
!gsutil cp {_generated_path} {SCHEMA_PATH}/

Copying gs://taxi-final-mle/pipeline_root/taxi-tfdv-schema/SchemaGen/schema/23/schema.pbtxt...
/ [1 files][  1.6 KiB/  1.6 KiB]                                                
Operation completed over 1 objects/1.6 KiB.                                      


## Write model code

In [12]:
SCHEMA_PATH = 'gs://{}/metadata_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

In [24]:
_trainer_module_file = str(today) + '_taxi_trainer.py'

In [25]:
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from keras.models import Sequential
from keras.layers import Activation, Dense


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_LABEL_KEY = 'y_30'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()

def _make_keras_model(schema: schema_pb2.Schema) -> tf.keras.Model:
  """Creates a DNN Keras model for taxi prediction.

  Returns:
    A Keras Model.
  """

  feature_keys = [f.name for f in schema.feature if f.name != _LABEL_KEY]
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in feature_keys]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(10, activation='relu')(d)
  outputs = keras.layers.Dense(1)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.MeanSquaredError(),
      metrics=[keras.metrics.RootMeanSquaredError()])

  model.summary(print_fn=logging.info)
  return model

# Read `use_gpu` from the custom_config of the Trainer.
# if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model(schema)
  else:
    with strategy.scope():
      model = _make_keras_model(schema)

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

Writing 2022-06-27_taxi_trainer.py


In [26]:
!gsutil cp {_trainer_module_file} {MODULE_ROOT}/

Copying file://2022-06-27_taxi_trainer.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]                                                
Operation completed over 1 objects/4.1 KiB.                                      


## Writing Pipeline Function

In [9]:
import tensorflow_model_analysis as tfma
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, #serving_model_dir: str,
                     endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool, metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates components taxi pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Configuration for Vertex AI Training.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })


  ## STAT GEN
  statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])

  # Import the schema.
  schema_importer = tfx.dsl.Importer(
      source_uri=schema_path,
      artifact_type=tfx.types.standard_artifacts.Schema).with_id(
          'schema_importer')

  ## EXAMPLE VALIDATOR
  example_validator = tfx.components.ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_importer.outputs['result'])

  
  # Trains a model using Vertex AI Training.
  # specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      ## ADD-ON SCHEME
      schema=schema_importer.outputs['result'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  ## EVALUATOR
  # Get the latest blessed model for Evaluator.
  model_resolver = tfx.dsl.Resolver(
      strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
      model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
      model_blessing=tfx.dsl.Channel(
          type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
              'latest_blessed_model_resolver')
    
  # Uses TFMA to compute evaluation statistics over features of a model and
  #   perform quality validation of a candidate model (compared to a baseline).

  eval_config = tfma.EvalConfig(
      model_specs=[tfma.ModelSpec(label_key='y_30')],
      slicing_specs=[t.
          tfma.SlicingSpec(),
          # Slice metrics by hour
          tfma.SlicingSpec(feature_keys=['sin_hour', 'cos_hour']),
          ],
      metrics_specs=[
          tfma.MetricsSpec(per_slice_thresholds={
              'RootMeanSquaredError':
                  tfma.PerSliceMetricThresholds(thresholds=[
                      tfma.PerSliceMetricThreshold(
                          slicing_specs=[tfma.SlicingSpec()],
                          threshold=tfma.MetricThreshold(
                              value_threshold=tfma.GenericValueThreshold(
                                   lower_bound={'value': 0.6}),
                              # Change threshold will be ignored if there is no
                              # baseline model resolved from MLMD (first run).
                              change_threshold=tfma.GenericChangeThreshold(
                                  direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                                  absolute={'value': -1e-10}))
                       )]),
          })],
      )
  evaluator = tfx.components.Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      eval_config=eval_config)

  # Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      'machine_type': 'n1-standard-4',
  }
  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'
    

  # Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      ## w evaluator
      model_blessing=evaluator.outputs['blessing'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  # Following components will be included in the pipeline.
  components = [
      example_gen,
      statistics_gen,
      schema_importer,
      example_validator,
      trainer,
      model_resolver,
      evaluator,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

INFO:absl:imported tensorflow_io
INFO:absl:tensorflow_ranking is not available: No module named 'tensorflow_ranking'
INFO:absl:tensorflow_text is not available: No module named 'tensorflow_text'
INFO:absl:tensorflow_decision_forests is not available: No module named 'tensorflow_decision_forests'
INFO:absl:struct2tensor is not available: No module named 'struct2tensor'
INFO:apache_beam.typehints.native_type_compatibility:Using Any for unsupported type: typing.MutableMapping[str, typing.Any]
INFO:apache_beam.typehints.native_type_compatibility:Using Any for unsupported type: typing.MutableMapping[str, typing.Any]
INFO:apache_beam.typehints.native_type_compatibility:Using Any for unsupported type: typing.MutableMapping[str, typing.Any]
INFO:apache_beam.typehints.native_type_compatibility:Using Any for unsupported type: typing.MutableMapping[str, typing.Any]
INFO:apache_beam.typehints.native_type_compatibility:Using Any for unsupported type: typing.MutableMapping[str, typing.Any]
INFO:apac

NameError: name 'tfx' is not defined

In [93]:
import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        schema_path=SCHEMA_PATH,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        #serving_model_dir=SERVING_MODEL_DIR,
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        metadata_path=METADATA_PATH,
        use_gpu=False))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Generating ephemeral wheel package for '/tmp/tmpka22_5am/2022-06-27_taxi_trainer.py' (including modules: ['2022-06-27_taxi_trainer']).
INFO:absl:User module package has hash fingerprint version f1fdb35d90010b2a892d542a90a2eda519cc0477ec16c767f2bccb7c8871d611.
INFO:absl:Executing: ['/opt/conda/bin/python3.7', '/tmp/tmp9v000cxn/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmpsjr3qny0', '--dist-dir', '/tmp/tmpekvn5ab9']
INFO:absl:Successfully built user code wheel distribution at 'gs://taxi-final-mle/pipeline_root/taxi-vertex-pipelines/_wheels/tfx_user_code_Trainer-0.0+f1fdb35d90010b2a892d542a90a2eda519cc0477ec16c767f2bccb7c8871d611-py3-none-any.whl'; target user module is '2022-06-27_taxi_trainer'.
INFO:absl:Full user module path is '2022-06-27_taxi_trainer@gs://taxi-final-mle/pipeline_root/taxi-vertex-pipelines/_wheels/tfx_user_code

In [97]:
#from kfp.v2 import compiler
#compiler.Compiler().compile(pipeline_func=pipeline_name,
#        package_path='image_classif_pipeline.json')

NameError: name 'pipeline_name' is not defined

In [4]:
GOOGLE_CLOUD_PROJECT = 'ml-eng-cs611-group-project' 
GOOGLE_CLOUD_REGION = 'asia-southeast1'      
GCS_BUCKET_NAME = 'taxi-final-mle' 
PIPELINE_NAME = 'taxi-vertex-pipelines'
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

In [8]:
logging.__version__

'0.5.1.2'

In [5]:
## Create and Run the pipeline
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
logging.getLogger().setLevel(logging.INFO)

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/154529812159/locations/asia-southeast1/pipelineJobs/taxi-vertex-pipelines-20220628065652
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/154529812159/locations/asia-southeast1/pipelineJobs/taxi-vertex-pipelines-20220628065652')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/asia-southeast1/pipelines/runs/taxi-vertex-pipelines-20220628065652?project=154529812159


In [3]:
PIPELINE_DEFINITION_FILE

'taxi-vertex-pipelines_pipeline.json'

## Test with a prediction request

In [17]:
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging

In [15]:
ENDPOINT_ID='1152815951490580480'     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')

In [19]:
# docs_infra: no_execute
import numpy as np

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
instances = [{ 
    "taxi_count":[10], 
    "air_temperature":[25.2], 
    "rainfall": [0], 
    "relative_humidity":[86.9], 
    "sin_day":[1], 
    "cos_day":[0.5], 
    "sin_hour":[0.25881904510252], 
    "cos_hour":[0.965925826289068], 
    "sin_mth":[0.56], 
    "cos_mth":[0.5],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Uses argmax to find the index of the maximum value.
print('Taxis Avail::', int(response.predictions[0][0]))

Taxis Avail:: 7
