
This notebook contains an examples on how to author and run Python function
components within the TFX InteractiveContext and in a locally-orchestrated TFX
pipeline.

For more context and information, see the [Custom Python function components](https://www.tensorflow.org/tfx/guide/custom_function_component)
page on the TFX documentation site.

In [None]:
from __future__ import absolute_import
from BashColors import C
import json, os, sys
from os.path import *
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '5'

from tfx import v1 as tfx
print(f'tfx version: {tfx.__version__}')

## Custom Python function components

In this section, we will create components from Python functions. We will notbe
doing any real ML problem — these simple functions are just used to illustrate
the Python function component development process.

See [Python function based component
guide](https://www.tensorflow.org/tfx/guide/custom_function_component)
for more documentation.

In [None]:
%%writefile my_generator.py
# Create Python custom components
# Write a function that generate some dummy data. This is written to its own Python module file.
import os
import tensorflow as tf  # Used for writing files.
from tfx import v1 as tfx
# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset

@tfx.dsl.components.component
def MyGenerator(data: tfx.dsl.components.OutputArtifact[Dataset]):
    """Create a file with dummy data in the output artifact."""
    with tf.io.gfile.GFile(os.path.join(data.uri,
                                        'data_file.txt'), 'w') as f:
        f.write('Dummy data')
        
    # Set metadata and ensure that it gets passed to downstream components.
    data.set_string_custom_property('my_custom_field', 'my_custom_value')

In [None]:
%%writefile my_consumer.py
# write a second component that uses the dummy data produced.
# Calculate hash of the data and return it.
import hashlib
import os
import tensorflow as tf
from tfx import v1 as tfx
# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import String

@tfx.dsl.components.component
def MyConsumer(data: tfx.dsl.components.InputArtifact[Dataset],
               hash: tfx.dsl.components.OutputArtifact[String],
               algorithm: tfx.dsl.components.Parameter[str] = 'sha256'):
    """Reads the contents of data and calculate."""
    with tf.io.gfile.GFile(os.path.join(data.uri,
                                        'data_file.txt'), 'r') as f:
        contents = f.read()
    h = hashlib.new(algorithm)
    h.update(tf.compat.as_bytes(contents))
    hash.value = h.hexdigest()

    # Read a custom property from the input artifact and set to the output.
    custom_value = data.get_string_custom_property('my_custom_field')
    hash.set_string_custom_property('input_custom_field', custom_value)

### Run in-notebook with the InteractiveContext
Now, we will demonstrate usage of our new components in the TFX
InteractiveContext.

For more information on what you can do with the TFX notebook
InteractiveContext, see the in-notebook [TFX Keras Component Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/components_keras).

In [None]:
from my_generator import MyGenerator
from my_consumer import MyConsumer

In [None]:
# Construct the InteractiveContext
# Create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()

#### Run your component interactively with `context.run()`
Next, we run our components interactively within the notebook with
`context.run()`. Our consumer component uses the outputs of the generator
component.

In [None]:
generator = MyGenerator()
context.run(generator)

In [None]:
consumer = MyConsumer(
    data=generator.outputs['data'],
    algorithm='md5')
context.run(consumer)

After execution, we can inspect the contents of the "hash" output artifact of
the consumer component on disk.

In [None]:
!tail -v {consumer.outputs['hash'].get()[0].uri}

That's it, and you've now written and executed your own custom components!

### Write a pipeline definition

Next, we will author a pipeline using these same components. While using the
`InteractiveContext` within a notebook works well for experimentation, defining
a pipeline lets you deploy your pipeline on local or remote runners for
production usage.

Here, we will demonstrate usage of the LocalDagRunner running locally on your
machine. For production execution, the Airflow or Kubeflow runners may
be more suitable.

#### Construct a pipeline

In [None]:
import os
import tempfile
from tfx import v1 as tfx

# Select a persistent TFX root directory to store your output artifacts.
# For demonstration purposes only, we use a temporary directory.
PIPELINE_ROOT = tempfile.mkdtemp()
print(f'PIPELINE_ROOT: {C.BIBlue}{PIPELINE_ROOT}{C.ColorOff}')
# Select a pipeline name so that multiple runs of the same logical pipeline
# can be grouped.
PIPELINE_NAME = "function-based-pipeline"
print(f'PIPELINE_NAME: {C.BIBlue}{PIPELINE_NAME}{C.ColorOff}')
# We use a ML Metadata configuration that uses a local SQLite database in
# the pipeline root directory. Other backends for ML Metadata are available
# for production usage.
METADATA_CONNECTION_CONFIG = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    os.path.join(PIPELINE_ROOT, 'metadata.sqlite'))
print(f'METADATA_CONNECTION_CONFIG: {C.BIBlue}{METADATA_CONNECTION_CONFIG}{C.ColorOff}')

def function_based_pipeline():
  # Here, we construct our generator and consumer components in the same way.
  generator = MyGenerator()
  consumer = MyConsumer(
      data=generator.outputs['data'],
      algorithm='md5')

  return tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      components=[generator, consumer],
      metadata_connection_config=METADATA_CONNECTION_CONFIG)

my_pipeline = function_based_pipeline()

In [None]:
# Run pipeline with the `LocalDagRunner`
tfx.orchestration.LocalDagRunner().run(my_pipeline)

In [None]:
# Inspect the output artifacts generated by this pipeline execution.
!find {PIPELINE_ROOT}

You have now written your own custom components and orchestrated their
execution on the LocalDagRunner! For next steps, check out additional tutorials
and guides on the [TFX website](https://www.tensorflow.org/tfx).