# NLP ML on TFX with Vertex Pipelines

This pipeline demonstrates data preprocessing, training, and export of a classification model based on BERT.

# Project Setup

## Install Required Packages

In [None]:
!pip install -Uq --use-deprecated=legacy-resolver tfx
!pip install -Uq tensorflow-text  # The tf-text version needs to match the tf version

print("Restart your runtime enable after installing the packages")

## Import relevant packages

In [None]:
import glob
import os
import pprint
import re
import tempfile
from shutil import rmtree
from typing import List, Dict, Tuple, Union

import numpy as np
import pandas as pd
import tensorflow as tf
import tfx
import tensorflow_data_validation as tfdv
import tensorflow_hub as hub
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.saved import saved_transform_io
# from tensorflow_transform.tf_metadata import (dataset_metadata, dataset_schema,
#                                               metadata_io, schema_utils)
from tfx.components import (Evaluator, ExampleValidator, ImportExampleGen,
                            ModelValidator, Pusher, SchemaGen,
                            StatisticsGen, Trainer, Transform)
from tfx.dsl.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.proto import evaluator_pb2, example_gen_pb2, pusher_pb2, trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model, ModelBlessing
# from tfx.utils.dsl_utils import external_input
from tfx.dsl.components.common import resolver
from typing import Dict, List, Text
from tfx.orchestration import metadata, pipeline

from tfx.extensions.google_cloud_ai_platform.trainer import component as gcp_trainer
from tfx.extensions.google_cloud_ai_platform.tuner import component as gcp_tuner
from tfx.extensions.google_cloud_ai_platform.pusher import component as gcp_pusher
from tfx.extensions.google_cloud_ai_platform.constants import ENABLE_VERTEX_KEY
from tfx.extensions.google_cloud_ai_platform.constants import VERTEX_REGION_KEY

import tensorflow_datasets as tfds
import tensorflow_model_analysis as tfma
import tensorflow_text as text
from setuptools import distutils

from tfx.orchestration.experimental.interactive.interactive_context import \
    InteractiveContext

%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

In [None]:
# Display the installed TFX version.
tfx.__version__

## Download the IMDB Dataset from TensorFlow Datasets

For our demo example, we are using the [IMDB data set](https://www.kaggle.com/lakshmi25npathi/imdb-dataset-of-50k-movie-reviews) to train a sentiment model based on the pre-trained BERT model. The data set is provided through [TensorFlow Datasets](https://www.tensorflow.org/datasets). Our ML pipeline reads TFRecords, but it expects the data folder to contain only TFRecord files, so we remove the JSON and label files after the download.

In [100]:
# Remove any previous download so the data folder starts from a clean state.
!rm -rf tfds/

In [101]:
# Create the data folder; get_dataset() below wipes it before downloading.
!mkdir tfds/

def clean_before_download(base_data_dir):
    """Delete ``base_data_dir`` and everything below it.

    A directory that does not exist is treated as already clean, so the
    function can be called repeatedly (e.g. when the cell is re-run).

    Args:
      base_data_dir: path of the directory to remove.
    """
    try:
        rmtree(base_data_dir)
    except FileNotFoundError:
        # Nothing to clean up.
        pass
    
def delete_unnecessary_files(base_path):
    """Remove every non-training file from a prepared TFDS folder.

    Deletes the metadata files and all ``imdb_reviews-unsupervised.*`` shards
    so that only the train/test TFRecord files remain, then prints how many
    files were removed.

    Args:
      base_path: folder holding the prepared dataset; a trailing path
        separator is optional.
    """
    counter = 0
    file_list = ["dataset_info.json", "label.labels.txt", "features.json"]

    for f in file_list:
        try:
            os.remove(os.path.join(base_path, f))
            counter += 1
        except OSError:
            # Best effort: a metadata file that is absent is simply skipped.
            pass

    # Join (rather than concatenate) so the pattern is also correct when
    # base_path has no trailing separator; escape it so glob metacharacters
    # in the folder name are taken literally.
    pattern = os.path.join(glob.escape(base_path), "imdb_reviews-unsupervised.*")
    for f in glob.glob(pattern):
        os.remove(f)
        counter += 1
    print(f"Deleted {counter} files")

def get_dataset(name='imdb_reviews', version="1.0.0"):
    """Download the IMDB reviews dataset and keep only its TFRecord files.

    Args:
      name: kept for backward compatibility; the builder is fixed to
        ``tfds.text.IMDBReviews``, so this value is not used.
      version: dataset version to download and prepare.

    Returns:
      A tuple ``((train_pattern, test_pattern), labels)`` where the patterns
      are glob patterns for the train and test TFRecord shards and ``labels``
      is the list of label names read from ``label.labels.txt``.
    """
    base_data_dir = "tfds/"
    config = "plain_text"

    clean_before_download(base_data_dir)
    tfds.disable_progress_bar()
    builder = tfds.text.IMDBReviews(data_dir=base_data_dir,
                                    config=config,
                                    version=version)
    download_config = tfds.download.DownloadConfig(
        download_mode=tfds.GenerateMode.FORCE_REDOWNLOAD)
    builder.download_and_prepare(download_config=download_config)

    # The trailing "" makes os.path.join end the folder path with a separator.
    base_tfrecords_filename = os.path.join(base_data_dir, "imdb_reviews", config, version, "")
    train_tfrecords_filename = base_tfrecords_filename + "imdb_reviews-train*"
    test_tfrecords_filename = base_tfrecords_filename + "imdb_reviews-test*"

    # Read the label names before the label file is deleted below.
    label_filename = os.path.join(base_tfrecords_filename, "label.labels.txt")
    with open(label_filename) as label_file:
        labels = [label.rstrip('\n') for label in label_file]

    delete_unnecessary_files(base_tfrecords_filename)
    return (train_tfrecords_filename, test_tfrecords_filename), labels

# Download the data; keep the train/test TFRecord glob patterns and the label names.
tfrecords_filenames, labels = get_dataset()

[1mDownloading and preparing dataset 80.23 MiB (download: 80.23 MiB, generated: Unknown size, total: 80.23 MiB) to tfds/imdb_reviews/plain_text/1.0.0...[0m
[1mDataset imdb_reviews downloaded and prepared to tfds/imdb_reviews/plain_text/1.0.0. Subsequent calls will reuse this data.[0m
Deleted 4 files
PipelineJob projects/318948681665/locations/us-central1/pipelineJobs/tfx-bert-classif-20230129103749 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/318948681665/locations/us-central1/pipelineJobs/tfx-bert-classif-20230129103749 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/318948681665/locations/us-central1/pipelineJobs/tfx-bert-classif-20230129103749 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/318948681665/locations/us-central1/pipelineJobs/tfx-bert-classif-20230129103749 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/318948681665/locations/us-central1/pipelineJobs/tfx-bert-classif-20230129103749 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/318948681665/locations/us-central1/pipelineJobs/tfx-bert-classif-20230129103749 current state:
PipelineState.PIPELINE_STATE_RUNNING


# TFX Pipeline

The TensorFlow Extended Pipeline is more or less following the example setup shown here. We'll only note deviations from the original setup.

## Initializing the Interactive TFX Pipeline

In [None]:
# Interactive context used by the context.run(...) / context.show(...) calls in the cells below.
context = InteractiveContext()

## Loading the dataset

In [None]:
# Split the ingested examples into 'train' and 'eval' using 45 and 5 hash buckets.
output = example_gen_pb2.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=45),
                 example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=5)
             ]))
# Load the data from our prepared TFDS folder
data_root = "tfds/imdb_reviews/plain_text/1.0.0/"
example_gen = ImportExampleGen(input_base=data_root, 
                               output_config=output)

# Execute the component inside the interactive context.
context.run(example_gen)

## TensorFlow Data Validation

In [None]:
%%skip_for_export

# Compute statistics over the examples produced by example_gen.
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)

# Display the generated statistics artifact in the notebook.
context.show(statistics_gen.outputs['statistics'])

In [None]:
%%skip_for_export

# Generate a schema from the statistics computed above.
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)
context.run(schema_gen)

# Display the generated schema artifact in the notebook.
context.show(schema_gen.outputs['schema'])

In [None]:
%%skip_for_export

# Check the data schema for the type of input tensors by loading the
# schema.pbtxt file written by schema_gen.
tfdv.load_schema_text(schema_gen.outputs['schema'].get()[0].uri + "/schema.pbtxt")

In [None]:
%%skip_for_export

# Validate the computed statistics against the generated schema.
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(example_validator)

# Display the anomalies artifact produced by the validator.
context.show(example_validator.outputs['anomalies'])

## TensorFlow Transform

This is where we perform the BERT processing. 


In [None]:
%%skip_for_export
%%writefile transform.py

import tensorflow as tf
import tensorflow_text as text
import tensorflow_hub as hub

MAX_SEQ_LEN = 512  # max number is 512
BERT_TFHUB_URL = "https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/3"

def load_bert_layer(model_url=BERT_TFHUB_URL):
    """Return the pre-trained BERT model at ``model_url`` as a Keras layer.

    The layer is created with ``trainable=False``.
    """
    return hub.KerasLayer(handle=model_url, trainable=False)

# Read the lower-casing flag stored with the BERT model so the tokenizer can match it.
# NOTE: this loads the BERT layer when the module is imported.
do_lower_case = load_bert_layer().resolved_object.do_lower_case.numpy()

def preprocessing_fn(inputs):
    """Turn the raw ``text`` column into the three BERT encoder inputs.

    The returned dict holds
        * input_word_ids: token ids, wrapped in start/end ids and padded
        * input_mask: 1 where input_word_ids is greater than 0, else 0
        * input_type_ids: all zeros
        * label: passed through from ``inputs`` unchanged
    """
    cls_id = tf.constant(101, dtype=tf.int64)
    sep_id = tf.constant(102, dtype=tf.int64)
    pad_id = tf.constant(0, dtype=tf.int64)

    # The vocabulary file is taken from the loaded BERT model itself.
    vocab_path = load_bert_layer().resolved_object.vocab_file.asset_path
    tokenizer = text.BertTokenizer(
        vocab_lookup_table=vocab_path,
        token_out_type=tf.int64,
        lower_case=do_lower_case)

    def tokenize_text(raw_text, sequence_length=MAX_SEQ_LEN):
        """Map a batch of strings to a [batch, sequence_length] tensor of token ids."""
        batch_size = tf.shape(raw_text)[0]

        # Tokenize, then merge the two inner ragged dimensions into one
        # sequence of ids per example.
        token_ids = tokenizer.tokenize(raw_text)
        token_ids = token_ids.merge_dims(1, 2)[:, :sequence_length]

        # Leave room for the start and end ids, then wrap the sequence in them.
        token_ids = token_ids[:, :sequence_length - 2]
        cls_column = tf.fill([batch_size, 1], cls_id)
        sep_column = tf.fill([batch_size, 1], sep_id)
        token_ids = tf.concat([cls_column, token_ids, sep_column], axis=1)

        # Cap the length at sequence_length.
        token_ids = token_ids[:, :sequence_length]

        # Densify the ragged batch, then right-pad up to sequence_length.
        dense_ids = token_ids.to_tensor(default_value=pad_id)
        missing = sequence_length - tf.shape(dense_ids)[1]
        dense_ids = tf.pad(dense_ids, [[0, 0], [0, missing]], constant_values=pad_id)

        # Fix the second dimension so the output fits the TFT data structure.
        return tf.reshape(dense_ids, [-1, sequence_length])

    word_ids = tokenize_text(tf.squeeze(inputs['text'], axis=1))

    # Mask is 1 for every id greater than 0 (the pad id is 0).
    mask = tf.cast(word_ids > 0, tf.int64)
    mask = tf.reshape(mask, [-1, MAX_SEQ_LEN])

    # Type ids: zeros with the same shape as the mask.
    type_ids = tf.cast(tf.fill(tf.stack(tf.shape(mask)), 0), tf.int64)

    return {
        'input_word_ids': word_ids,
        'input_mask': mask,
        'input_type_ids': type_ids,
        'label': inputs['label']
    }

In [None]:
# Run the preprocessing defined in transform.py over the ingested examples.
# NOTE: schema_gen is created in a %%skip_for_export cell above; that cell must have run first.
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath("transform.py"))
context.run(transform)

#### Check the Output Data Structure of the TF Transform Operation

In [None]:
from tfx_bsl.coders.example_coder import ExampleToNumpyDict

pp = pprint.PrettyPrinter()

# URI of the transformed-examples artifact; it is a directory of sub-folders.
train_uri = transform.outputs['transformed_examples'].get()[0].uri
print(train_uri)

# Collect every file found one level below the artifact directory
# (all of them are GZIP-compressed TFRecord files).
tfrecord_folders = [os.path.join(train_uri, name) for name in os.listdir(train_uri)]
tfrecord_filenames = [
    os.path.join(tfrecord_folder, name)
    for tfrecord_folder in tfrecord_folders
    for name in os.listdir(tfrecord_folder)
]

# Read the files back and pretty-print the first decoded example.
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

for tfrecord in dataset.take(1):
    serialized_example = tfrecord.numpy()
    example = ExampleToNumpyDict(serialized_example)
    pp.pprint(example)

## Training of the Keras Model

In [None]:
%%skip_for_export
%%writefile trainer.py

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

from typing import Text

import absl
import tensorflow as tf
from tensorflow import keras
import tensorflow_transform as tft
from tfx.components.trainer.executor import TrainerFnArgs
import tensorflow_hub as hub

LABEL_KEY = 'label'
BERT_TFHUB_URL = "https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/3"

def load_bert_layer(model_url=BERT_TFHUB_URL):
    """Return the pre-trained BERT model at ``model_url`` as a Keras layer.

    The layer is created with ``trainable=False``.
    """
    return hub.KerasLayer(handle=model_url, trainable=False)

def _gzip_reader_fn(filenames):
    """Return a TFRecordDataset over ``filenames`` that reads GZIP-compressed files."""
    gzip_dataset = tf.data.TFRecordDataset(filenames, compression_type='GZIP')
    return gzip_dataset

def get_model(tf_transform_output, max_seq_length=512):
    """Build and compile the BERT-based binary classifier.

    Args:
      tf_transform_output: A TFTransformOutput; one Keras input is created per
        key of its transformed feature spec, excluding the label.
      max_seq_length: length of each token sequence fed to the BERT layer.

    Returns:
      A compiled tf.keras.Model with a single sigmoid output, trained with
      binary cross-entropy.
    """
    # dynamically create inputs for all outputs of our transform graph
    feature_spec = tf_transform_output.transformed_feature_spec()
    feature_spec.pop(LABEL_KEY)

    inputs = {
        # `shape` is a tuple: note the trailing comma, (n) alone is just an int.
        key: tf.keras.layers.Input(shape=(max_seq_length,), name=key, dtype=tf.int64)
        for key in feature_spec.keys()
    }

    # The transform graph emits int64 features; they are cast to int32 before
    # being passed to the BERT layer.
    input_word_ids = tf.cast(inputs["input_word_ids"], dtype=tf.int32)
    input_mask = tf.cast(inputs["input_mask"], dtype=tf.int32)
    input_type_ids = tf.cast(inputs["input_type_ids"], dtype=tf.int32)

    bert_layer = load_bert_layer()
    encoder_inputs = dict(
        input_word_ids=tf.reshape(input_word_ids, (-1, max_seq_length)),
        input_mask=tf.reshape(input_mask, (-1, max_seq_length)),
        input_type_ids=tf.reshape(input_type_ids, (-1, max_seq_length)),
    )
    outputs = bert_layer(encoder_inputs)

    # Add additional layers depending on your problem
    x = tf.keras.layers.Dense(256, activation='relu')(outputs["pooled_output"])
    dense = tf.keras.layers.Dense(64, activation='relu')(x)
    pred = tf.keras.layers.Dense(1, activation='sigmoid')(dense)

    keras_model = tf.keras.Model(
        inputs=[
                inputs['input_word_ids'],
                inputs['input_mask'],
                inputs['input_type_ids']],
        outputs=pred)
    keras_model.compile(loss='binary_crossentropy',
                        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                        metrics=['accuracy']
                        )
    return keras_model


def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Build the serving function: parse serialized tf.Examples, apply TFT, predict."""

    # The transform layer is stored as an attribute of the model
    # (presumably so it is tracked and saved together with it).
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Return the model output for a batch of serialized tf.Examples."""
        # Serving requests carry no label, so it is dropped from the parsing spec.
        raw_spec = tf_transform_output.raw_feature_spec()
        raw_spec.pop(LABEL_KEY)

        raw_features = tf.io.parse_example(serialized_tf_examples, raw_spec)
        predictions = model(model.tft_layer(raw_features))
        return {'outputs': predictions}

    return serve_tf_examples_fn

def _input_fn(file_pattern: Text,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 32) -> tf.data.Dataset:
    """Create the batched dataset used for tuning/training.

    Args:
      file_pattern: input tfrecord file pattern.
      tf_transform_output: A TFTransformOutput providing the transformed
        feature spec used to parse the records.
      batch_size: number of consecutive elements of the returned dataset to
        combine in a single batch.

    Returns:
      A dataset of (features, label) tuples, where features is a dictionary
      of Tensors and label is the Tensor stored under LABEL_KEY.
    """
    # Work on a copy of the transformed feature spec.
    parsing_spec = tf_transform_output.transformed_feature_spec().copy()

    return tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=parsing_spec,
        reader=_gzip_reader_fn,
        label_key=LABEL_KEY)

# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
    """Entry point called by the TFX Trainer: fit the model and export it.

    Args:
      fn_args: Holds args used to train the model as name/value pairs
        (transform output location, train/eval files and steps, and the
        serving model directory).
    """
    transform_output = tft.TFTransformOutput(fn_args.transform_output)

    # Both datasets are read with a batch size of 32.
    train_ds = _input_fn(fn_args.train_files, transform_output, 32)
    eval_ds = _input_fn(fn_args.eval_files, transform_output, 32)

    # Build the model inside the MirroredStrategy scope.
    strategy = tf.distribute.MirroredStrategy()
    with strategy.scope():
        model = get_model(tf_transform_output=transform_output)

    model.fit(
        train_ds,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_ds,
        validation_steps=fn_args.eval_steps)

    # The serving signature takes a 1-D batch of serialized tf.Example strings.
    serialized_examples_spec = tf.TensorSpec(
        shape=[None], dtype=tf.string, name='examples')
    serving_fn = _get_serve_tf_examples_fn(model, transform_output)
    signatures = {
        'serving_default': serving_fn.get_concrete_function(serialized_examples_spec),
    }
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)


In [None]:
# NOTE: Adjust the number of training and evaluation steps when training in a production setup
TRAINING_STEPS = 5
EVALUATION_STEPS = 5

# Train with the run_fn defined in trainer.py on the transformed examples.
trainer = Trainer(
    module_file=os.path.abspath("trainer.py"),
    # custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=TRAINING_STEPS),
    eval_args=trainer_pb2.EvalArgs(num_steps=EVALUATION_STEPS))
context.run(trainer)

In [None]:
# Resolve the latest blessed model; the Evaluator below uses it as the baseline model.
# (This replaces the older resolver_class / instance_name form of the Resolver API.)
model_resolver = resolver.Resolver(
      strategy_class=tfx.dsl.input_resolution.strategies.latest_blessed_model_strategy.LatestBlessedModelStrategy ,
      model=tfx.types.Channel(type=tfx.types.standard_artifacts.Model),
      model_blessing=tfx.types.Channel(
          type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
              'latest_blessed_model_resolver')

context.run(model_resolver)

## TensorFlow Model Evaluation

In [None]:
# The model has a single sigmoid output trained with binary cross-entropy, so
# the blessing metric is BinaryAccuracy. CategoricalAccuracy compares argmax
# positions, which are always equal for a one-element output, so its
# threshold could never fail.
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='label')],
    # An empty SlicingSpec evaluates over the whole dataset.
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='BinaryAccuracy',
                threshold=tfma.MetricThreshold(
                    # Absolute floor the candidate model has to reach.
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.5}),
                    # Allowed change relative to the baseline model.
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -1e-2})))
        ])
    ]
)

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config
)

context.run(evaluator)

In [None]:
# Check the blessing
!ls {evaluator.outputs['blessing'].get()[0].uri}

## Model Export for Serving

In [None]:
serving_model_dir = "/serving_model_dir"

# exist_ok=True keeps this cell re-runnable; a plain `mkdir` fails once the
# folder exists.
os.makedirs(serving_model_dir, exist_ok=True)

# Push the trained model to the serving folder; the evaluator's blessing is
# passed in as model_blessing.
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=serving_model_dir)))

context.run(pusher)

## Test your Exported Model

In [None]:
def _bytes_feature(value):
    """Wrap a single bytes value in a tf.train.Feature."""
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

# Component outputs are a plain dict keyed by output name in TFX 1.x; the
# Pusher's output key is 'pushed_model'.
push_uri = pusher.outputs['pushed_model'].get()[0].uri

# Depending on the TFX version the SavedModel sits directly in push_uri or in
# a version sub-folder; handle both layouts.
if os.path.exists(os.path.join(push_uri, "saved_model.pb")):
    latest_version_path = push_uri
else:
    latest_version = max(os.listdir(push_uri))
    latest_version_path = os.path.join(push_uri, latest_version)
loaded_model = tf.saved_model.load(latest_version_path)

# Build one serialized tf.Example holding only the raw 'text' feature, which
# is what the serving signature parses.
example_str = b"I am so sad today, I wish this movie was never released !!!"
example = tf.train.Example(features=tf.train.Features(feature={
    'text': _bytes_feature(example_str)}))

serialized_example = example.SerializeToString()
f = loaded_model.signatures["serving_default"]
print(f(tf.constant([serialized_example])))

## End to End pipeline Setup

In [None]:
# https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/
def create_final_pipeline(
    pipeline_name: Text,
    root_path: Text,
    data_path: Text,
    # training_params: Dict[Text, Text],
    transform_module: Text,
    model_module: Text,
    project_id = "",
    region = "",
    use_vertex_ai = False,
    beam_pipeline_args = None, ## ['--direct_num_workers=%d' % 0],
    vertex_trainer_image = "",
    train_steps = 5,
    eval_steps = 5,
) -> pipeline.Pipeline:
    """Assemble the BERT classification pipeline for local or Vertex AI execution.

    Args:
      pipeline_name: name of the pipeline.
      root_path: pipeline root; the metadata DB and the local serving folder
        are placed below it.
      data_path: folder with the TFRecord files read by ImportExampleGen.
      transform_module: path of the module file used by Transform.
      model_module: path of the module file used by the Trainer (both the
        local and the Vertex AI one).
      project_id: GCP project, used only when ``use_vertex_ai`` is true.
      region: GCP region, used only when ``use_vertex_ai`` is true.
      use_vertex_ai: if true, train and push through the Vertex AI Trainer and
        Pusher extensions; otherwise use the standard components.
      beam_pipeline_args: Beam arguments forwarded to the pipeline.
      vertex_trainer_image: container image for the Vertex AI training job.
      train_steps: number of training steps.
      eval_steps: number of evaluation steps.

    Returns:
      A ``pipeline.Pipeline`` made of ExampleGen, StatisticsGen, SchemaGen,
      ExampleValidator, Transform, Trainer and Pusher. It carries no
      runner-specific information.
    """
    metadata_db_path = os.path.join(root_path, 'metadata.db')
    serving_model_dir = os.path.join(root_path, 'serving_model')

    # Split the ingested examples into 'train' and 'eval' (45 and 5 hash buckets).
    output = example_gen_pb2.Output(
        split_config=example_gen_pb2.SplitConfig(splits=[
            example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=45),
            example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=5)
        ]))

    # Load the data from our prepared TFDS folder
    example_gen = ImportExampleGen(input_base=data_path,
                                   output_config=output)

    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

    schema_gen = SchemaGen(
        statistics=statistics_gen.outputs['statistics'],
        infer_feature_shape=True)

    example_validator = ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_gen.outputs['schema'])

    transform = Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        module_file=transform_module)

    if not use_vertex_ai:
        # Standard components only, no Vertex AI services.
        # The trainer uses the caller-supplied module instead of a hard-coded
        # "trainer.py" in the current working directory.
        trainer = Trainer(
            module_file=model_module,
            examples=transform.outputs['transformed_examples'],
            transform_graph=transform.outputs['transform_graph'],
            schema=schema_gen.outputs['schema'],
            train_args=trainer_pb2.TrainArgs(num_steps=train_steps),
            eval_args=trainer_pb2.EvalArgs(num_steps=eval_steps))

        # NOTE: model_resolver and evaluator are built but are not in the
        # component list returned below, so they do not run; the model is
        # pushed without evaluation for now.
        model_resolver = resolver.Resolver(
            strategy_class=tfx.dsl.input_resolution.strategies.latest_blessed_model_strategy.LatestBlessedModelStrategy,
            model=tfx.types.Channel(type=tfx.types.standard_artifacts.Model),
            model_blessing=tfx.types.Channel(
                type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
                    'latest_blessed_model_resolver')

        # BinaryAccuracy matches the model's single sigmoid output;
        # CategoricalAccuracy compares argmax positions, which are always
        # equal for a one-element output.
        eval_config = tfma.EvalConfig(
            model_specs=[tfma.ModelSpec(label_key='label')],
            slicing_specs=[tfma.SlicingSpec()],
            metrics_specs=[
                tfma.MetricsSpec(metrics=[
                    tfma.MetricConfig(
                        class_name='BinaryAccuracy',
                        threshold=tfma.MetricThreshold(
                            value_threshold=tfma.GenericValueThreshold(
                                lower_bound={'value': 0.3}),
                            change_threshold=tfma.GenericChangeThreshold(
                                direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                                absolute={'value': -1e-2})))
                ])
            ]
        )

        evaluator = Evaluator(
            examples=example_gen.outputs['examples'],
            model=trainer.outputs['model'],
            baseline_model=model_resolver.outputs['model'],
            eval_config=eval_config
        )

        ## Push without evaluation for now
        pusher = Pusher(
            model=trainer.outputs['model'],
            # model_blessing=evaluator.outputs['blessing'],
            push_destination=pusher_pb2.PushDestination(
                filesystem=pusher_pb2.PushDestination.Filesystem(
                    base_directory=serving_model_dir))
        )

    else:
        # Job spec for the Vertex AI training job: one GPU worker.
        vertex_trainer_job_spec = {
            'project': project_id,
            'worker_pool_specs': [{
                'machine_spec': {
                    'machine_type': 'n1-standard-4',
                    'accelerator_type': 'NVIDIA_TESLA_K80', ##GPU Training
                    'accelerator_count': 1,
                },
                'replica_count': 1,
                'container_spec': {
                    'image_uri': vertex_trainer_image,
                },
            }],
        }

        trainer = gcp_trainer.Trainer(
            module_file=model_module,
            examples=transform.outputs['transformed_examples'],
            train_args=trainer_pb2.TrainArgs(splits=['train'], num_steps=train_steps),
            eval_args=trainer_pb2.EvalArgs(splits=['eval'], num_steps=eval_steps),
            transform_graph=transform.outputs['transform_graph'],
            custom_config={
                ENABLE_VERTEX_KEY:
                    True,
                VERTEX_REGION_KEY:
                    region,
                tfx.extensions.google_cloud_ai_platform.trainer.executor.TRAINING_ARGS_KEY:
                    vertex_trainer_job_spec,
            })

        # Configuration for the pusher, which pushes the model to Vertex AI Endpoints.
        vertex_serving_spec = {
            'project_id': project_id,
            'endpoint_name': 'tfx_bertclassif_ep',
            'machine_type': 'n1-standard-4',
        }

        serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-7:latest'
        pusher = gcp_pusher.Pusher(
            model=trainer.outputs['model'],
            custom_config={
                ENABLE_VERTEX_KEY:
                    True,
                VERTEX_REGION_KEY:
                    region,
                tfx.extensions.google_cloud_ai_platform.constants.VERTEX_CONTAINER_IMAGE_URI_KEY:
                    serving_image,
                tfx.extensions.google_cloud_ai_platform.constants.SERVING_ARGS_KEY:
                    vertex_serving_spec,
            })

    # This pipeline obj carries the business logic of the pipeline, but no
    # runner-specific information was included.
    return pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=root_path,
        components=[
            example_gen,
            statistics_gen,
            schema_gen,
            example_validator,
            transform,
            trainer,
            # model_resolver,
            # evaluator,
            pusher
        ],
        metadata_connection_config=metadata.sqlite_metadata_connection_config(metadata_db_path),
        enable_cache=False,
        beam_pipeline_args=beam_pipeline_args,
    )

## Run the Pipeline Locally

In [None]:
# First, run the full pipeline locally.
from tfx.orchestration.local.local_dag_runner import LocalDagRunner

# Define all paths: the pipeline root lives in ./tfx, and the module files
# are the ones written by the %%writefile cells above.
_tfx_root = os.path.join(os.getcwd(), 'tfx')
_transform_module_file = 'transform.py'
_model_trainer_module_file = 'trainer.py'

#Config params
# training_params = {"epochs": 20}

# Create the pipeline with its default settings (no Vertex AI services).
p_ = create_final_pipeline(root_path = _tfx_root, 
                           pipeline_name="local_pipeline", 
                           data_path=os.path.join(os.getcwd(), "tfds/imdb_reviews/plain_text/1.0.0/"),
                           # training_params=training_params,
                           transform_module=os.path.abspath(_transform_module_file),
                           model_module=os.path.abspath(_model_trainer_module_file)
                          )

# Run the pipeline locally (intended for debugging only); comment this line
# out to skip the local run.
LocalDagRunner().run(p_)

## TFX Pipeline on GCP deployment - Pre-steps

- Build Dockerfile for TFX base image
- Upload image to GAR
- Copy code and data to GCS for usage in pipeline

In [None]:
%%writefile Dockerfile
# Custom TFX image: the official TFX 1.12.0 image plus the packages listed in requirements.txt.
FROM tensorflow/tfx:1.12.0

# requirements.txt must be present in the Docker build context.
COPY requirements.txt .
RUN pip install -r requirements.txt

In [None]:
# Build the Docker image from the Dockerfile written above.
!docker build -t us-central1-docker.pkg.dev/hasanrafiq-test-331814/tfx-images/latest:latest -f Dockerfile .

# Configure Docker credentials for the us-central1 Artifact Registry (GAR) host;
# `yes` answers the confirmation prompt.
!yes | gcloud auth configure-docker us-central1-docker.pkg.dev

# Push the Docker image to GAR.
!docker push us-central1-docker.pkg.dev/hasanrafiq-test-331814/tfx-images/latest:latest

In [None]:
%%bash
# Copy the Transform and Trainer module files to GCS for use by the pipeline.
# Strict mode: stop at the first failed command or unset variable, so a failed
# upload is not hidden by a later command that succeeds.
set -euo pipefail

PROJECT_ID="hasanrafiq-test-331814"
BUCKET_NAME="gs://gcs-${PROJECT_ID}"
CODE_DIR="${BUCKET_NAME}/tfx/bert-classif/code/"

gsutil cp trainer.py "${CODE_DIR}"
gsutil cp transform.py "${CODE_DIR}"

In [None]:
%%bash
# Copy the local dataset files to GCS for use by the pipeline.
# Strict mode: stop on a failed command or an unset variable.
set -euo pipefail

PROJECT_ID="hasanrafiq-test-331814"
BUCKET_NAME="gs://gcs-${PROJECT_ID}"

# The source glob is left unquoted on purpose so the shell expands it to every
# file in the directory; the destination is quoted.
gsutil cp tfds/imdb_reviews/plain_text/1.0.0/* "${BUCKET_NAME}/tfx/bert-classif/data/"

## End-to-end pipeline - Vertex Pipelines (without GCP ML services)

In [None]:
# Pipeline name, the JSON file the compiled pipeline definition is written to,
# and the region used for the GCP calls below.
PIPELINE_NAME = 'tfx-bert-classif'
PIPELINE_DEFINITION_FILE = f'{PIPELINE_NAME}-pipeline.json'
GOOGLE_CLOUD_REGION = 'us-central1'

# GCP project and its bucket; the bucket itself doubles as the pipeline root.
PROJECT_ID = "hasanrafiq-test-331814"
BUCKET_NAME = f"gs://gcs-{PROJECT_ID}"
PIPELINE_ROOT = BUCKET_NAME

# GCS locations of the Transform and Trainer module files.
TRANSFORM_MODULE = "/".join([BUCKET_NAME, "tfx/bert-classif/code/transform.py"])
MODEL_MODULE = "/".join([BUCKET_NAME, "tfx/bert-classif/code/trainer.py"])

In [None]:
import os
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner

# Compile the pipeline into a Vertex Pipelines (KFP V2) definition file, with
# use_vertex_ai=False.

# training_params = {"epochs": 20}
use_dataflow_services = False  # Set to True to process data on Dataflow

# Root path handed to the pipeline; also the parent of the uploaded data directory.
_tfx_root = os.path.join(PIPELINE_ROOT, 'tfx')

# Beam runner, selected by the value of `use_dataflow_services`
runners = {True: 'DataflowRunner', False: 'DirectRunner'}

# Beam / Dataflow configuration
beam_pipeline_args = [
    f'--runner={runners[use_dataflow_services]}',
    f'--project={PROJECT_ID}',
    f'--temp_location={BUCKET_NAME}/tmp',
    f'--region={GOOGLE_CLOUD_REGION}',
    f'--staging_location={BUCKET_NAME}/staging'
]

# KFP V2 runner (Vertex). The image name is built from PROJECT_ID so that it
# always points at the same project as the rest of the configuration.
custom_container = f"us-central1-docker.pkg.dev/{PROJECT_ID}/tfx-images/latest:latest"
gcp_runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
        default_image=custom_container),
    output_filename=PIPELINE_DEFINITION_FILE)

p_ = create_final_pipeline(root_path=_tfx_root,
                           pipeline_name=PIPELINE_NAME,
                           data_path=os.path.join(_tfx_root, "bert-classif/data"),
                           # training_params=training_params,
                           beam_pipeline_args=beam_pipeline_args,
                           transform_module=TRANSFORM_MODULE,
                           model_module=MODEL_MODULE,
                           project_id=PROJECT_ID,
                           region=GOOGLE_CLOUD_REGION,
                           use_vertex_ai=False
                          )

# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = gcp_runner.run(p_)

In [None]:
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

# Pass the project explicitly: without it the SDK falls back to the default
# project of the environment, which may not be PROJECT_ID.
aiplatform.init(project=PROJECT_ID, location=GOOGLE_CLOUD_REGION)

# Create a Vertex Pipelines job from the compiled pipeline definition file.
job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME,
                                pipeline_root=os.path.join(PIPELINE_ROOT, 'pipeline_root'))
# sync=False: do not block the notebook until the pipeline run completes.
job.run(sync=False)

## End-to-end pipeline - Vertex Pipelines (with GCP ML services)

In [None]:
import os
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner

# Compile the pipeline into a Vertex Pipelines (KFP V2) definition file, this
# time with use_vertex_ai=True and a trainer image plus train/eval step counts.

# training_params = {"epochs": 20}
use_dataflow_services = False  # Set to True to process data on Dataflow

# Root path handed to the pipeline; also the parent of the uploaded data directory.
_tfx_root = os.path.join(PIPELINE_ROOT, 'tfx')

# Beam runner, selected by the value of `use_dataflow_services`
runners = {True: 'DataflowRunner', False: 'DirectRunner'}

# Beam / Dataflow configuration
beam_pipeline_args = [
    f'--runner={runners[use_dataflow_services]}',
    f'--project={PROJECT_ID}',
    f'--temp_location={BUCKET_NAME}/tmp',
    f'--region={GOOGLE_CLOUD_REGION}',
    f'--staging_location={BUCKET_NAME}/staging'
]

# KFP V2 runner (Vertex). The image name is built from PROJECT_ID so that it
# always points at the same project as the rest of the configuration.
custom_container = f"us-central1-docker.pkg.dev/{PROJECT_ID}/tfx-images/latest:latest"
gcp_runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
        default_image=custom_container),
    output_filename=PIPELINE_DEFINITION_FILE)

p_ = create_final_pipeline(root_path=_tfx_root,
                           pipeline_name=PIPELINE_NAME,
                           data_path=os.path.join(_tfx_root, "bert-classif/data"),
                           # training_params=training_params,
                           beam_pipeline_args=beam_pipeline_args,
                           transform_module=TRANSFORM_MODULE,
                           model_module=MODEL_MODULE,
                           project_id=PROJECT_ID,
                           region=GOOGLE_CLOUD_REGION,
                           use_vertex_ai=True,
                           vertex_trainer_image=custom_container,
                           train_steps=20,
                           eval_steps=5
                          )

# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = gcp_runner.run(p_)

In [None]:
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

# Pass the project explicitly: without it the SDK falls back to the default
# project of the environment, which may not be PROJECT_ID.
aiplatform.init(project=PROJECT_ID, location=GOOGLE_CLOUD_REGION)

# Create a Vertex Pipelines job from the compiled pipeline definition file.
job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME,
                                pipeline_root=os.path.join(PIPELINE_ROOT, 'pipeline_root'))
# sync=False: do not block the notebook until the pipeline run completes.
job.run(sync=False)