<a href="https://colab.research.google.com/github/rkrissada/google_ml_training/blob/master/tftransform.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1> Exploring tf.transform </h1>

While Pandas is fine for experimenting, it is better to do preprocessing in Apache Beam once you operationalize your workflow. Beam also supports streaming, which helps if you need to preprocess data in flight.

tf.transform supports only specific combinations of TensorFlow and Beam versions, so make sure to install a supported combination:

* TFT 0.8.0
* TF 1.8 or higher
* Apache Beam [GCP] 2.9.0 or higher
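
One way to confirm that an environment satisfies these minimums is to compare version strings numerically. The sketch below is plain Python and only an illustration: `parse_version` and `meets_minimum` are hypothetical helpers (not part of TensorFlow, Beam, or pip), and they assume simple dotted numeric versions such as `1.8.0`.

```python
def parse_version(version):
    """Turn a dotted version such as '2.9.0' into a tuple of ints (2, 9, 0)."""
    return tuple(int(part) for part in version.split("."))


def meets_minimum(installed, minimum):
    """Return True if `installed` is at least `minimum` (numeric dotted versions only)."""
    a, b = parse_version(installed), parse_version(minimum)
    # Pad the shorter tuple with zeros so that '1.8' compares equal to '1.8.0'.
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


# Minimums taken from the list above.
MINIMUMS = {"tensorflow": "1.8", "apache-beam": "2.9.0"}
# Versions that `pip freeze` reports further down in this notebook.
installed = {"tensorflow": "1.8.0", "apache-beam": "2.9.0"}

for name, minimum in sorted(MINIMUMS.items()):
    print(name, meets_minimum(installed[name], minimum))
```

Comparing tuples of integers rather than raw strings avoids the trap where `"2.10.0" < "2.9.0"` is true as a string comparison.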

In [0]:
%%bash
pip install apache-beam[gcp]==2.9.0 tensorflow_transform==0.8.0

Collecting apache-beam[gcp]==2.9.0
  Downloading https://files.pythonhosted.org/packages/d4/3d/90aa15779e884feebae4b0c26cad6f52cd4040397a94deb58dad9c8b7300/apache_beam-2.9.0-cp27-cp27mu-manylinux1_x86_64.whl (2.4MB)
Collecting tensorflow_transform==0.8.0
  Downloading https://files.pythonhosted.org/packages/1a/5c/d22108832b9df5d91e7750e544e69157c4eeca8321c177b83c047f428d9c/tensorflow-transform-0.8.0.tar.gz (122kB)
Collecting httplib2<=0.11.3,>=0.8 (from apache-beam[gcp]==2.9.0)
  Downloading https://files.pythonhosted.org/packages/fd/ce/aa4a385e3e9fd351737fd2b07edaa56e7a730448465aceda6b35086a0d9b/httplib2-0.11.3.tar.gz (215kB)
Collecting typing<3.7.0,>=3.6.0; python_version < "3.5.0" (from apache-beam[gcp]==2.9.0)
  Downloading https://files.pythonhosted.org/packages/cc/3e/29f92b7aeda5b078c86d14f550bf85cff809042e3429ace7af6193c3bc9f/typing-3.6.6-py2-none-any.whl
Collecting pyvcf<0.7.0,>=0.6.8 (from apache-beam[gcp]==2.9.0)
  Downloading https://files.pythonhosted.org/packages/20/b6/36b

google-cloud-bigquery 1.6.1 has requirement google-api-core<2.0.0dev,>=1.0.0, but you'll have google-api-core 0.1.4 which is incompatible.
googledatastore 7.0.1 has requirement httplib2<0.10,>=0.9.1, but you'll have httplib2 0.11.3 which is incompatible.


<b>Restart the kernel</b> after you do a pip install (click on the <b>Reset</b> button in Datalab)

In [0]:
%%bash
pip freeze | grep -e 'flow\|beam'

apache-airflow==1.9.0
apache-beam==2.9.0
tensorflow==1.8.0
tensorflow-transform==0.8.0


In [0]:
import tensorflow as tf
import tensorflow_transform as tft
import shutil
print(tf.__version__)

  from ._conv import register_converters as _register_converters


1.8.0


In [0]:
# change these to try this notebook out
BUCKET = 'qwiklabs-gcp-2194bbaf8a54e2ad'
PROJECT = 'qwiklabs-gcp-2194bbaf8a54e2ad'
REGION = 'asia-southeast1'

In [0]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [0]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

Updated property [core/project].
Updated property [compute/region].


In [0]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi

## Input source: BigQuery

Get the data from BigQuery, but defer filtering and other cleanup to Beam.
Note that the `dayofweek` column now contains strings.
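
The string day names come from indexing a seven-element array with the day number, which the query in the next cell does with `daysofweek[ORDINAL(EXTRACT(DAYOFWEEK FROM pickup_datetime))]`. The plain-Python sketch below mimics that lookup; the helper names and the sample timestamp are made up for illustration.

```python
import datetime

# Same list as in the query. BigQuery's DAYOFWEEK runs from 1 (Sunday) to 7 (Saturday).
DAYS_OF_WEEK = ['Sun', 'Mon', 'Tues', 'Wed', 'Thurs', 'Fri', 'Sat']


def bigquery_dayofweek(dt):
    """Mimic EXTRACT(DAYOFWEEK FROM ...): 1 = Sunday ... 7 = Saturday."""
    # isoweekday() is 1 = Monday ... 7 = Sunday, so shift Sunday to the front.
    return dt.isoweekday() % 7 + 1


def day_name(dt):
    """Mimic daysofweek[ORDINAL(n)]: ORDINAL is 1-based, Python lists are 0-based."""
    return DAYS_OF_WEEK[bigquery_dayofweek(dt) - 1]


pickup = datetime.datetime(2019, 5, 1, 7, 23)  # hypothetical timestamp, a Wednesday
print(day_name(pickup))  # Wed
```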

In [0]:
import google.datalab.bigquery as bq
def create_query(phase, EVERY_N):
  """
  phase: 1=train 2=valid
  """
  base_query = """
WITH daynames AS
  (SELECT ['Sun', 'Mon', 'Tues', 'Wed', 'Thurs', 'Fri', 'Sat'] AS daysofweek)
SELECT
  (tolls_amount + fare_amount) AS fare_amount,
  daysofweek[ORDINAL(EXTRACT(DAYOFWEEK FROM pickup_datetime))] AS dayofweek,
  EXTRACT(HOUR FROM pickup_datetime) AS hourofday,
  pickup_longitude AS pickuplon,
  pickup_latitude AS pickuplat,
  dropoff_longitude AS dropofflon,
  dropoff_latitude AS dropofflat,
  passenger_count AS passengers,
  'notneeded' AS key
FROM
  `nyc-tlc.yellow.trips`, daynames
WHERE
  trip_distance > 0 AND fare_amount > 0
  """

  if EVERY_N is None:
    if phase < 2:
      # training
      query = "{0} AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))),4) < 2".format(base_query)
    else:
      # validation
      query = "{0} AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))),4) = {1}".format(base_query, phase)
  else:
    # sample 1 in EVERY_N rows; phase selects which hash bucket to keep
    query = "{0} AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))),{1}) = {2}".format(base_query, EVERY_N, phase)

  return query

query = create_query(2, 100000)
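
The `MOD(ABS(FARM_FINGERPRINT(...)), EVERY_N) = phase` clause makes the train/validation split repeatable: a row's hash decides its bucket, so the same row always lands in the same split. The sketch below illustrates the idea in plain Python. It is an approximation under a loud assumption: MD5 stands in for `FARM_FINGERPRINT` only because it ships with Python, and it does not produce the same values as FarmHash. The `fingerprint` and `assign` helpers and the timestamps are hypothetical.

```python
import hashlib


def fingerprint(text):
    """Stand-in for FARM_FINGERPRINT: a deterministic integer hash of a string.

    Assumption: MD5 is used purely for illustration; its values differ from FarmHash.
    """
    return int(hashlib.md5(text.encode('utf-8')).hexdigest(), 16)


def assign(pickup_datetime, every_n):
    """Return 'train', 'valid', or None (row not sampled).

    Mirrors MOD(ABS(hash), EVERY_N) = phase, with phase 1 = train and 2 = valid.
    """
    bucket = fingerprint(pickup_datetime) % every_n
    if bucket == 1:
        return 'train'
    if bucket == 2:
        return 'valid'
    return None


# Hypothetical timestamps, one per minute of a single hour.
rows = ['2014-01-01 10:%02d:00' % minute for minute in range(60)]
first = [assign(row, every_n=5) for row in rows]
second = [assign(row, every_n=5) for row in rows]
print(first == second)  # True: the assignment does not change between runs
```

Because the split depends only on the row's content, rerunning the query never moves a row from training to validation, which a random sample would not guarantee.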

In [0]:
df_valid = bq.Query(query).execute().result().to_dataframe()
display(df_valid.head())
df_valid.describe()

| | fare_amount | dayofweek | hourofday | pickuplon | pickuplat | dropofflon | dropofflat | passengers | key |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 4.0 | Sun | 0 | -73.990792 | 40.686295 | -73.99647 | 40.686182 | 1 | notneeded |
| 1 | 7.5 | Sun | 0 | -73.995475 | 40.717172 | -73.990305 | 40.723332 | 1 | notneeded |
| 2 | 11.0 | Sun | 0 | -73.979865 | 40.740787 | -73.987555 | 40.722502 | 1 | notneeded |
| 3 | 12.1 | Thurs | 0 | -74.009379 | 40.704176 | -73.987394 | 40.732475 | 1 | notneeded |
| 4 | 6.0 | Sun | 0 | -73.98791 | 40.738 | -73.9892 | 40.74385 | 1 | notneeded |


| | fare_amount | hourofday | pickuplon | pickuplat | dropofflon | dropofflat | passengers |
|---|---|---|---|---|---|---|---|
| count | 11181.0 | 11181.0 | 11181.0 | 11181.0 | 11181.0 | 11181.0 | 11181.0 |
| mean | 11.242599 | 13.244075 | -72.576852 | 39.973146 | -72.748974 | 40.006091 | 1.722118 |
| std | 9.447462 | 6.548354 | 10.133452 | 5.777329 | 12.981577 | 5.664887 | 1.351062 |
| min | 2.5 | 0.0 | -78.133333 | -73.991278 | -751.4 | -73.97797 | 0.0 |
| 25% | 6.0 | 9.0 | -73.991849 | 40.734954 | -73.991236 | 40.734008 | 1.0 |
| 50% | 8.5 | 14.0 | -73.981824 | 40.75264 | -73.980164 | 40.753427 | 1.0 |
| 75% | 12.5 | 19.0 | -73.967418 | 40.7667 | -73.964153 | 40.767832 | 2.0 |
| max | 143.0 | 23.0 | 40.806487 | 41.366138 | 40.7854 | 41.366138 | 6.0 |
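
The `min` and `max` rows contain values that cannot be New York coordinates, such as a pickup latitude of -73.99 and a dropoff longitude of -751.4. This is why filtering is deferred to Beam. The sketch below applies the same coordinate bounds that the `is_valid` function later in this notebook uses; `in_nyc_box` is a hypothetical helper name, and the extreme values are paired up only for illustration, since they come from different rows of the summary.

```python
def in_nyc_box(lon, lat):
    """Coordinate bounds copied from the notebook's is_valid filter."""
    return -78 < lon < -70 and 37 < lat < 45


# Pickup location of the first sample row above: passes the check.
print(in_nyc_box(-73.990792, 40.686295))

# Extreme values from the summary statistics: rejected by the check.
print(in_nyc_box(-751.4, -73.97797))
print(in_nyc_box(40.806487, 41.366138))
```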


## Create ML dataset using tf.transform and Dataflow

Let's use Cloud Dataflow to read in the BigQuery data and write it out as CSV files. Along the way, let's use tf.transform to do scaling and transforming. Using tf.transform allows us to save the metadata to ensure that the appropriate transformations get carried out during prediction as well.
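
The reason saved metadata matters can be shown without TensorFlow. Scaling a column needs a full pass over the training data to find its minimum and maximum (the analysis step), and the same two numbers must be reused on every later row (the transform step). The sketch below is plain Python with hypothetical helper names and made-up latitudes; it illustrates the idea only and is not how tf.transform is implemented.

```python
def analyze_min_max(values):
    """Analysis step: one full pass over the training data to find min and max."""
    return {'min': min(values), 'max': max(values)}


def apply_min_max(value, stats):
    """Transform step: scale one value to [0, 1] using previously saved statistics."""
    return (value - stats['min']) / (stats['max'] - stats['min'])


train_lat = [40.0, 40.5, 41.0]       # hypothetical training latitudes
stats = analyze_min_max(train_lat)   # computed once on training data, then saved

print([apply_min_max(v, stats) for v in train_lat])  # [0.0, 0.5, 1.0]

# At prediction time the saved statistics are reused; nothing is recomputed.
print(apply_min_max(40.75, stats))  # 0.75
```

If the prediction path recomputed the minimum and maximum from its own inputs, the same latitude would map to a different scaled value than it did during training.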

In [0]:
%writefile requirements.txt
tensorflow-transform==0.8.0

Writing requirements.txt


To do: check that `transformed_data` is a PCollection, and test whether the `_ =` assignment is necessary.

In [0]:
import datetime
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft
from tensorflow_transform.beam import impl as beam_impl

def is_valid(inputs):
  try:
    pickup_longitude = inputs['pickuplon']
    dropoff_longitude = inputs['dropofflon']
    pickup_latitude = inputs['pickuplat']
    dropoff_latitude = inputs['dropofflat']
    hourofday = inputs['hourofday']
    dayofweek = inputs['dayofweek']
    passenger_count = inputs['passengers']
    fare_amount = inputs['fare_amount']
    return (fare_amount >= 2.5 and pickup_longitude > -78 and pickup_longitude < -70 \
      and dropoff_longitude > -78 and dropoff_longitude < -70 and pickup_latitude > 37 \
      and pickup_latitude < 45 and dropoff_latitude > 37 and dropoff_latitude < 45 \
      and passenger_count > 0)
  except Exception:
    # treat rows with missing fields as invalid
    return False
  
def preprocess_tft(inputs):
      import datetime   
      print(inputs)
      result = {}
      result['fare_amount'] = tf.identity(inputs['fare_amount'])     
      result['dayofweek'] = tft.string_to_int(inputs['dayofweek']) # builds a vocabulary
      result['hourofday'] = tf.identity(inputs['hourofday']) # pass through
      result['pickuplon'] = (tft.scale_to_0_1(inputs['pickuplon'])) # scaling numeric values
      result['pickuplat'] = (tft.scale_to_0_1(inputs['pickuplat']))
      result['dropofflon'] = (tft.scale_to_0_1(inputs['dropofflon']))
      result['dropofflat'] = (tft.scale_to_0_1(inputs['dropofflat']))
      result['passengers'] = tf.cast(inputs['passengers'], tf.float32) # a cast
      result['key'] = tf.as_string(tf.ones_like(inputs['passengers'])) # arbitrary TF func
      # engineered features
      latdiff = inputs['pickuplat'] - inputs['dropofflat']
      londiff = inputs['pickuplon'] - inputs['dropofflon']
      result['latdiff'] = tft.scale_to_0_1(latdiff)
      result['londiff'] = tft.scale_to_0_1(londiff)
      dist = tf.sqrt(latdiff * latdiff + londiff * londiff)
      result['euclidean'] = tft.scale_to_0_1(dist)
      return result

def preprocess(in_test_mode):
  import os
  import os.path
  import tempfile
  from apache_beam.io import tfrecordio
  from tensorflow_transform.coders import example_proto_coder
  from tensorflow_transform.tf_metadata import dataset_metadata
  from tensorflow_transform.tf_metadata import dataset_schema
  from tensorflow_transform.beam import tft_beam_io
  from tensorflow_transform.beam.tft_beam_io import transform_fn_io

  job_name = 'preprocess-taxi-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')    
  if in_test_mode:
    import shutil
    print('Launching local job ... hang on')
    OUTPUT_DIR = './preproc_tft'
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    EVERY_N = 100000
  else:
    print('Launching Dataflow job {} ... hang on'.format(job_name))
    OUTPUT_DIR = 'gs://{0}/taxifare/preproc_tft/'.format(BUCKET)
    import subprocess
    subprocess.call('gsutil rm -r {}'.format(OUTPUT_DIR).split())
    EVERY_N = 10000
    
  options = {
    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'job_name': job_name,
    'project': PROJECT,
    'max_num_workers': 16,
    'teardown_policy': 'TEARDOWN_ALWAYS',
    'no_save_main_session': True,
    'requirements_file': 'requirements.txt'
  }
  opts = beam.pipeline.PipelineOptions(flags=[], **options)
  if in_test_mode:
    RUNNER = 'DirectRunner'
  else:
    RUNNER = 'DataflowRunner'

  # set up raw data metadata
  raw_data_schema = {
    colname : dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
                   for colname in 'dayofweek,key'.split(',')
  }
  raw_data_schema.update({
      colname : dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())
                   for colname in 'fare_amount,pickuplon,pickuplat,dropofflon,dropofflat'.split(',')
    })
  raw_data_schema.update({
      colname : dataset_schema.ColumnSchema(tf.int64, [], dataset_schema.FixedColumnRepresentation())
                   for colname in 'hourofday,passengers'.split(',')
    })
  raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))

  # run Beam  
  with beam.Pipeline(RUNNER, options=opts) as p:
    with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):
      # save the raw data metadata
      raw_data_metadata | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(
            os.path.join(OUTPUT_DIR, 'metadata/rawdata_metadata'),
            pipeline=p)
      
      # read training data from bigquery and filter rows     
      raw_data = (p 
        | 'train_read' >> beam.io.Read(beam.io.BigQuerySource(query=create_query(1, EVERY_N), use_standard_sql=True))
        | 'train_filter' >> beam.Filter(is_valid))
      raw_dataset = (raw_data, raw_data_metadata)
      
      # analyze and transform training data
      transformed_dataset, transform_fn = (
          raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
      transformed_data, transformed_metadata = transformed_dataset
      
      # save transformed training data to disk in efficient tfrecord format
      transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
          os.path.join(OUTPUT_DIR, 'train'),
          file_name_suffix='.gz',
          coder=example_proto_coder.ExampleProtoCoder(
              transformed_metadata.schema))
      
      # read eval data from bigquery and filter rows  
      raw_test_data = (p 
        | 'eval_read' >> beam.io.Read(beam.io.BigQuerySource(query=create_query(2, EVERY_N), use_standard_sql=True))
        | 'eval_filter' >> beam.Filter(is_valid))
      raw_test_dataset = (raw_test_data, raw_data_metadata)
      
      # transform eval data
      transformed_test_dataset = (
          (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
      transformed_test_data, _ = transformed_test_dataset
      
      # save transformed eval data to disk in efficient tfrecord format
      transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
          os.path.join(OUTPUT_DIR, 'eval'),
          file_name_suffix='.gz',
          coder=example_proto_coder.ExampleProtoCoder(
              transformed_metadata.schema))
      
      # save transformation function to disk for use at serving time
      transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
          os.path.join(OUTPUT_DIR, 'metadata'))

preprocess(in_test_mode=False) # change to True to run locally

Launching Dataflow job preprocess-taxi-features-190501-072321 ... hang on


  options = pbegin.pipeline.options.view_as(DebugOptions)


{'dayofweek': <tf.Tensor 'inputs/dayofweek_copy:0' shape=(?,) dtype=string>, 'passengers': <tf.Tensor 'inputs/passengers_copy:0' shape=(?,) dtype=int64>, 'fare_amount': <tf.Tensor 'inputs/fare_amount_copy:0' shape=(?,) dtype=float32>, 'pickuplat': <tf.Tensor 'inputs/pickuplat_copy:0' shape=(?,) dtype=float32>, 'dropofflat': <tf.Tensor 'inputs/dropofflat_copy:0' shape=(?,) dtype=float32>, 'key': <tf.Tensor 'inputs/key_copy:0' shape=(?,) dtype=string>, 'hourofday': <tf.Tensor 'inputs/hourofday_copy:0' shape=(?,) dtype=int64>, 'pickuplon': <tf.Tensor 'inputs/pickuplon_copy:0' shape=(?,) dtype=float32>, 'dropofflon': <tf.Tensor 'inputs/dropofflon_copy:0' shape=(?,) dtype=float32>}
Instructions for updating:
Use `tft.compute_and_apply_vocabulary()` instead.
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: gs://qwiklabs-gcp-2194bbaf8a54e2ad/taxifare/preproc_tft/tmp/tftransform_tmp/70a36fe80ca54b83946ab3a540c7c02e/saved_model.pb

Using this argument will have no effect on the actual scopes for tokens
requested. These scopes are set at VM instance creation time and
can't be overridden in the request.



ContextualVersionConflict: (httplib2 0.12.0 (/usr/local/envs/py2env/lib/python2.7/site-packages), Requirement.parse('httplib2<=0.11.3,>=0.8'), set(['apache-beam']))
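
The deprecation notice in the log above refers to `tft.string_to_int`, which `preprocess_tft` uses to turn `dayofweek` into integer ids. Conceptually this is another analyze-then-transform pair: build a vocabulary from the training data, then look strings up in it. The sketch below is plain Python with hypothetical helper names. It assumes the vocabulary is ordered by descending frequency and that unseen strings map to -1, which matches my reading of the documented defaults but is worth verifying against the TFT version in use.

```python
from collections import Counter


def build_vocabulary(values):
    """Analysis step: order distinct strings by descending frequency.

    Ties are broken alphabetically here, which is an arbitrary choice for this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda token: (-counts[token], token))
    return {token: index for index, token in enumerate(ordered)}


def lookup(value, vocabulary, default=-1):
    """Transform step: map a string to its id, or to `default` if it was never seen."""
    return vocabulary.get(value, default)


# dayofweek values from the five sample rows shown earlier.
train_days = ['Sun', 'Sun', 'Sun', 'Thurs', 'Sun']
vocab = build_vocabulary(train_days)

print(lookup('Sun', vocab))    # 0
print(lookup('Thurs', vocab))  # 1
print(lookup('Thu', vocab))    # -1, because this spelling never appears in the data
```

A spelling that does not match the training data exactly is silently treated as unknown, so prediction inputs need to use the same day names as the query.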

In [0]:
%%bash
# ls preproc_tft
gsutil ls gs://${BUCKET}/taxifare/preproc_tft/

gs://qwiklabs-gcp-2194bbaf8a54e2ad/taxifare/preproc_tft/
gs://qwiklabs-gcp-2194bbaf8a54e2ad/taxifare/preproc_tft/tmp/


<h2> Train off preprocessed data </h2>

In [0]:
%%bash
rm -rf taxifare_tft.tar.gz taxi_trained
export PYTHONPATH=${PYTHONPATH}:$PWD/taxifare_tft
python -m trainer.task \
   --train_data_paths="gs://${BUCKET}/taxifare/preproc_tft/train*" \
   --eval_data_paths="gs://${BUCKET}/taxifare/preproc_tft/eval*"  \
   --output_dir=./taxi_trained \
   --train_steps=10 --job-dir=/tmp \
   --metadata_path=gs://${BUCKET}/taxifare/preproc_tft/metadata

  from ._conv import register_converters as _register_converters
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_task_type': 'worker', '_train_distribute': None, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f8a1db9b0d0>, '_evaluation_master': '', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_service': None, '_num_ps_replicas': 0, '_tf_random_seed': None, '_master': '', '_num_worker_replicas': 1, '_task_id': 0, '_log_step_count_steps': 100, '_model_dir': './taxi_trained', '_global_id_in_cluster': 0, '_save_summary_steps': 100}
Traceback (most recent call last):
  File "/content/datalab/training-data-analyst/courses/machine_learning/feateng/taxifare_tft/trainer/task.py", line 127, in <module>
    model.train_and_evaluate(arguments)
  File "/content/datalab/training-data-analyst/courses/machine_learni

In [0]:
!ls $PWD/taxi_trained/export/exporter

ls: cannot access '/content/datalab/training-data-analyst/courses/machine_learning/feateng/taxi_trained/export/exporter': No such file or directory


In [0]:
%writefile /tmp/test.json
{"dayofweek":"Thurs","hourofday":17,"pickuplon": -73.885262,"pickuplat": 40.773008,"dropofflon": -73.987232,"dropofflat": 40.732403,"passengers": 2}

Writing /tmp/test.json
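
Before sending an instance to the model, it can help to check that it carries the expected columns with the expected types. The sketch below is a plain-Python illustration: `check_instance` is a hypothetical helper, the column types follow the `raw_data_schema` defined earlier in this notebook, and leaving out `fare_amount` and `key` is an assumption based on the fact that `/tmp/test.json` omits them.

```python
import json

TEXT = type(u'')  # the type json.loads returns for strings

# Types follow raw_data_schema; the label (fare_amount) and key are assumed
# not to be needed at prediction time because test.json omits them.
SERVING_COLUMNS = {
    'dayofweek': TEXT,
    'hourofday': int,
    'pickuplon': float,
    'pickuplat': float,
    'dropofflon': float,
    'dropofflat': float,
    'passengers': int,
}


def check_instance(line):
    """Return a sorted list of problems found in one JSON instance (empty if none)."""
    instance = json.loads(line)
    problems = []
    for name, expected in SERVING_COLUMNS.items():
        if name not in instance:
            problems.append('missing column: ' + name)
        elif not isinstance(instance[name], expected):
            problems.append('wrong type for ' + name)
    for name in instance:
        if name not in SERVING_COLUMNS:
            problems.append('unexpected column: ' + name)
    return sorted(problems)


# An instance with the same fields as /tmp/test.json.
good = ('{"dayofweek": "Thurs", "hourofday": 17, "pickuplon": -73.885262, '
        '"pickuplat": 40.773008, "dropofflon": -73.987232, '
        '"dropofflat": 40.732403, "passengers": 2}')
print(check_instance(good))

# A hypothetical bad instance: hourofday is a string and most columns are missing.
print(check_instance('{"dayofweek": "Sun", "hourofday": "17"}'))
```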


In [0]:
%%bash
model_dir=$(ls $PWD/taxi_trained/export/exporter/)
gcloud ml-engine local predict \
    --model-dir=./taxi_trained/export/exporter/${model_dir} \
    --json-instances=/tmp/test.json

ls: cannot access '/content/datalab/training-data-analyst/courses/machine_learning/feateng/taxi_trained/export/exporter/': No such file or directory
  from ._conv import register_converters as _register_converters
Traceback (most recent call last):
  File "lib/googlecloudsdk/command_lib/ml_engine/local_predict.py", line 184, in <module>
    main()
  File "lib/googlecloudsdk/command_lib/ml_engine/local_predict.py", line 179, in main
    signature_name=args.signature_name)
  File "/tools/google-cloud-sdk/lib/third_party/ml_sdk/cloud/ml/prediction/prediction_lib.py", line 98, in local_predict
    client = create_client(framework, model_dir, **kwargs)
  File "/tools/google-cloud-sdk/lib/third_party/ml_sdk/cloud/ml/prediction/prediction_lib.py", line 91, in create_client
    return create_client_fn(model_path, **kwargs)
  File "/tools/google-cloud-sdk/lib/third_party/ml_sdk/cloud/ml/prediction/frameworks/tf_prediction_lib.py", line 555, in create_tf_session_client
    return SessionClient(*

Copyright 2016-2018 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License