# Imports

In [None]:
%pip install tensorflow-data-validation
%pip install -q tensorflow_data_validation[visualization]
%pip install tfx

In [None]:
import sys
import os
import numpy as np
import pandas as pd
import pickle
import tempfile
import tensorflow_data_validation as tfdv
import tensorflow as tf

# Print numpy arrays in full instead of summarizing them.
np.set_printoptions(threshold=sys.maxsize)
for _name, _version in (('TFDV', tfdv.version.__version__),
                        ('TF', tf.__version__)):
  print(f'{_name} version: {_version}')

# Data Analysis

#### Load and display data

In [None]:
# Input CSV locations for the analysis section, plus an output directory.
DATA = './data'
TRAIN_DATA, TEST_DATA = (os.path.join(DATA, f'{split}.csv') for split in ('train', 'test'))
OUTPUT = './output'

In [None]:
# Both splits are semicolon-separated CSV files.
train_df, test_df = (pd.read_csv(path, sep=";") for path in (TRAIN_DATA, TEST_DATA))
display(train_df, test_df)

#### Generate Statistics

In [None]:
%%capture
import tensorflow_data_validation as tfdv
print('TFDV version: {}'.format(tfdv.version.__version__))
train_stats = tfdv.generate_statistics_from_dataframe(train_df)
test_stats = tfdv.generate_statistics_from_dataframe(test_df)

In [None]:
# Overview of the training split on its own.
tfdv.visualize_statistics(train_stats)
# Side-by-side comparison; name both panes so it is clear which split is which
# (the defaults are the generic 'lhs_statistics' / 'rhs_statistics').
tfdv.visualize_statistics(
    lhs_statistics=train_stats,
    rhs_statistics=test_stats,
    lhs_name='TRAIN',
    rhs_name='TEST',
)

#### Infer schema and detect anomalies

In [None]:
# Infer a baseline schema from the training statistics and render it.
schema = tfdv.infer_schema(statistics=train_stats)
tfdv.display_schema(schema=schema)

In [None]:
from tensorflow_metadata.proto.v0 import schema_pb2

# Create schema environments and exclude the label from TESTING so its absence
# in the test set is not reported as an anomaly.
# The membership checks keep this cell idempotent: re-running it must not
# append duplicate entries to the schema's repeated fields.
for env in ('TRAINING', 'TESTING'):
  if env not in schema.default_environment:
    schema.default_environment.append(env)

label_feature = tfdv.get_feature(schema, 'EXTRA_BAGGAGE')
if 'TESTING' not in label_feature.not_in_environment:
  label_feature.not_in_environment.append('TESTING')

# Regenerate training statistics using the curated schema, with the label marked.
stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)
stats_options.label_feature = 'EXTRA_BAGGAGE'
train_stats = tfdv.generate_statistics_from_dataframe(
    train_df,
    stats_options=stats_options,
)

# Check the test statistics against the schema in the TESTING environment.
anomalies = tfdv.validate_statistics(test_stats, schema, environment='TESTING')
tfdv.display_anomalies(anomalies)



#### Checking training/serving skew

In [None]:
# Report training/serving skew on WEBSITE when the infinity-norm distance
# between the train and test value distributions exceeds 0.01.
website_feature = tfdv.get_feature(schema, 'WEBSITE')
website_feature.skew_comparator.infinity_norm.threshold = 0.01
skew_anomalies = tfdv.validate_statistics(
    statistics=train_stats,
    schema=schema,
    serving_statistics=test_stats,
)
tfdv.display_anomalies(skew_anomalies)

#### Generate statistics on data slices

In [None]:
from tensorflow_data_validation.utils import slicing_util

# Slice on DEVICE == COMPUTER. The slicer expects an *iterable* of values per
# feature: a bare string would be iterated character by character ('C', 'O',
# ...), yielding an empty slice. String columns coming from a DataFrame are
# compared as bytes — NOTE(review): confirm against the installed TFDV version.
slice_fn = slicing_util.get_feature_value_slicer(features={'DEVICE': [b'COMPUTER']})
# NOTE(review): newer TFDV releases call this option `experimental_slice_functions`
# — confirm the name for the installed version.
slice_stats_options = tfdv.StatsOptions(slice_functions=[slice_fn])

# Keep the sliced statistics under their own names so `train_stats` and
# `stats_options` (used by the anomaly and skew cells above) are not clobbered
# if those cells are re-run after this one.
sliced_train_stats = tfdv.generate_statistics_from_dataframe(
    train_df,
    stats_options=slice_stats_options,
)

tfdv.visualize_statistics(sliced_train_stats)


# TFX pipeline: preprocessing, training and serving

In [None]:
%pip install -U tensorflow-transform
%pip install pyarrow

In [None]:
from absl import logging
import os

PIPELINE_NAME = "extra-baggage"
DATA_ROOT = "train-data"  # directory CsvExampleGen reads its input from

# Every location below is namespaced by the pipeline name.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)                # artifacts generated by the pipeline
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')  # SQLite file used as MLMD storage
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)        # where exported models are pushed

logging.set_verbosity(logging.ERROR)  # Only surface absl messages at ERROR or above.

In [None]:
# File the next cell writes the Trainer module to; handed to the Trainer component.
_trainer_module_file = "extra_baggage_trainer.py"

In [None]:
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow_transform as tft
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

import tensorflow as tf
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx.components.trainer.fn_args_utils import FnArgs
from tfx_bsl.tfxio import dataset_options

logging.set_verbosity(logging.ERROR)  # Set default logging level.

# Batch sizes for the training and evaluation datasets.
_TRAIN_BATCH_SIZE, _EVAL_BATCH_SIZE = 20, 10

# Model input names; each becomes a Keras Input in _build_keras_model and must
# be a key of the transformed feature dict.
_FEATURE_KEYS = [
    'WEBSITE', 'GDS', 'DEPARTURE', 'ARRIVAL',
    'ADULTS', 'CHILDREN', 'INFANTS', 'TRAIN', 'HAUL_TYPE',
    'DISTANCE', 'DEVICE', 'TRIP_TYPE', 'PRODUCT', 'SMS', 'NO_GDS',
]

# Column holding the training target.
_LABEL_KEY = 'EXTRA_BAGGAGE'


def _apply_preprocessing(raw_features, tft_layer):
  """Runs the Transform layer and splits the label off, when there is one.

  Args:
    raw_features: Dict of raw (untransformed) feature tensors.
    tft_layer: Callable mapping raw features to a dict of transformed ones.

  Returns:
    `(transformed_features, transformed_label)` if `_LABEL_KEY` is among the
    raw features (the label is removed from the returned dict); otherwise
    `(transformed_features, None)`.
  """
  transformed_features = tft_layer(raw_features)
  has_label = _LABEL_KEY in raw_features
  transformed_label = transformed_features.pop(_LABEL_KEY) if has_label else None
  return transformed_features, transformed_label

    
def _input_fn(file_pattern: List[str],
              data_accessor: DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates transformed features and labels for training or evaluation.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    tf_transform_output: Output of the Transform component. It supplies the raw
      schema used to parse the input examples and the layer that applies the
      preprocessing graph to them.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    An endlessly repeating dataset of `(features, label)` tuples, where
    `features` is a dict of transformed Tensors and `label` is the transformed
    label (None when the raw examples carry no label).
  """
  dataset = data_accessor.tf_dataset_factory(
      file_pattern,
      dataset_options.TensorFlowDatasetOptions(batch_size=batch_size),
      tf_transform_output.raw_metadata.schema)

  transform_layer = tf_transform_output.transform_features_layer()

  def apply_transform(raw_features):
    return _apply_preprocessing(raw_features, transform_layer)

  # One repeat() is enough: the caller bounds each epoch with
  # steps_per_epoch / validation_steps.
  return dataset.map(apply_transform).repeat()


def _build_keras_model() -> tf.keras.Model:
  """Creates a small DNN Keras binary classifier for the extra-baggage label.

  Every name in _FEATURE_KEYS becomes a scalar Input of shape (1,). The inputs
  are concatenated and passed through two 8-unit ReLU layers to a single
  sigmoid unit.

  Returns:
    A compiled Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(1, activation='sigmoid')(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.BinaryCrossentropy(),
      # BinaryAccuracy thresholds the sigmoid probability before comparing it
      # with the 0/1 label. Plain Accuracy() tests exact equality between the
      # probability and the label, so it stays near zero.
      metrics=[tf.keras.metrics.BinaryAccuracy()])

  model.summary(print_fn=logging.info)
  return model

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a tf.function that serves predictions from serialized tf.Examples.

  The function parses raw examples with the raw feature spec (minus the label,
  which is absent at serving time), applies the Transform graph and runs the
  model. This way clients send raw data rather than pre-transformed tensors.
  """
  # Attach the layer to the model so it is tracked and exported with it.
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function(input_signature=[
      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  ])
  def serve_tf_examples_fn(serialized_tf_examples):
    feature_spec = tf_transform_output.raw_feature_spec()
    # Serving requests do not include the label.
    feature_spec.pop(_LABEL_KEY, None)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
    transformed_features = model.tft_layer(parsed_features)
    return {'output': model(transformed_features)}

  return serve_tf_examples_fn


# TFX Trainer will call this function.
def run_fn(fn_args: FnArgs):
  """Trains the model and exports it with a tf.Example serving signature.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="./logs")
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_TRAIN_BATCH_SIZE)
  # Without validation_data Keras ignores validation_steps, and the eval split
  # would never be used.
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory. The serving signature accepts serialized tf.Examples.
  signatures = {
      'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

In [None]:
# File the next cell writes the Transform module (preprocessing_fn) to.
_transform_module_file = "transform.py"

In [None]:

%%writefile {_transform_module_file}
import tensorflow as tf
import tensorflow_transform as tft

def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  DISTANCE is z-scored, the string-like features and the label are
  integer-encoded through a vocabulary, and ADULTS/CHILDREN/INFANTS are only
  densified.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  # Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
  _VOCAB_SIZE = 1000
  # Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
  _OOV_SIZE = 10
  # Number of buckets used by tf.transform for encoding each feature.
  _FEATURE_BUCKET_COUNT = 10

  _VOCAB_FEATURE_KEYS = ['DEPARTURE', 'ARRIVAL', 'WEBSITE', 'GDS',
                         'TRAIN', 'TRIP_TYPE', 'DEVICE', 'SMS',
                         'NO_GDS', 'HAUL_TYPE', 'PRODUCT']

  _CATEGORICAL_FEATURE_KEYS = ['ADULTS', 'CHILDREN', 'INFANTS']

  _DENSE_FLOAT_FEATURE_KEYS = ['DISTANCE']

  # Kept for future use; currently no feature is bucketized.
  _BUCKET_FEATURE_KEYS = []

  _LABEL_KEY = 'EXTRA_BAGGAGE'

  outputs = {}
  for key in _DENSE_FLOAT_FEATURE_KEYS:
    # If sparse make it dense, setting nan's to 0 or '', and apply zscore.
    outputs[key] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  # The label is vocabulary-encoded like the string features (it is processed
  # last, as before).
  # NOTE(review): vocabularies are ordered by frequency, so which label value
  # maps to 0 vs 1 depends on class balance, and OOV buckets can produce
  # indices >= 2. The trainer's BinaryCrossentropy expects 0/1 labels, so
  # consider an explicit mapping for _LABEL_KEY.
  for key in _VOCAB_FEATURE_KEYS + [_LABEL_KEY]:
    # Build a vocabulary for this feature.
    outputs[key] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=_VOCAB_SIZE,
        num_oov_buckets=_OOV_SIZE)

  for key in _BUCKET_FEATURE_KEYS:
    outputs[key] = tft.bucketize(
        _fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT)

  for key in _CATEGORICAL_FEATURE_KEYS:
    outputs[key] = _fill_in_missing(inputs[key])

  return outputs

  
def _fill_in_missing(x):
  """Densifies a SparseTensor, filling absent entries with '' or 0.

  Inputs that are not SparseTensors are returned unchanged.

  Args:
    x: A `SparseTensor` of rank 2 whose dense shape has size at most 1 in the
      second dimension, or a tensor that is already dense.

  Returns:
    For sparse input, a rank-1 dense tensor whose missing entries are '' (string
    dtype) or 0 (any other dtype); otherwise `x` itself.
  """
  if isinstance(x, tf.sparse.SparseTensor):
    fill_value = '' if x.dtype == tf.string else 0
    # Pin the second dimension to 1 so the dense result is (batch, 1).
    as_column = tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1])
    dense = tf.sparse.to_dense(as_column, fill_value)
    return tf.squeeze(dense, axis=1)
  return x

In [None]:
from tfx import v1 as tfx
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, transform_module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates the extra-baggage pipeline with TFX.

  CsvExampleGen -> StatisticsGen -> SchemaGen -> ExampleValidator -> Transform
  -> Trainer -> Pusher.

  Args:
    pipeline_name: Name of the pipeline.
    pipeline_root: Directory for artifacts generated by the pipeline.
    data_root: Directory CsvExampleGen reads its input from.
    module_file: Path of the module defining the Trainer's `run_fn`.
    transform_module_file: Path of the module defining `preprocessing_fn`.
    serving_model_dir: Filesystem destination the Pusher exports models to.
    metadata_path: Path of the SQLite file used as MLMD storage.

  Returns:
    The assembled `tfx.dsl.Pipeline`.
  """
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  stats_gen = tfx.components.StatisticsGen(examples=example_gen.outputs['examples'])

  # The schema is inferred from the training split only.
  schema_gen = tfx.components.SchemaGen(
      statistics=stats_gen.outputs['statistics'], exclude_splits=['eval'])

  # Checks the statistics against the inferred schema.
  example_validator = tfx.components.ExampleValidator(
      statistics=stats_gen.outputs['statistics'],
      schema=schema_gen.outputs['schema'])

  # Use the module path passed in, not a notebook global, so callers control it.
  transform = tfx.components.Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=transform_module_file)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Every component defined above is part of the pipeline.
  components = [
      example_gen,
      stats_gen,
      schema_gen,
      example_validator,
      transform,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

In [None]:
# Build the pipeline from the configuration above and run it locally.
extra_baggage_pipeline = _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    data_root=DATA_ROOT,
    module_file=_trainer_module_file,
    transform_module_file=_transform_module_file,
    serving_model_dir=SERVING_MODEL_DIR,
    metadata_path=METADATA_PATH,
)
tfx.orchestration.LocalDagRunner().run(extra_baggage_pipeline)

In [None]:
# List every file the Pusher exported under the serving model directory.
!find {SERVING_MODEL_DIR}

In [None]:
from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib

# TODO(b/171447278): Move these functions into the TFX library.

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Returns the output artifacts of the latest run of a pipeline component.

  Args:
    metadata: An open `Metadata` handle whose `store` is queried.
    pipeline_name: Name the pipeline was run under.
    component_id: Id of the component node, e.g. 'StatisticsGen'.

  Returns:
    The OUTPUT artifacts of the most recently updated execution, as returned
    by `execution_lib.get_artifacts_dict`.

  Raises:
    ValueError: If MLMD has no context or no execution for the component
      (for example because the pipeline has not been run yet).
  """
  context_name = f'{pipeline_name}.{component_id}'
  context = metadata.store.get_context_by_type_and_name('node', context_name)
  if context is None:
    raise ValueError(
        f'No MLMD context named {context_name!r}; has the pipeline been run?')
  executions = metadata.store.get_executions_by_context(context.id)
  if not executions:
    raise ValueError(f'No executions recorded for {context_name!r}.')
  latest_execution = max(executions,
                         key=lambda e: e.last_update_time_since_epoch)
  return execution_lib.get_artifacts_dict(metadata, latest_execution.id,
                                          [metadata_store_pb2.Event.OUTPUT])

# Non-public APIs, just for showcase.
from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Displays each artifact whose type has a registered visualization.

  Artifacts whose type has no registered visualization are skipped.
  """
  for item in artifacts:
    vis = visualizations.get_registry().get_visualization(item.type_name)
    if not vis:
      continue
    vis.display(item)

# Register TFX's standard artifact visualizations in the registry that
# visualize_artifacts() looks them up from.
from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

In [None]:
# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

SCHEMA_METADATA_PATH = METADATA_PATH
metadata_connection_config = (
    tfx.orchestration.metadata.sqlite_metadata_connection_config(
        SCHEMA_METADATA_PATH))

# Pull the output artifacts of each component's latest run from MLMD.
with Metadata(metadata_connection_config) as metadata_handler:
  stat_gen_output = get_latest_artifacts(
      metadata_handler, PIPELINE_NAME, 'StatisticsGen')
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_latest_artifacts(
      metadata_handler, PIPELINE_NAME, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]

  transform_gen_output = get_latest_artifacts(
      metadata_handler, PIPELINE_NAME, 'Transform')
  transform_artifacts = transform_gen_output[
      standard_component_specs.TRANSFORM_GRAPH_KEY]

  # ExampleValidator anomalies (only available when that component is in the
  # pipeline's component list):
  # ev_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
  #                                  'ExampleValidator')
  # anomalies_artifacts = ev_output[standard_component_specs.ANOMALIES_KEY]

In [None]:
# Render the schema produced by SchemaGen in the latest pipeline run.
visualize_artifacts(schema_artifacts)

In [None]:
# Exported models live in numerically named subdirectories (presumably the
# push timestamp — confirm); ignore anything else and take the largest one.
model_dirs = [
    entry for entry in os.scandir(SERVING_MODEL_DIR)
    if entry.is_dir() and entry.name.isdigit()
]
if not model_dirs:
  raise FileNotFoundError(f'No exported model found under {SERVING_MODEL_DIR!r}')

model_path = max(model_dirs, key=lambda entry: int(entry.name)).path
loaded_model = tf.keras.models.load_model(model_path)
# Keras SavedModels expose their serving signature as 'serving_default'; there
# is no 'default' key.
inference_fn = loaded_model.signatures['serving_default']
print(inference_fn)

In [None]:



def _bytes_feature(value):
  """Wraps a single bytes value in a tf.train.Feature."""
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def _int64_feature(value):
  """Wraps a single integer value in a tf.train.Feature."""
  return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def _float_feature(value):
  """Wraps a single float value in a tf.train.Feature."""
  return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))


# One raw example without the EXTRA_BAGGAGE label.
# NOTE(review): each value's type (bytes / int64 / float) must agree with the
# raw schema the exported model parses; TRIP_TYPE=b"TRIP_TYPE" looks like a
# placeholder value — confirm.
features = {
    "ARRIVAL": _bytes_feature(b"22/July"),
    "TRAIN": _bytes_feature(b"FALSE"),
    "CHILDREN": _int64_feature(0),
    "ADULTS": _int64_feature(1),
    "INFANTS": _int64_feature(0),
    "GDS": _int64_feature(1),
    "TRIP_TYPE": _bytes_feature(b"TRIP_TYPE"),
    "DISTANCE": _float_feature(3206.92),
    "DEVICE": _bytes_feature(b"COMPUTER"),
    "NO_GDS": _int64_feature(0),
    "HAUL_TYPE": _bytes_feature(b"DOMESTIC"),
    "WEBSITE": _bytes_feature(b"EDES"),
    "DEPARTURE": _bytes_feature(b"22/July"),
    "PRODUCT": _bytes_feature(b"TRIP"),
    "SMS": _bytes_feature(b"TRUE"),
}

example_proto = tf.train.Example(features=tf.train.Features(feature=features))
examples = example_proto.SerializeToString()
print(examples)

# The signature takes a batch of serialized examples under the name `examples`.
result = inference_fn(examples=tf.constant([examples]))
print(result['output'].numpy())

In [None]:
import tensorflow as tf
# Pick a distribution strategy: TPU when one can be resolved, otherwise
# MirroredStrategy (all local GPUs, falling back to the CPU when there are none).
# NOTE(review): `strategy` is not used by the pipeline's Trainer module, so
# training there is not distributed by this cell.
try:
  tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # ValueError when no TPU is found
  tf.config.experimental_connect_to_cluster(tpu)
  tf.tpu.experimental.initialize_tpu_system(tpu)
  # Stable API; tf.distribute.experimental.TPUStrategy is a deprecated alias.
  strategy = tf.distribute.TPUStrategy(tpu)
except ValueError:
  strategy = tf.distribute.MirroredStrategy()

print("Number of accelerators: ", strategy.num_replicas_in_sync)