<a href="https://colab.research.google.com/github/jeyasri-s2/SJSU297-AdvancedDL/blob/master/Assignment6/TFX_Iris.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Goal
## Implement an MLOps pipeline for the Iris dataset with TFX

## Setup
First, we install and import the necessary packages, set up paths, and download data.

### Upgrade Pip

To avoid upgrading Pip in a system when running locally, check to make sure that we're running in Colab.  Local systems can of course be upgraded separately.

In [None]:
try:
  import colab
  !pip install --upgrade pip
except:
  pass

### Install TFX

**Note: In Google Colab, because of package updates, the first time you run this cell you must restart the runtime (Runtime > Restart runtime ...).**

In [None]:
# NOTE(review): the TFX version is not pinned, so re-running this cell later
# may install a release whose API differs from the imports used below —
# consider pinning to the version this notebook was written against.
!pip install -q -U --use-feature=2020-resolver tfx

In [None]:
import os
import pprint
import tempfile
import urllib

import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

import tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input


%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

Let's check the library versions.

In [None]:
# Report the library versions in use so that results can be reproduced.
print(f'TensorFlow version: {tf.__version__}')
print(f'TFX version: {tfx.__version__}')

### Set up pipeline paths

In [None]:
# Root directory of the installed TFX pip package.
_tfx_root = tfx.__path__[0]

# Directory of the Chicago Taxi Pipeline example inside the TFX package.
# NOTE(review): no cell visible in this notebook reads `_taxi_root`; it looks
# like a leftover from the taxi tutorial — confirm before removing.
_taxi_root = os.path.join(_tfx_root, 'examples/chicago_taxi_pipeline')

# Directory the Pusher component exports the model to for serving. It lives
# under a fresh temporary directory; the `taxi_simple` suffix is kept as-is.
_serving_model_dir = os.path.join(tempfile.mkdtemp(), 'serving_model/taxi_simple')

In [None]:
# `import urllib` on its own does not guarantee that the `urllib.request`
# submodule is loaded, so import it explicitly before calling into it.
import urllib.request

# The CSV is stored alone in a dedicated temporary directory because the whole
# directory (`_data_root`) is later handed to CsvExampleGen as its input.
_data_root = tempfile.mkdtemp(prefix='tfx-iris')
DATA_PATH = 'https://raw.githubusercontent.com/valeriano-manassero/tfx-kubeflow-pipelines/master/iris/data/iris.csv'
_data_filepath = os.path.join(_data_root, "iris.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)

Take a quick look at the CSV file.

In [None]:
!head {_data_filepath}

### Create the InteractiveContext
Last, we create an InteractiveContext, which will allow us to run TFX components interactively in this notebook.

In [None]:
# Create an InteractiveContext using default parameters. This uses a temporary
# directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, pass the optional properties
# `pipeline_root` and `metadata_connection_config` to InteractiveContext.
# Calls to InteractiveContext are no-ops outside of the notebook.
# Every component below is executed through `context.run(...)`.
context = InteractiveContext()

# ExampleGen

In [None]:
# Feed the directory holding the downloaded CSV to CsvExampleGen and run it.
example_gen = CsvExampleGen(
    input=external_input(_data_root),
)
context.run(example_gen)

Let's examine the output artifacts of `ExampleGen`. This component produces two artifacts, training examples and evaluation examples:

# StatisticsGen

In [None]:
# Run StatisticsGen over the examples produced by ExampleGen.
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
context.run(statistics_gen)

After `StatisticsGen` finishes running, we can visualize the output statistics. Try playing with the different plots!

# SchemaGen


In [None]:
# Run SchemaGen on the statistics computed above; feature-shape inference is
# explicitly turned off.
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False,
)
context.run(schema_gen)

After `SchemaGen` finishes running, we can visualize the generated schema as a table.

In [None]:
# Display the schema artifact produced by SchemaGen.
context.show(schema_gen.outputs['schema'])

# ExampleValidator


In [None]:
# Run ExampleValidator with the computed statistics and the generated schema.
example_validator = ExampleValidator(
    schema=schema_gen.outputs['schema'],
    statistics=statistics_gen.outputs['statistics'],
)
context.run(example_validator)

After `ExampleValidator` finishes running, we can visualize the anomalies as a table.

In [None]:
# Display the anomalies artifact produced by ExampleValidator.
context.show(example_validator.outputs['anomalies'])

# Transform




In [None]:
!head {_data_filepath}

In [None]:
# Name of the Python module written by the next cell. The same file is passed
# as `module_file` to both the Transform and the Trainer components.
_iris_transform_module_file = 'iris_utils.py'

In [None]:
%%writefile {_iris_transform_module_file}

from typing import Text

import absl
import tensorflow as tf
from tensorflow import keras
import tensorflow_transform as tft

from tfx.components.trainer.executor import TrainerFnArgs


# Names of the input feature columns; each one is z-score scaled in
# `preprocessing_fn` and becomes one model input in `_build_keras_model`.
_FEATURE_KEYS = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
# Name of the label column; it is passed through `preprocessing_fn` unchanged.
_LABEL_KEY = 'species'


def _transformed_name(key):
    return key + '_xf'


def _gzip_reader_fn(filenames):
    """Open `filenames` as a GZIP-compressed TFRecord dataset."""
    records = tf.data.TFRecordDataset(filenames, compression_type='GZIP')
    return records


def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Build the serving function that scores serialized tf.Example protos.

    Args:
        model: the Keras model to call. The transform layer is attached to it
            as `model.tft_layer`.
        tf_transform_output: object providing `transform_features_layer()` and
            `raw_feature_spec()` (a `tft.TFTransformOutput` in `run_fn`).

    Returns:
        A `tf.function` that parses serialized examples with the raw feature
        spec (label removed), applies the transform layer and returns the
        model's output for the transformed features.
    """
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Parse, transform and score a batch of serialized examples."""
        feature_spec = tf_transform_output.raw_feature_spec()
        # The label is dropped from the parsing spec so that incoming examples
        # are not required to carry it.
        feature_spec.pop(_LABEL_KEY)
        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        # The label was not fed to the transform layer, so its transformed
        # counterpart may be absent from the output. Pop with a default so a
        # missing key does not raise KeyError while tracing.
        transformed_features.pop(_transformed_name(_LABEL_KEY), None)
        return model(transformed_features)

    return serve_tf_examples_fn


def _input_fn(file_pattern: Text,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
    """Build a batched dataset of transformed features and labels.

    Args:
        file_pattern: file pattern of the GZIP-compressed TFRecord files to read.
        tf_transform_output: supplies the transformed feature spec used to
            parse the records.
        batch_size: number of examples per batch.

    Returns:
        The dataset created by `make_batched_features_dataset`, with the
        transformed label column split out as the label.
    """
    feature_spec = tf_transform_output.transformed_feature_spec().copy()
    label_name = _transformed_name(_LABEL_KEY)
    return tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=feature_spec,
        reader=_gzip_reader_fn,
        label_key=label_name)


def _build_keras_model() -> tf.keras.Model:
    """Build and compile the feed-forward classifier used by `run_fn`.

    The model has one scalar input per entry of `_FEATURE_KEYS` (named after
    the transformed feature), concatenates them, applies three Dense(8, relu)
    layers and ends in a 3-unit softmax layer.

    Returns:
        The compiled `tf.keras.Model`.
    """
    inputs = [keras.layers.Input(shape=(1,), name=_transformed_name(f)) for f in _FEATURE_KEYS]
    hidden = keras.layers.concatenate(inputs)
    for _ in range(3):
        hidden = keras.layers.Dense(8, activation='relu')(hidden)
    output = keras.layers.Dense(3, activation='softmax')(hidden)

    model = keras.Model(inputs=inputs, outputs=output)
    model.compile(
        # `learning_rate` is the supported argument name; `lr` is a deprecated alias.
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        # 'binary_categorical_crossentropy' is not a Keras loss identifier.
        # The label reaches the model as a single untransformed column, so the
        # sparse variant is used.
        # NOTE(review): assumes the label column holds integer class ids
        # 0..2 — confirm against the CSV.
        loss='sparse_categorical_crossentropy',
        # `Accuracy` tests predictions for exact equality with the labels,
        # which is meaningless for softmax probabilities; use the metric that
        # compares the arg-max class with the integer label.
        metrics=[keras.metrics.SparseCategoricalAccuracy()])

    model.summary(print_fn=absl.logging.info)
    return model


def _build_keras_model2() -> tf.keras.Model:
    """Build and compile an alternative Sequential classifier.

    This variant is not called by `run_fn` in this module. It takes a single
    4-wide input and stacks Dense layers of 8, 10 and 8 relu units followed by
    a 3-unit softmax layer.
    NOTE(review): the categorical loss/metric pair presumably expects one-hot
    encoded labels — confirm before switching `run_fn` to this model.

    Returns:
        The compiled `tf.keras.Model`.
    """
    model = keras.Sequential()
    model.add(keras.layers.Dense(8, input_dim=4, activation='relu'))
    model.add(keras.layers.Dense(10, activation='relu'))
    model.add(keras.layers.Dense(8, activation='relu'))
    model.add(keras.layers.Dense(3, activation='softmax'))
    model.compile(
        # `learning_rate` is the supported argument name; `lr` is a deprecated alias.
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=[keras.metrics.CategoricalAccuracy()])

    model.summary(print_fn=absl.logging.info)
    return model


def preprocessing_fn(inputs):
    """Feature-engineering function for the Transform component.

    Args:
        inputs: mapping from raw feature name to its value; it is indexed by
            every entry of `_FEATURE_KEYS` and by `_LABEL_KEY`.

    Returns:
        A dict keyed by transformed feature names: each feature in
        `_FEATURE_KEYS` scaled with `tft.scale_to_z_score`, plus the label
        passed through unchanged.
    """
    transformed = {
        _transformed_name(name): tft.scale_to_z_score(inputs[name])
        for name in _FEATURE_KEYS
    }
    # The label is only renamed, not modified.
    transformed[_transformed_name(_LABEL_KEY)] = inputs[_LABEL_KEY]
    return transformed


def run_fn(fn_args: TrainerFnArgs):
    """Train the Keras model and save it with a serving signature.

    Args:
        fn_args: training arguments; this function reads `transform_output`,
            `train_files`, `eval_files`, `train_steps`, `eval_steps` and
            `serving_model_dir` from it.
    """
    transform_output = tft.TFTransformOutput(fn_args.transform_output)

    # Both datasets are read in batches of 40 examples.
    train_ds = _input_fn(fn_args.train_files, transform_output, 40)
    eval_ds = _input_fn(fn_args.eval_files, transform_output, 40)

    # The model is created inside the distribution strategy's scope.
    strategy = tf.distribute.MirroredStrategy()
    with strategy.scope():
        model = _build_keras_model()

    model.fit(
        train_ds,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_ds,
        validation_steps=fn_args.eval_steps)

    # The serving signature accepts a 1-D batch of serialized tf.Example strings.
    serve_fn = _get_serve_tf_examples_fn(model, transform_output)
    examples_spec = tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
    signatures = {
        'serving_default': serve_fn.get_concrete_function(examples_spec),
    }

    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

Now, we pass in this feature engineering code to the `Transform` component and run it to transform your data.

In [None]:
# Run Transform on the ingested examples, using the generated schema and the
# module file written above.
transform = Transform(
    module_file=_iris_transform_module_file,
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
)
context.run(transform)

In [None]:
# Despite its name, `train_uri` is the URI of the first `transform_graph`
# artifact emitted by Transform; list that directory's contents.
train_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(train_uri)

# Trainer


In [None]:
# Train with the same module file used by Transform. The GenericExecutor is
# selected through `custom_executor_spec`; training runs for 500 steps and
# evaluation for 200 steps.
trainer = Trainer(
    module_file=_iris_transform_module_file,
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=500),
    eval_args=trainer_pb2.EvalArgs(num_steps=200),
)

context.run(trainer)

# Evaluator


In [None]:
# Resolver node whose `model` output is used by the Evaluator as the baseline
# model.
model_resolver = ResolverNode(
    instance_name='latest_blessed_model_resolver',
    resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing),
)

In [None]:
# The Evaluator reads the examples produced by ExampleGen, so `label_key`
# must name the label column of the dataset. The module file uses 'species'
# (`_LABEL_KEY`); the previous value 'label' matches no column used there.
# The model ends in a 3-unit softmax, so a multi-class accuracy metric
# replaces BinaryAccuracy.
# NOTE(review): SparseCategoricalAccuracy assumes integer class-id labels —
# confirm against the CSV.
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='species')],
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='SparseCategoricalAccuracy',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        # Increase this threshold when training on complete
                        # dataset.
                        lower_bound={'value': 0.4}),
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -1e-2})))
        ])
    ])

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    # Change threshold will be ignored if there is no baseline (first run).
    eval_config=eval_config)

context.run(evaluator)

# Pusher

In [None]:
# Push the trained model to `_serving_model_dir` on the local filesystem; the
# Evaluator's blessing is passed in alongside the model.
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=_serving_model_dir,
        ),
    ),
)

context.run(pusher)