In [9]:
import os
import sys
from typing import Text

from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

PIPELINE_NAME = "email-classifier-pipeline"

# pipeline inputs
DATA_ROOT = "data"
TRANSFORM_MODULE_FILE = "modules/transform.py"
TRAINER_MODULE_FILE = "modules/trainer.py"
# requirement_file = os.path.join(root, "requirements.txt")

# pipeline outputs
OUTPUT_BASE = "output"
serving_model_dir = os.path.join(OUTPUT_BASE, 'serving_model')
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")

In [10]:
def init_local_pipeline(
    components, pipeline_root: Text
) -> pipeline.Pipeline:

    logging.info(f"Pipeline root set to: {pipeline_root}")
    beam_args = [
        "--direct_running_mode=multi_processing", # Add a space here
        "----direct_num_workers=0"
    ]

    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        beam_pipeline_args=beam_args
    )

In [11]:
from modules.components import init_components

components = init_components(
    DATA_ROOT,
    training_module=TRAINER_MODULE_FILE,
    transform_module=TRANSFORM_MODULE_FILE,
    training_steps=5000,
    eval_steps=1000,
    serving_model_dir=serving_model_dir,
)

pipeline = init_local_pipeline(components, pipeline_root)
BeamDagRunner().run(pipeline=pipeline)

Instructions for updating:
Use `tf.data.Dataset.map(tf.io.parse_example(...))` instead.


Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 email_xf (InputLayer)       [(None, 1)]               0         
                                                                 
 tf.reshape (TFOpLambda)     (None,)                   0         
                                                                 
 text_vectorization (TextVe  (None, 100)               0         
 ctorization)                                                    
                                                                 
 embedding (Embedding)       (None, 100, 16)           160000    
                                                                 
 global_average_pooling1d (  (None, 16)                0         
 GlobalAveragePooling1D)                                         
                                                                 
 dense (Dense)               (None, 64)                1088  




Epoch 1: val_binary_accuracy improved from -inf to 0.52381, saving model to output/email-classifier-pipeline/Trainer/model/8/Format-Serving


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
