In this notebook, we will build two custom models: one for deployment to a Vertex AI Endpoint and one for mobile deployment. We will write a TFX pipeline that trains and exports both models. The entire pipeline will be orchestrated using [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction).

## References

This notebook draws on the following resources and reuses parts of their code:
* [Simple TFX Pipeline for Vertex Pipelines](https://colab.research.google.com/github/tensorflow/tfx/blob/master/docs/tutorials/tfx/gcp/vertex_pipelines_simple.ipynb)
* [Vertex AI Training with TFX and Vertex Pipelines](https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_vertex_training)
* [Importing models to Vertex AI](https://cloud.google.com/vertex-ai/docs/general/import-model)
* [Deploying a model using the Vertex AI API](https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api)
* [MLOps with Vertex AI](https://github.com/GoogleCloudPlatform/mlops-with-vertex-ai)
* [Custom components TFX](https://www.tensorflow.org/tfx/tutorials/tfx/python_function_component)

## Setup

In [None]:
%%capture
# Use the latest version of pip. (`%%capture` must be the first line of the cell.)
!pip install --upgrade pip
!pip install --upgrade tfx==1.0.0 kfp==1.6.1
!pip install -q --upgrade google-cloud-aiplatform

### ***Please restart runtime before continuing.*** 

In [None]:
!gcloud init

In [None]:
from google.colab import auth
auth.authenticate_user()

## Imports

In [1]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))

from google.cloud import aiplatform as vertex_ai
import os

TensorFlow version: 2.5.0
TFX version: 1.0.0
KFP version: 1.6.1


## Environment setup

In [2]:
GOOGLE_CLOUD_PROJECT = 'fast-ai-exploration'    #@param {type:"string"}
GOOGLE_CLOUD_REGION = 'us-central1'             #@param {type:"string"}
GCS_BUCKET_NAME = 'vertex-tfx-mlops'            #@param {type:"string"}

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')

The location of the bucket must be a single region. Also, the bucket needs to be created in a region where [Vertex AI services are available](https://cloud.google.com/vertex-ai/docs/general/locations#available_regions).

In [3]:
PIPELINE_NAME = 'two-way-vertex-pipelines5'

# Path for the pipeline artifacts.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Path for the user's Python modules.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://flowers-public/tfrecords-jpeg-224x224'

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

PIPELINE_ROOT: gs://demo-experiments-gde-csp/pipeline_root/two-way-vertex-pipelines5
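
The `gs://` locations above are built with `str.format`, and later cells join GCS paths with `os.path.join`. Both produce forward-slash URIs on POSIX systems such as Colab, but `os.path.join` uses backslashes on Windows. A minimal sketch of a platform-independent alternative follows; `gcs_join` is a hypothetical helper and not part of TFX.

```python
import posixpath


def gcs_join(base: str, *parts: str) -> str:
    """Join GCS path segments with forward slashes on every platform."""
    # posixpath.join always uses "/" regardless of the host operating system.
    return posixpath.join(base, *parts)


bucket = "vertex-tfx-mlops"
pipeline_name = "two-way-vertex-pipelines5"
pipeline_root = gcs_join(f"gs://{bucket}", "pipeline_root", pipeline_name)
print(pipeline_root)
```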


Firebase setup requires three things, all of which are described in the official [Deploy and manage custom models with Firebase Admin SDK](https://firebase.google.com/docs/ml/manage-hosted-models) guide.

1. You need to obtain a credential file for your Firebase project.
  - On the [Settings](https://console.firebase.google.com/project/_/settings/serviceaccounts/adminsdk) page, create a service account and download the service account key file. Keep this file safe, since it grants administrator access to your project.
  - Save the credential JSON file in a GCS bucket, and set `FIREBASE_CREDENTIAL_PATH` to its `gs://` URI.



2. You need to create a GCS bucket where the model is going to be temporarily stored.
  - On the Storage page, enable Cloud Storage. Take note of your bucket name.
  - Set `FIREBASE_GCS_BUCKET` to that bucket name. It usually has the form `YOUR_GCP_PROJECT_ID.appspot.com`.



3. On the Firebase ML page, click Get started if you haven't yet enabled Firebase ML.
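
Before uploading the credential file from step 1 to GCS, it can help to confirm that it really is a service account key. The sketch below is only a sanity check under the assumption that the file follows the usual service account key layout; `check_service_account_key` and the placeholder values are hypothetical.

```python
import json

# Fields that a downloaded service account key is expected to contain.
REQUIRED_FIELDS = {"type", "project_id", "private_key", "client_email"}


def check_service_account_key(raw_json: str) -> str:
    """Validate the key file contents and return its project ID."""
    key = json.loads(raw_json)
    missing = REQUIRED_FIELDS - set(key)
    if missing:
        raise ValueError(f"Key file is missing fields: {sorted(missing)}")
    if key["type"] != "service_account":
        raise ValueError(f"Unexpected credential type: {key['type']}")
    return key["project_id"]


# Placeholder values only; a real key file comes from the Firebase console.
example_key = json.dumps({
    "type": "service_account",
    "project_id": "my-firebase-project",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
    "client_email": "firebase-adminsdk@my-firebase-project.iam.gserviceaccount.com",
})
print(check_service_account_key(example_key))
```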

In [4]:
FIREBASE_CREDENTIAL_PATH = 'gs://credential-csp/gcp-ml-172005-firebase-adminsdk-5gdtb-38c6644f1e.json'
FIREBASE_GCS_BUCKET = 'gcp-ml-172005.appspot.com'

## Create training modules

In [5]:
_trainer_densenet_module_file = 'flower_densenet_trainer.py'
_trainer_mobilenet_module_file = 'flower_mobilenet_trainer.py'

In [6]:
%%writefile {_trainer_densenet_module_file}

from typing import List
from absl import logging
from tensorflow import keras
from tfx import v1 as tfx
import tensorflow as tf


_IMAGE_FEATURES = {
    "image": tf.io.FixedLenFeature([], tf.string),  
    "class": tf.io.FixedLenFeature([], tf.int64), 
    "one_hot_class": tf.io.VarLenFeature(tf.float32),
}

_CONCRETE_INPUT = "numpy_inputs"
_INPUT_SHAPE = (224, 224, 3)
_TRAIN_BATCH_SIZE = 64
_EVAL_BATCH_SIZE = 64
_EPOCHS = 2


def _parse_fn(example):
    example = tf.io.parse_single_example(example, _IMAGE_FEATURES)
    image = tf.image.decode_jpeg(example["image"], channels=3)
    class_label = tf.cast(example["class"], tf.int32)
    return image, class_label


def _input_fn(file_pattern: List[str], batch_size: int) -> tf.data.Dataset:
    """Generates features and label for training.

    Args:
        file_pattern: List of paths or patterns of input tfrecord files.
        batch_size: representing the number of consecutive elements of returned
            dataset to combine in a single batch.

    Returns:
        A dataset of batched (image, label) tuples, where image is a decoded
            uint8 image Tensor and label is an int32 Tensor of class indices.
    """
    logging.info(f"Reading data from: {file_pattern}")
    tfrecord_filenames = tf.io.gfile.glob(file_pattern[0] + ".gz")
    dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
    dataset = dataset.map(_parse_fn).batch(batch_size)
    return dataset.repeat()


def _make_keras_model() -> tf.keras.Model:
    """Creates a DenseNet121-based model for classifying flowers data.

    Returns:
    A Keras Model.
    """
    inputs = keras.Input(shape=_INPUT_SHAPE)
    base_model = keras.applications.DenseNet121(
        include_top=False, input_shape=_INPUT_SHAPE, pooling="avg"
    )
    base_model.trainable = False
    x = keras.applications.densenet.preprocess_input(inputs)
    x = base_model(
        x, training=False
    )  # Ensures BatchNorm layers run in inference mode in this model
    outputs = keras.layers.Dense(5, activation="softmax")(x)
    model = keras.Model(inputs, outputs)

    model.compile(
        optimizer=keras.optimizers.Adam(),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )

    model.summary(print_fn=logging.info)
    return model


def _preprocess(bytes_input):
    decoded = tf.io.decode_jpeg(bytes_input, channels=3)
    resized = tf.image.resize(decoded, size=(224, 224))
    return resized


@tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
def preprocess_fn(bytes_inputs):
    decoded_images = tf.map_fn(
        _preprocess, bytes_inputs, dtype=tf.float32, back_prop=False
    )
    return {
        _CONCRETE_INPUT: decoded_images
    } 


def _model_exporter(model: tf.keras.Model):
    m_call = tf.function(model.call).get_concrete_function(
        [tf.TensorSpec(shape=[None, 224, 224, 3], dtype=tf.float32, name=_CONCRETE_INPUT)]
    )

    @tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
    def serving_fn(bytes_inputs):
        # This function comes from the Computer Vision book from O'Reilly.
        labels = tf.constant(["daisy", "dandelion", "roses", "sunflowers", "tulips"],
                            dtype=tf.string)
        images = preprocess_fn(bytes_inputs)
        
        probs = m_call(**images)
        indices = tf.argmax(probs, axis=1)
        pred_source = tf.gather(params=labels, indices=indices)
        pred_confidence = tf.reduce_max(probs, axis=1)
        return {'label': pred_source,
                'confidence': pred_confidence}

    return serving_fn

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
    """Train the model based on given args.

    Args:
        fn_args: Holds args used to train the model as name/value pairs.
    """
    train_dataset = _input_fn(fn_args.train_files, batch_size=_TRAIN_BATCH_SIZE)
    eval_dataset = _input_fn(fn_args.eval_files, batch_size=_EVAL_BATCH_SIZE)

    model = _make_keras_model()
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        epochs=_EPOCHS,
    )
    _, acc = model.evaluate(eval_dataset, steps=fn_args.eval_steps)
    logging.info(f"Validation accuracy: {round(acc * 100, 2)}%")
    # The result of the training should be saved in `fn_args.serving_model_dir`
    # directory.
    tf.saved_model.save(
        model, fn_args.serving_model_dir, signatures={"serving_default": _model_exporter(model)}
    )


Overwriting flower_densenet_trainer.py
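
The serving signature above turns class probabilities into a label and a confidence score using `tf.argmax`, `tf.gather`, and `tf.reduce_max`. The same post-processing can be sketched in plain Python to make the logic easier to follow. `decode_predictions` and the probability values are hypothetical, and the result is shown per instance, which is how the Endpoint reports predictions later in this notebook.

```python
from typing import Dict, List, Sequence

LABELS = ["daisy", "dandelion", "roses", "sunflowers", "tulips"]


def decode_predictions(probs: Sequence[Sequence[float]]) -> List[Dict[str, object]]:
    """Map each row of class probabilities to its top label and confidence."""
    results = []
    for row in probs:
        # Equivalent of tf.argmax(probs, axis=1) for a single row.
        index = max(range(len(row)), key=row.__getitem__)
        # Equivalent of tf.gather(labels, indices) and tf.reduce_max(probs, axis=1).
        results.append({"label": LABELS[index], "confidence": row[index]})
    return results


# Made-up probabilities for two images.
batch = [[0.05, 0.10, 0.67, 0.08, 0.10], [0.70, 0.10, 0.10, 0.05, 0.05]]
for result in decode_predictions(batch):
    print(result)
```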


In [7]:
%%writefile {_trainer_mobilenet_module_file}

from typing import List
from absl import logging
from tensorflow import keras
from tfx import v1 as tfx
import tensorflow as tf


_IMAGE_FEATURES = {
    "image": tf.io.FixedLenFeature([], tf.string),  
    "class": tf.io.FixedLenFeature([], tf.int64),  
    "one_hot_class": tf.io.VarLenFeature(tf.float32),
}

_INPUT_SHAPE = (224, 224, 3)
_TRAIN_BATCH_SIZE = 64
_EVAL_BATCH_SIZE = 64
_EPOCHS = 2


def _parse_fn(example):
    example = tf.io.parse_single_example(example, _IMAGE_FEATURES)
    image = tf.image.decode_jpeg(example["image"], channels=3)
    class_label = tf.cast(example["class"], tf.int32)
    return image, class_label


def _input_fn(file_pattern: List[str], batch_size: int) -> tf.data.Dataset:
    """Generates features and label for training.

    Args:
        file_pattern: List of paths or patterns of input tfrecord files.
        batch_size: representing the number of consecutive elements of returned
            dataset to combine in a single batch.

    Returns:
        A dataset of batched (image, label) tuples, where image is a decoded
            uint8 image Tensor and label is an int32 Tensor of class indices.
    """
    logging.info(f"Reading data from: {file_pattern}")
    tfrecord_filenames = tf.io.gfile.glob(file_pattern[0] + ".gz")
    dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
    dataset = dataset.map(_parse_fn).batch(batch_size)
    return dataset.repeat()


def _make_keras_model() -> tf.keras.Model:
    """Creates a MobileNetV3-based model for classifying flowers data.

    Returns:
    A Keras Model.
    """
    inputs = keras.Input(shape=_INPUT_SHAPE)
    base_model = keras.applications.MobileNetV3Small(
        include_top=False, input_shape=_INPUT_SHAPE, pooling="avg"
    )
    base_model.trainable = False
    x = keras.applications.mobilenet_v3.preprocess_input(inputs)
    x = base_model(
        x, training=False
    )  # Ensures BatchNorm layers run in inference mode in this model
    outputs = keras.layers.Dense(5, activation="softmax")(x)
    model = keras.Model(inputs, outputs)

    model.compile(
        optimizer=keras.optimizers.Adam(),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )

    model.summary(print_fn=logging.info)
    return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
    """Train the model based on given args.

    Args:
        fn_args: Holds args used to train the model as name/value pairs.
    """
    train_dataset = _input_fn(fn_args.train_files, batch_size=_TRAIN_BATCH_SIZE)
    eval_dataset = _input_fn(fn_args.eval_files, batch_size=_EVAL_BATCH_SIZE)

    model = _make_keras_model()
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        epochs=_EPOCHS,
    )
    _, acc = model.evaluate(eval_dataset, steps=fn_args.eval_steps)
    logging.info(f"Validation accuracy: {round(acc * 100, 2)}%")

    # Convert the model.
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    tflite_model = converter.convert()

    # Save the model.
    # The result of the training should be saved in `fn_args.serving_model_dir` directory.    
    with tf.io.gfile.GFile(fn_args.serving_model_dir + '/model.tflite', 'wb') as f:
        f.write(tflite_model)

Overwriting flower_mobilenet_trainer.py


In [8]:
!gsutil cp -r *.py {MODULE_ROOT}/
!gsutil ls -lh {MODULE_ROOT}/

Copying file://firebase_publisher.py [Content-Type=text/x-python]...
Copying file://flower_densenet_trainer.py [Content-Type=text/x-python]...       
Copying file://flower_mobilenet_trainer.py [Content-Type=text/x-python]...      
Copying file://vertex_deployer.py [Content-Type=text/x-python]...               
/ [4 files][ 12.5 KiB/ 12.5 KiB]                                                
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m cp ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Copying file://vertex_uploader.py [Content-Type=text/x-python]...
/ [5 files][ 13.8 KiB/ 13.8 KiB]                                                
Operation completed over 5 objects/13.8 KiB.                                     
  3.16 KiB  2021-08-05T16:02:35Z  gs://demo-experiments-gde-csp/pipeline_module/two-way-vertex-pipelines5/firebase_publisher.p

## Create the pipeline

To create the end-to-end pipeline, we will need to write three custom TFX components:

* One will take the pushed model from `Pusher` and upload it to Vertex AI.
* One will deploy the uploaded model to an Endpoint.
* One will publish the TFLite model to Firebase ML.

We will then build a Docker image containing these custom components and run the pipeline with this image. We will use [Cloud Build](https://cloud.google.com/build) to build the Docker image.

In [59]:
_vertex_uploader_module_file = 'vertex_uploader.py'
_vertex_deployer_module_file = 'vertex_deployer.py'
_firebase_publisher_module_file = 'firebase_publisher.py'

In [60]:
%%writefile {_vertex_uploader_module_file}

import os
import tensorflow as tf

from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import Parameter
from tfx.types.standard_artifacts import String
from google.cloud import aiplatform as vertex_ai
from tfx import v1 as tfx
from absl import logging


@component
def VertexUploader(
    project: Parameter[str],
    region: Parameter[str],
    model_display_name: Parameter[str],
    pushed_model_location: Parameter[str],
    serving_image_uri: Parameter[str],
    uploaded_model: tfx.dsl.components.OutputArtifact[String]
):

    vertex_ai.init(project=project, location=region)

    pushed_model_dir = os.path.join(
        pushed_model_location, tf.io.gfile.listdir(pushed_model_location)[-1]
    )

    logging.info(f"Model registry location: {pushed_model_dir}")

    vertex_model = vertex_ai.Model.upload(
        display_name=model_display_name,
        artifact_uri=pushed_model_dir,
        serving_container_image_uri=serving_image_uri,
        parameters_schema_uri=None,
        instance_schema_uri=None,
        explanation_metadata=None,
        explanation_parameters=None,
    )

    uploaded_model.set_string_custom_property("model_resource_name", str(vertex_model.resource_name))
    logging.info(f"Model resource: {str(vertex_model.resource_name)}")
    

Overwriting vertex_uploader.py
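
`VertexUploader` picks the model version with `tf.io.gfile.listdir(...)[-1]`, which depends on the order in which the directory is listed. Assuming `Pusher` names each version directory with a numeric timestamp (worth verifying for your TFX version), a more explicit selection could look like the sketch below. `latest_version_dir` is a hypothetical helper, and the directory names are made up.

```python
from typing import Iterable


def latest_version_dir(entries: Iterable[str]) -> str:
    """Return the directory whose numeric name is the largest."""
    # GCS listings may return directory names with a trailing slash.
    cleaned = [entry.rstrip("/") for entry in entries]
    numeric = [entry for entry in cleaned if entry.isdigit()]
    if not numeric:
        raise ValueError("No numeric version directories found.")
    # Compare as integers so that "999999999" does not beat "1628173600".
    return max(numeric, key=int)


print(latest_version_dir(["1628170000/", "1628173600/", "999999999/"]))
```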


In [61]:
%%writefile {_vertex_deployer_module_file}

from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import Parameter
from tfx.types.standard_artifacts import String
from google.cloud import aiplatform as vertex_ai
from tfx import v1 as tfx
from absl import logging


@component
def VertexDeployer(
    project: Parameter[str],
    region: Parameter[str],
    model_display_name: Parameter[str],
    deployed_model_display_name: Parameter[str]
):  

    logging.info(f"Endpoint display: {deployed_model_display_name}")
    vertex_ai.init(project=project, location=region)

    endpoints = vertex_ai.Endpoint.list(
        filter=f'display_name={deployed_model_display_name}', 
        order_by="update_time")
    
    if len(endpoints) > 0:
        logging.info(f"Endpoint {deployed_model_display_name} already exists.")
        endpoint = endpoints[-1]
    else:
        endpoint = vertex_ai.Endpoint.create(deployed_model_display_name)

    model = vertex_ai.Model.list(
        filter=f'display_name={model_display_name}',
        order_by="update_time"
    )[-1]

    endpoint = vertex_ai.Endpoint.list(
        filter=f'display_name={deployed_model_display_name}',
        order_by="update_time"
    )[-1]

    deployed_model = endpoint.deploy(
        model=model,
        # Syntax from here: https://git.io/JBQDP
        traffic_split={"0": 100},
        machine_type="n1-standard-4",
        min_replica_count=1,
        max_replica_count=1
    )

    logging.info(f"Model deployed to: {deployed_model}")
    

Overwriting vertex_deployer.py


In [62]:
%%writefile {_firebase_publisher_module_file}

from tfx import types
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import Parameter
from tfx import v1 as tfx
from absl import logging

import firebase_admin
from firebase_admin import ml
from firebase_admin import storage
from firebase_admin import credentials
from google.cloud import storage as gcs_storage

@component
def FirebasePublisher(
    pushed_model: tfx.dsl.components.InputArtifact[tfx.types.standard_artifacts.PushedModel],
    credential_uri: Parameter[str],
    firebase_dest_gcs_bucket: Parameter[str],
    model_display_name: Parameter[str],
    model_tag: Parameter[str]
) -> tfx.dsl.components.OutputDict(result=str):
    # locate the tflite model file; `model_uri` must be defined before it is validated
    model_uri = f'{pushed_model.uri}/model.tflite'
    assert model_uri.split("://")[0] == "gs"
    assert credential_uri.split("://")[0] == "gs"
    
    # create gcs client instance
    gcs_client = gcs_storage.Client()
    
    # get credential for firebase
    credential_gcs_bucket = credential_uri.split("//")[1].split('/')[0]
    credential_blob_path = '/'.join(credential_uri.split("//")[1].split('/')[1:])
    
    bucket = gcs_client.bucket(credential_gcs_bucket)
    blob = bucket.blob(credential_blob_path)
    blob.download_to_filename('credential.json')
    logging.info(f"download credential.json from {credential_uri} is completed")
    
    # get tflite model file
    tflite_gcs_bucket = model_uri.split("//")[1].split('/')[0]
    tflite_blob_path =  '/'.join(model_uri.split("//")[1].split('/')[1:])
    
    bucket = gcs_client.bucket(tflite_gcs_bucket)
    blob = bucket.blob(tflite_blob_path)
    blob.download_to_filename('model.tflite')    
    logging.info(f"download model.tflite from {model_uri} is completed")
    
    firebase_admin.initialize_app(
        credentials.Certificate("credential.json"),
        options={
            "storageBucket": firebase_dest_gcs_bucket
        })
    logging.info("firebase_admin initialize app is completed")
    
    model_list = ml.list_models(list_filter=f'display_name={model_display_name}')
    # update
    if len(model_list.models) > 0:
        # get the first match model
        model = model_list.models[0]
        source = ml.TFLiteGCSModelSource.from_tflite_model_file('model.tflite')
        model.model_format = ml.TFLiteFormat(model_source=source)  
        
        updated_model = ml.update_model(model)
        ml.publish_model(updated_model.model_id)        

        logging.info("model exists, so it was updated in Firebase ML")
        return {
            "result": "model updated"
        }
    # create
    else:    
        # load a tflite file and upload it to Cloud Storage
        source = ml.TFLiteGCSModelSource.from_tflite_model_file('model.tflite')

        # create the model object
        tflite_format = ml.TFLiteFormat(model_source=source)
        model = ml.Model(
            display_name=model_display_name,
            tags=[model_tag],
            model_format=tflite_format
        )

        # Add the model to your Firebase project and publish it
        new_model = ml.create_model(model)
        ml.publish_model(new_model.model_id)    

        logging.info("model doesn't exist, so one was created in Firebase ML")
        return {
            "result": "model created"
        }

Overwriting firebase_publisher.py
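
`FirebasePublisher` splits each `gs://` URI into a bucket name and a blob path with chained `split` calls. As a sketch of an alternative, the standard library's `urlparse` can do the same job and reject non-GCS URIs in one place. `split_gcs_uri` is a hypothetical helper, and the URI below is a placeholder.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split a gs:// URI into (bucket name, blob path)."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"Not a GCS URI: {uri}")
    # The path component starts with "/", which is not part of the blob name.
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, blob_path = split_gcs_uri(
    "gs://my-bucket/serving_model/mobilenet/model.tflite"
)
print(bucket_name, blob_path)
```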


Create a package called `custom_components` and copy the modules we just wrote into it.

In [63]:
!mkdir -p ./custom_components
!touch ./custom_components/__init__.py
!cp -r {_vertex_uploader_module_file} {_vertex_deployer_module_file} {_firebase_publisher_module_file} custom_components

In [64]:
!ls -lh custom_components

total 16K
-rw-r--r-- 1 jupyter jupyter    0 Aug  5 14:20 __init__.py
drwxr-xr-x 2 jupyter jupyter 4.0K Aug  5 06:50 __pycache__
-rw-r--r-- 1 jupyter jupyter 3.2K Aug  5 14:20 firebase_publisher.py
-rw-r--r-- 1 jupyter jupyter 1.5K Aug  5 14:20 vertex_deployer.py
-rw-r--r-- 1 jupyter jupyter 1.4K Aug  5 14:20 vertex_uploader.py


### `Dockerfile` configuration 

In [13]:
DATASET_DISPLAY_NAME = "flowers"
VERSION = "tfx-1-0-0"
TFX_IMAGE_URI = f"gcr.io/{GOOGLE_CLOUD_PROJECT}/{DATASET_DISPLAY_NAME}:{VERSION}"
print(f"URI of the custom image: {TFX_IMAGE_URI}")

URI of the custom image: gcr.io/gcp-ml-172005/flowers:tfx-1-0-0


In [66]:
%%writefile Dockerfile

FROM gcr.io/tfx-oss-public/tfx:1.0.0
RUN mkdir -p custom_components
COPY custom_components/* ./custom_components/
RUN pip install --upgrade google-cloud-aiplatform google-cloud-storage firebase-admin

Overwriting Dockerfile


In [None]:
!gcloud builds submit --tag $TFX_IMAGE_URI . --timeout=15m --machine-type=e2-highcpu-8

In [9]:
# Specify the training worker configuration. To minimize costs, we could even specify two
# different configurations: a beefier machine for the Endpoint model and a slightly less
# powerful machine for the mobile model. Here, both trainers share a single configuration.
TRAINING_JOB_SPEC = {
    'project': GOOGLE_CLOUD_PROJECT,
    'worker_pool_specs': [{
        'machine_spec': {
            'machine_type': 'n1-standard-4',
            'accelerator_type': 'NVIDIA_TESLA_K80',
            'accelerator_count': 1
        },
        'replica_count': 1,
        'container_spec': {
            'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
        },
    }],
}
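
The comment in the cell above mentions using two different worker configurations. A minimal sketch of how that could be arranged is shown here, reusing the same field names as `TRAINING_JOB_SPEC`. `make_training_job_spec`, the project ID, and the choice of a CPU-only machine for the mobile model are assumptions for illustration; the pipeline in this notebook uses one shared spec.

```python
from typing import Any, Dict, Optional


def make_training_job_spec(
    project: str,
    image_uri: str,
    machine_type: str,
    accelerator_type: Optional[str] = None,
    accelerator_count: int = 0,
) -> Dict[str, Any]:
    """Build a training job spec with an optional accelerator."""
    machine_spec: Dict[str, Any] = {"machine_type": machine_type}
    # Only add accelerator fields when a GPU is actually requested.
    if accelerator_type and accelerator_count > 0:
        machine_spec["accelerator_type"] = accelerator_type
        machine_spec["accelerator_count"] = accelerator_count
    return {
        "project": project,
        "worker_pool_specs": [{
            "machine_spec": machine_spec,
            "replica_count": 1,
            "container_spec": {"image_uri": image_uri},
        }],
    }


image = "gcr.io/tfx-oss-public/tfx:1.0.0"
densenet_spec = make_training_job_spec(
    "my-project", image, "n1-standard-4", "NVIDIA_TESLA_K80", 1
)
mobilenet_spec = make_training_job_spec("my-project", image, "n1-standard-4")
print(mobilenet_spec["worker_pool_specs"][0]["machine_spec"])
```

Each spec would then be passed to its own trainer under `TRAINING_ARGS_KEY` in place of the shared one.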

In [10]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [14]:
from custom_components.vertex_uploader import VertexUploader
from custom_components.vertex_deployer import VertexDeployer
from custom_components.firebase_publisher import FirebasePublisher

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     densenet_module_file: str, mobilenet_module_file: str,
                     serving_model_dir: str, firebase_crediential_path: str, firebase_gcs_bucket: str,
                     project_id: str, region: str) -> tfx.dsl.Pipeline:
    """Creates the flowers pipeline with a DenseNet branch and a MobileNet branch using TFX."""
    # Brings data into the pipeline.
    # input_base: gs://flowers-public/tfrecords-jpeg-224x224
    example_gen = tfx.components.ImportExampleGen(input_base=data_root)

    # Uses user-provided Python function that trains a model.
    densenet_trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
        module_file=densenet_module_file,
        examples=example_gen.outputs['examples'],
        train_args=tfx.proto.TrainArgs(num_steps=52),
        eval_args=tfx.proto.EvalArgs(num_steps=5),
        custom_config={
            tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
                True,
            tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
                region,
            tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
                TRAINING_JOB_SPEC,
            'use_gpu':
                True,
        }
    ).with_id("densenet_trainer")

    # Pushes the model to a filesystem destination.
    pushed_model_location = os.path.join(serving_model_dir, "densenet")
    densnet_pusher = tfx.components.Pusher(
        model=densenet_trainer.outputs['model'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=pushed_model_location))).with_id("densnet_pusher")
    
    # Vertex AI upload.
    model_display_name = "densenet_flowers_latest"
    uploader = VertexUploader(
        project=project_id,
        region=region,
        model_display_name=model_display_name,
        pushed_model_location=pushed_model_location,
        serving_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-5:latest" 
    ).with_id("vertex_uploader")
    uploader.add_upstream_node(densnet_pusher)

    # Deploy the uploaded model to an Endpoint (created if it does not exist yet).
    deployer = VertexDeployer(
        project=project_id,
        region=region,
        model_display_name=model_display_name,
        deployed_model_display_name=model_display_name + "_" + TIMESTAMP
    ).with_id("vertex_deployer")
    deployer.add_upstream_node(uploader)

    # Train the MobileNet-based model in the same way. Its TFLite export is first
    # pushed to a GCS location and then published to Firebase, which offers
    # better features for TFLite models.
    mobilenet_trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
        module_file=mobilenet_module_file,
        examples=example_gen.outputs['examples'],
        train_args=tfx.proto.TrainArgs(num_steps=52),
        eval_args=tfx.proto.EvalArgs(num_steps=5),
        custom_config={
            tfx.extensions.google_cloud_ai_platform.ENABLE_UCAIP_KEY:
                True,
            tfx.extensions.google_cloud_ai_platform.UCAIP_REGION_KEY:
                region,
            tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
                TRAINING_JOB_SPEC,
            'use_gpu':
                True,
        }
    ).with_id("mobilenet_trainer")

    pushed_location_mobilenet = os.path.join(serving_model_dir, "mobilenet")
    mobilenet_pusher = tfx.components.Pusher(
        model=mobilenet_trainer.outputs['model'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=pushed_location_mobilenet))).with_id("mobilenet_pusher")

    firebase_publisher = FirebasePublisher(
        pushed_model=mobilenet_pusher.outputs['pushed_model'],
        credential_uri=firebase_crediential_path,
        firebase_dest_gcs_bucket=firebase_gcs_bucket,
        model_display_name=model_display_name,
        model_tag='mobilenet'
    ).with_id("firebase_publisher")    
    
    # Following components will be included in the pipeline.
    components = [
        example_gen,
        densenet_trainer, densnet_pusher, uploader, deployer,
        mobilenet_trainer, mobilenet_pusher, firebase_publisher
    ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components)

## Compile the pipeline

In [15]:
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

# Important: pass the custom Docker image URI to `KubeflowV2DagRunnerConfig`
# so that the pipeline runs with the image that contains our custom components.
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(default_image=TFX_IMAGE_URI),
    output_filename=PIPELINE_DEFINITION_FILE)

_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        densenet_module_file=os.path.join(MODULE_ROOT, _trainer_densenet_module_file),
        mobilenet_module_file=os.path.join(MODULE_ROOT, _trainer_mobilenet_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        firebase_crediential_path=FIREBASE_CREDENTIAL_PATH,
        firebase_gcs_bucket=FIREBASE_GCS_BUCKET, 
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION
    )
)

## Submit the pipeline for execution to Vertex AI

Generally, it's a good idea to first do a local run of the end-to-end pipeline before submitting it to an online orchestrator. We can use `tfx.orchestration.LocalDagRunner()` for that, but for the purposes of this notebook we won't be doing so.

In [16]:
from kfp.v2.google import client

pipelines_client = client.AIPlatformClient(
    project_id=GOOGLE_CLOUD_PROJECT,
    region=GOOGLE_CLOUD_REGION,
)

_ = pipelines_client.create_run_from_job_spec(PIPELINE_DEFINITION_FILE, enable_caching=True)

The pipeline graph should look like this:

![](https://i.ibb.co/98Ry74n/Screen-Shot-2021-08-06-at-1-43-35-AM.png)
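
In case the screenshot does not render, the dependency structure of the pipeline can also be sketched in code. The snippet below encodes the edges implied by `_create_pipeline` (artifact inputs plus the two `add_upstream_node` calls) and prints one valid execution order. It uses the standard library's `graphlib` (Python 3.9+) purely for illustration and does not reflect how Vertex AI Pipelines schedules the steps internally; `example_gen` is the variable name used above, not the component's ID.

```python
from graphlib import TopologicalSorter

# Each key depends on the components listed in its value set.
dependencies = {
    "densenet_trainer": {"example_gen"},
    "densnet_pusher": {"densenet_trainer"},
    "vertex_uploader": {"densnet_pusher"},
    "vertex_deployer": {"vertex_uploader"},
    "mobilenet_trainer": {"example_gen"},
    "mobilenet_pusher": {"mobilenet_trainer"},
    "firebase_publisher": {"mobilenet_pusher"},
}

# static_order() yields every node after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The two branches share only `example_gen`, so the DenseNet and MobileNet steps can run in parallel.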

## Making predictions with the Endpoint

Part of the code below is adapted from [this notebook](https://github.com/GoogleCloudPlatform/ai-platform-samples/blob/master/ai-platform-unified/notebooks/unofficial/gapic/custom/showcase_custom_image_classification_online.ipynb).

### Imports and initialization

In [17]:
from google.cloud.aiplatform import gapic as aip
from google.protobuf import json_format
from google.protobuf.json_format import MessageToJson, ParseDict
from google.protobuf.struct_pb2 import Struct, Value

import base64

In [18]:
vertex_ai.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

### Programmatically retrieve the latest Endpoint matching a name

In [19]:
model_display_name = "densenet_flowers_latest"
deployed_model_display_name = model_display_name + "_" + TIMESTAMP

endpoint = vertex_ai.Endpoint.list(
    filter=f'display_name={deployed_model_display_name}',
    order_by="update_time"
)[-1]

endpoint_id = endpoint.name
endpoint_id

'3904532915999997952'

### Sample data

In [20]:
image_path = tf.keras.utils.get_file(
    "image.jpg",
    "https://m.economictimes.com/thumb/msid-71307470,width-1201,height-900,resizemode-4,imgsize-1040796/roses.jpg")
# Use a descriptive name to avoid shadowing the built-in `bytes` type.
image_bytes = tf.io.read_file(image_path)
b64str = base64.b64encode(image_bytes.numpy()).decode("utf-8")

Downloading data from https://m.economictimes.com/thumb/msid-71307470,width-1201,height-900,resizemode-4,imgsize-1040796/roses.jpg


### Investigating the input key

In [21]:
pushed_model_location = os.path.join(SERVING_MODEL_DIR, "densenet")
model_path_to_deploy = os.path.join(
    pushed_model_location, tf.io.gfile.listdir(pushed_model_location)[-1]
)

loaded = tf.saved_model.load(model_path_to_deploy)
serving_input = list(
    loaded.signatures["serving_default"].structured_input_signature[1].keys()
)[0]
print("Serving function input:", serving_input)

Serving function input: bytes_inputs


### Make predictions

In [22]:
def predict_image(image, endpoint, parameters_dict):
    # The format of each instance should conform to the deployed model's prediction input schema.
    instances_list = [{serving_input: {"b64": image}}]
    instances = [json_format.ParseDict(s, Value()) for s in instances_list]

    endpoint = vertex_ai.Endpoint(endpoint)
    print(endpoint.predict(instances=instances))

predict_image(b64str, endpoint_id, None)

Prediction(predictions=[{'confidence': 0.669201732, 'label': 'roses'}], deployed_model_id='7558456345704267776', explanations=None)
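
For reference, the instance built in `predict_image` corresponds to a JSON request body in which the raw image bytes are base64-encoded under a `b64` key. The sketch below builds such a body with the standard library only. `build_predict_request` and the placeholder bytes are hypothetical, and the `bytes_inputs` default is the serving input key printed earlier.

```python
import base64
import json


def build_predict_request(image_bytes: bytes, input_key: str = "bytes_inputs") -> str:
    """Build a JSON prediction request body for a single encoded image."""
    encoded = base64.b64encode(image_bytes).decode("utf-8")
    # One instance per image; binary data is wrapped in a {"b64": ...} object.
    return json.dumps({"instances": [{input_key: {"b64": encoded}}]})


# Placeholder bytes standing in for the contents of a JPEG file.
fake_jpeg = b"\xff\xd8\xff\xe0 placeholder bytes"
body = build_predict_request(fake_jpeg)
print(body)
```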
