## Local runner pipeline diamonds model notebook

The goal of this notebook is to develop the code for running the pipeline locally.

The structure:

- `diamonds` directory: holds all the code, including the notebook code
- `pipeline` directory: holds the Python code for running the pipeline
- `model` directory: holds the code for running the pipeline components
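
The layout above can be sketched as follows. This is a minimal illustration, assuming a temporary directory stands in for the `diamonds` directory; the helper name `make_layout` is hypothetical and not part of the notebook.

```python
import os
import tempfile


def make_layout(root):
    """Create the model/ and pipeline/ packages under root (hypothetical helper)."""
    created = []
    for package in ("model", "pipeline"):
        package_dir = os.path.join(root, package)
        os.makedirs(package_dir, exist_ok=True)
        # An __init__.py file makes the directory importable as a Python package.
        init_file = os.path.join(package_dir, "__init__.py")
        with open(init_file, "a"):
            pass
        created.append(package_dir)
    return created


# A temporary directory stands in for the diamonds directory (assumption).
with tempfile.TemporaryDirectory() as diamonds_dir:
    layout = [os.path.relpath(path, diamonds_dir) for path in make_layout(diamonds_dir)]
    print(layout)
```

The cells below do the same thing with `os.makedirs` and `%%writefile`, directly in the notebook's working directory.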

In [None]:
!pip install --upgrade tfx

In [2]:
import os

In [3]:
data = 'gs://diamonds_data'

## note: this only checks that the path string is non-empty, not that the bucket exists
if data:
    print("it exists")

it exists


In [4]:
ls -l ~/model

total 4
drwxr-xr-x 4 jupyter jupyter 4096 Feb 25 00:54 stored_model/


In [5]:
##create a directory for all the pipeline code

##pipeline_home_directory = '~/diamonds'

##os.makedirs(pipeline_home_directory, exist_ok=True)

components_directory = 'model'
os.makedirs(components_directory, exist_ok=True)

pipeline_directory = 'pipeline'
os.makedirs(pipeline_directory, exist_ok=True)

data_directory = 'gs://diamonds_data/'

if not data_directory:
    ##os.makedirs does not expand '~', so expand it explicitly
    data_directory = os.path.expanduser(os.path.join('~', 'data'))
    os.makedirs(data_directory, exist_ok=True)

## write all the pipeline components code to python modules

In [6]:
%%writefile model/__init__.py

##module init

Overwriting model/__init__.py


In [7]:

%%writefile model/features.py

NUMERICAL_FEATURES = ['carat','depth','table','x','y','z']

CATEGORICAL_FEATURES = ['clarity','color','cut']

LABEL_KEY = 'price'


def transformed_name(key):
  return key + '_xf'

Overwriting model/features.py
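
As a quick illustration of how these constants are used downstream, the sketch below repeats the definitions from `model/features.py` and derives the transformed feature names. The variable names `model_inputs` and `label_name` are introduced here only for the example.

```python
# Definitions repeated from model/features.py so the example is self-contained.
NUMERICAL_FEATURES = ['carat', 'depth', 'table', 'x', 'y', 'z']
CATEGORICAL_FEATURES = ['clarity', 'color', 'cut']
LABEL_KEY = 'price'


def transformed_name(key):
    return key + '_xf'


# Names of the model inputs after preprocessing (illustrative variable names).
model_inputs = [transformed_name(key)
                for key in NUMERICAL_FEATURES + CATEGORICAL_FEATURES]
label_name = transformed_name(LABEL_KEY)

print(model_inputs)
print(label_name)
```

The `_xf` suffix keeps transformed features distinguishable from the raw columns of the same name.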


In [8]:
%%writefile model/preprocessing.py
## We preprocess the data with the TensorFlow Transform library.

## This requires a preprocessing module (a Python function that preprocesses the data).

## The Transform component looks for a function named preprocessing_fn in this module.

## Steps to define the preprocessing function:

## 1. define the lists of columns based on the data they contain (numerical, categorical, etc.)

## 2. define preprocessing_fn

## This preprocessing function is based on the TFX Transform module.

import tensorflow as tf
import tensorflow_transform as tft

import importlib

from model import features
importlib.reload(features)

##import pipeline.features as features


##create a list of feature names here

def transformed_name(key):
  return key + '_xf'

def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.
  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.
  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.
  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)


def preprocessing_fn(inputs):

  ##create outputs out of inputs 
  outputs = {}

  ##preprocess numerical 
  for feature in features.NUMERICAL_FEATURES:
    ##we have to use dense tensors in our keras layers 
    outputs[transformed_name(feature)] = tft.scale_to_z_score(_fill_in_missing(inputs[feature]))

  for feature in features.CATEGORICAL_FEATURES:
    outputs[transformed_name(feature)] = tft.compute_and_apply_vocabulary(
        x = _fill_in_missing(inputs[feature]),
        num_oov_buckets=1,
        vocab_filename=feature
    )  

  ##the label is handled once, outside the loops: only missing values are filled in
  feature = features.LABEL_KEY
  outputs[transformed_name(feature)] = _fill_in_missing(inputs[feature])

  return outputs

Overwriting model/preprocessing.py
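
To make the two transformations in `preprocessing_fn` concrete, here is a pure-Python sketch of what z-score scaling and a vocabulary lookup with one out-of-vocabulary bucket compute. It is not how TensorFlow Transform implements them: the helper names are hypothetical, the use of the population standard deviation is an assumption, and so is the vocabulary ordering (descending frequency, ties broken alphabetically).

```python
import statistics
from collections import Counter


def z_score(values):
    """Scale values to mean 0 and standard deviation 1 (population std assumed)."""
    mean = statistics.mean(values)
    std = statistics.pstdev(values)
    return [(value - mean) / std for value in values]


def build_vocabulary(values):
    """Map each distinct string to an integer id (ordering is an assumption)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda token: (-counts[token], token))
    return {token: index for index, token in enumerate(ordered)}


def lookup(vocab, token):
    """With a single OOV bucket, every unseen token maps to len(vocab)."""
    return vocab.get(token, len(vocab))


scaled = z_score([0.5, 1.0, 1.5])
cut_vocab = build_vocabulary(['Ideal', 'Premium', 'Ideal', 'Good'])

print(scaled)
print(cut_vocab, lookup(cut_vocab, 'Fair'))
```

In the real pipeline the mean, variance, and vocabulary are computed over the whole training split during the Transform analysis phase and then applied identically at training and serving time.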


In [9]:
%%writefile model/model.py

##trainer and tuner module

import functools
import absl
import os
from typing import List, Text


import kerastuner
import tensorflow as tf
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

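##TrainerFnArgs is a deprecated alias of FnArgs; import FnArgs under the old name used below
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx.components.trainer.fn_args_utils import FnArgs as TrainerFnArgs
from tfx.components.tuner.component import TunerFnResult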
from tfx_bsl.tfxio import dataset_options

import importlib

##import pipeline.features as features
from model import features
importlib.reload(features)

EPOCHS = 10
TRAIN_BATCH_SIZE = 32
EVAL_BATCH_SIZE = 32


def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(filenames, compression_type='GZIP')


def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example and applies TFT."""

  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(features.LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    transformed_features = model.tft_layer(parsed_features)

    return model(transformed_features)

  return serve_tf_examples_fn


def _input_fn(file_pattern: List[Text],
              data_accessor: DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, label) tuples, where features is a
      dictionary of Tensors and label is a Tensor holding the transformed price.
  """
  dataset = data_accessor.tf_dataset_factory(
      file_pattern,
      dataset_options.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=features.transformed_name(features.LABEL_KEY)),
      tf_transform_output.transformed_metadata.schema)
    
  return dataset




def _get_hyperparameters() -> kerastuner.HyperParameters:
  """Returns hyperparameters for building Keras model."""
  hp = kerastuner.HyperParameters()
  # Defines search space.
  hp.Choice('learning_rate', [1e-1,1e-2, 1e-3], default=1e-3)
  hp.Int('n_layers', 1, 2, default=1)
  with hp.conditional_scope('n_layers', 1):
        hp.Int('n_units_1', min_value=8, max_value=128, step=8, default=8)
  with hp.conditional_scope('n_layers', 2):
        hp.Int('n_units_1', min_value=8, max_value=128, step=8, default=8)
        hp.Int('n_units_2', min_value=8, max_value=128, step=8, default=8)        

  return hp


def _build_keras_model(hparams: kerastuner.HyperParameters, 
  tf_transform_output: tft.TFTransformOutput) -> tf.keras.Model:
  """Creates a Keras DNN regression model.
  Args:
    hparams: Holds HyperParameters for tuning.
    tf_transform_output: A TFTransformOutput.
  Returns:
    A keras Model.
  """
  """
  we are going to skip building feature columns for now
  deep_columns = [
      tf.feature_column.numeric_column(
          key=features.transformed_name(key), 
          shape=())
      for key in features.NUMERIC_FEATURE_KEYS
  ]
  """

  ##create one dense input layer per transformed feature
  input_layers = [
      tf.keras.layers.Input(name=features.transformed_name(column), shape=(1,))
      for column in features.NUMERICAL_FEATURES + features.CATEGORICAL_FEATURES
  ]
  """
  also skipping these feature columns
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(
          key=features.transformed_name(key), 
          num_buckets=tf_transform_output.num_buckets_for_transformed_feature(features.transformed_name(key)), 
          default_value=0)
      for key in features.CATEGORICAL_FEATURE_KEYS
  ]

  wide_columns = [
      tf.feature_column.indicator_column(categorical_column)
      for categorical_column in categorical_columns
  ]
    
  input_layers.update({
      column.categorical_column.key: tf.keras.layers.Input(name=column.categorical_column.key, shape=(), dtype=tf.int32)
      for column in wide_columns
  })
  """

  ##  deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)

  """

  for n in range(int(hparams.get('n_layers'))):
    deep = tf.keras.layers.Dense(units=hparams.get('n_units_' + str(n + 1)))(deep)

  wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers)

  output = tf.keras.layers.Dense(features.NUM_CLASSES, activation='softmax')(
               tf.keras.layers.concatenate([deep, wide]))

  """
  concat_inputs = tf.keras.layers.concatenate(input_layers)

  ##chain the hidden layers: each Dense layer consumes the output of the previous one
  x = concat_inputs
  for n in range(int(hparams.get('n_layers'))):
    x = tf.keras.layers.Dense(units=hparams.get('n_units_' + str(n + 1)), activation='relu')(x)

  output = tf.keras.layers.Dense(1)(x)

  model = tf.keras.Model(input_layers, output)

  ##since this is a regression model, we use mean squared error as the loss
  ##and root mean squared error as the metric
  model.compile(
      ##commented out stuff is from a template for classification
      ##loss='sparse_categorical_crossentropy',
      loss='mean_squared_error',
      optimizer=tf.keras.optimizers.Adam(learning_rate=hparams.get('learning_rate')),
      ##metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
      metrics=[tf.keras.metrics.RootMeanSquaredError()])
  model.summary(print_fn=absl.logging.info)

  return model    


# TFX Tuner will call this function.
def tuner_fn(fn_args: TrainerFnArgs) -> TunerFnResult:
  """Build the tuner using the KerasTuner API.
  Args:
    fn_args: Holds args as name/value pairs.
      - working_dir: working dir for tuning.
      - train_files: List of file paths containing training tf.Example data.
      - eval_files: List of file paths containing eval tf.Example data.
      - train_steps: number of train steps.
      - eval_steps: number of eval steps.
      - schema_path: optional schema of the input data.
      - transform_graph_path: optional transform graph produced by TFT.
  Returns:
    A namedtuple contains the following:
      - tuner: A BaseTuner that will be used for tuning.
      - fit_kwargs: Args to pass to tuner's run_trial function for fitting the
                    model , e.g., the training and validation dataset. Required
                    args depend on the above tuner's implementation.
  """
  transform_graph = tft.TFTransformOutput(fn_args.transform_graph_path)
  
  # Construct a build_keras_model_fn that just takes hyperparams from get_hyperparameters as input.
  build_keras_model_fn = functools.partial(
      _build_keras_model, tf_transform_output=transform_graph)  

  # BayesianOptimization is a subclass of kerastuner.Tuner which inherits from BaseTuner.   
  tuner = kerastuner.BayesianOptimization(
      build_keras_model_fn,
      max_trials=20,
      overwrite=True,
      hyperparameters=_get_hyperparameters(),
      # New entries allowed for n_units hyperparameter construction conditional on n_layers selected.
#       allow_new_entries=True,
#       tune_new_entries=True,
      ##objective=kerastuner.Objective('val_sparse_categorical_accuracy', 'max'),
      objective=kerastuner.Objective('val_loss', 'min'),
      distribution_strategy=tf.distribute.MirroredStrategy(),
      directory=fn_args.working_dir,
      project_name='diamond_price_tuning')
  
  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      transform_graph,
      batch_size=TRAIN_BATCH_SIZE)

  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      transform_graph,
      batch_size=EVAL_BATCH_SIZE)

  return TunerFnResult(
      tuner=tuner,
      fit_kwargs={
          'epochs':EPOCHS,
          'x': train_dataset,
          'validation_data': eval_dataset,
          'steps_per_epoch': fn_args.train_steps,
          'validation_steps': fn_args.eval_steps,
          'callbacks':[tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)]
      })


# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.
  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(
      fn_args.train_files, 
      fn_args.data_accessor, 
      tf_transform_output, 
      TRAIN_BATCH_SIZE)

  eval_dataset = _input_fn(
      fn_args.eval_files, 
      fn_args.data_accessor,
      tf_transform_output, 
      EVAL_BATCH_SIZE)

  if fn_args.hyperparameters:
    hparams = kerastuner.HyperParameters.from_config(fn_args.hyperparameters)
  else:
    # This branch covers the case where the hyperparameters are already decided
    # and Tuner is removed from the pipeline. You can also inline the
    # hyperparameters directly in _build_keras_model.
    hparams = _get_hyperparameters()
  absl.logging.info('HyperParameters for training: %s' % hparams.get_config())
  
  # Distribute training over multiple replicas on the same machine.
  mirrored_strategy = tf.distribute.MirroredStrategy()
  with mirrored_strategy.scope():
      model = _build_keras_model(
            hparams=hparams,
            tf_transform_output=tf_transform_output)

  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=fn_args.model_run_dir, update_freq='batch')
  
  earlystopping_callback = tf.keras.callbacks.EarlyStopping(
      monitor='val_loss', patience=3, mode='min'
  )

  model.fit(
      train_dataset,
      epochs=EPOCHS,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback,earlystopping_callback])
    
  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
    

Overwriting model/model.py
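
Both `tuner_fn` and `run_fn` use `EarlyStopping(monitor='val_loss', patience=3)`. The sketch below is a simplified pure-Python model of that stopping rule, written only to show what `patience` means. It ignores other callback options such as `min_delta`, and the function name `early_stopping_epoch` is hypothetical.

```python
def early_stopping_epoch(val_losses, patience=3):
    """Return the 0-based epoch at which training would stop, or None.

    Training stops once the validation loss has failed to improve on the
    best value seen so far for `patience` consecutive epochs.
    """
    best = float('inf')
    wait = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best:
            # Improvement: remember the new best and reset the counter.
            best = loss
            wait = 0
        else:
            wait += 1
            if wait >= patience:
                return epoch
    return None


# Made-up validation losses, for illustration only.
history = [5.0, 4.0, 3.0, 3.5, 3.2, 3.1, 2.0]
stop_epoch = early_stopping_epoch(history)
print(stop_epoch)
```

In this made-up history the loss would have improved again in the last epoch, but training stops before reaching it. That trade-off is worth keeping in mind when choosing `patience` together with `EPOCHS = 10`.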


In [10]:
%%writefile pipeline/__init__.py

##module init

Overwriting pipeline/__init__.py


In [11]:
%%writefile pipeline/configs.py

import os  # pylint: disable=unused-import

# TODO(b/149347293): Move more TFX CLI flags into python configuration.

# Pipeline name will be used to identify this pipeline.
PIPELINE_NAME = 'diamonds-pipeline'

# GCP related configs.

# Following code will retrieve your GCP project. You can choose which project
# to use by setting GOOGLE_CLOUD_PROJECT environment variable.

"""
try:
  import google.auth  # pylint: disable=g-import-not-at-top  # pytype: disable=import-error
  try:
    _, GOOGLE_CLOUD_PROJECT = google.auth.default()
  except google.auth.exceptions.DefaultCredentialsError:
    GOOGLE_CLOUD_PROJECT = ''
except ImportError:
  GOOGLE_CLOUD_PROJECT = ''
"""
# Specify your GCS bucket name here. You have to use GCS to store output files
# when running a pipeline with Kubeflow Pipeline on GCP or when running a job
# using Dataflow. Default is '<gcp_project_name>-kubeflowpipelines-default'.
# This bucket is created automatically when you deploy KFP from marketplace.
#GCS_BUCKET_NAME = GOOGLE_CLOUD_PROJECT + '-kubeflowpipelines-default'

# The following image will be used to run the pipeline components if Kubeflow
# Pipelines is used.
# This image will be automatically built by CLI if we use --build-image flag.
#PIPELINE_IMAGE = f'gcr.io/{GOOGLE_CLOUD_PROJECT}/{PIPELINE_NAME}'

PREPROCESSING_FN = 'model.preprocessing.preprocessing_fn'
RUN_FN = 'model.model.run_fn'
TUNER_FN = 'model.model.tuner_fn'

TRAIN_NUM_STEPS = 5000
EVAL_NUM_STEPS = 1000

# Change this value according to your use cases.
EVAL_ACCURACY_THRESHOLD = 0.6

# Google Cloud BigQuery related configs.
# Use following configs to use BigQueryExampleGen as a data source.
#
# Beam args to use BigQueryExampleGen with Beam DirectRunner.
#
# BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
#    '--project=' + GOOGLE_CLOUD_PROJECT,
#    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
#    ]

# The query that extracts the examples from BigQuery.
#
# BIG_QUERY_QUERY = """
#         SELECT ...
#         FROM
#         WHERE
# """


Overwriting pipeline/configs.py
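
`PREPROCESSING_FN`, `RUN_FN`, and `TUNER_FN` are dotted paths of the form `package.module.function`. The sketch below shows one way such a path can be resolved to a callable with `importlib`. It is an illustration of the idea, not TFX's own resolution code, and the helper name `resolve_dotted_path` is hypothetical. Standard-library functions stand in for the `model` package so the example runs anywhere.

```python
import importlib


def resolve_dotted_path(path):
    """Split 'package.module.attr' into module and attribute, then import it."""
    module_name, _, attr_name = path.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# os.path.join stands in for model.preprocessing.preprocessing_fn (assumption).
join = resolve_dotted_path('os.path.join')
print(join('model', 'preprocessing.py'))
```

For this to work with `model.preprocessing.preprocessing_fn`, the `model` directory must be importable from the working directory, which is why `model/__init__.py` is written above.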


In [42]:
%%writefile pipeline/pipeline.py

from typing import List, Optional

import tensorflow_model_analysis as tfma
import tfx
from model import features

from ml_metadata.proto import metadata_store_pb2

##local variables for modules

def create_pipeline(
    pipeline_name: str,
    pipeline_root: str,
    data_path: str,
    preprocessing_fn: str,
    run_fn: str,
    tuner_fn: str,
    enable_tuning: bool,
    train_args: tfx.v1.proto.TrainArgs,
    eval_args: tfx.v1.proto.EvalArgs,
    eval_accuracy_threshold: float,
    serving_model_dir: str,
    schema_path: Optional[str] = None,
    metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
    beam_pipeline_args: Optional[List[str]] = None,
) -> tfx.v1.dsl.Pipeline:


    components = []

  ##ExampleGen 
    output_config = tfx.proto.example_gen_pb2.Output(
    split_config = tfx.proto.example_gen_pb2.SplitConfig(splits=[
          tfx.proto.example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=4),
          tfx.proto.example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)

    ])
    )

    example_gen = tfx.components.CsvExampleGen(
    input_base = data_path,
    output_config = output_config
    )

  ##adding ExampleGen component 
    components.append(example_gen)

  ## StatisticsGen
    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs['examples']
    )

    components.append(statistics_gen)

  ##SchemaGen

    if schema_path is None:
        schema_gen = tfx.components.SchemaGen(statistics=statistics_gen.outputs['statistics'],
                       infer_feature_shape=False)
    
    else:
        ##SchemaGen does not take a schema file; ImportSchemaGen loads a predefined schema
        schema_gen = tfx.components.ImportSchemaGen(schema_file=schema_path)

    components.append(schema_gen)


    ##Example validator

  
    ##Transform
    transform = tfx.components.Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        ##module_file='pipeline/preprocessing.py'
        preprocessing_fn=preprocessing_fn
    )

    components.append(transform)
    
    if enable_tuning:
        tuner = tfx.components.Tuner(
            ##module_file=TRAINER_MODULE,
            tuner_fn=tuner_fn,
            examples=transform.outputs['transformed_examples'],
            transform_graph=transform.outputs['transform_graph'],
            train_args=train_args,
            eval_args=eval_args
        )
        components.append(tuner)
        
    ##if tuning is not enabled, we assume the best hyperparameters were already found,
    ##so import them from a previous run
    
    ##TODO - figure out if there is a way to import the best hyperparameters from the metadata store
    if not enable_tuning:
        hparams_importer = tfx.v1.dsl.Importer(
            source_uri = './tfx_pipeline_output/diamonds-pipeline/Tuner/best_hyperparameters/6',
            artifact_type = tfx.v1.types.standard_artifacts.HyperParameters).with_id('import_hparams')
        components.append(hparams_importer)

    ##Trainer
    trainer = tfx.components.Trainer(
        ##module_file=TRAINER_MODULE,
        run_fn=run_fn,
        examples=transform.outputs['transformed_examples'],
        ##transformed_examples=transform.outputs['transformed_examples'],
        schema=schema_gen.outputs['schema'],
        transform_graph=transform.outputs['transform_graph'],
        hyperparameters=(tuner.outputs['best_hyperparameters'] if enable_tuning else hparams_importer.outputs['result']),  
        ##train_args=trainer_pb2.TrainArgs(num_steps=3000),
        ##eval_args=trainer_pb2.EvalArgs(num_steps=1000)
        train_args=train_args,
        eval_args=eval_args
    )

    components.append(trainer)

    ##add more components as needed
  
    ##Evaluator
    ## evaluates the new model
    ##validates it against a base model if it is good enough to be pushed
 
    metrics_specs= tfma.MetricsSpec(
                metrics=[
                    tfma.MetricConfig(class_name='ExampleCount'),
                    tfma.MetricConfig(class_name='MeanSquaredError',
                                     threshold=tfma.MetricThreshold(
                                         change_threshold=tfma.GenericChangeThreshold(
                                         absolute={'value': -1e-10}, direction=tfma.MetricDirection.LOWER_IS_BETTER)))
                ],
    ##you can add threshold map for metrics used in the model training
        thresholds = {
            'root_mean_squared_error': tfma.MetricThreshold(
                value_threshold=tfma.GenericValueThreshold(),
                    ##you dont have to set an upper bound - will default to infinity
                    ##upper_bound={'value': 2000}),
                change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.LOWER_IS_BETTER,
                        absolute={'value': 1e-10}
                )
            )
        }

    )


    eval_config = tfma.EvalConfig(
        model_specs=[
        # This assumes a serving model with signature 'serving_default'. If
        # using estimator based EvalSavedModel, add signature_name: 'eval' and
        # remove the label_key.
            tfma.ModelSpec(
                signature_name='serving_default',
                label_key='price'
            ##preprocessing_function_names=['transform_features'],
                )
        ],
        metrics_specs=[metrics_specs],
        slicing_specs=[
            ##this is for the whole dataset, no slices
            tfma.SlicingSpec(),
            ##also specifying slices for some features 
            tfma.SlicingSpec(feature_keys=['cut']),
            tfma.SlicingSpec(feature_keys=['color']),
        ]
    )
    
    ##resolve the previous blessed model
    model_resolver=tfx.v1.dsl.Resolver(
        strategy_class=tfx.v1.dsl.experimental.LatestBlessedModelStrategy,
        model=tfx.v1.dsl.Channel(type=tfx.types.standard_artifacts.Model),
        model_blessing=tfx.v1.dsl.Channel(type=tfx.types.standard_artifacts.ModelBlessing)).with_id('latest_blessed_model_resolver')

    components.append(model_resolver)

    model_analyzer = tfx.components.Evaluator(
        examples = example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        eval_config=eval_config
    )
    
    components.append(model_analyzer)
    
    ##Pusher
    pusher = tfx.components.Pusher(
        model=trainer.outputs['model'],
        model_blessing=model_analyzer.outputs['blessing'],
        push_destination=tfx.v1.proto.PushDestination(
            filesystem=tfx.v1.proto.PushDestination.Filesystem(
                base_directory=serving_model_dir)
        )
    )
    
    components.append(pusher)

    return tfx.v1.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      metadata_connection_config=metadata_connection_config,
      beam_pipeline_args=beam_pipeline_args,
    )





Overwriting pipeline/pipeline.py
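
The `SplitConfig` above assigns 4 hash buckets to `train` and 1 to `eval`, which gives roughly an 80/20 split. The sketch below illustrates the idea in pure Python. The hash function (SHA-256), the record strings, and the helper name `assign_split` are assumptions made for the example; the hashing used inside ExampleGen may differ.

```python
import hashlib


def assign_split(record, buckets=(('train', 4), ('eval', 1))):
    """Assign a record to a split in proportion to its number of hash buckets."""
    total = sum(size for _, size in buckets)
    # SHA-256 is used only to get a stable hash; this is an assumption.
    digest = hashlib.sha256(record.encode('utf-8')).hexdigest()
    bucket = int(digest, 16) % total
    upper = 0
    for name, size in buckets:
        upper += size
        if bucket < upper:
            return name
    raise AssertionError('unreachable: bucket is always below total')


# Made-up record keys standing in for CSV rows.
rows = ['row-%d' % i for i in range(10000)]
counts = {'train': 0, 'eval': 0}
for row in rows:
    counts[assign_split(row)] += 1

train_fraction = counts['train'] / len(rows)
print(counts, train_fraction)
```

Because the assignment depends only on the record's hash, the same record lands in the same split on every run.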


In [46]:
%%writefile local_runner.py

"""Define LocalDagRunner to run the pipeline locally."""

import os
from absl import logging

from tfx import v1 as tfx
from pipeline import configs
from pipeline import pipeline

OUTPUT_DIR='.'

PIPELINE_BUCKET = 'gs://diamonds_pipeline'

PIPELINE_ROOT = os.path.join(OUTPUT_DIR, 'tfx_pipeline_output',
                             configs.PIPELINE_NAME)
METADATA_PATH = os.path.join(OUTPUT_DIR, 'tfx_metadata', configs.PIPELINE_NAME,
                             'metadata.db')
SERVING_MODEL_DIR = os.path.join(PIPELINE_BUCKET, 'serving_model')


DATA_PATH = 'gs://diamonds_data'

if not DATA_PATH:
    DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')

def run():
  """Define a pipeline."""

  ##debug code
  print("serving model dir: ", SERVING_MODEL_DIR)
  print("metadata path: ", METADATA_PATH)

  tfx.orchestration.LocalDagRunner().run(
      pipeline.create_pipeline(
          pipeline_name=configs.PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          data_path=DATA_PATH,
          # NOTE: Use `query` instead of `data_path` to use BigQueryExampleGen.
          # query=configs.BIG_QUERY_QUERY,
          # NOTE: Set the path of the customized schema if any.
          # schema_path=generated_schema_path,
          preprocessing_fn=configs.PREPROCESSING_FN,
          run_fn=configs.RUN_FN,
          tuner_fn=configs.TUNER_FN,
          enable_tuning=False,
          train_args=tfx.proto.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS),
          eval_args=tfx.proto.EvalArgs(num_steps=configs.EVAL_NUM_STEPS),
          eval_accuracy_threshold=configs.EVAL_ACCURACY_THRESHOLD,
          serving_model_dir=SERVING_MODEL_DIR,
          # NOTE: Provide GCP configs to use BigQuery with Beam DirectRunner.
          # beam_pipeline_args=configs.
          # BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH))
      )
  
if __name__ == '__main__':
  logging.set_verbosity(logging.INFO)
  run()

Overwriting local_runner.py
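
The sketch below reproduces the path construction from `local_runner.py` so the resulting locations are easy to see. `posixpath` is used instead of `os.path` so the result is the same on any operating system; that substitution is an assumption made for the example. The lowercase variable names are illustrative.

```python
import posixpath

# Values copied from pipeline/configs.py and local_runner.py.
PIPELINE_NAME = 'diamonds-pipeline'
OUTPUT_DIR = '.'
PIPELINE_BUCKET = 'gs://diamonds_pipeline'

# Pipeline artifacts and the ML Metadata database stay on the local disk.
pipeline_root = posixpath.join(OUTPUT_DIR, 'tfx_pipeline_output', PIPELINE_NAME)
metadata_path = posixpath.join(OUTPUT_DIR, 'tfx_metadata', PIPELINE_NAME,
                               'metadata.db')
# The pushed model goes to the GCS bucket.
serving_model_dir = posixpath.join(PIPELINE_BUCKET, 'serving_model')

print(pipeline_root)
print(metadata_path)
print(serving_model_dir)
```

The last two values match the `metadata path` and `serving model dir` lines printed by the CLI commands below.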


In [47]:
!tfx pipeline compile --pipeline_path local_runner.py

CLI
Compiling pipeline
Detected Local.
Use --engine flag if you intend to use a different orchestrator.
serving model dir:  gs://diamonds_pipeline/serving_model
metadata path:  ./tfx_metadata/diamonds-pipeline/metadata.db
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline compiled successfully.


In [7]:
!tfx pipeline create --pipeline_path local_runner.py

CLI
Creating pipeline
Detected Local.
Use --engine flag if you intend to use a different orchestrator.
serving model dir:  gs://diamonds_pipeline/serving_model/1
metadata path:  ./tfx_metadata/diamonds-pipeline/metadata.db
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "diamonds-pipeline" already exists.


In [48]:
!tfx pipeline update --pipeline_path local_runner.py

CLI
Updating pipeline
Detected Local.
Use --engine flag if you intend to use a different orchestrator.
serving model dir:  gs://diamonds_pipeline/serving_model
metadata path:  ./tfx_metadata/diamonds-pipeline/metadata.db
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "diamonds-pipeline" updated successfully.


In [49]:
!tfx run create --pipeline_name diamonds-pipeline

CLI
Creating a run for pipeline: diamonds-pipeline
Detected Local.
Use --engine flag if you intend to use a different orchestrator.
serving model dir:  gs://diamonds_pipeline/serving_model
metadata path:  ./tfx_metadata/diamonds-pipeline/metadata.db
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "Evaluator"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.evaluator.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "Pusher"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.pusher.executor.Executor"
    }
  }
}
executor_specs {
  key: "SchemaGen"
  