In [20]:
# # Use the latest version of pip.
# !pip install --upgrade pip
# !pip install --upgrade "tfx[kfp]"
# !pip install --upgrade tensorflow_transform

import os
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tensorflow.keras import layers
import tensorflow_model_analysis as tfma
from tfx import v1 as tfx
from tfx.types import Channel, standard_artifacts
from tfx.dsl.components.common.resolver import Resolver
from tfx.components import Evaluator
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tfx.orchestration.pipeline import Pipeline
from tfx.proto import trainer_pb2, pusher_pb2
from tfx.types import standard_artifacts
import tensorflow_model_analysis as tfma
from google.cloud import bigquery
import pandas as pd
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google.cloud import storage

In [21]:
import sys

# Only a Colab runtime needs the interactive user login; on any other runtime
# this cell is a no-op.
if 'google.colab' in sys.modules:
    from google.colab import auth
    auth.authenticate_user()

### Check the package versions.

In [22]:

# Report the version of each library right after importing it.
import tensorflow as tf
print(f'TensorFlow version: {tf.__version__}')
from tfx import v1 as tfx
print(f'TFX version: {tfx.__version__}')
import kfp
print(f'KFP version: {kfp.__version__}')

TensorFlow version: 2.15.1
TFX version: 1.15.1
KFP version: 1.8.22


In [23]:
# Google Cloud project and region used by the cells below.
GOOGLE_CLOUD_REGION = 'us-central1'
GOOGLE_CLOUD_PROJECT = 'brldi-gcpcapabilities-ai-audit'

# Cloud Storage bucket that holds the pipeline paths, and the name of the
# training CSV file.
GCS_BUCKET_NAME = 'finalrundemo1chicago'
FILE_NAME = 'Final_Chicago_Train.csv'

#### Set `gcloud` to use your project.

In [24]:
 {GOOGLE_CLOUD_PROJECT}

{'brldi-gcpcapabilities-ai-audit'}

### Set up global variables for pipeline artifact and model serving locations

In [25]:


# Initialize the Google Cloud Storage client under a dedicated name so that a
# generic `client` variable bound in any other cell cannot replace it.
storage_client = storage.Client()


PIPELINE_NAME = 'chicago-gcp-vertex-pipelines'

# Function to create directories in GCS
def create_gcs_directory(bucket_name, path):
    """Create a zero-byte placeholder object named `path` in `bucket_name`.

    Args:
        bucket_name: Name of the Cloud Storage bucket to write to.
        path: Object name of the placeholder; callers in this notebook pass a
            prefix ending in '/'.

    NOTE(review): this presumably requires the bucket to exist already —
    confirm the bucket is created before this cell runs on a fresh project.
    """
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(path)
    blob.upload_from_string('', content_type='application/x-www-form-urlencoded;charset=UTF-8')

# Paths for pipeline artifacts
PIPELINE_ROOT = f'gs://{GCS_BUCKET_NAME}/{PIPELINE_NAME}/pipeline_root/'
MODULE_ROOT = f'gs://{GCS_BUCKET_NAME}/{PIPELINE_NAME}/pipeline_module/'
DATA_ROOT = f'gs://{GCS_BUCKET_NAME}/{PIPELINE_NAME}/data/'
SERVING_MODEL_DIR = f'gs://{GCS_BUCKET_NAME}/{PIPELINE_NAME}/serving_model/'

# Create one placeholder per artifact location, in the same order as above.
for _subdir in ('pipeline_root', 'pipeline_module', 'data', 'serving_model'):
    create_gcs_directory(GCS_BUCKET_NAME, f'{PIPELINE_NAME}/{_subdir}/')

print('GCS directories created:')
print(PIPELINE_ROOT)
print(MODULE_ROOT)
print(DATA_ROOT)
print(SERVING_MODEL_DIR)


GCS directories created:
gs://finalrundemo1chicago/chicago-gcp-vertex-pipelines/pipeline_root/
gs://finalrundemo1chicago/chicago-gcp-vertex-pipelines/pipeline_module/
gs://finalrundemo1chicago/chicago-gcp-vertex-pipelines/data/
gs://finalrundemo1chicago/chicago-gcp-vertex-pipelines/serving_model/


We need our own copy of the dataset. Because TFX ExampleGen reads its inputs
from a directory, we will export the data to a CSV file and copy that file
into a directory on GCS.

First, sample the trips from the BigQuery public dataset and take a quick look at the result.

In [26]:


# Bind the BigQuery client to a dedicated name so it does not rebind the
# generic `client` variable that code defined in other cells may read.
bq_client = bigquery.Client()

# Define the SQL query.
# NOTE: ORDER BY RAND() draws a different random sample on every execution, so
# the exported training data is not reproducible from run to run.
query = """
SELECT * FROM bigquery-public-data.chicago_taxi_trips.taxi_trips
WHERE EXTRACT(year FROM trip_start_timestamp) > 2018 AND
      trip_miles IS NOT NULL AND
      trip_seconds IS NOT NULL
ORDER BY RAND()
LIMIT 40000
"""

# Run the query
query_job = bq_client.query(query)

# Convert to DataFrame
df = query_job.to_dataframe()

# Display the DataFrame
df




Unnamed: 0,unique_key,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_census_tract,dropoff_census_tract,pickup_community_area,dropoff_community_area,...,extras,trip_total,payment_type,company,pickup_latitude,pickup_longitude,pickup_location,dropoff_latitude,dropoff_longitude,dropoff_location
0,f446a00d13583893f8dff96d5e7f5f1c344b3b0f,72efecb1648eb2bb2584e1add78527ffa37dfef82aba84...,2019-03-07 23:45:00+00:00,2019-03-08 00:00:00+00:00,600,3.20,,,28,22,...,2.0,15.60,Credit Card,Star North Management LLC,41.874005,-87.663518,POINT (-87.6635175498 41.874005383),41.922761,-87.699155,POINT (-87.69915534320002 41.9227606205)
1,b0e63c3f94bdcc5087075ce68fd327b3a077ae13,e585218a9f728ba533db40a74237a1caa23f01212f0fa8...,2019-03-10 04:00:00+00:00,2019-03-10 04:00:00+00:00,420,1.60,17031081700,17031833000,8,28,...,1.0,8.50,Cash,KOAM Taxi Association,41.892042,-87.631864,POINT (-87.6318639497 41.8920421365),41.885281,-87.657233,POINT (-87.6572331997 41.8852813201)
2,fa7664fb4a363a891374554e01528e5da219f7c4,249ef6f75a49feebb50f4bc68cf7ba703c4006498c63b6...,2019-01-08 11:30:00+00:00,2019-01-08 11:30:00+00:00,451,1.56,,,33,32,...,0.0,7.75,Cash,Chicago Carriage Cab Corp,41.857184,-87.620335,POINT (-87.6203346241 41.8571838585),41.878866,-87.625192,POINT (-87.6251921424 41.8788655841)
3,e648cfac8d404da2749fdcdd9e566a1b9f026bed,7ec17a8416dca01eef674860959ae50b91aeecb1b417b0...,2019-04-05 17:00:00+00:00,2019-04-05 17:15:00+00:00,540,0.60,17031839100,17031081700,32,8,...,1.0,9.75,Credit Card,Star North Management LLC,41.880994,-87.632746,POINT (-87.6327464887 41.8809944707),41.892042,-87.631864,POINT (-87.6318639497 41.8920421365)
4,25abfc3959ff900d456804165c03351c1aff647b,b2917f6d0a6e1407152c6fa9c392d03ca840fd719c8062...,2019-02-13 15:15:00+00:00,2019-02-13 16:15:00+00:00,3540,19.80,17031980000,17031330100,76,33,...,4.0,62.20,Credit Card,Medallion Leasin,41.979071,-87.903040,POINT (-87.9030396611 41.9790708201),41.859350,-87.617358,POINT (-87.6173580061 41.859349715)
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
39995,34f2e523be8c72cc011aa4a9dd50df3063ddb4b8,9472ac2ed88284fd6d87425e35fbc13bd462a38f594b1b...,2019-04-12 03:45:00+00:00,2019-04-12 04:00:00+00:00,1260,1.10,,,8,76,...,4.0,57.60,Credit Card,Taxi Affiliation Services,41.899602,-87.633308,POINT (-87.6333080367 41.899602111),41.980264,-87.913625,POINT (-87.913624596 41.9802643146)
39996,2d21ee9e7a3b7f0e2b08592d7a39788d49865477,d092ee0cb44dc6fa5f6757a7ef07181c4ee3d25eed0bc4...,2019-04-26 13:30:00+00:00,2019-04-26 13:45:00+00:00,240,0.60,17031839100,17031320400,32,32,...,0.0,6.75,Credit Card,Top Cab Affiliation,41.880994,-87.632746,POINT (-87.6327464887 41.8809944707),41.877406,-87.621972,POINT (-87.6219716519 41.8774061234)
39997,8de0c36894b3353ff165f7e455399745bf08456f,0c0b18d1759f9e7dc42dcec3eaccc1531e18dce30b5cd3...,2019-04-03 19:30:00+00:00,2019-04-03 19:45:00+00:00,1140,5.20,,,8,3,...,0.0,16.75,Cash,Taxi Affiliation Services,41.899602,-87.633308,POINT (-87.6333080367 41.899602111),41.965812,-87.655879,POINT (-87.6558787862 41.96581197)
39998,9a81669dd209249baea02fc8613abf1c6ccbccfc,7e25041185a6cc504dd6e2f71691c595aeae45c4b86247...,2019-03-06 21:30:00+00:00,2019-03-06 21:45:00+00:00,1140,13.50,17031980000,17031831000,76,22,...,5.0,44.25,Credit Card,Top Cab Affiliation,41.979071,-87.903040,POINT (-87.9030396611 41.9790708201),41.916005,-87.675095,POINT (-87.6750951155 41.9160052737)


In [27]:
# Count the missing values in every column.
df.isna().sum()

unique_key                    0
taxi_id                       0
trip_start_timestamp          0
trip_end_timestamp            0
trip_seconds                  0
trip_miles                    0
pickup_census_tract       13198
dropoff_census_tract      13587
pickup_community_area      2376
dropoff_community_area     3535
fare                          5
tips                          5
tolls                       149
extras                        5
trip_total                    5
payment_type                  0
company                       0
pickup_latitude            2375
pickup_longitude           2375
pickup_location            2375
dropoff_latitude           3422
dropoff_longitude          3422
dropoff_location           3422
dtype: int64

In [28]:
# Vectorized datetime accessor instead of a per-row Python lambda.
df["trip_start_day"] = df["trip_start_timestamp"].dt.day

In [29]:
# Vectorized datetime accessor instead of a per-row Python lambda.
df["trip_start_month"] = df["trip_start_timestamp"].dt.month

In [30]:
# Vectorized datetime accessor instead of a per-row Python lambda.
df["trip_start_hour"] = df["trip_start_timestamp"].dt.hour

In [31]:
# Display the frame again to check the derived trip_start_* columns.
df

Unnamed: 0,unique_key,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_census_tract,dropoff_census_tract,pickup_community_area,dropoff_community_area,...,company,pickup_latitude,pickup_longitude,pickup_location,dropoff_latitude,dropoff_longitude,dropoff_location,trip_start_day,trip_start_month,trip_start_hour
0,f446a00d13583893f8dff96d5e7f5f1c344b3b0f,72efecb1648eb2bb2584e1add78527ffa37dfef82aba84...,2019-03-07 23:45:00+00:00,2019-03-08 00:00:00+00:00,600,3.20,,,28,22,...,Star North Management LLC,41.874005,-87.663518,POINT (-87.6635175498 41.874005383),41.922761,-87.699155,POINT (-87.69915534320002 41.9227606205),7,3,23
1,b0e63c3f94bdcc5087075ce68fd327b3a077ae13,e585218a9f728ba533db40a74237a1caa23f01212f0fa8...,2019-03-10 04:00:00+00:00,2019-03-10 04:00:00+00:00,420,1.60,17031081700,17031833000,8,28,...,KOAM Taxi Association,41.892042,-87.631864,POINT (-87.6318639497 41.8920421365),41.885281,-87.657233,POINT (-87.6572331997 41.8852813201),10,3,4
2,fa7664fb4a363a891374554e01528e5da219f7c4,249ef6f75a49feebb50f4bc68cf7ba703c4006498c63b6...,2019-01-08 11:30:00+00:00,2019-01-08 11:30:00+00:00,451,1.56,,,33,32,...,Chicago Carriage Cab Corp,41.857184,-87.620335,POINT (-87.6203346241 41.8571838585),41.878866,-87.625192,POINT (-87.6251921424 41.8788655841),8,1,11
3,e648cfac8d404da2749fdcdd9e566a1b9f026bed,7ec17a8416dca01eef674860959ae50b91aeecb1b417b0...,2019-04-05 17:00:00+00:00,2019-04-05 17:15:00+00:00,540,0.60,17031839100,17031081700,32,8,...,Star North Management LLC,41.880994,-87.632746,POINT (-87.6327464887 41.8809944707),41.892042,-87.631864,POINT (-87.6318639497 41.8920421365),5,4,17
4,25abfc3959ff900d456804165c03351c1aff647b,b2917f6d0a6e1407152c6fa9c392d03ca840fd719c8062...,2019-02-13 15:15:00+00:00,2019-02-13 16:15:00+00:00,3540,19.80,17031980000,17031330100,76,33,...,Medallion Leasin,41.979071,-87.903040,POINT (-87.9030396611 41.9790708201),41.859350,-87.617358,POINT (-87.6173580061 41.859349715),13,2,15
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
39995,34f2e523be8c72cc011aa4a9dd50df3063ddb4b8,9472ac2ed88284fd6d87425e35fbc13bd462a38f594b1b...,2019-04-12 03:45:00+00:00,2019-04-12 04:00:00+00:00,1260,1.10,,,8,76,...,Taxi Affiliation Services,41.899602,-87.633308,POINT (-87.6333080367 41.899602111),41.980264,-87.913625,POINT (-87.913624596 41.9802643146),12,4,3
39996,2d21ee9e7a3b7f0e2b08592d7a39788d49865477,d092ee0cb44dc6fa5f6757a7ef07181c4ee3d25eed0bc4...,2019-04-26 13:30:00+00:00,2019-04-26 13:45:00+00:00,240,0.60,17031839100,17031320400,32,32,...,Top Cab Affiliation,41.880994,-87.632746,POINT (-87.6327464887 41.8809944707),41.877406,-87.621972,POINT (-87.6219716519 41.8774061234),26,4,13
39997,8de0c36894b3353ff165f7e455399745bf08456f,0c0b18d1759f9e7dc42dcec3eaccc1531e18dce30b5cd3...,2019-04-03 19:30:00+00:00,2019-04-03 19:45:00+00:00,1140,5.20,,,8,3,...,Taxi Affiliation Services,41.899602,-87.633308,POINT (-87.6333080367 41.899602111),41.965812,-87.655879,POINT (-87.6558787862 41.96581197),3,4,19
39998,9a81669dd209249baea02fc8613abf1c6ccbccfc,7e25041185a6cc504dd6e2f71691c595aeae45c4b86247...,2019-03-06 21:30:00+00:00,2019-03-06 21:45:00+00:00,1140,13.50,17031980000,17031831000,76,22,...,Top Cab Affiliation,41.979071,-87.903040,POINT (-87.9030396611 41.9790708201),41.916005,-87.675095,POINT (-87.6750951155 41.9160052737),6,3,21


In [32]:
# Write the training data under the configured file name instead of repeating
# the literal.
df.to_csv(FILE_NAME, index=False)

In [33]:
!gsutil mb -l {GOOGLE_CLOUD_REGION} gs://{GCS_BUCKET_NAME}
!gsutil cp Final_Chicago_Train.csv {DATA_ROOT}/

Creating gs://finalrundemo1chicago/...
ServiceException: 409 A Cloud Storage bucket named 'finalrundemo1chicago' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.
Copying file://Final_Chicago_Train.csv [Content-Type=text/csv]...
/ [1 files][ 16.4 MiB/ 16.4 MiB]                                                
Operation completed over 1 objects/16.4 MiB.                                     


# Write Example Component


### Write model code.

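We follow the approach of the
[Simple TFX Pipeline Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple),
adapted to the taxi data: a Transform module (`transformer.py`) that
preprocesses the features and a Trainer module (`bfs_trainer.py`) that trains
a Keras model to predict `fare`.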

In [34]:
_trainer_module_file = 'bfs_trainer.py'
_transformer_module_file = 'transformer.py'
_training_pipeline_file = 'training_pipeline.py'

In [35]:
%%writefile {_transformer_module_file}

import tensorflow as tf
import tensorflow_transform as tft

# Numerical features used in the model.
NUMERICAL_FEATURES = [
    'trip_miles',
    'trip_seconds',
]

# Features that are bucketized.
BUCKET_FEATURES = [
    'pickup_latitude',
    'pickup_longitude',
    'dropoff_latitude',
    'dropoff_longitude',
]

# Number of buckets tf.transform uses to encode each feature in BUCKET_FEATURES.
FEATURE_BUCKET_COUNT = 10

# Categorical features whose raw values are numbers.
CATEGORICAL_NUMERICAL_FEATURES = [
    'trip_start_hour',
    'trip_start_day',
    'trip_start_month',
    'pickup_census_tract',
    'dropoff_census_tract',
    'pickup_community_area',
    'dropoff_community_area',
]

# Categorical features whose raw values are strings.
CATEGORICAL_STRING_FEATURES = [
    'payment_type',
    'company',
]

# Number of vocabulary terms used to encode each categorical feature.
VOCAB_SIZE = 1000

# Number of out-of-vocabulary buckets into which unrecognized categorical
# values are hashed.
OOV_SIZE = 10

# Key of the label column in the input data.
LABEL_KEY = 'fare'

def t_name(key):
    """Return the name of the transformed feature derived from `key`.

    The '_xf' suffix keeps transformed keys from clashing with raw feature
    keys when running the Evaluator component.
    """
    suffix = '_xf'
    return key + suffix

def _make_one_hot(x, key):
    """One-hot encode a categorical feature through a tf.Transform vocabulary.

    Args:
        x: A dense tensor
        key: A string key for the feature in the input; it is also used as the
            vocabulary file name and as the op name.
    Returns:
        A dense float one-hot tensor reshaped to
        [-1, vocabulary size + OOV_SIZE].
    """
    # Map every value to its vocabulary index (or to an OOV bucket).
    vocab_ids = tft.compute_and_apply_vocabulary(
        x,
        top_k=VOCAB_SIZE,
        num_oov_buckets=OOV_SIZE,
        vocab_filename=key,
        name=key)

    # One slot per vocabulary term plus one per OOV bucket.
    depth = tft.experimental.get_vocabulary_size_by_name(key) + OOV_SIZE

    encoded = tf.one_hot(
        vocab_ids,
        depth=tf.cast(depth, tf.int32),
        on_value=1.0,
        off_value=0.0)
    return tf.reshape(encoded, [-1, depth])

def _fill_in_missing(x):
    """Densify a SparseTensor, filling the gaps with a default value.

    Missing values are replaced with '' for string tensors and 0 otherwise.
    Anything that is not a SparseTensor is returned unchanged.

    Args:
      x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
        in the second dimension.
    Returns:
      For a SparseTensor, a rank 1 tensor where missing values of `x` have
      been filled in; otherwise `x` itself.
    """
    if isinstance(x, tf.sparse.SparseTensor):
        fill_value = '' if x.dtype == tf.string else 0
        # Force exactly one column so the result can be squeezed to rank 1.
        single_column = tf.SparseTensor(
            x.indices, x.values, [x.dense_shape[0], 1])
        dense = tf.sparse.to_dense(single_column, fill_value)
        return tf.squeeze(dense, axis=1)
    return x

def preprocessing_fn(inputs):
    """tf.transform's callback function for preprocessing inputs.

    Args:
      inputs: map from feature keys to raw not-yet-transformed features.
    Returns:
      Map from string feature key to transformed feature operations. Feature
      keys carry the '_xf' suffix; the label keeps its raw key.
    """
    def _dense(key):
        # Every raw feature is densified before it is transformed.
        return _fill_in_missing(inputs[key])

    outputs = {}

    # Numerical features: scale to mean 0 and variance 1.
    for key in NUMERICAL_FEATURES:
        outputs[t_name(key)] = tft.scale_to_z_score(_dense(key), name=key)

    # Bucket features: bucket index, stored as float32.
    for key in BUCKET_FEATURES:
        bucket_index = tft.bucketize(
            _dense(key), FEATURE_BUCKET_COUNT, name=key)
        outputs[t_name(key)] = tf.cast(bucket_index, dtype=tf.float32)

    # String categoricals: one-hot over a learned vocabulary.
    for key in CATEGORICAL_STRING_FEATURES:
        outputs[t_name(key)] = _make_one_hot(_dense(key), key)

    # Numerical categoricals: convert to strings first, then one-hot encode.
    for key in CATEGORICAL_NUMERICAL_FEATURES:
        as_text = tf.strings.strip(tf.strings.as_string(_dense(key)))
        outputs[t_name(key)] = _make_one_hot(as_text, key)

    # Fare is used as a label here
    outputs[LABEL_KEY] = _dense(LABEL_KEY)

    return outputs



Overwriting transformer.py


In [36]:
%%writefile {_trainer_module_file}
# Import necessary libraries
from typing import Dict, List, Text
import os
import glob
from absl import logging
import datetime
import tensorflow as tf
import tensorflow_transform as tft
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_transform import TFTransformOutput


_LABEL_KEY = 'fare'

_BATCH_SIZE = 40

def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
    """Builds the batched dataset used for training or evaluation.

    Args:
      file_pattern: List of paths or patterns of input tfrecord files.
      data_accessor: DataAccessor for converting input to RecordBatch.
      tf_transform_output: A TFTransformOutput; its transformed schema is used
        to read the files.
      batch_size: representing the number of consecutive elements of returned
        dataset to combine in a single batch

    Returns:
      The dataset created by `data_accessor.tf_dataset_factory`, with
      `_LABEL_KEY` configured as the label key.
    """
    dataset_options = tfxio.TensorFlowDatasetOptions(
        batch_size=batch_size, label_key=_LABEL_KEY)
    transformed_schema = tf_transform_output.transformed_metadata.schema
    return data_accessor.tf_dataset_factory(
        file_pattern, dataset_options, transformed_schema)

def _get_tf_examples_serving_signature(model, tf_transform_output):
    """Returns a serving signature that accepts `tensorflow.Example`."""

    # The transform layer is attached to the model so that it is tracked when
    # the model is saved.
    # TODO(b/162357359): Revise once the bug is resolved.
    model.tft_layer_inference = tf_transform_output.transform_features_layer()

    examples_spec = tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')

    @tf.function(input_signature=[examples_spec])
    def serve_tf_examples_fn(serialized_tf_example):
        """Parses, transforms and scores a batch of serialized examples."""
        serving_feature_spec = tf_transform_output.raw_feature_spec()
        # The label is dropped from the spec because it will not be present
        # at serving time.
        serving_feature_spec.pop(_LABEL_KEY)
        raw_features = tf.io.parse_example(
            serialized_tf_example, serving_feature_spec)
        transformed_features = model.tft_layer_inference(raw_features)
        logging.info('serve_transformed_features = %s', transformed_features)

        # TODO(b/154085620): Convert the predicted labels from the model using a
        # reverse-lookup (opposite of transform.py).
        return {'outputs': model(transformed_features)}

    # Serving function: serialized tf.Example in, model outputs out.
    return serve_tf_examples_fn

def _get_transform_features_signature(model, tf_transform_output):
    """Returns a serving signature that applies tf.Transform to features."""

    # The transform layer is attached to the model so that it is tracked when
    # the model is saved.
    # TODO(b/162357359): Revise once the bug is resolved.
    model.tft_layer_eval = tf_transform_output.transform_features_layer()

    examples_spec = tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')

    @tf.function(input_signature=[examples_spec])
    def transform_features_fn(serialized_tf_example):
        """Returns the transformed_features to be fed as input to evaluator."""
        # The full raw spec is used here, label included.
        full_feature_spec = tf_transform_output.raw_feature_spec()
        raw_features = tf.io.parse_example(
            serialized_tf_example, full_feature_spec)
        transformed_features = model.tft_layer_eval(raw_features)
        logging.info('eval_transformed_features = %s', transformed_features)
        return transformed_features

    # Serving function: serialized tf.Example in, transformed features out.
    return transform_features_fn

def export_serving_model(tf_transform_output, model, output_dir):
    """Saves a keras model together with its two serving signatures.

    Args:
      tf_transform_output: Wrapper around output of tf.Transform.
      model: A keras model to export for serving.
      output_dir: A directory where the model will be exported to.
    """
    # Keep a reference to the transform layer on the model itself.
    model.tft_layer = tf_transform_output.transform_features_layer()

    # 'serving_default' scores serialized examples; 'transform_features' only
    # applies the preprocessing.
    serving_default = _get_tf_examples_serving_signature(
        model, tf_transform_output)
    transform_features = _get_transform_features_signature(
        model, tf_transform_output)

    model.save(
        output_dir,
        save_format='tf',
        signatures={
            'serving_default': serving_default,
            'transform_features': transform_features,
        })

def _build_keras_model(tf_transform_output: TFTransformOutput
                       ) -> tf.keras.Model:
    """Creates a DNN Keras model with a single linear output unit.

    Every transformed feature except the label becomes one model input; the
    inputs are concatenated and passed through four ReLU layers.

    Args:
      tf_transform_output: [TFTransformOutput], the outputs from Transform

    Returns:
      A keras Model.

    Raises:
      ValueError: if a feature spec is neither FixedLenFeature nor
        VarLenFeature.
    """
    # The label is not a model input.
    feature_spec = tf_transform_output.transformed_feature_spec().copy()
    feature_spec.pop(_LABEL_KEY)

    inputs = {}
    for key, spec in feature_spec.items():
        if isinstance(spec, tf.io.FixedLenFeature):
            inputs[key] = tf.keras.layers.Input(
                shape=spec.shape or [1], name=key, dtype=spec.dtype)
        elif isinstance(spec, tf.io.VarLenFeature):
            inputs[key] = tf.keras.layers.Input(
                shape=[None], name=key, dtype=spec.dtype, sparse=True)
        else:
            raise ValueError('Spec type is not supported: ', key, spec)

    # Hidden stack: 100 -> 70 -> 50 -> 20 units, then one linear output.
    hidden = tf.keras.layers.Concatenate()(tf.nest.flatten(inputs))
    for units in (100, 70, 50, 20):
        hidden = tf.keras.layers.Dense(units, activation='relu')(hidden)
    prediction = tf.keras.layers.Dense(1)(hidden)
    return tf.keras.Model(inputs=inputs, outputs=prediction)

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
    """Trains the Keras model and exports it with serving signatures.

    Args:
      fn_args: Holds args used to train the model as name/value pairs.
    """
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    # Training and evaluation data share the accessor, transform output and
    # batch size.
    train_dataset = _input_fn(
        fn_args.train_files, fn_args.data_accessor, tf_transform_output,
        _BATCH_SIZE)
    eval_dataset = _input_fn(
        fn_args.eval_files, fn_args.data_accessor, tf_transform_output,
        _BATCH_SIZE)

    model = _build_keras_model(tf_transform_output)
    optimizer = tf.optimizers.Adam(learning_rate=0.0005)
    model.compile(
        optimizer=optimizer,
        loss=tf.keras.losses.MeanSquaredError(),
        metrics=[tf.keras.metrics.MeanSquaredError()])

    # TensorBoard logs go to the model run directory, updated per batch.
    callbacks = [
        tf.keras.callbacks.TensorBoard(
            log_dir=fn_args.model_run_dir, update_freq='batch'),
    ]

    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        callbacks=callbacks)

    # Export the trained model with serving signatures
    export_serving_model(tf_transform_output, model, fn_args.serving_model_dir)

Overwriting bfs_trainer.py


### Copy files to bucket
The Transform and Trainer module files need to be copied to the GCS bucket so that TFX can read them.

In [37]:
!gsutil cp bfs_trainer.py {MODULE_ROOT}/
!gsutil cp transformer.py {MODULE_ROOT}/

Copying file://bfs_trainer.py [Content-Type=text/x-python]...
/ [1 files][  7.3 KiB/  7.3 KiB]                                                
Operation completed over 1 objects/7.3 KiB.                                      
Copying file://transformer.py [Content-Type=text/x-python]...
/ [1 files][  4.4 KiB/  4.4 KiB]                                                
Operation completed over 1 objects/4.4 KiB.                                      


### Create the TFX pipeline. This pipeline can then be passed on to an orchestrator, such as Kubeflow, for deployment.

In [38]:

def build_pipeline(pipeline_name, pipeline_root, serving_model_dir, data_root,
                   file_name, module_root=None):
    """Assemble the TFX pipeline and compile it with KubeflowV2DagRunner.

    The compiled definition is written to '<pipeline_name>_pipeline.json'.

    Args:
      pipeline_name: Name of the pipeline; also the prefix of the output file.
      pipeline_root: Root location passed to the pipeline for its artifacts.
      serving_model_dir: Filesystem directory the Pusher pushes the model to.
      data_root: Directory that CsvExampleGen ingests as its input base.
      file_name: Not used by the pipeline definition (CsvExampleGen reads
        `data_root` as a whole); kept so existing callers keep working.
      module_root: Directory holding the Transform and Trainer module files.
        Defaults to the notebook-level MODULE_ROOT.
    """
    if module_root is None:
        # Backward-compatible default: the location used by this notebook.
        module_root = MODULE_ROOT

    print("Running pipeline")

    print("Creating example_gen")
    example_gen = tfx.components.CsvExampleGen(input_base=data_root)

    print("Creating statistics_gen")
    statistics_gen = tfx.components.StatisticsGen(examples=example_gen.outputs["examples"])

    print("Creating schema_gen")
    schema_gen = tfx.components.SchemaGen(statistics=statistics_gen.outputs["statistics"])

    print("Creating example_validator")
    example_validator = tfx.components.ExampleValidator(
        statistics=statistics_gen.outputs["statistics"],
        schema=schema_gen.outputs["schema"]
    )

    print("Creating transform")
    transform = tfx.components.Transform(
        examples=example_gen.outputs["examples"],
        schema=schema_gen.outputs["schema"],
        module_file=os.path.join(module_root, _transformer_module_file)
    )

    print("Creating trainer")
    trainer = tfx.components.Trainer(
        examples=transform.outputs["transformed_examples"],
        transform_graph=transform.outputs["transform_graph"],
        module_file=os.path.join(module_root, _trainer_module_file),
        schema=schema_gen.outputs["schema"],
        train_args=trainer_pb2.TrainArgs(num_steps=150),
        eval_args=trainer_pb2.EvalArgs(num_steps=150)
    )

    print("Adding model_resolver")
    # Resolves the latest blessed model, used below as the Evaluator baseline.
    model_resolver = tfx.dsl.Resolver(
        strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
        model=tfx.dsl.Channel(type=standard_artifacts.Model),
        model_blessing=tfx.dsl.Channel(type=standard_artifacts.ModelBlessing)
    ).with_id('latest_blessed_model_resolver')

    print("Adding evaluator")
    evaluator = tfx.components.Evaluator(
        examples=example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        eval_config=tfma.EvalConfig(
            model_specs=[
                tfma.ModelSpec(
                    signature_name='serving_default',
                    label_key='fare',
                    # Signature exported by the trainer module that applies
                    # the tf.Transform preprocessing.
                    preprocessing_function_names=['transform_features'],
                )
            ],
            metrics_specs=[
                tfma.MetricsSpec(
                    metrics=[
                        tfma.MetricConfig(class_name='ExampleCount'),
                        tfma.MetricConfig(class_name='MeanSquaredError')
                    ]
                )
            ],
            slicing_specs=[
                tfma.SlicingSpec(feature_keys=['trip_start_hour'])
            ]
        )
    )

    print("Creating pusher")
    pusher = tfx.components.Pusher(
        model=trainer.outputs['model'],
        model_blessing=evaluator.outputs['blessing'],
        push_destination=pusher_pb2.PushDestination(
            filesystem=pusher_pb2.PushDestination.Filesystem(base_directory=serving_model_dir)
        )
    )

    print("Creating tfx_pipeline")
    tfx_pipeline = Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=[
            example_gen,
            statistics_gen,
            schema_gen,
            example_validator,
            transform,
            trainer,
            model_resolver,
            evaluator,
            pusher
        ],
        enable_cache=True
    )

    print("Creating runner")
    runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
        config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
        output_filename=pipeline_name + '_pipeline.json'
    )

    print("Executing runner")
    _ = runner.run(tfx_pipeline)

    print("Pipeline is ready to be executed.")


build_pipeline(PIPELINE_NAME, PIPELINE_ROOT, SERVING_MODEL_DIR, DATA_ROOT, FILE_NAME)

Running pipeline
Creating example_gen
Creating statistics_gen
Creating schema_gen
Creating example_validator
Creating transform
Creating trainer
Adding model_resolver
Adding evaluator
Creating pusher
Creating tfx_pipeline
Creating runner
Executing runner
running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying transformer.py -> build/lib


!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


installing to /var/tmp/tmpvclegts5
running install
running install_lib
copying build/lib/transformer.py -> /var/tmp/tmpvclegts5
running install_egg_info
running egg_info
creating tfx_user_code_Transform.egg-info
writing tfx_user_code_Transform.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Transform.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Transform.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
Copying tfx_user_code_Transform.egg-info to /var/tmp/tmpvclegts5/tfx_user_code_Transform-0.0+9a5a89b6638dd2d2f72fb9360d4a810643fc3eadbcdd9f0c43022d91098b93de-py3.10.egg-info
running install_scripts
creating /var/tmp/tmpvclegts5/tfx_user_code_Transform-0.0+9a5a89b6638dd2d2f72fb9360d4a810643fc3eadbcdd9f0c43022d91098b93de.dist-info/WHEEL
creating '/var/tmp/tmpvu76go_p/tfx_use

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


installing to /var/tmp/tmp8s6kxhh5
running install
running install_lib
copying build/lib/bfs_trainer.py -> /var/tmp/tmp8s6kxhh5
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /var/tmp/tmp8s6kxhh5/tfx_user_code_Trainer-0.0+0545d0a3143be72e8c4a7380209e47d36c8df9271e4bb0f0810855e6cd41d14a-py3.10.egg-info
running install_scripts
creating /var/tmp/tmp8s6kxhh5/tfx_user_code_Trainer-0.0+0545d0a3143be72e8c4a7380209e47d36c8df9271e4bb0f0810855e6cd41d14a.dist-info/WHEEL
creating '/var/tmp/tmp_nuh_jqy/tfx_user_code_Trainer-0.0+0