## Chapter 3. Data Ingestion

Steps:
1.   Read data files or request data from an external service (cloud service)
2.   Divide data (train, validation, test sets)
3.   Convert the datasets into TFRecord files that store the data as tf.Example data structures. These binary files can be read efficiently.



In [None]:
# Ingest a folder containing CSV data

import os
from tfx.components import CsvExampleGen
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.utils.dsl_utils import external_input

# Assumption: no `context` exists yet in this notebook, so create one here
context = InteractiveContext()

base_dir = os.getcwd()
data_dir = os.path.join(os.pardir, "data")
examples = external_input(os.path.join(base_dir, data_dir))   # define the data path
example_gen = CsvExampleGen(input=examples)   # instantiate the pipeline component

context.run(example_gen)  # execute the component interactively


# When data can't be expressed efficiently as CSV, convert the dataset to TFRecord data structures
# and load the TFRecord files with the ImportExampleGen component.

To load other file types into your pipeline (for example, Avro or Parquet), override the executor_class of the example generator component.

In [None]:
# helper functions to reduce code redundancy.
# these convert the data records into the correct data structure used by tf.Example

import tensorflow as tf

def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
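
The helpers above only wrap single values. A minimal sketch of how they might be combined to write a TFRecord file follows. The record contents, column names, and file name are hypothetical and chosen only for illustration. The helpers are repeated so the block runs on its own. A file written this way is the kind of input the ImportExampleGen component mentioned earlier is meant to load.

```python
import os
import tempfile

import tensorflow as tf


def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def _float_feature(value):
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))


def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


# Hypothetical records; the column names are illustrative only.
records = [
    {"product": "Mortgage", "zip_code": 94105, "dispute_rate": 0.25},
    {"product": "Credit card", "zip_code": 10001, "dispute_rate": 0.5},
]

# Hypothetical output location in a temporary directory.
tfrecord_path = os.path.join(tempfile.mkdtemp(), "records.tfrecord")

with tf.io.TFRecordWriter(tfrecord_path) as writer:
    for record in records:
        example = tf.train.Example(features=tf.train.Features(feature={
            # BytesList only accepts bytes, so strings must be encoded first.
            "product": _bytes_feature(record["product"].encode("utf-8")),
            "zip_code": _int64_feature(record["zip_code"]),
            "dispute_rate": _float_feature(record["dispute_rate"]),
        }))
        writer.write(example.SerializeToString())

# Read the file back to confirm that the records round-trip.
parsed = [
    tf.train.Example.FromString(raw.numpy())
    for raw in tf.data.TFRecordDataset(tfrecord_path)
]
```

Each feature type has to match the helper used: passing a Python `str` to `_bytes_feature` without encoding it raises an error.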

In [None]:
# The ExampleGen component can also read files from cloud storage buckets (Google Cloud Storage or AWS S3).
# This requires cloud-specific credentials to be set up properly.

examples = external_input("gs://example_compliance_data/")
example_gen = CsvExampleGen(input=examples)

Components to ingest datasets directly from databases:
1.   BigQueryExampleGen - queries data from BigQuery tables
2.   PrestoExampleGen - queries data from Presto databases

In [None]:
# Split the data into training, evaluation, and test sets (6:2:2); hash_buckets sets the ratios

import os
from tfx.components import CsvExampleGen
from tfx.proto import example_gen_pb2
from tfx.utils.dsl_utils import external_input

base_dir = os.getcwd()
data_dir = os.path.join(os.pardir, "data")
output = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[   # define preferred splits
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=6),   # specify the ratio
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2),
        example_gen_pb2.SplitConfig.Split(name='test', hash_buckets=2)
    ]))

examples = external_input(os.path.join(base_dir, data_dir))
example_gen = CsvExampleGen(input=examples, output_config=output)    # add the output_config argument

context.run(example_gen)

In [None]:
# inspect artifacts by printing list of artifacts

for artifact in example_gen.outputs['examples'].get():
    print(artifact)

Artifact(type_name: ExamplesPath,
    uri: /path/to/CsvExampleGen/examples/1/train/, split: train, id: 2)
Artifact(type_name: ExamplesPath,
    uri: /path/to/CsvExampleGen/examples/1/eval/, split: eval, id: 3)
Artifact(type_name: ExamplesPath,
    uri: /path/to/CsvExampleGen/examples/1/test/, split: test, id: 4)
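
To build intuition for how `hash_buckets=6/2/2` yields an approximate 6:2:2 split, the sketch below hashes each record into one of ten buckets and maps bucket ranges to split names. This is an illustration only: the use of SHA-256 and the helper name `assign_split` are assumptions made here, not the hashing that TFX performs internally.

```python
import hashlib
from collections import Counter

# Split names and bucket counts mirror the SplitConfig above.
SPLITS = [("train", 6), ("eval", 2), ("test", 2)]
TOTAL_BUCKETS = sum(buckets for _, buckets in SPLITS)


def assign_split(record: bytes) -> str:
    """Map a record to a split based on a hash of its content."""
    digest = hashlib.sha256(record).digest()
    bucket = int.from_bytes(digest[:8], "big") % TOTAL_BUCKETS
    upper = 0
    for name, buckets in SPLITS:
        upper += buckets
        if bucket < upper:
            return name
    raise AssertionError("bucket is always below TOTAL_BUCKETS")


# Hypothetical records; any byte string works.
counts = Counter(assign_split(f"row-{i}".encode()) for i in range(10_000))
```

Because the assignment depends only on the record content, the same record always lands in the same split, and the split sizes approach the configured ratios as the dataset grows.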

"One of the significant use cases for machine learning pipelines is that we can update our machine learning models when new data becomes available. For this scenario, the ExampleGen component allows us to use spans. Think of a span as a snapshot of data. Every hour, day, or week, a batch extract, transform, load (ETL) process could make such a data snapshot and create a new span.

A span can replicate the existing data records. As shown in the following, export-1 contains the data from the previous export-0 as well as newly created records that were added since the export-0 export:"
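
A small sketch of the span idea, using hypothetical folder contents: each `export-N` folder is modeled as a set of record IDs, and the newest span is the folder with the highest number. The helper names `span_number` and `latest_span` are invented for this illustration. In TFX the span is resolved by the ExampleGen input configuration, not by code like this.

```python
import re
from typing import Dict, Iterable, Optional, Set

# Hypothetical snapshots: folder name -> IDs of the records it contains.
exports: Dict[str, Set[int]] = {
    "export-0": {1, 2, 3},
    "export-1": {1, 2, 3, 4, 5},  # replicates export-0 and adds new records
}

SPAN_RE = re.compile(r"^export-(\d+)$")


def span_number(folder: str) -> Optional[int]:
    """Return the span number encoded in a folder name, or None."""
    match = SPAN_RE.match(folder)
    return int(match.group(1)) if match else None


def latest_span(folders: Iterable[str]) -> Optional[str]:
    """Return the folder with the highest span number, or None."""
    numbered = [(span_number(f), f) for f in folders if span_number(f) is not None]
    return max(numbered)[1] if numbered else None


newest = latest_span(exports)
new_records = exports["export-1"] - exports["export-0"]
```

Comparing span numbers as integers matters: a plain string sort would rank `export-10` before `export-2`.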

"In machine learning pipelines, we want to track the produced models together with the used datasets, which were used to train the machine learning model. To do this, it is useful to version our datasets.

Data versioning allows us to track the ingested data in more detail. This means that we not only store the file name and path of the ingested data in the ML MetadataStore (because it’s currently supported by the TFX components) but also that we track more metainformation about the raw dataset, such as a hash of the ingested data. Such version tracking would allow us to verify that the dataset used during the training is still the dataset at a later point in time. Such a feature is critical for end-to-end ML reproducibility.

However, such a feature is currently not supported by the TFX ExampleGen component. If you would like to version your datasets, you can use third-party data versioning tools and version the data before the datasets are ingested into the pipeline. Unfortunately, none of the available tools will write the metadata information to the TFX ML MetadataStore directly."

Third-party data versioning tools:
*   Data Version Control (DVC)
*   Pachyderm
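
The dataset hash mentioned in the quoted passage can be sketched with the standard library alone. This is not a replacement for the tools listed above and does not write anything to the ML MetadataStore. The function name `file_sha256` and the sample file are assumptions made for illustration.

```python
import hashlib
import os
import tempfile


def file_sha256(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical dataset file.
    path = os.path.join(tmp, "data.csv")
    with open(path, "w") as f:
        f.write("id,label\n1,0\n")

    hash_at_training = file_sha256(path)

    # Later check: the hash still matches while the file is untouched.
    unchanged = file_sha256(path) == hash_at_training

    # Appending a record changes the hash, which reveals the drift.
    with open(path, "a") as f:
        f.write("2,1\n")
    changed = file_sha256(path) != hash_at_training
```

Storing such a digest next to a trained model would make it possible to check later whether the dataset on disk is still the one used for training.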
