## Setup


In [None]:
!pip install tfx

In [None]:
#import required libraries
import os
import pprint
import tempfile
import urllib

import pandas as pd
import numpy as np

import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

import tfx
from tfx.components import CsvExampleGen
from tfx.utils.dsl_utils import external_input
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing


%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip
%reload_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

In [None]:
#check library versions
print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))

In [None]:
!rm -rf data.*
!rm -rf *trainer.py
!sudo rm -r /content/tfx

In [None]:
! mkdir /content/tfx/
! mkdir /content/tfx/data


## load dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
df=pd.read_csv('/content/drive/MyDrive/temp.csv',index_col=0)


#Drop NA rows
df = df.dropna()
#df=df.drop(columns=['key'])

##Keep a test set for final testing( TFX internally splits train and validation data )
np.random.seed(seed=2)
msk = np.random.rand(len(df)) < 0.9
traindf = df[msk]
evaldf = df[~msk]

print(len(traindf))
print(len(evaldf))

traindf.to_csv("/content/tfx/data/data_trans.csv", index=False, header=True)
evaldf.to_csv("eval.csv", index=False, header=False)

## Create interactive context

In [None]:
_tfx_root = os.path.join(os.getcwd(), 'tfx');
context = InteractiveContext(pipeline_root=_tfx_root)

## ExampleGen for Splitting of data

In [None]:
example_gen = CsvExampleGen(input=external_input(os.path.join(_tfx_root, 'data')))
context.run(example_gen)

In [None]:
#We get train ane evaulate splits of data
artifact = example_gen.outputs['examples'].get()[0]
print(artifact.split_names, artifact.uri)

In [None]:
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'Split-train')

tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)

## StatisticsGen to compute statistics analysis

In [None]:
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)

In [None]:
context.show(statistics_gen.outputs['statistics'])

## SchemaGen to produce schema

In [None]:
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False)
context.run(schema_gen)

In [None]:
context.show(schema_gen.outputs['schema'])

## Example valiator to detect anomalies in data

In [None]:
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(example_validator)

In [None]:
context.show(example_validator.outputs['anomalies'])

In [None]:
# Get the URI of the output artifact representing the transformed examples, which is a directory
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'Split-train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Iterate over the first 1 records and decode them.
for tfrecord in dataset.take(1):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)

## Transform for feature engineering

In [None]:
bins_lat = pd.qcut(list(df['dropoff_latitude'].values) + list(df['pickup_latitude'].values), q=20, duplicates='drop', retbins=True)[1]
bins_lon = pd.qcut(list(df['dropoff_longitude'].values) + list(df['pickup_longitude'].values), q=20, duplicates='drop', retbins=True)[1]

In [None]:
code = '''
bins_lat = {bins_lat}
bins_lon = {bins_lon}
'''

code = code.replace('{bins_lat}', str(list(bins_lat)))
code = code.replace('{bins_lon}', str(list(bins_lon)))

with open('constants_trainer.py', 'w') as writefile:
    writefile.write(code)

## Training

In [None]:
_input_fn_module_file = 'inputfn_trainer.py'

In [None]:
%%writefile {_input_fn_module_file}

import os
import tensorflow as tf

###############################
##Feature engineering functions
def feature_engg_features(df):
  #Add new features
  df['distance'] = ((df['pickup_latitude'] - df['dropoff_latitude'])**2 +  (df['pickup_longitude'] - df['dropoff_longitude'])**2)**0.5
  
  df['pickup_datetime'] = tf.strings.as_string(df['pickup_datetime'])

  df['split_date_time'] = tf.strings.split(df['pickup_datetime'], ' ').to_tensor()

  df['date'] = tf.gather(df['split_date_time'] ,0 , axis=1)
  df['time'] = tf.gather(df['split_date_time'] ,1 , axis=1)

  df['split_date'] = tf.strings.split(df['date'], '-',name='split_date').to_tensor()
  df['split_time'] = tf.strings.split(df['time'], ':',name='split_time').to_tensor()

  df['trip_start_month'] = tf.gather(df['split_date'] ,1 , axis=1)
  df['trip_start_day'] = tf.gather(df['split_date'] ,2 , axis=1)
  df['trip_start_hour'] = tf.gather(df['split_time'] ,0 , axis=1)
  return(df)


#To be called from TF
def feature_engg(features, label):
  #Add new features
  features = feature_engg_features(features)

  return(features, label)

def make_input_fn(dir_uri, mode, vnum_epochs = None, batch_size = 512):
    def decode_tfr(serialized_example):
      # 1. define a parser
      features = tf.io.parse_example(
        serialized_example,
        # Defaults are not specified since both keys are required.
        features={
            'dropoff_latitude': tf.io.FixedLenFeature([], tf.float32),
            'dropoff_longitude': tf.io.FixedLenFeature([], tf.float32),
            'fare_amount': tf.io.FixedLenFeature([], tf.float32),
            'pickup_latitude': tf.io.FixedLenFeature([], tf.float32, default_value = 0.0),
            'pickup_longitude': tf.io.FixedLenFeature([], tf.float32, default_value = 0.0),
            'pickup_datetime': tf.io.FixedLenFeature([], tf., default_value = '2014-06-30 11:26:37')
        })
      

      return features, features['fare_amount']

    def _input_fn(v_test=False):
      # Get the list of files in this directory (all compressed TFRecord files)
      tfrecord_filenames = tf.io.gfile.glob(dir_uri)

      # Create a `TFRecordDataset` to read these files
      dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
      print('k')
      if mode == tf.estimator.ModeKeys.TRAIN:
        num_epochs = vnum_epochs # indefinitely
      else:
        num_epochs = 1 # end-of-input after this

      dataset = dataset.batch(batch_size)
      dataset = dataset.prefetch(buffer_size = batch_size)

      #Convert TFRecord data to dict
      dataset = dataset.map(decode_tfr)

      #Feature engineering
      dataset = dataset.map(feature_engg)

      if mode == tf.estimator.ModeKeys.TRAIN:
          num_epochs = vnum_epochs # indefinitely
          dataset = dataset.shuffle(buffer_size = batch_size)
      else:
          num_epochs = 1 # end-of-input after this

      dataset = dataset.repeat(num_epochs)       
      
      if v_test == True:
        print(next(dataset.__iter__()))
        
      return dataset
    return _input_fn

In [None]:
import inputfn_trainer as ift
eval_file = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'eval/*')
fn_d = ift.make_input_fn(dir_uri = eval_file,
                    mode = tf.estimator.ModeKeys.EVAL,
                    batch_size = 10)
fn_d(v_test=True)

In [None]:
%%writefile model_trainer.py

import tensorflow as tf
import tensorflow.keras as keras
import inputfn_trainer as ift
import constants_trainer as ct

from tfx.components.trainer.fn_args_utils import FnArgs
print(tf.__version__)

device = "gpu"

if device == "tpu":
  resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])
  tf.config.experimental_connect_to_cluster(resolver)
  tf.tpu.experimental.initialize_tpu_system(resolver)
  strategy = tf.distribute.experimental.TPUStrategy(resolver)
else:
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

#Create model
params_default = {
    'lr' : 0.001,
    'beta_1' : 0.99,
    'beta_2' : 0.999,
    'epsilon' : 1e-08,
    'decay' : 0.01,
    'hidden_layers' : 1
}


def create_feature_cols():
    #Keras format features
    k_month = tf.keras.Input(name='trip_start_month', shape=(1,), dtype=tf.string)
    k_hour  = tf.keras.Input(name='trip_start_hour', shape=(1,), dtype=tf.string)
    k_day  = tf.keras.Input(name='trip_start_day', shape=(1,), dtype=tf.string)
    k_picklat  = tf.keras.Input(name='pickup_latitude', shape=(1,), dtype=tf.float32)
    k_picklon  = tf.keras.Input(name='pickup_longitude', shape=(1,), dtype=tf.float32)
    k_droplat  = tf.keras.Input(name='dropoff_latitude', shape=(1,), dtype=tf.float32)
    k_droplon  = tf.keras.Input(name='dropoff_longitude', shape=(1,), dtype=tf.float32)
    k_distance  = tf.keras.Input(name='distance', shape=(1,), dtype=tf.float32)
    keras_dict_input = {'trip_start_month': k_month, 'trip_start_hour': k_hour, 'trip_start_day' : k_day,
                        'pickup_latitude': k_picklat, 'pickup_longitude': k_picklon,
                        'dropoff_latitude': k_droplat, 'dropoff_longitude': k_droplon, 'distance' : k_distance
                        }

    return({'K' : keras_dict_input})

def create_keras_model(feature_cols, bins_lat, bins_lon,  params = params_default):
    METRICS = [
            keras.metrics.RootMeanSquaredError(name='rmse')
    ]

    #Input layers
    input_feats = []
    for inp in feature_cols['K'].keys():
      input_feats.append(feature_cols['K'][inp])

    

    ##Handle categorical attributes( One-hot encoding )
    cat_day = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=['1','2','3','4','5','6','7'], mask_token=None)(feature_cols['K']['trip_start_day'])
    cat_day = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=7)(cat_day)

    cat_hour = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=['1','2','3','4','5','6','7','8'
                                                                                      '9','10','11','12','13','14','15','16',
                                                                                      '17','18','19','20','21','22','23','0'
                                                                                      ], mask_token=None)(feature_cols['K']['trip_start_hour'])
    cat_hour = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=24)(cat_hour)

    cat_month = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=['1','2','3','4','5','6','7','8'
                                                                                      '9','10','11','12'], mask_token=None)(feature_cols['K']['trip_start_month'])
    cat_month = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=12)(cat_month)

    # cat_company = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=df['company'].unique(), mask_token=None)(feature_cols['K']['company'])
    # cat_company = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=len(df['company'].unique()))(cat_company)

    ##Binning
    bins_pickup_lat = tf.keras.layers.experimental.preprocessing.Discretization(bins = bins_lat)(feature_cols['K']['pickup_latitude'])
    cat_pickup_lat = tf.keras.layers.experimental.preprocessing.CategoryEncoding(len(bins_lat)+1)(bins_pickup_lat)

    bins_pickup_lon = tf.keras.layers.experimental.preprocessing.Discretization(bins = bins_lon)(feature_cols['K']['pickup_longitude'])
    cat_pickup_lon = tf.keras.layers.experimental.preprocessing.CategoryEncoding(len(bins_lon)+1)(bins_pickup_lon)

    bins_drop_lat = tf.keras.layers.experimental.preprocessing.Discretization(bins = bins_lat)(feature_cols['K']['dropoff_latitude'])
    cat_drop_lat = tf.keras.layers.experimental.preprocessing.CategoryEncoding(len(bins_lat)+1)(bins_drop_lat)

    bins_drop_lon = tf.keras.layers.experimental.preprocessing.Discretization(bins = bins_lon)(feature_cols['K']['dropoff_longitude'])
    cat_drop_lon = tf.keras.layers.experimental.preprocessing.CategoryEncoding(len(bins_lon)+1)(bins_drop_lon)

    ##Categorical cross
    cross_day_hour = tf.keras.layers.experimental.preprocessing.CategoryCrossing()([cat_day, cat_hour])
    hash_cross_day_hour = tf.keras.layers.experimental.preprocessing.Hashing(num_bins=24 * 7)(cross_day_hour)
    cat_cross_day_hour = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens = 24* 7)(hash_cross_day_hour)

    cross_pick_lon_lat = tf.keras.layers.experimental.preprocessing.CategoryCrossing()([cat_pickup_lat, cat_pickup_lon])
    hash_cross_pick_lon_lat = tf.keras.layers.experimental.preprocessing.Hashing(num_bins=(len(bins_lat) + 1) ** 2)(cross_pick_lon_lat)

    cross_drop_lon_lat = tf.keras.layers.experimental.preprocessing.CategoryCrossing()([cat_drop_lat, cat_drop_lon])
    hash_cross_drop_lon_lat = tf.keras.layers.experimental.preprocessing.Hashing(num_bins=(len(bins_lat) + 1) ** 2)(cross_drop_lon_lat)

    # Cross to embedding
    embed_cross_pick_lon_lat = tf.keras.layers.Embedding(((len(bins_lat) + 1) ** 2), 4)(hash_cross_pick_lon_lat)
    embed_cross_pick_lon_lat = tf.reduce_sum(embed_cross_pick_lon_lat, axis=-2)

    embed_cross_drop_lon_lat = tf.keras.layers.Embedding(((len(bins_lat) + 1) ** 2), 4)(hash_cross_drop_lon_lat)
    embed_cross_drop_lon_lat = tf.reduce_sum(embed_cross_drop_lon_lat, axis=-2)

    # Also pass time attributes as Deep signal( Cast to integer )
    int_trip_start_day = tf.strings.to_number(feature_cols['K']['trip_start_day'], tf.float32)
    int_trip_start_hour = tf.strings.to_number(feature_cols['K']['trip_start_hour'], tf.float32)
    int_trip_start_month = tf.strings.to_number(feature_cols['K']['trip_start_month'], tf.float32)

    #Add feature engineered columns - LAMBDA layer

    ###Create MODEL
    ####Concatenate all features( Numerical input )
    x_input_numeric = tf.keras.layers.concatenate([
                    feature_cols['K']['pickup_latitude'], feature_cols['K']['pickup_longitude'],
                    feature_cols['K']['dropoff_latitude'], feature_cols['K']['dropoff_longitude'],
                    feature_cols['K']['distance'], embed_cross_pick_lon_lat, embed_cross_drop_lon_lat,
                    int_trip_start_day, int_trip_start_hour, int_trip_start_month
                    ])

    #DEEP - This Dense layer connects to input layer - Numeric Data
    x_numeric = tf.keras.layers.Dense(32, activation='relu', kernel_initializer="he_uniform")(x_input_numeric)
    x_numeric = tf.keras.layers.BatchNormalization()(x_numeric)

    ####Concatenate all Categorical features( Categorical converted )
    x_input_categ = tf.keras.layers.concatenate([
                    cat_month, cat_cross_day_hour, cat_pickup_lat, cat_pickup_lon,
                    cat_drop_lat, cat_drop_lon
                    ])
    
    #WIDE - This Dense layer connects to input layer - Categorical Data
    x_categ = tf.keras.layers.Dense(32, activation='relu', kernel_initializer="he_uniform")(x_input_categ)

    ####Concatenate both Wide and Deep layers
    x = tf.keras.layers.concatenate([x_categ, x_numeric])

    for l_ in range(params['hidden_layers']):
        x = tf.keras.layers.Dense(32, activation='relu', kernel_initializer="he_uniform",
                                  activity_regularizer=tf.keras.regularizers.l2(0.00001))(x)
        x = tf.keras.layers.BatchNormalization()(x)

    #Final Layer
    out = tf.keras.layers.Dense(1, activation='relu')(x)
    model = tf.keras.Model(input_feats, out)

    #Set optimizer
    opt = tf.keras.optimizers.Adam(lr= params['lr'], beta_1=params['beta_1'], 
                                        beta_2=params['beta_2'], epsilon=params['epsilon'])

    #Compile model
    model.compile(loss='mean_squared_error',  optimizer=opt, metrics = METRICS)

    #Print Summary
    print(model.summary())
    return model

def keras_train_and_evaluate(model, train_dataset, validation_dataset, epochs=100):
  #Add callbacks
  reduce_lr = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2,
                                patience=5, min_lr=0.00001, verbose = 1)
  
  tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="./logs")

  #Train and Evaluate
  out = model.fit(train_dataset, 
                  validation_data = validation_dataset,
                  epochs=epochs,
                  # validation_steps = 3,   
                  steps_per_epoch = 100,   
                  callbacks=[reduce_lr,  
                             keras.callbacks.EarlyStopping(patience=20, restore_best_weights=True, verbose=True)]
                  )
  return model

def save_model(model, model_save_path):
  @tf.function
  def serving(dropoff_latitude, dropoff_longitude, pickup_latitude, pickup_longitude, trip_start_day, trip_start_hour, trip_start_month):
      distance = tf.cast( tf.sqrt((tf.abs(dropoff_latitude - pickup_latitude))**2 + (tf.abs(dropoff_longitude - pickup_longitude))**2), tf.float32)

      payload = {
          'dropoff_latitude': dropoff_latitude,
          'dropoff_longitude': dropoff_longitude,
          'pickup_latitude': pickup_latitude,
          'pickup_longitude': pickup_longitude,
          'trip_start_day': trip_start_day,
          'trip_start_hour': trip_start_hour,
          'trip_start_month': trip_start_month,
          'distance': distance
      }
      
      predictions = model(payload)
      return predictions

  serving = serving.get_concrete_function(trip_start_day=tf.TensorSpec([None,], dtype= tf.string, name='trip_start_day'), 
                                          trip_start_hour=tf.TensorSpec([None,], dtype= tf.string, name='trip_start_hour'),
                                          trip_start_month=tf.TensorSpec([None], dtype= tf.string, name='trip_start_month'), 
                                          dropoff_latitude=tf.TensorSpec([None,], dtype= tf.float32, name='dropoff_latitude'),
                                          dropoff_longitude=tf.TensorSpec([None,], dtype= tf.float32, name='dropoff_longitude'), 
                                          pickup_latitude=tf.TensorSpec([None,], dtype= tf.float32, name='pickup_latitude'),
                                          pickup_longitude=tf.TensorSpec([None,], dtype= tf.float32, name='pickup_longitude')
                                          )

  # version = "1"  #{'serving_default': call_output}
  tf.saved_model.save(
      model,
      model_save_path + "/",
      signatures=serving
  )

##Main function called by TFX
def run_fn(fn_args: FnArgs):
  train_dataset = ift.make_input_fn(dir_uri = fn_args.train_files,
                      mode = tf.estimator.ModeKeys.TRAIN,
                      batch_size = 128)()

  validation_dataset = ift.make_input_fn(dir_uri = fn_args.eval_files,
                      mode = tf.estimator.ModeKeys.EVAL,
                      batch_size = 512)()

  m_ = create_keras_model(params = params_default, feature_cols = create_feature_cols(),
                          bins_lat = ct.bins_lat,
                          bins_lon = ct.bins_lon)
  tf.keras.utils.plot_model(m_, show_shapes=True, rankdir="LR")

  m_ = keras_train_and_evaluate(m_, train_dataset, validation_dataset, fn_args.custom_config['epochs'])

  save_model(m_, fn_args.serving_model_dir)

In [None]:
_model_trainer_module_file='model_trainer.py'
trainer = Trainer(
    module_file=os.path.abspath(_model_trainer_module_file),
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=example_gen.outputs['examples'],
    train_args=trainer_pb2.TrainArgs(),
    eval_args=trainer_pb2.EvalArgs(),
    custom_config=({"epochs": 1})
    )

context.run(trainer)

## Analyzing pipeline

In [None]:
model_artifact_dir = trainer.outputs['model'].get()[0].uri
pp.pprint(os.listdir(model_artifact_dir))
model_dir = os.path.join(model_artifact_dir, 'serving_model_dir')
pp.pprint(os.listdir(model_dir))

## Pusher to verify validation

In [None]:
pusher = Pusher(
    model=trainer.outputs['model'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
context.run(pusher)