<a href="https://colab.research.google.com/github/victorviro/Deep_learning_python/blob/master/Introduction_to_TensorFlow_Extended_and_Apache_Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Introduction

In the previous [notebook](https://github.com/victorviro/Deep_learning_python/blob/master/Introduction_to_Machine_Learning_pipelines.ipynb), we introduced the concept of machine learning pipelines and discussed the components that make up a pipeline. In this notebook, we introduce [TensorFlow Extended (TFX)](https://www.tensorflow.org/tfx). The TFX library supplies all the components we will need for our machine learning pipelines. We define our pipeline tasks using TFX, and they can then be executed with a pipeline orchestrator such as Airflow or Kubeflow Pipelines.

We will walk through the installation of TFX and explain its basic concepts and terminology. In later notebooks, we take an in-depth look at the individual components that make up our pipelines. We also introduce Apache Beam in this notebook. [Beam](https://github.com/apache/beam) is an open source tool for defining and executing data-processing jobs. It has two uses in TFX pipelines: first, it runs under the hood of many TFX components to carry out processing steps like data validation or data preprocessing. Second, it can be used as a pipeline orchestrator. We introduce Beam here because it will help us understand TFX components, and it is essential if we wish to write custom components.

## What Is TFX?


Machine learning pipelines can become very complicated, and managing task dependencies adds substantial overhead. At the same time, machine learning pipelines can include a variety of tasks, such as data validation, preprocessing, model training, and any post-training tasks. As we discussed in the previous notebook, the connections between tasks are often brittle and cause the pipelines to fail. These connections are the *glue code* described in the publication [“Hidden Technical Debt in Machine Learning Systems”](https://papers.nips.cc/paper/2015/file/86df7dcfd896fcaf2674f757a2463eba-Paper.pdf). Brittle connections ultimately mean that production models will be updated infrequently, and data scientists and machine learning engineers loathe updating stale models. Pipelines also require well-managed distributed processing, especially for large workloads, which is why TFX leverages Apache Beam.

Google faced the same problem internally and decided to develop a platform to simplify the pipeline definitions and to minimize the amount of task code to write. The open source version of Google’s internal ML pipeline framework is TFX.

Figure 2-2 shows the general pipeline architecture with TFX. Pipeline orchestration tools are the foundation for executing our tasks. Besides the orchestration tools, we need a data store to keep track of the intermediate pipeline results. The individual components communicate with the data store to receive their inputs, and they return their results to the data store. These results can then serve as inputs to subsequent tasks. TFX provides the layer that combines all of these tools, and it provides the individual components for the major pipeline tasks.

![](https://i.ibb.co/KLQJSmC/ML-pipeline-architecture.png)

Initially, Google released some of the pipeline functionality as open source TensorFlow libraries (e.g., TensorFlow Serving) under the umbrella of TFX libraries. In 2019, Google then published the open source glue code containing all the required pipeline components to tie the libraries together and to automatically create pipeline definitions for orchestration tools like Apache Airflow, Apache Beam, and Kubeflow Pipelines.

TFX provides a variety of pipeline components that cover a good number of use cases. At the time of writing, the following components were available:

- Data ingestion with `ExampleGen`.

- Data validation with `StatisticsGen`, `SchemaGen`, and the `ExampleValidator`.

- Data preprocessing with `Transform`.

- Model training with `Trainer`.

- Checking for previously trained models with `ResolverNode`.

- Model analysis and validation with `Evaluator`.

- Model deployments with `Pusher`.

Figure 2-3 shows how the components of the pipeline and the libraries fit together.

![](https://i.ibb.co/nnbDRm0/TFX-components.png)

We will discuss the components and libraries in greater detail in the following notebooks. 

**Note**: At the time of writing this notebook, a stable 1.X version of TFX hasn’t been released. The TFX API mentioned in this and the following chapters might be subject to future updates. To the best of our knowledge, all the examples in these notebooks will work with TFX version 0.24.1.

## Installing TFX

TFX can easily be installed by running the following Python installer command:

In [None]:
!pip install tfx

In [1]:
import tfx
import tensorflow as tf

In [2]:
print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))

TensorFlow version: 2.3.0
TFX version: 0.24.1


The `tfx` package comes with a variety of dependencies that will be installed automatically. It installs not only the individual TFX Python packages (e.g., TensorFlow Data Validation), but also their dependencies like Apache Beam.

**Note**: If you are using Google Colab, you must restart the runtime (Runtime > Restart runtime ...) after running the `pip install` cell above for the first time. This is because of the way that Colab loads packages.

After installing TFX, we can import the individual Python packages. We recommend taking this approach if we want to use the individual TFX packages (e.g., we want to validate a dataset using TensorFlow Data Validation):

In [3]:
import tensorflow_data_validation as tfdv
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

Alternatively, we can import the corresponding TFX component (if using the components in the context of a pipeline):

In [4]:
from tfx.components import ExampleValidator
from tfx.components import Evaluator
from tfx.components import Transform

## Overview of TFX Components

A component handles a more complex process than just the execution of a single task. All machine learning pipeline components read from a channel to get input artifacts from the metadata store. The data is then loaded from the path provided by the metadata store and processed. The output of the component, the processed data, is then provided to the next pipeline components. The generic internals of a component are always:

- Receive some input

- Perform an action

- Store the final result

In TFX terms, the three internal parts of the component are called the *driver*, *executor*, and *publisher*. The driver handles the querying of the MetadataStore. The executor performs the actions of the component. The publisher manages the saving of the output metadata in the MetadataStore. The driver and the publisher don't move any data. Instead, they read and write references from the MetadataStore. Figure 2-4 shows the structure of a TFX component.

![](https://i.ibb.co/S7GcMb2/TFX-component-overview.png)
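To make the driver, executor, and publisher roles more concrete, here is a minimal sketch in plain Python. It does not use the TFX API: `ToyMetadataStore`, `ToyComponent`, the `storage` dictionary, and the `/toy/...` URIs are all hypothetical names invented for illustration. The point it tries to show is that only the executor touches the data, while the driver and publisher handle references.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict


@dataclass
class ToyMetadataStore:
    """Hypothetical stand-in for a metadata store: artifact name -> URI."""
    references: Dict[str, str] = field(default_factory=dict)


@dataclass
class ToyComponent:
    """Illustrative component split into driver, executor, and publisher."""
    input_name: str
    output_name: str
    action: Callable[[str], str]  # the work the executor performs

    def driver(self, store: ToyMetadataStore) -> str:
        # Driver: look up the reference (URI) of the input artifact.
        return store.references[self.input_name]

    def executor(self, input_uri: str, storage: Dict[str, str]) -> str:
        # Executor: load the data behind the URI, process it, write the result.
        output_uri = f"/toy/{self.output_name}/1"
        storage[output_uri] = self.action(storage[input_uri])
        return output_uri

    def publisher(self, store: ToyMetadataStore, output_uri: str) -> None:
        # Publisher: record only the reference to the output, not the data.
        store.references[self.output_name] = output_uri

    def run(self, store: ToyMetadataStore, storage: Dict[str, str]) -> None:
        input_uri = self.driver(store)
        output_uri = self.executor(input_uri, storage)
        self.publisher(store, output_uri)


# "storage" plays the role of a file system holding the actual artifacts.
storage = {"/toy/raw/1": "a,b,c"}
store = ToyMetadataStore(references={"raw": "/toy/raw/1"})

upper = ToyComponent("raw", "upper", action=str.upper)
upper.run(store, storage)

print(store.references["upper"])  # /toy/upper/1
print(storage["/toy/upper/1"])    # A,B,C
```

A second `ToyComponent` with `input_name="upper"` could be chained in the same way, because it would find its input through the store rather than through a direct handoff.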

The inputs and outputs of the components are called *artifacts*. Examples of artifacts include raw input data, preprocessed data, and trained models. Each artifact is associated with metadata stored in the MetadataStore. The artifact metadata consists of an artifact type as well as artifact properties. This artifact setup guarantees that the components can exchange data effectively. TFX currently provides ten different types of artifacts, which we review in the following notebooks.



## What Is ML Metadata?

The components of TFX "communicate" through *metadata*: instead of passing artifacts directly between the pipeline components, the components consume and publish references to pipeline artifacts. An artifact could be, for example, a raw dataset, a transform graph, or an exported model. The metadata is therefore the backbone of our TFX pipelines. One advantage of passing metadata between components instead of the artifacts themselves is that the information can be centrally stored.

In practice, the workflow goes as follows: when we execute a component, it uses the [ML Metadata (MLMD)](https://www.tensorflow.org/tfx/guide/mlmd) API to save the metadata corresponding to the run. For example, the component driver receives the reference for a raw dataset from the metadata store. After the component execution, the component publisher will store the references of the component outputs in the metadata store. MLMD saves the metadata consistently to a MetadataStore, based on a storage backend. Currently, MLMD supports three types of backends:

- In-memory database (via SQLite)

- SQLite

- MySQL

Because the TFX components are so consistently tracked, ML Metadata provides a variety of useful functions. For example, we can compare two artifacts from the same component (we'll see this in a following notebook when we discuss model validation). In this particular case, TFX compares the model analysis results from the current run with the results from the previous run. This checks whether the more recently trained model has a better accuracy or loss than the previous model. The metadata can also be used to determine all the artifacts that have been derived from another, previously created artifact. This creates a kind of audit trail for our machine learning pipelines.
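The audit-trail idea can be sketched with Python's built-in `sqlite3` module and an in-memory database. This is a toy illustration only: the `artifacts` and `lineage` tables, the `publish` and `descendants` helpers, and the `/toy/...` URIs are assumptions made for this example and are not MLMD's actual schema or API.

```python
import sqlite3

# In-memory SQLite database; nothing is written to disk.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE artifacts (id INTEGER PRIMARY KEY, type TEXT, uri TEXT);
    CREATE TABLE lineage (parent_id INTEGER, child_id INTEGER);
""")


def publish(artifact_type, uri, parents=()):
    """Store a reference to an artifact and link it to its input artifacts."""
    cursor = conn.execute(
        "INSERT INTO artifacts (type, uri) VALUES (?, ?)", (artifact_type, uri))
    artifact_id = cursor.lastrowid
    for parent_id in parents:
        conn.execute("INSERT INTO lineage VALUES (?, ?)", (parent_id, artifact_id))
    return artifact_id


def descendants(artifact_id):
    """Return the types of all artifacts derived from the given artifact."""
    rows = conn.execute("""
        WITH RECURSIVE derived(id) AS (
            SELECT child_id FROM lineage WHERE parent_id = ?
            UNION
            SELECT l.child_id FROM lineage l JOIN derived d ON l.parent_id = d.id
        )
        SELECT a.type FROM artifacts a JOIN derived d ON a.id = d.id
        ORDER BY a.id
    """, (artifact_id,)).fetchall()
    return [row[0] for row in rows]


# Only references (type and URI) are stored, never the data itself.
examples = publish("Examples", "/toy/examples/1")
stats = publish("ExampleStatistics", "/toy/statistics/2", parents=[examples])
schema = publish("Schema", "/toy/schema/3", parents=[stats])

print(descendants(examples))  # ['ExampleStatistics', 'Schema']
```

Because every artifact is recorded together with its parents, a single recursive query is enough to trace everything that depends on a given dataset.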

Figure 2-5 shows that each component interacts with the MetadataStore, and the MetadataStore stores the metadata on the provided database backend.

![](https://i.ibb.co/KFWMGb7/store-metadata-MLMD.png)


## Download Example Data

We download the example dataset for use in our TFX pipeline. The dataset we're using is the [Taxi Trips dataset](https://data.cityofchicago.org/Transportation/Taxi-Trips/wrvz-psew) released by the City of Chicago. The columns in this dataset are:

<table>
<tr><td>pickup_community_area</td><td>fare</td><td>trip_start_month</td></tr>
<tr><td>trip_start_hour</td><td>trip_start_day</td><td>trip_start_timestamp</td></tr>
<tr><td>pickup_latitude</td><td>pickup_longitude</td><td>dropoff_latitude</td></tr>
<tr><td>dropoff_longitude</td><td>trip_miles</td><td>pickup_census_tract</td></tr>
<tr><td>dropoff_census_tract</td><td>payment_type</td><td>company</td></tr>
<tr><td>trip_seconds</td><td>dropoff_community_area</td><td>tips</td></tr>
</table>

In [5]:
import os
import urllib.request

# Local directory that will hold the example CSV file.
data_root = 'tfx_data/'
os.makedirs(data_root, exist_ok=True)

# Sample of the Chicago Taxi Trips dataset hosted in the TFX repository.
DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv'
data_filepath = os.path.join(data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, data_filepath)

('tfx_data/data.csv', <http.client.HTTPMessage at 0x7ff4029dbbe0>)

Let's take a quick look at the CSV file.

In [6]:
!head {data_filepath}

pickup_community_area,fare,trip_start_month,trip_start_hour,trip_start_day,trip_start_timestamp,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,trip_miles,pickup_census_tract,dropoff_census_tract,payment_type,company,trip_seconds,dropoff_community_area,tips
,12.45,5,19,6,1400269500,,,,,0.0,,,Credit Card,Chicago Elite Cab Corp. (Chicago Carriag,0,,0.0
,0,3,19,5,1362683700,,,,,0,,,Unknown,Chicago Elite Cab Corp.,300,,0
60,27.05,10,2,3,1380593700,41.836150155,-87.648787952,,,12.6,,,Cash,Taxi Affiliation Services,1380,,0.0
10,5.85,10,1,2,1382319000,41.985015101,-87.804532006,,,0.0,,,Cash,Taxi Affiliation Services,180,,0.0
14,16.65,5,7,5,1369897200,41.968069,-87.721559063,,,0.0,,,Cash,Dispatch Taxi Affiliation,1080,,0.0
13,16.45,11,12,3,1446554700,41.983636307,-87.723583185,,,6.9,,,Cash,,780,,0.0
16,32.05,12,1,1,1417916700,41.953582125,-87.72345239,,,15.4,,,Cash,,1200,,0.0
30,38.45,10,10,5,1444301100,41.839086906,-87.714003807,,,14.6,,,Cash,,2580,,0.0
11,14.65,1,1,3,1358
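The rows above contain many empty fields. As a rough sketch of how we might quantify this with the standard library, the following counts empty values per column. To keep it runnable offline, it reads the header and the first four rows copied from the output above instead of the downloaded file; `count_missing` is a hypothetical helper name, and in the notebook we could pass `open(data_filepath, newline="")` instead of the inline sample.

```python
import csv
import io
from collections import Counter

# Header and first four rows of data.csv, copied from the output above.
sample = """pickup_community_area,fare,trip_start_month,trip_start_hour,trip_start_day,trip_start_timestamp,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,trip_miles,pickup_census_tract,dropoff_census_tract,payment_type,company,trip_seconds,dropoff_community_area,tips
,12.45,5,19,6,1400269500,,,,,0.0,,,Credit Card,Chicago Elite Cab Corp. (Chicago Carriag,0,,0.0
,0,3,19,5,1362683700,,,,,0,,,Unknown,Chicago Elite Cab Corp.,300,,0
60,27.05,10,2,3,1380593700,41.836150155,-87.648787952,,,12.6,,,Cash,Taxi Affiliation Services,1380,,0.0
10,5.85,10,1,2,1382319000,41.985015101,-87.804532006,,,0.0,,,Cash,Taxi Affiliation Services,180,,0.0
"""


def count_missing(csv_file):
    """Count empty fields per column in a CSV file object."""
    missing = Counter()
    for row in csv.DictReader(csv_file):
        for column, value in row.items():
            if value == "":
                missing[column] += 1
    return missing


missing = count_missing(io.StringIO(sample))
print(missing["pickup_community_area"])  # 2
print(missing["dropoff_latitude"])       # 4
print(missing["fare"])                   # 0
```

Gaps like these are what the data validation components discussed in later notebooks are meant to surface automatically.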

## Interactive Pipelines

Designing and implementing machine learning pipelines can be frustrating at times. For example, it is sometimes challenging to debug components within a pipeline. This is why the TFX functionality around interactive pipelines is beneficial. In the following notebooks, we will implement a machine learning pipeline step by step and demonstrate its implementation through an interactive pipeline. The pipeline runs in a Jupyter Notebook, and each component's artifacts can be reviewed immediately. Once we have confirmed the full functionality of our pipeline, we will discuss how we can convert our interactive pipeline to a production-ready pipeline, for example, for execution on Apache Airflow.

Any interactive pipeline is programmed in the context of a Jupyter Notebook or a Google Colab session. In contrast to the orchestration tools we will discuss, interactive pipelines are orchestrated and executed by the user.

We can start an interactive pipeline by importing the required packages:

In [7]:
import tensorflow as tf
from tfx.orchestration.experimental.interactive.interactive_context import \
    InteractiveContext

Once the requirements are imported, we can create a context object. The context object handles component execution and displays the component’s artifacts. At this point, the `InteractiveContext` also sets up a simple in-memory ML MetadataStore:

In [8]:
context = InteractiveContext()



After setting up our pipeline component(s) (e.g., `StatisticsGen`), we can then execute each component object through the run function of the context object, as shown in the following example:

In [9]:
from tfx.utils.dsl_utils import external_input
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen

example_gen = CsvExampleGen(input=external_input(data_root))
context.run(example_gen)

Instructions for updating:
external_input is deprecated, directly pass the uri to ExampleGen.






.execution_id: 1
.component: CsvExampleGen
.component.inputs: {}
.component.outputs['examples']: Channel of type 'Examples' (1 artifact)
    .type: <class 'tfx.types.standard_artifacts.Examples'>
    .uri: /tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/CsvExampleGen/examples/1
    .span: 0
    .split_names: ["train", "eval"]
    .version: 0
.exec_properties:
    ['input_base']: tfx_data/
    ['input_config']: { "splits": [ { "name": "single_split", "pattern": "*" } ] }
    ['output_config']: { "split_config": { "splits": [ { "hash_buckets": 2, "name": "train" }, { "hash_buckets": 1, "name": "eval" } ] } }
    ['output_data_format']: 6
    ['custom_config']: None
    ['span']: 0
    ['version']: None
    ['input_fingerprint']: split:single_split,num_files:1,total_bytes:1922812,xor_checksum:1605016385,sum_checksum:1605016385
    ['_beam_pipeline_args']: []


In [10]:
from tfx.components import StatisticsGen

statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)

.execution_id: 2
.component: StatisticsGen
.component.inputs['examples']: Channel of type 'Examples' (1 artifact)
    .type: <class 'tfx.types.standard_artifacts.Examples'>
    .uri: /tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/CsvExampleGen/examples/1
    .span: 0
    .split_names: ["train", "eval"]
    .version: 0
.component.outputs['statistics']: Channel of type 'ExampleStatistics' (1 artifact)
    .type: <class 'tfx.types.standard_artifacts.ExampleStatistics'>
    .uri: /tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2
    .span: 0
    .split_names: ["train", "eval"]
.exec_properties:
    ['stats_options_json']: None
    ['exclude_splits']: []

0,1
.type_name,ExampleStatistics
._artifacts,"[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'ExampleStatistics' (uri: /tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2) at 0x7ff4005e5cf8.type<class 'tfx.types.standard_artifacts.ExampleStatistics'>.uri/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2.span0.split_names[""train"", ""eval""]"

0,1
[0],"function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'ExampleStatistics' (uri: /tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2) at 0x7ff4005e5cf8.type<class 'tfx.types.standard_artifacts.ExampleStatistics'>.uri/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2.span0.split_names[""train"", ""eval""]"

0,1
.type,<class 'tfx.types.standard_artifacts.ExampleStatistics'>
.uri,/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2
.span,0
.split_names,"[""train"", ""eval""]"

0,1
['stats_options_json'],
['exclude_splits'],[]

0,1
['examples'],"function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Channel of type 'Examples' (1 artifact) at 0x7ff401bf5358.type_nameExamples._artifacts[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: /tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/CsvExampleGen/examples/1) at 0x7ff401bf52b0.type<class 'tfx.types.standard_artifacts.Examples'>.uri/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/CsvExampleGen/examples/1.span0.split_names[""train"", ""eval""].version0"

0,1
.type_name,Examples
._artifacts,"[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: /tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/CsvExampleGen/examples/1) at 0x7ff401bf52b0.type<class 'tfx.types.standard_artifacts.Examples'>.uri/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/CsvExampleGen/examples/1.span0.split_names[""train"", ""eval""].version0"

0,1
[0],"function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'Examples' (uri: /tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/CsvExampleGen/examples/1) at 0x7ff401bf52b0.type<class 'tfx.types.standard_artifacts.Examples'>.uri/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/CsvExampleGen/examples/1.span0.split_names[""train"", ""eval""].version0"

0,1
.type,<class 'tfx.types.standard_artifacts.Examples'>
.uri,/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/CsvExampleGen/examples/1
.span,0
.split_names,"[""train"", ""eval""]"
.version,0

0,1
['statistics'],"function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Channel of type 'ExampleStatistics' (1 artifact) at 0x7ff40b130c50.type_nameExampleStatistics._artifacts[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'ExampleStatistics' (uri: /tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2) at 0x7ff4005e5cf8.type<class 'tfx.types.standard_artifacts.ExampleStatistics'>.uri/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2.span0.split_names[""train"", ""eval""]"

0,1
.type_name,ExampleStatistics
._artifacts,"[0] function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'ExampleStatistics' (uri: /tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2) at 0x7ff4005e5cf8.type<class 'tfx.types.standard_artifacts.ExampleStatistics'>.uri/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2.span0.split_names[""train"", ""eval""]"

0,1
[0],"function toggleTfxObject(element) {  var objElement = element.parentElement;  if (objElement.classList.contains('collapsed')) {  objElement.classList.remove('collapsed');  objElement.classList.add('expanded');  } else {  objElement.classList.add('collapsed');  objElement.classList.remove('expanded');  } } Artifact of type 'ExampleStatistics' (uri: /tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2) at 0x7ff4005e5cf8.type<class 'tfx.types.standard_artifacts.ExampleStatistics'>.uri/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2.span0.split_names[""train"", ""eval""]"

0,1
.type,<class 'tfx.types.standard_artifacts.ExampleStatistics'>
.uri,/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2
.span,0
.split_names,"[""train"", ""eval""]"


The component itself receives the outputs of the previous component (in our case, the data ingestion component `ExampleGen`) as an instantiation argument. After executing the component’s tasks, the component automatically writes the metadata of the output artifact to the metadata store. The output of some components can be displayed in our notebook. The immediate availability of the results and the visualizations is very convenient. For example, we can use the `StatisticsGen` component to inspect the features of the dataset:

In [11]:
context.show(statistics_gen.outputs['statistics'])



After running the previous context function, we can see a visual overview of the statistics of the dataset in our notebook.

Sometimes it can be advantageous to inspect the output artifacts of a component programmatically. After a component object has been executed, we can access the artifact properties, as shown in the following example. The properties depend on the specific artifact:

In [12]:
for artifact in statistics_gen.outputs['statistics'].get():
    print(artifact.uri)

/tmp/tfx-interactive-2020-11-10T13_53_38.267895-fflybx1k/StatisticsGen/statistics/2


In the following notebooks, we will show how to run each component in an interactive context. We will then assemble the full pipeline and show how it can be orchestrated by both Airflow and Kubeflow.

## Alternatives to TFX

Before we take a deep dive into TFX components in the following notebooks, let’s take a moment to look at alternatives to TFX. The orchestration of machine learning pipelines has been a significant engineering challenge in the last few years, and it should come as no surprise that many major technology companies have developed their own pipeline frameworks. The following list shows a small selection of these frameworks:

- Airbnb: [AeroSolve](https://github.com/airbnb/aerosolve)

- Stripe: [Railyard](https://stripe.com/blog/railyard-training-models)

- Spotify: [Luigi](https://github.com/spotify/luigi)

- Uber: [Michelangelo](https://eng.uber.com/michelangelo-machine-learning-platform/)

- Netflix: [Metaflow](https://metaflow.org/)

Since these frameworks originated from corporations, they were designed with specific engineering stacks in mind. For example, Airbnb’s AeroSolve focuses on Java-based inference code, and Spotify’s Luigi focuses on efficient orchestration. TFX is no different in this regard. At this point, TFX architectures and data structures assume that we are using TensorFlow (or Keras) as our machine learning framework. Some TFX components can be used in combination with other machine learning frameworks. For example, data can be analyzed with TensorFlow Data Validation and later consumed by a scikit-learn model. However, the TFX framework as a whole remains closely tied to TensorFlow or Keras models. We believe TFX is a stable and mature framework that will ultimately be adopted by a broader base of machine learning engineers.

## Introduction to Apache Beam

A variety of TFX components and libraries (e.g., TensorFlow Transform) rely on Apache Beam to process pipeline data efficiently. Because of its importance to the TFX ecosystem, we provide a brief introduction to how Apache Beam works behind the scenes of the TFX components. In a later notebook, we will discuss how to use Apache Beam for a second purpose: as a pipeline orchestrator.

Apache Beam offers an open source, vendor-agnostic way to describe data processing steps that can then be executed in various environments. It is versatile enough to describe batch processes, streaming operations, and data pipelines. We will discuss the specific use of Apache Beam in the TFX ecosystem when we cover TensorFlow Data Validation and TensorFlow Transform in later notebooks.

Because Apache Beam separates the data processing logic from the runtime that executes it, the same pipeline can be executed on multiple distributed processing environments. This means that we can run the same data pipeline on Apache Spark or Google Cloud Dataflow without a single change in the pipeline description. Apache Beam was also designed to support streaming operations as well as batch processes.

### Setup

The installation of Apache Beam is straightforward. We can install the latest version with:

In [None]:
#!pip install apache-beam

If we plan to use Apache Beam in the context of Google Cloud Platform (for example, to process data from Google BigQuery or to run our data pipelines on Google Cloud Dataflow, as described in “Processing Large Datasets with GCP”), we should install Apache Beam as follows:

In [None]:
#!pip install 'apache-beam[gcp]'

If we plan to use Apache Beam in the context of Amazon Web Services (AWS) (e.g., if we want to load data from S3 buckets), we should install Apache Beam as follows:

In [None]:
#!pip install 'apache-beam[aws]'

If we install TFX with the Python package manager pip, Apache Beam will be automatically installed.

### Basic Data Pipeline

Apache Beam’s abstraction is based on two concepts: collections and transformations. Collections hold the data that is read from, or written to, a given file or stream. Transformations describe ways to manipulate that data. All collections and transformations are defined in the context of a pipeline (expressed in Python through the `with` statement, which acts as a context manager). When we define collections or transformations in the examples below, no data is actually loaded or transformed. This only happens when the pipeline is executed by a runner (e.g., Apache Beam’s DirectRunner, Apache Spark, Apache Flink, or Google Cloud Dataflow).
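To make the idea of deferred execution concrete, here is a minimal plain-Python sketch. It does not use Apache Beam and is not how Beam is implemented; the class `DeferredPipeline` and the function `shout` are hypothetical names invented for this illustration. The sketch only records steps inside the `with` block and runs them when the block exits:

```python
class DeferredPipeline:
    """Toy stand-in for a pipeline: records steps and runs them on exit."""

    def __init__(self, source):
        self.source = source   # iterable of input elements
        self.steps = []        # (label, function) pairs, recorded but not applied
        self.result = None     # filled in only after execution

    def apply(self, label, fn):
        # Only record the step; no data is touched here.
        self.steps.append((label, fn))
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Execution happens when the `with` block ends without an error.
        if exc_type is None:
            data = list(self.source)
            for _label, fn in self.steps:
                data = [fn(item) for item in data]
            self.result = data
        return False


calls = []

def shout(text):
    calls.append(text)  # side effect lets us observe when work happens
    return text.upper()

with DeferredPipeline(["hello", "beam"]) as toy:
    toy.apply("Shout", shout).apply("Length", len)
    ran_inside_block = len(calls)  # nothing has executed yet

print(ran_inside_block)  # 0
print(toy.result)        # [5, 4]
```

In Beam the same separation lets a runner inspect the whole pipeline before processing any data, which is what allows one pipeline description to run on different runtimes.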

#### Basic collection example

Data pipelines usually start and end with data being read or written, which is handled in Apache Beam through collections, often called `PCollections`. The collections are then transformed, and the final result can be expressed as a collection again and written to a filesystem.

The following example shows how to read a text file and return all lines:

In [13]:
import apache_beam as beam

input_file = "gs://dataflow-samples/shakespeare/kinglear.txt"
output_file = "/content/output.txt"

# Use the context manager to define the pipeline
with beam.Pipeline() as p: 

    # Read the text into a PCollection
    lines = p | beam.io.ReadFromText(input_file) 

Connecting anonymously.


Similar to the `ReadFromText` operation, Apache Beam provides functions to write collections to a text file, such as `WriteToText` (see the [apache_beam.io.textio module](https://beam.apache.org/releases/pydoc/2.2.0/apache_beam.io.textio.html)). The write operation is usually performed after all transformations have been executed:

```
with beam.Pipeline() as p:
    lines = p | beam.io.ReadFromText(input_file)
    ...
    # Write the output to the file `output_file`
    output | beam.io.WriteToText(output_file)
```

#### Basic transformation example

In Apache Beam, data is manipulated through transformations. As we see in this example and in a later notebook, transformations can be chained with the pipe operator `|`. If we chain multiple transformations of the same type, we have to give each one a unique name: a string label placed before the `>>` operator, as in `'Split' >> beam.FlatMap(...)`. In the following example, we apply all transformations sequentially to the lines extracted from the text file:

In [15]:
import re

counts = (
    lines
    | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

Let’s walk through this code in detail. As an example, we’ll take the phrases *“Hello, how do you do?”* and *“I am well, thank you”*.

The `Split` transformation uses `re.findall` to split each line into a list of tokens, giving the result:

```
["Hello", "how", "do", "you", "do"]
["I", "am", "well", "thank", "you"]
```

`beam.FlatMap` then flattens these lists into a single PCollection of tokens:

```
"Hello" "how" "do" "you" "do" "I" "am" "well" "thank" "you"
```

Next, the `PairWithOne` transformation uses `beam.Map` to create a tuple out of every token and the count (1 for each result):

```
("Hello", 1) ("how", 1) ("do", 1) ("you", 1) ("do", 1) ("I", 1) ("am", 1)
("well", 1) ("thank", 1) ("you", 1)
```

Finally, the `GroupAndSum` transformation groups the tuples by token and sums the counts for each token:

```
("Hello", 1) ("how", 1) ("do", 2) ("you", 2) ("I", 1) ("am", 1) ("well", 1)
("thank", 1)
```

We can also apply Python functions as part of a transformation. The following example shows how the function `format_result` can be applied to the summation results produced earlier. The function converts each resulting tuple into a string that can then be written to a text file:

In [16]:
def format_result(word_count):
    """Convert tuples (token, count) into a string"""
    (word, count) = word_count
    return "{}: {}".format(word, count)

output = counts | 'Format' >> beam.Map(format_result)

Apache Beam provides a variety of predefined transformations. However, if our preferred operation isn’t available, we can write our own transformations by using the `Map` operators. Just keep in mind that the operations should be able to run in a distributed way to fully take advantage of the capabilities of the runtime environments.
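One way to see what "able to run in a distributed way" means for a combining step: the function should give the same answer whether it sees all values at once or merges partial results computed on separate shards. `sum` has this property because addition is associative and commutative. The sketch below is plain Python, not Beam; `combine_per_key` and the two "worker" shards are hypothetical stand-ins for what a runner might do:

```python
from collections import defaultdict

def combine_per_key(pairs, combine_fn):
    """Group (key, value) pairs by key and reduce each group with combine_fn."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: combine_fn(values) for key, values in grouped.items()}

word_pairs = [("do", 1), ("you", 1), ("do", 1), ("I", 1), ("you", 1), ("do", 1)]

# A single worker sees every pair.
single_pass = combine_per_key(word_pairs, sum)

# Two hypothetical workers each see one shard; their partial sums are merged.
shard_a, shard_b = word_pairs[:3], word_pairs[3:]
partials = (list(combine_per_key(shard_a, sum).items())
            + list(combine_per_key(shard_b, sum).items()))
merged = combine_per_key(partials, sum)

print(single_pass)
print(merged == single_pass)  # True
```

A function that depends on element order or on seeing the full group at once would not survive this kind of sharding, so it would be a poor fit for a distributed combine.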

#### Putting it all together

After discussing the individual concepts of Apache Beam’s pipelines, let’s put them all together in one example. The previous snippets and following examples are a modified version of the [Apache Beam introduction](https://beam.apache.org/get-started/wordcount-example/). For readability, the example has been reduced to the bare minimum Apache Beam code:

In [17]:
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

# The text is stored in a Google Cloud Storage bucket
input_file = "gs://dataflow-samples/shakespeare/kinglear.txt" 
output_file = "/tmp/output.txt"

# Define pipeline options object
pipeline_options = PipelineOptions()

# Set up the Apache Beam pipeline
with beam.Pipeline(options=pipeline_options) as p: 
    # Read the text file or file pattern into a PCollection
    lines = p | ReadFromText(input_file) 

    # Perform the transformations on the collection
    # Count the occurrences of each word.
    counts = ( 
        lines
        | 'Split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    # Format the counts into a PCollection of strings
    def format_result(word_count):
        (word, count) = word_count
        return "{}: {}".format(word, count)

    output = counts | 'Format' >> beam.Map(format_result)

    # Write the output using a "Write" transform that has side effects
    output | WriteToText(output_file)

The example pipeline downloads Shakespeare’s *King Lear* and performs the token count pipeline on the entire corpus. The results are then written to the text file located at `/tmp/output.txt`.

#### Executing our basic pipeline

To execute this pipeline on a different Apache Beam runner, such as Apache Spark or Apache Flink, we need to set the pipeline configuration through the `pipeline_options` object. With the default options, we can run the pipeline on Apache Beam’s DirectRunner by executing the following command (assuming that the previous example code was saved as `basic_pipeline.py`):

```
python basic_pipeline.py
```

The results of the transformations can be found in the designated text file:

In [18]:
! head /tmp/output.txt*

KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65
of: 447
Britain: 2
OF: 15
FRANCE: 10
DUKE: 3


## Summary

In this notebook, we presented a high-level overview of TFX and discussed the importance of a metadata store as well as the general internals of a TFX component. We also introduced Apache Beam and showed how to carry out a simple data transformation using Beam.

Everything we discussed in this notebook will be useful to us as we read through the next notebooks on the pipeline components and the pipeline orchestration. The first step is to get our data into the pipeline, and we will cover this in the next notebook.


## References

- [Building Machine Learning Pipelines](https://learning.oreilly.com/library/view/building-machine-learning/9781492053187/)

- [TensorFlow in Production tutorials](https://www.tensorflow.org/tfx/tutorials)

- [TFX Estimator Component Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/components)

- [Understanding TFX Pipelines](https://www.tensorflow.org/tfx/guide/understanding_tfx_pipelines)

- [From Research to Production with TFX Pipelines and ML Metadata](https://blog.tensorflow.org/2019/05/research-to-production-with-tfx-ml.html)

- [Introducing the TFX interactive notebook](https://blog.tensorflow.org/2019/11/introducing-tfx-interactive-notebook.html)

- [TensorFlow Extended (TFX): Real World Machine Learning in Production](https://blog.tensorflow.org/2019/06/tensorflow-extended-tfx-real-world_26.html)

- [Towards ML Engineering: A Brief History Of TensorFlow Extended (TFX)](https://blog.tensorflow.org/2020/09/brief-history-of-tensorflow-extended-tfx.html)

- [TensorFlow Extended (TFX): Using Apache Beam for large scale data processing](https://blog.tensorflow.org/2020/03/tensorflow-extended-tfx-using-apache-beam-large-scale-data-processing.html)