# Deploy model using TFX Pipeline
To deploy the model, we will follow GCP best practices and use TensorFlow Extended (TFX). A TFX pipeline is a sequence of components that implement an ML pipeline, specifically designed for scale, deployment, and retraining.

To deploy the model, the pipeline needs to cover three phases:
1. Ingest & Validate Data
   - ExampleGen
   - StatisticsGen
   - SchemaGen
   - ExampleValidator
2. Train & Analyze Model
   - Transform
   - Trainer
3. Deploy in Production
   - Pusher

### Install Python packages
We will install the required Python packages, including TFX and KFP, to author ML pipelines and submit jobs to Vertex Pipelines. The TFX pipeline is compiled with the Kubeflow V2 runner and executed on Vertex Pipelines. The TFX components themselves use Apache Beam for data processing.

In [14]:
# Use the latest version of pip.
!pip install --upgrade pip
!pip install --upgrade "tfx[kfp]<2"
!pip install --upgrade tensorflow_transform


### Restart the Runtime
You will need to restart the runtime in Google Colab before the newly installed libraries are available (Runtime > Restart runtime).

### Log in to Google for this notebook

In [15]:
import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

### Check the package versions.

In [16]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))

TensorFlow version: 2.7.1
TFX version: 1.6.1
KFP version: 1.8.11


### Set up variables

We will set up some variables used to customize the pipelines below. The following
information is required:

* GCP project ID.
* GCP Region to run pipelines.
* Google Cloud Storage Bucket to store pipeline outputs.

In [17]:
GOOGLE_CLOUD_PROJECT = 'ml-spec-demo-2-sandbox'
GOOGLE_CLOUD_REGION = 'us-central1'
GCS_BUCKET_NAME = 'black_friday_gcp_bucket'

#### Set `gcloud` to use your project.

In [18]:
!gcloud config set project {GOOGLE_CLOUD_PROJECT}

Updated property [core/project].


### Set up global variables for pipeline and model serving locations

In [19]:
PIPELINE_NAME = 'black-friday-gcp-vertex-pipelines'

# Path to pipeline artifacts
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths to training data
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)


# Training data file name
FILE_NAME = 'train.csv'

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
print('Data root: {}'.format(DATA_ROOT))

PIPELINE_ROOT: gs://black_friday_gcp_bucket/pipeline_root/black-friday-gcp-vertex-pipelines
Data root: gs://black_friday_gcp_bucket/data/black-friday-gcp-vertex-pipelines


### Prepare example data
This notebook uses the Black Friday dataset from Kaggle.

Each row records a customer's demographics (`Gender`, `Age`, `Occupation`,
`City_Category`, `Stay_In_Current_City_Years`, `Marital_Status`), the product
bought (`Product_ID`, `Product_Category_1` to `Product_Category_3`) and the
`Purchase` amount. We will build a regression model that predicts `Purchase`.

We need to make our own copy of the dataset. TFX ExampleGen reads inputs from a
directory, so we need to create a directory on GCS and copy the dataset into it.
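The copy step is not shown in this notebook. If `train.csv` was downloaded from Kaggle to the notebook's local disk, `!gsutil cp train.csv {DATA_ROOT}/` would do it. The sketch below does the same from Python with the `google-cloud-storage` client. The helper names `split_gcs_uri` and `upload_training_file` and the local path are hypothetical. The client library is imported inside the upload function so that the path helper works without it.

```python
from typing import Optional, Tuple


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split 'gs://bucket/some/prefix' into ('bucket', 'some/prefix')."""
    if not uri.startswith("gs://"):
        raise ValueError(f"not a gs:// URI: {uri!r}")
    bucket, _, prefix = uri[len("gs://"):].partition("/")
    if not bucket:
        raise ValueError(f"missing bucket name in {uri!r}")
    return bucket, prefix.strip("/")


def upload_training_file(local_path: str, data_root: str,
                         file_name: str = "train.csv",
                         project: Optional[str] = None) -> str:
    """Upload a local CSV into the DATA_ROOT directory that CsvExampleGen reads."""
    # Imported here so split_gcs_uri can be used without the client library installed.
    from google.cloud import storage

    bucket_name, prefix = split_gcs_uri(data_root)
    blob_name = f"{prefix}/{file_name}" if prefix else file_name
    client = storage.Client(project=project)
    client.bucket(bucket_name).blob(blob_name).upload_from_filename(local_path)
    return f"gs://{bucket_name}/{blob_name}"
```

In the notebook, a call would look like `upload_training_file("train.csv", DATA_ROOT, project=GOOGLE_CLOUD_PROJECT)`.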

Take a quick look at the CSV file.

In [7]:
!gsutil cat {DATA_ROOT}/train.csv | head

User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3,Purchase
1000001,P00069042,F,0-17,10,A,2,0,3,,,8370
1000001,P00248942,F,0-17,10,A,2,0,1,6,14,15200
1000001,P00087842,F,0-17,10,A,2,0,12,,,1422
1000001,P00085442,F,0-17,10,A,2,0,12,14,,1057
1000002,P00285442,M,55+,16,C,4+,0,8,,,7969
1000003,P00193542,M,26-35,15,A,3,0,1,2,,15227
1000004,P00184942,M,46-50,7,B,2,1,1,8,17,19215
1000004,P00346142,M,46-50,7,B,2,1,1,15,,15854
1000004,P0097242,M,46-50,7,B,2,1,1,16,,15686
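The rows above already contain empty cells in `Product_Category_2` and `Product_Category_3`. The sketch below is a quick check that counts empty cells per column with Python's `csv` module. Here it runs on the nine rows printed above. For the full file, you could pass in the text of a local copy instead. Gaps like these explain why the Transform module treats `Product_Category_2` as an optional feature.

```python
import csv
import io
from typing import Dict, Tuple

# The rows printed by `gsutil cat ... | head` above.
SAMPLE = """\
User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3,Purchase
1000001,P00069042,F,0-17,10,A,2,0,3,,,8370
1000001,P00248942,F,0-17,10,A,2,0,1,6,14,15200
1000001,P00087842,F,0-17,10,A,2,0,12,,,1422
1000001,P00085442,F,0-17,10,A,2,0,12,14,,1057
1000002,P00285442,M,55+,16,C,4+,0,8,,,7969
1000003,P00193542,M,26-35,15,A,3,0,1,2,,15227
1000004,P00184942,M,46-50,7,B,2,1,1,8,17,19215
1000004,P00346142,M,46-50,7,B,2,1,1,15,,15854
1000004,P0097242,M,46-50,7,B,2,1,1,16,,15686
"""


def count_missing(csv_text: str) -> Tuple[int, Dict[str, int]]:
    """Return the number of data rows and, per column, how many cells are empty."""
    reader = csv.DictReader(io.StringIO(csv_text))
    counts = {name: 0 for name in reader.fieldnames}
    num_rows = 0
    for row in reader:
        num_rows += 1
        for name, value in row.items():
            if value == "":
                counts[name] += 1
    # Keep only the columns that actually have gaps.
    return num_rows, {name: n for name, n in counts.items() if n}


num_rows, missing = count_missing(SAMPLE)
print(num_rows, missing)  # 9 {'Product_Category_2': 3, 'Product_Category_3': 7}
```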


## Create a pipeline

TFX pipelines are defined using Python APIs. We will define a pipeline that
consists of seven components: CsvExampleGen, StatisticsGen, SchemaGen,
ExampleValidator, Transform, Trainer and Pusher. The pipeline is based on the
[Simple TFX Pipeline Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple),
extended with the data validation and Transform components.

Unlike a pipeline run locally, we don't need to set `metadata_connection_config`,
which is used to locate the
[ML Metadata](https://www.tensorflow.org/tfx/guide/mlmd) database. Vertex
Pipelines uses a managed metadata service, so users don't need to manage it and
we don't need to specify the parameter.

Before defining the pipeline, we need to write the module code for the Transform
and Trainer components.

# Write Component Modules


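### Write the Transform and Trainer modules

The module code is adapted from the
[Simple TFX Pipeline Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple).
`transformer.py` holds the preprocessing for the Transform component, and
`bfs_trainer.py` holds the Keras regression model trained by the Trainer component.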

In [20]:
_trainer_module_file = 'bfs_trainer.py'
_transformer_module_file = 'transformer.py'
_training_pipeline_file = 'training_pipeline.py'

In [21]:
%%writefile {_transformer_module_file}

from typing import Dict, Text, Any, List
import tensorflow as tf
import tensorflow_transform as tft

FEATURES = [
            'Product_ID',
            'Gender',
            'Age',
            'Occupation',
            'City_Category', 
            'Stay_In_Current_City_Years',
            'Marital_Status',
            'Product_Category_1',
            'Product_Category_2',
            'Purchase'
            ]

CATEGORICAL_FEATURE_KEYS = [
                            'Product_ID', 
                            'Age', 
                            'City_Category', 
                            'Product_Category_1', 
                            'Product_Category_2', 
                            'Stay_In_Current_City_Years',
                            'Gender'
                            ]

OPTIONAL_NUMERIC_KEY_FEATURES = ['Product_Category_2']

def preprocessing_fn(inputs: Dict[Text, Any], custom_config) -> Dict[Text, Any]:
    """tf.transform's callback function for preprocessing inputs.
    Args:
      inputs: map from feature keys to raw not-yet-transformed features.
      custom_config: configuration passed in by the Transform component;
        not used by this function.
    Returns:
      Map from string feature key to transformed feature operations.
    """
    print('Start preprocessing')
    outputs = {}
    for key in FEATURES:
      outputs[key] = inputs[key]


    # Optional features arrive as sparse tensors; convert them to dense tensors,
    # filling missing values with 0.
    for key in OPTIONAL_NUMERIC_KEY_FEATURES:
      sparse = tf.sparse.SparseTensor(inputs[key].indices, inputs[key].values,
                                      [inputs[key].dense_shape[0], 1])
      dense = tf.sparse.to_dense(sp_input=sparse, default_value=0)

      # Reshaping from a batch of vectors of size 1 to a batch of scalars.
      dense = tf.squeeze(dense, axis=1)
      outputs[key] = dense

    # Encode categorical features as integer ids. Read from `outputs` so the
    # densified optional features above are encoded, not the raw sparse inputs.
    for key in CATEGORICAL_FEATURE_KEYS:
      outputs[key] = tft.compute_and_apply_vocabulary(outputs[key])

    return outputs

Overwriting transformer.py
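Because some `Product_Category_2` cells are empty, `preprocessing_fn` handles that feature as a `SparseTensor` that stores only the values that are present. The pure-Python sketch below mimics the `OPTIONAL_NUMERIC_KEY_FEATURES` loop on the first four `Product_Category_2` cells of the CSV shown earlier. That loop rebuilds the sparse tensor with shape `[batch, 1]`, calls `tf.sparse.to_dense` with `default_value=0`, and squeezes the result. The helper names are hypothetical, and the sketch uses no TensorFlow.

```python
from typing import List, Sequence, Tuple

# (indices, values, dense_shape), the three parts of a SparseTensor.
Sparse = Tuple[List[List[int]], List[int], List[int]]


def sparse_from_column(column: Sequence[str]) -> Sparse:
    """Store only the non-empty cells of a column, each at position [row, 0]."""
    indices, values = [], []
    for row, cell in enumerate(column):
        if cell != "":
            indices.append([row, 0])
            values.append(int(cell))
    return indices, values, [len(column), 1]


def densify(sparse: Sparse, default_value: int = 0) -> List[int]:
    """Mimic to_dense(default_value) on a [batch, 1] tensor followed by squeeze(axis=1)."""
    indices, values, dense_shape = sparse
    dense = [default_value] * dense_shape[0]
    for (row, _col), value in zip(indices, values):
        dense[row] = value
    return dense


column = ["", "6", "", "14"]  # Product_Category_2 in the first four CSV rows
sparse = sparse_from_column(column)
print(sparse)           # ([[1, 0], [3, 0]], [6, 14], [4, 1])
print(densify(sparse))  # [0, 6, 0, 14]
```

A missing value becomes `0`. This is only safe while `0` is not a real category code, because otherwise "missing" and that category would be indistinguishable.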


In [22]:
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tensorflow.keras import layers
from tfx_bsl.tfxio import dataset_options
from tfx import v1 as tfx
from tfx_bsl.public import tfxio

import tensorflow_transform as tft

from tensorflow_metadata.proto.v0 import schema_pb2

FEATURES = [
            'Product_ID',
            'Gender',
            'Age', 
            'Occupation', 
            'City_Category', 
            'Stay_In_Current_City_Years', 
            'Marital_Status', 
            'Product_Category_1',
            'Product_Category_2'
            ]

LABEL = 'Purchase'


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.

  #model = tfdf.keras.RandomForestModel(task=tfdf.keras.Task.REGRESSION)
  #model.summary(print_fn=logging.info)

  inputs = [keras.layers.Input(shape=(1,), name=f) for f in FEATURES]
  d = keras.layers.concatenate(inputs)
  d = keras.layers.Dense(128, activation='relu')(d)
  d = keras.layers.Dense(256, activation='relu')(d)
  d = keras.layers.Dense(128, activation='relu')(d)
  outputs = keras.layers.Dense(1)(d)

  model = tf.keras.Model(inputs=inputs, outputs=outputs)

  model.compile(
      optimizer=tf.optimizers.Adam(learning_rate=0.0005), 
      loss=tf.keras.losses.MeanSquaredError(),
      metrics=[keras.metrics.MeanSquaredError()]
    )
  
  model.summary(print_fn=logging.info)

  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  
  # get transform component output
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  # read input data
  train_dataset = fn_args.data_accessor.tf_dataset_factory(
      fn_args.train_files,
      dataset_options.TensorFlowDatasetOptions(
          batch_size=20,
          label_key=LABEL
      ),
      tf_transform_output.transformed_metadata.schema,
  )

  eval_dataset = fn_args.data_accessor.tf_dataset_factory(
      fn_args.eval_files,
      dataset_options.TensorFlowDatasetOptions(
          batch_size=10,
          label_key=LABEL
      ),
      tf_transform_output.transformed_metadata.schema,
  )

  model = _make_keras_model()

  # tf callbacks for tensorboard
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=fn_args.model_run_dir,
      update_freq="batch",
  )


  # Train model
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback]
  )


  '''
  # Build signatures
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def _serve_tf_examples_fn(**input_features):
      # """Returns the output to be used in the serving signature."""
      preprocessed_features = model.tft_layer(input_features)
      autoencoded_features = model(preprocessed_features)

      return {
          **{
              f"input_features::{f}": input_features[f] for f in input_features.keys()
          },
          **{
              f"preprocessed_features::{f}": preprocessed_features[f]
              for f in preprocessed_features.keys()
          },
          # Output tensor names are of the form:
          # lstm_autoencoder_model/decoder/{feature_name}/Reshape_1:0
          **{
              f"output_features::{f.name.split('/')[2]}": f
              for f in autoencoded_features
          },
      }

  _input_tf_specs = {
      f: tf.TensorSpec(
          shape=[None, 8], dtype=tf.float32, name=f
      )
      for f in fn_args.custom_config["input_features"]
  }

  signatures = {
      "serving_default": _serve_tf_examples_fn.get_concrete_function(
          **_input_tf_specs
      )
  }

  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

  '''


  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')


Overwriting bfs_trainer.py


### Copy files to bucket
The transform and trainer module files need to be copied over to the GCP bucket for TFX to read.

In [23]:
!gsutil cp {_trainer_module_file} {MODULE_ROOT}/
!gsutil cp {_transformer_module_file} {MODULE_ROOT}/

Copying file://bfs_trainer.py [Content-Type=text/x-python]...
Operation completed over 1 objects/4.6 KiB.                                      
Copying file://transformer.py [Content-Type=text/x-python]...
Operation completed over 1 objects/2.0 KiB.                                      


### Create the TFX pipeline
The pipeline definition can then be passed to an orchestrator, such as Kubeflow, for deployment.

In [24]:
import os

from tfx import v1 as tfx
from tfx.orchestration.pipeline import Pipeline
from tfx.proto.trainer_pb2 import EvalArgs, TrainArgs

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

def build_pipeline(pipeline_name, pipeline_root, serving_model_dir, data_root, file_name):
  print("Running pipeline")

  print("Creating example_gen")
  # Generate Training Samples from Dataset stored on bucket.
  example_gen = tfx.components.CsvExampleGen(
    input_base=data_root
  )

  print("Creating statistics_gen")
  # Generate statistics over data for visualization and example validation.
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs["examples"]
  )

  print("Creating schema_gen")
  # Generates schema based on statistic files
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs["statistics"],
      #infer_feature_shape=True  
  )

  print("Creating example_validator")
  # Performs anomaly detection based on statistics and data schema
  example_validator = tfx.components.ExampleValidator(
      statistics=statistics_gen.outputs["statistics"],
      schema=schema_gen.outputs["schema"]
  )

  print("Creating transform")
  transform = tfx.components.Transform(
      examples=example_gen.outputs["examples"],
      schema=schema_gen.outputs["schema"],
      module_file=os.path.join(MODULE_ROOT, _transformer_module_file)
  )

  print("Creating trainer")
  # Trains the model
  trainer = tfx.components.Trainer(
      examples=transform.outputs["transformed_examples"],
      transform_graph=transform.outputs["transform_graph"],
      module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
      schema=schema_gen.outputs["schema"],
      train_args=TrainArgs(num_steps=150),
      eval_args=EvalArgs(num_steps=150)

  )

  print("Creating pusher")
  # Pushes the trained model to the serving directory on GCS
  pusher = tfx.components.Pusher(
      trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)),
  )
  
  print("Creating tfx_pipeline")
  tfx_pipeline = Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen,
          statistics_gen,
          schema_gen,
          example_validator,
          transform,
          trainer,
          pusher
      ],
      enable_cache=True
  )


  pipeline_definition_file = pipeline_name + '_pipeline.json'

  print("Creating runner")
  runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
      config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
      output_filename=pipeline_definition_file)
  

  print("Executing runner")
  # The following call writes the pipeline definition to pipeline_definition_file.
  _ = runner.run(tfx_pipeline)


def deploy_to_vertex(pipeline_name, project_name, cloud_region):
  aiplatform.init(project=project_name, location=cloud_region)
  pipeline_definition_file = pipeline_name + '_pipeline.json'
  job = pipeline_jobs.PipelineJob(template_path=pipeline_definition_file,
                                display_name=pipeline_name)
  job.run(sync=False)

build_pipeline(PIPELINE_NAME, PIPELINE_ROOT, SERVING_MODEL_DIR, DATA_ROOT, FILE_NAME)

Running pipeline
Creating example_gen
Creating statistics_gen
Creating schema_gen
Creating example_validator
Creating transform
Creating trainer
Creating pusher
Creating tfx_pipeline
Creating runner
Executing runner
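TFX derives the execution order from how component outputs are wired into inputs, not from the order of the `components` list. The sketch below reproduces that reasoning for `build_pipeline` using the standard-library `graphlib` module (Python 3.9+). The dependency table is read off the `outputs[...]` references in the cell above. It is an illustration only, not something TFX exposes.

```python
from graphlib import TopologicalSorter

# Upstream components of each node, taken from the outputs[...] wiring in build_pipeline.
DEPENDENCIES = {
    "CsvExampleGen": set(),
    "StatisticsGen": {"CsvExampleGen"},
    "SchemaGen": {"StatisticsGen"},
    "ExampleValidator": {"StatisticsGen", "SchemaGen"},
    "Transform": {"CsvExampleGen", "SchemaGen"},
    "Trainer": {"Transform", "SchemaGen"},
    "Pusher": {"Trainer"},
}

# One valid execution order; independent components may appear in either order.
order = list(TopologicalSorter(DEPENDENCIES).static_order())
print(order)

# Components that consume ExampleValidator's output.
downstream_of_validator = [name for name, deps in DEPENDENCIES.items()
                           if "ExampleValidator" in deps]
print(downstream_of_validator)  # []
```

In this pipeline, nothing consumes ExampleValidator's anomalies. Anomalies it reports therefore do not stop Transform, Trainer or Pusher.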


# Deploy Pipeline to Vertex AI
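TFX pipelines can run on Vertex AI Pipelines. The `build_pipeline` function compiles the pipeline into a JSON definition with the Kubeflow V2 runner (`KubeflowV2DagRunner`). The `deploy_to_vertex` function then submits that definition to Vertex AI as a pipeline job.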

In [25]:
deploy_to_vertex(PIPELINE_NAME, GOOGLE_CLOUD_PROJECT, GOOGLE_CLOUD_REGION)

### Model Performance
You can check the model's performance by reading the logs written by the Trainer component in Vertex AI. The result seems to match the neural networks from the exploration phase, with a mean squared error (MSE) of 80,064,008.
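The Trainer compiles the model with a `MeanSquaredError` loss and metric, so the figure of 80,064,008 quoted above is in squared `Purchase` units. Taking the square root gives the root mean squared error (RMSE), which is in the same units as `Purchase` and easier to interpret. A minimal sketch:

```python
import math
from typing import Sequence


def mean_squared_error(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Average of squared differences, the quantity Keras' MeanSquaredError reports."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("y_true and y_pred must be non-empty and the same length")
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def rmse_from_mse(mse: float) -> float:
    """Convert an MSE back to the units of the label."""
    return math.sqrt(mse)


reported_mse = 80_064_008  # value quoted from the Trainer logs
print(f"RMSE = {rmse_from_mse(reported_mse):,.0f}")  # RMSE = 8,948
```

A quick sanity check is to compare this against the RMSE of always predicting the mean `Purchase`.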

### Setting Up Vertex AI Endpoints

Once the model is trained, the Pusher component copies it into the Cloud Storage bucket under:

```
black_friday_gcp_bucket/serving_model/black-friday-gcp-vertex-pipelines/#
```

Here `#` stands for the version directory that Pusher creates for each pushed model. This model needs to be registered in Vertex AI's Model service.

Once the model has been registered, a Vertex AI endpoint can be configured to serve it.
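You can register and deploy the model in the Cloud Console, or with the Vertex AI SDK that the notebook already imports as `aiplatform`. The sketch below uses `aiplatform.Model.upload` and `Model.deploy`. The prebuilt TensorFlow serving image URI and the machine type are assumptions; check them against the Vertex AI documentation for your TensorFlow version. `pushed_model_uri` and `register_and_deploy` are hypothetical helper names.

```python
def pushed_model_uri(serving_model_dir: str, version: str) -> str:
    """Directory of one pushed model: Pusher writes each push into its own version subdirectory."""
    return f"{serving_model_dir.rstrip('/')}/{version}"


def register_and_deploy(project: str, region: str, artifact_uri: str,
                        display_name: str,
                        # Assumed prebuilt TF 2.7 CPU serving image; verify before use.
                        serving_image: str = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-7:latest",
                        machine_type: str = "n1-standard-2"):
    """Register a SavedModel directory as a Vertex AI Model and deploy it to a new endpoint."""
    # Imported lazily so pushed_model_uri works without the SDK installed.
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)
    model = aiplatform.Model.upload(
        display_name=display_name,
        artifact_uri=artifact_uri,
        serving_container_image_uri=serving_image,
    )
    # With no endpoint argument, deploy() creates a new endpoint and returns it.
    endpoint = model.deploy(machine_type=machine_type)
    return endpoint
```

A call would look like `register_and_deploy(GOOGLE_CLOUD_PROJECT, GOOGLE_CLOUD_REGION, pushed_model_uri(SERVING_MODEL_DIR, "<version>"), PIPELINE_NAME)`. Replace `<version>` with the directory name Pusher created in your bucket.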

### Inferencing
Once the endpoint has been set up, it can be queried with an HTTP request.

The inference request body has the following structure:

```
{
  "instances": [
    {
      "Age": [0.1],
      "City_Category": [0.2],
      "Gender": [0.3],
      "Marital_Status": [0.4],
      "Occupation": [0.5],
      "Product_Category_1": [0.6],
      "Product_Category_2": [0.7],
      "Product_ID": [0.8],
      "Stay_In_Current_City_Years": [0.9]
    }
  ]
}
```
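The body can be built and sent from Python using only the standard library. The sketch below builds the `instances` payload shown above, wrapping each feature in a one-element list to match the model's `Input(shape=(1,))` layers. It then POSTs the payload to the Vertex AI `:predict` REST method. The endpoint ID and access token are placeholders you supply; `gcloud auth print-access-token` prints a token. The helper names are hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict, List

# Model inputs, matching FEATURES in bfs_trainer.py.
FEATURE_NAMES = [
    "Age", "City_Category", "Gender", "Marital_Status", "Occupation",
    "Product_Category_1", "Product_Category_2", "Product_ID",
    "Stay_In_Current_City_Years",
]


def build_predict_body(rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Wrap each (already encoded) feature value in a one-element list."""
    instances = []
    for row in rows:
        missing = [name for name in FEATURE_NAMES if name not in row]
        if missing:
            raise ValueError(f"missing features: {missing}")
        instances.append({name: [row[name]] for name in FEATURE_NAMES})
    return {"instances": instances}


def predict_url(project: str, region: str, endpoint_id: str) -> str:
    """REST URL of a Vertex AI endpoint's :predict method."""
    return (f"https://{region}-aiplatform.googleapis.com/v1/projects/{project}"
            f"/locations/{region}/endpoints/{endpoint_id}:predict")


def send_predict(url: str, body: Dict[str, Any], access_token: str) -> Dict[str, Any]:
    """POST the JSON body and return the decoded JSON response."""
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example, `send_predict(predict_url(GOOGLE_CLOUD_PROJECT, GOOGLE_CLOUD_REGION, "<endpoint-id>"), body, token)` returns the endpoint's JSON response. The model outputs are in its `predictions` list.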

You will notice that the input values in the request are already encoded. The serving model does not include the `transform_graph` generated by the training pipeline, so the client has to send transformed features rather than raw ones.

There are two ways to solve this issue:
1. Add a serving signature to the model: a `@tf.function`-decorated function that applies the Transform layer before calling the model.
2. Create a Google Cloud service that manually maps raw inputs to the encoded values the model expects.

For simplicity, the latter has been implemented. Template code for a signature has been left in the trainer module, disabled inside a string literal.
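The second option amounts to re-applying the vocabularies that `tft.compute_and_apply_vocabulary` learned during training. The sketch below assumes that the vocabulary files exported with the Transform graph list one token per line, so a token's id is its line number. It also assumes unseen values get `tft`'s default id of -1. The function names and the toy vocabulary are hypothetical. Features without a vocabulary (`Occupation`, `Marital_Status`) pass through unchanged, as they do in `preprocessing_fn`.

```python
from typing import Any, Dict, Iterable, List


def load_vocabulary(lines: Iterable[str]) -> Dict[str, int]:
    """Map each token to its line number (assumed vocabulary file layout)."""
    return {line.rstrip("\n"): index for index, line in enumerate(lines)}


def encode_instance(raw: Dict[str, Any],
                    vocabularies: Dict[str, Dict[str, int]],
                    default_value: int = -1) -> Dict[str, List[Any]]:
    """Turn raw feature values into the one-element lists the endpoint expects."""
    encoded = {}
    for name, value in raw.items():
        vocabulary = vocabularies.get(name)
        if vocabulary is None:
            # Not vocabulary-encoded in preprocessing_fn: pass through unchanged.
            encoded[name] = [value]
        else:
            # Vocabularies store tokens as text, so integer categories are looked up as strings.
            encoded[name] = [vocabulary.get(str(value), default_value)]
    return encoded


# Toy vocabulary for illustration only; real ones come from the Transform output.
vocabularies = {"Age": load_vocabulary(["26-35\n", "36-45\n", "0-17\n"])}
print(encode_instance({"Age": "0-17", "Occupation": 10}, vocabularies))
# {'Age': [2], 'Occupation': [10]}
```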