## Import required packages and modules

In [1]:
import os

import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft

from tensorflow_transform.beam import impl
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.coders import example_proto_coder
from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.beam.tft_beam_io import transform_fn_io

In [2]:
# List the installed packages whose names match tensorflow, beam and cloud-dataflow.
!pip list | grep 'tensorflow'
!pip list | grep 'beam'
!pip list | grep 'cloud-dataflow'

tensorflow                         1.10.0     
tensorflow-hub                     0.1.0      
tensorflow-model-analysis          0.6.0      
tensorflow-transform               0.8.0      
apache-beam                        2.5.0      
google-cloud-dataflow              2.5.0      


## Raw data metadata

In [3]:
# Raw input columns declared as strings in the raw schema.
CATEGORICAL_FEATURE_NAMES = [
    'is_male',
    'mother_race',
]
# Raw input columns declared as floats in the raw schema.
NUMERIC_FEATURE_NAMES = [
    'mother_age',
    'plurality',
    'gestation_weeks',
]
# Name of the target column.
TARGET_FEATURE_NAME = 'weight_pounds'
# Name of the key column.
KEY_COLUMN = 'key'

def create_raw_metadata():
    """Build the tf.Transform metadata that describes the raw input data.

    Every column is declared with an empty shape (``[]``) and a fixed column
    representation: the key, the target and the numeric features as
    ``tf.float32``, the categorical features as ``tf.string``.

    Returns:
        A ``dataset_metadata.DatasetMetadata`` wrapping the raw data schema.
    """

    def scalar_column(dtype):
        # One empty-shape column of the given dtype with a fixed representation.
        return dataset_schema.ColumnSchema(
            dtype, [], dataset_schema.FixedColumnRepresentation())

    # key and target feature schema
    column_schemas = {}
    column_schemas[KEY_COLUMN] = scalar_column(tf.float32)
    column_schemas[TARGET_FEATURE_NAME] = scalar_column(tf.float32)

    # categorical features schema
    for column_name in CATEGORICAL_FEATURE_NAMES:
        column_schemas[column_name] = scalar_column(tf.string)

    # numerical features schema
    for column_name in NUMERIC_FEATURE_NAMES:
        column_schemas[column_name] = scalar_column(tf.float32)

    # create dataset_metadata given the raw schema
    return dataset_metadata.DatasetMetadata(
        dataset_schema.Schema(column_schemas))

import pprint

# Pretty-print the feature spec derived from the raw metadata as a sanity check.
pp = pprint.PrettyPrinter(indent=4)
raw_feature_spec = create_raw_metadata().schema.as_feature_spec()
pp.pprint(raw_feature_spec)

{   'gestation_weeks': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
    'is_male': FixedLenFeature(shape=[], dtype=tf.string, default_value=None),
    'key': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
    'mother_age': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
    'mother_race': FixedLenFeature(shape=[], dtype=tf.string, default_value=None),
    'plurality': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
    'weight_pounds': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None)}


## Source query

In [4]:
def get_source_query(step, data_size):
    """Build the BigQuery standard-SQL query that selects the rows for a step.

    The inner query reads the natality sample table, keeps rows with positive
    weight, mother age, plurality, gestation weeks and month after year 2000,
    and adds a ``key`` column holding the FARM_FINGERPRINT of the concatenated
    feature values. The outer query partitions rows on that key so that the
    train and eval selections never overlap.

    Args:
        step: 'train' selects the training partition; any other value selects
            the evaluation partition.
        data_size: total number of rows wanted across both partitions. 70% of
            it is used as the LIMIT for train and 30% as the LIMIT for eval
            (truncated to an integer).

    Returns:
        The query string.
    """

    train_size = data_size * 0.7
    eval_size = data_size * 0.3

    query = """
    SELECT
      ROUND(weight_pounds,1) AS weight_pounds,
      is_male,
      mother_age,
      mother_race,
      plurality,
      gestation_weeks,
      FARM_FINGERPRINT( 
        CONCAT(
          COALESCE(CAST(weight_pounds AS STRING), 'NA'),
          COALESCE(CAST(is_male AS STRING),'NA'),
          COALESCE(CAST(mother_age AS STRING),'NA'),
          COALESCE(CAST(mother_race AS STRING),'NA'),
          COALESCE(CAST(plurality AS STRING), 'NA'),
          COALESCE(CAST(gestation_weeks AS STRING),'NA')
          )
        ) AS key
        FROM
          publicdata.samples.natality
        WHERE year > 2000
        AND weight_pounds > 0
        AND mother_age > 0
        AND plurality > 0
        AND gestation_weeks > 0
        AND month > 0
    """

    # FARM_FINGERPRINT yields a signed INT64 and BigQuery's MOD keeps the sign
    # of its first argument, so MOD(key, 100) is negative for negative keys.
    # Without ABS every negative key would satisfy "< 70" and land in train,
    # skewing the split well away from 70/30. ABS is applied after MOD so its
    # argument always lies strictly between -100 and 100.
    if step == 'train':
        source_query = 'SELECT * FROM ({}) WHERE ABS(MOD(key, 100)) < 70 LIMIT {}'.format(
            query, int(train_size))
    else:
        source_query = 'SELECT * FROM ({}) WHERE ABS(MOD(key, 100)) >= 70 LIMIT {}'.format(
            query, int(eval_size))

    return source_query

## Source cleanup function

In [5]:
def prep_bq_row(bq_row):
    """Convert one BigQuery row into a dict of strings with a readable race.

    Args:
        bq_row: dict mapping column name to the value read for a single row.

    Returns:
        A new dict in which every value of ``bq_row`` has been passed through
        ``str``. 'mother_race' is always present in the result: it holds the
        race name for a recognised numeric code, or 'Unknown' when the column
        is missing or its value is not a known code. ``bq_row`` itself is not
        modified.
    """

    # modify opaque numeric race code into human-readable data
    races = {
        1: 'White',
        2: 'Black',
        3: 'American Indian',
        4: 'Chinese',
        5: 'Japanese',
        6: 'Hawaiian',
        7: 'Filipino',
        18: 'Asian Indian',
        28: 'Korean',
        39: 'Samoan',  # was misspelled 'Samaon', which leaked into the output
        48: 'Vietnamese',
    }

    result = {feature_name: str(value) for feature_name, value in bq_row.items()}

    # A missing column yields None, which is not a known code -> 'Unknown'.
    result['mother_race'] = races.get(bq_row.get('mother_race'), 'Unknown')

    return result

## Read and clean from source

In [6]:
def read_from_bq(pipeline, step, data_size):
    """Read the rows for ``step`` from BigQuery and clean them up.

    Args:
        pipeline: the Beam pipeline the read is attached to.
        step: step name; used in the transform labels and passed to
            ``get_source_query``.
        data_size: passed to ``get_source_query``.

    Returns:
        A ``(raw_data, raw_metadata)`` tuple: the cleaned rows and the metadata
        returned by ``create_raw_metadata``.
    """
    bq_source = beam.io.BigQuerySource(
        query=get_source_query(step, data_size), use_standard_sql=True)

    read_label = '{} - Read Data from BigQuery'.format(step)
    clean_label = '{} - Clean up Data'.format(step)

    bq_rows = pipeline | read_label >> beam.io.Read(bq_source)
    cleaned_rows = bq_rows | clean_label >> beam.Map(prep_bq_row)

    return (cleaned_rows, create_raw_metadata())

## tf.Transform preprocess_fn

In [8]:
def preprocess_fn(input_features):
    """tf.Transform preprocessing function for the raw features.

    Args:
        input_features: dict mapping raw feature name to its tensor.

    Returns:
        dict mapping transformed feature name to its tensor: the target as-is,
        z-scored / bucketized / log versions of 'mother_age', 'gestation_weeks'
        scaled to [0, 1], an 'is_multiple' flag derived from 'plurality', and
        vocabulary indices for 'mother_race' and 'is_male'.
    """
    transformed = {}

    # target feature, passed through as-is
    transformed['weight_pounds'] = input_features['weight_pounds']

    mother_age = input_features['mother_age']

    # normalisation
    transformed['mother_age_normalized'] = tft.scale_to_z_score(mother_age)

    # bucketisation based on quantiles
    quantile_boundaries = tft.quantiles(mother_age, num_buckets=5, epsilon=0.01)
    transformed['mother_age_bucketized'] = tft.apply_buckets(
        mother_age, quantile_boundaries)

    # scaling between 0 and 1
    transformed['gestation_weeks_scaled'] = tft.scale_to_0_1(
        input_features['gestation_weeks'])

    # new features can be computed from custom formulas
    transformed['mother_age_log'] = tf.log(mother_age)

    # or as flags/indicators
    transformed['is_multiple'] = tf.cast(
        input_features['plurality'] > [1], dtype=tf.int64)

    # convert categorical features to indexed vocab; the vocabulary file is
    # named after the feature
    for categorical_name in ('mother_race', 'is_male'):
        transformed[categorical_name] = tft.compute_and_apply_vocabulary(
            input_features[categorical_name], vocab_filename=categorical_name)

    return transformed

## Analyze and Transform

In [9]:
def analyze_and_transform(raw_dataset, step):
    """Apply ``AnalyzeAndTransformDataset`` with ``preprocess_fn`` to a dataset.

    Args:
        raw_dataset: the dataset the transform is applied to.
        step: step name, used as a prefix of the Beam transform label.

    Returns:
        A ``(transformed_dataset, transform_fn)`` tuple.
    """
    label = '{} - Analyze & Transform'.format(step)
    analyze_and_transform_op = impl.AnalyzeAndTransformDataset(preprocess_fn)

    transformed_dataset, transform_fn = raw_dataset | label >> analyze_and_transform_op

    return transformed_dataset, transform_fn

## Transform

In [10]:
def transform(raw_dataset, transform_fn, step):
    """Apply an existing ``transform_fn`` to a dataset via ``TransformDataset``.

    Args:
        raw_dataset: the dataset to transform.
        transform_fn: the transform function to apply to it.
        step: step name, used as a prefix of the Beam transform label.

    Returns:
        The transformed dataset.
    """
    label = '{} - Transform'.format(step)
    dataset_and_transform_fn = (raw_dataset, transform_fn)

    return dataset_and_transform_fn | label >> impl.TransformDataset()

## Write tfrecords

In [11]:
def write_tfrecords(transformed_dataset, location, step):
    """Write a transformed dataset to ``location`` as TFRecord files.

    Args:
        transformed_dataset: ``(data, metadata)`` pair; the metadata schema is
            used to build the example proto coder.
        location: output directory; files are prefixed with ``'<step>-'`` and
            suffixed with ``'.tfrecords'``.
        step: step name, used in the file prefix and the Beam transform label.
    """
    transformed_data, transformed_metadata = transformed_dataset

    tfrecord_sink = beam.io.tfrecordio.WriteToTFRecord(
        file_path_prefix=os.path.join(location, '{}-'.format(step)),
        file_name_suffix=".tfrecords",
        coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))

    _ = transformed_data | '{} - Write Transformed Data'.format(step) >> tfrecord_sink

## Write Text

In [12]:
def write_text(dataset, location, step):
    """Write the data part of a dataset to ``location`` as text files.

    Args:
        dataset: ``(data, metadata)`` pair; only the data is written.
        location: output directory; files are prefixed with ``'<step>-'`` and
            suffixed with ``'.txt'``.
        step: step name, used in the file prefix and the Beam transform label.
    """
    records = dataset[0] if False else None  # placeholder replaced below
    records, _ = dataset

    text_sink = beam.io.WriteToText(
        file_path_prefix=os.path.join(location, '{}-'.format(step)),
        file_name_suffix=".txt")

    _ = records | '{} - WriteData'.format(step) >> text_sink

In [13]:
def write_transform_artefacts(transform_fn, location):
    """Write the transform function to ``location`` via ``WriteTransformFn``.

    Args:
        transform_fn: the transform function to persist.
        location: output location handed to ``WriteTransformFn``.
    """
    artefact_sink = transform_fn_io.WriteTransformFn(location)

    _ = transform_fn | 'Write Transform Artefacts' >> artefact_sink

## Transformation Pipeline

In [14]:
def run_transformation_pipeline(args):
    """Build and run the Beam pipeline that preprocesses train and eval data.

    Train data is read, analyzed and transformed; the resulting transform
    function is then applied to the eval data. Both transformed datasets are
    written as TFRecords and the transform function itself is written out.
    When debugging is enabled, the transformed train data is also written as
    text.

    Args:
        args: dict of settings. All entries are forwarded to
            ``PipelineOptions``; the keys read here are 'runner', 'data_size',
            'transformed_data_location', 'transform_artefact_location',
            'temporary_dir' and 'debug'.
    """
    pipeline_options = beam.pipeline.PipelineOptions(flags=[], **args)

    runner = args['runner']
    data_size = args['data_size']
    data_sink = args['transformed_data_location']
    artefact_sink = args['transform_artefact_location']
    temporary_dir = args['temporary_dir']
    debug = args['debug']

    # Echo the effective settings before launching.
    settings_report = (
        ("Sample data size: {}", data_size),
        ("Sink transformed data files location: {}", data_sink),
        ("Sink transform artefact location: {}", artefact_sink),
        ("Temporary directory: {}", temporary_dir),
        ("Runner: {}", runner),
        ("Debug enabled: {}", debug),
    )
    for template, value in settings_report:
        print(template.format(value))

    with beam.Pipeline(runner, options=pipeline_options) as pipeline:
        with impl.Context(temporary_dir):

            # Train: read from BQ, analyze + transform, write as tfrecords
            raw_train = read_from_bq(pipeline, 'train', data_size)
            transformed_train, transform_fn = analyze_and_transform(raw_train, 'train')
            write_tfrecords(transformed_train, data_sink, 'train')

            # Eval: read from BQ, apply the transform_fn produced from the
            # train data, write as tfrecords
            raw_eval = read_from_bq(pipeline, 'eval', data_size)
            transformed_eval = transform(raw_eval, transform_fn, 'eval')
            write_tfrecords(transformed_eval, data_sink, 'eval')

            # Write transformation artefacts
            write_transform_artefacts(transform_fn, artefact_sink)

            # (Optional) for debugging, write transformed train data as text
            if debug == True:
                write_text(transformed_train, data_sink, 'debug')

            


## Set pipeline params

In [14]:
%%writefile requirements.txt
tensorflow-transform==0.8.0

Overwriting requirements.txt


In [17]:
from datetime import datetime

# Cloud settings.
BUCKET = 'ksalama-gcs-cloudml'
PROJECT = 'ksalama-gcp-playground'
REGION = 'europe-west1'
ROOT_DIR = 'babyweight_tft'
# Switch between a local DirectRunner run and a Dataflow run.
RUN_LOCAL = True

# Local runs write under ROOT_DIR; Dataflow runs write under the GCS bucket.
if RUN_LOCAL:
    OUTPUT_DIR = ROOT_DIR
    runner = 'DirectRunner'
else:
    OUTPUT_DIR = "gs://{}/{}".format(BUCKET, ROOT_DIR)
    runner = 'DataflowRunner'

TRANSFORM_ARTEFACTS_DIR = os.path.join(OUTPUT_DIR, 'transform')
TRANSFORMED_DATA_DIR = os.path.join(OUTPUT_DIR, 'transformed')
TEMP_DIR = os.path.join(OUTPUT_DIR, 'tmp')

# Job name is suffixed with the current UTC timestamp.
job_timestamp = datetime.utcnow().strftime('%y%m%d-%H%M%S')
job_name = 'preprocess-babweight-data-tft-{}'.format(job_timestamp)

args = {
    # settings read by run_transformation_pipeline
    'job_name': job_name,
    'runner': runner,
    'data_size': 10000,
    'transformed_data_location': TRANSFORMED_DATA_DIR,
    'transform_artefact_location': TRANSFORM_ARTEFACTS_DIR,
    'temporary_dir': TEMP_DIR,
    'debug': True,

    # remaining pipeline options
    'project': PROJECT,
    'region': REGION,
    'staging_location': os.path.join(OUTPUT_DIR, 'staging'),
    'temp_location': TEMP_DIR,
    'worker_machine_type': 'n1-standard-1',
    'requirements_file': 'requirements.txt',
}

## Run pipeline

In [18]:
# Remove the outputs of any previous run. Each directory is handled on its own
# so that one missing directory does not leave the remaining ones undeleted.
deleted_any = False
for stale_dir in (TRANSFORMED_DATA_DIR, TRANSFORM_ARTEFACTS_DIR, TEMP_DIR):
    try:
        if tf.gfile.Exists(stale_dir):
            tf.gfile.DeleteRecursively(stale_dir)
            deleted_any = True
    except tf.errors.OpError as error:
        # Clean-up stays best effort: report the problem and carry on.
        print('could not delete {}: {}'.format(stale_dir, error))

if deleted_any:
    print('previous transformation files deleted!')

tf.logging.set_verbosity(tf.logging.ERROR)
print('Launching {} job {} ... hang on'.format(runner, job_name))
print("")
run_transformation_pipeline(args)
print("Done!")

previous transformation files deleted!
Launching DirectRunner job preprocess-babweight-data-tft-180827-132629 ... hang on

Sample data size: 10000
Sink transformed data files location: babyweight_tft/transformed
Sink transform artefact location: babyweight_tft/transform
Temporary directory: babyweight_tft/tmp
Runner: DirectRunner
Debug enabled: True




Done!


In [17]:
# !echo 'transformed data:'   
# !gsutil ls gs://ksalama-gcs-cloudml/babyweight_tft/transformed
# !echo 'transform artefact:'   
# !gsutil ls gs://ksalama-gcs-cloudml/babyweight_tft/transform/transform_fn
# !echo 'transform vocab assets:'
# !gsutil ls gs://ksalama-gcs-cloudml/babyweight_tft/transform/transform_fn/assets