<a name='1'></a>
## 1 - Install and Imports

In [None]:
!pip install -U tfx 
!pip3 install tensorflow_text>=2.0.0rc0  

In [None]:
import os
import sys
import pandas as pd
import tempfile
import numpy as np
import pprint
import argparse
from typing import Dict, Text, Any, Tuple, List

import tensorflow as tf

import tensorflow_model_analysis as tfma
import tensorflow_data_validation as tfdv
import tensorflow_transform as tft
import apache_beam

from tensorflow import keras
from tfx.components import CsvExampleGen
from tfx.components import ExampleValidator
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Transform
from tfx.components import Tuner
from tfx.components import Trainer
from tfx.components import BulkInferrer
from tfx.types import standard_artifacts
from tfx import v1 as tfx
from tfx import types
from tfx.components.bulk_inferrer import prediction_to_example_utils
from tfx.components.util import model_utils
from tfx.proto import bulk_inferrer_pb2
from tfx.proto import trainer_pb2
from tfx.proto import example_gen_pb2
from tfx.types import artifact_utils
from tfx.utils import io_utils
from tfx.utils import path_utils
from tfx.dsl.components.base import base_executor

from tfx_bsl.public.proto import model_spec_pb2
from tfx_bsl.public.beam import RunInference
from tfx_bsl.public import tfxio

from tensorflow.python.lib.io import file_io
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_serving.apis import prediction_log_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

from google.protobuf import json_format
from google.protobuf import text_format
from google.protobuf.json_format import MessageToDict

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Only show TensorFlow log records at ERROR level or above.
tf.get_logger().setLevel('ERROR')
# Pretty-printer instance shared by later cells.
pp = pprint.PrettyPrinter()

# Report the versions of the TF and TFX related packages in use.
print(f'TensorFlow version: {tf.__version__}')
print(f'TFX version: {tfx.__version__}')
print(f'TensorFlow Data Validation version: {tfdv.__version__}')
print(f'TensorFlow Transform version: {tft.__version__}')

<a name='2'></a>
## 2 - Load the dataset

In [None]:
# Declare paths to the data

# Competition input files.
INPUT_SOURCE_DIR = '../input/foursquare-location-matching'
INPUT_SOURCE_PAIRS_DATA = f'{INPUT_SOURCE_DIR}/pairs.csv'
INPUT_SOURCE_TRAIN_DATA = f'{INPUT_SOURCE_DIR}/train.csv'
INPUT_SOURCE_TEST_DATA = f'{INPUT_SOURCE_DIR}/test.csv'

# Root directory for the TFX pipeline artifacts.
PIPELINE_DIR = './pipeline'

# Directory (and CSV inside it) that holds the assembled training pairs.
TRAINING_DATA_DIR = './training_data'
TRAINING_DATA = f'{TRAINING_DATA_DIR}/dataset.csv'

# Serving model and serving data locations.
SERVING_MODEL_DIR = './serving_model'
SERVING_DATA_DIR = './serving_data'
SERVING_DATA = f'{SERVING_DATA_DIR}/dataset.csv'


# Create the working directories. os.makedirs(..., exist_ok=True) behaves like
# `mkdir -p` but does not depend on a POSIX shell being available.
os.makedirs(PIPELINE_DIR, exist_ok=True)
os.makedirs(TRAINING_DATA_DIR, exist_ok=True)
os.makedirs(SERVING_MODEL_DIR, exist_ok=True)
os.makedirs(SERVING_DATA_DIR, exist_ok=True)


In [None]:
# Load the labelled pairs and replace NaN with '' so the text columns can be
# concatenated in the following cells.
pairs_df = pd.read_csv(INPUT_SOURCE_PAIRS_DATA).replace(np.nan, '', regex=True)
pairs_df.shape

In [None]:
# Merge each place's address, city, state, zip and phone columns into a single
# free-text `full_address_<n>` column, then drop the individual columns.
pairs_df['full_address_1'] = (
    pairs_df['address_1'].map(str)
    + " " + pairs_df['city_1'].map(str)
    + " " + pairs_df['state_1'].map(str)
    + " " + pairs_df['zip_1'].map(str)
    + " Phone: " + pairs_df['phone_1'].map(str)
)
pairs_df['full_address_2'] = (
    pairs_df['address_2'].map(str)
    + " " + pairs_df['city_2'].map(str)
    + " " + pairs_df['state_2'].map(str)
    + " " + pairs_df['zip_2'].map(str)
    + " Phone: " + pairs_df['phone_2'].map(str)
)
pairs_df = pairs_df.drop(columns=[
    f'{field}_{side}'
    for side in ('1', '2')
    for field in ('address', 'city', 'state', 'zip', 'phone')
])
pairs_df.dtypes

In [None]:
# Load the training places and replace NaN with '' (same treatment as the pairs frame).
train_df = pd.read_csv(INPUT_SOURCE_TRAIN_DATA).replace(np.nan, '', regex=True)

In [None]:
# Build extra matching pairs from train.csv: self-join on point_of_interest so
# every two distinct records of the same POI form a pair labelled match=True.
add_pairs_df = train_df.merge(train_df, on="point_of_interest", suffixes=('_1', '_2'))
add_pairs_df = (
    add_pairs_df[add_pairs_df["id_1"] != add_pairs_df["id_2"]]  # drop self-pairs
    .drop(columns=["point_of_interest"])
    .assign(match=True)
)

# Same full-address construction as for the pairs frame.
add_pairs_df['full_address_1'] = (
    add_pairs_df['address_1'].map(str)
    + " " + add_pairs_df['city_1'].map(str)
    + " " + add_pairs_df['state_1'].map(str)
    + " " + add_pairs_df['zip_1'].map(str)
    + " Phone: " + add_pairs_df['phone_1'].map(str)
)
add_pairs_df['full_address_2'] = (
    add_pairs_df['address_2'].map(str)
    + " " + add_pairs_df['city_2'].map(str)
    + " " + add_pairs_df['state_2'].map(str)
    + " " + add_pairs_df['zip_2'].map(str)
    + " Phone: " + add_pairs_df['phone_2'].map(str)
)
add_pairs_df = add_pairs_df.drop(columns=[
    f'{field}_{side}'
    for side in ('1', '2')
    for field in ('address', 'city', 'state', 'zip', 'phone')
])
add_pairs_df.shape

In [None]:
# Build additional (mostly non-matching) pairs from train.csv: take the cross
# product of the first 1000 training rows and label each pair by whether both
# records share the same point_of_interest.

non_match_df = train_df.head(1000)
non_match_df = non_match_df.merge(non_match_df, how='cross', suffixes=('_1', '_2'))
# Drop self-pairs; .copy() gives an independent frame so the column assignments
# below never write into a view of the cross-joined frame.
non_match_df = non_match_df[non_match_df['id_1'] != non_match_df['id_2']].copy()
# One vectorised comparison yields a proper boolean label column (two
# conditional .loc writes produce an object-dtype column instead).
non_match_df['match'] = non_match_df['point_of_interest_1'] == non_match_df['point_of_interest_2']
non_match_df = non_match_df.drop(columns=["point_of_interest_1", "point_of_interest_2"])

# Same full-address construction as for the other pair frames.
non_match_df['full_address_1'] = non_match_df['address_1'].map(str) + " " + non_match_df['city_1'].map(str) + " " + non_match_df['state_1'].map(str) + " " + non_match_df['zip_1'].map(str) + " Phone: " + non_match_df['phone_1'].map(str)
non_match_df['full_address_2'] = non_match_df['address_2'].map(str) + " " + non_match_df['city_2'].map(str) + " " + non_match_df['state_2'].map(str) + " " + non_match_df['zip_2'].map(str) + " Phone: " + non_match_df['phone_2'].map(str)
non_match_df = non_match_df.drop(columns=['address_1','city_1','state_1','zip_1', 'phone_1', 'address_2','city_2','state_2','zip_2', 'phone_2'])
non_match_df.shape


In [None]:
# Stack the original pairs, the extra matching pairs and the cross-join pairs
# into one frame with a fresh 0..n-1 index.
pairs_df = pd.concat([pairs_df, add_pairs_df, non_match_df], ignore_index=True)
pairs_df.shape

In [None]:
# drop_duplicates() returns a new frame; the result must be assigned back,
# otherwise the duplicates stay in pairs_df.
pairs_df = pairs_df.drop_duplicates()
pairs_df.shape

In [None]:
# Keep only pairs whose two sides are different records.
pairs_df = pairs_df.loc[pairs_df["id_1"].ne(pairs_df["id_2"])]
pairs_df.shape

In [None]:
# Encode the label explicitly as False -> 0, True -> 1. Category codes depend on
# which values happen to be present (a frame holding only True rows would encode
# True as 0). assign() returns a new frame, so nothing is written into a
# filtered view of an earlier frame.
# NOTE(review): assumes `match` holds boolean values at this point - confirm.
pairs_df = pairs_df.assign(match=pairs_df['match'].astype(bool).astype('int8'))
display(pairs_df['match'].unique())
pairs_df.shape

In [None]:
# Write the assembled pairs to TRAINING_DATA (without the index column).
# For a quick smoke test, a subset can be written instead, e.g. pairs_df.head(2000).
pairs_df.to_csv(TRAINING_DATA, index=False)

In [None]:
# Release the large intermediate frames now that the CSV has been written.
# non_match_df is intentionally kept.
del pairs_df, add_pairs_df

<a name='3'></a>
## 3 - Data Ingestion

<a name='3-1'></a>
### 3.1 - Setup the Interactive Context

Set up the interactive context in the pipeline directory.

In [None]:
# Declare the InteractiveContext rooted at PIPELINE_DIR and use a local sqlite
# file as the metadata store. Every component below is executed through it.
context = InteractiveContext(pipeline_root=PIPELINE_DIR)

<a name='3-2'></a>
### 3.2 - Generating Examples
Ingest the data using TFX component - [ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)
* The input is in CSV format so you will need to use the appropriate type of `ExampleGen` to handle it. 
* This function accepts a *directory* path to the training data and not the CSV file path itself. 

In [None]:
# Instantiate ExampleGen with the input CSV dataset.
# input_base is the directory that holds the CSV (TRAINING_DATA_DIR), not the file path.
example_gen = CsvExampleGen(input_base=TRAINING_DATA_DIR)
# Execute the component in the interactive context.
context.run(example_gen)

<a name='4'></a>
## 4 - Data Validation

<a name='4-1'></a>
### 4.1 - StatisticsGen

Compute the statistics using TFX Component - StatisticsGen. Visualizations provided by the integrated [FACETS](https://pair-code.github.io/facets/) library.

In [None]:
# Instantiate StatisticsGen with the ExampleGen ingested dataset
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
# Execute the component in the interactive context.
context.run(statistics_gen)


In [None]:
# Display the statistics artifact produced by StatisticsGen.
context.show(statistics_gen.outputs['statistics'])

<a name='4-2'></a>
### 4.2 - SchemaGen

Infer the schema of the input dataset with the TFX [SchemaGen](https://www.tensorflow.org/tfx/guide/schemagen) component. The schema is used to validate incoming datasets during training and serving.

In [None]:
# Instantiate SchemaGen with the output statistics from the StatisticsGen
schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])
# Run with caching enabled.
context.run(schema_gen, enable_cache=True)

In [None]:
# Visualize the schema artifact produced by SchemaGen.
context.show(schema_gen.outputs['schema'])

<a name='4-3'></a>
### 4.3 - Schema Environments

In supervised learning, we train the model to make predictions by feeding a set of features with its corresponding label. Thus, our training dataset will have both the input features and label, and the schema is configured to detect these. 

<a name='5'></a>
## 5 - Feature Engineering




<a name='5-1'></a>
### 5.1 - Transform
Using [TFX Transform component](https://www.tensorflow.org/tfx/api_docs/python/tfx/components/Transform) 


*   create constant module file
*   create transform module file
*   instantiate transform and run

In [None]:
# Set the constants module filename; the next cell writes this file with %%writefile
# and the transform module imports it as `fs_constants`.
_fs_constants_module_file = 'fs_constants.py'

In [None]:
%%writefile {_fs_constants_module_file}

# Feature keys grouped under min-max scaling.
# Currently disabled candidates: "address_distance", "cityzip_distance",
# "url_distance", "phone_distance".
SCALE_MINMAX_FEATURE_KEYS = [
    "name_distance",
    "location_distance",
]

# Feature keys grouped under vocabulary encoding.
# Currently disabled candidates: "categories_1", "categories_2".
VOCAB_FEATURE_KEYS = [
    "country_1",
    "country_2",
]

# Name of the label column.
LABEL_KEY = "match"

def transformed_name(key):
    """Return the name of the transformed counterpart of feature `key`.

    The transformed name is the raw feature name with the suffix '_xf' appended.
    """
    suffix = '_xf'
    return key + suffix

In [None]:
# Set the transform module filename; the next cell writes this file with %%writefile
# and the Transform component loads it as its module_file.
_fs_transform_module_file = 'fs_transform.py'

In [None]:
%%writefile {_fs_transform_module_file}

import tensorflow_hub as hub
import tensorflow_text
import tensorflow as tf
import tensorflow_transform as tft
import numpy as np
import fs_constants

# Load the multilingual Universal Sentence Encoder from TF Hub once, when the
# module is imported; text_similarity() uses it to embed strings.
embed = hub.load("https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/3")

# Local aliases for the constants and the naming helper defined in fs_constants.
# NOTE(review): in the visible code only _LABEL_KEY is used outside commented-out lines.
_SCALE_MINMAX_FEATURE_KEYS = fs_constants.SCALE_MINMAX_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = fs_constants.VOCAB_FEATURE_KEYS
_LABEL_KEY = fs_constants.LABEL_KEY
_transformed_name = fs_constants.transformed_name

def text_similarity(text1,text2):
    """Return the row-wise embedding similarity of two batches of strings.

    Both inputs are flattened to rank 1 and embedded with the module-level
    `embed` encoder. Element i of the result is the dot product of the
    embeddings of text1[i] and text2[i].

    Args:
      text1: String tensor; reshaped to [-1] before embedding.
      text2: String tensor with the same number of elements as `text1`.

    Returns:
      A rank 1 float tensor with one similarity score per input pair.
    """
    text1_vec = embed(tf.reshape(text1,[-1]))
    text2_vec = embed(tf.reshape(text2,[-1]))
    # Pair each row with its own counterpart. Taking column 0 of
    # matmul(text1_vec, text2_vec^T) would score every text1 row against the
    # first text2 row of the batch only.
    return tf.reduce_sum(text1_vec * text2_vec, axis=1)

def scale_longitude(lon):
    """Shift a longitude by +78 and divide the result by 8.

    NOTE(review): the offset and scale look tuned for a specific region - confirm.
    """
    shifted = lon + 78
    return shifted/8.

def scale_latitude(lat):
    """Shift a latitude by -37 and divide the result by 8.

    NOTE(review): the offset and scale look tuned for a specific region - confirm.
    """
    shifted = lat - 37
    return shifted/8.

def euclidean(lon1, lat1, lon2, lat2):
    """Return the straight-line distance between (lon1, lat1) and (lon2, lat2).

    Computed element-wise as sqrt((lon2 - lon1)^2 + (lat2 - lat1)^2).
    """
    delta_lon = lon2 - lon1
    delta_lat = lat2 - lat1
    squared_distance = delta_lon*delta_lon + delta_lat*delta_lat
    return tf.sqrt(squared_distance)

def edit_distance(phone_1, phone_2):
    """Return `tf.edit_distance(phone_1, phone_2)` with normalization enabled.

    NOTE(review): `tf.edit_distance` is documented as taking SparseTensor
    arguments - confirm callers pass that form. This helper is not called in
    the visible code.
    """
    return tf.edit_distance(phone_1, phone_2, normalize=True)

def poi_distance(lon1, lat1, lon2, lat2):
    """Return the Euclidean distance between two points after coordinate scaling.

    Longitudes go through scale_longitude and latitudes through scale_latitude
    before the distance is computed with euclidean().
    """
    return euclidean(
        scale_longitude(lon1), scale_latitude(lat1),
        scale_longitude(lon2), scale_latitude(lat2))

def _fill_in_missing(x):
    """Densify a SparseTensor, filling the gaps with a default value.

    Anything that is not a `tf.sparse.SparseTensor` is returned unchanged.

    Args:
      x: A `SparseTensor` of rank 2 whose dense shape should have size at most 1
        in the second dimension, or any other value.

    Returns:
      For sparse input, a rank 1 dense tensor in which missing entries are ''
      (string dtype) or 0 (any other dtype); otherwise `x` itself.
    """
    if isinstance(x, tf.sparse.SparseTensor):
        fill_value = '' if x.dtype == tf.string else 0
        # Rebuild with an explicit [batch, 1] dense shape so the squeeze below is valid.
        single_column = tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1])
        dense = tf.sparse.to_dense(single_column, fill_value)
        return tf.squeeze(dense, axis=1)
    return x

def preprocessing_fn(inputs):
    """tf.Transform preprocessing callback.

    Builds five pairwise features (name, full address, url and category
    embedding similarity, plus the scaled coordinate distance), rescales each
    with `tft.scale_by_min_max`, and passes the label through as float32.
    The vocabulary features in `_VOCAB_FEATURE_KEYS` are not used.

    Args:
      inputs: Dict mapping raw feature keys to tensors.

    Returns:
      Dict with the keys 'name_distance_xf', 'full_address_distance_xf',
      'url_distance_xf', 'category_distance_xf', 'location_distance_xf' and
      the label key.
    """
    def _text(key):
        # Every text feature is densified first: _fill_in_missing passes dense
        # tensors through unchanged and converts sparse ones (features with
        # missing values), which text_similarity could not reshape otherwise.
        return _fill_in_missing(inputs[key])

    raw_features = {
        "name_distance_xf": text_similarity(_text("name_1"), _text("name_2")),
        "full_address_distance_xf": text_similarity(_text("full_address_1"), _text("full_address_2")),
        "url_distance_xf": text_similarity(_text("url_1"), _text("url_2")),
        "category_distance_xf": text_similarity(_text("categories_1"), _text("categories_2")),
        "location_distance_xf": poi_distance(inputs["longitude_1"], inputs["latitude_1"],
                                             inputs["longitude_2"], inputs["latitude_2"]),
    }

    # Rescale every engineered feature with min-max scaling.
    features_dict = {
        key: tft.scale_by_min_max(value) for key, value in raw_features.items()
    }

    features_dict[_LABEL_KEY] = tf.cast(inputs[_LABEL_KEY], tf.float32)
    return features_dict


In [None]:
# Instantiate the Transform component with the ingested examples, the inferred
# schema and the preprocessing module written above (passed as an absolute path).
transform = Transform(examples=example_gen.outputs['examples'],
                      schema=schema_gen.outputs['schema'],
                      module_file=os.path.abspath(_fs_transform_module_file))
# Run the component
context.run(transform, enable_cache=True)

In [None]:
# Get the URI of the transform_graph artifact and list its subdirectories
graph_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(graph_uri)

<a name='6'></a>
## 6 - Train Model




<a name='6-1'></a>
### 6.1 - Tuner Function

Using [TFX Tuner component](https://www.tensorflow.org/tfx/guide/tuner) 

*   create tuner module file
*   instantiate tuner and run




In [None]:
# Declare name of the tuner module file; the next cell writes it with %%writefile
# and the Tuner component loads it as its module_file.
_tuner_module_file = 'tuner.py'

In [None]:
%%writefile {_tuner_module_file}

# Define imports
import tensorflow as tf
import tensorflow_transform as tft
from kerastuner.engine import base_tuner
import kerastuner as kt
from tensorflow import keras
from typing import NamedTuple, Dict, Text, Any, List
from tfx.components.trainer.fn_args_utils import FnArgs, DataAccessor


# Transformed feature names used as model inputs; model_builder creates one
# Keras Input of shape (1,) per key.
_FEATURE_KEYS = ['category_distance_xf', 'location_distance_xf', 'name_distance_xf',
                 'full_address_distance_xf', 'url_distance_xf']

# Return type of tuner_fn: the tuner instance plus the keyword arguments in `fit_kwargs`.
TunerFnResult = NamedTuple('TunerFnResult', [('tuner', base_tuner.BaseTuner),
                                             ('fit_kwargs', Dict[Text, Any])])

# Label key
LABEL_KEY = 'match'

# Callback for the search strategy: stop when val_loss has not improved for 5 epochs.
# NOTE(review): tuner_fn uses max_epochs=5, so patience=5 may never trigger - confirm intent.
stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

def _gzip_reader_fn(filenames):
    """Return a TFRecordDataset that reads `filenames` as GZIP-compressed TFRecords."""
    return tf.data.TFRecordDataset(filenames, compression_type='GZIP')
    
def _input_fn(file_pattern,
              tf_transform_output,
              num_epochs=None,
              batch_size=32) -> tf.data.Dataset:
    """Build a batched dataset of transformed features and labels.

    Args:
      file_pattern: File path or pattern of the GZIP-compressed TFRecord files.
      tf_transform_output: A TFTransformOutput providing the transformed feature spec.
      num_epochs: Forwarded to `make_batched_features_dataset`.
      batch_size: Number of examples per batch.

    Returns:
      The dataset returned by `make_batched_features_dataset`, with LABEL_KEY
      used as the label.
    """
    feature_spec = tf_transform_output.transformed_feature_spec().copy()

    return tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=feature_spec,
        reader=_gzip_reader_fn,
        num_epochs=num_epochs,
        label_key=LABEL_KEY)

def model_builder(hp):
    """Build and compile the tunable binary classifier.

    The network concatenates one scalar input per key in `_FEATURE_KEYS`,
    applies four ReLU Dense layers whose widths are the hyperparameters
    'units1'..'units4' (32 to 512 in steps of 32), and ends with a single
    sigmoid unit. The Adam learning rate is the hyperparameter
    'learning_rate', chosen from 0.01, 0.001 and 0.0001.

    Args:
      hp: Hyperparameter container providing `Int` and `Choice`.

    Returns:
      The compiled Keras model.
    """
    feature_inputs = [
        keras.layers.Input(shape=(1,), name=feature_name)
        for feature_name in _FEATURE_KEYS
    ]

    hidden = tf.keras.layers.concatenate(feature_inputs)
    for layer_idx in range(1, 5):
        n_units = hp.Int(f'units{layer_idx}', min_value=32, max_value=512, step=32)
        hidden = tf.keras.layers.Dense(units=n_units, activation='relu')(hidden)

    prediction = keras.layers.Dense(1, activation='sigmoid')(hidden)
    model = tf.keras.Model(inputs=feature_inputs, outputs=prediction)

    learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss='binary_crossentropy',
        metrics=['accuracy'])
    return model
    
def tuner_fn(fn_args: FnArgs) -> TunerFnResult:
    """Create the Hyperband tuner and the arguments for its search.

    Args:
      fn_args: Arguments provided by the Tuner component; this function reads
        working_dir, transform_graph_path, train_files, eval_files,
        train_steps and eval_steps.

    Returns:
      A TunerFnResult holding the tuner and its fit keyword arguments.
    """
    hyperband = kt.Hyperband(
        model_builder,
        objective='val_accuracy',
        max_epochs=5,
        factor=3,
        directory=fn_args.working_dir,
        project_name='kt_hyperband')

    transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)

    # Only the first train / eval file pattern is used.
    fit_kwargs = {
        "callbacks": [stop_early],
        'x': _input_fn(fn_args.train_files[0], transform_output),
        'validation_data': _input_fn(fn_args.eval_files[0], transform_output),
        'steps_per_epoch': fn_args.train_steps,
        'validation_steps': fn_args.eval_steps,
    }
    return TunerFnResult(tuner=hyperband, fit_kwargs=fit_kwargs)

In [None]:
# Setup the Tuner component: it loads the tuner module written above and is fed
# the transformed examples together with the transform graph.
tuner = Tuner(
    module_file=os.path.abspath(_tuner_module_file),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    # Step counts handed to the tuner module for the 'train' and 'eval' splits.
    train_args=trainer_pb2.TrainArgs(splits=['train'], num_steps=5000),
    eval_args=trainer_pb2.EvalArgs(splits=['eval'], num_steps=5000)
    )

In [None]:
# Run the hyperparameter search with caching enabled.
context.run(tuner, enable_cache=True)

<a name='6-2'></a>
### 6.2 - Trainer Function

Using [TFX Trainer component](https://www.tensorflow.org/tfx/guide/trainer) 

*   create trainer module file
*   instantiate trainer and run




In [None]:
# Declare trainer module file; the next cell writes it with %%writefile and the
# Trainer component loads it as its module_file.
_fs_trainer_module_file = 'trainer.py'

In [None]:
%%writefile {_fs_trainer_module_file}

from typing import Dict, List, Text

import os
import glob
from absl import logging

import datetime
import tensorflow as tf
import tensorflow_transform as tft

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_transform import TFTransformOutput

# Name of the label feature.
_LABEL_KEY = 'match'
# Batch size that run_fn passes to _input_fn (overrides _input_fn's default of 64).
_BATCH_SIZE = 32

def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 64) -> tf.data.Dataset:
    """Create the (features, label) dataset used for training and evaluation.

    Args:
      file_pattern: List of paths or patterns of input tfrecord files.
      data_accessor: DataAccessor whose `tf_dataset_factory` builds the dataset.
      tf_transform_output: A TFTransformOutput; its transformed metadata schema
        describes the examples.
      batch_size: Number of consecutive elements combined into a single batch.

    Returns:
      The dataset built by `data_accessor.tf_dataset_factory`, with `_LABEL_KEY`
      configured as the label.
    """
    dataset_options = tfxio.TensorFlowDatasetOptions(
        batch_size=batch_size, label_key=_LABEL_KEY)
    transformed_schema = tf_transform_output.transformed_metadata.schema
    return data_accessor.tf_dataset_factory(
        file_pattern, dataset_options, transformed_schema)

def _get_tf_examples_serving_signature(model, tf_transform_output):
    """Returns a serving signature that accepts `tensorflow.Example`.

    Args:
      model: The Keras model to serve; the transform layer is attached to it as
        `tft_layer_inference`.
      tf_transform_output: A TFTransformOutput providing the raw feature spec
        and the transform features layer.

    Returns:
      A `tf.function` that takes a rank 1 string tensor of serialized examples
      and returns a dict with the model predictions under the key 'outputs'.
    """
  
    # Stored as a model attribute, then used inside the tf.function below.
    model.tft_layer_inference = tf_transform_output.transform_features_layer()

    @tf.function(input_signature=[tf.TensorSpec(shape=[None],
                                                dtype=tf.string, name='examples')])
    def serve_tf_examples_fn(serialized_tf_example):
      """Returns the output to be used in the serving signature."""
      raw_feature_spec = tf_transform_output.raw_feature_spec()
      # Remove label feature since these will not be present at serving time.
      raw_feature_spec.pop(_LABEL_KEY)
      # Parse the serialized examples into raw features, then apply the transform graph.
      raw_features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)
      transformed_features = model.tft_layer_inference(raw_features)
      logging.info('serve_transformed_features = %s', transformed_features)

      outputs = model(transformed_features)
      return {'outputs': outputs}

    return serve_tf_examples_fn

def _get_transform_features_signature(model, tf_transform_output):
    """Returns a serving signature that applies tf.Transform to features.

    Args:
      model: The Keras model; the transform layer is attached to it as
        `tft_layer_eval`.
      tf_transform_output: A TFTransformOutput providing the raw feature spec
        and the transform features layer.

    Returns:
      A `tf.function` that takes a rank 1 string tensor of serialized examples
      and returns the transformed features.
    """
    model.tft_layer_eval = tf_transform_output.transform_features_layer()

    @tf.function(input_signature=[
        tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
    ])
    def transform_features_fn(serialized_tf_example):
      """Returns the transformed_features to be fed as input to evaluator."""
      # Unlike the serving signature, the label stays in the feature spec here.
      raw_feature_spec = tf_transform_output.raw_feature_spec()
      raw_features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)
      transformed_features = model.tft_layer_eval(raw_features)
      logging.info('eval_transformed_features = %s', transformed_features)
      return transformed_features

    return transform_features_fn

def export_serving_model(tf_transform_output, model, output_dir):
    """Save a Keras model in SavedModel format with two signatures.

    The signatures are 'serving_default' (serialized examples in, predictions
    out) and 'transform_features' (serialized examples in, transformed
    features out).

    Args:
      tf_transform_output: Wrapper around output of tf.Transform.
      model: A keras model to export for serving.
      output_dir: A directory where the model will be exported to.
    """
    # The layer has to be attached to the model for Keras tracking purposes.
    model.tft_layer = tf_transform_output.transform_features_layer()

    serving_default_fn = _get_tf_examples_serving_signature(model, tf_transform_output)
    transform_features_fn = _get_transform_features_signature(model, tf_transform_output)

    model.save(
        output_dir,
        save_format='tf',
        signatures={
            'serving_default': serving_default_fn,
            'transform_features': transform_features_fn,
        })

def _build_keras_model(tf_transform_output: TFTransformOutput,
                       hp
                       ) -> tf.keras.Model:
    """Creates the DNN Keras model that classifies a pair as match / no match.

    One Keras input is created per transformed feature (the label excluded).
    The inputs are concatenated, passed through four ReLU Dense layers sized by
    the hyperparameters 'units1'..'units4', and reduced to one sigmoid unit.

    Args:
      tf_transform_output: [TFTransformOutput], the outputs from Transform
      hp: Hyperparameter mapping providing `get` for 'units1'..'units4'.

    Returns:
      A keras Model (not compiled).

    Raises:
      ValueError: If a feature spec is neither VarLenFeature nor FixedLenFeature.
    """
    feature_spec = tf_transform_output.transformed_feature_spec().copy()
    feature_spec.pop(_LABEL_KEY)

    def _make_input(key, spec):
        # Sparse input for variable-length features, dense input otherwise.
        if isinstance(spec, tf.io.VarLenFeature):
            return tf.keras.layers.Input(
                shape=[None], name=key, dtype=spec.dtype, sparse=True)
        if isinstance(spec, tf.io.FixedLenFeature):
            return tf.keras.layers.Input(
                shape=spec.shape or [1], name=key, dtype=spec.dtype)
        raise ValueError('Spec type is not supported: ', key, spec)

    inputs = {key: _make_input(key, spec) for key, spec in feature_spec.items()}

    hidden_units = [hp.get(f'units{layer_idx}') for layer_idx in range(1, 5)]

    output = tf.keras.layers.Concatenate()(tf.nest.flatten(inputs))
    for units in hidden_units:
        output = tf.keras.layers.Dense(units=units, activation='relu')(output)
    output = tf.keras.layers.Dense(1, activation='sigmoid')(output)

    return tf.keras.Model(inputs=inputs, outputs=output)

def run_fn(fn_args: tfx.components.FnArgs):
    """Train the model with the tuned hyperparameters and export it for serving.

    Args:
      fn_args: Arguments provided by the Trainer component; this function reads
        transform_output, train_files, eval_files, data_accessor,
        hyperparameters, model_run_dir, train_steps, eval_steps and
        serving_model_dir.
    """
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor,
                              tf_transform_output, _BATCH_SIZE)
    eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor,
                             tf_transform_output, _BATCH_SIZE)

    # Load best hyperparameters
    hp = fn_args.hyperparameters.get('values')
    hp_learning_rate = hp.get('learning_rate')

    model = _build_keras_model(tf_transform_output, hp)

    model.compile(
        # The model's last layer applies a sigmoid, so its outputs are
        # probabilities and the loss must not treat them as logits.
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
        optimizer=tf.keras.optimizers.Adam(learning_rate=hp_learning_rate),
        metrics=[tf.keras.metrics.BinaryAccuracy()])

    model.summary()

    # Callback for TensorBoard
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=fn_args.model_run_dir, update_freq='batch')

    model.fit(
        train_dataset,
        epochs=10,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        callbacks=[tensorboard_callback])

    # Export the model.
    export_serving_model(tf_transform_output, model, fn_args.serving_model_dir)

In [None]:
# Setup the Trainer component with the transformed examples, the tuned
# hyperparameters, the transform graph and the schema.
trainer = Trainer(
    # Absolute path, consistent with the Transform and Tuner components.
    module_file=os.path.abspath(_fs_trainer_module_file),
    examples=transform.outputs['transformed_examples'],
    hyperparameters=tuner.outputs['best_hyperparameters'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    # Step counts that run_fn reads as fn_args.train_steps / fn_args.eval_steps.
    train_args=tfx.proto.TrainArgs(num_steps=25000),
    eval_args=tfx.proto.EvalArgs(num_steps=12000))

In [None]:
# Run the Trainer component with caching enabled.
context.run(trainer, enable_cache=True)

In [None]:
# Get artifact uri of trainer model output
model_artifact_dir = trainer.outputs['model'].get()[0].uri
print(model_artifact_dir)

# List subdirectories artifact uri
print(f'contents of model artifact directory:{os.listdir(model_artifact_dir)}')

# Define the model directory: the 'Format-Serving' subdirectory of the artifact
model_dir = os.path.join(model_artifact_dir, 'Format-Serving')

# List contents of model directory
print(f'contents of model directory: {os.listdir(model_dir)}')

In [None]:
model_run_artifact_dir = trainer.outputs['model_run'].get()[0].uri
print(model_run_artifact_dir)
%reload_ext tensorboard
%tensorboard --logdir='./pipeline/Trainer/model_run/6'

In [None]:
# Archive the Trainer artifacts under ./pipeline/Trainer into trainer.zip.
!zip -r trainer.zip './pipeline/Trainer'