## Content

1. [TFX](#TFX)
2. [Apache Beam](#Apache-Beam)
3. [Data Ingestion](#Data-Ingestion)
    * [Data Loading](#Data-Loading)
    * [Data Preparation](#Data-Preparation)
    * [Versioning Datasets](#Versioning-Datasets)
    * [Ingestion Strategies](#Ingestion-Strategies)

4. [Data Validation](#Data-Validation)
    * [Data Validation](#Data-Validation)
    * [Processing Large Datasets with GCP](#Processing-Large-Datasets-with-GCP)
    * [Integrating TFDV into Your ML Pipeline](#Integrating-TFDV-into-Your-ML-Pipeline)
5. [Data Preprocessing with TFT](#Data-Preprocessing-with-TFT)
6. [Integrate TFT into Your Machine Learning Pipeline](#Integrate-TFT-into-Your-Machine-Learning-Pipeline)

7. [Model training](#Model-training)
    * [The TFX Trainer Component](#The-TFX-Trainer-Component)
    
8. [Model analysis and validation](#Model-analysis-and-validation)
9. [Model deployment](#Model-deployment)

### TFX

In [None]:
import tensorflow_data_validation as tfdv
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

In [None]:
import tensorflow as tf
from tfx.orchestration.experimental.interactive.interactive_context import \
 InteractiveContext

Once the requirements are imported, you can create a context object. The
context object handles component execution and displays the component’s
artifacts. At this point, the InteractiveContext also sets up a simple in-memory ML MetadataStore:

In [None]:
context = InteractiveContext()

After setting up your pipeline component(s) (e.g., StatisticsGen), you
can then execute each component object through the run function of the
context object, as shown in the following example:

```
from tfx.components import StatisticsGen
statistics_gen = StatisticsGen(
 examples=example_gen.outputs['examples'])
context.run(statistics_gen)
```

The component itself receives the outputs of the previous component (in
our case, the data ingestion component ExampleGen) as an instantiation
argument. After executing the component’s tasks, the component
automatically writes the metadata of the output artifact to the metadata
store.

### Apache Beam

Data pipelines usually start and end with data being read or written, which
is handled in Apache Beam through collections, often called
PCollections. The collections are then transformed, and the final result
can be expressed as a collection again and written to a filesystem.

Similar to the ReadFromText operation, Apache Beam provides functions
to write collections to a text file (e.g., WriteToText).

In [None]:
import apache_beam as beam
with beam.Pipeline() as p:
    lines = p | beam.io.ReadFromText(input_file)

In Apache Beam, data is manipulated through transformations. As we see in
this example and later in Chapter 5, the transformations can be chained by
using the pipe operator |. If you chain multiple transformations of the same
type, you have to provide a name for the operation, noted by the string
identifier between the pipe operator and the right-angle brackets. In the
following example, we apply all transformations sequentially on our lines
extracted from the text file:

In [None]:
import re

counts = (
 lines
 | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
 | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
 | 'GroupAndSum' >> beam.CombinePerKey(sum))


You can also apply Python functions as part of a transformation. The
following example shows how the function format_result can be applied
to earlier produced summation results. The function converts the resulting
tuples into a string that then can be written to a text file:

In [None]:
def format_result(word_count):
    """Convert tuples (token, count) into a string"""
    (word, count) = word_count
    return "{}: {}".format(word, count)
output = counts | 'Format' >> beam.Map(format_result)

The previous snippets and following
examples are a modified version of the Apache Beam introduction. For
readability, the example has been reduced to the bare minimum Apache
Beam code:

In [None]:
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


input_file = "gs://dataflow-samples/shakespeare/kinglear.txt"
output_file = "/tmp/output.txt"
# Define pipeline options object.
pipeline_options = PipelineOptions()


with beam.Pipeline(options=pipeline_options) as p:
    # Read the text file or file pattern into a PCollection.
    lines = p | ReadFromText(input_file)
    
    # Count the occurrences of each word.
    counts = (
                lines
                | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                | 'GroupAndSum' >> beam.CombinePerKey(sum))
    
    # Format the counts into a PCollection of strings.
    
    def format_result(word_count):
        (word, count) = word_count
        return "{}: {}".format(word, count)
    
    output = counts | 'Format' >> beam.Map(format_result)
    # Write the output using a "Write" transform that has side effects.
    output | WriteToText(output_file)
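
To make the transformations easier to follow, here is a minimal plain-Python sketch of what the 'Split', 'PairWithOne', and 'GroupAndSum' steps compute on a small in-memory input. It is not how Apache Beam executes the pipeline (Beam distributes the work across workers); the `count_words` helper and the sample lines are assumptions made for illustration only.

```python
import re
from collections import Counter


def count_words(lines):
    """Plain-Python equivalent of the Split, PairWithOne, and GroupAndSum steps."""
    counts = Counter()
    for line in lines:
        # 'Split': extract word tokens with the same regular expression.
        for word in re.findall(r"[A-Za-z']+", line):
            # 'PairWithOne' and 'GroupAndSum': add 1 per occurrence of the word.
            counts[word] += 1
    return dict(counts)


def format_result(word_count):
    """Convert a (token, count) tuple into a string."""
    word, count = word_count
    return "{}: {}".format(word, count)


# Hypothetical sample input standing in for the lines read from the text file.
sample = ["Blow, winds, and crack your cheeks!", "Blow winds"]
for item in sorted(count_words(sample).items()):
    print(format_result(item))
```

Running the same logic locally on a few lines is a cheap way to check the regular expression and the output format before starting a full pipeline run.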

### Data Ingestion

#### Data Loading

In this step of our pipeline, we read data files or request the data for our
pipeline run from an external service (e.g., Google Cloud BigQuery).
Before passing the ingested dataset to the next component, we divide the
available data into separate datasets (e.g., training and validation datasets)
and then convert the datasets into TFRecord files containing the data
represented as tf.Example data structures.

In [None]:
import os
from tfx.components import CsvExampleGen
from tfx.utils.dsl_utils import external_input

base_dir = os.getcwd()
data_dir = os.path.join(os.pardir, "data")
examples = external_input(os.path.join(base_dir, data_dir))
example_gen = CsvExampleGen(input=examples)

context.run(example_gen)

Sometimes our data can’t be expressed efficiently as CSVs (e.g., when we
want to load images for computer vision problems or large corpora for
natural language processing problems). In these cases, it is recommended to
convert the datasets to TFRecord data structures and then load the saved
TFRecord files with the ImportExampleGen component.

In [None]:
import os
from tfx.components import ImportExampleGen
from tfx.utils.dsl_utils import external_input

base_dir = os.getcwd()
data_dir = os.path.join(os.pardir, "tfrecord_data")
examples = external_input(os.path.join(base_dir, data_dir))
example_gen = ImportExampleGen(input=examples)

context.run(example_gen)


Loading custom files. To convert data of any type to TFRecord files, we need to create a
tf.Example structure for every data record in the dataset. tf.Example is a
simple but highly flexible data structure, which is a key-value mapping:
{"string": value}

In the case of the TFRecord data structure, a tf.Example expects a
tf.Features object, which accepts a dictionary of features containing key-value pairs. The key is always a string identifier representing the feature
column, and the value is a tf.train.Feature object.

To reduce code redundancy, we’ll define helper functions to assist with
converting the data records into the correct data structure used by
tf.Example:

In [None]:
import tensorflow as tf

def _bytes_feature(value):
    # BytesList only accepts bytes, so encode str values (e.g., CSV fields).
    if isinstance(value, str):
        value = value.encode("utf-8")
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

import csv
import os
import tensorflow as tf
original_data_file = os.path.join(
                                    os.pardir, os.pardir, "data",
                                    "consumer-complaints.csv")

tfrecord_filename = "consumer-complaints.tfrecord"
tf_record_writer = tf.io.TFRecordWriter(tfrecord_filename)

with open(original_data_file) as csv_file:
    reader = csv.DictReader(csv_file, delimiter=",", quotechar='"')
    for row in reader:
        
        example = tf.train.Example(features=tf.train.Features(feature={
            "product": _bytes_feature(row["product"]),
            "sub_product": _bytes_feature(row["sub_product"]),
            "issue": _bytes_feature(row["issue"]),
            "sub_issue": _bytes_feature(row["sub_issue"]),
            "state": _bytes_feature(row["state"]),
            "zip_code": _int64_feature(int(float(row["zip_code"]))),
            "company": _bytes_feature(row["company"]),
            "company_response": _bytes_feature(row["company_response"]),
            "consumer_complaint_narrative": \
            _bytes_feature(row["consumer_complaint_narrative"]),
            "timely_response": _bytes_feature(row["timely_response"]),
            "consumer_disputed": _bytes_feature(row["consumer_disputed"]),
        }))
        tf_record_writer.write(example.SerializeToString())
    tf_record_writer.close()

Remote data files. The ExampleGen component can read files from remote cloud storage
buckets like Google Cloud Storage or AWS Simple Storage Service (S3).
TFX users can provide the bucket path to the external_input function, as
shown in the following example:

In [None]:
examples = external_input("gs://example_compliance_data/")
example_gen = CsvExampleGen(input=examples)

Databases. TFX provides two components to ingest datasets directly from databases. In
the following sections, we introduce the BigQueryExampleGen component
to query data from BigQuery tables and the PrestoExampleGen component
to query data from Presto databases.

In [None]:
# Google Cloud’s BigQuery tables

from tfx.components import BigQueryExampleGen
query = """
         SELECT * FROM `<project_id>.<database>.<table_name>`
        """
example_gen = BigQueryExampleGen(query=query)

# Presto databases

from proto import presto_config_pb2
from presto_component.component import PrestoExampleGen

query = """
         SELECT * FROM `<project_id>.<database>.<table_name>`
        """

presto_config = presto_config_pb2.PrestoConnConfig(
        host='localhost',
        port=8080)

example_gen = PrestoExampleGen(presto_config, query=query)

#### Data Preparation

Each of the introduced ExampleGen components allows us to configure
input settings (input_config) and output settings (output_config) for our
dataset. If we would like to ingest datasets incrementally, we can define a
span as the input configuration. At the same time, we can configure how the
data should be split.

Splitting one dataset into subsets. The following example shows how we can extend our data ingestion by
requiring a three-way split: training, evaluation, and test sets with a ratio of
6:2:2. 

In [None]:
from tfx.components import CsvExampleGen
from tfx.proto import example_gen_pb2
from tfx.utils.dsl_utils import external_input

base_dir = os.getcwd()
data_dir = os.path.join(os.pardir, "data")

output = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
    example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=6),
    example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2),
    example_gen_pb2.SplitConfig.Split(name='test', hash_buckets=2)
    ]))

examples = external_input(os.path.join(base_dir, data_dir))
example_gen = CsvExampleGen(input=examples, output_config=output)
context.run(example_gen)
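
The idea behind `hash_buckets` can be sketched in plain Python: each record is hashed into one of 6 + 2 + 2 = 10 buckets, and contiguous bucket ranges are assigned to the splits. This is only an illustration of the principle; the `assign_split` helper and the use of MD5 are assumptions of this sketch and not the hashing scheme TFX uses internally.

```python
import hashlib

# Split names and bucket counts taken from the configuration above.
SPLITS = [("train", 6), ("eval", 2), ("test", 2)]


def assign_split(record, splits=SPLITS):
    """Map a record to a split by hashing it into one of sum(buckets) buckets."""
    total_buckets = sum(buckets for _, buckets in splits)
    digest = hashlib.md5(record.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % total_buckets
    upper = 0
    for name, buckets in splits:
        upper += buckets
        if bucket < upper:
            return name
    raise AssertionError("unreachable: bucket is always below total_buckets")


# Hypothetical records; in the pipeline these would be the rows of the CSV files.
records = ["record-{}".format(i) for i in range(10000)]
counts = {}
for record in records:
    split = assign_split(record)
    counts[split] = counts.get(split, 0) + 1
print(counts)
```

Because the assignment depends only on the record content, the same record lands in the same split on every run, and the split sizes approach the 6:2:2 ratio as the dataset grows.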

Preserving existing splits. In some situations, we have already generated the subsets of the datasets
externally, and we would like to preserve these splits when we ingest the
datasets. We can achieve this by providing an input configuration.

For the following configuration, let’s assume that our dataset has been split
externally and saved in subdirectories:
```
└── data
   ├── train
   │     └─ 20k-consumer-complaints-training.csv
   ├── eval
   │     └─ 4k-consumer-complaints-eval.csv
   └── test
         └─ 2k-consumer-complaints-test.csv
```

In [None]:
import os
from tfx.components import CsvExampleGen
from tfx.proto import example_gen_pb2
from tfx.utils.dsl_utils import external_input
base_dir = os.getcwd()
data_dir = os.path.join(os.pardir, "data")

input = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(name='train', pattern='train/*'),
    example_gen_pb2.Input.Split(name='eval', pattern='eval/*'),
    example_gen_pb2.Input.Split(name='test', pattern='test/*')
])

examples = external_input(os.path.join(base_dir, data_dir))
example_gen = CsvExampleGen(input=examples, input_config=input)

Spanning Datasets. One of the significant use cases for machine learning pipelines is that we
can update our machine learning models when new data becomes available.
For this scenario, the ExampleGen component allows us to use spans. Think
of a span as a snapshot of data. Every hour, day, or week, a batch extract,
transform, load (ETL) process could make such a data snapshot and create
a new span.

```
└── data
   ├── export0
   │     └─ 20k-consumer-complaints-training.csv
   ├── export1
   │     └─ 4k-consumer-complaints-eval.csv
   └── export2
         └─ 2k-consumer-complaints-test.csv
```

In [None]:
from tfx.components import CsvExampleGen
from tfx.proto import example_gen_pb2
from tfx.utils.dsl_utils import external_input

base_dir = os.getcwd()
data_dir = os.path.join(os.pardir, "data")

input = example_gen_pb2.Input(splits=[
     example_gen_pb2.Input.Split(pattern='export{SPAN}/*')
])

# or

input = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(name='train',
                                pattern='export{SPAN}/train/*'),
    example_gen_pb2.Input.Split(name='eval',
                                pattern='export{SPAN}/eval/*')
])

examples = external_input(os.path.join(base_dir, data_dir))
example_gen = CsvExampleGen(input=examples, input_config=input)
context.run(example_gen)
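
How a `{SPAN}` placeholder can be resolved against a set of export folders is sketched below in plain Python. The sketch assumes that the most recent (highest-numbered) span is the one to ingest, which is our understanding of the default behavior; the `latest_span` helper is hypothetical and not part of TFX.

```python
import re


def latest_span(directory_names, pattern="export{SPAN}"):
    """Return the highest span number among names matching the pattern, or None."""
    # Turn the '{SPAN}' placeholder into a capture group for digits.
    prefix, suffix = pattern.split("{SPAN}")
    regex = re.compile(re.escape(prefix) + r"(\d+)" + re.escape(suffix) + r"$")
    spans = []
    for name in directory_names:
        match = regex.match(name)
        if match:
            spans.append(int(match.group(1)))
    return max(spans) if spans else None


# Folder names from the directory layout above, plus one unrelated folder.
print(latest_span(["export0", "export1", "export2", "notes"]))
```

Comparing the span numbers as integers matters once there are more than ten exports, because a string comparison would rank "export9" above "export10".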

#### Versioning Datasets

In machine learning pipelines, we want to track the produced models
together with the datasets that were used to train them. To do this, it is
useful to version our datasets.

Data versioning allows us to track the ingested data in more detail. This
means that we not only store the file name and path of the ingested data in
the ML MetadataStore (which the TFX components currently support) but also
track more meta-information about the raw dataset, such as a hash of the
ingested data. Such version tracking would allow us to verify at a later
point in time that the dataset used during training is still the same
dataset. Such a feature is critical for end-to-end ML reproducibility.


However, such a feature is currently not supported by the TFX ExampleGen
component. If you would like to version your datasets, you can use
third-party data versioning tools and version the data before the datasets
are ingested into the pipeline, for example with one of the following tools:
1. Data Version Control (DVC).
DVC is an open source version control system for machine learning
projects. It lets you commit hashes of your datasets instead of the entire
dataset itself. Therefore, the state of the dataset is tracked (e.g., via
git), but the repository isn’t cluttered with the entire dataset.
2. Pachyderm.
Pachyderm is an open source machine learning platform running on
Kubernetes. It originated with the concept of versioning for data (“Git
for data”) but has now expanded into an entire data platform, including
pipeline orchestration based on data versions.
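
The hash mentioned above can also be computed by hand if you only want a lightweight fingerprint to store next to a pipeline run. The following is a minimal sketch using Python's standard library; the `file_sha256` helper and the temporary example file are assumptions for illustration, and the sketch is not a replacement for DVC or Pachyderm.

```python
import hashlib
import os
import tempfile


def file_sha256(path, chunk_size=1024 * 1024):
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Read in chunks so that large datasets do not have to fit in memory.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Hypothetical dataset file created only for this demonstration.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "example.csv")
    with open(path, "w") as f:
        f.write("product,issue\nMortgage,Late fee\n")
    print(file_sha256(path))
```

If the digest recorded at training time differs from the digest of the file at a later point, the dataset has changed and the training run can no longer be reproduced from it.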

#### Ingestion Strategies

##### Structured Data
Structured data is often stored in a database or on a disk in file format,
supporting tabular data. If the data exists in a database, we can either export
it to CSVs or consume the data directly with the PrestoExampleGen or the
BigQueryExampleGen components (if the services are available).
Data available on a disk stored in file formats supporting tabular data
should be converted to CSVs and ingested into the pipeline with the
CsvExampleGen component. Should the amount of data grow beyond a few
hundred megabytes, you should consider converting the data into TFRecord
files or storing the data with Apache Parquet.
##### Text Data for Natural Language Problems
Text corpora can snowball to a considerable size. To ingest such datasets
efficiently, we recommend converting the datasets to TFRecord or Apache
Parquet representations. Using performant data file types allows an efficient
and incremental loading of the corpus documents. The ingestion of the
corpora from a database is also possible; however, we recommend
considering network traffic costs and bottlenecks.
##### Image Data for Computer Vision Problems
We recommend converting image datasets from the image files to
TFRecord files without decoding the images. Decoding highly
compressed images only increases the amount of disk space needed to store
the intermediate tf.Example records. The compressed images can be
stored in tf.Example records as byte strings:

In [None]:
import os
import tensorflow as tf

base_path = "/path/to/images"
filenames = os.listdir(base_path)

def generate_label_from_path(image_path):
    ...
    return label

def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


tfrecord_filename = 'data/image_dataset.tfrecord'
with tf.io.TFRecordWriter(tfrecord_filename) as writer:
    for img_path in filenames:
        image_path = os.path.join(base_path, img_path)
        try:
            raw_file = tf.io.read_file(image_path)
        except (FileNotFoundError, tf.errors.NotFoundError):
            # Skip files that cannot be read instead of aborting the export.
            print("File {} could not be found".format(image_path))
            continue
        # Store the still-compressed image bytes together with the label.
        example = tf.train.Example(features=tf.train.Features(feature={
            'image_raw': _bytes_feature(raw_file.numpy()),
            'label': _int64_feature(generate_label_from_path(image_path))
        }))
        writer.write(example.SerializeToString())

The example code reads images from a provided path /path/to/images and
stores the image as byte strings in the tf.Example. We aren’t preprocessing
our images at this point in the pipeline.

## Data Validation

Data validation means three distinct checks on our data:
* Check for data anomalies.
* Check that the data schema hasn’t changed.
* Check that the statistics of our new datasets still align with
statistics from our previous training datasets.


The data validation step in our pipeline performs these checks and
highlights any failures. If a failure is detected, we can stop the workflow
and address the data issue by hand, for example, by curating a new dataset.


Generating Statistics from Your Data

In [None]:
import tensorflow_data_validation as tfdv

# Generate statistics from a CSV file ...
stats = tfdv.generate_statistics_from_csv(
        data_location='/data/consumer_complaints.csv',
        delimiter=',')

# ... or from a TFRecord file.
stats = tfdv.generate_statistics_from_tfrecord(
        data_location='/data/consumer_complaints.tfrecord')


Generating Schema from Your Data. Once we have generated our summary statistics, the next step is to generate
a schema of our dataset. A data schema describes the representation of
your dataset: it defines which features are expected and which type each
feature has (float, integer, bytes, etc.). In addition, your schema should
define the boundaries of your data (e.g., minimums, maximums, and
thresholds of allowed missing records for a feature).


The schema definition of your dataset can then be used to validate future
datasets to determine if they are in line with your previous training sets.
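
To illustrate what validating a dataset against a schema means, here is a minimal plain-Python sketch that checks expected types, a minimum presence fraction, and unknown features. It is a toy model of the idea and not TFDV's schema format or validation logic; the `find_anomalies` helper, the rule keys, and the sample records are all assumptions made for this example.

```python
def find_anomalies(records, schema):
    """Return (feature, problem) pairs for records that violate the schema."""
    anomalies = []
    total = len(records)
    for name, rule in schema.items():
        values = [r[name] for r in records if r.get(name) is not None]
        # Presence check: share of records that carry a value for the feature.
        fraction = len(values) / total if total else 0.0
        if fraction < rule["min_fraction"]:
            anomalies.append((name, "too_many_missing"))
        # Type check: every present value must have the expected Python type.
        if any(not isinstance(v, rule["type"]) for v in values):
            anomalies.append((name, "wrong_type"))
    # Features that appear in the data but are unknown to the schema.
    seen = {key for r in records for key in r}
    for name in sorted(seen - set(schema)):
        anomalies.append((name, "not_in_schema"))
    return anomalies


# Hypothetical schema derived from an earlier training set.
schema = {
    "product": {"type": str, "min_fraction": 1.0},
    "zip_code": {"type": int, "min_fraction": 0.5},
}
# Hypothetical new records to validate.
new_records = [
    {"product": "Mortgage", "zip_code": 94110},
    {"product": "Credit card", "zip_code": "941XX"},
    {"product": None, "zip_code": None, "channel": "web"},
]
print(find_anomalies(new_records, schema))
```

The three kinds of findings correspond to the checks named earlier: missing values and type changes point to data anomalies, while an unknown feature indicates that the schema of the incoming data has changed.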


In [None]:
schema = tfdv.infer_schema(stats)

Comparing Datasets. Let’s say we have two datasets: training and validation datasets. Before
training our machine learning model, we would like to determine how
representative the validation set is with regard to the training set. Does the
validation data follow our training data schema?

In [None]:
train_stats = tfdv.generate_statistics_from_tfrecord(
    data_location=train_tfrecord_filename)
val_stats = tfdv.generate_statistics_from_tfrecord(
    data_location=val_tfrecord_filename)
tfdv.visualize_statistics(lhs_statistics=val_stats,
    rhs_statistics=train_stats,
    lhs_name='VAL_DATASET', rhs_name='TRAIN_DATASET')

# Anomalies can be detected using the following code:
anomalies = tfdv.validate_statistics(statistics=val_stats, schema=schema)
tfdv.display_anomalies(anomalies)


Updating the Schema. The preceding anomaly protocol shows us how to detect variations from the
schema that is autogenerated from our dataset. But another use case for
TFDV is manually setting the schema according to our domain knowledge
of the data.

In [None]:
schema = tfdv.load_schema_text(schema_location)

sub_issue_feature = tfdv.get_feature(schema, 'sub_issue')
# Require the feature to be present in at least 90% of the examples.
sub_issue_feature.presence.min_fraction = 0.9

tfdv.write_schema_text(schema, schema_location)

# Revalidate the statistics to view the updated anomalies.
updated_anomalies = tfdv.validate_statistics(val_stats, schema)
tfdv.display_anomalies(updated_anomalies)

Data Skew and Drift. TFDV provides a built-in “skew comparator” that detects large differences
between the statistics of two datasets. This isn’t the statistical definition of
skew (a dataset that is asymmetrically distributed around its mean). It is
defined in TFDV as the L-infinity norm of the difference between the
serving_statistics of two datasets. If the difference between the two
datasets exceeds the threshold of the L-infinity norm for a given feature,
TFDV highlights it as an anomaly using the anomaly detection defined
earlier in this chapter:

In [None]:
tfdv.get_feature(schema,
    'company').skew_comparator.infinity_norm.threshold = 0.01

skew_anomalies = tfdv.validate_statistics(statistics=train_stats,
    schema=schema,
    serving_statistics=serving_stats)

Similar to this skew example, you should define your drift_comparator
for the features you would like to watch and compare. You can then call
validate_statistics with the two dataset statistics as arguments, one for
your baseline (e.g., yesterday’s dataset) and one for a comparison (e.g.,
today’s dataset):

In [None]:
tfdv.get_feature(schema,
    'company').drift_comparator.infinity_norm.threshold = 0.01
drift_anomalies = tfdv.validate_statistics(statistics=train_stats_today,
    schema=schema,
    previous_statistics=\
    train_stats_yesterday)
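
The L-infinity comparison behind both comparators can be sketched in plain Python for a categorical feature: compute the relative frequency of each value in both datasets and take the largest absolute difference. This is an illustration of the distance measure only, not TFDV's implementation; the helper names and the sample values are assumptions, and both inputs are assumed to be non-empty.

```python
from collections import Counter


def value_distribution(values):
    """Relative frequency of each distinct value."""
    counts = Counter(values)
    total = sum(counts.values())
    return {value: count / total for value, count in counts.items()}


def l_infinity_distance(values_a, values_b):
    """Largest absolute difference in relative frequency across all values."""
    dist_a = value_distribution(values_a)
    dist_b = value_distribution(values_b)
    keys = set(dist_a) | set(dist_b)
    return max(abs(dist_a.get(k, 0.0) - dist_b.get(k, 0.0)) for k in keys)


# Hypothetical values of the 'company' feature in two datasets.
train = ["Bank A", "Bank A", "Bank B", "Bank B"]
serving = ["Bank A", "Bank A", "Bank A", "Bank B"]
distance = l_infinity_distance(train, serving)
print(distance, distance > 0.01)
```

Here "Bank A" makes up half of the first dataset and three quarters of the second, so the distance is 0.25, far above the threshold of 0.01 used in the comparator examples.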

Biased Datasets. Another potential problem with an input dataset is bias. We define bias here
as data that is in some way not representative of the real world. 

### Processing Large Datasets with GCP

As we collect more data, the data validation becomes a more time-consuming step in our machine learning workflow. One way of reducing the
time to perform the validation is by taking advantage of available cloud
solutions.

Set up a pipeline object:

In [None]:
from apache_beam.options.pipeline_options import (
 PipelineOptions, GoogleCloudOptions, StandardOptions)


options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = '<YOUR_GCP_PROJECT_ID>'
google_cloud_options.job_name = '<YOUR_JOB_NAME>'
google_cloud_options.staging_location = 'gs://<YOUR_GCP_BUCKET>/staging'
google_cloud_options.temp_location = 'gs://<YOUR_GCP_BUCKET>/tmp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

from apache_beam.options.pipeline_options import SetupOptions
setup_options = options.view_as(SetupOptions)
setup_options.extra_packages = [
                                '/path/to/tensorflow_data_validation'
                                '-0.22.0-cp37-cp37m-manylinux2010_x86_64.whl']

data_set_path = 'gs://<YOUR_GCP_BUCKET>/train_reviews.tfrecord'
output_path = 'gs://<YOUR_GCP_BUCKET>/'
tfdv.generate_statistics_from_tfrecord(data_set_path,
                                        output_path=output_path,
                                        pipeline_options=options)



### Integrating TFDV into Your ML Pipeline

TFX provides a pipeline component called StatisticsGen, which accepts
the output of the previous ExampleGen components as input and then
performs the generation of statistics:


In [None]:
from tfx.components import StatisticsGen
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)

from tfx.components import SchemaGen
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)
context.run(schema_gen)

from tfx.components import ExampleValidator
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(example_validator)


### Data Preprocessing with TFT

Deploying Preprocessing Steps and the ML Model as One Artifact. To avoid a misalignment between the preprocessing steps and the trained
model, the exported model of our pipeline should include the preprocessing
graph and the trained model. We can then deploy the model like any other
TensorFlow model, but during inference, the data will be preprocessed
on the model server as part of the model inference.

TFT processes the data that we ingested into our pipeline with the earlier
generated dataset schema, and it outputs two artifacts:
* Preprocessed training and evaluation datasets in the TFRecord
format. The produced datasets can be consumed downstream in the
Trainer component of our pipeline.
* Exported preprocessing graph (with assets), which will be used
when we export our machine learning model.


The key to TFT is the preprocessing_fn function.
The function defines all transformations we want to apply to the raw data.
When we execute the Transform component, the preprocessing_fn
function will receive the raw data, apply the transformation, and return the
processed data. The data is provided as TensorFlow Tensors or
SparseTensors (depending on the feature). All transformations applied to
the tensors have to be TensorFlow operations. This allows TFT to
effectively distribute the preprocessing steps.


In [None]:
def preprocessing_fn(inputs):
    x = inputs['x']
    x_normalized = tft.scale_to_0_1(x)
    return {
    'x_xf': x_normalized
    }

def process_image(raw_image):
    # Flatten the incoming tensor before decoding the JPEG bytes.
    raw_image = tf.reshape(raw_image, [-1])
    img_rgb = tf.io.decode_jpeg(raw_image, channels=3)
    # Convert pixel values to float32 in the range [0, 1].
    img = tf.image.convert_image_dtype(img_rgb, tf.float32)
    resized_img = tf.image.resize_with_pad(
        img,
        target_height=300,
        target_width=300
    )
    img_grayscale = tf.image.rgb_to_grayscale(resized_img)
    return tf.reshape(img_grayscale, [-1, 300, 300, 1])

Best Practices:
* Feature names matter
The naming of the output features of the preprocessing is important. As
you will see in the following TFT implementations, we reuse the name
of the input feature and append _xf. Also, the names of the input nodes
of the TensorFlow models need to match the names of the output
features from the preprocessing_fn function.
* Consider the data types
TFT limits the data types of the output features. It exports all
preprocessed features as either tf.string, tf.float32, or tf.int64
values. This is important in case your model can’t consume these data
types. Some models from TensorFlow Hub require inputs to be
presented as tf.int32 values (e.g., BERT models). We can avoid that
situation if we cast the inputs to the correct data types inside our models
or if we convert the data types in the estimator input functions.
* Preprocessing happens in batches
When you write preprocessing functions, you might think of it as
processing one data row at a time. In fact, TFT performs the operations
in batches. This is why we will need to reshape the output of the
preprocessing_fn() function to a Tensor or SparseTensor when we
use it in the context of our Transform component.
* Remember, no eager execution
The functions inside of the preprocessing_fn() function need to be
represented by TensorFlow ops. If you want to lowercase an input string,
you can’t use Python’s lower(). You have to use the TensorFlow operation
tf.strings.lower() to perform the same procedure in graph mode.
Eager execution isn’t supported; all operations rely on pure TensorFlow
graph operations.


Standalone Execution of TFT. After we have defined our preprocessing_fn function, we need to focus
on how to execute the Transform function. For the execution, we have two
options. We can either execute the preprocessing transformations in a
standalone setup or as part of our machine learning pipeline in the form of a
TFX component.

In our example, we would like to apply the normalization preprocessing
function that we introduced earlier on our tiny raw dataset, shown in the
following source code:

In [None]:
raw_data = [
    {'x': 1.20},
    {'x': 2.99},
    {'x': 100.00}
]

First, we need to define a data schema. We can generate a schema from a
feature specification, as shown in the following source code.

In [None]:
import tensorflow as tf
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec({
    'x': tf.io.FixedLenFeature([], tf.float32),
    }))

With the dataset loaded and the data schema generated, we can now execute
the preprocessing function preprocessing_fn, which we defined earlier. TFT provides bindings for the execution on Apache Beam with the function
AnalyzeAndTransformDataset. This function is performing the two-step
process we discussed earlier: first analyze the dataset and then transform it.
The execution is performed through the Python context manager
tft_beam.Context, which allows us to set, for example, the desired batch
size. However, we recommend using the default batch size because it is
more performant in common use cases. The following example shows the
usage of the AnalyzeAndTransformDataset function:

In [None]:
import tempfile
import apache_beam as beam
import tensorflow_transform.beam.impl as tft_beam
with beam.Pipeline() as pipeline:
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
        tfrecord_file = "/your/tf_records_file.tfrecord"
        raw_data = (
            pipeline | beam.io.ReadFromTFRecord(tfrecord_file))
        transformed_dataset, transform_fn = (
            (raw_data, raw_data_metadata) |
            tft_beam.AnalyzeAndTransformDataset(
            preprocessing_fn))
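
The two-step process can be sketched in plain Python for the tiny dataset introduced earlier: the analysis pass scans all rows to find the minimum and maximum, and the transform pass rescales every value with those statistics. This only illustrates the arithmetic of scaling to the range 0 to 1; the `analyze` and `transform` helpers are hypothetical, and mapping a constant feature to 0.0 is a choice made for this sketch only.

```python
def analyze(rows, key):
    """Analysis pass: one full pass over the data to find min and max."""
    values = [row[key] for row in rows]
    return {"min": min(values), "max": max(values)}


def transform(rows, key, stats):
    """Transform pass: rescale each value to [0, 1] using the stored statistics."""
    span = stats["max"] - stats["min"]
    return [
        # A constant feature (span == 0) is mapped to 0.0 in this sketch.
        {key + "_xf": (row[key] - stats["min"]) / span if span else 0.0}
        for row in rows
    ]


raw_data = [{"x": 1.20}, {"x": 2.99}, {"x": 100.00}]
stats = analyze(raw_data, "x")
transformed = transform(raw_data, "x", stats)
print(transformed)
```

Keeping the statistics separate from the transformation is what allows the same minimum and maximum to be reused later on new data, instead of being recomputed from whatever batch happens to arrive.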


### Integrate TFT into Your Machine Learning Pipeline

In the following code, we define our features. For simpler processing later
on, we group the input feature names in dictionaries representing each
transform output data type: one-hot encoded features, bucketized features,
and raw string representations:

In [None]:
import tensorflow as tf
import tensorflow_transform as tft
LABEL_KEY = "consumer_disputed"
# Feature name, feature dimensionality.
ONE_HOT_FEATURES = {
    "product": 11,
    "sub_product": 45,
    "company_response": 5,
    "state": 60,
    "issue": 90
}
# Feature name, bucket count.
BUCKET_FEATURES = {
    "zip_code": 10
}
# Feature name, value is unused.
TEXT_FEATURES = {
    "consumer_complaint_narrative": None
}

def transformed_name(key):
    return key + '_xf'

def fill_in_missing(x):
    # Use an empty string for string features and 0 for numeric features.
    default_value = '' if x.dtype == tf.string else 0
    if isinstance(x, tf.SparseTensor):
        x = tf.sparse.to_dense(
            tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
            default_value)
    return tf.squeeze(x, axis=1)

def convert_num_to_one_hot(label_tensor, num_labels=2):
    one_hot_tensor = tf.one_hot(label_tensor, num_labels)
    return tf.reshape(one_hot_tensor, [-1, num_labels])

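def convert_zip_code(zip_code):
    # Replace empty entries with a TensorFlow op; a Python `if` cannot
    # branch on tensor values inside the preprocessing graph.
    zip_code = tf.where(tf.equal(zip_code, ''), "00000", zip_code)
    zip_code = tf.strings.regex_replace(zip_code, r'X{0,5}', "0")
    zip_code = tf.strings.to_number(zip_code, out_type=tf.float32)
    return zip_code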

With all the helper functions in place, we can now loop over each feature
column and transform it depending on its type. For example, for the
features to be converted to one-hot features, we convert the category names
to an index with tft.compute_and_apply_vocabulary() and then
convert the index to a one-hot vector representation with our helper
function convert_num_to_one_hot(). Because we are using
tft.compute_and_apply_vocabulary(), TensorFlow Transform will first
loop over all categories and then determine a complete category-to-index
mapping. This mapping will then be applied during the evaluation and
serving phases of the model:


In [None]:
def preprocessing_fn(inputs):
    outputs = {}
    for key in ONE_HOT_FEATURES.keys():
        dim = ONE_HOT_FEATURES[key]
        index = tft.compute_and_apply_vocabulary(
            fill_in_missing(inputs[key]), top_k=dim + 1)
        outputs[transformed_name(key)] = convert_num_to_one_hot(
            index, num_labels=dim + 1)
    ...
    return outputs
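
To make the category-to-index mapping more concrete, here is a minimal pure-Python sketch of the idea. It is not how TFT implements it: the helper names, the alphabetical tie-breaking, and the use of -1 for unseen categories are assumptions made for illustration.

```python
from collections import Counter


def build_vocabulary(values, top_k=None):
    """Full pass over the training data: map each category to an index."""
    counts = Counter(values)
    # Most frequent category first; ties broken alphabetically (assumption).
    ordered = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    if top_k is not None:
        ordered = ordered[:top_k]
    return {token: index for index, (token, _) in enumerate(ordered)}


def apply_vocabulary(values, vocabulary, default_index=-1):
    """Look up each value; unseen categories get default_index."""
    return [vocabulary.get(value, default_index) for value in values]


def one_hot(indices, depth):
    """One row per index; an out-of-range index yields an all-zero row."""
    return [[1.0 if position == index else 0.0 for position in range(depth)]
            for index in indices]


# Hypothetical values of the "product" feature seen during training.
training_products = ["mortgage", "credit_card", "mortgage",
                     "loan", "mortgage", "loan"]
vocabulary = build_vocabulary(training_products)

# The same mapping is reused unchanged at serving time.
serving_products = ["loan", "mortgage", "unknown_product"]
serving_indices = apply_vocabulary(serving_products, vocabulary)
serving_one_hot = one_hot(serving_indices, depth=len(vocabulary))
```

Because the vocabulary is built once from the training data, a category that first appears at serving time cannot receive a new index; in this sketch it falls back to an all-zero vector.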

Our processing of the bucket features is very similar. We decided to
bucketize the zip codes because one-hot encoded zip codes seemed too
sparse. Each feature is bucketized into a fixed number of buckets (10 in
our case), and we encode the index of the bucket as a one-hot vector:

In [None]:
for key, bucket_count in BUCKET_FEATURES.items():
    temp_feature = tft.bucketize(
        convert_zip_code(fill_in_missing(inputs[key])),
        bucket_count,
        always_return_num_quantiles=False)
    outputs[transformed_name(key)] = convert_num_to_one_hot(
        temp_feature,
        num_labels=bucket_count + 1)
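
The bucketization step can be sketched in plain Python as well. The example below assumes exact quantiles from the standard library, whereas tft.bucketize works with approximate quantile boundaries computed over the whole dataset; the helper names and the sample zip codes are hypothetical.

```python
import bisect
import statistics


def compute_boundaries(values, bucket_count):
    """Analyze phase: derive bucket_count - 1 quantile cut points."""
    return statistics.quantiles(values, n=bucket_count)


def bucketize(value, boundaries):
    """Transform phase: index of the bucket that the value falls into."""
    return bisect.bisect_right(boundaries, value)


# Hypothetical zip codes, already converted to numbers.
zip_codes = [10001.0, 20001.0, 30301.0, 60601.0,
             73301.0, 80201.0, 90001.0, 94105.0]

boundaries = compute_boundaries(zip_codes, bucket_count=4)
bucket_indices = [bucketize(zip_code, boundaries) for zip_code in zip_codes]
```

Each resulting index lies between 0 and bucket_count - 1 and can then be one-hot encoded in the same way as the vocabulary indices.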

Our text input features as well as our label column don’t require any
transformations; therefore, we simply convert them to dense features in case
a feature might be sparse:

In [None]:
for key in TEXT_FEATURES.keys():
    outputs[transformed_name(key)] = \
        fill_in_missing(inputs[key])

outputs[transformed_name(LABEL_KEY)] = fill_in_missing(inputs[LABEL_KEY])

If we use the Transform component from TFX in our pipeline, it expects
the transformation code to be provided in a separate Python file. The name
of the module file can be set by the user (e.g., in our case module.py), but
the entry point preprocessing_fn() needs to be contained in the module
file and the function can’t be renamed:

In [None]:
import os
from tfx.components import Transform

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath("module.py"))
context.run(transform)

### Model training

One very important feature of training a model in a TFX pipeline is that the
data preprocessing steps are saved along
with the trained model weights. This is incredibly useful once our model is
deployed to production because it means that the preprocessing steps will
always produce the features the model is expecting. Without this feature, it
would be possible to update the data preprocessing steps without updating
the model, and then the model would fail in production or the predictions
would be based on the wrong data. 

#### The TFX Trainer Component

run_fn() Function. The Trainer component will look for a run_fn() function in our module
file and use the function as an entry point to execute the training process.


The run_fn() function is a generic entry point to the training steps and not
tf.Keras specific. It carries out the following steps:
* Loading the training and validation data (or the data generator)
* Defining the model architecture and compiling the model
* Training the model
* Exporting the model to be evaluated in the next pipeline step



In [None]:
LABEL_KEY = 'labels'

def _gzip_reader_fn(filenames):
    return tf.data.TFRecordDataset(filenames,
        compression_type='GZIP')

def input_fn(file_pattern,
            tf_transform_output, batch_size=32):
    
    transformed_feature_spec = (
        tf_transform_output.transformed_feature_spec().copy())
    
    dataset = tf.data.experimental.make_batched_features_dataset(
                        file_pattern=file_pattern,
                        batch_size=batch_size,
                        features=transformed_feature_spec,
                        reader=_gzip_reader_fn,
                        label_key=transformed_name(LABEL_KEY))
    return dataset

def run_fn(fn_args):
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
    train_dataset = input_fn(fn_args.train_files, tf_transform_output)
    eval_dataset = input_fn(fn_args.eval_files, tf_transform_output)
    
    model = get_model()
    
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps)
    
    signatures = {
        'serving_default':
            get_serve_tf_examples_fn(
                model,
                tf_transform_output).get_concrete_function(
                    tf.TensorSpec(
                        shape=[None],
                        dtype=tf.string,
                        name='examples')
                    )
    }
    model.save(fn_args.serving_model_dir,
                save_format='tf', signatures=signatures)

The run_fn() function exports the function returned by
get_serve_tf_examples_fn() as part of the model signature. Once a model has
been exported and deployed, every prediction request passes through
serve_tf_examples_fn():

In [None]:
def get_serve_tf_examples_fn(model, tf_transform_output):
    
    model.tft_layer = tf_transform_output.transform_features_layer()
    
    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(LABEL_KEY)
        parsed_features = tf.io.parse_example(
            serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        outputs = model(transformed_features)
        return {'outputs': outputs}
    
    return serve_tf_examples_fn


The Trainer component takes the following as input:
* The Python module file, here saved as module.py, containing the run_fn(), input_fn(), get_serve_tf_examples_fn(), and other associated functions we discussed earlier
* The transformed examples generated by the Transform component
* The transform graph generated by the Transform component
* The schema generated by the data validation component
* The number of training and evaluation steps

In [None]:
import os
from tfx.components import Trainer
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.proto import trainer_pb2
TRAINING_STEPS = 1000
EVALUATION_STEPS = 100
trainer = Trainer(
    module_file=os.path.abspath("module.py"),
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    transformed_examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=TRAINING_STEPS),
    eval_args=trainer_pb2.EvalArgs(num_steps=EVALUATION_STEPS))

context.run(trainer)


### Model analysis and validation

TensorFlow Model Analysis. TFMA gives us an easy way to get more detailed metrics than just those
used during model training. It lets us visualize metrics as time series across
model versions, and it gives us the ability to view metrics on slices of a
dataset.

It takes a saved model and an evaluation dataset as input. In this example,
we’ll assume a Keras model is saved in SavedModel format and an
evaluation dataset is available in the TFRecord file format.

First, the SavedModel must be converted to an EvalSharedModel. Next, we
provide an EvalConfig. In this step, we tell TFMA what our label
is, provide any specifications for slicing the model by one of the features,
and stipulate all the metrics we want TFMA to calculate and display. Then,
we run the model analysis step:


In [None]:
import tensorflow_model_analysis as tfma
eval_shared_model = tfma.default_eval_shared_model(
    eval_saved_model_path=_MODEL_DIR,
    tags=[tf.saved_model.SERVING])

eval_config=tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='consumer_disputed')],
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(class_name='BinaryAccuracy'),
            tfma.MetricConfig(class_name='ExampleCount'),
            tfma.MetricConfig(class_name='FalsePositives'),
            tfma.MetricConfig(class_name='TruePositives'),
            tfma.MetricConfig(class_name='FalseNegatives'),
            tfma.MetricConfig(class_name='TrueNegatives')
        ])
    ]
)

eval_result = tfma.run_model_analysis(
    eval_shared_model=eval_shared_model,
    eval_config=eval_config,
    data_location=_EVAL_DATA_FILE,
    output_path=_EVAL_RESULT_LOCATION,
    file_format='tfrecords')
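
To illustrate what "metrics on slices of a dataset" means, here is a small pure-Python sketch that computes a few of the metrics named above, first overall and then per value of one feature. It does not use TFMA; the records, the 0.5 decision threshold, and the helper names are assumptions for illustration.

```python
from collections import defaultdict


def compute_metrics(examples, threshold=0.5):
    """Confusion counts and binary accuracy for a list of examples."""
    metrics = {"true_positives": 0, "false_positives": 0,
               "true_negatives": 0, "false_negatives": 0}
    for example in examples:
        predicted = 1 if example["prediction"] >= threshold else 0
        if predicted == 1:
            key = "true_positives" if example["label"] == 1 else "false_positives"
        else:
            key = "true_negatives" if example["label"] == 0 else "false_negatives"
        metrics[key] += 1
    correct = metrics["true_positives"] + metrics["true_negatives"]
    metrics["example_count"] = len(examples)
    metrics["binary_accuracy"] = correct / len(examples)
    return metrics


def compute_sliced_metrics(examples, feature_key):
    """Group examples by one feature and compute metrics per group."""
    slices = defaultdict(list)
    for example in examples:
        slices[example[feature_key]].append(example)
    return {value: compute_metrics(group) for value, group in slices.items()}


# Hypothetical evaluation records: feature, true label, model score.
records = [
    {"product": "mortgage", "label": 1, "prediction": 0.9},
    {"product": "mortgage", "label": 0, "prediction": 0.2},
    {"product": "mortgage", "label": 0, "prediction": 0.7},
    {"product": "loan", "label": 1, "prediction": 0.4},
    {"product": "loan", "label": 1, "prediction": 0.8},
]

overall = compute_metrics(records)
by_product = compute_sliced_metrics(records, "product")
```

A model can look acceptable overall while performing noticeably worse on one slice, which is the kind of difference slicing is meant to expose.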

Analysis and Validation in TFX. Up to this point in this chapter, we’ve focused on model analysis with a
human in the loop. In an automated pipeline, several TFX components handle
this part instead: the Resolver, the Evaluator, and the Pusher.


ResolverNode. A Resolver component is required if we want to compare a new model
against a previous model. ResolverNodes are generic components that
query the metadata store. In this case, the resolver looks up the last
blessed model and returns it as a baseline so that it can be passed on to
the Evaluator component together with the new candidate model.

In [None]:
from tfx.components import ResolverNode
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
model_resolver = ResolverNode(
    instance_name='latest_blessed_model_resolver',
    resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing)
)
context.run(model_resolver)

Evaluator Component. The Evaluator component uses the TFMA library to evaluate a model’s
predictions on a validation dataset.

In [None]:
import tensorflow_model_analysis as tfma
eval_config=tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='consumer_disputed')],
    slicing_specs=[tfma.SlicingSpec(),
        tfma.SlicingSpec(feature_keys=['product'])],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(class_name='BinaryAccuracy'),
            tfma.MetricConfig(class_name='ExampleCount'),
            tfma.MetricConfig(class_name='AUC')
        ])
    ]
)

from tfx.components import Evaluator
evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config
)
context.run(evaluator)


TFX Pusher Component. The Pusher component is a small but important part of our pipeline. It
takes as input a saved model, the output of the Evaluator component, and
a file path for the location our models will be stored for serving. It then
checks whether the Evaluator has blessed the model (i.e., the model is an
improvement on the previous version, and it is above any thresholds we
have set). If it has been blessed, the Pusher pushes the model to the serving
file path.


In [None]:
from tfx.components import Pusher
from tfx.proto import pusher_pb2
_serving_model_dir = "serving_model_dir"
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
context.run(pusher)
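
The blessing decision described above can be sketched as a small function. This is only an illustration of the logic, not the Evaluator's implementation: the choice of AUC as the metric, the threshold values, and the function name are assumptions.

```python
def is_blessed(candidate_auc, baseline_auc=None,
               lower_bound=0.65, min_improvement=0.0):
    """Return True if the candidate model should be pushed to serving."""
    # The candidate must clear the absolute threshold we have set.
    if candidate_auc < lower_bound:
        return False
    # Without a previously blessed model there is nothing to compare against.
    if baseline_auc is None:
        return True
    # Otherwise the candidate must not be worse than the baseline.
    return candidate_auc - baseline_auc >= min_improvement


first_model = is_blessed(0.70)              # no baseline yet
better_model = is_blessed(0.78, 0.75)       # improves on the baseline
worse_model = is_blessed(0.70, 0.75)        # regresses against the baseline
below_threshold = is_blessed(0.60, 0.55)    # improves, but is still too weak
```

Only models for which such a check succeeds are copied to the serving directory; the others stay in the pipeline's artifact store.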

### Model deployment

Machine learning models can be deployed in three main ways: with a model
server, in a user’s browser, or on an edge device. The most common way
today to deploy a machine learning model is with a model server.

Most introductions to deploying machine learning models follow roughly
the same workflow:
* Create a web app with Python (e.g., with a web framework like Flask or Django).
* Create an API endpoint in the web app.
* Load the model structure and its weights.
* Call the predict method on the loaded model.
* Return the prediction results as an HTTP response.

However, we do not recommend using this strategy to deploy
machine learning models to production endpoints because:


1. Lack of Code Separation. There would be no separation between the API
code and the machine learning model, which can be problematic when data
scientists want to update a model and such an update requires coordination
with the API team.

2. Lack of Model Version Control. If
you wanted to add a new version, you would have to create a new endpoint
(or add some branching logic to the existing endpoint). This requires extra
attention to keep all endpoints structurally the same, and it requires a lot of
boilerplate code.

3. Inefficient Model Inference. Each request is preprocessed and inferred
individually, which is highly inefficient; this is the key reason why we
argue that such a setup is only suitable for demonstration purposes. During
the training of your model, you will probably use a batching technique that
allows you to compute multiple samples at the same time and then apply the
gradient change for your batch to your network’s weights. You can apply the
same technique when you want the model to make predictions.
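
The effect of batching at prediction time can be shown with a toy example. The sketch below assumes a stand-in model whose predict method accepts a list of inputs and counts how often it is called; the class, the helper, and the batch size are hypothetical.

```python
def batched(items, batch_size):
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


class CountingModel:
    """Stand-in for a real model; records how often predict() is called."""

    def __init__(self):
        self.calls = 0

    def predict(self, batch):
        self.calls += 1
        # Dummy "prediction": the length of each input text.
        return [len(text) for text in batch]


requests_texts = ["request number {}".format(i) for i in range(10)]

# One model call per request, as in the simple web app setup.
single_model = CountingModel()
single_results = [single_model.predict([text])[0] for text in requests_texts]

# One model call per batch of up to four requests.
batch_model = CountingModel()
batch_results = []
for batch in batched(requests_texts, batch_size=4):
    batch_results.extend(batch_model.predict(batch))
```

Both variants return the same predictions, but the batched variant needs three model calls instead of ten, which is where the efficiency gain of a batching model server comes from.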


TensorFlow Serving. TensorFlow Serving provides you the functionality to load models from a
given source (e.g., AWS S3 buckets) and notifies the loader if the source
has changed.

Before we dive into the TensorFlow Serving configurations, let’s discuss
how you can export your machine learning models so that they can be used
by TensorFlow Serving.
Depending on your type of TensorFlow model, the export steps are slightly
different.

In [None]:
# For Keras models
saved_model_path = "./saved_models"
model.save(filepath=saved_model_path, save_format="tf")

# for TensorFlow Estimator
import tensorflow as tf
def serving_input_receiver_fn():
    # an example input feature
    input_feature = tf.compat.v1.placeholder(
        dtype=tf.string, shape=[None, 1], name="input")
    fn = tf.estimator.export.build_raw_serving_input_receiver_fn(
        features={"input_feature": input_feature})
    # Call the built function so that a ServingInputReceiver is returned.
    return fn()

# Estimator model 
estimator = tf.estimator.Estimator(model_fn, "model", params={})
estimator.export_saved_model(
    export_dir_base="saved_models/",
    serving_input_receiver_fn=serving_input_receiver_fn)

TensorFlow Serving offers two API types: REST and gRPC.

* REST API - REST is a communication “protocol” used by today’s web services. REST clients communicate with the server
using the standard HTTP methods like GET, POST, DELETE, etc. The
payloads of the requests are often encoded as XML or JSON data formats.

* gRPC - While gRPC supports different data formats, the standard data format used with gRPC is
the protocol buffer, which we used throughout this book. gRPC provides low-latency communication
and smaller payloads if protocol buffers are used.
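
The payload size difference can be made tangible with a rough comparison. The sketch below encodes the same 1,000 float values once as JSON text and once as packed 32-bit floats using the standard library. Note the assumption: struct is used here only as a stand-in for a compact binary encoding and does not produce the protocol buffer wire format.

```python
import json
import struct

# Hypothetical input: 1,000 feature values for a single instance.
values = [round(0.001 * i, 3) for i in range(1000)]

# REST-style payload: the values are serialized as JSON text.
json_payload = json.dumps({"instances": [values]}).encode("utf-8")

# Binary stand-in: each value is packed as a 4-byte little-endian float.
binary_payload = struct.pack("<{}f".format(len(values)), *values)

json_size = len(json_payload)
binary_size = len(binary_payload)
```

The binary encoding needs exactly four bytes per value, while the JSON text needs one byte per character plus separators, so the JSON payload is the larger of the two.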


#### Making Predictions from the Model Server

In [None]:
import requests
def get_rest_request(text, model_name="my_model"):
    url = "http://localhost:8501/v1/models/{}:predict".format(model_name)
    payload = {"instances": [text]}
    response = requests.post(url=url, json=payload)
    return response

rs_rest = get_rest_request(text="classify my text")
rs_rest.json()

# --------------------------- OR ----------------------------
import grpc
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc
import tensorflow as tf
def create_grpc_stub(host, port=8500):
    hostport = "{}:{}".format(host, port)
    channel = grpc.insecure_channel(hostport)
    stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
    return stub

def grpc_request(stub, data_sample, model_name='my_model',
                 signature_name='classification'):
    
    request = predict_pb2.PredictRequest()
    request.model_spec.name = model_name
    request.model_spec.signature_name = signature_name
    request.inputs['inputs'].CopyFrom(tf.make_tensor_proto(data_sample,
        shape=[1,1]))
    result_future = stub.Predict.future(request, 10)
    return result_future
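
Returning to the REST variant: TensorFlow Serving's REST API also accepts an optional model version in the URL path. The following sketch only builds the URL and the request body and does not contact a server; the helper names, the default host and port, and the version number are assumptions for illustration.

```python
def build_predict_url(model_name, host="localhost", port=8501, version=None):
    """Build the predict URL, optionally pinned to a model version."""
    url = "http://{}:{}/v1/models/{}".format(host, port, model_name)
    if version is not None:
        url += "/versions/{}".format(version)
    return url + ":predict"


def build_payload(texts):
    """Wrap a list of inputs in the row-oriented "instances" format."""
    return {"instances": list(texts)}


latest_url = build_predict_url("my_model")
pinned_url = build_predict_url("my_model", version=3)
payload = build_payload(["classify my text", "and this one too"])
```

Sending several instances in one payload lets the server process them as a single batch, and pinning a version makes it explicit which model produced a prediction.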