<h2>Pre-processing pipeline</h2>
<h3>Pre-process the input CSV files and write the result as TFRecord files</h3>

In [1]:
import tensorflow as tf                                                                                                                                                                                             
from tensorflow import data
import tensorflow_transform as tft 
import tensorflow_model_analysis as tfma
import tensorflow_transform.coders as tft_coders

from tensorflow_transform.beam import impl
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow.contrib.learn.python.learn.utils import input_fn_utils

from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.saved import saved_transform_io
from tensorflow_transform.beam.tft_beam_io import transform_fn_io

import apache_beam as beam

import os
import params
import featurizer
import metadata

  from ._conv import register_converters as _register_converters


<h3>Transform features </h3>
<p><code>tft.string_to_int</code> uses an analyzer to compute the unique values taken by the input strings, and then uses TensorFlow operations to convert the input strings to indices in the table of unique values.</p>
<p>
<code>top_k</code>: limits the generated vocabulary to the first <code>top_k</code> elements. If set
      to <code>None</code>, the full vocabulary is generated.
</p>


In [2]:
# Number of quantile buckets computed for every numeric feature.
NUM_BUCKETS = 4


def preprocess(input_features):
    """Build the tf.Transform outputs for a dict of raw input features.

    Args:
        input_features: dict indexed by the feature names listed in the
            `metadata` module (target, numeric, categorical and vocabulary
            features).

    Returns:
        A dict of output features containing:
          * the target feature, passed through unchanged;
          * every numeric feature scaled with `tft.scale_to_z_score` under its
            original name, plus a `<name>_bucketized` entry built from
            `NUM_BUCKETS` quantile boundaries;
          * every categorical feature, passed through unchanged;
          * a `<name>_integerized` entry for every vocabulary feature.
    """
    transformed = {}

    target_name = metadata.TARGET_FEATURE_NAME
    transformed[target_name] = input_features[target_name]

    for name in metadata.NUMERIC_FEATURE_NAMES:
        raw_values = input_features[name]

        transformed[name] = tft.scale_to_z_score(raw_values)

        boundaries = tft.quantiles(raw_values, num_buckets=NUM_BUCKETS, epsilon=0.01)
        transformed[name + "_bucketized"] = tft.apply_buckets(
            raw_values, bucket_boundaries=boundaries)

    for name in metadata.CATEGORICAL_FEATURE_NAMES:
        raw_values = input_features[name]

        # The return value is intentionally unused; presumably the call is made
        # so that a vocabulary file named after the feature is produced with
        # the transform artefacts -- TODO confirm against the consumer.
        tft.uniques(raw_values, vocab_filename=name)
        transformed[name] = raw_values

    for name in metadata.VOCAB_FEATURE_NAMES:
        transformed[name + "_integerized"] = tft.string_to_int(
            input_features[name],
            top_k=metadata.VOCAB_SIZE,
            num_oov_buckets=metadata.OOV_SIZE,
            vocab_filename=name)

    return transformed

<h3>Handle errors during the transformation</h3>

In [3]:
class MapAndFilterErrors(beam.PTransform):
  """Map-like transform that drops the elements for which `fn` raises.

  Each input element is passed to `fn` and the result is emitted downstream.
  When `fn` raises an `Exception` the element is skipped and counted instead
  of failing the pipeline.
  """

  class _MapAndFilterErrorsDoFn(beam.DoFn):
    """DoFn that applies `fn` per element and counts failures in a Beam metric."""

    def __init__(self, fn):
      # Counter reporting how many elements were dropped because `fn` raised.
      self._bad_elements_counter = beam.metrics.Metrics.counter(
          'carnival_example', 'bad_elements')
      self._fn = fn

    def process(self, element):
      map_fn = self._fn
      try:
        yield map_fn(element)
      except Exception:  # pylint: disable=broad-except
        # Deliberately broad: any failure only increments the counter and the
        # element is not emitted.
        self._bad_elements_counter.inc(1)

  def __init__(self, fn):
    self._fn = fn

  def expand(self, pcoll):
    error_filtering_dofn = self._MapAndFilterErrorsDoFn(self._fn)
    return pcoll | beam.ParDo(error_filtering_dofn)




In [4]:
def fix_comma_and_filter_third_column(line):
    """Re-encode one raw CSV line as tab-separated text, dropping two columns.

    The line is parsed with the `csv` module (whitespace following a delimiter
    is skipped), so a comma inside a quoted field does not split that field.
    The columns at zero-based positions 3 and 4 are removed and the remaining
    fields are joined with tab characters.

    Args:
        line: a single line of comma-separated text.

    Returns:
        The kept fields joined by tabs.
    """
    # csv is imported locally to avoid a NameError on DataflowRunner workers;
    # see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors
    import csv

    parsed_rows = list(csv.reader([line], skipinitialspace=True))
    fields = parsed_rows[0]

    # Drop the customer number and the sail date (positions 3 and 4).
    kept_fields = fields[:3]
    kept_fields.extend(fields[5:])
    return '\t'.join(kept_fields)


In [5]:
def _read_and_decode_raw_data(pipeline, step, file_pattern, converter):
    """Read raw text lines for one split, clean them and decode them with `converter`."""
    return (
        pipeline
        | '{} - Read Raw Data'.format(step) >> beam.io.textio.ReadFromText(file_pattern)
        | '{} - Remove Empty Rows'.format(step) >> beam.Filter(lambda line: line)
        | '{} - FixCommasAndRemoveFiledTestData'.format(step) >> beam.Map(fix_comma_and_filter_third_column)
        | '{} - Decode CSV Data'.format(step) >> MapAndFilterErrors(converter.decode)
    )


def _write_transformed_data(transformed_data, step, file_path_prefix, schema):
    """Write the transformed instances of one split as `.tfrecords` files."""
    return (
        transformed_data
        | '{} - Write Transformed Data'.format(step) >> beam.io.tfrecordio.WriteToTFRecord(
            file_path_prefix=file_path_prefix,
            file_name_suffix=".tfrecords",
            coder=tft_coders.example_proto_coder.ExampleProtoCoder(schema))
    )


def run_transformation_pipeline(runner, options):
    """Analyze the train data, transform train and eval data, and save the artefacts.

    The pipeline:
      1. reads and decodes the raw train file, then analyzes and transforms it
         with `preprocess`, producing the transformed data and a transform_fn;
      2. reads and decodes the raw eval file and transforms it with the
         transform_fn obtained from the train data;
      3. writes both transformed datasets as `.tfrecords` files;
      4. writes the transform_fn to the transform artefacts directory.

    All source and sink locations are read from `params.Params`.

    Args:
        runner: Beam runner name passed to `beam.Pipeline`; progress messages
            are printed only when it equals 'DirectRunner'.
        options: dict of pipeline options, expanded as keyword arguments into
            `beam.pipeline.PipelineOptions`.
    """
    pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
    is_direct_runner = runner == 'DirectRunner'

    print("Source raw train data files: {}".format(params.Params.RAW_TRAIN_DATA_FILE))
    print("Source raw eval data files: {}".format(params.Params.RAW_EVAL_DATA_FILE))

    print("Sink transformed train data files: {}".format(params.Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX))
    print("Sink transformed eval data files: {}".format(params.Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX))
    print("Sink transform artefacts directory: {}".format(params.Params.TRANSFORM_ARTEFACTS_DIR))

    print("Temporary directory: {}".format(params.Params.TEMP_DIR))
    print("")

    # Leaving the `with beam.Pipeline(...)` block runs the pipeline and waits
    # for it to finish, so there must be no explicit `pipeline.run()` afterwards:
    # on Dataflow that would submit the same job a second time.
    with beam.Pipeline(runner, options=pipeline_options) as pipeline:
        with impl.Context(params.Params.TEMP_DIR):

            raw_metadata = featurizer.create_raw_metadata()
            converter = tft_coders.csv_coder.CsvCoder(column_names=metadata.RAW_FEATURE_NAMES,
                                                      delimiter=params.Params.RAW_DATA_DELIMITER,
                                                      schema=raw_metadata.schema)

            ###### analyze & transform train ################################################
            if is_direct_runner:
                print("Transform training data....")

            step = 'train'

            raw_train_data = _read_and_decode_raw_data(
                pipeline, step, params.Params.RAW_TRAIN_DATA_FILE, converter)

            # A tf.Transform dataset is a (data, metadata) pair.
            raw_train_dataset = (raw_train_data, raw_metadata)

            # Analyzing the train data yields both the transformed train
            # dataset and the transform_fn that is reused for the eval data.
            transformed_train_dataset, transform_fn = (
                raw_train_dataset
                | '{} - Analyze & Transform'.format(step) >> impl.AnalyzeAndTransformDataset(preprocess)
            )

            transformed_train_data, transformed_metadata = transformed_train_dataset

            _ = _write_transformed_data(
                transformed_train_data, step,
                params.Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX,
                transformed_metadata.schema)

            ###### transform eval ###########################################################
            if is_direct_runner:
                print("Transform eval data....")

            step = 'eval'

            raw_eval_data = _read_and_decode_raw_data(
                pipeline, step, params.Params.RAW_EVAL_DATA_FILE, converter)

            raw_eval_dataset = (raw_eval_data, raw_metadata)

            # The eval data is only transformed, never analyzed.
            transformed_eval_dataset = (
                (raw_eval_dataset, transform_fn)
                | '{} - Transform'.format(step) >> impl.TransformDataset()
            )

            transformed_eval_data, _ = transformed_eval_dataset

            # The eval records are encoded with the schema produced while
            # transforming the train data.
            _ = _write_transformed_data(
                transformed_eval_data, step,
                params.Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX,
                transformed_metadata.schema)

            ###### write transformation metadata ############################################
            if is_direct_runner:
                print("Saving transformation artefacts ....")

            # write transform_fn as tf.graph
            _ = (
                transform_fn
                | 'Write Transform Artefacts' >> transform_fn_io.WriteTransformFn(params.Params.TRANSFORM_ARTEFACTS_DIR)
            )

<h2>Run the pipeline locally or on Dataflow </h2>

In [None]:
import shutil
from datetime import datetime

# Launch the transformation pipeline when it is enabled in the project parameters.
if params.Params.TRANSFORM:

    tf.logging.set_verbosity(tf.logging.ERROR)

    runner = 'DirectRunner' # DirectRunner | DataflowRunner

    job_name = 'preprocess-data-tft-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))
    # print() call form, consistent with the rest of the notebook and valid on
    # both Python 2 and Python 3.
    print('Launching {} job {} ... hang on'.format(runner, job_name))
    print("")

    options = {
        'region': 'europe-west1',
        'staging_location': os.path.join(params.Params.DATA_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(params.Params.DATA_DIR, 'tmp'),
        'job_name': job_name,
        'project': params.Params.GCP_PROJECT_ID,
        'worker_machine_type': 'n1-standard-2',
        'max_num_workers': 20,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        'requirements_file': 'requirements.txt',
    }

    if runner == 'DirectRunner':
        # Remove the outputs of a previous local run. shutil.rmtree works on the
        # local filesystem only, so this clean-up is limited to local runs.
        shutil.rmtree(params.Params.TRANSFORM_ARTEFACTS_DIR, ignore_errors=True)
        shutil.rmtree(params.Params.TRANSFORMED_DATA_DIR, ignore_errors=True)
        shutil.rmtree(params.Params.TEMP_DIR, ignore_errors=True)

    # Run for whichever runner is selected; only the clean-up above is
    # specific to DirectRunner.
    run_transformation_pipeline(runner, options)
    print("Transformation is done!")
else:
    print("Transformation was skipped!")
