In [None]:
%%bash

mkdir -p ../data/dataset1
mkdir -p ../data/dataset2

python3 ../utils/download_dataset.py
python3 ../utils/convert_to_tfrecords.py

In [1]:
import sys
import os

root_dir = os.path.split(os.getcwd())[0]

sys.path.append(root_dir)
from utils.configurations.config import Config

# Table of Contents

>- [Data Ingestion](#Data-Ingestion)
>- [What is InteractiveContext?](#What-is-InteractiveContext?)
>- [Output of the component](#Output-of-the-component)
>- [What is the metadata store for?](#what-metadata-store-is-for?)
>>- [Artifacts tables](#atrifacts-Tables)
>>- [Contexts tables](#Contexts-Tables)
>>- [Executions tables](#Executions-Tables)
>- [Loading a dataset from TFRecords](#Loding-dataset-from-tf_records)
>- [Configuration Options](#Configuration-Options)
>>- [Splitting](#splitting)
>>- [If the data is already split](#If-data-is-stored-in-spitted-manner)
>>- [Span](#Span)
>- [Add-ons](#Add-ons)

## Data Ingestion

This component of the pipeline reads data files, or requests data for the pipeline run from an external service (e.g., Google Cloud BigQuery), and outputs an artifact for the next step. Before passing the ingested dataset to the next component, it divides the available data into training and evaluation datasets (the split ratio and the number of splits are configurable) and converts them into TFRecord files containing the data represented as tf.Example data structures.

In [None]:
import warnings
warnings.filterwarnings('ignore', 'absl')

In [None]:
import pprint
import shutil
import pandas as pd
from collections import defaultdict

import tensorflow as tf

from tfx.components import CsvExampleGen
from tfx.utils.dsl_utils import external_input
from tfx.orchestration.experimental.interactive.interactive_context \
        import InteractiveContext

pp = pprint.PrettyPrinter()

In [None]:
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

## What is InteractiveContext?
In this notebook, Jupyter itself acts as the orchestrator: we run the pipeline components manually, one at a time. The InteractiveContext class lets us run components inside a notebook and review their artifacts immediately.

Once you have confirmed that your pipeline components work, you can convert the interactive pipeline into a production-ready pipeline by orchestrating it with Apache Beam (for example, running on Google Cloud Dataflow), Apache Airflow, or Kubeflow Pipelines.

In [None]:
pipeline_name = Config.PIPELINE_NAME
pipeline_root = os.path.join(root_dir, Config.PIPELINE_FOLDER)

# Run Apache Beam locally; the notebook does not need a distributed runner
beam_args = ['--runner=DirectRunner']

# Create the pipeline root if it does not exist yet
os.makedirs(pipeline_root, exist_ok=True)

# InteractiveContext stores artifacts under pipeline_root and records
# the run metadata in a local SQLite database
context = InteractiveContext(pipeline_name=pipeline_name,
                             pipeline_root=pipeline_root,
                             beam_pipeline_args=beam_args)

In [None]:
data_dir = os.path.join(root_dir, 'data', 'dataset1')

print(*os.listdir(data_dir), sep = '\n')

CsvExampleGen reads one or more CSV files from the given directory and outputs the data as TFRecord files (the number of splits and their ratio depend on the configuration), which the downstream components consume.


> Note: Configuring the split ratio, the number of splits, and span patterns is demonstrated in the 'Configuration Options' section.
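As a rough illustration of what this conversion produces: every CSV row becomes a tf.Example whose features each hold one of three list types, `bytes_list`, `float_list`, or `int64_list`. The stdlib-only sketch below mimics that mapping with plain dictionaries. The column names are hypothetical, and the sketch infers a type per cell, whereas CsvExampleGen infers one type for each column from the data; it is not TFX's implementation.

```python
import csv
import io


def infer_kind(value: str) -> str:
    """Pick a tf.train.Feature list type for a single CSV cell (simplified)."""
    try:
        int(value)
        return "int64_list"
    except ValueError:
        pass
    try:
        float(value)
        return "float_list"
    except ValueError:
        return "bytes_list"  # anything non-numeric is stored as raw bytes


def row_to_feature_dict(row: dict) -> dict:
    """Convert one CSV row into a dict shaped like tf.Example features."""
    features = {}
    for name, value in row.items():
        kind = infer_kind(value)
        if kind == "int64_list":
            features[name] = {kind: [int(value)]}
        elif kind == "float_list":
            features[name] = {kind: [float(value)]}
        else:
            features[name] = {kind: [value.encode("utf-8")]}
    return features


# Hypothetical columns, for illustration only
csv_text = "product,zip_code,score\nMortgage,12345,0.5\n"
row = next(csv.DictReader(io.StringIO(csv_text)))
print(row_to_feature_dict(row))
# {'product': {'bytes_list': [b'Mortgage']}, 'zip_code': {'int64_list': [12345]}, 'score': {'float_list': [0.5]}}
```

In a real pipeline you would not build these by hand; the point is only that each feature is always a list of one of three primitive types.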

In [None]:
examples = external_input(data_dir)
example_gen = CsvExampleGen(input = examples)

The cell below runs the component. InteractiveContext then displays the metadata of the run in the notebook, including the component's output artifacts and the storage locations of the training and evaluation datasets.

In [None]:
context.run(example_gen)

With the default configuration, CsvExampleGen creates 'train' and 'eval' folders and splits the data in a 2:1 ratio.
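The 2:1 ratio comes from hash buckets: the default output configuration gives 'train' two buckets and 'eval' one, and each record is assigned to a bucket by hashing it. The sketch below shows the idea with the standard library. It is an illustration under assumptions, not TFX's code: `md5` stands in for the fingerprint function TFX uses internally, and `assign_split` is a hypothetical helper.

```python
import hashlib

# Default ExampleGen output config: 2 hash buckets for train, 1 for eval (2:1)
DEFAULT_SPLITS = [("train", 2), ("eval", 1)]


def assign_split(record: bytes, splits=DEFAULT_SPLITS) -> str:
    """Deterministically map a serialized record to a split via hash buckets."""
    total_buckets = sum(buckets for _, buckets in splits)
    # md5 is a stand-in for the fingerprint function TFX uses internally
    bucket = int.from_bytes(hashlib.md5(record).digest()[:8], "big") % total_buckets
    # Walk the splits, consuming each one's share of the bucket range
    for name, buckets in splits:
        if bucket < buckets:
            return name
        bucket -= buckets
    raise AssertionError("bucket index out of range")  # unreachable


counts = {"train": 0, "eval": 0}
for i in range(3000):
    counts[assign_split(f"record-{i}".encode())] += 1
print(counts)  # about two thirds 'train' and one third 'eval'
```

Because the assignment depends only on the record's bytes, re-running the pipeline on the same data reproduces the same split. Passing `[("train", 6), ("eval", 2), ("test", 2)]` as `splits` mimics the 6:2:2 configuration used later in this notebook.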

In [None]:
example_gen_prop = example_gen.outputs['examples'].get()[0]

print('Artifact Location: ')
print(f'\t {example_gen_prop.uri}')
print()

print('Files: ')
print('\t train')
print(f'\t\t {os.listdir(os.path.join(example_gen_prop.uri, "train"))}')
print('\t eval')
print(f'\t\t {os.listdir(os.path.join(example_gen_prop.uri, "eval"))}')

## Output of the component

In [None]:
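import json

# split_names is stored as a JSON-encoded list, e.g. '["train", "eval"]';
# json.loads parses it without the risks of eval()
split_names = json.loads(example_gen_prop.split_names)
artifact = os.path.join(example_gen_prop.uri, split_names[0])
files = [os.path.join(artifact, i) for i in os.listdir(artifact)]

# ExampleGen writes GZIP-compressed TFRecord files
train = tf.data.TFRecordDataset(filenames=files, compression_type='GZIP')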

In [None]:
# Decode the first serialized record back into a tf.train.Example
for data in train.take(1):
    example = tf.train.Example()
    example.ParseFromString(data.numpy())
    pp.pprint(example)

## what metadata store is for?

The Metadata Store uses the following data model to record and retrieve metadata from the storage backend.

- ArtifactType describes an artifact's type and its properties that are stored in the metadata store. You can register these types on-the-fly with the metadata store in code, or you can load them in the store from a serialized format. Once you register a type, its definition is available throughout the lifetime of the store.
- An Artifact describes a specific instance of an ArtifactType, and its properties that are written to the metadata store.
- An ExecutionType describes a type of component or step in a workflow, and its runtime parameters.
- An Execution is a record of a component run or a step in an ML workflow and the runtime parameters. An execution can be thought of as an instance of an ExecutionType. Executions are recorded when you run an ML pipeline or step.
- An Event is a record of the relationship between artifacts and executions. When an execution happens, events record every artifact that was used by the execution, and every artifact that was produced. These records allow for lineage tracking throughout a workflow. By looking at all events, MLMD knows what executions happened and what artifacts were created as a result. MLMD can then recurse back from any artifact to all of its upstream inputs.
- A ContextType describes a type of conceptual group of artifacts and executions in a workflow, and its structural properties. For example: projects, pipeline runs, experiments, owners etc.
- A Context is an instance of a ContextType. It captures the shared information within the group. For example: project name, changelist commit id, experiment annotations etc. It has a user-defined unique name within its ContextType.
- An Attribution is a record of the relationship between artifacts and contexts.
- An Association is a record of the relationship between executions and contexts.
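The lineage tracking described in the Event bullet can be illustrated with a few plain Python records. The `Event` class and `upstream_artifacts` function below are hypothetical toy stand-ins, not the ml_metadata types: starting from an artifact, the sketch finds the executions that output it, then the artifacts those executions took as input, and repeats. With a real store, the corresponding lookups would go through `MetadataStore.get_events_by_artifact_ids` and `get_events_by_execution_ids`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """Toy stand-in for an MLMD Event (not the ml_metadata class)."""
    artifact_id: int
    execution_id: int
    kind: str  # "INPUT" or "OUTPUT"


def upstream_artifacts(artifact_id, events):
    """Return the ids of every artifact that artifact_id was derived from."""
    seen = set()
    frontier = [artifact_id]
    while frontier:
        current = frontier.pop()
        # Executions that produced the current artifact
        producers = {e.execution_id for e in events
                     if e.artifact_id == current and e.kind == "OUTPUT"}
        # Artifacts consumed by those executions are its direct inputs
        for e in events:
            if (e.execution_id in producers and e.kind == "INPUT"
                    and e.artifact_id not in seen):
                seen.add(e.artifact_id)
                frontier.append(e.artifact_id)
    return seen


# Hypothetical run: ExampleGen (execution 1) -> StatisticsGen (2) -> SchemaGen (3)
events = [
    Event(artifact_id=1, execution_id=1, kind="OUTPUT"),  # examples
    Event(artifact_id=1, execution_id=2, kind="INPUT"),
    Event(artifact_id=2, execution_id=2, kind="OUTPUT"),  # statistics
    Event(artifact_id=2, execution_id=3, kind="INPUT"),
    Event(artifact_id=3, execution_id=3, kind="OUTPUT"),  # schema
]
print(sorted(upstream_artifacts(3, events)))  # [1, 2]
```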



For execution tracking of the artifacts and for lineage tracking (for example, telling which model or statistics correspond to which dataset or pipeline run), we need to work with Events, Contexts, and Executions:

- Events associate artifact_ids with execution_ids
- Executions only track type_ids and timestamps
- Contexts correlate type_ids with Pipeline runs and timestamp information

The ExecutionProperty and ContextProperty tables contain extra data:
- ExecutionProperties contain input and output configuration passed to each component, along with pipeline and step root directories, and IO locations of artifacts.
- ContextProperties associate context_ids with pipeline component names and timestamps
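Because the InteractiveContext keeps this metadata in a SQLite file, you can list the underlying tables with Python's built-in `sqlite3` module. The helper `list_tables` below is a hypothetical convenience for read-only exploration; the exact schema is an ml_metadata implementation detail that may change between versions, so use the `MetadataStore` API (as in the cells below) for anything beyond a quick look.

```python
import sqlite3
from contextlib import closing


def list_tables(db_path: str) -> list:
    """Return the names of all tables in a SQLite database, sorted by name."""
    # closing() makes sure the connection is closed, not just committed
    with closing(sqlite3.connect(db_path)) as conn:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    return [name for (name,) in rows]


# In this notebook the path would come from the InteractiveContext, e.g.
# list_tables(context.metadata_connection_config.sqlite.filename_uri)
```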


In [None]:
connection_config = context.metadata_connection_config
store = metadata_store.MetadataStore(connection_config)

base_dir = connection_config.sqlite.filename_uri.split('metadata.sqlite')[0]

In [None]:
def display_properties(input):
    data = defaultdict(list)
    for artifact in input:
        properties = artifact.properties
        custom_properties = artifact.custom_properties
        for key, value in properties.items():
            data['artifact id'].append(artifact.id)
            data['type_id'].append(artifact.type_id)
            data['name'].append(key)
            data['is_customproperty'].append(0)
            data['value'].append(value.string_value)

            
        for key, value in custom_properties.items():
            data['artifact id'].append(artifact.id)
            data['type_id'].append(artifact.type_id)
            data['name'].append(key)
            data['is_customproperty'].append(1)
            data['value'].append(value.string_value)
    return pd.DataFrame(data)


def display_types(types):
    table = {'id': [], 'name': []}
    for a_type in types:
        table['id'].append(a_type.id)
        table['name'].append(a_type.name.split('.')[-1])
    return pd.DataFrame(data=table)

def display_artifacts(store, artifacts):
    table = defaultdict(list)
    for a in artifacts:
        table['artifact id'].append(a.id)
        artifact_type = store.get_artifact_types_by_id([a.type_id])[0]
        table['type'].append(artifact_type.name)
        table['uri'].append(a.uri.replace(base_dir, './'))
        table['create_time_since_epoch'].append(a.create_time_since_epoch)
        table['last_update_time_since_epoch'].append(a.last_update_time_since_epoch)
    return pd.DataFrame(data=table)

In [None]:
def display_context(store, artifacts):
    table = defaultdict(list)
    for a in artifacts:
        table['artifact id'].append(a.id)
        artifact_type = store.get_context_types_by_id([a.type_id])[0]
        table['type'].append(artifact_type.name)
        table['name'].append(a.name)
        table['create_time_since_epoch'].append(a.create_time_since_epoch)
        table['last_update_time_since_epoch'].append(a.last_update_time_since_epoch)
    return pd.DataFrame(data=table)

def display_executions(store, artifacts):
    table = defaultdict(list)
    for a in artifacts:
        table['artifact id'].append(a.id)
        artifact_type = store.get_execution_types_by_id([a.type_id])[0]
        table['type'].append(artifact_type.name.split('.')[-1])
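        e_state = a.last_known_state
        # Compare against the MLMD Execution.State enum instead of magic numbers
        if e_state == metadata_store_pb2.Execution.RUNNING:
            table['last_known_state'].append('Running')
        elif e_state == metadata_store_pb2.Execution.COMPLETE:
            table['last_known_state'].append('Success')
        else:
            # Fall back to the enum name, e.g. 'FAILED' or 'CACHED'
            table['last_known_state'].append(metadata_store_pb2.Execution.State.Name(e_state))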
        table['create_time_since_epoch'].append(a.create_time_since_epoch)
        table['last_update_time_since_epoch'].append(a.last_update_time_since_epoch)
    return pd.DataFrame(data=table)

### atrifacts Tables

In [None]:
display_artifacts(store, store.get_artifacts())

In [None]:
display_types(store.get_artifact_types())

In [None]:
display_properties(store.get_artifacts())

### Contexts Tables

In [None]:
display_context(store, store.get_contexts())

In [None]:
display_types(store.get_context_types())

In [None]:
display_properties(store.get_contexts())

### Executions Tables

In [None]:
display_executions(store, store.get_executions())

In [None]:
display_properties(store.get_executions())

In [None]:
display_types(store.get_execution_types())

## Loding dataset from tf_records

Why TFRecord?

If you are working with large datasets, using a binary file format to store your data can have a significant impact on the performance of your input pipeline and, as a consequence, on the training time of your model. Binary data takes up less space on disk, takes less time to copy, and can be read much more efficiently from disk. This is especially true if your data is stored on spinning disks, due to their much lower read/write performance compared with SSDs.

However, pure performance isn't the only advantage of the TFRecord file format. It is optimized for use with TensorFlow in multiple ways. To start with, it makes it easy to combine multiple datasets and integrates seamlessly with the data import and preprocessing functionality provided by the library. This is especially useful for datasets that are too large to fit fully in memory, because only the data required at the time (e.g. a batch) is loaded from disk and then processed. Another major advantage of TFRecords is that they can store sequence data (for instance, a time series or word encodings) in a way that allows very efficient and, from a coding perspective, convenient import of this type of data.
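The on-disk TFRecord format itself is simple: each record is a little-endian `uint64` length, a masked CRC-32C of that length, the payload bytes, and a masked CRC-32C of the payload. The stdlib-only sketch below writes and reads that framing to show what a TFRecord file contains. It is for illustration only; in practice use `tf.io.TFRecordWriter` and `tf.data.TFRecordDataset`, and the helper names here are hypothetical.

```python
import io
import struct


def crc32c(data: bytes) -> int:
    """Bitwise CRC-32C (Castagnoli polynomial, reflected form 0x82F63B78)."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ (0x82F63B78 if crc & 1 else 0)
    return crc ^ 0xFFFFFFFF


def masked_crc(data: bytes) -> int:
    """Rotate the CRC right by 15 bits and add a constant, as TFRecord does."""
    crc = crc32c(data)
    return (((crc >> 15) | (crc << 17)) + 0xA282EAD8) & 0xFFFFFFFF


def write_record(stream, payload: bytes) -> None:
    header = struct.pack("<Q", len(payload))              # uint64 length
    stream.write(header)
    stream.write(struct.pack("<I", masked_crc(header)))   # CRC of the length
    stream.write(payload)
    stream.write(struct.pack("<I", masked_crc(payload)))  # CRC of the data


def read_records(stream):
    """Yield payloads one at a time, verifying both checksums."""
    while True:
        header = stream.read(8)
        if not header:
            return  # clean end of file
        (length,) = struct.unpack("<Q", header)
        (length_crc,) = struct.unpack("<I", stream.read(4))
        if length_crc != masked_crc(header):
            raise ValueError("corrupted length field")
        payload = stream.read(length)
        (payload_crc,) = struct.unpack("<I", stream.read(4))
        if payload_crc != masked_crc(payload):
            raise ValueError("corrupted record")
        yield payload


buffer = io.BytesIO()
for record in [b"first record", b"second record"]:
    write_record(buffer, record)
buffer.seek(0)
print(list(read_records(buffer)))  # [b'first record', b'second record']
```

Because records are read sequentially and independently, only the current record needs to be in memory, which is what makes streaming batches from disk cheap. ExampleGen additionally GZIP-compresses its output files, so such a file would have to be opened with `gzip.open` before reading it this way.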

[reference](https://www.quora.com/Is-it-especially-good-to-use-tfRecord-as-input-data-format-if-I-am-using-Keras-Tensorflow)

In [None]:
from tfx.components import ImportExampleGen

root_dir = os.path.split(os.getcwd())[0]
data_dir = os.path.join(root_dir, 'data', 'dataset2')

print(*os.listdir(data_dir), sep = '\n')

ImportExampleGen is used to load TFRecord files into the pipeline.

Loading NLP data as TFRecord files makes sense because text corpora can snowball to a considerable size. To ingest such datasets efficiently, it is recommended to convert them to TFRecord or Apache Parquet representations.

Image datasets should be converted from the image files into TFRecord files, but the images should not be decoded. Decoding highly compressed images only increases the amount of disk space needed to store the intermediate tf.Example records.

In [None]:
examples = external_input(data_dir)
example_gen = ImportExampleGen(input=examples)
context.run(example_gen)

In [None]:
display_executions(store, store.get_executions())

In [None]:
display_properties(store.get_artifacts())

## Configuration Options

### splitting

Later in our pipeline, we will want to evaluate our machine learning model during training and test it during the model analysis step. Therefore, it is beneficial to split the dataset into the required subsets.

In [None]:
from tfx.proto import example_gen_pb2

Configuring the output as train, eval, and test splits with a 6:2:2 ratio:

The following cell deliberately triggers an exception: the first attempt points to a directory that does not exist (data/dataset), and the retry in the except branch completes the run successfully with the correct directory (data/dataset1).

This demonstrates how the metadata store helps when a pipeline execution fails in production.

In [None]:
# The split configuration is the same for both attempts: train/eval/test = 6:2:2
output = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=6),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2),
        example_gen_pb2.SplitConfig.Split(name='test', hash_buckets=2),
    ]))

try:
    # Deliberately wrong path: ../data/dataset does not exist, so this run fails
    data_dir = os.path.join(os.pardir, 'data', 'dataset')
    examples = external_input(data_dir)
    example_gen = CsvExampleGen(input=examples, output_config=output)
    context.run(example_gen)
except Exception as err:
    print(f'First run failed as intended: {err!r}')

    # Retry with the correct directory
    data_dir = os.path.join(os.pardir, 'data', 'dataset1')
    examples = external_input(data_dir)
    example_gen = CsvExampleGen(input=examples, output_config=output)
    context.run(example_gen)

In [2]:
folder = Config.PIPELINE_FOLDER

In [3]:
%%bash -s "$folder"
tree ../$1

../temp_
├── CsvExampleGen
│   └── examples
│       ├── 1
│       │   ├── eval
│       │   │   └── data_tfrecord-00000-of-00001.gz
│       │   └── train
│       │       └── data_tfrecord-00000-of-00001.gz
│       ├── 3
│       ├── 4
│       │   ├── eval
│       │   │   └── data_tfrecord-00000-of-00001.gz
│       │   ├── test
│       │   │   └── data_tfrecord-00000-of-00001.gz
│       │   └── train
│       │       └── data_tfrecord-00000-of-00001.gz
│       └── 6
│           ├── eval
│           │   └── data_tfrecord-00000-of-00001.gz
│           └── train
│               └── data_tfrecord-00000-of-00001.gz
├── ExampleValidator
│   └── anomalies
│       └── 9
│           ├── eval
│           │   └── anomalies.pbtxt
│           └── train
│               └── anomalies.pbtxt
├── ImportExampleGen
│   └── examples
│       ├── 2
│       │   ├── eval
│       │   │   └── data_tfrecord-00000-of-00001.gz
│       │   └── train
│       │       └── data_tfrecord-00000-of-00001.gz
│       └── 5
│    

The failed run is highlighted in red. Its last_known_state is still 'Running' because the execution never completed; you can use this to identify the failed component and backtrack to find the cause of the failure.

In [None]:
def highlight(s):
    # Executions still marked 'Running' never completed, i.e. they failed
    color = 'red' if s.last_known_state == 'Running' else 'white'
    return [f'background-color: {color}'] * len(s)

execution = display_executions(store, store.get_executions())
execution.style.apply(highlight, axis = 1)

In [None]:
# Id of the execution that failed (it is still marked 'Running')
artifact_id = execution['artifact id'].loc[execution.last_known_state == 'Running'].values[0]

def highlight(s):
    # Assumes the successful retry received the next execution id
    if s['artifact id'] == artifact_id:
        color = 'lightblue'
    elif s['artifact id'] == artifact_id + 1:
        color = 'lightgreen'
    else:
        color = 'white'
    return [f'background-color: {color}'] * len(s)

# Compare the properties of the failed execution and its successful retry
execution_prop = display_properties(store.get_executions())
is_relevant = execution_prop['artifact id'].isin([artifact_id, artifact_id + 1])
execution_prop = execution_prop.loc[is_relevant].sort_values(by=['name', 'artifact id'])
execution_prop.style.apply(highlight, axis=1)

### If data is stored in spitted manner

In some situations, we have already generated the subsets of the dataset externally, and we would like to preserve these splits when we ingest the data. We can achieve this by providing an input configuration.
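Each `Input.Split` pairs a split name with a file pattern that is resolved relative to the input directory. The stdlib sketch below imitates that matching with `fnmatch.fnmatchcase` so you can see which files land in which split. It is not TFX's implementation: the file listing and the `files_per_split` helper are hypothetical, and unlike a real glob, fnmatch's `*` also matches `/`.

```python
import fnmatch


def files_per_split(paths, patterns):
    """Match every split's pattern independently against relative file paths."""
    return {
        name: sorted(p for p in paths if fnmatch.fnmatchcase(p, pattern))
        for name, pattern in patterns.items()
    }


# Hypothetical listing of ../data/dataset3, relative to the input directory
paths = [
    "train/data_tfrecord-00000-of-00001.gz",
    "eval/data_tfrecord-00000-of-00001.gz",
    "test/data_tfrecord-00000-of-00001.gz",
]
patterns = {"train": "train/*", "eval": "eval/*", "test": "test/*"}
print(files_per_split(paths, patterns))
```

Since each pattern is matched on its own, overlapping patterns would place the same file in several splits, so it is worth keeping them disjoint.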

In [None]:
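# The latest example_gen is the 6:2:2 CsvExampleGen run, so its output
# folder already contains separate train, eval and test subfolders
example_gen_prop = example_gen.outputs['examples'].get()[0]

# Copy those pre-split TFRecords into a new dataset folder to re-ingest them
shutil.copytree(example_gen_prop.uri, '../data/dataset3')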

In [None]:
from tfx.proto import example_gen_pb2

root_dir = os.path.split(os.getcwd())[0]
data_dir = os.path.join(root_dir, 'data', 'dataset3')

# Map each split name to a file pattern relative to data_dir
# (named input_config to avoid shadowing the built-in input())
input_config = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(name='train', pattern='train/*'),
    example_gen_pb2.Input.Split(name='eval', pattern='eval/*'),
    example_gen_pb2.Input.Split(name='test', pattern='test/*'),
])

# data_dir is already an absolute path
examples = external_input(data_dir)
example_gen = ImportExampleGen(input=examples, input_config=input_config)
context.run(example_gen)

In [None]:
execution_property = display_properties(store.get_executions())
execution_property.loc[execution_property['artifact id'] == max(execution_property['artifact id'])]

### Span

One of the significant use cases for machine learning pipelines is that we can update
our machine learning models when new data becomes available. For this scenario,
the ExampleGen component allows us to use spans. Think of a span as a snapshot of
data. Every hour, day, or week, a batch extract, transform, load (ETL) process could
make such a data snapshot and create a new span.
A span can replicate existing data records. In the example below, export-1 contains the data from the previous export-0 as well as newly created records, and export-2 contains all records.

We can now specify the patterns of the spans. The input configuration accepts a {SPAN} placeholder, which represents the number (0, 1, 2, ...) shown in our folder structure. With this input configuration, the ExampleGen component picks up the “latest” span. In our example, this is the data in the folder export-2.
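Resolving the “latest” span amounts to parsing the number that {SPAN} stands for and picking the largest one. The sketch below shows that idea on folder names using only the standard library; `latest_span` is a hypothetical helper, not TFX's span-resolution code, which handles more cases.

```python
import re


def latest_span(names, pattern="export-{SPAN}"):
    """Return (span, name) for the entry with the highest numeric {SPAN} value."""
    prefix, suffix = pattern.split("{SPAN}")
    regex = re.compile(re.escape(prefix) + r"(\d+)" + re.escape(suffix) + r"\Z")
    best = None
    for name in names:
        match = regex.match(name)
        if match:
            span = int(match.group(1))  # compare numerically, so 10 beats 2
            if best is None or span > best[0]:
                best = (span, name)
    if best is None:
        raise ValueError(f"no entry matches {pattern!r}")
    return best


print(latest_span(["export-0", "export-1", "export-2", "README.md"]))  # (2, 'export-2')
```

Comparing the parsed integers rather than the folder names matters once spans reach two digits: as strings, "export-10" would sort before "export-2".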

In [None]:
%%bash

mkdir -p ../data/dataset4/export-0
mkdir -p ../data/dataset4/export-1
mkdir -p ../data/dataset4/export-2

# Simulate growing snapshots: each export holds a larger prefix of the full
# file (header line included), and export-2 holds the whole dataset.
# '>' overwrites, so re-running the cell does not duplicate rows.
file_l_count=$(wc -l < ../data/dataset1/consumer_complaints_with_narrative.csv)
head -n $(( file_l_count/3 )) ../data/dataset1/consumer_complaints_with_narrative.csv > ../data/dataset4/export-0/consumer_complaints_with_narrative_$(( file_l_count/3 )).csv
head -n $(( file_l_count/2 )) ../data/dataset1/consumer_complaints_with_narrative.csv > ../data/dataset4/export-1/consumer_complaints_with_narrative_$(( file_l_count/2 )).csv
cp ../data/dataset1/consumer_complaints_with_narrative.csv ../data/dataset4/export-2/consumer_complaints_with_narrative_$file_l_count.csv

tree ../data/dataset4

In [None]:
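# Reuse root_dir instead of overwriting base_dir, which the metadata
# display helpers rely on
data_dir = os.path.join(root_dir, 'data', 'dataset4')

# {SPAN} matches the numeric suffix of the export-N folders;
# ExampleGen resolves it to the latest (highest) span
input_config = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(pattern='export-{SPAN}/*')
])
examples = external_input(data_dir)
example_gen = CsvExampleGen(input=examples, input_config=input_config)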
context.run(example_gen)

In [None]:
execution_properties = display_properties(store.get_executions())

# Keep only the input directory and the span recorded for each execution
temp_val = execution_properties.loc[execution_properties['name'].isin(['input_base', 'span'])]
temp_val = temp_val.reset_index(drop=True).sort_values(['artifact id', 'name'])

The span recorded for the current run is 2, which means the ExampleGen component automatically picked up the latest data folder (export-2) based on the pattern configured in input_config.

In [None]:
temp_val.style.highlight_max(subset = ['value'],
                       color = 'lightgreen', axis = 0)


## Add-ons

### Ingesting data from Avro or Parquet files

#### from Avro-serialized data

```
from tfx.components import FileBasedExampleGen
from tfx.components.example_gen.custom_executors import avro_executor
from tfx.utils.dsl_utils import external_input
examples = external_input(avro_dir_path)

example_gen = FileBasedExampleGen(
    input=examples,
    executor_class=avro_executor.Executor)
```

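#### from Parquet-serialized data

```
from tfx.components import FileBasedExampleGen
from tfx.components.example_gen.custom_executors import parquet_executor
from tfx.utils.dsl_utils import external_input

# parquet_dir_path: directory containing the Parquet files
examples = external_input(parquet_dir_path)
example_gen = FileBasedExampleGen(
    input=examples,
    executor_class=parquet_executor.Executor)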
```

### Ingesting data from Data Base

#### from bigquery database
```
from tfx.components import BigQueryExampleGen
query = """
SELECT * FROM `<project_id>.<dataset>.<table_name>`
"""
example_gen = BigQueryExampleGen(query=query)
```



> Note: In TFX versions greater than 0.22.0, the BigQueryExampleGen component needs to be imported from tfx.extensions.google_cloud_big_query:
>
> ```
> from tfx.extensions.google_cloud_big_query.example_gen import component as big_query_example_gen_component
> example_gen = big_query_example_gen_component.BigQueryExampleGen(query=query)
> ```

#### from presto database
```
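# PrestoExampleGen is a custom component from the TFX examples
# (custom_components/presto_example_gen); it is not part of the core
# tfx package and must be made importable separately
from proto import presto_config_pb2
from presto_component.component import PrestoExampleGen

# Presto addresses tables as catalog.schema.table (no backticks)
query = """
SELECT * FROM <catalog>.<schema>.<table_name>
"""
presto_config = presto_config_pb2.PrestoConnConfig(
    host='localhost',
    port=8080)
example_gen = PrestoExampleGen(presto_config, query=query)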
```