##### Copyright &copy; 2019 The TensorFlow Authors.

In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Using the Apache Beam Orchestrator for TFX

This notebook demonstrates how to use Apache Beam as an orchestrator for TFX. Pipelines are also created in Apache Beam, which means that Beam sequences tasks according to the dependencies of each task, running each task as its dependencies are met.  Beam is also highly scalable and runs tasks in parallel in a distributed environment.  That makes Beam very powerful as an orchestrator for other pipelines, including TFX.

When using the InteractiveContext in a notebook, running each cell orchestrates the creation and running of each of the components in the TFX pipeline.  When using a separate orchestrator, as in this example, the components are only run once the TFX pipeline DAG has been defined and the orchestrator has been triggered to start an execution run.

In this example you will define all the supporting code for the TFX components before instantiating the components and running the TFX pipeline using an Apache Beam orchestrator.  This is the pattern which is typically used in a production deployment of TFX.

## Setup
First, we install the necessary packages, download data, import modules and set up paths.

### Install TFX and TensorFlow

> #### Note
> Because of some of the updates to packages you must use the button at the bottom of the output of this cell to restart the runtime.  Following restart, you should rerun this cell.

In [0]:
!pip install -q -U \
  tensorflow==2.0.0 \
  tfx==0.15.0rc0

### Import packages
We import necessary packages, including standard TFX component classes.

In [0]:
import os
import tempfile
import urllib

import tensorflow as tf

import tfx
from tfx.components.evaluator.component import Evaluator
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.component import Trainer
from tfx.components.transform.component import Transform
from tfx.proto import evaluator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2

from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from tfx.utils.dsl_utils import external_input

Check the versions

In [0]:
print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))

### Download example data
We download the sample dataset for use in our TFX pipeline.  We're working with a variant of the [Online News Popularity](https://archive.ics.uci.edu/ml/datasets/online+news+popularity) dataset, which summarizes a heterogeneous set of features about articles published by Mashable in a period of two years. The goal is to predict how popular the article will be on social networks. Specifically, in the original dataset the objective was to predict the number of times each article will be shared on social networks. In this variant, the goal is to predict the article's popularity percentile. For example, if the model predicts a score of 0.7, then it means it expects the article to be shared more than 70% of all articles.

In [0]:
# Download the example data.
DATA_PATH = 'https://raw.githubusercontent.com/ageron/open-datasets/master/' \
   'online_news_popularity_for_course/online_news_popularity_for_course.csv'
_data_root = tempfile.mkdtemp(prefix='tfx-data')
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)

Take a quick look at the CSV file

In [0]:
!head {_data_filepath}

### Set up pipeline paths

We create the filenames for the Python modules for the Transform and Trainer components, and a directory for the Serving model.

In [0]:
# Set up paths.
_constants_module_file = 'online_news_constants.py'
_transform_module_file = 'online_news_transform.py'
_trainer_module_file = 'online_news_trainer.py'
_serving_model_dir = os.path.join(tempfile.mkdtemp(),
                                  'serving_model/online_news_simple')

Define some constants and functions for both the `Transform` component and the `Trainer` component.  Define them in a Python module, in this case saved to disk using the `%%writefile` magic command since you are working in a notebook.

In [0]:
%%writefile {_constants_module_file}

DENSE_FLOAT_FEATURE_KEYS = [
    "timedelta", "n_tokens_title", "n_tokens_content",
    "n_unique_tokens", "n_non_stop_words", "n_non_stop_unique_tokens",
    "n_hrefs", "n_self_hrefs", "n_imgs", "n_videos", "average_token_length",
    "n_keywords", "kw_min_min", "kw_max_min", "kw_avg_min", "kw_min_max",
    "kw_max_max", "kw_avg_max", "kw_min_avg", "kw_max_avg", "kw_avg_avg",
    "self_reference_min_shares", "self_reference_max_shares",
    "self_reference_avg_shares", "is_weekend", "global_subjectivity",
    "global_sentiment_polarity", "global_rate_positive_words",
    "global_rate_negative_words", "rate_positive_words", "rate_negative_words",
    "avg_positive_polarity", "min_positive_polarity", "max_positive_polarity",
    "avg_negative_polarity", "min_negative_polarity", "max_negative_polarity",
    "title_subjectivity", "title_sentiment_polarity", "abs_title_subjectivity",
    "abs_title_sentiment_polarity"]

VOCAB_FEATURE_KEYS = ["data_channel"]

BUCKET_FEATURE_KEYS = ["LDA_00", "LDA_01", "LDA_02", "LDA_03", "LDA_04"]

CATEGORICAL_FEATURE_KEYS = ["weekday"]

# Categorical features are assumed to each have a maximum value in the dataset.
MAX_CATEGORICAL_FEATURE_VALUES = [6]

#UNUSED: date, slug

LABEL_KEY = "n_shares_percentile"
VOCAB_SIZE = 10
OOV_SIZE = 5
FEATURE_BUCKET_COUNT = 10

def transformed_name(key):
  return key + '_xf'

Now define a module containing the `preprocessing_fn()` function that we will pass to the `Transform` component.

In [0]:
%%writefile {_transform_module_file}

import tensorflow as tf
import tensorflow_transform as tft
from online_news_constants import *

def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float, setting nan's to the mean.
    outputs[transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=VOCAB_SIZE,
        num_oov_buckets=OOV_SIZE)

  for key in BUCKET_FEATURE_KEYS:
    outputs[transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]), FEATURE_BUCKET_COUNT,
        always_return_num_quantiles=False)

  for key in CATEGORICAL_FEATURE_KEYS:
    outputs[transformed_name(key)] = _fill_in_missing(inputs[key])

  # How popular is this article?
  outputs[transformed_name(LABEL_KEY)] = _fill_in_missing(inputs[LABEL_KEY])

  return outputs

def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.

  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.

  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.

  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)

Create a Python module containing a `trainer_fn` function, which must return an estimator.  If you prefer creating a Keras model, you can do so and then convert it to an estimator using `keras.model_to_estimator()`.

In [0]:
%%writefile {_trainer_module_file}

import tensorflow as tf
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

from online_news_constants import *


def transformed_names(keys):
  return [transformed_name(key) for key in keys]


# Tf.Transform considers these features as "raw"
def _get_raw_feature_spec(schema):
  return schema_utils.schema_as_feature_spec(schema).feature_spec


def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')


def _build_estimator(config, hidden_units=None, warm_start_from=None):
  """Build an estimator for predicting the popularity of online news articles

  Args:
    config: tf.estimator.RunConfig defining the runtime environment for the
      estimator (including model_dir).
    hidden_units: [int], the layer sizes of the DNN (input layer first)
    warm_start_from: Optional directory to warm start from.

  Returns:
    A dict of the following:
      - estimator: The estimator that will be used for training and eval.
      - train_spec: Spec for training.
      - eval_spec: Spec for eval.
      - eval_input_receiver_fn: Input function for eval.
  """
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in transformed_names(DENSE_FLOAT_FEATURE_KEYS)
  ]
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=VOCAB_SIZE + OOV_SIZE, default_value=0)
      for key in transformed_names(VOCAB_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=FEATURE_BUCKET_COUNT, default_value=0)
      for key in transformed_names(BUCKET_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key,
          num_buckets=num_buckets,
          default_value=0) for key, num_buckets in zip(
              transformed_names(CATEGORICAL_FEATURE_KEYS),
              MAX_CATEGORICAL_FEATURE_VALUES)
  ]
  return tf.estimator.DNNLinearCombinedRegressor(
      config=config,
      linear_feature_columns=categorical_columns,
      dnn_feature_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25],
      warm_start_from=warm_start_from)


def _example_serving_receiver_fn(tf_transform_output, schema):
  """Build the serving in inputs.

  Args:
    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    Tensorflow graph which parses examples, applying tf-transform to them.
  """
  raw_feature_spec = _get_raw_feature_spec(schema)
  raw_feature_spec.pop(LABEL_KEY)

  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
      raw_feature_spec, default_batch_size=None)
  serving_input_receiver = raw_input_fn()

  transformed_features = tf_transform_output.transform_raw_features(
      serving_input_receiver.features)

  return tf.estimator.export.ServingInputReceiver(
      transformed_features, serving_input_receiver.receiver_tensors)


def _eval_input_receiver_fn(tf_transform_output, schema):
  """Build everything needed for the tf-model-analysis to run the model.

  Args:
    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    EvalInputReceiver function, which contains:
      - Tensorflow graph which parses raw untransformed features, applies the
        tf-transform preprocessing operators.
      - Set of raw, untransformed features.
      - Label against which predictions will be compared.
  """
  # Notice that the inputs are raw features, not transformed features here.
  raw_feature_spec = _get_raw_feature_spec(schema)

  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
      raw_feature_spec, default_batch_size=None)
  serving_input_receiver = raw_input_fn()

  features = serving_input_receiver.features.copy()
  transformed_features = tf_transform_output.transform_raw_features(features)
  
  # NOTE: Model is driven by transformed features (since training works on the
  # materialized output of TFT, but slicing will happen on raw features.
  features.update(transformed_features)

  return tfma.export.EvalInputReceiver(
      features=features,
      receiver_tensors=serving_input_receiver.receiver_tensors,
      labels=transformed_features[transformed_name(LABEL_KEY)])


def _input_fn(filenames, tf_transform_output, batch_size=200):
  """Generates features and labels for training or evaluation.

  Args:
    filenames: [str] list of CSV files to read data from.
    tf_transform_output: A TFTransformOutput.
    batch_size: int First dimension size of the Tensors returned by input_fn

  Returns:
    A (features, indices) tuple where features is a dictionary of
      Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  dataset = tf.data.experimental.make_batched_features_dataset(
      filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn)

  transformed_features = dataset.make_one_shot_iterator().get_next()
  # We pop the label because we do not want to use it as a feature while we're
  # training.
  return transformed_features, transformed_features.pop(
      transformed_name(LABEL_KEY))


# TFX will call this function
def trainer_fn(hparams, schema):
  """Build the estimator using the high level API.
  Args:
    hparams: Holds hyperparameters used to train the model as name/value pairs.
    schema: Holds the schema of the training examples.
  Returns:
    A dict of the following:
      - estimator: The estimator that will be used for training and eval.
      - train_spec: Spec for training.
      - eval_spec: Spec for eval.
      - eval_input_receiver_fn: Input function for eval.
  """
  # Number of nodes in the first layer of the DNN
  first_dnn_layer_size = 100
  num_dnn_layers = 4
  dnn_decay_factor = 0.7

  train_batch_size = 40
  eval_batch_size = 40

  tf_transform_output = tft.TFTransformOutput(hparams.transform_output)

  train_input_fn = lambda: _input_fn(
      hparams.train_files,
      tf_transform_output,
      batch_size=train_batch_size)

  eval_input_fn = lambda: _input_fn(
      hparams.eval_files,
      tf_transform_output,
      batch_size=eval_batch_size)

  train_spec = tf.estimator.TrainSpec(
      train_input_fn,
      max_steps=hparams.train_steps)

  serving_receiver_fn = lambda: _example_serving_receiver_fn(
      tf_transform_output, schema)

  exporter = tf.estimator.FinalExporter('online-news', serving_receiver_fn)
  eval_spec = tf.estimator.EvalSpec(
      eval_input_fn,
      steps=hparams.eval_steps,
      exporters=[exporter],
      name='online-news-eval')

  run_config = tf.estimator.RunConfig(
      save_checkpoints_steps=999, keep_checkpoint_max=1)

  run_config = run_config.replace(model_dir=hparams.serving_model_dir)

  estimator = _build_estimator(
      # Construct layers sizes with exponetial decay
      hidden_units=[
          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layers)
      ],
      config=run_config,
      warm_start_from=hparams.warm_start_from)

  # Create an input receiver for TFMA processing
  receiver_fn = lambda: _eval_input_receiver_fn(
      tf_transform_output, schema)

  return {
      'estimator': estimator,
      'train_spec': train_spec,
      'eval_spec': eval_spec,
      'eval_input_receiver_fn': receiver_fn
  }

## Create the Pipeline

Creating the pipeline defines the dependencies between components and the artifacts that they require as input, which in turn defines the order in which they can be run.  In this example you also use an ML-Metadata database, which is this case is backed by SQL Lite.

In [0]:
_pipeline_name = 'online_news_beam'

_pipeline_root = tempfile.mkdtemp(prefix='tfx-pipelines')
_pipeline_root = os.path.join(_pipeline_root, 'pipelines', _pipeline_name)

# Sqlite ML-metadata db path.
_metadata_root = tempfile.mkdtemp(prefix='tfx-metadata')
_metadata_path = os.path.join(_metadata_root, 'metadata.db')

def _create_pipeline(pipeline_name, pipeline_root, data_root,
                     transform_module_file, trainer_module_file,
                     serving_model_dir, metadata_path):
  """Implements the online news pipeline with TFX."""
  input_data = external_input(data_root)

  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = CsvExampleGen(input=input_data)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

  # Generates schema based on statistics files.
  infer_schema = SchemaGen(
      statistics=statistics_gen.outputs['statistics'])

  # Performs anomaly detection based on statistics and data schema.
  validate_stats = ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=infer_schema.outputs['schema'])

  # Performs transformations and feature engineering in training and serving.
  transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=infer_schema.outputs['schema'],
      module_file=transform_module_file)

  # Uses user-provided Python function that implements a model using
  # TensorFlow's Estimators API.
  trainer = Trainer(
      module_file=trainer_module_file,
      transformed_examples=transform.outputs['transformed_examples'],
      schema=infer_schema.outputs['schema'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=trainer_pb2.TrainArgs(num_steps=10000),
      eval_args=trainer_pb2.EvalArgs(num_steps=5000))

  # Uses TFMA to compute a evaluation statistics over features of a model.
  model_analyzer = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[
          evaluator_pb2.SingleSlicingSpec(
              column_for_slicing=['weekday'])
      ]))

  # Performs quality validation of a candidate model (compared to a baseline).
  model_validator = ModelValidator(
      examples=example_gen.outputs['examples'], model=trainer.outputs['model'])

  # Checks whether the model passed the validation steps and pushes the model
  # to a file destination if check passed.
  pusher = Pusher(
      model=trainer.outputs['model'],
      model_blessing=model_validator.outputs['blessing'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen, statistics_gen, infer_schema, validate_stats, transform,
          trainer, model_analyzer, model_validator, pusher
      ],
      enable_cache=True,
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          metadata_path),
      additional_pipeline_args={},
  )

online_news_pipeline = _create_pipeline(
        pipeline_name=_pipeline_name,
        pipeline_root=_pipeline_root,
        data_root=_data_root,
        transform_module_file=_transform_module_file,
        trainer_module_file=_trainer_module_file,
        serving_model_dir=_serving_model_dir,
        metadata_path=_metadata_path)

### Run the pipeline

Create a `BeamDagRunner` and use it to run the pipeline.

>#### Note
> This same pattern is also used to create pipelines with other orchestrators, the only difference here being that a Beam orchestrator is used.  When using a Beam orchestrator running the pipeline also triggers an execution run, while other orchestrators may only load the pipeline and wait for a trigger event.

#### Results

Running this pipeline produces a lot of log messages, which can be instructive to read through.  For example, log messages like these show the sequencing of components through the pipeline.

```
INFO:tensorflow:Component CsvExampleGen is running.
INFO:tensorflow:Run driver for CsvExampleGen
...
INFO:tensorflow:Run executor for CsvExampleGen
...
INFO:tensorflow:Run publisher for CsvExampleGen
...
INFO:tensorflow:Component CsvExampleGen is finished.
INFO:tensorflow:Component StatisticsGen is running.
```

In [0]:
BeamDagRunner().run(online_news_pipeline)