
In [5]:
# grader-required-cell

import os

import tensorflow as tf

from tfx import v1 as tfx
import tensorflow_transform.beam as tft_beam
from google.protobuf.json_format import MessageToDict
from tensorflow_transform.tf_metadata import dataset_metadata, schema_utils
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

import tempfile
import pprint
import warnings

pp = pprint.PrettyPrinter()

# ignore tf warning messages
tf.get_logger().setLevel('ERROR')
warnings.filterwarnings("ignore")

In [None]:
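# Download the raw dataset archive from the UCI repository.
# Before running the pipeline, extract the CSV and save it as ./data/metro_traffic_volume.csv
# (the `_data_filepath` defined below). CsvExampleGen ingests every file in `_data_root`,
# so keep only that CSV in the directory.
!wget https://archive.ics.uci.edu/static/public/492/metro+interstate+traffic+volume.zip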

In [None]:
# grader-required-cell

# root directory for the pipeline artifacts and the metadata store
_pipeline_root = './pipeline'

# directory of the raw data files
_data_root = './data'

# path to the raw training data
_data_filepath = os.path.join(_data_root, 'metro_traffic_volume.csv')

In [None]:
# grader-required-cell

# Preview the dataset
!head {_data_filepath}

# Create the Interactive Context
When pushing to production, you want to automate the pipeline execution using orchestrators such as Apache Beam and Kubeflow. You will not be doing that just yet and will instead execute the pipeline from this notebook. When experimenting in a notebook environment, you manually execute the pipeline components (i.e., you act as the orchestrator). For that, TFX provides the `InteractiveContext`, which lets you step through each component and inspect its outputs.

You will initialize the `InteractiveContext` below. This creates a database in the `_pipeline_root` directory, which the pipeline components use to record and retrieve the state of their executions. You will learn more about this in Week 3 when we discuss ML Metadata. For now, you can think of it as the data store that makes it possible for the different pipeline components to work together.

Note: You can configure which database to connect to, but for this exercise we will use the default: a newly created local SQLite file. After running the cell below, you may see a warning about a missing config file; you can safely ignore it.

In [5]:
# Initialize the InteractiveContext with a local sqlite file.
# If you leave `_pipeline_root` blank, then the db will be created in a temporary directory.
# You can safely ignore the warning about the missing config file.
context = InteractiveContext(pipeline_root=_pipeline_root)
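Because the default metadata store is an ordinary SQLite file, you can peek at it with Python's built-in `sqlite3` module. The sketch below simply lists the tables in that file. It assumes the file is named `metadata.sqlite` inside `_pipeline_root` (check the directory if your TFX version uses a different name), and `list_metadata_tables` is a hypothetical helper, not part of TFX.

```python
import os
import sqlite3
from typing import List


def list_metadata_tables(db_path: str) -> List[str]:
    """Return the sorted table names of an existing SQLite database file."""
    # sqlite3.connect() would silently create an empty file, so check first.
    if not os.path.exists(db_path):
        raise FileNotFoundError(db_path)
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        conn.close()
    return [name for (name,) in rows]
```

For example, `list_metadata_tables(os.path.join(_pipeline_root, 'metadata.sqlite'))`. The table layout belongs to ML Metadata's internal schema, so treat it as an implementation detail to inspect, not an API to build on.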

# 2 - Run TFX components interactively
In the following exercises, you will create the data pipeline components one by one, run each of them, and visualize their output artifacts. Recall that the outputs of pipeline components are called artifacts, and these can serve as inputs to the next stage of the pipeline.

## 2.1 - ExampleGen
The pipeline starts with the ExampleGen component. It will:

- split the data into training and evaluation sets (by default: 2/3 train, 1/3 eval).
- convert each data row into `tf.train.Example` format. This protocol buffer is designed for TensorFlow operations and is used by the TFX components.
- compress and save the data collection under the `_pipeline_root` directory for other components to access. These examples are stored in TFRecord format, which optimizes read and write operations within TensorFlow, especially for large collections of data.
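The default 2:1 train/eval split is done by hashing each record into buckets (2 buckets for train, 1 for eval), so a given record always lands in the same split. The plain-Python sketch below illustrates that idea. `assign_split` is hypothetical, and it uses SHA-256 purely for illustration; TFX uses its own fingerprint function, so the actual assignments will differ.

```python
import hashlib
from typing import Dict


def assign_split(record: bytes, buckets: Dict[str, int]) -> str:
    """Deterministically map a serialized record to a named split.

    `buckets` maps split names to bucket counts, e.g. {"train": 2, "eval": 1}.
    """
    total = sum(buckets.values())
    if total <= 0:
        raise ValueError("bucket counts must sum to a positive number")
    # Stable 64-bit fingerprint of the record (illustrative hash, not TFX's).
    fingerprint = int.from_bytes(hashlib.sha256(record).digest()[:8], "little")
    bucket = fingerprint % total
    # Walk the splits in order until the bucket index falls inside one.
    for name, size in buckets.items():
        if bucket < size:
            return name
        bucket -= size
    raise AssertionError("unreachable: bucket index out of range")
```

Hashing instead of random sampling makes the split reproducible across runs without storing a random seed: re-running ingestion on the same rows yields the same train and eval sets.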

Exercise 1: ExampleGen

In [None]:
# Instantiate ExampleGen with the input CSV dataset
example_gen = tfx.components.CsvExampleGen(input_base=_data_root)

# Run the component using the InteractiveContext instance
context.run(example_gen)

After the cell runs, an output cell showing the execution results is displayed automatically. This metadata is recorded in the database created earlier, which lets you keep track of your project runs. For example, if you run the cell again, you will see the `.execution_id` increment.

The outputs of the components are called artifacts, and you can see an example by navigating through `.component.outputs` > `['examples']` > `Channel` > `._artifacts` > `[0]` above. It shows information such as where the converted data is stored (`.uri`) and the splits generated (`.split_names`).

You can also examine the output artifacts programmatically with the code below.

In [None]:
# grader-required-cell

try:
    # get the artifact object
    artifact = example_gen.outputs['examples'].get()[0]

    # print split names and uri
    print(f'split names: {artifact.split_names}')
    print(f'artifact uri: {artifact.uri}')

# for grading since context.run() does not work outside the notebook
except IndexError:
    print("context.run() was no-op")
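    examples_path = './pipeline/CsvExampleGen/examples'
    # each subfolder is named after an execution id; os.listdir() order is
    # arbitrary, so explicitly pick the most recent (highest) id
    dir_id = max((d for d in os.listdir(examples_path) if d.isdigit()), key=int)
    artifact_uri = f'{examples_path}/{dir_id}'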

else:
    artifact_uri = artifact.uri

If you're wondering, the number in `./pipeline/CsvExampleGen/examples/{number}` is the execution id associated with that dataset. If you restart the kernel of this workspace and re-run up to this cell, you will notice a new folder with a different id as its name. This shows that TFX keeps versions of your data, so you can roll back if you want to investigate a particular execution.
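To see which versions exist before rolling back, you can list each execution-id folder together with the split folders inside it. The sketch below is a hypothetical helper, `list_example_versions`, that assumes the folder layout described above (numeric execution-id folders containing `Split-*` subfolders such as `Split-train`).

```python
import os
from typing import Dict, List


def list_example_versions(examples_path: str) -> Dict[int, List[str]]:
    """Map each execution-id folder under `examples_path` to its split folders."""
    versions: Dict[int, List[str]] = {}
    for entry in os.listdir(examples_path):
        full = os.path.join(examples_path, entry)
        # Only numeric directory names are treated as execution ids.
        if entry.isdigit() and os.path.isdir(full):
            versions[int(entry)] = sorted(
                d for d in os.listdir(full) if d.startswith("Split-")
            )
    # Return the versions ordered from oldest to newest execution id.
    return dict(sorted(versions.items()))
```

For example, `list_example_versions('./pipeline/CsvExampleGen/examples')` returns one entry per ExampleGen run.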


In a notebook environment, it may be useful to examine a few examples of the data, especially if you're still experimenting. Since the data collection is saved in TFRecord format, you need methods that work with that data type: you have to unpack the individual examples from the TFRecord files and format them for printing. Let's do that in the following cells:

In [None]:
# grader-required-cell

# Get the URI of the output artifact representing the training examples, which is a directory
train_uri = os.path.join(artifact_uri, 'Split-train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
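Under the hood, each TFRecord entry is framed as an 8-byte little-endian length, a 4-byte masked CRC32C of that length, the payload bytes, and a 4-byte masked CRC32C of the payload; ExampleGen also GZIP-compresses the files, which is why `compression_type="GZIP"` is passed above. The plain-Python reader below is a sketch of that framing for illustration only. `read_tfrecords` is hypothetical and skips CRC verification, so keep using `tf.data.TFRecordDataset` for real work.

```python
import gzip
import struct
from typing import Iterator


def read_tfrecords(path: str, compressed: bool = True) -> Iterator[bytes]:
    """Yield the raw serialized records stored in a TFRecord file.

    Sketch only: the CRC fields are read past but NOT verified.
    """
    opener = gzip.open if compressed else open
    with opener(path, "rb") as f:
        while True:
            header = f.read(8)  # uint64 payload length, little-endian
            if not header:
                break  # clean end of file
            if len(header) < 8:
                raise ValueError("truncated record header")
            (length,) = struct.unpack("<Q", header)
            f.read(4)  # masked CRC32C of the length (skipped)
            data = f.read(length)
            if len(data) < length:
                raise ValueError("truncated record payload")
            f.read(4)  # masked CRC32C of the payload (skipped)
            yield data
```

Each yielded payload should be the same serialized `tf.train.Example` bytes that `tfrecord.numpy()` returns in the helper below, e.g. `next(read_tfrecords(tfrecord_filenames[0]))`.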

In [None]:
# Define a helper function to get individual examples
def get_records(dataset, num_records):
    '''Extracts records from the given dataset.
    Args:
        dataset (TFRecordDataset): dataset saved by ExampleGen
        num_records (int): number of records to preview
    Returns:
        list of dict: the parsed `tf.train.Example` records as Python dictionaries
    '''

    # initialize an empty list
    records = []

    # Use the `take()` method to specify how many records to get
    for tfrecord in dataset.take(num_records):

        # Get the numpy property of the tensor
        serialized_example = tfrecord.numpy()

        # Initialize a `tf.train.Example()` to read the serialized data
        example = tf.train.Example()

        # Read the example data (output is a protocol buffer message)
        example.ParseFromString(serialized_example)

        # convert the protocol buffer message to a Python dictionary
        example_dict = MessageToDict(example)

        # append to the records list
        records.append(example_dict)

    return records

In [None]:
# Get 3 records from the dataset
sample_records = get_records(dataset, 3)

# Print the output
pp.pprint(sample_records)
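`MessageToDict` follows protobuf's JSON mapping, so in the printed records `int64List` values appear as strings and `bytesList` values appear base64-encoded. A small hypothetical helper like the one below flattens each record into `{feature_name: [values]}` with readable types. It assumes the default camelCase field names (`bytesList`, `floatList`, `int64List`) and that string features hold UTF-8 text.

```python
import base64
from typing import Any, Dict, List


def flatten_example_dict(example_dict: Dict[str, Any]) -> Dict[str, List[Any]]:
    """Convert a MessageToDict(tf.train.Example) result into {feature: values}."""
    features = example_dict.get("features", {}).get("feature", {})
    flat: Dict[str, List[Any]] = {}
    for name, feature in features.items():
        if "int64List" in feature:
            # int64 values are serialized as JSON strings
            values = [int(v) for v in feature["int64List"].get("value", [])]
        elif "floatList" in feature:
            values = [float(v) for v in feature["floatList"].get("value", [])]
        elif "bytesList" in feature:
            # bytes values are base64-encoded; assume UTF-8 text underneath
            values = [base64.b64decode(v).decode("utf-8")
                      for v in feature["bytesList"].get("value", [])]
        else:
            values = []  # feature present but with no value list set
        flat[name] = values
    return flat
```

For example, `pp.pprint([flatten_example_dict(r) for r in sample_records])` prints the same three records in a more readable form.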

Now that `ExampleGen` has finished ingesting the data, the next step is data analysis.


## 2.2 - StatisticsGen
The StatisticsGen component computes statistics over your dataset for data analysis, as well as for use in downstream components. It uses the TensorFlow Data Validation library.

StatisticsGen takes as input the dataset ingested using CsvExampleGen.
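To get a feel for the kind of summaries StatisticsGen produces (counts, missing values, mean, standard deviation, min/max and number of zeros for numeric features; unique values and the most frequent value for string features), here is a simplified plain-Python sketch over a list of row dictionaries. `summarize_features` is hypothetical and uses the population standard deviation; TFDV computes many more statistics (such as histograms) and runs at scale on Apache Beam, so treat this only as an illustration.

```python
import math
from collections import Counter
from typing import Any, Dict, List


def summarize_features(rows: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Compute simple per-feature summaries; a missing key or None counts as missing."""
    names = sorted({name for row in rows for name in row})
    summary: Dict[str, Dict[str, Any]] = {}
    for name in names:
        values = [row.get(name) for row in rows]
        present = [v for v in values if v is not None]
        stats: Dict[str, Any] = {"count": len(present),
                                 "missing": len(values) - len(present)}
        is_numeric = bool(present) and all(
            isinstance(v, (int, float)) and not isinstance(v, bool) for v in present)
        if is_numeric:
            mean = sum(present) / len(present)
            variance = sum((v - mean) ** 2 for v in present) / len(present)
            stats.update(mean=mean, std_dev=math.sqrt(variance),
                         min=min(present), max=max(present),
                         num_zeros=sum(1 for v in present if v == 0))
        else:
            # Treat everything else as a categorical/string feature.
            counts = Counter(str(v) for v in present)
            stats.update(unique=len(counts),
                         top=counts.most_common(1)[0][0] if counts else None)
        summary[name] = stats
    return summary
```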

In [None]:
# Instantiate StatisticsGen with the ExampleGen ingested dataset
statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])

# Execute the component
context.run(statistics_gen)

In [None]:
# Show the output statistics
context.show(statistics_gen.outputs['statistics'])