<h1> Preprocessing using tf.transform and Dataflow </h1>

This notebook illustrates:
<ol>
<li> Creating datasets for Machine Learning using tf.transform and Dataflow
</ol>
<p>
While Pandas is fine for experimentation, it is better to do preprocessing in Apache Beam when you operationalize your workflow. Apache Beam also supports streaming, which helps if you need to preprocess data in flight.
<p>
Only specific combinations of TensorFlow and Apache Beam are supported by tf.transform, so make sure to install a supported combination. This notebook uses:
* TFT 0.4.0
* TF 1.4 or higher
* Apache Beam [GCP] 2.2.0 or higher

In [None]:
%%bash
pip uninstall -y google-cloud-dataflow
pip install --upgrade --force-reinstall tensorflow_transform==0.4.0 'apache-beam[gcp]==2.2.0'

In [None]:
%%bash
pip freeze | grep -e 'flow\|beam'

In [None]:
import tensorflow as tf
import warnings
with warnings.catch_warnings():
  warnings.filterwarnings("ignore", category=DeprecationWarning)
  import apache_beam as beam
print(tf.__version__)

In [None]:
# change these to try this notebook out
BUCKET = 'asl-ml-immersion-temp'
PROJECT = 'asl-ml-immersion'
REGION = 'us-central1'

In [None]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [None]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi

<h2> Save the query from earlier </h2>

The data is natality data (a record of births in the US). The goal is to predict the baby's weight given a number of factors about the pregnancy and the baby's mother. Later, we will want to split the data into training and eval datasets. The hash of the year-month will be used for that, so that all births from the same month land in the same split.
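To make the hash-based split concrete, here is a minimal sketch in plain Python. It is only an illustration: `hashmonth` and `assign_split` are hypothetical helper names, and `hashlib.md5` stands in for BigQuery's `FARM_FINGERPRINT`, so the actual bucket assignments will differ from what BigQuery produces. The point is that a deterministic hash of the year-month, taken modulo 4, always sends a given month to the same split.

```python
import hashlib

def hashmonth(year, month):
    # Stand-in for FARM_FINGERPRINT (an assumption for illustration only):
    # any deterministic hash of the year-month string shows the same idea.
    key = '{}{}'.format(year, month).encode('utf-8')
    return int(hashlib.md5(key).hexdigest(), 16)

def assign_split(year, month):
    # Buckets 0, 1 and 2 go to training and bucket 3 goes to eval.
    # The md5-based hash is never negative, so no ABS() is needed here.
    return 'train' if hashmonth(year, month) % 4 < 3 else 'eval'

# Assign every year-month in a sample range to a split.
months = [(y, m) for y in range(2001, 2009) for m in range(1, 13)]
splits = {ym: assign_split(*ym) for ym in months}
```

Because the assignment depends only on the year and month, rerunning the split never moves a record from one dataset to the other.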

In [None]:
query="""
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
"""

In [None]:
import google.datalab.bigquery as bq
df = bq.Query(query + " LIMIT 100").execute().result().to_dataframe()
df.head()

<h2> Create ML dataset using tf.transform and Dataflow </h2>
<p>
Let's use Cloud Dataflow to read in the BigQuery data and write it out as TFRecord files. Along the way, let's use tf.transform to do scaling and transforming. Using tf.transform allows us to save the metadata to ensure that the appropriate transformations get carried out during prediction as well.
<p>
Note that after you launch this, the notebook won't show you progress. Go to the Dataflow section of the GCP web console to monitor the running job. It took about <b>30 minutes</b> for me. If you wish to continue without doing this step, you can copy my preprocessed output:
<pre>
gsutil -m cp -r gs://asl-ml-immersion/babyweight/preproc_tft gs://your-bucket/babyweight/
</pre>
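The idea behind tf.transform's scaling can be sketched in plain Python: one pass over the training data computes constants (the analyze phase), and those same constants are then applied to training, eval and prediction inputs (the transform phase). This is not how tf.transform is implemented; `analyze` and `transform` are hypothetical names, and the three training rows are made-up values rather than real natality data.

```python
def analyze(rows):
    """Full pass over the training data to compute the scaling constants."""
    ages = [r['mother_age'] for r in rows]
    weeks = [r['gestation_weeks'] for r in rows]
    return {
        'age_mean': sum(ages) / float(len(ages)),
        'weeks_min': min(weeks),
        'weeks_max': max(weeks),
    }

def transform(row, stats):
    """Apply the saved constants to a single row (train, eval or prediction)."""
    out = dict(row)
    # Mean-centering, analogous to x - tft.mean(x)
    out['mother_age_tft'] = row['mother_age'] - stats['age_mean']
    # Min-max scaling, analogous to tft.scale_to_0_1(x); assumes max > min
    span = float(stats['weeks_max'] - stats['weeks_min'])
    out['gestation_weeks_centered'] = (row['gestation_weeks'] - stats['weeks_min']) / span
    return out

# Made-up training rows, for illustration only
train = [
    {'mother_age': 20.0, 'gestation_weeks': 36.0},
    {'mother_age': 30.0, 'gestation_weeks': 40.0},
    {'mother_age': 40.0, 'gestation_weeks': 44.0},
]
stats = analyze(train)
transformed_train = [transform(r, stats) for r in train]

# An unseen row is transformed with the constants learned from training data
eval_row = transform({'mother_age': 25.0, 'gestation_weeks': 38.0}, stats)
```

Saving `stats` alongside the model plays the role of the saved transform metadata: prediction inputs get exactly the same mean and min/max as the training data did.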

In [None]:
%%bash
# make sure the worker machines install the same versions of tensorflow_transform and Apache Beam that we use here
echo "tensorflow_transform==0.4.0" > requirements.txt
echo "apache-beam[gcp]==2.2.0" >> requirements.txt
#pip freeze | grep tensorflow-transform > requirements.txt
cat requirements.txt

In [None]:
import datetime
with warnings.catch_warnings():
  warnings.filterwarnings("ignore", category=DeprecationWarning)
  import apache_beam as beam
  import tensorflow_transform as tft
  from tensorflow_transform.beam import impl as beam_impl

METADATA = {
  'ORDERED_FIELDS' : ['weight_pounds', 'is_male', 'mother_age', 'plurality', 'gestation_weeks'],
  'STR_FIELDS' : ['key', 'is_male', 'plurality'],
  'FLT_FIELDS' : ['weight_pounds', 'mother_age', 'gestation_weeks']
}

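def preprocess_tft(inputs):
    import copy
    import numpy as np

    def center(x):
        # subtract the mean, which tf.transform computes over the whole dataset
        return x - tft.mean(x)

    result = copy.copy(inputs) # shallow copy
    result['mother_age_tft'] = center(inputs['mother_age'])
    # despite its name, this feature is min-max scaled to [0, 1], not mean-centered
    result['gestation_weeks_centered'] = tft.scale_to_0_1(inputs['gestation_weeks'])
    return result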


def cleanup(rowdict, metadata):
    import copy, hashlib
    # create synthetic data where we assume that no ultrasound has been performed
    # and so we don't know the sex of the baby. Let's assume that we can tell the
    # difference between single and multiple births, but that determining the exact
    # number is difficult in the absence of an ultrasound.
    #print(rowdict)
    no_ultrasound = copy.deepcopy(rowdict)
    no_ultrasound[u'is_male'] = u'Unknown'

    if rowdict[u'plurality'] > 1:
        no_ultrasound[u'plurality'] = u'Multiple(2+)'
    else:
        no_ultrasound[u'plurality'] = u'Single(1)'

    w_ultrasound = copy.deepcopy(rowdict)
    # Change the plurality column to strings
    w_ultrasound[u'plurality'] = [u'Single(1)', u'Twins(2)',
                                 u'Triplets(3)', u'Quadruplets(4)',
                                 u'Quintuplets(5)'][rowdict[u'plurality']-1]

    # add any missing columns, and correct the types
    def tofloat(value, ifnot):
      try:
        return float(value)
      except (ValueError, TypeError):
        return ifnot

    out_no_ultrasound = {
      k : str(no_ultrasound[k]) if k in no_ultrasound else u'None' for k in metadata['STR_FIELDS']
    }
    out_no_ultrasound.update({
        k : tofloat(no_ultrasound[k], -99) if k in no_ultrasound else -99 for k in metadata['FLT_FIELDS']
    }
    )

    out_w_ultrasound = {
      k : str(w_ultrasound[k]) if k in w_ultrasound else u'None' for k in metadata['STR_FIELDS']
    }
    out_w_ultrasound.update({
        k : tofloat(w_ultrasound[k], -99) if k in w_ultrasound else -99 for k in metadata['FLT_FIELDS']
    }
    )

    # cleanup: write out only the data that we want to train on
    # the numeric fields are identical in both variants, so the filter gives the same result for each
    for out in [out_no_ultrasound, out_w_ultrasound]:
      if (out['weight_pounds'] > 0 and
              out['mother_age'] > 0 and
              out['gestation_weeks'] > 0 ):

        data = ','.join([str(out[k]) for k in metadata['ORDERED_FIELDS']])
        # hash the encoded bytes so this works on both Python 2 and Python 3
        out['key'] = hashlib.sha224(data.encode('utf-8')).hexdigest()
        yield out
      
def preprocess(query, in_test_mode):
  import os
  import os.path
  import tempfile
  from apache_beam.io import tfrecordio
  from tensorflow_transform.coders import example_proto_coder
  from tensorflow_transform.tf_metadata import dataset_metadata
  from tensorflow_transform.tf_metadata import dataset_schema
  from tensorflow_transform.beam.tft_beam_io import transform_fn_io

  job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  print('Launching Dataflow job {} ... hang on'.format(job_name))
  OUTPUT_DIR = 'gs://{0}/babyweight/preproc_tft/'.format(BUCKET)
  import subprocess
  # remove output from any previous run (the stray -o flag would have swallowed 'rm')
  subprocess.call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())

  options = {
    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'job_name': job_name,
    'project': PROJECT,
    'region' : REGION,
    'max_num_workers': 24,
    'teardown_policy': 'TEARDOWN_ALWAYS',
    'save_main_session': False,
    'requirements_file': 'requirements.txt'
  }
  opts = beam.pipeline.PipelineOptions(flags=[], **options)

  RUNNER = 'DataflowRunner'

  # set up metadata
  raw_data_schema = {
    colname : dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
                   for colname in METADATA['STR_FIELDS']
  }
  raw_data_schema.update({
      colname : dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())
                   for colname in METADATA['FLT_FIELDS']
    })

  raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))

  def read_rawdata(p, step, test_mode):
    if step == 'train':
        selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hashmonth),4) < 3'.format(query)
    else:
        selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hashmonth),4) = 3'.format(query)

    if test_mode:
        selquery += ' LIMIT 1000'

    #print 'Processing {} data from {}'.format(step, selquery)
    return (p
          | '{}_read'.format(step) >> beam.io.Read(
                beam.io.BigQuerySource(query=selquery, use_standard_sql=True))
          | '{}_cleanup'.format(step) >> beam.FlatMap(cleanup, METADATA)
         )

  # run Beam
  with beam.Pipeline(RUNNER, options=opts) as p:
    with beam_impl.Context(os.path.join(OUTPUT_DIR, 'tmp')):
      # analyze and transform training
      raw_data = read_rawdata(p, 'train', in_test_mode)

      # Combine data and schema into a dataset tuple.  Note that we already used
      # the schema to "read" BigQuery data
      raw_dataset = (raw_data, raw_data_metadata)
      transformed_dataset, transform_fn = (
          raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))

      transformed_data, transformed_metadata = transformed_dataset

      _ = transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
          os.path.join(OUTPUT_DIR, 'train'),
          coder=example_proto_coder.ExampleProtoCoder(
              transformed_metadata.schema))

      # transform eval data
      raw_test_data = read_rawdata(p, 'eval', in_test_mode)
      raw_test_dataset = (raw_test_data, raw_data_metadata)
      transformed_test_dataset = (
          (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())

      # Don't need transformed data schema, it's the same as before.
      transformed_test_data, _ = transformed_test_dataset

      _ = transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
          os.path.join(OUTPUT_DIR, 'eval'),
          coder=example_proto_coder.ExampleProtoCoder(
              transformed_metadata.schema))


      _ = (transform_fn
           | 'WriteTransformFn' >>
           transform_fn_io.WriteTransformFn(os.path.join(OUTPUT_DIR, 'metadata'))
          )

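  # No explicit p.run() is needed: leaving the 'with beam.Pipeline(...)' block runs the
  # pipeline and waits for it to finish. Calling p.run() again would launch a second job.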
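The cleanup step emits two records for every input row: one as if no ultrasound had been performed and one with the full information. The simplified sketch below shows just that part for a single row. `ultrasound_variants` is a hypothetical, cut-down stand-in for `cleanup` (it skips type coercion, filtering and key hashing), and the sample row is made up.

```python
PLURALITY_NAMES = ['Single(1)', 'Twins(2)', 'Triplets(3)',
                   'Quadruplets(4)', 'Quintuplets(5)']

def ultrasound_variants(row):
    """Yield the 'no ultrasound' and 'with ultrasound' versions of one row."""
    # Without an ultrasound: sex is unknown, plurality is only single vs. multiple
    no_ultrasound = dict(row)
    no_ultrasound['is_male'] = 'Unknown'
    no_ultrasound['plurality'] = 'Multiple(2+)' if row['plurality'] > 1 else 'Single(1)'
    yield no_ultrasound

    # With an ultrasound: keep the sex and name the exact plurality
    w_ultrasound = dict(row)
    w_ultrasound['is_male'] = str(row['is_male'])
    w_ultrasound['plurality'] = PLURALITY_NAMES[row['plurality'] - 1]
    yield w_ultrasound

# Made-up input row, for illustration only
row = {'weight_pounds': 7.5, 'is_male': True, 'mother_age': 28,
       'plurality': 2, 'gestation_weeks': 39}
variants = list(ultrasound_variants(row))
```

Training on both variants lets one model serve predictions whether or not ultrasound information is available.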

In [None]:
preprocess(query, in_test_mode=False)

In [None]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/preproc_tft/*-00000*

Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License