# wit
### Exploring using the What-If Tool with TFX

This repo contains a notebook which uses MLMD payloads from the Chicago Taxi
pipeline example, which are created in the TFX developer tutorial.  Since the MLMD artifact URIs are absolute paths, I skip the query and hard code the URIs to these local files.

In [2]:
from __future__ import print_function

import os
import tensorflow as tf
import tfx_utils

tf.enable_eager_execution()

# This code is kept for reference, but commented out since we aren't doing the query
# def _make_default_sqlite_uri(pipeline_name):
#     return os.path.join(os.environ['HOME'], 'airflow/tfx/metadata', pipeline_name, 'metadata.db')

# def get_metadata_store(pipeline_name):
#     return tfx_utils.TFXReadonlyMetadataStore.from_sqlite_db(_make_default_sqlite_uri(pipeline_name))

# pipeline_name = 'taxi_solution' # or taxi_solution
# pipeline_db_path = _make_default_sqlite_uri(pipeline_name)
# print('Pipeline DB:\n{}'.format(pipeline_db_path))

# store = get_metadata_store(pipeline_name)

  'Running the Apache Beam SDK on Python 3 is not yet fully supported. '


Get the SavedModel:

In [4]:
# This code is kept for reference, but commented out since we aren't doing the query
# from os import listdir
# models = store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.MODEL)
# modelroot = models.URI.iloc[0] + '/serving_model_dir/export/chicago-taxi/'
# newest = str(sorted([int(f) for f in listdir(modelroot) if f.isdigit()])[-1])
# modeldir = os.path.join(modelroot, newest)

modeldir = '1560181362' # local copy

Prepare the feature columns for the Estimator

In [5]:
# Categorical features are assumed to each have a maximum value in the dataset.
_MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

_CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]

_DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used by tf.transform for encoding each feature.
_FEATURE_BUCKET_COUNT = 10

_BUCKET_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]

_VOCAB_FEATURE_KEYS = [
    'payment_type',
    'company',
]

# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
_VOCAB_SIZE = 1000

# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
_OOV_SIZE = 10

# def _transformed_name(key):
#     return key + '_xf'


# def _transformed_names(keys):
#     return [_transformed_name(key) for key in keys]

def _fill_in_missing(x):
    """Replace missing values in a SparseTensor.

  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.

  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.

  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
    default_value = '' if x.dtype == tf.string else 0
    return tf.squeeze(
        tf.sparse.to_dense(
            tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
            default_value), axis=1)

real_valued_columns = [
    tf.feature_column.numeric_column(key, shape=())
    for key in _DENSE_FLOAT_FEATURE_KEYS
#     for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
]

categorical_columns = [
    tf.feature_column.categorical_column_with_identity(key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
    for key in _VOCAB_FEATURE_KEYS
#     for key in _transformed_names(_VOCAB_FEATURE_KEYS)
]

categorical_columns += [
    tf.feature_column.categorical_column_with_identity(
        key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0)
    for key in _BUCKET_FEATURE_KEYS
#     for key in _transformed_names(_BUCKET_FEATURE_KEYS)
]

categorical_columns += [
    tf.feature_column.categorical_column_with_identity(  # pylint: disable=g-complex-comprehension
        key, num_buckets=num_buckets, default_value=0) for key, num_buckets in zip(
        _CATEGORICAL_FEATURE_KEYS, _MAX_CATEGORICAL_FEATURE_VALUES)
#         _transformed_names(_CATEGORICAL_FEATURE_KEYS), _MAX_CATEGORICAL_FEATURE_VALUES)
]

Instantiate the trained Estimator from the SavedModel

In [7]:
# Number of nodes in the first layer of the DNN
first_dnn_layer_size = 100
num_dnn_layers = 4
dnn_decay_factor = 0.7

hidden_units=[
    max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
    for i in range(num_dnn_layers)
]

model = tf.estimator.DNNLinearCombinedClassifier(
    linear_feature_columns=categorical_columns,
    dnn_feature_columns=real_valued_columns,
    dnn_hidden_units=hidden_units,
    warm_start_from=modeldir)
    
print('model is a ({})'.format(type(model)))

INFO:tensorflow:Using default config.


I0613 19:48:02.393507 4779169216 estimator.py:1739] Using default config.




W0613 19:48:02.402549 4779169216 estimator.py:1760] Using temporary folder as model directory: /var/folders/20/9s3ttk5x4g97nqj3mg9wmh_r00k8nb/T/tmpaku51ltv


INFO:tensorflow:Using config: {'_model_dir': '/var/folders/20/9s3ttk5x4g97nqj3mg9wmh_r00k8nb/T/tmpaku51ltv', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x13c282a58>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}


I0613 19:48:02.405421 4779169216 estimator.py:201] Using config: {'_model_dir': '/var/folders/20/9s3ttk5x4g97nqj3mg9wmh_r00k8nb/T/tmpaku51ltv', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x13c282a58>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}


INFO:tensorflow:Warm-starting from a SavedModel


I0613 19:48:02.407586 4779169216 estimator.py:2292] Warm-starting from a SavedModel


model is a (<class 'tensorflow_estimator.python.estimator.canned.dnn_linear_combined.DNNLinearCombinedClassifier'>)


In [8]:
# This code is kept for reference, but commented out since we aren't doing the query
# schema_uri = store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.SCHEMA).iloc[0].URI + 'schema.pbtxt'

schema_uri = 'schema.pbtxt'

In [9]:
# This code is kept for reference, but commented out since we aren't doing the query
# store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.SCHEMA)

Make a feature_spec from the schema, and convert tips to an int

In [12]:
import tensorflow_transform as tft
from tfx.utils import io_utils
from tensorflow_metadata.proto.v0 import schema_pb2

schema_utils = tft.tf_metadata.schema_utils
schema_proto = io_utils.parse_pbtxt_file(file_name=schema_uri, message=schema_pb2.Schema())
feature_spec, domains = schema_utils.schema_as_feature_spec(schema_proto)

print('Before: tips is a {}'.format(feature_spec['tips'].dtype))
tips = feature_spec.pop('tips')
feature_spec['tips'] = tf.VarLenFeature(dtype=tf.int64)
print('After: tips is a {}'.format(feature_spec['tips'].dtype))

Before: tips is a <dtype: 'float32'>
After: tips is a <dtype: 'int64'>


Extract the tf.Examples from the tf.Records file, and set tips to be a binary classification label

In [16]:
# This code is kept for reference, but commented out since we aren't doing the query
# examples = store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.EXAMPLES)
# for i in range(len(examples.URI)):
#     print(examples.URI.iloc[i])
#     !ls {examples.URI.iloc[i]}
# eval_examples = examples.URI.iloc[1] + listdir(examples.URI.iloc[1])[0]

eval_examples = 'data_tfrecord-00000-of-00001.gz'
print('eval_examples: {}'.format(eval_examples))

raw_dataset = tf.data.TFRecordDataset([eval_examples], compression_type='GZIP')
print('raw_dataset: ({}) {}'.format(type(raw_dataset), raw_dataset))

parsed_examples = []
for ex in raw_dataset:
    ex2 = tf.train.Example.FromString(ex.numpy())
    tip = ex2.features.feature.pop('tips')
    big_tipper = True if tip.float_list.value[0] > 0.2 else False
    ex2.features.feature['tips'].int64_list.value.append(big_tipper)
    parsed_examples.append(ex2)
print('parsed_examples[0]: ({}) {}'.format(type(parsed_examples[0]), parsed_examples[0]))

eval_examples: data_tfrecord-00000-of-00001.gz
raw_dataset: (<class 'tensorflow.python.data.ops.readers.TFRecordDatasetV1'>) <TFRecordDatasetV1 shapes: (), types: tf.string>
parsed_examples[0]: (<class 'tensorflow.core.example.example_pb2.Example'>) features {
  feature {
    key: "company"
    value {
      bytes_list {
        value: "Taxi Affiliation Services"
      }
    }
  }
  feature {
    key: "dropoff_census_tract"
    value {
    }
  }
  feature {
    key: "dropoff_community_area"
    value {
    }
  }
  feature {
    key: "dropoff_latitude"
    value {
    }
  }
  feature {
    key: "dropoff_longitude"
    value {
    }
  }
  feature {
    key: "fare"
    value {
      float_list {
        value: 27.049999237060547
      }
    }
  }
  feature {
    key: "payment_type"
    value {
      bytes_list {
        value: "Cash"
      }
    }
  }
  feature {
    key: "pickup_census_tract"
    value {
    }
  }
  feature {
    key: "pickup_community_area"
    value {
      int64_list 

Now analyze the model performance:

In [17]:
# !pip install --upgrade witwidget

from witwidget.notebook.visualization import WitConfigBuilder
from witwidget.notebook.visualization import WitWidget

In [18]:
tool_height_in_px = 1000

# Setup the tool with the test examples and the trained classifier
config_builder = WitConfigBuilder(parsed_examples).set_estimator_and_feature_spec(
    model, feature_spec).set_label_vocab(['good_tipper', 'bad_tipper'])
WitWidget(config_builder, height=tool_height_in_px)

WitWidget(config={'model_type': 'classification', 'label_vocab': ['good_tipper', 'bad_tipper'], 'are_sequence_…



W0613 20:06:36.914067 4779169216 estimator.py:974] Input graph does not use tf.data.Dataset or contain a QueueRunner. That means predict yields forever. This is probably a mistake.


ValueError: The corresponding Tensor of numerical column must be a Tensor. SparseTensor is not supported. key: fare