## Taxi Cab Classification

This notebook presents a simplified version of Kubeflow's *taxi cab classification* pipeline, built upon TFX components.

Here, all the pipeline components are stripped down to their core to showcase how to run the pipeline in a self-contained, local Jupyter Notebook.

Additionally, the pipeline has been upgraded to work with Python 3, and all major libraries (TensorFlow, TensorFlow Transform, TensorFlow Model Analysis, TensorFlow Data Validation, Apache Beam) have been bumped to their latest versions.

In [2]:
import os
import shutil
import logging
import apache_beam as beam
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
import tensorflow_data_validation as tfdv

from apache_beam.io import textio
from apache_beam.io import tfrecordio

from tensorflow_transform.beam import impl as beam_impl
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.coders.csv_coder import CsvCoder
from tensorflow_transform.coders.example_proto_coder import ExampleProtoCoder
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import metadata_io

In [2]:
# ---------------------------------------------------------------------------
# Input data locations
# ---------------------------------------------------------------------------
DATA_DIR = 'data/'
TRAIN_DATA = os.path.join(DATA_DIR, 'taxi-cab-classification/train.csv')
EVALUATION_DATA = os.path.join(DATA_DIR, 'taxi-cab-classification/eval.csv')

# ---------------------------------------------------------------------------
# Target definition
# ---------------------------------------------------------------------------
# The label is derived from the 'tips' column, compared against the 'fare'
# column during preprocessing.
LABEL_KEY = 'tips'
FARE_KEY = 'fare'

# ---------------------------------------------------------------------------
# Feature groups
# ---------------------------------------------------------------------------
# Numeric features kept as dense floats.
DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Integer-coded categorical features. MAX_CATEGORICAL_FEATURE_VALUES is paired
# positionally with CATEGORICAL_FEATURE_KEYS and holds the maximum value each
# feature is assumed to take in the dataset.
CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour',
    'trip_start_day',
    'trip_start_month',
]
MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

# Features that tf.transform bucketizes, and the number of buckets used for
# each of them.
BUCKET_FEATURE_KEYS = [
    'pickup_latitude',
    'pickup_longitude',
    'dropoff_latitude',
    'dropoff_longitude',
]
FEATURE_BUCKET_COUNT = 10

# Features that tf.transform encodes through a vocabulary.
VOCAB_FEATURE_KEYS = [
    'pickup_census_tract',
    'dropoff_census_tract',
    'payment_type',
    'company',
    'pickup_community_area',
    'dropoff_community_area',
]
# Number of vocabulary terms kept per vocabulary feature.
VOCAB_SIZE = 1000
# Number of out-of-vocabulary buckets into which unrecognized terms are hashed.
OOV_SIZE = 10

# Features in which missing (NaN) values are allowed.
OPTIONAL_FEATURES = [
    'dropoff_latitude',
    'dropoff_longitude',
    'pickup_census_tract',
    'dropoff_census_tract',
    'company',
    'trip_seconds',
    'dropoff_community_area',
]

  'Some syntactic constructs of Python 3 are not yet fully supported by '


In [None]:
# Hyper-parameters for the training run.
BATCH_SIZE = 32
STEPS = 3
EPOCHS = 1
LEARNING_RATE = 0.1
# Comma-separated hidden-layer widths, kept as a string and parsed into a list
# of ints right before the estimator is built.
HIDDEN_LAYER_SIZE = '1500'

In [3]:
# Set TensorFlow's (v1-compat) logging verbosity to INFO for the cells below.
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)
# Quieter alternative (errors only): tf.get_logger().setLevel(logging.ERROR)

#### Data Validation

For an overview of the TFDV functions: https://www.tensorflow.org/tfx/tutorials/data_validation/chicago_taxi

In [4]:
# Output location for the validation step (not written to by this cell).
vldn_output = os.path.join(DATA_DIR, 'validation')

# TODO: Understand why this was used in the conversion to the output JSON.
# Names of the columns that should be treated as unique keys.
key_columns = ['trip_start_timestamp']

# Read the first line of the CSV to get an ordered list of column names
# (the inferred schema scrambles the feature order).
with open(TRAIN_DATA) as train_csv:
    header_line = train_csv.readline()
column_names = header_line.strip().split(',')

# Infer a schema from statistics computed over the training data ...
stats = tfdv.generate_statistics_from_csv(data_location=TRAIN_DATA)
schema = tfdv.infer_schema(stats)

# ... and check the evaluation data statistics against that schema.
eval_stats = tfdv.generate_statistics_from_csv(data_location=EVALUATION_DATA)
anomalies = tfdv.validate_statistics(eval_stats, schema)

# Report every detected anomaly through the root logger at ERROR level.
root_logger = logging.getLogger()
for anomalous_feature, details in anomalies.anomaly_info.items():
    root_logger.error(
        f'Anomaly in feature "{anomalous_feature}": {details.description}')

# Show the inferred schema.
tfdv.display_schema(schema=schema)

W1107 15:33:09.759347 140276907915072 deprecation_wrapper.py:119] From /home/jovyan/.local/lib/python3.6/site-packages/tensorflow_data_validation/utils/stats_gen_lib.py:165: The name tf.gfile.Exists is deprecated. Please use tf.io.gfile.exists instead.

W1107 15:33:09.767974 140276907915072 deprecation_wrapper.py:119] From /home/jovyan/.local/lib/python3.6/site-packages/tensorflow_data_validation/utils/stats_gen_lib.py:321: The name tf.gfile.Glob is deprecated. Please use tf.io.gfile.glob instead.

W1107 15:33:09.790357 140276907915072 deprecation_wrapper.py:119] From /home/jovyan/.local/lib/python3.6/site-packages/tensorflow_data_validation/utils/stats_gen_lib.py:327: The name tf.gfile.GFile is deprecated. Please use tf.io.gfile.GFile instead.

W1107 15:33:17.073589 140274294482688 tfrecordio.py:57] Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
W1107 15:33:17.253429 140276907915072 deprecation.py:323] From /home/jovyan

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'trip_start_hour',INT,required,,-
'trip_miles',FLOAT,required,,-
'dropoff_census_tract',FLOAT,optional,single,-
'trip_seconds',FLOAT,optional,single,-
'pickup_community_area',INT,required,,-
'tips',FLOAT,required,,-
'payment_type',STRING,required,,'payment_type'
'trip_start_timestamp',INT,required,,-
'dropoff_latitude',FLOAT,optional,single,-
'trip_start_month',INT,required,,-


Unnamed: 0_level_0,Values
Domain,Unnamed: 1_level_1
'payment_type',"'Cash', 'Credit Card', 'Dispute', 'No Charge', 'Pcard', 'Unknown'"
'company',"'0118 - 42111 Godfrey S.Awir', '0694 - 59280 Chinesco Trans Inc', '1085 - 72312 N and W Cab Co', '2733 - 74600 Benny Jona', '2809 - 95474 C & D Cab Co Inc.', '3011 - 66308 JBL Cab Inc.', '3152 - 97284 Crystal Abernathy', '3201 - C&D Cab Co Inc', '3201 - CID Cab Co Inc', '3253 - 91138 Gaither Cab Co.', '3385 - 23210 Eman Cab', '3623 - 72222 Arrington Enterprises', '3897 - Ilie Malec', '4053 - Adwar H. Nikola', '4197 - 41842 Royal Star', '4615 - 83503 Tyrone Henderson', '4615 - Tyrone Henderson', '4623 - Jay Kim', '5006 - 39261 Salifu Bawa', '5006 - Salifu Bawa', '5074 - 54002 Ahzmi Inc', '5074 - Ahzmi Inc', '5129 - 87128', '5129 - 98755 Mengisti Taxi', '5129 - Mengisti Taxi', '5724 - KYVI Cab Inc', '585 - Valley Cab Co', '5864 - 73614 Thomas Owusu', '5864 - Thomas Owusu', '5874 - 73628 Sergey Cab Corp.', '5997 - 65283 AW Services Inc.', '5997 - AW Services Inc.', '6488 - 83287 Zuha Taxi', '6743 - Luhak Corp', 'Blue Ribbon Taxi Association Inc.', 'C & D Cab Co Inc', 'Chicago Elite Cab Corp.', 'Chicago Elite Cab Corp. (Chicago Carriag', 'Chicago Medallion Leasing INC', 'Chicago Medallion Management', 'Choice Taxi Association', 'Dispatch Taxi Affiliation', 'KOAM Taxi Association', 'Northwest Management LLC', 'Taxi Affiliation Services', 'Top Cab Affiliation'"


In [5]:
# Resolve anomalies
company = tfdv.get_feature(schema, 'company')
company.distribution_constraints.min_domain_mass = 0.9

# Add new value to the domain of feature payment_type.
payment_type_domain = tfdv.get_domain(schema, 'payment_type')
payment_type_domain.value.append('Prcard')

# Validate eval stats after updating the schema 
updated_anomalies = tfdv.validate_statistics(eval_stats, schema)
tfdv.display_anomalies(updated_anomalies)

#### Data Transformation

For an overview of the TFT functions: https://www.tensorflow.org/tfx/tutorials/transform/simple

In [6]:
def to_dense(tensor):
    """Densify a SparseTensor into a 1-D Tensor, filling missing rows with a default.

    Non-sparse inputs are returned unchanged. Sparse inputs are forced to the
    shape ``[batch, 1]`` and then squeezed to ``[batch]``; rows without a value
    receive a dtype-dependent default ('' for strings, 0.0 for floating-point
    types, 0 for integer types).

    Args:
      tensor: tf.SparseTensor (or any other value, which is passed through).
    Returns:
      tf.Tensor with default value
    Raises:
      ValueError: if the sparse tensor's dtype is not a string, floating-point
        or integer type.
    """
    if not isinstance(tensor, tf.sparse.SparseTensor):
        return tensor

    if tensor.dtype == tf.string:
        default_value = ''
    elif tensor.dtype.is_floating:
        default_value = 0.0
    elif tensor.dtype.is_integer:
        # Covers tf.int64 as well as tf.int32.
        default_value = 0
    else:
        raise ValueError(f"Tensor type not recognized: {tensor.dtype}")

    # Rebuild the sparse tensor with an explicit [batch, 1] dense shape before
    # densifying. Calling tf.sparse.to_dense on the input directly would keep
    # its own second dimension, and the squeeze below would fail whenever that
    # dimension is not exactly 1.
    single_column = tf.sparse.SparseTensor(
        indices=tensor.indices,
        values=tensor.values,
        dense_shape=[tensor.dense_shape[0], 1])
    return tf.squeeze(
        tf.sparse.to_dense(single_column, default_value=default_value),
        axis=1)


def preprocess_fn(inputs):
    """Preprocessing callback handed to tf.transform.

    Args:
      inputs: map from feature keys to raw not-yet-transformed features.
    Returns:
      Map from string feature key to transformed feature operations.
    """
    outputs = {}

    # Dense floats: densify (to_dense fills missing entries with 0.0) and
    # scale the column to a z-score.
    for name in DENSE_FLOAT_FEATURE_KEYS:
        outputs[name] = tft.scale_to_z_score(to_dense(inputs[name]))

    # Vocabulary features: non-string columns are rendered as strings first,
    # then each column is mapped to vocabulary indices.
    for name in VOCAB_FEATURE_KEYS:
        raw_feature = inputs[name]
        dense_feature = to_dense(raw_feature)
        if raw_feature.dtype == tf.string:
            tokens = dense_feature
        else:
            tokens = tf.as_string(dense_feature)
        outputs[name] = tft.compute_and_apply_vocabulary(
            tokens,
            vocab_filename='vocab_' + name,
            top_k=VOCAB_SIZE,
            num_oov_buckets=OOV_SIZE)

    # Bucketized features.
    for name in BUCKET_FEATURE_KEYS:
        outputs[name] = tft.bucketize(to_dense(inputs[name]), FEATURE_BUCKET_COUNT)

    # Integer-coded categorical features are only cast to int64.
    for name in CATEGORICAL_FEATURE_KEYS:
        outputs[name] = tf.cast(to_dense(inputs[name]), tf.int64)

    # Label: True when the fare is not NaN and the tip exceeds 20% of the fare.
    fare = to_dense(inputs[FARE_KEY])
    tip = to_dense(inputs[LABEL_KEY])
    twenty_percent_of_fare = tf.multiply(fare, tf.constant(0.2))
    fare_is_known = tf.logical_not(tf.math.is_nan(fare))
    outputs[LABEL_KEY] = tf.logical_and(
        fare_is_known, tf.greater(tip, twenty_percent_of_fare))

    # Boolean outputs are turned into vocabulary indices over their string form.
    boolean_keys = [name for name, op in outputs.items() if op.dtype == tf.bool]
    for name in boolean_keys:
        outputs[name] = tft.compute_and_apply_vocabulary(
            tf.as_string(outputs[name]),
            vocab_filename='vocab_' + name)

    return outputs

In [7]:
# Run the tf.transform preprocessing over the train and eval CSVs with Beam,
# writing everything under `trns_output`. Any previous output is wiped first so
# the cell can be re-run from scratch.
trns_output = os.path.join(DATA_DIR, "transformed")
if os.path.exists(trns_output):
    shutil.rmtree(trns_output)

# Metadata describing the raw input data, built from the TFDV schema.
tft_input_metadata = dataset_metadata.DatasetMetadata(schema)

runner = 'DirectRunner'
with beam.Pipeline(runner, options=None) as p:
    # tf.transform's temporary files are kept in a 'tmp' folder inside the output directory.
    with beam_impl.Context(temp_dir=os.path.join(trns_output, 'tmp')):
        # Decodes CSV lines into feature dicts, using the column order read from the CSV header.
        converter = CsvCoder(column_names, tft_input_metadata.schema)

        # READ TRAIN DATA
        train_data = (
                p
                | 'ReadTrainData' >> textio.ReadFromText(TRAIN_DATA, skip_header_lines=1)
                | 'DecodeTrainData' >> beam.Map(converter.decode))

        # TRANSFORM TRAIN DATA (and get transform_fn function)
        transformed_dataset, transform_fn = (
                (train_data, tft_input_metadata) | beam_impl.AnalyzeAndTransformDataset(preprocess_fn))
        transformed_data, transformed_metadata = transformed_dataset

        # SAVE TRANSFORMED TRAIN DATA
        _ = transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
            os.path.join(trns_output, 'train'),
            coder=ExampleProtoCoder(transformed_metadata.schema))

        # READ EVAL DATA
        eval_data = (
                p
                | 'ReadEvalData' >> textio.ReadFromText(EVALUATION_DATA, skip_header_lines=1)
                | 'DecodeEvalData' >> beam.Map(converter.decode))

        # TRANSFORM EVAL DATA (using previously created transform_fn function)
        # NOTE: `transformed_metadata` is rebound here to the metadata returned for the eval data.
        eval_dataset = (eval_data, tft_input_metadata)
        transformed_eval_data, transformed_metadata = (
            (eval_dataset, transform_fn) | beam_impl.TransformDataset())

        # SAVE EVAL DATA
        _ = transformed_eval_data | 'WriteEvalData' >> tfrecordio.WriteToTFRecord(
            os.path.join(trns_output, 'eval'),
            coder=ExampleProtoCoder(transformed_metadata.schema))

        # SAVE transform_fn FUNCTION FOR LATER USE
        # TODO: check out what is the transform function (transform_fn) that came from previous step
        _ = (transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(trns_output))

        # SAVE RAW (INPUT) METADATA
        # This writes `tft_input_metadata`, i.e. the metadata of the raw input data,
        # not the transformed metadata.
        # NOTE(review): presumably this is what TFTransformOutput later reads for
        # raw_feature_spec() - confirm against the tf.transform documentation.
        metadata_io.write_metadata(
            metadata=tft_input_metadata,
            path=os.path.join(trns_output, 'metadata'))

W1107 15:33:45.261063 140276907915072 deprecation.py:323] From /home/jovyan/.local/lib/python3.6/site-packages/tensorflow_transform/mappers.py:349: add_dispatch_support.<locals>.wrapper (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
W1107 15:33:45.289230 140276907915072 deprecation.py:323] From <ipython-input-6-d55a3ea73154>:21: sparse_to_dense (from tensorflow.python.ops.sparse_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Create a `tf.sparse.SparseTensor` and use `tf.sparse.to_dense` instead.
W1107 15:33:45.652244 140276907915072 deprecation.py:323] From /usr/local/lib/python3.6/dist-packages/tensorflow/python/saved_model/signature_def_utils_impl.py:201: build_tensor_info (from tensorflow.python.saved_model.utils_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This 

#### Train

Estimator API: https://www.tensorflow.org/guide/premade_estimators

In [8]:
def training_input_fn(transformed_output, transformed_examples, batch_size, target_name):
    """Read batches of transformed examples and split them into features and labels.

    Despite its name, this function is used as the body of the input function
    for both training and evaluation.

    Args:
      transformed_output: tft.TFTransformOutput
      transformed_examples: Base filename of examples
      batch_size: Batch size.
      target_name: name of the target column.
    Returns:
      A tuple (transformed_features, transformed_labels): the dict of feature
      tensors for the next batch, with the target column removed, and the
      tensor of that target column.
    """
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=transformed_examples,
        batch_size=batch_size,
        features=transformed_output.transformed_feature_spec(),
        reader=tf.data.TFRecordDataset,
        shuffle=True)
    # `Dataset.make_one_shot_iterator()` is deprecated and not available on
    # TF 2.x datasets; the compat.v1 function is the supported equivalent.
    iterator = tf.compat.v1.data.make_one_shot_iterator(dataset)
    transformed_features = iterator.get_next()
    # Pull the label out of the feature dict so it is not fed to the model as an input.
    transformed_labels = transformed_features.pop(target_name)
    return transformed_features, transformed_labels


def get_feature_columns():
    """Build the feature columns consumed by the tf.estimator model.

    Dense float features become scalar numeric columns; vocabulary, bucketized
    and categorical features become indicator columns over identity-categorical
    columns.

    Returns:
      A list of tf.feature_column.
    """
    def one_hot(key, num_buckets, **identity_options):
        # Indicator column wrapping an identity-categorical column.
        identity = tf.feature_column.categorical_column_with_identity(
            key, num_buckets=num_buckets, **identity_options)
        return tf.feature_column.indicator_column(identity)

    columns = []

    for key in DENSE_FLOAT_FEATURE_KEYS:
        columns.append(tf.feature_column.numeric_column(key, shape=()))

    # Vocabulary ids span the vocabulary itself plus the out-of-vocabulary buckets.
    for key in VOCAB_FEATURE_KEYS:
        columns.append(one_hot(key, VOCAB_SIZE + OOV_SIZE))

    for key in BUCKET_FEATURE_KEYS:
        columns.append(one_hot(key, FEATURE_BUCKET_COUNT, default_value=0))

    # Each categorical feature is paired with its own bucket count.
    for key, num_buckets in zip(CATEGORICAL_FEATURE_KEYS, MAX_CATEGORICAL_FEATURE_VALUES):
        columns.append(one_hot(key, num_buckets, default_value=0))

    return columns

In [9]:
# Start from an empty model directory so earlier checkpoints are not reused.
training_output = os.path.join(DATA_DIR, "training")
if os.path.exists(training_output):
    shutil.rmtree(training_output)

# HIDDEN_LAYER_SIZE is a comma-separated string; turn it into a list of ints.
hidden_layer_size = [int(width.strip()) for width in HIDDEN_LAYER_SIZE.split(',')]

# Handle on the artefacts written by the transform step.
tf_transform_output = tft.TFTransformOutput(trns_output)

# The number of classes is the size of the vocabulary built for the label.
n_classes = tf_transform_output.vocabulary_size_by_name("vocab_" + LABEL_KEY)

# Set how often to run checkpointing in terms of steps.
config = tf.estimator.RunConfig(save_checkpoints_steps=1000)

# Create estimator
# NOTE(review): LEARNING_RATE is not passed to the classifier here, so no
# explicit optimizer configuration is applied - confirm whether that is intended.
estimator = tf.estimator.DNNClassifier(
    hidden_units=hidden_layer_size,
    feature_columns=get_feature_columns(),
    n_classes=n_classes,
    model_dir=training_output,
    config=config)

# TODO: Simplify all this: https://www.tensorflow.org/guide/premade_estimators

I1107 15:34:33.098996 140276907915072 estimator.py:209] Using config: {'_model_dir': 'data/training', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': 1000, '_save_checkpoints_secs': None, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f941e831828>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}


In [10]:
# Train for STEPS steps on the transformed training shards (files matching
# 'train*' in the transform output directory). The target column is taken from
# LABEL_KEY rather than a repeated string literal, so it stays in sync with the
# label produced by the transform step.
estimator.train(
    input_fn=lambda: training_input_fn(
        tf_transform_output,
        os.path.join(trns_output, 'train' + '*'),
        int(BATCH_SIZE),
        LABEL_KEY),
    steps=int(STEPS))

W1107 15:38:37.536054 140276907915072 deprecation.py:323] From /usr/local/lib/python3.6/dist-packages/tensorflow/python/training/training_util.py:236: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
W1107 15:38:37.555328 140276907915072 deprecation.py:323] From /usr/local/lib/python3.6/dist-packages/tensorflow/python/data/experimental/ops/readers.py:835: parallel_interleave (from tensorflow.python.data.experimental.ops.interleave_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.interleave(map_func, cycle_length, block_length, num_parallel_calls=tf.data.experimental.AUTOTUNE)` instead. If sloppy execution is desired, use `tf.data.Options.experimental_determinstic`.
W1107 15:38:37.579628 140276907915072 deprec

<tensorflow_estimator.python.estimator.canned.dnn.DNNClassifier at 0x7f941e825b70>

In [11]:
# Evaluate on the transformed evaluation shards (files matching 'eval*' in the
# transform output directory) for 50 steps. The target column is taken from
# LABEL_KEY rather than a repeated string literal, so it stays in sync with the
# label produced by the transform step.
eval_result = estimator.evaluate(
    input_fn=lambda: training_input_fn(
        tf_transform_output,
        os.path.join(trns_output, 'eval' + '*'),
        int(BATCH_SIZE),
        LABEL_KEY),
    steps=50)

print(eval_result)

I1107 15:38:46.951269 140276907915072 estimator.py:1145] Calling model_fn.
W1107 15:38:47.779766 140276907915072 deprecation.py:323] From /usr/local/lib/python3.6/dist-packages/tensorflow/python/ops/metrics_impl.py:2027: div (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Deprecated in favor of operator or tf.math.divide.
W1107 15:38:48.069793 140276907915072 metrics_impl.py:804] Trapezoidal rule is known to produce incorrect PR-AUCs; please switch to "careful_interpolation" instead.
W1107 15:38:48.098131 140276907915072 metrics_impl.py:804] Trapezoidal rule is known to produce incorrect PR-AUCs; please switch to "careful_interpolation" instead.
I1107 15:38:48.312572 140276907915072 estimator.py:1147] Done calling model_fn.
I1107 15:38:48.338888 140276907915072 evaluation.py:255] Starting evaluation at 2019-11-07T15:38:48Z
I1107 15:38:48.440474 140276907915072 monitored_session.py:240] Graph was finalized.
W1107 15

{'accuracy': 0.769375, 'accuracy_baseline': 0.758125, 'auc': 0.8707819, 'auc_precision_recall': 0.600909, 'average_loss': 0.55728555, 'label/mean': 0.241875, 'loss': 17.833138, 'precision': 0.61538464, 'prediction/mean': 0.40588272, 'recall': 0.12403101, 'global_step': 3}


#### Model Analysis

TF Model Analysis docs: https://www.tensorflow.org/tfx/model_analysis/get_started

In [12]:
# TODO: Implement model load and params analysis

def eval_input_receiver_fn(transformed_output):
    """Build the input receiver that tf-model-analysis uses to run the model.

    Args:
      transformed_output: tft.TFTransformOutput
    Returns:
      A tfma.export.EvalInputReceiver made of:
        - receiver tensors: a string placeholder, under the key 'examples',
          that receives serialized tf.Example protos;
        - features: the result of parsing those examples with the raw feature
          spec and applying the tf-transform preprocessing to them;
        - labels: the transformed LABEL_KEY feature.
    """
    # Entry point of the graph: a batch of serialized tf.Example protos.
    examples_placeholder = tf.compat.v1.placeholder(
        dtype=tf.string, shape=[None], name='input_example_tensor')

    # Parse the raw (untransformed) features, then run them through the
    # saved tf-transform preprocessing.
    raw_features = tf.io.parse_example(
        examples_placeholder, transformed_output.raw_feature_spec())
    model_features = transformed_output.transform_raw_features(raw_features)

    return tfma.export.EvalInputReceiver(
        features=model_features,
        receiver_tensors={'examples': examples_placeholder},
        labels=model_features[LABEL_KEY])

# EXPORT MODEL
eval_model_dir = os.path.join(training_output, 'tfma_eval_model_dir')
tfma.export.export_eval_savedmodel(
    estimator=estimator,
    export_dir_base=eval_model_dir,
    eval_input_receiver_fn=(lambda: eval_input_receiver_fn(tf_transform_output)))

W1107 15:40:23.048025 140276907915072 ops.py:6619] Expected binary or unicode string, got type_url: "type.googleapis.com/tensorflow.AssetFileDef"
value: "\n\013\n\tConst_1:0\022\022vocab_payment_type"

W1107 15:40:23.053156 140276907915072 ops.py:6619] Expected binary or unicode string, got type_url: "type.googleapis.com/tensorflow.AssetFileDef"
value: "\n\013\n\tConst_3:0\022\031vocab_pickup_census_tract"

W1107 15:40:23.054471 140276907915072 ops.py:6619] Expected binary or unicode string, got type_url: "type.googleapis.com/tensorflow.AssetFileDef"
value: "\n\013\n\tConst_4:0\022\rvocab_company"

W1107 15:40:23.056294 140276907915072 ops.py:6619] Expected binary or unicode string, got type_url: "type.googleapis.com/tensorflow.AssetFileDef"
value: "\n\013\n\tConst_7:0\022\nvocab_tips"

W1107 15:40:23.056940 140276907915072 ops.py:6619] Expected binary or unicode string, got type_url: "type.googleapis.com/tensorflow.AssetFileDef"
value: "\n\013\n\tConst_8:0\022\033vocab_pickup_communit

b'data/training/tfma_eval_model_dir/1573141222'