In [1]:
# Use the latest version of pip.
!pip install --upgrade pip
!pip install --upgrade "tfx[kfp]<2"

Collecting tfx[kfp]<2
  Downloading tfx-1.13.0-py3-none-any.whl (2.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.8/2.8 MB[0m [31m40.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting ml-pipelines-sdk==1.13.0 (from tfx[kfp]<2)
  Downloading ml_pipelines_sdk-1.13.0-py3-none-any.whl (1.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m58.4 MB/s[0m eta [36m0:00:00[0m
Collecting ml-metadata<1.14.0,>=1.13.1 (from tfx[kfp]<2)
  Downloading ml_metadata-1.13.1-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (6.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.7/6.7 MB[0m [31m88.8 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hCollecting packaging<21,>=20 (from tfx[kfp]<2)
  Downloading packaging-20.9-py2.py3-none-any.whl (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.9/40.9 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting portp

In [43]:
import tensorflow as tf
from tfx import v1 as tfx
import kfp

# Report the versions of the three core libraries this notebook relies on.
for _label, _module in (('TensorFlow', tf), ('TFX', tfx), ('KFP', kfp)):
    print('{} version: {}'.format(_label, _module.__version__))

2023-07-12 12:55:59.538726: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


TensorFlow version: 2.12.1




TFX version: 1.13.0
KFP version: 1.8.22


In [44]:
from tfx import v1 as tfx
import tensorflow_data_validation as tfdv
import sys
from tfx.components import ImportExampleGen
from tfx.components import CsvExampleGen
from tfx.components import StatisticsGen
from tfx.components import SchemaGen
from tfx.components import Transform
from tfx.v1 import proto
from tfx.proto import example_gen_pb2
from tfx.components import Trainer
from tfx.components import ExampleValidator
from tfx.components import InfraValidator
from tfx.components import Pusher
from tfx.components import Tuner
from tfx.components import Evaluator
import keras_tuner
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.dsl.components.common import resolver
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
import tensorflow_model_analysis as tfma
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import glob

In [45]:
# Google Cloud settings shared by every cell below.
GOOGLE_CLOUD_PROJECT = 'itp-ml-sndbx'
GOOGLE_CLOUD_REGION = 'us-west1'
GCS_BUCKET_NAME = 'gcp-ml-pipeline'

# Log an error if any required setting was left blank.
if not all((GOOGLE_CLOUD_PROJECT, GOOGLE_CLOUD_REGION, GCS_BUCKET_NAME)):
    from absl import logging
    logging.error('Please set all required parameters.')

In [46]:
# Make the project configured above the default for gcloud commands in this
# environment ({...} interpolates the Python variable into the shell command).
!gcloud config set project {GOOGLE_CLOUD_PROJECT}

Updated property [core/project].


In [47]:
# Pipeline identity and storage layout. Every path is derived from
# GCS_BUCKET_NAME and PIPELINE_NAME, so the bucket is configured in one place
# (previously the bucket name was repeated as a literal in each path).
PIPELINE_NAME = 'taxicab-pipeline'
PIPELINE_ROOT = 'gs://{}/pipeline/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
DATA_ROOT = PIPELINE_ROOT + '/data'                  # input CSV files
SERVING_MODEL_DIR = PIPELINE_ROOT + '/serving_model'
METADATA_ROOT = PIPELINE_ROOT + '/metadata'
LABEL_NAME = 'fare_amount'                            # regression target column
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME         # Vertex AI endpoint display name


In [48]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     endpoint_name: str, serving_model_dir: str,
                     metadata_root: str, region: str) -> tfx.dsl.Pipeline:
    """Build the taxi-fare TFX pipeline that is compiled for Vertex AI Pipelines.

    Stages: CsvExampleGen -> StatisticsGen -> SchemaGen / ExampleValidator ->
    Transform -> Tuner -> Trainer -> Evaluator (against the latest blessed
    model) -> Vertex AI Pusher. The Pusher deploys only models the Evaluator blessed.

    Args:
      pipeline_name: Name of the TFX pipeline.
      pipeline_root: GCS prefix under which pipeline artifacts are written.
      data_root: Directory holding the input CSV files read by CsvExampleGen.
      endpoint_name: Vertex AI endpoint the Pusher deploys the model to.
      serving_model_dir: Not used by this variant; the Vertex Pusher deploys to
        an endpoint rather than copying to a directory. Kept for callers.
      metadata_root: Not used by this variant. The previous code passed a
        SQLite config pointing at a ``gs://`` path that did not even match
        this argument. Kept for callers.
      region: Vertex AI region used by the Pusher.

    Returns:
      The assembled ``tfx.dsl.Pipeline``.

    Reads the module-level ``LABEL_NAME`` and ``GOOGLE_CLOUD_PROJECT`` settings.
    """
    # Hash-bucket split 3:1 -> roughly 75% train / 25% eval.
    output = proto.Output(
        split_config=example_gen_pb2.SplitConfig(splits=[
            proto.SplitConfig.Split(name='train', hash_buckets=3),
            proto.SplitConfig.Split(name='eval', hash_buckets=1),
        ]))

    # Read from `data_root` (it used to be ignored in favour of a literal path).
    example_gen = CsvExampleGen(input_base=data_root, output_config=output)

    # Tell TFDV which column is the label.
    statistics_gen = StatisticsGen(
        examples=example_gen.outputs['examples'],
        stats_options=tfdv.StatsOptions(label_feature=LABEL_NAME))

    schema_gen = SchemaGen(
        statistics=statistics_gen.outputs['statistics'],
        infer_feature_shape=True)

    example_validator = ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_gen.outputs['schema'])

    transform = Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        module_file='./preprocessing_fn.py',
        materialize=True)

    tuner = Tuner(
        module_file='./tuner.py',
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=transform.outputs['post_transform_schema'],
        train_args=proto.TrainArgs(splits=['train'], num_steps=2500),
        eval_args=proto.EvalArgs(splits=['eval'], num_steps=50))

    # Train with the hyperparameters selected by the Tuner.
    trainer = Trainer(
        module_file='./model.py',
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=transform.outputs['post_transform_schema'],
        hyperparameters=tuner.outputs['best_hyperparameters'],
        train_args=proto.TrainArgs(splits=['train'], num_steps=7500),
        eval_args=proto.EvalArgs(splits=['eval'], num_steps=2500))

    # Baseline for the Evaluator: the most recently blessed model, if any.
    model_resolver = tfx.dsl.Resolver(
        strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
        model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
        model_blessing=tfx.dsl.Channel(
            type=tfx.types.standard_artifacts.ModelBlessing))

    # Blessing criteria: MSE must stay below 200 and must not regress against
    # the baseline beyond the 0.1 absolute change threshold.
    # The 'transform_features' preprocessing function is expected to be
    # exported by model.py (NOTE(review): confirm).
    eval_config = tfma.EvalConfig(
        model_specs=[
            tfma.ModelSpec(signature_name='serving_default',
                           label_key=LABEL_NAME,
                           preprocessing_function_names=['transform_features'])],
        slicing_specs=[tfma.SlicingSpec()],
        metrics_specs=[
            tfma.MetricsSpec(
                thresholds={
                    'mse':
                        tfma.MetricThreshold(
                            value_threshold=tfma.GenericValueThreshold(
                                upper_bound={'value': 2e2}),
                            change_threshold=tfma.GenericChangeThreshold(
                                direction=tfma.MetricDirection.LOWER_IS_BETTER,
                                absolute={'value': 1e-1}))
                })
        ])

    evaluator = tfx.components.Evaluator(
        examples=example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        eval_config=eval_config)

    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest'
    vertex_serving_spec = {
        'project_id': GOOGLE_CLOUD_PROJECT,
        'endpoint_name': endpoint_name,
    }

    # Gate deployment on the Evaluator's blessing. Without it, every trained
    # model was deployed to the endpoint, even ones failing the thresholds above.
    pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
        model=trainer.outputs['model'],
        model_blessing=evaluator.outputs['blessing'],
        custom_config={
            tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
                True,
            tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
                region,
            tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
                serving_image,
            tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
                vertex_serving_spec,
        })

    components = [
        example_gen,
        statistics_gen,
        schema_gen,
        transform,
        trainer,
        example_validator,
        tuner,
        evaluator,
        pusher,
        model_resolver,
    ]

    # No metadata_connection_config: the pipeline is compiled for Vertex AI
    # Pipelines. The previous SQLite config pointed at a gs:// path, which a
    # local SQLite file path cannot be.
    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True)



In [49]:
# Compile the pipeline into a Vertex AI (KFP v2) JSON definition on local disk.
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner_config = tfx.orchestration.experimental.KubeflowV2DagRunnerConfig()
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=runner_config, output_filename=PIPELINE_DEFINITION_FILE)

taxicab_pipeline = _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    data_root=DATA_ROOT,
    endpoint_name=ENDPOINT_NAME,
    serving_model_dir=SERVING_MODEL_DIR,
    metadata_root=METADATA_ROOT,
    region=GOOGLE_CLOUD_REGION,
)
_ = runner.run(taxicab_pipeline)

CsvExampleGen(spec: <tfx.types.standard_component_specs.FileBasedExampleGenSpec object at 0x7f74dae2d9d0>, executor_spec: <tfx.dsl.components.base.executor_spec.BeamExecutorSpec object at 0x7f74dae2d670>, driver_class: <class 'tfx.components.example_gen.driver.FileBasedDriver'>, component_id: CsvExampleGen, inputs: {}, outputs: {'examples': OutputChannel(artifact_type=Examples, producer_component_id=CsvExampleGen, output_key=examples, additional_properties={}, additional_custom_properties={}, _input_trigger=None})
running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying model.py -> build/lib
copying tfx-client.py -> build/lib
copying tuner.py -> build/lib
copying transparency.py -> build/lib
copying preprocessing_fn.py -> build/lib
installing to /var/tmp/tmpxtdtzc34
running install
running install_lib
copying build/lib/model.py -> /var/tmp/tmpxtdtzc34
copying build/lib/tuner.py -> /var/tmp/tmpxtdtzc34
copying build/lib/preprocessing_fn.py -> /var/tmp

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer, pypa/build or
        other standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying model.py -> build/lib
copying tfx-client.py -> build/lib
copying tuner.py -> build/lib
copying transparency.py -> build/lib
copying preprocessing_fn.py -> build/lib
installing to /var/tmp/tmpi3017w18
running install
running install_lib
copying build/lib/model.py -> /var/tmp/tmpi3017w18
copying build/lib/tuner.py -> /var/tmp/tmpi3017w18
copying build/lib/preprocessing_fn.py -> /var/tmp/tmpi3017w18
copying build/lib/tfx-client.py -> /var/tmp/tmpi3017w18
copying build/lib/transparency.py -> /var/tmp/tmpi3017w18
running install_egg_info


!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer, pypa/build or
        other standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


running egg_info
creating tfx_user_code_Tuner.egg-info
writing tfx_user_code_Tuner.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Tuner.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Tuner.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Tuner.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Tuner.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Tuner.egg-info/SOURCES.txt'
Copying tfx_user_code_Tuner.egg-info to /var/tmp/tmpi3017w18/tfx_user_code_Tuner-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3.9.egg-info
running install_scripts
creating /var/tmp/tmpi3017w18/tfx_user_code_Tuner-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7.dist-info/WHEEL
creating '/var/tmp/tmpai1qc8tq/tfx_user_code_Tuner-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3-none-any.whl' and adding '/var/tmp/tmpi3017w18' to it
adding 'model.py'
adding 'preprocessing_fn.py'
adding 

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer, pypa/build or
        other standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /var/tmp/tmpgs9zsncj/tfx_user_code_Trainer-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3.9.egg-info
running install_scripts
creating /var/tmp/tmpgs9zsncj/tfx_user_code_Trainer-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7.dist-info/WHEEL
creating '/var/tmp/tmptm4rq25p/tfx_user_code_Trainer-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3-none-any.whl' and adding '/var/tmp/tmpgs9zsncj' to it
adding 'model.py'
adding 'prepro

In [50]:
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
logging.getLogger().setLevel(logging.INFO)

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

# Submit the spec the previous cell compiled. Reusing PIPELINE_DEFINITION_FILE
# (instead of repeating the file name as a literal) keeps the two cells in sync.
job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.submit()

Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/853203979454/locations/us-west1/pipelineJobs/taxicab-pipeline-20230712125702


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/853203979454/locations/us-west1/pipelineJobs/taxicab-pipeline-20230712125702


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/853203979454/locations/us-west1/pipelineJobs/taxicab-pipeline-20230712125702')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/853203979454/locations/us-west1/pipelineJobs/taxicab-pipeline-20230712125702')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-west1/pipelines/runs/taxicab-pipeline-20230712125702?project=853203979454


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-west1/pipelines/runs/taxicab-pipeline-20230712125702?project=853203979454


In [41]:
def explain_sample(project: str, location: str, endpoint_id: str, instance_dict: dict):
    """Request feature attributions for one instance and print them.

    Calls ``aiplatform.init`` with the given project and location (changing the
    SDK's global defaults), then sends ``instance_dict`` to the endpoint's
    ``explain`` method. It prints each attribution's fields, followed by the
    raw predictions.

    Args:
        project: Project ID or number that owns the endpoint.
        location: Region of the endpoint.
        endpoint_id: Numeric ID of the Vertex AI endpoint.
        instance_dict: One instance to explain.

    The annotation is the builtin ``dict``. ``typing.Dict`` is only imported
    in a later cell, so the old ``Dict`` annotation raised NameError when the
    notebook ran top to bottom.
    """
    aiplatform.init(project=project, location=location)

    endpoint = aiplatform.Endpoint(endpoint_id)

    response = endpoint.explain(instances=[instance_dict], parameters={})

    for explanation in response.explanations:
        print(" explanation")
        # Feature attributions.
        for attribution in explanation.attributions:
            print("  attribution")
            print("   baseline_output_value:", attribution.baseline_output_value)
            print("   instance_output_value:", attribution.instance_output_value)
            print("   output_display_name:", attribution.output_display_name)
            print("   approximation_error:", attribution.approximation_error)
            print("   output_name:", attribution.output_name)
            # Use a distinct loop name instead of rebinding the iterable itself.
            for index in attribution.output_index:
                print("   output_index:", index)

    for prediction in response.predictions:
        print(prediction)


In [51]:
# Ask the deployed endpoint for feature attributions on the sample trip.
# NOTE(review): `input_1` is defined in a later cell (In [39]); on a fresh
# kernel run top to bottom this raises NameError unless that cell runs first.
# NOTE(review): the saved output shows the endpoint rejects explain requests
# ("does not support explanation"), so the deployed model presumably lacks an
# explanation spec — TODO confirm the deployment configuration.
explain_sample(
    project='853203979454',
    location='us-west1',
    endpoint_id='970323409558831104',
    instance_dict=input_1
)

FailedPrecondition: 400 Deployed model 8317136961055752192 does not support explanation.

In [29]:
from typing import Dict

from google.cloud import aiplatform
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value


def predict_tabular_regression(
    project: str,
    endpoint_id: str,
    instance_dict: Dict,
    location: str = "us-west1",
    api_endpoint: str = "us-west1-aiplatform.googleapis.com",
):
    """Send one online-prediction request to a Vertex AI endpoint and print it.

    Args:
        project: Project ID or number that owns the endpoint.
        endpoint_id: Numeric ID of the Vertex AI endpoint.
        instance_dict: A single instance; converted to a protobuf ``Value``
            with ``json_format.ParseDict`` before sending.
        location: Region of the endpoint, used to build the endpoint path.
        api_endpoint: Regional API host. It must agree with ``location``; the
            default only matches the default ``location``.
    """
    from collections.abc import Mapping

    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)
    # for more info on the instance schema, please use get_model_sample.py
    # and look at the yaml found in instance_schema_uri
    instances = [json_format.ParseDict(instance_dict, Value())]
    parameters = json_format.ParseDict({}, Value())
    endpoint = client.endpoint_path(
        project=project, location=location, endpoint=endpoint_id
    )
    response = client.predict(
        endpoint=endpoint, instances=instances, parameters=parameters
    )
    print("response")
    print(" deployed_model_id:", response.deployed_model_id)
    # See gs://google-cloud-aiplatform/schema/predict/prediction/tabular_regression_1.0.0.yaml for the format of the predictions.
    for prediction in response.predictions:
        # That schema describes one struct per instance. A custom TF model
        # usually returns a list or a bare number instead, and dict() raises
        # TypeError on those, so only mapping-like predictions are converted.
        if isinstance(prediction, Mapping):
            prediction = dict(prediction)
        print(" prediction:", prediction)


In [39]:
# One raw taxi-trip record (without the fare_amount label) used as a request.
input_1 = dict(
    key="2010-10-19 00:01:17.0000003",
    pickup_datetime="2010-10-19 00:01:17 UTC",
    pickup_longitude=-73.94872283935547,
    pickup_latitude=40.777488708496094,
    dropoff_longitude=-73.94956970214844,
    dropoff_latitude=40.814048767089844,
    passenger_count=1,
)
  


In [52]:
# Online prediction through the low-level PredictionServiceClient.
# NOTE(review): the saved output shows the endpoint rejecting this raw feature
# dict ("does not have named input: dropoff_latitude"), i.e. the deployed
# serving signature does not expose these columns as named inputs. It
# presumably expects another request format (e.g. serialized tf.Example) —
# TODO confirm against the signature exported by model.py.
predict_tabular_regression(
    project='853203979454',
    endpoint_id='970323409558831104',
    location='us-west1',
    instance_dict=input_1,
    api_endpoint="us-west1-aiplatform.googleapis.com"
)

InvalidArgument: 400 {
    "error": "Failed to process element: 0 key: dropoff_latitude of 'instances' list. Error: INVALID_ARGUMENT: JSON object: does not have named input: dropoff_latitude"
}

In [57]:
import json

# The same trip record as `input_1`, serialised to JSON to inspect the body.
features = dict(
    key='2010-10-19 00:01:17.0000003',
    pickup_datetime='2010-10-19 00:01:17 UTC',
    pickup_longitude=-73.94872283935547,
    pickup_latitude=40.777488708496094,
    dropoff_longitude=-73.94956970214844,
    dropoff_latitude=40.814048767089844,
    passenger_count=1,
)

json_data = json.dumps(features)
print(json_data)


{"key": "2010-10-19 00:01:17.0000003", "pickup_datetime": "2010-10-19 00:01:17 UTC", "pickup_longitude": -73.94872283935547, "pickup_latitude": 40.777488708496094, "dropoff_longitude": -73.94956970214844, "dropoff_latitude": 40.814048767089844, "passenger_count": 1}


In [58]:
# NOTE(review): `predict_custom_trained_model_sample` is not defined in any
# visible cell, so this raises NameError on a fresh kernel.
# NOTE(review): the saved error ("does not have named input: instances") shows
# the whole {"instances": [...]} wrapper was sent as a single instance. The
# wrapper presumably should not be nested inside the argument — TODO confirm
# the helper's expected `instances` format.
predict_custom_trained_model_sample(
    project='853203979454',
    endpoint_id='970323409558831104',
    location='us-west1',
    instances= {"instances": ["2010-10-19 00:01:17.0000003,2010-10-19 00:01:17 UTC,-73.94872283935547,40.777488708496094, -73.94956970214844,40.814048767089844,1"]}
)

InvalidArgument: 400 {
    "error": "Failed to process element: 0 key: instances of 'instances' list. Error: INVALID_ARGUMENT: JSON object: does not have named input: instances"
}

In [54]:
import os
import pprint
import tempfile
import urllib

import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
# Stop TensorFlow's logger from forwarding records to the root logger (which an
# earlier cell set to INFO); presumably this avoids duplicated log lines.
tf.get_logger().propagate = False
# Pretty-printer used below to display parsed tf.train.Example records.
pp = pprint.PrettyPrinter()

from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

# Load TFX's interactive-notebook "skip" extension (IPython magic).
%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

In [55]:
# Interactive TFX context for running components inside the notebook.
# NOTE(review): `context` is not referenced by any visible cell, and no
# pipeline_root is passed, so TFX chooses the artifact location — confirm this
# cell is still needed.
context = InteractiveContext()



In [56]:
# Inspect one record of a locally downloaded, GZIP-compressed CsvExampleGen
# train split to see the raw feature names and types.
tfrecord_filename = ['./pipeline_taxicab-pipeline_853203979454_taxicab-pipeline-20230628122601_CsvExampleGen_2303745141027897344_examples_Split-train_data_tfrecord-00000-of-00001.gz']

dataset = tf.data.TFRecordDataset(tfrecord_filename, compression_type="GZIP")

for tfrecord in dataset.take(1):
    serialized_example = tfrecord.numpy()
    example = tf.train.Example.FromString(serialized_example)
    pp.pprint(example)

features {
  feature {
    key: "dropoff_latitude"
    value {
      float_list {
        value: 40.814048767089844
      }
    }
  }
  feature {
    key: "dropoff_longitude"
    value {
      float_list {
        value: -73.94956970214844
      }
    }
  }
  feature {
    key: "fare_amount"
    value {
      float_list {
        value: 11.300000190734863
      }
    }
  }
  feature {
    key: "key"
    value {
      bytes_list {
        value: "2010-10-19 00:01:17.0000003"
      }
    }
  }
  feature {
    key: "passenger_count"
    value {
      int64_list {
        value: 1
      }
    }
  }
  feature {
    key: "pickup_datetime"
    value {
      bytes_list {
        value: "2010-10-19 00:01:17 UTC"
      }
    }
  }
  feature {
    key: "pickup_latitude"
    value {
      float_list {
        value: 40.777488708496094
      }
    }
  }
  feature {
    key: "pickup_longitude"
    value {
      float_list {
        value: -73.94872283935547
      }
    }
  }
}



2023-07-12 13:28:29.445007: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [1]
	 [[{{node Placeholder/_0}}]]


In [13]:
# Call the deployed endpoint through the high-level SDK with a raw feature dict.
# `endpoint` is bound explicitly here. Elsewhere it only exists as a local
# inside explain_sample(), so this cell raised NameError on a fresh kernel.
endpoint = aiplatform.Endpoint(
    '970323409558831104',
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
)

instance = {
    'key': '2010-10-19 00:01:17.0000003',
    'pickup_datetime': '2010-10-19 00:01:17 UTC',
    'pickup_longitude': -73.94872283935547,
    'pickup_latitude': 40.777488708496094,
    'dropoff_longitude': -73.94956970214844,
    'dropoff_latitude': 40.814048767089844,
    'passenger_count': 1,
}

# NOTE(review): the saved output shows the serving signature rejects these
# named inputs; see the prediction-format note on the earlier predict cell.
response = endpoint.predict([instance])

print(response)
print('Value predicted for the first sample: ', response.predictions[0][0])

InvalidArgument: 400 {
    "error": "Failed to process element: 0 key: dropoff_latitude of 'instances' list. Error: Invalid argument: JSON object: does not have named input: dropoff_latitude"
}

In [91]:
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from tfx.orchestration import pipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner
from tfx.orchestration import metadata
from ml_metadata.proto import metadata_store_pb2
from typing import Optional, Text, List, Dict, Any

In [92]:
# Settings for the Dataflow-backed variant of the pipeline.
# NOTE: these rebind the taxicab-pipeline constants defined earlier, so cells
# above that read them must not be re-run after this one.
PIPELINE_NAME = 'dataflow-pipeline'
PIPELINE_ROOT = 'gs://{}/pipeline/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
# Reuses the taxicab pipeline's input CSV files.
DATA_ROOT = 'gs://{}/pipeline/taxicab-pipeline/data'.format(GCS_BUCKET_NAME)
SERVING_MODEL_DIR = PIPELINE_ROOT + '/serving_model'
# A local SQLite file path (not a directory), consumed by
# metadata.sqlite_metadata_connection_config in the compile cell.
METADATA_ROOT = './dataflow-pipeline/metadata/metadata.db'
LABEL_NAME = 'fare_amount'


In [93]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     serving_model_dir: str,
                     metadata_connection_config: Optional[
                         metadata_store_pb2.ConnectionConfig] = None,
                     ) -> tfx.dsl.Pipeline:
    """Build the taxi-fare TFX pipeline whose Beam steps run on Dataflow.

    Same stages as the Vertex variant. Here the blessed model is pushed to a
    filesystem directory, and Beam-based components receive DataflowRunner
    pipeline args.

    NOTE: this redefinition shadows the earlier ``_create_pipeline`` (which
    takes different parameters); cells that call the earlier version must run
    before this cell.

    Args:
      pipeline_name: Name of the TFX pipeline.
      pipeline_root: GCS prefix for pipeline artifacts. Dataflow's temp
        location is ``<pipeline_root>/tmp``.
      data_root: Directory holding the input CSV files read by CsvExampleGen.
      serving_model_dir: Filesystem directory the Pusher copies blessed
        models to.
      metadata_connection_config: Passed through to ``tfx.dsl.Pipeline``.
        NOTE(review): a local SQLite config is presumably not used when the
        pipeline is compiled for Vertex AI — confirm.

    Returns:
      The assembled ``tfx.dsl.Pipeline``.

    Reads the module-level ``LABEL_NAME``, ``GOOGLE_CLOUD_PROJECT`` and
    ``GOOGLE_CLOUD_REGION`` settings.
    """
    # Hash-bucket split 3:1 -> roughly 75% train / 25% eval.
    output = proto.Output(
        split_config=example_gen_pb2.SplitConfig(splits=[
            proto.SplitConfig.Split(name='train', hash_buckets=3),
            proto.SplitConfig.Split(name='eval', hash_buckets=1),
        ]))

    # Read from `data_root` (it used to be ignored in favour of a literal path).
    example_gen = CsvExampleGen(input_base=data_root, output_config=output)

    # Tell TFDV which column is the label.
    statistics_gen = StatisticsGen(
        examples=example_gen.outputs['examples'],
        stats_options=tfdv.StatsOptions(label_feature=LABEL_NAME))

    schema_gen = SchemaGen(
        statistics=statistics_gen.outputs['statistics'],
        infer_feature_shape=True)

    example_validator = ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_gen.outputs['schema'])

    transform = Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        module_file='./preprocessing_fn.py',
        materialize=True)

    tuner = Tuner(
        module_file='./tuner.py',
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=transform.outputs['post_transform_schema'],
        train_args=proto.TrainArgs(splits=['train'], num_steps=2500),
        eval_args=proto.EvalArgs(splits=['eval'], num_steps=50))

    # Train with the hyperparameters selected by the Tuner.
    trainer = Trainer(
        module_file='./model.py',
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=transform.outputs['post_transform_schema'],
        hyperparameters=tuner.outputs['best_hyperparameters'],
        train_args=proto.TrainArgs(splits=['train'], num_steps=7500),
        eval_args=proto.EvalArgs(splits=['eval'], num_steps=2500))

    # Baseline for the Evaluator: the most recently blessed model, if any.
    model_resolver = tfx.dsl.Resolver(
        strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
        model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
        model_blessing=tfx.dsl.Channel(
            type=tfx.types.standard_artifacts.ModelBlessing))

    # Blessing criteria: MSE must stay below 200 and must not regress against
    # the baseline beyond the 0.1 absolute change threshold.
    eval_config = tfma.EvalConfig(
        model_specs=[
            tfma.ModelSpec(signature_name='serving_default',
                           label_key=LABEL_NAME,
                           preprocessing_function_names=['transform_features'])],
        slicing_specs=[tfma.SlicingSpec()],
        metrics_specs=[
            tfma.MetricsSpec(
                thresholds={
                    'mse':
                        tfma.MetricThreshold(
                            value_threshold=tfma.GenericValueThreshold(
                                upper_bound={'value': 2e2}),
                            change_threshold=tfma.GenericChangeThreshold(
                                direction=tfma.MetricDirection.LOWER_IS_BETTER,
                                absolute={'value': 1e-1}))
                })
        ])

    evaluator = tfx.components.Evaluator(
        examples=example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        eval_config=eval_config)

    # Copy the model to `serving_model_dir` only if the Evaluator blessed it.
    pusher = Pusher(
        model=trainer.outputs['model'],
        model_blessing=evaluator.outputs['blessing'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=serving_model_dir)))

    components = [
        example_gen,
        statistics_gen,
        schema_gen,
        transform,
        trainer,
        example_validator,
        tuner,
        evaluator,
        pusher,
        model_resolver,
    ]

    # Run Beam-based components on Dataflow. Project and region come from the
    # notebook config instead of repeated literals, and the temp location lives
    # under this pipeline's root.
    beam_pipeline_args = [
        '--runner=DataflowRunner',
        '--project=' + GOOGLE_CLOUD_PROJECT,
        '--region=' + GOOGLE_CLOUD_REGION,
        '--temp_location=' + pipeline_root + '/tmp',
    ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        metadata_connection_config=metadata_connection_config,
        components=components,
        beam_pipeline_args=beam_pipeline_args,
        enable_cache=True)



In [94]:
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
logging.getLogger().setLevel(logging.INFO)

PIPELINE_DEFINITION_FILE = './dataflow-pipeline.json'

# Compile the Dataflow variant into a Vertex AI (KFP v2) JSON definition.
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)

_ = runner.run(
    pipeline=_create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        serving_model_dir=SERVING_MODEL_DIR,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(METADATA_ROOT)))

# Re-initialise the SDK: an earlier explain_sample() call may have changed the
# global project/location defaults.
aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

# Submit exactly the file the runner wrote, rather than repeating its path.
job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.submit()
 

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.


CsvExampleGen(spec: <tfx.types.standard_component_specs.FileBasedExampleGenSpec object at 0x7ff47fcce130>, executor_spec: <tfx.dsl.components.base.executor_spec.BeamExecutorSpec object at 0x7ff49c143e80>, driver_class: <class 'tfx.components.example_gen.driver.FileBasedDriver'>, component_id: CsvExampleGen, inputs: {}, outputs: {'examples': OutputChannel(artifact_type=Examples, producer_component_id=CsvExampleGen, output_key=examples, additional_properties={}, additional_custom_properties={}, _input_trigger=None})


INFO:absl:Generating ephemeral wheel package for '/home/jupyter/taxicab-pipeline/preprocessing_fn.py' (including modules: ['model', 'tfx-client', 'tuner', 'transparency', 'preprocessing_fn']).
INFO:absl:User module package has hash fingerprint version a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7.
INFO:absl:Executing: ['/opt/conda/bin/python', '/var/tmp/tmp0vxdcp17/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/var/tmp/tmpr5xrpiq8', '--dist-dir', '/var/tmp/tmp9xuaqkss']


running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying model.py -> build/lib
copying tfx-client.py -> build/lib
copying tuner.py -> build/lib
copying transparency.py -> build/lib
copying preprocessing_fn.py -> build/lib
installing to /var/tmp/tmpr5xrpiq8
running install
running install_lib
copying build/lib/model.py -> /var/tmp/tmpr5xrpiq8
copying build/lib/tuner.py -> /var/tmp/tmpr5xrpiq8
copying build/lib/preprocessing_fn.py -> /var/tmp/tmpr5xrpiq8
copying build/lib/tfx-client.py -> /var/tmp/tmpr5xrpiq8
copying build/lib/transparency.py -> /var/tmp/tmpr5xrpiq8
running install_egg_info


!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer, pypa/build or
        other standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


running egg_info
creating tfx_user_code_Transform.egg-info
writing tfx_user_code_Transform.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Transform.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Transform.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
Copying tfx_user_code_Transform.egg-info to /var/tmp/tmpr5xrpiq8/tfx_user_code_Transform-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3.9.egg-info
running install_scripts
creating /var/tmp/tmpr5xrpiq8/tfx_user_code_Transform-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7.dist-info/WHEEL
creating '/var/tmp/tmp9xuaqkss/tfx_user_code_Transform-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3-none-any.whl' and adding '/var/tmp/tmpr5xrpiq8' to it
adding 'mo

INFO:absl:Successfully built user code wheel distribution at 'gs://gcp-ml-pipeline/pipeline/dataflow-pipeline/_wheels/tfx_user_code_Transform-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3-none-any.whl'; target user module is 'preprocessing_fn'.
INFO:absl:Full user module path is 'preprocessing_fn@gs://gcp-ml-pipeline/pipeline/dataflow-pipeline/_wheels/tfx_user_code_Transform-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3-none-any.whl'
INFO:absl:Generating ephemeral wheel package for '/home/jupyter/taxicab-pipeline/tuner.py' (including modules: ['model', 'tfx-client', 'tuner', 'transparency', 'preprocessing_fn']).
INFO:absl:User module package has hash fingerprint version a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7.
INFO:absl:Executing: ['/opt/conda/bin/python', '/var/tmp/tmp2gigppet/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/var/tmp/tmpw6grazrr', '--dist-dir', '/var/tmp/tmp0t4pxebt']


running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying model.py -> build/lib
copying tfx-client.py -> build/lib
copying tuner.py -> build/lib
copying transparency.py -> build/lib
copying preprocessing_fn.py -> build/lib
installing to /var/tmp/tmpw6grazrr
running install
running install_lib
copying build/lib/model.py -> /var/tmp/tmpw6grazrr
copying build/lib/tuner.py -> /var/tmp/tmpw6grazrr
copying build/lib/preprocessing_fn.py -> /var/tmp/tmpw6grazrr
copying build/lib/tfx-client.py -> /var/tmp/tmpw6grazrr
copying build/lib/transparency.py -> /var/tmp/tmpw6grazrr
running install_egg_info


!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer, pypa/build or
        other standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


running egg_info
creating tfx_user_code_Tuner.egg-info
writing tfx_user_code_Tuner.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Tuner.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Tuner.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Tuner.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Tuner.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Tuner.egg-info/SOURCES.txt'
Copying tfx_user_code_Tuner.egg-info to /var/tmp/tmpw6grazrr/tfx_user_code_Tuner-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3.9.egg-info
running install_scripts
creating /var/tmp/tmpw6grazrr/tfx_user_code_Tuner-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7.dist-info/WHEEL
creating '/var/tmp/tmp0t4pxebt/tfx_user_code_Tuner-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3-none-any.whl' and adding '/var/tmp/tmpw6grazrr' to it
adding 'model.py'
adding 'preprocessing_fn.py'
adding 

INFO:absl:Successfully built user code wheel distribution at 'gs://gcp-ml-pipeline/pipeline/dataflow-pipeline/_wheels/tfx_user_code_Tuner-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3-none-any.whl'; target user module is 'tuner'.
INFO:absl:Full user module path is 'tuner@gs://gcp-ml-pipeline/pipeline/dataflow-pipeline/_wheels/tfx_user_code_Tuner-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3-none-any.whl'
INFO:absl:Generating ephemeral wheel package for '/home/jupyter/taxicab-pipeline/model.py' (including modules: ['model', 'tfx-client', 'tuner', 'transparency', 'preprocessing_fn']).
INFO:absl:User module package has hash fingerprint version a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7.
INFO:absl:Executing: ['/opt/conda/bin/python', '/var/tmp/tmp5mns8y8q/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/var/tmp/tmpmubgogrw', '--dist-dir', '/var/tmp/tmpzutaw0yd']


running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying model.py -> build/lib
copying tfx-client.py -> build/lib
copying tuner.py -> build/lib
copying transparency.py -> build/lib
copying preprocessing_fn.py -> build/lib
installing to /var/tmp/tmpmubgogrw
running install
running install_lib
copying build/lib/model.py -> /var/tmp/tmpmubgogrw
copying build/lib/tuner.py -> /var/tmp/tmpmubgogrw
copying build/lib/preprocessing_fn.py -> /var/tmp/tmpmubgogrw
copying build/lib/tfx-client.py -> /var/tmp/tmpmubgogrw
copying build/lib/transparency.py -> /var/tmp/tmpmubgogrw
running install_egg_info


!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer, pypa/build or
        other standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /var/tmp/tmpmubgogrw/tfx_user_code_Trainer-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3.9.egg-info
running install_scripts
creating /var/tmp/tmpmubgogrw/tfx_user_code_Trainer-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7.dist-info/WHEEL
creating '/var/tmp/tmpzutaw0yd/tfx_user_code_Trainer-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3-none-any.whl' and adding '/var/tmp/tmpmubgogrw' to it
adding 'model.py'
adding 'prepro

INFO:absl:Successfully built user code wheel distribution at 'gs://gcp-ml-pipeline/pipeline/dataflow-pipeline/_wheels/tfx_user_code_Trainer-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3-none-any.whl'; target user module is 'model'.
INFO:absl:Full user module path is 'model@gs://gcp-ml-pipeline/pipeline/dataflow-pipeline/_wheels/tfx_user_code_Trainer-0.0+a934c052176f163d3dab67799ea8d35eaba6c4a2c448e43701a7efbd323252e7-py3-none-any.whl'


Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/853203979454/locations/us-west1/pipelineJobs/dataflow-pipeline-20230712001655


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/853203979454/locations/us-west1/pipelineJobs/dataflow-pipeline-20230712001655


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/853203979454/locations/us-west1/pipelineJobs/dataflow-pipeline-20230712001655')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/853203979454/locations/us-west1/pipelineJobs/dataflow-pipeline-20230712001655')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-west1/pipelines/runs/dataflow-pipeline-20230712001655?project=853203979454


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-west1/pipelines/runs/dataflow-pipeline-20230712001655?project=853203979454
