## Table of Contents
1. Install and Setup Airflow
2. Make Directories
3. TensorFlow Extended Module Files
4. Migrate Baseline Data
5. Create Airflow Pipeline File
6. Continuous Training


The goal of this notebook is to walk through a TFX pipeline orchestrated with Airflow. Because we already prototyped the pipeline interactively, we can reuse most of its components here.

In [None]:
import os

## 1. Install and Setup Airflow
To install Airflow, we first set `AIRFLOW_HOME`, the directory where Airflow keeps its files, and then install the package with pip. Run these commands in a terminal.

In [None]:
export AIRFLOW_HOME=~/airflow
pip install apache-airflow
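
The later cells build their paths from `$HOME/airflow`, which only matches the installation when `AIRFLOW_HOME` points there. As a minimal sketch, the hypothetical helper `airflow_home()` below (not part of Airflow) resolves the directory from the environment and falls back to `~/airflow`, the same location exported above.

```python
import os


def airflow_home():
    """Return the Airflow home directory (hypothetical helper, not an Airflow API)."""
    # Prefer the AIRFLOW_HOME environment variable; otherwise use ~/airflow.
    default = os.path.join(os.path.expanduser('~'), 'airflow')
    configured = os.environ.get('AIRFLOW_HOME', default)
    return os.path.abspath(os.path.expanduser(configured))


print(airflow_home())
```

If the printed path differs from `~/airflow`, the hard-coded paths in the rest of the notebook would need to be adjusted accordingly.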

Once Airflow is installed, we need to initialize the Airflow database. Out of the box, Airflow will work with SQLite.

In [None]:
# Airflow 2.x syntax (matches `airflow users create` below); Airflow 1.10 used `airflow initdb`
airflow db init

The usual Airflow setup consists of an Airflow scheduler and a web server. The scheduler is responsible for coordinating the tasks, while the web server provides a user interface for interacting with them.

In [None]:
# Open a new terminal window
airflow webserver -p 8080

# Open in a separate terminal window
airflow scheduler

If everything is working, we should be able to access Airflow at http://127.0.0.1:8080. You may need to run the shell command below to set up an account before you can log in.

In [None]:
airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
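
Before opening the browser, it can help to confirm that something is listening on the web server port. The sketch below uses only the standard library; `is_port_open` is a hypothetical helper, and the host and port are the ones passed to `airflow webserver` above. It only checks that a TCP connection succeeds, not that the service behind it is Airflow.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


# 8080 is the port passed to `airflow webserver -p 8080`.
print('webserver reachable:', is_port_open('127.0.0.1', 8080))
```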

## 2. Make Directories
To keep our project organized, we will move all associated files into the Airflow directory.

In [None]:
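# makedirs also creates the parent `data` directory and is safe to rerun
_hot_dog_data = os.path.join(os.environ['HOME'], 'airflow', 'data', 'hot_dog_data')
os.makedirs(_hot_dog_data, exist_ok=True)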
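
The notebook later writes files into both the data folder and a `dags` folder under the Airflow directory. As a rough sketch, the hypothetical helper `make_project_dirs` below creates both in one call; it is demonstrated on a temporary directory so it can be run without touching a real installation.

```python
import os
import tempfile


def make_project_dirs(airflow_root):
    """Create the sub-directories this notebook writes to (hypothetical helper)."""
    subdirs = [
        os.path.join('data', 'hot_dog_data'),  # input TFRecord files
        'dags',                                # module and pipeline files
    ]
    created = []
    for subdir in subdirs:
        path = os.path.join(airflow_root, subdir)
        # exist_ok=True makes the call safe to repeat
        os.makedirs(path, exist_ok=True)
        created.append(path)
    return created


# Demonstrated on a temporary directory; pass the real Airflow home instead.
with tempfile.TemporaryDirectory() as tmp:
    for path in make_project_dirs(tmp):
        print(os.path.relpath(path, tmp))
```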

## 3. TensorFlow Extended Module Files
We defined the module files in the interactive pipeline. For simplicity, we write them into the Airflow `dags` folder.

In [None]:
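_hot_dog_transform = os.path.join(
    os.environ['HOME'], 'airflow', 'dags', 'hot_dog_transform.py'
)
# %%writefile does not create missing directories, so make sure dags exists
os.makedirs(os.path.dirname(_hot_dog_transform), exist_ok=True)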

In [None]:
%%writefile {_hot_dog_transform}

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import tensorflow as tf


def _process_image(raw_image):
    """Process a single image

    Parameters
    ----------
    raw_image : bytestring
        Encoded image string

    Returns
    -------
    tf.Tensor
        Decoded and resized image
    """
    raw_image = tf.reshape(raw_image, [])
    img_rgb = tf.image.decode_jpeg(raw_image, channels=3)
    img = tf.cast(img_rgb, dtype=tf.float32)
    resized_img = tf.image.resize_with_crop_or_pad(
        img, target_height=224, target_width=224,
    )
    
    return tf.reshape(resized_img, [224, 224, 3])
 
 
def preprocessing_fn(inputs):
    """Callback function for preprocessing inputs

    Serves as the entry point for TFX Transform component

    Parameters
    ----------
    inputs : nested tf.Tensor
        A batch of tensors to be processed

    Returns
    -------
    dict
        Maps each output feature name to its batch tensor: 'image_xf' holds
        the decoded and resized images, 'label' is passed through unchanged
    """
    image_raw = inputs['image']
    label = inputs['label']
    # The pipeline processes images in batches, so use tf.map_fn to apply
    # the per-image function to every element of the batch
    img_preprocessed = tf.map_fn(_process_image, image_raw, dtype=tf.float32)

    return {
      'image_xf': img_preprocessed,
      'label': label,
    }
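
Note that `tf.image.resize_with_crop_or_pad` does not rescale an image: sides longer than the target are cropped around the center and shorter sides are padded with zeros. The pure-Python sketch below illustrates that arithmetic for a single dimension. `crop_or_pad_plan` is a hypothetical helper, and how an odd difference is split between the two sides is an assumption that may not match TensorFlow exactly.

```python
def crop_or_pad_plan(size, target):
    """Return (crop_start, kept, pad_before, pad_after) for one dimension."""
    kept = min(size, target)                   # pixels that survive
    crop_start = max((size - target) // 2, 0)  # offset of the central crop
    pad_total = max(target - size, 0)          # zeros needed to reach target
    pad_before = pad_total // 2
    pad_after = pad_total - pad_before
    return crop_start, kept, pad_before, pad_after


# A 300-pixel side is cropped to its central 224 pixels.
print(crop_or_pad_plan(300, 224))  # (38, 224, 0, 0)
# A 100-pixel side is kept whole and zero-padded on both sides.
print(crop_or_pad_plan(100, 224))  # (0, 100, 62, 62)
```

Images much larger than 224 x 224 therefore lose their borders rather than being shrunk, which is worth keeping in mind when inspecting the transformed examples.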

In [None]:
_hot_dog_train = os.path.join(
    os.environ['HOME'], 'airflow', 'dags', 'hot_dog_train.py'
)

In [None]:
%%writefile {_hot_dog_train}

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import tensorflow as tf
import tensorflow_transform as tft
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_transform import TFTransformOutput


_LABEL_KEY = 'label'
_BATCH_SIZE = 32


def _input_fn(
    file_pattern, data_accessor, tf_transform_output, batch_size
):
    """Generates features and label for tuning/training

    Parameters
    ----------
    file_pattern : List[str]
        List of paths or patterns of input tfrecord files.
    data_accessor : tfx.components.DataAccessor
        DataAccessor for converting input to RecordBatch.
    tf_transform_output : tft.TFTransformOutput
        Output from Transform component
    batch_size : int
        representing the number of consecutive elements of returned
        dataset to combine in a single batch

    Returns
    -------
    tf.data.Dataset
        A dataset that contains (features, indices) tuple where features is a
        dictionary of Tensors, and indices is a single Tensor of label indices.
    """
    dataset = data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(
            batch_size=batch_size, label_key=_LABEL_KEY,
            shuffle_buffer_size=1200, shuffle_seed=123
        ),
        tf_transform_output.transformed_metadata.schema
    )
    
    return dataset


def _build_keras_model():
    """Create a Keras model

    Returns
    -------
    tf.keras.Model
        Model to be used during training
    """
    inputs = tf.keras.layers.Input(shape=(224, 224, 3), name='image_xf')
    base_model = tf.keras.applications.EfficientNetB0(
        include_top=False, weights='imagenet', input_tensor=inputs
    )

    # Rebuild top
    x = tf.keras.layers.GlobalAveragePooling2D()(base_model.output)
    x = tf.keras.layers.Dropout(0.2)(x)
    output = tf.keras.layers.Dense(3, activation='softmax', name='label')(x)

    # Compile
    model = tf.keras.Model(inputs=inputs, outputs=output)

    model.compile(
          loss=tf.keras.losses.CategoricalCrossentropy(),
          optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
          metrics=['accuracy']
    )

    return model


def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Returns a function that parses a serialized tf.Example

    Parameters
    ----------
    model : tf.keras.Model
        Model to be used during training
    tf_transform_output : TFTransformOutput
        Output from Transform component

    Returns
    -------
    tf.function
        serve_tf_examples_fn
    """

    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Returns the output to be used in the serving signature

        Parameters
        ----------
        serialized_tf_examples : tf.Example
            Serialized tf.Example to be processed

        Returns
        -------
        dict
            Serving signature
        """
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(_LABEL_KEY)
        parsed_features = tf.io.parse_example(
            serialized_tf_examples, feature_spec
        )

        transformed_features = model.tft_layer(parsed_features)

        outputs = model(transformed_features)
        return {"outputs": outputs}

    return serve_tf_examples_fn


# TFX Trainer will call this function.
def run_fn(fn_args):
    """Train the model based on given args

    Parameters
    ----------
    fn_args : tfx.components.FnArgs
        Arguments used to train the model as name/value pairs.
    """
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    train_dataset = _input_fn(
      fn_args.train_files, fn_args.data_accessor, 
      tf_transform_output, _BATCH_SIZE
    )
    eval_dataset = _input_fn(
      fn_args.eval_files, fn_args.data_accessor, 
      tf_transform_output, _BATCH_SIZE
    )

    model = _build_keras_model()

    early_stop = tf.keras.callbacks.EarlyStopping(
      monitor='val_accuracy', patience=3
    )

    model.fit(
        train_dataset,
        epochs=20,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        callbacks=[early_stop]
    )
    
    signatures = {
          "serving_default": _get_serve_tf_examples_fn(
              model, tf_transform_output
          ).get_concrete_function(
              tf.TensorSpec(shape=[None], dtype=tf.string, name="examples")
          ),
      }
    model.save(
        fn_args.serving_model_dir, save_format="tf", signatures=signatures
    )
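
The values of `fn_args.train_steps` and `fn_args.eval_steps` come from the `train_args` and `eval_args` set on the Trainer component in the pipeline file (6 and 2 in this notebook). As a quick worked example, the arithmetic below shows how many examples those settings consume together with `_BATCH_SIZE = 32` and `epochs=20`. The helper name `examples_per_pass` is made up for this illustration.

```python
def examples_per_pass(steps, batch_size):
    """Number of examples consumed by `steps` batches of `batch_size`."""
    return steps * batch_size


BATCH_SIZE = 32   # _BATCH_SIZE in hot_dog_train.py
TRAIN_STEPS = 6   # train_args num_steps in the pipeline file
EVAL_STEPS = 2    # eval_args num_steps in the pipeline file
MAX_EPOCHS = 20   # epochs passed to model.fit

print(examples_per_pass(TRAIN_STEPS, BATCH_SIZE))               # 192 per epoch
print(examples_per_pass(EVAL_STEPS, BATCH_SIZE))                # 64 per validation pass
print(examples_per_pass(TRAIN_STEPS, BATCH_SIZE) * MAX_EPOCHS)  # 3840 at most
```

The last figure is an upper bound, since the early stopping callback can end training before all 20 epochs have run.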

## 4. Migrate Baseline Data
Similar to the interactive pipeline, we can use the baseline data set to get an initial model.

In [None]:
import shutil

shutil.copy(
    '../data/processed/baseline.tfrecord', 
    os.path.join(
        os.environ['HOME'], 'airflow', 'data', 'hot_dog_data', 
        'baseline.tfrecord'
    )
)
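
After copying, a quick sanity check is to count the records in the file. The sketch below reads the TFRecord framing directly with the standard library (an 8-byte length, a 4-byte checksum, the payload, and another 4-byte checksum per record), so it runs without TensorFlow. `count_tfrecords` is a hypothetical helper and, as a simplification, it skips the checksums rather than verifying them.

```python
import os
import struct
import tempfile


def count_tfrecords(path):
    """Count records in a TFRecord file without verifying checksums."""
    count = 0
    with open(path, 'rb') as f:
        while True:
            header = f.read(8)          # uint64 little-endian payload length
            if not header:
                break                   # clean end of file
            if len(header) < 8:
                raise ValueError('truncated record header')
            (length,) = struct.unpack('<Q', header)
            f.seek(4, os.SEEK_CUR)      # skip the checksum of the length
            payload = f.read(length)
            if len(payload) < length:
                raise ValueError('truncated record payload')
            f.seek(4, os.SEEK_CUR)      # skip the checksum of the payload
            count += 1
    return count


# Self-check on a synthetic file with two records (checksums zeroed out).
with tempfile.NamedTemporaryFile(suffix='.tfrecord', delete=False) as tmp:
    for payload in (b'hot dog', b'taco'):
        tmp.write(struct.pack('<Q', len(payload)) + b'\x00' * 4)
        tmp.write(payload + b'\x00' * 4)
print(count_tfrecords(tmp.name))  # 2
os.remove(tmp.name)
```

Calling `count_tfrecords` on the copied `baseline.tfrecord` and on the original should give the same number.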

## 5. Create Airflow Pipeline File
We can reuse the components of the interactive pipeline. For an Airflow pipeline, the TFX components are wrapped in a single pipeline function.

In [None]:
_hot_dog_pipeline = os.path.join(
    os.environ['HOME'], 'airflow', 'dags', 'hot_dog_pipeline.py'
)

In [None]:
%%writefile {_hot_dog_pipeline}

"""TFX pipeline orchestrated with Airflow"""

import datetime
import os
from typing import List
#from tfx.proto import example_gen_pb2
from tfx import v1 as tfx
import tensorflow_model_analysis as tfma
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.airflow.airflow_dag_runner import AirflowDagRunner
from tfx.orchestration.airflow.airflow_dag_runner import AirflowPipelineConfig


_pipeline_name = 'hot_dog_classifier'
_airflow_root = os.path.join(os.environ['HOME'], 'airflow')

# Location where data is stored; defined earlier in this notebook
_data_root = os.path.join(_airflow_root, 'data', 'hot_dog_data')

# Location where module files are stored; defined earlier in this notebook
_transform_module_file = os.path.join(
    _airflow_root, 'dags', 'hot_dog_transform.py'
)
_trainer_module_file = os.path.join(
    _airflow_root, 'dags', 'hot_dog_train.py'
)

# Location to output the trained model
_serving_model_dir = os.path.join(_airflow_root, 'models', 'taco_bias')

# Location to save artifacts and metadata
_tfx_root = os.path.join(_airflow_root, 'tfx')
_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name)
# Sqlite ML-metadata db path.
_metadata_path = os.path.join(
    _tfx_root, 'metadata', _pipeline_name, 'metadata.db'
)

_beam_pipeline_args = [
    '--direct_running_mode=multi_processing',
    # setting direct_num_workers=0 will auto-detect available number of CPUs
    '--direct_num_workers=0',
]

# Airflow-specific configs; these will be passed directly to airflow
# we can control the frequency of continuous training using schedule_interval
_airflow_config = {
    'schedule_interval': '0 */2 * * *',
    'start_date': datetime.datetime(2019, 1, 1)
}


def _create_pipeline(
    pipeline_name, pipeline_root, data_root, transform_module_file, 
    trainer_module_file, serving_model_dir, metadata_path, beam_pipeline_args
):
    """Create a TFX Pipeline"""
    example_gen = tfx.components.ImportExampleGen(input_base=data_root)

    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs['examples']
    )
    
    schema_gen = tfx.components.SchemaGen(
        statistics=statistics_gen.outputs['statistics'],
        infer_feature_shape=True
    )
    
    example_validator = tfx.components.ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_gen.outputs['schema']
    )
    
    transform = tfx.components.Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        module_file=os.path.abspath(transform_module_file)
    )
  
    # Uses user-provided Python function that implements a model.
    trainer = tfx.components.Trainer(
        #custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
        module_file=os.path.abspath(trainer_module_file),
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=schema_gen.outputs['schema'],
        train_args=tfx.proto.TrainArgs(num_steps=6),
        eval_args=tfx.proto.EvalArgs(num_steps=2)
    )

    model_resolver = tfx.dsl.Resolver(
        strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
        model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
        model_blessing=tfx.dsl.Channel(
            type=tfx.types.standard_artifacts.ModelBlessing
        )
    ).with_id('latest_blessed_model_resolver')

    eval_config = tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(label_key='label')],
        slicing_specs=[tfma.SlicingSpec()],
        metrics_specs=[
            tfma.MetricsSpec(metrics=[
                tfma.MetricConfig(
                    class_name='CategoricalAccuracy',
                    threshold=tfma.MetricThreshold(
                        value_threshold=tfma.GenericValueThreshold(
                            lower_bound={'value': 0.55}),
                        change_threshold=tfma.GenericChangeThreshold(
                            direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                            absolute={'value': -1e-3}
                        )
                    )
                )
            ])
        ]
    )
    evaluator = tfx.components.Evaluator(
        examples=example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        eval_config=eval_config
    )  
    pusher = tfx.components.Pusher(
        model=trainer.outputs['model'],
        model_blessing=evaluator.outputs['blessing'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=serving_model_dir
            )
        )
    )
    return pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=[
            example_gen,
            statistics_gen,
            schema_gen,
            example_validator,
            transform,
            trainer,
            model_resolver,
            evaluator,
            pusher,
        ],
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ), 
        beam_pipeline_args=beam_pipeline_args
    )

# 'DAG' below needs to be kept for Airflow to detect the dag.
DAG = AirflowDagRunner(AirflowPipelineConfig(_airflow_config)).run(
    _create_pipeline(
        pipeline_name=_pipeline_name,
        pipeline_root=_pipeline_root,
        data_root=_data_root,
        transform_module_file=_transform_module_file,
        trainer_module_file=_trainer_module_file,
        serving_model_dir=_serving_model_dir,
        metadata_path=_metadata_path,
        beam_pipeline_args=_beam_pipeline_args
    )
)
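
The `eval_config` in the pipeline file attaches two thresholds to `CategoricalAccuracy`: a value threshold with a lower bound of 0.55, and a change threshold that tolerates a drop of at most 0.001 relative to the baseline model. The plain-Python sketch below is one reading of how those checks combine. `is_blessed` is a hypothetical helper, not a TFMA function, and skipping the change check when no baseline exists (as on the first run) is an assumption.

```python
def is_blessed(candidate, baseline=None, lower_bound=0.55, min_change=-1e-3):
    """Mimic the two accuracy checks in eval_config (hypothetical helper)."""
    # Value threshold: the candidate's accuracy must reach the lower bound.
    if candidate < lower_bound:
        return False
    # Change threshold (HIGHER_IS_BETTER): candidate minus baseline must not
    # fall below min_change. Assumed to be skipped when there is no baseline.
    if baseline is not None and candidate - baseline < min_change:
        return False
    return True


print(is_blessed(0.60))                   # True: first run, above 0.55
print(is_blessed(0.50))                   # False: below the lower bound
print(is_blessed(0.60, baseline=0.62))    # False: dropped by 0.02
print(is_blessed(0.60, baseline=0.6005))  # True: drop within tolerance
```

Only a blessed model is pushed to the serving directory, so these two numbers control how readily a retrained model replaces the current one.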

## 6. Continuous Training
The Airflow pipeline is scheduled to rerun at regular intervals. We can simulate continuous training by adding the taco bias data to the folder that holds the baseline data set.

In [None]:
import shutil

shutil.copy(
    '../data/processed/taco_bias.tfrecord', 
    os.path.join(
        os.environ['HOME'], 'airflow', 'data', 'hot_dog_data', 
        'taco_bias.tfrecord'
    )
)

Alternatively, an Airflow DAG can be set up to do this automatically. The `sensor_task` checks every 30 seconds whether any file exists under `~/airflow/models/taco_bias`. Once one appears, `python_task` uses `shutil.copy` to copy the TFRecord file from `~/Documents/GitHub/hot_dog_classifier/data/processed/taco_bias.tfrecord` to `~/airflow/data/hot_dog_data` in anticipation of the next run.

In [None]:
_move_file = os.path.join(
    os.environ['HOME'], 'airflow', 'dags', 'move_file.py'
)

In [None]:
%%writefile {_move_file}

"""Transfer TFRecord once model is completed"""

import datetime
import os
import shutil

import airflow
# Airflow 2.x import paths; the older airflow.contrib and python_operator
# module paths are deprecated
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor

default_args = {
    'start_date': datetime.datetime(2019, 1, 1)
}

with airflow.DAG(
    'move_file', default_args=default_args, schedule_interval='@once'
) as dag:
    sensor_task = FileSensor(
        task_id='file_sensor_task',
        poke_interval=30,
        filepath=os.path.join(
            os.environ['HOME'], 'airflow', 'models', 'taco_bias', '*'
        )
    )
    python_task = PythonOperator(
        task_id='move_file',
        python_callable=shutil.copy,
        op_kwargs={
            'src': os.path.join(
                os.environ['HOME'], 'Documents', 'GitHub', 
                'hot_dog_classifier', 'data', 'processed', 'taco_bias.tfrecord'
            ), 
            'dst': os.path.join(
                os.environ['HOME'], 'airflow', 'data', 'hot_dog_data'
            )
        }
    )

# Run the copy only after the sensor has detected a pushed model
sensor_task >> python_task
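
Conceptually, the sensor polls the file system at a fixed interval until the pattern matches. The standard-library sketch below imitates that loop outside Airflow, which can be handy for testing the path pattern. `wait_for_match` and its `timeout` parameter are hypothetical and not part of the `FileSensor` API.

```python
import glob
import os
import tempfile
import time


def wait_for_match(pattern, poke_interval=30.0, timeout=3600.0):
    """Poll until a path matches `pattern`; return it, or None on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        matches = sorted(glob.glob(pattern))
        if matches:
            return matches[0]
        if time.monotonic() >= deadline:
            return None
        time.sleep(poke_interval)


# Demonstration on a temporary directory with short intervals.
with tempfile.TemporaryDirectory() as tmp:
    pattern = os.path.join(tmp, '*')
    print(wait_for_match(pattern, poke_interval=0.1, timeout=0.3))  # None
    open(os.path.join(tmp, 'saved_model.pb'), 'w').close()
    print(os.path.basename(wait_for_match(pattern, poke_interval=0.1)))
```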

Once the model is retrained, we can save it as part of our project.

In [None]:
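import shutil

# distutils was removed in Python 3.12; shutil.copytree with
# dirs_exist_ok=True (Python 3.8+) also merges into an existing directory
shutil.copytree(
    os.path.join(os.environ['HOME'], 'airflow', 'models', 'taco_bias'),
    '../data/models/taco_bias',
    dirs_exist_ok=True
)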
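
If the serving directory holds one numbered sub-directory per pushed model (an assumption about how the Pusher lays out its output, worth confirming by listing the folder), the most recent model can be picked out as sketched below. `latest_version_dir` is a hypothetical helper, shown here on a temporary directory with made-up version numbers.

```python
import os
import tempfile


def latest_version_dir(serving_model_dir):
    """Return the numerically largest sub-directory, or None if there is none."""
    versions = [
        name for name in os.listdir(serving_model_dir)
        if name.isdigit()
        and os.path.isdir(os.path.join(serving_model_dir, name))
    ]
    if not versions:
        return None
    # Compare as integers so that '10' sorts after '9'.
    return os.path.join(serving_model_dir, max(versions, key=int))


# Demonstration with two made-up version folders.
with tempfile.TemporaryDirectory() as tmp:
    for version in ('1650000000', '1660000000'):
        os.makedirs(os.path.join(tmp, version))
    print(os.path.basename(latest_version_dir(tmp)))  # 1660000000
```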