In [15]:
import os
import tempfile
import pandas as pd
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import dataset_metadata
import apache_beam as beam
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from apache_beam.options.pipeline_options import PipelineOptions
import tensorflow_transform.beam.impl as beam_impl
 
from measurements import measure

# Show the TensorFlow version in use (the saved output below was produced with 1.13.1).
tf.__version__

'1.13.1'

In [52]:
# Fresh scratch directory for every artefact this notebook produces.
temp_dir = tempfile.mkdtemp()

# Signature data: raw records, exactly as they will arrive at prediction time.
signature_csv = os.path.join(temp_dir, "signature.csv")
# Training data: the signature data after scaling / further pre-processing.
training_csv = os.path.join(temp_dir, "training.csv")
# TFRecord copy of the training data, for high-performance input into TF graphs.
training_tfr = os.path.join(temp_dir, "training.tfr")

print("You can find the files at:")
print(*(signature_csv, training_csv, training_tfr))

You can find the files at:
/tmp/tmpsrnhj_zd/signature.csv /tmp/tmpsrnhj_zd/training.csv /tmp/tmpsrnhj_zd/training.tfr


In [53]:
# Start from a clean slate: remove output shards (training.csv-*, training.tfr-*)
# that an earlier run of the pipeline cells may have left in temp_dir.
# The directory itself must survive -- the following cells write into it.
# (A shell `rm -rf temp_dir` would not expand the Python variable and would
# instead delete a literal ./temp_dir in the current working directory.)
for _stale in os.listdir(temp_dir):
    if _stale.startswith((os.path.basename(training_csv), os.path.basename(training_tfr))):
        os.remove(os.path.join(temp_dir, _stale))

In [54]:
# A 5-row sample in the raw "signature" format. Write it without the pandas
# index (the Beam pipeline parses this CSV by column position, so an extra
# index column would shift every field), then read it back to show exactly
# what is on disk.
signature_sample = measure(5)
signature_sample.to_csv(signature_csv, index=False)
signature_on_disk = pd.read_csv(signature_csv)
signature_on_disk.head()

Unnamed: 0,beta1,beta2,hour,humidity,weekday
0,-2.536744,-0.348205,22,12.726654,1
1,2.741431,-1.288184,10,24.225378,2
2,4.916265,4.722238,1,27.175469,2
3,-3.174294,-4.913538,0,14.737376,1
4,-2.951847,-2.143401,9,12.510398,4


### Specify the input and output formats

In [55]:
# Column order of the raw CSV; the CsvCoder assigns fields to names by position.
ORDERED_SIGNATURE_COLUMNS = ["beta1", "beta2", "hour", "humidity", "weekday"]
# Header line for the CSVs written by the pipelines, as UTF-8 bytes.
header = ",".join(ORDERED_SIGNATURE_COLUMNS).encode("utf-8")

In [56]:
# Raw input features: one value per record ([1]-shaped) with the dtype the CSV
# field is parsed into. Key order is kept as before.
feature_spec = {
    name: tf.io.FixedLenFeature([1], dtype)
    for name, dtype in [
        ('beta1', tf.float32),
        ('beta2', tf.float32),
        ('weekday', tf.int64),
        ('hour', tf.int64),
        ('humidity', tf.float32),
    ]
}
schema = dataset_schema.from_feature_spec(feature_spec)

### Create an encoder and test it

In [57]:
csv_encoder = tft.coders.CsvCoder(ORDERED_SIGNATURE_COLUMNS, schema)

# Decode one hand-written line (note the stray spaces) and encode it back.
records = csv_encoder.decode("10.201, 10.101, 3,1.234,4")
print(records)
encoded = csv_encoder.encode(records)

# Turn the eyeball check into a real one: fields must be assigned by their
# position in ORDERED_SIGNATURE_COLUMNS, and re-encoding must give one clean,
# space-free field per column.
assert records['hour'].tolist() == [3], records
assert records['weekday'].tolist() == [4], records
fields = encoded.split(b',')
assert len(fields) == len(ORDERED_SIGNATURE_COLUMNS), encoded
assert fields[2] == b'3' and fields[4] == b'4', encoded
encoded

{'weekday': array([4]), 'humidity': array([1.234], dtype=float32), 'beta1': array([10.201], dtype=float32), 'beta2': array([10.101], dtype=float32), 'hour': array([3])}


b'10.201,10.101,3,1.234,4'

### The Apache Beam pipeline 

In [58]:
def process_data(row):
    """Identity step for the dry run: echo a decoded record, then forward it.

    Args:
        row: one record as produced by the CSV coder.

    Returns:
        The very same ``row`` object, unmodified.
    """
    passthrough = row
    print(passthrough)
    return passthrough

### Dry run: is everything working?

In [59]:
with beam.Pipeline('DirectRunner', PipelineOptions()) as p:
    # Rebuilt here so the cell only depends on the column list and schema.
    csv_encoder = tft.coders.CsvCoder(ORDERED_SIGNATURE_COLUMNS, schema)

    # Raw signature rows, header line skipped.
    raw_rows = p | 'read_from_csv' >> beam.io.ReadFromText(
        file_pattern=signature_csv,
        coder=csv_encoder,
        skip_header_lines=1,
    )

    # Echo every record (process_data is the print-and-return identity).
    echoed_rows = raw_rows | 'process_records' >> beam.Map(process_data)

    # Write them back out, re-encoded, under the same header.
    _ = echoed_rows | 'write_to_csv' >> beam.io.WriteToText(
        file_path_prefix=training_csv,
        coder=csv_encoder,
        header=header,
    )


{'weekday': array([1]), 'humidity': array([12.726654], dtype=float32), 'beta1': array([-2.5367444], dtype=float32), 'beta2': array([-0.34820482], dtype=float32), 'hour': array([22])}
{'weekday': array([2]), 'humidity': array([24.225378], dtype=float32), 'beta1': array([2.7414305], dtype=float32), 'beta2': array([-1.2881843], dtype=float32), 'hour': array([10])}
{'weekday': array([2]), 'humidity': array([27.175468], dtype=float32), 'beta1': array([4.9162645], dtype=float32), 'beta2': array([4.7222385], dtype=float32), 'hour': array([1])}
{'weekday': array([1]), 'humidity': array([14.737376], dtype=float32), 'beta1': array([-3.1742942], dtype=float32), 'beta2': array([-4.9135385], dtype=float32), 'hour': array([0])}
{'weekday': array([4]), 'humidity': array([12.510398], dtype=float32), 'beta1': array([-2.9518473], dtype=float32), 'beta2': array([-2.1434014], dtype=float32), 'hour': array([9])}


In [60]:
!echo "Reading from: " $training_csv*
!cat $training_csv*

Reading from:  /tmp/tmpsrnhj_zd/training.csv-00000-of-00001
beta1,beta2,hour,humidity,weekday
-2.5367444,-0.34820482,22,12.726654,1
2.7414305,-1.2881843,10,24.225378,2
4.9162645,4.7222385,1,27.175468,2
-3.1742942,-4.9135385,0,14.737376,1
-2.9518473,-2.1434014,9,12.510398,4


### The real transformation: scale $\beta_1$ and $\beta_2$ to $[0, 1]$

In [61]:
# Full-size signature data set for the real run.
signature_df = measure(20000)
# The Beam CsvCoder parses fields by position, so pin the column order to
# ORDERED_SIGNATURE_COLUMNS (and drop the pandas index). This keeps fields
# from being silently mislabelled if measure() ever reorders its columns.
signature_df.to_csv(signature_csv, index=False, columns=ORDERED_SIGNATURE_COLUMNS)

In [62]:
def process_data(row, columns=('beta1', 'beta2')):
    """tf.Transform preprocessing_fn: rescale selected features to [0, 1].

    Used as the preprocessing function of AnalyzeAndTransformDataset;
    tft.scale_to_0_1 bases its scaling on statistics tf.Transform computes
    over the whole dataset.

    Args:
        row: dict mapping raw feature name -> value/tensor (keys as in
            feature_spec).
        columns: names of the features to rescale. Defaults to the two beta
            features; every other feature is passed through unchanged.

    Returns:
        A new dict with the same keys as ``row``, where ``columns`` are
        replaced by their [0, 1]-scaled versions. ``row`` is not modified.
    """
    # Shallow copy: a preprocessing function should not mutate its input dict.
    outputs = dict(row)
    for name in columns:
        outputs[name] = tft.scale_to_0_1(row[name])
    return outputs

In [63]:
# Wrap the raw-feature schema in a DatasetMetadata; the pipeline below pairs it
# with the decoded CSV rows as the (data, metadata) input that
# AnalyzeAndTransformDataset consumes.
signature_metadata = dataset_metadata.DatasetMetadata(schema)

In [64]:
csv_encoder = tft.coders.CsvCoder(ORDERED_SIGNATURE_COLUMNS, schema)
tfr_encoder = tft.coders.ExampleProtoCoder(schema)

# Destination of the fitted transform (written by the last step below).
metadata_dir = os.path.join(temp_dir, "metadata")

with beam.Pipeline('DirectRunner', PipelineOptions()) as p:
    # tf.Transform needs a scratch directory for the analyze/transform step;
    # the Context provides temp_dir to the tf.Transform steps inside it.
    with tft_beam.Context(temp_dir=temp_dir):
        # Raw rows, header skipped; fields are assigned to names by their
        # position in ORDERED_SIGNATURE_COLUMNS.
        signature_data = p | 'read_from_csv' >> beam.io.ReadFromText(
            file_pattern=signature_csv,
            coder=csv_encoder,
            skip_header_lines=1,
        )

        # AnalyzeAndTransformDataset expects a (data, metadata) pair.
        signature_data = (signature_data, signature_metadata)

        # Analyze the dataset, apply process_data, and get back both the
        # transformed (data, metadata) and the reusable transform function.
        data_and_metadata, transform_fn = (
            signature_data
            | "AnalyzeAndTransform"
            >> tft_beam.AnalyzeAndTransformDataset(process_data)
        )
        training_data, training_metadata = data_and_metadata

        # Persist the transformed rows as CSV, under the same header ...
        _ = training_data | 'write_to_csv' >> beam.io.WriteToText(
            file_path_prefix=training_csv, coder=csv_encoder, header=header)

        # ... and as TFRecords.
        _ = training_data | 'write_to_tfr' >> beam.io.WriteToTFRecord(
            file_path_prefix=training_tfr, coder=tfr_encoder)

        # Finally, save the transform function so that data arriving at
        # prediction time gets exactly the same scaling.
        _ = transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(
            metadata_dir)


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:No assets to write.


INFO:tensorflow:No assets to write.


INFO:tensorflow:SavedModel written to: /tmp/tmpsrnhj_zd/tftransform_tmp/287ff8e2554a4e3bb62b838b363c886a/saved_model.pb


INFO:tensorflow:SavedModel written to: /tmp/tmpsrnhj_zd/tftransform_tmp/287ff8e2554a4e3bb62b838b363c886a/saved_model.pb


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:No assets to write.


INFO:tensorflow:No assets to write.


INFO:tensorflow:SavedModel written to: /tmp/tmpsrnhj_zd/tftransform_tmp/58bce881fccb441e8d175e65eb1a4c43/saved_model.pb


INFO:tensorflow:SavedModel written to: /tmp/tmpsrnhj_zd/tftransform_tmp/58bce881fccb441e8d175e65eb1a4c43/saved_model.pb


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:No assets to write.


INFO:tensorflow:No assets to write.


INFO:tensorflow:SavedModel written to: /tmp/tmpsrnhj_zd/tftransform_tmp/d9aab3ef32dd4d07829f6b92a26faf21/saved_model.pb


INFO:tensorflow:SavedModel written to: /tmp/tmpsrnhj_zd/tftransform_tmp/d9aab3ef32dd4d07829f6b92a26faf21/saved_model.pb


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


INFO:tensorflow:Saver not created because there are no variables in the graph to restore


In [68]:
!echo "Reading from: " $training_csv*
!echo
!cat $training_csv* | tail -5
!echo
!echo metadata is here:
!ls $metadata_dir
!echo 
!echo "TFRecords are here: " $training_tfr*

Reading from:  /tmp/tmpsrnhj_zd/training.csv-00000-of-00001

0.3818975,0.5458483,12,17.193748,2
0.16038401,0.5800147,9,10.631513,2
0.44184422,0.7403201,17,18.30096,5
0.7561718,0.61078686,2,23.344,1
0.73403597,0.96532387,4,21.984755,2

metadata is here:
transformed_metadata  transform_fn

TFRecords are here:  /tmp/tmpsrnhj_zd/training.tfr-00000-of-00001


### Wrap up

We're now able to analyze and process data of any size, scaling selected features to the interval $[0, 1]$ while saving the function that actually did it for later. We'll need that function to apply exactly the same scaling to incoming data at prediction time.
Note that by simply replacing `'DirectRunner'` in the Apache Beam pipeline with `'DataflowRunner'`, in an adequately configured GCP environment, we could have the pipeline executed on an arbitrarily large cluster.