# Data preprocessing and Feature Engineering with TFT

In this lab, we use [TensorFlow Transform](https://www.tensorflow.org/tfx/guide/tft) (TFT) to perform the following:

1. **Implement** transformation logic in **preprocessing_fn**.
2. **Implement** a Beam pipeline:
    1. **Analyze** and **transform** training data.
    2. **Transform** evaluation data.
3. **Run** pipeline to produce the transformed **data** and transform **artifacts**.

![alt text](imgs/tft.png "Data transformation and feature engineering")

In [3]:
#!pip install -q tensorflow_transform

In [28]:
import os
import apache_beam as beam
import tensorflow as tf
import tensorflow_data_validation as tfdv
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import schema_utils
# Print the version of each library imported above.
print('TF version: {}'.format(tf.__version__))
print('TFDV version: {}'.format(tfdv.__version__))
print('TFT version: {}'.format(tft.__version__))
print('Apache Beam version: {}'.format(beam.__version__))

TF version: 1.15.0
TFDV version: 0.15.0
TFT version: 0.15.0
Apache Beam version: 2.16.0


In [8]:
# Root folder for every input and output of this lab
# (you can set this to a GCS location).
WORKSPACE = 'workspace'

# Raw CSV inputs live under <WORKSPACE>/raw_data.
DATA_DIR = os.path.join(WORKSPACE, 'raw_data')
TRAIN_DATA_FILE, EVAL_DATA_FILE = (
    os.path.join(DATA_DIR, file_name)
    for file_name in ('train.csv', 'eval.csv')
)

# Text-format schema describing the raw data.
RAW_SCHEMA_LOCATION = os.path.join(WORKSPACE, 'raw_schema.pbtxt')

## 1. Implement transformation logic
We make use of the raw schema to perform metadata-driven feature handling, as follows:
1. Scale numeric features with z-score
2. Integerise categorical features

Other transformations can be performed, including bucketization, polynomial expansion, clipping, or custom formulas.

In [36]:
# Column names of the raw CSV files, in file order.
HEADER = [
    'age',
    'workclass',
    'fnlwgt',
    'education',
    'education_num',
    'marital_status',
    'occupation',
    'relationship',
    'race',
    'gender',
    'capital_gain',
    'capital_loss',
    'hours_per_week',
    'native_country',
    'income_bracket',
]

# Column passed through the preprocessing untouched as the label.
TARGET_FEATURE_NAME = 'income_bracket'

# Column passed through the preprocessing untouched as the example weight.
WEIGHT_COLUMN_NAME = 'fnlwgt'

In [46]:
def make_preprocessing_fn(raw_schema):
    """Build a TFT `preprocessing_fn` driven by the features of `raw_schema`.

    Args:
        raw_schema: schema object whose `feature` entries expose `name` and
            `type`; it decides how each raw column is transformed.

    Returns:
        A function mapping a dict of raw feature tensors to a dict of
        transformed feature tensors.
    """

    def preprocessing_fn(input_features):
        """Transform the raw feature tensors into model-ready features.

        Schema-driven handling, per feature in `raw_schema`:
          * the target feature is copied through unchanged;
          * features whose schema type equals 1 are integerized through a
            vocabulary and emitted as `<name>_integerized`;
          * every other feature is z-score scaled and emitted as
            `<name>_scaled`.

        On top of that the weight column is copied through, `age` is
        bucketized into quantile buckets, and two engineered features are
        added (`education_to_age_ratio`, `capital_indicator`).
        """
        # Schema type code treated as categorical.
        # NOTE(review): presumably FeatureType.BYTES in tensorflow_metadata's
        # schema.proto -- confirm against the installed version.
        categorical_type = 1

        outputs = {}

        for spec in raw_schema.feature:
            name = spec.name
            raw_tensor = input_features[name]

            if name == TARGET_FEATURE_NAME:
                # The label must reach the model exactly as it came in.
                outputs[name] = raw_tensor
            elif spec.type == categorical_type:
                # Learn a vocabulary and replace each value by its index.
                outputs[name + "_integerized"] = tft.compute_and_apply_vocabulary(
                    raw_tensor, vocab_filename=name)
            else:
                # Standardize numeric values.
                outputs[name + "_scaled"] = tft.scale_to_z_score(raw_tensor)

        # The weight column is forwarded as is.
        outputs[WEIGHT_COLUMN_NAME] = input_features[WEIGHT_COLUMN_NAME]

        # Quantile-based bucketization of age.
        age = input_features["age"]
        age_boundaries = tft.quantiles(age, num_buckets=5, epsilon=0.01)
        outputs["age_bucketized"] = tft.apply_buckets(
            age, bucket_boundaries=age_boundaries)

        # Engineered features: age divided by years of education, and a flag
        # set when the capital gain exceeds the capital loss.
        ratio = age / input_features["education_num"]
        gain_exceeds_loss = (
            input_features['capital_gain'] > input_features['capital_loss'])
        outputs['education_to_age_ratio'] = tf.cast(ratio, tf.float32)
        outputs['capital_indicator'] = tf.cast(gain_exceeds_loss, tf.int64)

        return outputs

    return preprocessing_fn

## 2. Implement a Beam pipeline 

In [49]:
def run_pipeline(args):
    """Build and run the Beam pipeline that preprocesses the train and eval data.

    The pipeline analyzes and transforms the raw training CSV data, applies
    the resulting transform function to the raw evaluation CSV data, writes
    both transformed datasets as TFRecord files, and writes the transform
    function itself.

    Args:
        args: dict of settings. The whole dict is forwarded as keyword
            arguments to `PipelineOptions`, and the following keys are read
            directly: 'raw_schema_location', 'raw_train_data_location',
            'raw_eval_data_location', 'transformed_train_data_location',
            'transformed_eval_data_location', 'transform_artefact_location',
            'temporary_dir' and 'runner'.
    """
    pipeline_options = beam.pipeline.PipelineOptions(flags=[], **args)

    schema_path = args['raw_schema_location']
    train_source = args['raw_train_data_location']
    eval_source = args['raw_eval_data_location']
    train_sink = args['transformed_train_data_location']
    eval_sink = args['transformed_eval_data_location']
    artefact_dir = args['transform_artefact_location']
    tmp_dir = args['temporary_dir']
    runner = args['runner']

    # The TFDV schema drives both the CSV decoding and the preprocessing_fn.
    source_raw_schema = tfdv.load_schema_text(schema_path)
    raw_feature_spec = schema_utils.schema_as_feature_spec(
        source_raw_schema).feature_spec

    # Set the weight column entry explicitly in the raw feature spec.
    raw_feature_spec[WEIGHT_COLUMN_NAME] = tf.FixedLenFeature(
        shape=[1], dtype=tf.int64, default_value=None)

    raw_metadata = dataset_metadata.DatasetMetadata(
        dataset_schema.from_feature_spec(raw_feature_spec))

    def read_raw_dataset(pipeline, step, location, csv_coder):
        """Read CSV lines, drop empty ones, decode them; return (data, metadata)."""
        lines = (
            pipeline
            | '{} - Read Raw Data'.format(step) >> beam.io.textio.ReadFromText(location))
        non_empty_lines = (
            lines
            | '{} - Remove Empty Rows'.format(step) >> beam.Filter(lambda line: line))
        decoded_rows = (
            non_empty_lines
            | '{} - Decode CSV Data'.format(step) >> beam.Map(csv_coder.decode))
        return decoded_rows, raw_metadata

    def write_tfrecords(records, step, path_prefix, schema):
        """Write the records as `.tfrecords` files encoded as example protos."""
        sink = beam.io.tfrecordio.WriteToTFRecord(
            file_path_prefix=path_prefix,
            file_name_suffix=".tfrecords",
            coder=tft.coders.ExampleProtoCoder(schema))
        return records | '{} - Write Transformed Data'.format(step) >> sink

    with beam.Pipeline(runner, options=pipeline_options) as pipeline, \
            tft_beam.Context(tmp_dir):

        csv_coder = tft.coders.CsvCoder(
            column_names=HEADER, schema=raw_metadata.schema)

        # ---- Training data: analyze and transform ----
        step = 'Train'
        raw_train_dataset = read_raw_dataset(
            pipeline, step, train_source, csv_coder)

        # Produces the transformed training dataset together with the
        # transform function that is reused for the evaluation data.
        (transformed_train_data, transformed_metadata), transform_fn = (
            raw_train_dataset
            | '{} - Analyze & Transform'.format(step)
            >> tft_beam.AnalyzeAndTransformDataset(
                make_preprocessing_fn(source_raw_schema)))

        _ = write_tfrecords(
            transformed_train_data, step, train_sink,
            transformed_metadata.schema)

        # ---- Evaluation data: transform only ----
        step = 'Eval'
        raw_eval_dataset = read_raw_dataset(
            pipeline, step, eval_source, csv_coder)

        # Reuse the transform function produced from the training data.
        transformed_eval_data, _ = (
            (raw_eval_dataset, transform_fn)
            | '{} - Transform'.format(step) >> tft_beam.TransformDataset())

        # The eval records are encoded with the schema of the transformed
        # training metadata.
        _ = write_tfrecords(
            transformed_eval_data, step, eval_sink,
            transformed_metadata.schema)

        # ---- Transform artefacts ----
        _ = (
            transform_fn
            | 'Write Transform Artefacts' >> tft_beam.WriteTransformFn(
                artefact_dir))
            

## 3. Run Data Transformation Pipeline

In [34]:
# Output locations, all rooted at WORKSPACE.
TRANSFORM_ARTEFACTS_DIR = os.path.join(WORKSPACE, 'transform_artifacts')
TRANSFORMED_DATA_DIR = os.path.join(WORKSPACE, 'transformed_data')
TEMP_DIR = os.path.join(WORKSPACE, 'tmp')

# Beam runner used to execute the pipeline.
runner = 'DirectRunner'

# Settings passed to run_pipeline: first the runner and the raw inputs ...
args = {
    'runner': runner,
    'raw_schema_location': RAW_SCHEMA_LOCATION,
    'raw_train_data_location': TRAIN_DATA_FILE,
    'raw_eval_data_location': EVAL_DATA_FILE,
}

# ... then the output locations and the temporary directory.
args.update({
    'transformed_train_data_location': os.path.join(TRANSFORMED_DATA_DIR, "train"),
    'transformed_eval_data_location': os.path.join(TRANSFORMED_DATA_DIR, "eval"),
    'transform_artefact_location': TRANSFORM_ARTEFACTS_DIR,
    'temporary_dir': TEMP_DIR,
})

In [50]:
from tensorflow.io import gfile

if gfile.exists(TEMP_DIR):
    print("Removing {} contents...".format(TEMP_DIR))
    gfile.rmtree(TRANSFORMED_DATA_DIR)
    
if gfile.exists(TRANSFORMED_DATA_DIR):
    print("Removing {} contents...".format(TRANSFORMED_DATA_DIR))
    gfile.rmtree(TRANSFORMED_DATA_DIR)
          
if gfile.exists(TRANSFORM_ARTEFACTS_DIR):
    print("Removing {} contents...".format(TRANSFORM_ARTEFACTS_DIR))
    gfile.rmtree(TRANSFORM_ARTEFACTS_DIR)

tf.logging.set_verbosity(tf.logging.ERROR)
print("Running TF Transform pipeline...")
print("")
%time run_pipeline(args)
print("")
print("Pipeline is done.")

Removing workspace/transformed_data contents...
Removing workspace/transform_artifacts contents...
Running TF Transform pipeline...





CPU times: user 59.9 s, sys: 744 ms, total: 1min
Wall time: 1min 2s

Pipeline is done.


## Check TFT outputs

In [55]:
!ls {TRANSFORM_ARTEFACTS_DIR}/*

workspace/transform_artifacts/transform_fn:
assets	saved_model.pb	variables

workspace/transform_artifacts/transformed_metadata:
schema.pbtxt


In [66]:
# Load the transform output written to TRANSFORM_ARTEFACTS_DIR and get the
# feature spec of the transformed features from it.
tft_output = tft.TFTransformOutput(TRANSFORM_ARTEFACTS_DIR)
transform_feature_spec = tft_output.transformed_feature_spec()
# Bare expression: the notebook displays the feature spec as the cell output.
transform_feature_spec

{'age_bucketized': FixedLenFeature(shape=[1], dtype=tf.int64, default_value=None),
 'age_scaled': FixedLenFeature(shape=[1], dtype=tf.float32, default_value=None),
 'capital_gain_scaled': FixedLenFeature(shape=[1], dtype=tf.float32, default_value=None),
 'capital_indicator': FixedLenFeature(shape=[1], dtype=tf.int64, default_value=None),
 'capital_loss_scaled': FixedLenFeature(shape=[1], dtype=tf.float32, default_value=None),
 'education_integerized': FixedLenFeature(shape=[1], dtype=tf.int64, default_value=None),
 'education_num_scaled': FixedLenFeature(shape=[1], dtype=tf.float32, default_value=None),
 'education_to_age_ratio': FixedLenFeature(shape=[1], dtype=tf.float32, default_value=None),
 'fnlwgt': FixedLenFeature(shape=[1], dtype=tf.int64, default_value=None),
 'fnlwgt_scaled': FixedLenFeature(shape=[1], dtype=tf.float32, default_value=None),
 'gender_integerized': FixedLenFeature(shape=[1], dtype=tf.int64, default_value=None),
 'hours_per_week_scaled': FixedLenFeature(shape=[1

In [57]:
# Switch TensorFlow to eager execution.
tf.enable_eager_execution()

In [85]:
def _parse_example(example):
    """Parse one serialized `tf.Example` using `transform_feature_spec`."""
    parsed_features = tf.io.parse_single_example(
        example, transform_feature_spec)
    return parsed_features


# Read the transformed training records from the first output shard.
dataset = tf.data.TFRecordDataset(
    TRANSFORMED_DATA_DIR + "/train-00000-of-00001.tfrecords")

# Print the first five parsed records, each followed by an empty line.
first_records = dataset.take(5).map(_parse_example)
for record in first_records:
    print(record)
    print("")


{'capital_indicator': <tf.Tensor: id=7633, shape=(1,), dtype=int64, numpy=array([1])>, 'education_integerized': <tf.Tensor: id=7635, shape=(1,), dtype=int64, numpy=array([2])>, 'gender_integerized': <tf.Tensor: id=7640, shape=(1,), dtype=int64, numpy=array([0])>, 'education_to_age_ratio': <tf.Tensor: id=7637, shape=(1,), dtype=float32, numpy=array([3.], dtype=float32)>, 'capital_loss_scaled': <tf.Tensor: id=7634, shape=(1,), dtype=float32, numpy=array([-0.21665958], dtype=float32)>, 'marital_status_integerized': <tf.Tensor: id=7643, shape=(1,), dtype=int64, numpy=array([1])>, 'capital_gain_scaled': <tf.Tensor: id=7632, shape=(1,), dtype=float32, numpy=array([0.14845291], dtype=float32)>, 'relationship_integerized': <tf.Tensor: id=7647, shape=(1,), dtype=int64, numpy=array([1])>, 'workclass_integerized': <tf.Tensor: id=7648, shape=(1,), dtype=int64, numpy=array([4])>, 'race_integerized': <tf.Tensor: id=7646, shape=(1,), dtype=int64, numpy=array([0])>, 'fnlwgt_scaled': <tf.Tensor: id=763