In [1]:
import tensorflow as tf
from tfx import v1 as tfx

# Report the library versions this notebook was executed with, one per line.
print(
    f"Tensorflow version: {tf.__version__}",
    f"tfx version: {tfx.__version__}",
    sep="\n",
)

Tensorflow version: 2.11.0
tfx version: 1.12.0


# 1. Set up pipeline

## Setting up pipeline variables

In [2]:
import os
from absl import logging

# Emit INFO-level log messages from here on.
logging.set_verbosity(logging.INFO)

# Identifier of this pipeline; every location below is namespaced by it.
PIPELINE_NAME = "penguin-simple"

# Directory passed to the pipeline as its root.
PIPELINE_ROOT = os.path.join("pipelines", PIPELINE_NAME)

# SQLite database file used for the pipeline's metadata connection.
METADATA_PATH = os.path.join("metadata", PIPELINE_NAME, "metadata.db")

# Directory passed to the pipeline as the destination for the pushed model.
SERVING_MODEL_DIR = os.path.join("serving_model", PIPELINE_NAME)

## Download example data

We will download the example dataset for use in our TFX pipeline. The dataset we are using is the Palmer Penguins dataset, which is also used in other TFX examples.

There are four numeric features in this dataset:

- culmen_length_mm
- culmen_depth_mm
- flipper_length_mm
- body_mass_g

In [3]:
# import urllib.request
# import tempfile

# DATA_ROOT = tempfile.mkdtemp(prefix="tfx-data") # Creating a temporary directory
# _data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
# _data_filepath = os.path.join(DATA_ROOT, "data.csv")
# urllib.request.urlretrieve(_data_url, _data_filepath)

In [4]:
# with open(_data_filepath) as f:
#     for _ in range(5):
#         print(f.readline())

In [5]:
import requests
from pathlib import Path

# Local path of the CSV file the dataset is saved to.
# NOTE: despite the name, this is a file path, not a directory.
DATA_ROOT = "data.csv"

# Create the file (empty) if it is missing; an existing file is left in place.
Path(DATA_ROOT).touch(exist_ok=True)

In [6]:
# Download the pre-processed Palmer Penguins CSV and save it to DATA_ROOT.
url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'

# The timeout keeps this cell from hanging forever on a stalled connection.
r = requests.get(url, allow_redirects=True, timeout=30)

# Fail loudly on a 4xx/5xx response instead of silently writing an HTTP
# error page into the CSV file that the pipeline will later ingest.
r.raise_for_status()

with open(DATA_ROOT, "wb") as f:
    f.write(r.content)

In [7]:
# Preview the first three lines of the downloaded CSV.
with open(DATA_ROOT) as csv_file:
    head_lines = [csv_file.readline() for _ in range(3)]

# Each line still carries its own trailing newline, so the rows are printed
# with a blank line after each of them.
print(*head_lines, sep="\n")

species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g

0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667

0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556



In [8]:
# File name of the Python module passed to the pipeline as its training code.
_train_module_file = "penguin_trainer.py"

## Create a pipeline

The pipeline consists of the following three components:
- **[CsvExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)**: Reads in data files and converts them to the TFX internal format for further processing
- **[Trainer](https://www.tensorflow.org/tfx/guide/trainer)**: Trains an ML model
- **[Pusher](https://www.tensorflow.org/tfx/guide/pusher)**: Copies the trained model outside of the TFX pipeline.

In [9]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
    """Create a three-component penguin pipeline: CsvExampleGen -> Trainer -> Pusher.

    Args:
      pipeline_name: Name given to the TFX pipeline.
      pipeline_root: Root location passed to the pipeline for its outputs.
      data_root: Either a directory containing the CSV data, or the path of a
        single CSV file. A single file is ingested on its own; other files in
        the same directory are ignored.
      module_file: Python module with the user-provided training code for the
        Trainer component.
      serving_model_dir: Filesystem directory the Pusher copies the trained
        model into.
      metadata_path: Path of the SQLite file used for the metadata connection.

    Returns:
      The assembled `tfx.dsl.Pipeline`.
    """
    # Brings data into the pipeline.
    # CsvExampleGen's `input_base` has to be a directory. When the caller
    # passes the path of a single CSV file, read from its parent directory but
    # restrict the split pattern to that one file name, so unrelated files
    # sitting next to it (notebooks, trainer module, ...) are not ingested.
    if os.path.isfile(data_root):
        csv_dir, csv_name = os.path.split(os.path.abspath(data_root))
        example_gen = tfx.components.CsvExampleGen(
            input_base=csv_dir,
            input_config=tfx.proto.Input(splits=[
                tfx.proto.Input.Split(name="single_split", pattern=csv_name),
            ]),
        )
    else:
        example_gen = tfx.components.CsvExampleGen(input_base=data_root)

    # Uses the user-provided Python module to train a model.
    trainer = tfx.components.Trainer(
        module_file=module_file,
        examples=example_gen.outputs["examples"],
        train_args=tfx.proto.TrainArgs(num_steps=100),
        eval_args=tfx.proto.EvalArgs(num_steps=5),
    )

    # Pushes the trained model to a filesystem destination.
    pusher = tfx.components.Pusher(
        model=trainer.outputs["model"],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=serving_model_dir)),
    )

    # Include all three components in one pipeline.
    components = [
        example_gen,
        trainer,
        pusher,
    ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(metadata_path),
        components=components,
    )
    

# 2. Run the pipeline

TFX supports multiple orchestrators to run pipelines.
In this tutorial we will use `LocalDagRunner`, which is included in the TFX
Python package and runs pipelines in a local environment.
We often call TFX pipelines "DAGs", which stands for directed acyclic graphs.

See
[TFX on Cloud AI Platform Pipelines](https://www.tensorflow.org/tfx/tutorials/tfx/cloud-ai-platform-pipelines)
or
[TFX Airflow Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/airflow_workshop)
to learn more about other orchestration systems.

In [None]:
# Caution (author's note): running this cell has a path issue on Windows;
# TFX does not have path support on that OS.

# Assemble the pipeline from the notebook-level settings ...
penguin_pipeline = _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    data_root=DATA_ROOT,
    module_file=_train_module_file,
    serving_model_dir=SERVING_MODEL_DIR,
    metadata_path=METADATA_PATH,
)

# ... and execute it with the local orchestrator.
tfx.orchestration.LocalDagRunner().run(penguin_pipeline)