##### Copyright &copy; 2019 The TensorFlow Authors.

In [None]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Using the KFP Orchestrator for TFX

The notebook demonstrates how to define, deploy and run the TFX pipeline that utilizes Google Cloud Platform's (GCP) Cloud AI Platfom (CAIP) services for processing and Kubeflow Pipelines (KFP) for orchestrations. In this example, KFP is running on Google Kubernetes (GKE) Engine and utilizes Cloud SQL for ML Metadata and Google Cloud Storage (GCS) for artifact store.

### Import packages
We import necessary packages, required to define, compile and deploy the pipeline.

In [None]:
import os
import tempfile
import urllib

import tensorflow as tf

import tfx
import kfp
from tfx.components.evaluator.component import Evaluator
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.component import Trainer
from tfx.components.transform.component import Transform
from tfx.proto import evaluator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2

from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.orchestration.kubeflow.proto import kubeflow_pb2

from tfx.extensions.google_cloud_ai_platform.trainer import executor as ai_platform_trainer_executor  # pylint: disable=g-import-not-at-top
from tfx.extensions.google_cloud_ai_platform.pusher import executor as ai_platform_pusher_executor  # pylint: disable=g-import-not-at-top

from tfx.utils.dsl_utils import external_input

from tfx.components.base import executor_spec

from typing import Dict, List, Text


Check the versions

In [None]:
print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))

### Download example data
We download the sample dataset for use in our TFX pipeline.  We're working with a variant of the [Online News Popularity](https://archive.ics.uci.edu/ml/datasets/online+news+popularity) dataset, which summarizes a heterogeneous set of features about articles published by Mashable in a period of two years. The goal is to predict how popular the article will be on social networks. Specifically, in the original dataset the objective was to predict the number of times each article will be shared on social networks. In this variant, the goal is to predict the article's popularity percentile. For example, if the model predicts a score of 0.7, then it means it expects the article to be shared more than 70% of all articles.

In [None]:
# Download the example data.
DATA_PATH = 'https://raw.githubusercontent.com/ageron/open-datasets/master/' \
   'online_news_popularity_for_course/online_news_popularity_for_course.csv'
_data_root = tempfile.mkdtemp(prefix='tfx-data')
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)

Take a quick look at the CSV file

In [None]:
!head {_data_filepath}

### Upload the dataset to GCS

The pipeline will execute on GKE running on GCP and will utilize Cloud services, including Dataflow, AI Platform Training, and AI Platform Prediction. To make the dataset accessible to the pipeline and the services you need to upload it to GCS.

In [None]:
_artifact_store_bucket = 'tfw-demo-artifact-store'
_gcs_data_root = 'gs://{}/{}/'.format(_artifact_store_bucket, 'data')

!gsutil cp $_data_filepath $_gcs_data_root

### Create the Transform and Trainer routines

Create a file containing routines utilized by the Transform and Trainer components. To simplify the demo configuration we define both Transform and Trainer routines in a single file.

In [None]:
_module_file = 'transform_train.py'

In [None]:
%%writefile {_module_file}

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
from tensorflow_transform.tf_metadata import schema_utils

DENSE_FLOAT_FEATURE_KEYS = [
    "timedelta", "n_tokens_title", "n_tokens_content",
    "n_unique_tokens", "n_non_stop_words", "n_non_stop_unique_tokens",
    "n_hrefs", "n_self_hrefs", "n_imgs", "n_videos", "average_token_length",
    "n_keywords", "kw_min_min", "kw_max_min", "kw_avg_min", "kw_min_max",
    "kw_max_max", "kw_avg_max", "kw_min_avg", "kw_max_avg", "kw_avg_avg",
    "self_reference_min_shares", "self_reference_max_shares",
    "self_reference_avg_shares", "is_weekend", "global_subjectivity",
    "global_sentiment_polarity", "global_rate_positive_words",
    "global_rate_negative_words", "rate_positive_words", "rate_negative_words",
    "avg_positive_polarity", "min_positive_polarity", "max_positive_polarity",
    "avg_negative_polarity", "min_negative_polarity", "max_negative_polarity",
    "title_subjectivity", "title_sentiment_polarity", "abs_title_subjectivity",
    "abs_title_sentiment_polarity"]

VOCAB_FEATURE_KEYS = ["data_channel"]

BUCKET_FEATURE_KEYS = ["LDA_00", "LDA_01", "LDA_02", "LDA_03", "LDA_04"]

CATEGORICAL_FEATURE_KEYS = ["weekday"]

# Categorical features are assumed to each have a maximum value in the dataset.
MAX_CATEGORICAL_FEATURE_VALUES = [6]

#UNUSED: date, slug

LABEL_KEY = "n_shares_percentile"
VOCAB_SIZE = 10
OOV_SIZE = 5
FEATURE_BUCKET_COUNT = 10

def transformed_name(key):
  return key + '_xf'

def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float, setting nan's to the mean.
    outputs[transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=VOCAB_SIZE,
        num_oov_buckets=OOV_SIZE)

  for key in BUCKET_FEATURE_KEYS:
    outputs[transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]), FEATURE_BUCKET_COUNT,
        always_return_num_quantiles=False)

  for key in CATEGORICAL_FEATURE_KEYS:
    outputs[transformed_name(key)] = _fill_in_missing(inputs[key])

  # How popular is this article?
  outputs[transformed_name(LABEL_KEY)] = _fill_in_missing(inputs[LABEL_KEY])

  return outputs

def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.

  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.

  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.

  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)

def transformed_names(keys):
  return [transformed_name(key) for key in keys]


# Tf.Transform considers these features as "raw"
def _get_raw_feature_spec(schema):
  return schema_utils.schema_as_feature_spec(schema).feature_spec


def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')


def _build_estimator(config, hidden_units=None, warm_start_from=None):
  """Build an estimator for predicting the popularity of online news articles

  Args:
    config: tf.estimator.RunConfig defining the runtime environment for the
      estimator (including model_dir).
    hidden_units: [int], the layer sizes of the DNN (input layer first)
    warm_start_from: Optional directory to warm start from.

  Returns:
    A dict of the following:
      - estimator: The estimator that will be used for training and eval.
      - train_spec: Spec for training.
      - eval_spec: Spec for eval.
      - eval_input_receiver_fn: Input function for eval.
  """
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in transformed_names(DENSE_FLOAT_FEATURE_KEYS)
  ]
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=VOCAB_SIZE + OOV_SIZE, default_value=0)
      for key in transformed_names(VOCAB_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=FEATURE_BUCKET_COUNT, default_value=0)
      for key in transformed_names(BUCKET_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key,
          num_buckets=num_buckets,
          default_value=0) for key, num_buckets in zip(
              transformed_names(CATEGORICAL_FEATURE_KEYS),
              MAX_CATEGORICAL_FEATURE_VALUES)
  ]
  return tf.estimator.DNNLinearCombinedRegressor(
      config=config,
      linear_feature_columns=categorical_columns,
      dnn_feature_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25],
      warm_start_from=warm_start_from)


def _example_serving_receiver_fn(tf_transform_output, schema):
  """Build the serving in inputs.

  Args:
    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    Tensorflow graph which parses examples, applying tf-transform to them.
  """
  raw_feature_spec = _get_raw_feature_spec(schema)
  raw_feature_spec.pop(LABEL_KEY)

  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
      raw_feature_spec, default_batch_size=None)
  serving_input_receiver = raw_input_fn()

  transformed_features = tf_transform_output.transform_raw_features(
      serving_input_receiver.features)

  return tf.estimator.export.ServingInputReceiver(
      transformed_features, serving_input_receiver.receiver_tensors)


def _eval_input_receiver_fn(tf_transform_output, schema):
  """Build everything needed for the tf-model-analysis to run the model.

  Args:
    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    EvalInputReceiver function, which contains:
      - Tensorflow graph which parses raw untransformed features, applies the
        tf-transform preprocessing operators.
      - Set of raw, untransformed features.
      - Label against which predictions will be compared.
  """
  # Notice that the inputs are raw features, not transformed features here.
  raw_feature_spec = _get_raw_feature_spec(schema)

  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
      raw_feature_spec, default_batch_size=None)
  serving_input_receiver = raw_input_fn()

  features = serving_input_receiver.features.copy()
  transformed_features = tf_transform_output.transform_raw_features(features)
  
  # NOTE: Model is driven by transformed features (since training works on the
  # materialized output of TFT, but slicing will happen on raw features.
  features.update(transformed_features)

  return tfma.export.EvalInputReceiver(
      features=features,
      receiver_tensors=serving_input_receiver.receiver_tensors,
      labels=transformed_features[transformed_name(LABEL_KEY)])


def _input_fn(filenames, tf_transform_output, batch_size=200):
  """Generates features and labels for training or evaluation.

  Args:
    filenames: [str] list of CSV files to read data from.
    tf_transform_output: A TFTransformOutput.
    batch_size: int First dimension size of the Tensors returned by input_fn

  Returns:
    A (features, indices) tuple where features is a dictionary of
      Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  dataset = tf.data.experimental.make_batched_features_dataset(
      filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn)

  transformed_features = dataset.make_one_shot_iterator().get_next()
  # We pop the label because we do not want to use it as a feature while we're
  # training.
  return transformed_features, transformed_features.pop(
      transformed_name(LABEL_KEY))


# TFX will call this function
def trainer_fn(hparams, schema):
  """Build the estimator using the high level API.
  Args:
    hparams: Holds hyperparameters used to train the model as name/value pairs.
    schema: Holds the schema of the training examples.
  Returns:
    A dict of the following:
      - estimator: The estimator that will be used for training and eval.
      - train_spec: Spec for training.
      - eval_spec: Spec for eval.
      - eval_input_receiver_fn: Input function for eval.
  """
  # Number of nodes in the first layer of the DNN
  first_dnn_layer_size = 100
  num_dnn_layers = 4
  dnn_decay_factor = 0.7

  train_batch_size = 40
  eval_batch_size = 40

  tf_transform_output = tft.TFTransformOutput(hparams.transform_output)

  train_input_fn = lambda: _input_fn(
      hparams.train_files,
      tf_transform_output,
      batch_size=train_batch_size)

  eval_input_fn = lambda: _input_fn(
      hparams.eval_files,
      tf_transform_output,
      batch_size=eval_batch_size)

  train_spec = tf.estimator.TrainSpec(
      train_input_fn,
      max_steps=hparams.train_steps)

  serving_receiver_fn = lambda: _example_serving_receiver_fn(
      tf_transform_output, schema)

  exporter = tf.estimator.FinalExporter('online-news', serving_receiver_fn)
  eval_spec = tf.estimator.EvalSpec(
      eval_input_fn,
      steps=hparams.eval_steps,
      exporters=[exporter],
      name='online-news-eval')

  run_config = tf.estimator.RunConfig(
      save_checkpoints_steps=999, keep_checkpoint_max=1)

  run_config = run_config.replace(model_dir=hparams.serving_model_dir)

  estimator = _build_estimator(
      # Construct layers sizes with exponetial decay
      hidden_units=[
          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layers)
      ],
      config=run_config,
      warm_start_from=hparams.warm_start_from)

  # Create an input receiver for TFMA processing
  receiver_fn = lambda: _eval_input_receiver_fn(
      tf_transform_output, schema)

  return {
      'estimator': estimator,
      'train_spec': train_spec,
      'eval_spec': eval_spec,
      'eval_input_receiver_fn': receiver_fn
  }

### Copy the module file to GCS
The Transform and Trainer routines need access to the file at runtime. 

In [None]:
_gcs_module_file = 'gs://{}/{}/{}'.format(_artifact_store_bucket, 'module', _module_file)

!gsutil cp $_module_file $_gcs_module_file

## Define the Pipeline

The next step is to create a function defining the workflow. As you examine the function, you will notice that the Trainer and Pusher components are configured to use AI Platform Training and AI Platform Prediction and other components are configured to utilize Dataflow.


In [None]:
def _create_pipeline(
    pipeline_name: Text, 
    pipeline_root: Text, 
    data_root: Text,
    module_file: Text,
    beam_pipeline_args: List[Text],
    ai_platform_training_args: Dict[Text, Text],
    ai_platform_serving_args: Dict[Text, Text]) -> pipeline.Pipeline:
  """Implements the online news pipeline with TFX."""

  examples = external_input(data_root)

  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = CsvExampleGen(input_base=examples)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = StatisticsGen(input_data=example_gen.outputs.examples)

  # Generates schema based on statistics files.
  infer_schema = SchemaGen(
      stats=statistics_gen.outputs.output)

  # Performs anomaly detection based on statistics and data schema.
  validate_stats = ExampleValidator(
      stats=statistics_gen.outputs.output, schema=infer_schema.outputs.output)

  # Performs transformations and feature engineering in training and serving.
  transform = Transform(
      input_data=example_gen.outputs.examples,
      schema=infer_schema.outputs.output,
      module_file=module_file)

  # Uses user-provided Python function that implements a model using
  # TensorFlow's Estimators API.
  trainer = Trainer(
      custom_executor_spec=executor_spec.ExecutorClassSpec(
          ai_platform_trainer_executor.Executor),
      module_file=module_file,
      transformed_examples=transform.outputs.transformed_examples,
      schema=infer_schema.outputs.output,
      transform_output=transform.outputs.transform_output,
      train_args=trainer_pb2.TrainArgs(num_steps=10000),
      eval_args=trainer_pb2.EvalArgs(num_steps=5000),
      custom_config={'ai_platform_training_args': ai_platform_training_args})

  # Uses TFMA to compute a evaluation statistics over features of a model.
  model_analyzer = Evaluator(
      examples=example_gen.outputs.examples,
      model_exports=trainer.outputs.output,
      feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[
          evaluator_pb2.SingleSlicingSpec(
              column_for_slicing=['weekday'])
      ]))

  # Performs quality validation of a candidate model (compared to a baseline).
  model_validator = ModelValidator(
      examples=example_gen.outputs.examples, model=trainer.outputs.output)

  # Checks whether the model passed the validation steps and pushes the model
  # to a file destination if check passed.
  pusher = Pusher(
      custom_executor_spec=executor_spec.ExecutorClassSpec(
         ai_platform_pusher_executor.Executor),
      model_export=trainer.outputs.output,
      model_blessing=model_validator.outputs.blessing,
      custom_config={'ai_platform_serving_args': ai_platform_serving_args})

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen, statistics_gen, infer_schema, validate_stats, transform,
          trainer, model_analyzer, model_validator, #pusher
      ],
      # enable_cache=True,
      beam_pipeline_args=beam_pipeline_args
  )

### Compile and deploy the pipeline
You will now configure the GCP services utilized by the pipeline and then compile the pipeline to a YAML format utilized by Kubeflow Pipelines.

#### Configure GCP services

In [None]:
# GCP project and region to be used by AI Platform services
_project_id = 'jk-tfw-demo'
_gcp_region = 'us-central1'

# Connection setting to Cloud SQL based ML Metadata
_metadata_config = kubeflow_pb2.KubeflowMetadataConfig()
_metadata_config.mysql_db_service_host.environment_variable = 'MYSQL_SERVICE_HOST'
_metadata_config.mysql_db_service_port.environment_variable = 'MYSQL_SERVICE_PORT'
_metadata_config.mysql_db_name.value = 'metadb'
_metadata_config.mysql_db_user.value = 'root' 
_metadata_config.mysql_db_password.value = ''

# GCS folder to be used to output the artifacts generated by the pipeline
_pipeline_name = 'online_news_pipeline'
_pipeline_root = 'gs://{}/{}/'.format(_artifact_store_bucket, _pipeline_name)

# Dataflow settings
_beam_tmp_folder = 'gs://{}/beam/tmp'.format(_artifact_store_bucket)
_beam_pipeline_args = [
    '--runner=DataflowRunner',
    '--experiments=shuffle_mode=auto',
    '--project=' + _project_id,
    '--temp_location=' + _beam_tmp_folder,
    '--region=' + _gcp_region,
]

# AI Platform Training settings
# https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#Job
_ai_platform_training_args = {
    'project': _project_id,
    'region': _gcp_region
}

# AI Platform Prediction settings
# https://cloud.google.com/ml-engine/reference/rest/v1/projects.models
_model_name = 'online_news'
_ai_platform_serving_args = {
    'project_id': _project_id,
    'model_name': _model_name
}

#### Compile the pipeline

In [None]:
# Define the filename of the generated YAML
_pipeline_file_name = '{}.yaml'.format(_pipeline_name)

# Compile the pipeline
runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
    kubeflow_metadata_config=_metadata_config
)
kubeflow_dag_runner.KubeflowDagRunner(config=runner_config, output_filename=_pipeline_file_name).run(
    _create_pipeline(
        pipeline_name=_pipeline_name,
        pipeline_root=_pipeline_root,
        data_root = _gcs_data_root,
        module_file=_gcs_module_file,
        beam_pipeline_args=_beam_pipeline_args,
        ai_platform_training_args=_ai_platform_training_args,
        ai_platform_serving_args=_ai_platform_serving_args,
    ))

#### Upload the pipeline to Kubeflow Pipelines

Get credentials to GKE.

In [None]:
_cluster_name = 'tfw-demo-kfp-cluster'
_cluster_zone = 'us-central1-a'

!gcloud config set project $_project_id
!gcloud container clusters get-credentials $_cluster_name --zone $_cluster_zone

In [None]:
_client = kfp.Client()
_client.upload_pipeline(_pipeline_file_name, _pipeline_name)

### Run the pipeline

You can now trigger the run using the KFP UI or programmatically using `kfp.Client()`. 