In [None]:
# upgrade pip
try:
  import colab
  !pip install --upgrade pip
except:
  pass

In [None]:
# install TFX
!pip install -q -U --use-feature=2020-resolver tfx

In [None]:
# restart notebook

# install libraries
import os
import pprint
import tempfile
import urllib

import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

import tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_modelresolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input


%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

In [None]:
# Report the versions of the two core libraries.
for template, version in (('TensorFlow version: {}', tf.__version__),
                          ('TFX version: {}', tfx.__version__)):
  print(template.format(version))

In [None]:
# Locations and logging used by the rest of the notebook.

# Log at INFO level.
absl.logging.set_verbosity(absl.logging.INFO)

# Directory of the installed TFX package, and the taxi example inside it.
tfxroot = tfx.__path__[0]
taxiroot = os.path.join(tfxroot, 'examples/chicago_taxi_pipeline')

# Serving directory, placed inside a freshly created temporary directory.
servingtmpdir = tempfile.mkdtemp()
servingmodeldir = os.path.join(servingtmpdir, 'serving_model/taxi_simple')

In [None]:
# Fetch the taxi CSV into a freshly created temporary directory.
DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv'
dataroot = tempfile.mkdtemp(prefix='tfx-data')
datafilepath = os.path.join(dataroot, "data.csv")
urllib.request.urlretrieve(url=DATA_PATH, filename=datafilepath)

In [None]:
# check csv file 
!head {datafilepath}

In [None]:
# Set up the InteractiveContext that later cells use to run and show
# components. No arguments are passed, so it uses its defaults.
context = InteractiveContext()

In [None]:
# Ingest the directory holding the downloaded CSV with CsvExampleGen.
csvinput = external_input(dataroot)
examplegen = CsvExampleGen(input=csvinput)
context.run(examplegen)

In [None]:
# Print the split names and URI of the first 'examples' artifact.
examplesartifacts = examplegen.outputs['examples'].get()
artifact = examplesartifacts[0]
print(artifact.split_names, artifact.uri)

In [None]:
# 'train' sub-directory of the first examples artifact.
examplesuri = examplegen.outputs['examples'].get()[0].uri
trainuri = os.path.join(examplesuri, 'train')

# Full path of every file found in that directory.
tfrecordfilenames = []
for name in os.listdir(trainuri):
  tfrecordfilenames.append(os.path.join(trainuri, name))

# The files are read as GZIP-compressed TFRecords.
dataset = tf.data.TFRecordDataset(tfrecordfilenames, compression_type="GZIP")

# Decode and pretty-print the first 3 records as tf.train.Example protos.
for tfrecord in dataset.take(3):
  example = tf.train.Example()
  serializedexample = tfrecord.numpy()
  example.ParseFromString(serializedexample)
  pp.pprint(example)

In [None]:
# Run StatisticsGen over the examples produced by ExampleGen.
ingestedexamples = examplegen.outputs['examples']
statisticsgen = StatisticsGen(examples=ingestedexamples)
context.run(statisticsgen)

In [None]:
# Visualise the 'statistics' output of StatisticsGen.
context.show(statisticsgen.outputs['statistics'])

In [None]:
# Run SchemaGen on the computed statistics, with feature-shape inference
# switched off.
datasetstatistics = statisticsgen.outputs['statistics']
schemagen = SchemaGen(infer_feature_shape=False, statistics=datasetstatistics)
context.run(schemagen)

In [None]:
# Visualise the 'schema' output of SchemaGen.
context.show(schemagen.outputs['schema'])

In [None]:
# Run ExampleValidator with the statistics and the generated schema.
examplevalidator = ExampleValidator(
    schema=schemagen.outputs['schema'],
    statistics=statisticsgen.outputs['statistics'])
context.run(examplevalidator)

In [None]:
# Visualise the 'anomalies' output of ExampleValidator.
context.show(examplevalidator.outputs['anomalies'])

In [None]:
# File name that the next cell writes the shared feature constants to.
taxiconstantsfile = 'taxi_constants.py'

In [None]:
%%writefile {taxiconstantsfile}

# Max values of categorical features.
# NOTE(review): this list has 3 entries while CATEGORICAL_FEATURE_KEYS below
# has 7. The trainer module zips the two together, so only the first 3 keys
# are paired with a value — confirm this is intended.
MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

# Keys of the categorical features.
CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]

# Keys of the dense features.
DENSE_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used when encoding the BUCKET_FEATURE_KEYS features.
FEATURE_BUCKET_COUNT = 10

# Keys of the features that are bucketized.
BUCKET_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]

# Number of vocabulary terms kept for the VOCAB_FEATURE_KEYS features.
VOCAB_SIZE = 1000

# Number of out-of-vocabulary buckets.
OOV_SIZE = 10

# Keys of the features that are encoded through a vocabulary.
VOCAB_FEATURE_KEYS = [
    'payment_type',
    'company',
]

# Key of the label feature and of the fare feature.
LABEL_KEY = 'tips'
FARE_KEY = 'fare'

def transformed_name(key):
  """Return the name of the transformed version of feature `key`.

  Args:
    key: raw feature name.

  Returns:
    `key` with the suffix '_xf' appended.
  """
  suffix = '_xf'
  return key + suffix

In [None]:
# File name that the next cell writes the preprocessing module to.
_taxi_transform_module_file = 'taxi_transform.py'

In [None]:
# write function to take raw data as input and return transformed features
%%writefile {_taxi_transform_module_file}

import tensorflow as tf
import tensorflow_transform as tft

import taxi_constants

# Module-private aliases for the shared definitions in taxi_constants.
_DENSE_FEATURE_KEYS = taxi_constants.DENSE_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = taxi_constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEYS = taxi_constants.BUCKET_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = taxi_constants.CATEGORICAL_FEATURE_KEYS
_FARE_KEY = taxi_constants.FARE_KEY
_LABEL_KEY = taxi_constants.LABEL_KEY
# Helper that maps a raw feature key to its transformed name.
_transformed_name = taxi_constants.transformed_name


def preprocessing_fn(inputs):
  """Turn raw input features into transformed features.

  Args:
    inputs: mapping from raw feature key to that feature's tensor. Every
      feature read here is first passed through `fillinmissing`.

  Returns:
    Dict keyed by `_transformed_name(<raw key>)` that holds the transformed
    feature tensors and the int64 "big tip" label.
  """
  transformed = {}

  # Dense features: fill the gaps (with 0), then scale to z-scores.
  for dense_key in _DENSE_FEATURE_KEYS:
    dense_values = fillinmissing(inputs[dense_key])
    transformed[_transformed_name(dense_key)] = tft.scale_to_z_score(
        dense_values)

  # Vocabulary features: build a vocabulary and map the values through it.
  for vocab_key in _VOCAB_FEATURE_KEYS:
    vocab_values = fillinmissing(inputs[vocab_key])
    transformed[_transformed_name(vocab_key)] = (
        tft.compute_and_apply_vocabulary(
            vocab_values, top_k=_VOCAB_SIZE, num_oov_buckets=_OOV_SIZE))

  # Bucket features: bucketize into _FEATURE_BUCKET_COUNT buckets.
  for bucket_key in _BUCKET_FEATURE_KEYS:
    bucket_values = fillinmissing(inputs[bucket_key])
    transformed[_transformed_name(bucket_key)] = tft.bucketize(
        bucket_values, _FEATURE_BUCKET_COUNT)

  # Categorical features: only the missing values are filled in.
  for categorical_key in _CATEGORICAL_FEATURE_KEYS:
    categorical_values = fillinmissing(inputs[categorical_key])
    transformed[_transformed_name(categorical_key)] = categorical_values

  # Label: 1 when the tip is greater than 20% of the fare, else 0. Rows whose
  # fare is NaN get label 0.
  fare = fillinmissing(inputs[_FARE_KEY])
  tip = fillinmissing(inputs[_LABEL_KEY])
  fare_is_nan = tf.math.is_nan(fare)
  label_for_nan_fare = tf.cast(tf.zeros_like(fare), tf.int64)
  tip_is_big = tf.cast(
      tf.greater(tip, tf.multiply(fare, tf.constant(0.2))), tf.int64)
  transformed[_transformed_name(_LABEL_KEY)] = tf.where(
      fare_is_nan, label_for_nan_fare, tip_is_big)

  return transformed


def fillinmissing(x):
  """Replace missing values in a sparse feature and return it densely.

  Args:
    x: feature tensor. A `tf.SparseTensor` is handled as having at most one
      value per row (its dense shape is forced to `[rows, 1]`). Anything else
      is assumed to be dense already.

  Returns:
    For a `tf.SparseTensor`: a dense tensor with the second dimension squeezed
    away, where missing entries are '' for string features and 0 otherwise.
    For any other input: `x` unchanged.
  """
  if not isinstance(x, tf.SparseTensor):
    # Dense input has no `.indices`/`.values` and nothing to fill in.
    return x
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          # Rebuild the tensor with a fixed width of 1 so it can be squeezed.
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)

In [None]:
# Run the Transform component with the preprocessing module file.
transformmodulepath = os.path.abspath(_taxi_transform_module_file)
transform = Transform(
    module_file=transformmodulepath,
    schema=schemagen.outputs['schema'],
    examples=examplegen.outputs['examples'])
context.run(transform)

In [None]:
# Display the outputs of the Transform component.
transform.outputs

In [None]:
# List the contents of the transform graph artifact. The URI gets its own
# name instead of reusing `trainuri`, which other cells use for the
# directory of training examples.
transformgraphuri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(transformgraphuri)

In [None]:
# 'train' sub-directory of the first transformed-examples artifact.
transformedexamplesuri = transform.outputs['transformed_examples'].get()[0].uri
trainuri = os.path.join(transformedexamplesuri, 'train')

# Full path of every file found in that directory.
tfrecordfilenames = []
for name in os.listdir(trainuri):
  tfrecordfilenames.append(os.path.join(trainuri, name))

# The files are read as GZIP-compressed TFRecords.
dataset = tf.data.TFRecordDataset(tfrecordfilenames, compression_type="GZIP")

# Decode and pretty-print the first 3 records as tf.train.Example protos.
for tfrecord in dataset.take(3):
  example = tf.train.Example()
  serializedexample = tfrecord.numpy()
  example.ParseFromString(serializedexample)
  pp.pprint(example)

In [None]:
# File name that the next cell writes the trainer module to.
taxitrainermodulefile = 'taxi_trainer.py'

In [None]:
%%writefile {taxitrainermodulefile}

from typing import List, Text

import os
import absl
import datetime
import tensorflow as tf
import tensorflow_transform as tft

from tfx.components.trainer.executor import TrainerFnArgs
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx_bsl.tfxio import dataset_options

import taxi_constants

# Module-private aliases for the shared definitions in taxi_constants.
_DENSE_FEATURE_KEYS = taxi_constants.DENSE_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = taxi_constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEYS = taxi_constants.BUCKET_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = taxi_constants.CATEGORICAL_FEATURE_KEYS
_MAX_CATEGORICAL_FEATURE_VALUES = taxi_constants.MAX_CATEGORICAL_FEATURE_VALUES
_LABEL_KEY = taxi_constants.LABEL_KEY
# Helper that maps a raw feature key to its transformed name.
_transformed_name = taxi_constants.transformed_name


def transformednames(keys):
  """Return the transformed name of every key in `keys`, as a list."""
  return list(map(_transformed_name, keys))


def getservetfexamplesfn(model, tf_transform_output):
  """Build a function that parses serialized tf.Examples and runs the model.

  Args:
    model: the model to call; the transform features layer is stored on it as
      `model.tft_layer`.
    tf_transform_output: object providing `transform_features_layer()` and
      `raw_feature_spec()`.

  Returns:
    A `tf.function` taking serialized tf.Examples and returning the model's
    output for them.
  """
  # Kept as an attribute of the model — presumably so that it is tracked and
  # saved together with it (TODO confirm).
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Parse, transform and score a batch of serialized tf.Examples."""
    raw_spec = tf_transform_output.raw_feature_spec()
    # Drop the label so that it is not parsed from the incoming examples.
    del raw_spec[_LABEL_KEY]
    raw_features = tf.io.parse_example(serialized_tf_examples, raw_spec)
    model_inputs = model.tft_layer(raw_features)
    return model(model_inputs)

  return serve_tf_examples_fn


def _input_fn(file_pattern: List[Text],
              data_accessor: DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Create a batched dataset of (features, label) from transformed examples.

  Args:
    file_pattern: file patterns of the examples to read.
    data_accessor: DataAccessor whose `tf_dataset_factory` builds the dataset.
    tf_transform_output: supplies the transformed-metadata schema.
    batch_size: number of examples per batch.

  Returns:
    The dataset returned by `data_accessor.tf_dataset_factory`, with the
    transformed label key used as label.
  """
  options = dataset_options.TensorFlowDatasetOptions(
      batch_size=batch_size, label_key=_transformed_name(_LABEL_KEY))
  transformed_schema = tf_transform_output.transformed_metadata.schema
  return data_accessor.tf_dataset_factory(
      file_pattern, options, transformed_schema)


def buildkerasmodel(hidden_units: List[int] = None) -> tf.keras.Model:
  """Create the wide-and-deep Keras model for the taxi data.

  Args:
    hidden_units: sizes of the hidden layers of the deep part; when empty or
      None, [100, 70, 50, 25] is used.

  Returns:
    The model built by `widedeepclassifier`.
  """
  make_identity_column = tf.feature_column.categorical_column_with_identity

  # Deep part: one scalar numeric column per dense feature.
  real_valued_columns = []
  for key in transformednames(_DENSE_FEATURE_KEYS):
    real_valued_columns.append(tf.feature_column.numeric_column(key, shape=()))

  # (transformed key, number of buckets) for every categorical input, in the
  # order vocabulary -> bucket -> categorical.
  bucket_counts = [
      (key, _VOCAB_SIZE + _OOV_SIZE)
      for key in transformednames(_VOCAB_FEATURE_KEYS)
  ]
  bucket_counts.extend(
      (key, _FEATURE_BUCKET_COUNT)
      for key in transformednames(_BUCKET_FEATURE_KEYS))
  # zip() stops at the shorter of the two sequences, so only keys that have a
  # matching max value get a column.
  bucket_counts.extend(
      zip(transformednames(_CATEGORICAL_FEATURE_KEYS),
          _MAX_CATEGORICAL_FEATURE_VALUES))

  # Wide part: every identity column wrapped in an indicator column.
  indicator_column = []
  for key, num_buckets in bucket_counts:
    categorical_column = make_identity_column(
        key, num_buckets=num_buckets, default_value=0)
    indicator_column.append(
        tf.feature_column.indicator_column(categorical_column))

  # TODO(b/139668410) replace with premade wide_and_deep keras model
  return widedeepclassifier(
      wide_columns=indicator_column,
      deep_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25])


def widedeepclassifier(wide_columns, deep_columns, dnn_hidden_units):
  """Build and compile a wide-and-deep binary classifier.

  Args:
    wide_columns: feature columns fed to the wide part of the model.
    deep_columns: feature columns fed to the deep part of the model.
    dnn_hidden_units: number of nodes of each hidden layer of the deep part,
      in order.

  Returns:
    A compiled `tf.keras.Model` with one scalar input per transformed feature
    and a single sigmoid output.
  """
  # One scalar input per transformed feature: float32 for the dense features,
  # int32 for the vocabulary, bucket and categorical features.
  input_layers = {
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
      for colname in transformednames(_DENSE_FEATURE_KEYS)
  }
  for integer_keys in (_VOCAB_FEATURE_KEYS, _BUCKET_FEATURE_KEYS,
                       _CATEGORICAL_FEATURE_KEYS):
    input_layers.update({
        colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
        for colname in transformednames(integer_keys)
    })

  deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
  # NOTE(review): no activation is passed to these Dense layers — confirm
  # that this is intended.
  for numnodes in dnn_hidden_units:
    deep = tf.keras.layers.Dense(numnodes)(deep)
  wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers)

  output = tf.keras.layers.Dense(
      1, activation='sigmoid')(
          tf.keras.layers.concatenate([deep, wide]))

  model = tf.keras.Model(input_layers, output)
  model.compile(
      loss='binary_crossentropy',
      # `learning_rate` is the supported argument name; `lr` is a deprecated
      # alias.
      optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
      metrics=[tf.keras.metrics.BinaryAccuracy()])
  model.summary(print_fn=absl.logging.info)
  return model


def run_fn(fn_args: TrainerFnArgs):
  """Train the model with the given args and save it for serving.

  Args:
    fn_args: holds the arguments used here: `transform_output`,
      `train_files`, `eval_files`, `data_accessor`, `train_steps`,
      `eval_steps`, `model_run_dir` and `serving_model_dir`.
  """
  # Hidden layer sizes start at `first_dnn_layer_size` and shrink by
  # `dnn_decay_factor` per layer (never below 2).
  first_dnn_layer_size = 100
  num_dnn_layers = 4
  dnn_decay_factor = 0.7
  batch_size = 40

  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor,
                            tf_transform_output, batch_size)
  eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor,
                           tf_transform_output, batch_size)

  model = buildkerasmodel(
      hidden_units=[
          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layers)
      ])

  # Write TensorBoard logs to the model run directory after every batch.
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=fn_args.model_run_dir, update_freq='batch')
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  # Serving signature: a 1-D batch of serialized tf.Examples named 'examples'.
  serving_fn = getservetfexamplesfn(model, tf_transform_output)
  signatures = {
      'serving_default':
          serving_fn.get_concrete_function(
              tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')),
  }
  # The attribute is `serving_model_dir`; `serving_modeldir` does not exist on
  # the trainer's fn_args.
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

In [None]:
# Run the Trainer component: the trainer module file is executed through the
# GenericExecutor on the transformed examples.
trainerexecutorspec = executor_spec.ExecutorClassSpec(GenericExecutor)
trainer = Trainer(
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schemagen.outputs['schema'],
    module_file=os.path.abspath(taxitrainermodulefile),
    custom_executor_spec=trainerexecutorspec,
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000))
context.run(trainer)

In [None]:
# List the contents of the trainer's model artifact and of the exported
# serving model inside it.
modelartifactdir = trainer.outputs['model'].get()[0].uri
pp.pprint(os.listdir(modelartifactdir))
# The sub-directory is named 'serving_model_dir' (with underscores).
modeldir = os.path.join(modelartifactdir, 'serving_model_dir')
pp.pprint(os.listdir(modeldir))

In [None]:
# Open TensorBoard on the trainer's 'model_run' artifact directory (the
# directory the training callback logs to).
modelrunartifactdir = trainer.outputs['model_run'].get()[0].uri

%load_ext tensorboard
%tensorboard --logdir {modelrunartifactdir}

In [None]:
# Evaluation configuration for the Evaluator component.

# Binary accuracy must be at least 0.5 and must not drop by more than 1e-10
# compared with the baseline model.
accuracythreshold = tfma.MetricThreshold(
    value_threshold=tfma.GenericValueThreshold(
        lower_bound={'value': 0.5}),
    change_threshold=tfma.GenericChangeThreshold(
        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
        absolute={'value': -1e-10}))

evalmetrics = [
    tfma.MetricConfig(class_name='ExampleCount'),
    tfma.MetricConfig(class_name='BinaryAccuracy',
                      threshold=accuracythreshold),
]

evalconfig = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='tips')],
    metrics_specs=[tfma.MetricsSpec(metrics=evalmetrics)],
    # Metrics are computed overall and per value of trip_start_hour.
    slicing_specs=[
        tfma.SlicingSpec(),
        tfma.SlicingSpec(feature_keys=['trip_start_hour']),
    ])

In [None]:
# Resolve the latest blessed model, then run the Evaluator with it as the
# baseline model.
modelresolver = ResolverNode(
      instance_name='latest_blessed_modelresolver',
      resolver_class=latest_blessed_modelresolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
context.run(modelresolver)

evaluator = Evaluator(
    examples=examplegen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=modelresolver.outputs['model'],
    # The Evaluator argument is `eval_config`; `evalconfig` is only the name
    # of the notebook variable holding the configuration.
    eval_config=evalconfig)
context.run(evaluator)

In [None]:
# Display the outputs of the Evaluator component.
evaluator.outputs

In [None]:
# Visualise the 'evaluation' output of the Evaluator.
context.show(evaluator.outputs['evaluation'])

In [None]:
# TensorFlow Model Analysis (`tfma` is imported in the import cell at the top
# of the notebook).

# Get the TFMA output result path and load the result.
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
tfmaresult = tfma.load_eval_result(PATH_TO_RESULT)

# Show data sliced along feature column trip_start_hour.
tfma.view.render_slicing_metrics(
    tfmaresult, slicing_column='trip_start_hour')

In [None]:
blessinguri = evaluator.outputs.blessing.get()[0].uri
!ls -l {blessinguri}

In [None]:
# Load the validation result stored next to the evaluation and print it.
PATH_TO_RESULT = evaluator.outputs['evaluation'].get()[0].uri
validationresult = tfma.load_validation_result(PATH_TO_RESULT)
print(validationresult)

In [None]:
# Run the Pusher component: the trained model and the Evaluator's blessing
# are passed in, with the serving directory as filesystem push destination.
servingfilesystem = pusher_pb2.PushDestination.Filesystem(
    base_directory=servingmodeldir)
pushdestination = pusher_pb2.PushDestination(filesystem=servingfilesystem)
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pushdestination)
context.run(pusher)

In [None]:
# Display the outputs of the Pusher component.
pusher.outputs

In [None]:
# Load the pushed SavedModel and pretty-print each of its signatures as a
# (name, function) pair.
pushuri = pusher.outputs.model_push.get()[0].uri
model = tf.saved_model.load(pushuri)

for signaturename, signaturefn in model.signatures.items():
  pp.pprint((signaturename, signaturefn))