In [None]:
%%bash
# Remove google-cloud-dataflow, then install tensorflow_transform and apache-beam with the [gcp] extra.
# NOTE(review): versions are unpinned, so a re-run may pull releases incompatible with this notebook's
# tf.transform API usage; consider pinning. tensorflow_hub (imported in the next cell) is not installed here.
pip uninstall -y google-cloud-dataflow
pip install --upgrade --force tensorflow_transform apache-beam[gcp]

In [1]:
from __future__ import absolute_import, division, print_function
import sys
reload(sys)
sys.setdefaultencoding('utf8')
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_hub as hub
from tensorflow_transform.beam import impl as beam_impl
import shutil
import os
import datetime
import apache_beam as beam
import config
import variables
print(tf.__version__)

# Cloud Setup
This section is only required when running the pipeline on Google Cloud (Dataflow).

In [None]:
# Export the Cloud settings from `config` as environment variables so that shell
# cells can use them (e.g. $PROJECT and $REGION in the gcloud cell).
os.environ.update(
    (env_name, getattr(config, env_name)) for env_name in ('BUCKET', 'PROJECT', 'REGION')
)

In [None]:
%%bash
# Point gcloud at the project and compute region exported into the environment
# (from config.PROJECT / config.REGION) by the previous cell.
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

# Create Dataset using tf.transform
Much of the code is taken from [a notebook in Google's training data analyst repo](https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/machine_learning/feateng/tftransform.ipynb).


We will use `tf.transform` (which uses Apache Beam under the hood) for the data transformation pipeline. This has several advantages:
1. The same input pipeline can be used for batch and streaming data (only the `beam.io.ReadFromText` step needs to be swapped for another source, such as a different file format or a database connection).
1. The same pipeline code can be used regardless of where it runs (on a local machine, on GCP, or on a Spark cluster).


The pipeline has the following prerequisites:
1. Have a `requirements.txt` file for the Apache Beam pipeline
1. Have a data filtering / validation function (here, it is the `is_valid` function)
1. Have a data processing function (here, it is the `preprocess_tft` function)


We can then define the pipeline:
1. Import required packages for the pipeline
1. Prepare the filesystem by deleting leftover files
1. Set up the options for the pipeline
1. Create metadata about the inputs (input columns and their datatypes) and save it
1. Create a pipeline with the desired runner (e.g. `DirectRunner` or `DataflowRunner`)
1. Transform raw training data
  1. Read in raw training data
  1. Analyze and transform the raw training data (produces a transformed dataset **and** a transform function to use on the eval and test datasets)
  1. Save the transformed data
1. Transform eval and test data
  1. Read in raw eval/test data
  1. Transform raw eval/test data
  1. Save transformed eval/test data
1. Save transform function for serving

Note: data read in by Beam is a `PCollection`. Pairing a `PCollection` with its `DatasetMetadata` (here, the tuple `(PCollection, DatasetMetadata)`) gives a *dataset*, which is a `tf.transform` concept.

In [2]:
%%writefile requirements.txt
# Python dependencies for the Beam pipeline; passed as `requirements_file` in preprocess().
# NOTE(review): unpinned -- consider pinning to the versions used by the launching environment.
tensorflow
tensorflow-transform
tensorflow-hub
apache-beam
python-snappy

In [3]:
# keras doesn't support the label_vocabulary argument like canned estimators in tf.estimator,
# so we have to reverse engineer the vocab lookup: `labels` holds one class name per
# non-blank line of labels.txt, in file order, and a label's position in this list is
# the index preprocess_tft one-hot encodes.
with open('data/misc/labels.txt', 'r') as f:
    # rstrip('\r\n') also copes with files saved with Windows line endings, and
    # blank lines are skipped so they cannot turn into a spurious '' class.
    labels = [line.rstrip('\r\n') for line in f if line.strip()]

def decode_csv(row):
    """Parse one delimited text line into a raw feature dict.

    The line is split on ``config.DELIM`` and zipped against
    ``config.RENAMED_COLS``. Columns in ``config.STRING_COLS`` are
    whitespace-stripped and columns in ``config.NUMERIC_COLS`` are cast to
    float. Only keys listed in STRING_COLS or NUMERIC_COLS are returned.

    If a numeric field cannot be parsed, or the line has too few fields to
    supply every expected column, a placeholder record is returned instead:
    each string column is ``'dummy_<col>'`` and each numeric column is
    ``0.0``. The placeholder label is not 'ham'/'spam', so ``is_valid``
    rejects it (assuming the label column is one of STRING_COLS).

    Args:
        row: one line of text, as produced by ``beam.io.ReadFromText``.

    Returns:
        dict mapping column name to a str or float value.
    """
    try:
        fields = row.split(config.DELIM)
        # NOTE(review): zip silently drops extra fields, so a free-text column
        # containing the delimiter gets truncated unless it is the last column.
        features = dict(zip(config.RENAMED_COLS, fields))
        for col in config.STRING_COLS:
            features[col] = features[col].strip()
        for col in config.NUMERIC_COLS:
            features[col] = float(features[col])
    except (ValueError, KeyError):
        # ValueError: a numeric field is not a number.
        # KeyError: the row had fewer fields than RENAMED_COLS, so an expected
        # column is missing. (Previously a `return` inside `finally` silently
        # swallowed this and emitted a partial record.)
        features = {}
        for col in config.STRING_COLS:
            features[col] = ''.join(['dummy_', col])
        for col in config.NUMERIC_COLS:
            features[col] = 0.0

    all_cols = set(config.STRING_COLS) | set(config.NUMERIC_COLS)
    # Build a new dict instead of popping keys while iterating, which raises
    # RuntimeError on Python 3.
    return {key: value for key, value in features.items() if key in all_cols}

def is_valid(inputs, text_col='text', label_col='spam', valid_labels=('ham', 'spam')):
    """Return True if a decoded record has a known label and non-empty text.

    Used as the predicate of ``beam.Filter`` after ``decode_csv``.

    Args:
        inputs: decoded record (mapping of column name to value).
        text_col: key of the free-text column that must be non-empty.
        label_col: key of the label column.
        valid_labels: accepted label values.

    Returns:
        True if ``inputs[label_col]`` is in ``valid_labels`` and
        ``inputs[text_col]`` has non-zero length. Otherwise False, including
        for malformed records (missing keys, non-mapping input, or text
        without a length).
    """
    try:
        text = inputs[text_col]
        label = inputs[label_col]
        return (label in valid_labels) and (len(text) > 0)
    except Exception:
        # Best-effort filter: any malformed record is dropped rather than
        # failing the pipeline. Unlike a bare `except:`, this still lets
        # KeyboardInterrupt / SystemExit propagate.
        return False


def preprocess_tft(inputs):
    """tf.transform preprocessing_fn: TF-IDF bag-of-n-grams plus a one-hot label.

    Args:
        inputs: dict of raw feature tensors keyed by column name, matching the
            raw data metadata built in preprocess() (presumably batched
            string/float32 tensors -- confirm against the schema).

    Returns:
        A copy of ``inputs`` with these columns added:
          - 'bow_indices', 'bow_weight': outputs of ``tft.tfidf`` over the
            n-grams of ``config.TOKENIZE_COL``.
          - ``config.LABEL_COL + '_one_hot'``: one-hot encoding (depth
            ``variables.N_CLASSES``) of the label's position in the
            module-level ``labels`` list.
    """
    result = inputs.copy()

    # tf.strings.split returns a SparseTensor of tokens, one row per example.
    tokens = tf.strings.split(inputs[config.TOKENIZE_COL])
    tokens = tft.ngrams(tokens, ngram_range=config.NGRAM_RANGE, separator=' ')
    # Keep the config.MAX_TOKENS most frequent n-grams and send everything else
    # to a single OOV bucket. Without num_oov_buckets, OOV n-grams would get the
    # default index -1, which does not fit the MAX_TOKENS + 1 vocabulary that
    # tfidf is told about below.
    indices = tft.compute_and_apply_vocabulary(
        tokens, top_k=config.MAX_TOKENS, num_oov_buckets=1)
    bow_indices, bow_weight = tft.tfidf(indices, vocab_size=config.MAX_TOKENS + 1)
    result['bow_indices'] = bow_indices
    result['bow_weight'] = bow_weight

    # keras doesn't support the label_vocabulary argument like canned estimators in tf.estimator,
    # so we have to reverse engineer the vocab lookup. Labels missing from
    # `labels` map to -1, which tf.one_hot encodes as an all-zero row.
    label_index = tft.apply_function(
        lambda s: tf.contrib.lookup.string_to_index(s, mapping=labels),
        inputs[config.LABEL_COL])
    result[config.LABEL_COL + '_one_hot'] = tf.one_hot(label_index, depth=variables.N_CLASSES)
    return result


def get_dataset_path(phase, on_cloud=False):
    """Return the file glob of one raw CSV split.

    Args:
        phase: 'train' or 'eval'; any other value selects the test split.
        on_cloud: if True, look under
            gs://<config.BUCKET>/spam-classification/data/split; otherwise
            under the local directory data/split.

    Returns:
        A glob such as 'data/split/train*.csv'.
    """
    root = ('gs://{bucket}/spam-classification/data/split'.format(bucket=config.BUCKET)
            if on_cloud else 'data/split')

    if phase == 'train':
        return os.path.join(root, 'train*.csv')
    if phase == 'eval':
        return os.path.join(root, 'eval*.csv')
    # Anything else (normally 'test') falls through to the test split.
    return os.path.join(root, 'test*.csv')


def preprocess(on_cloud=False):
    """Build and run the tf.transform Beam pipeline over the train/eval/test splits.

    Reads each raw CSV split (see get_dataset_path), parses and filters it
    with decode_csv / is_valid, analyzes the training split with
    preprocess_tft, and applies the resulting transform function to all three
    splits. Output goes to an output directory:
      - <out>/train*, <out>/eval*, <out>/test*: gzipped TFRecords
      - <out>/metadata/rawdata_metadata: raw input metadata
      - <out>/metadata: the transform function for serving

    Any previous contents of the output directory are deleted first.

    Args:
        on_cloud: if True, run with the DataflowRunner and use
            gs://<config.BUCKET>/spam-classification/data/tft as the output
            directory. Otherwise run with the DirectRunner and use ./data/tft.
    """
    import datetime
    import os
    import tempfile
    from apache_beam.io import tfrecordio
    from tensorflow_transform.coders import example_proto_coder
    from tensorflow_transform.tf_metadata import dataset_metadata
    from tensorflow_transform.tf_metadata import dataset_schema
    from tensorflow_transform.beam import tft_beam_io
    from tensorflow_transform.beam.tft_beam_io import transform_fn_io

    job_prefix = 'preprocess-for-{project}-'.format(project=config.PROJECT)
    job_name = job_prefix + datetime.datetime.now().strftime('%Y%m%d-%H%M%S')

    # Pick runner and output location, and flush all old data before running.
    if on_cloud:
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://{bucket}/spam-classification/data/tft'.format(bucket=config.BUCKET)
        import subprocess
        # The return code is ignored (e.g. the directory may not exist yet).
        subprocess.call('gsutil rm -r {}'.format(OUTPUT_DIR).split())
        RUNNER = 'DataflowRunner'
    else:
        import shutil
        print('Launching local job ... hang on')
        OUTPUT_DIR = './data/tft'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        RUNNER = 'DirectRunner'

    temp_dir = os.path.join(OUTPUT_DIR, 'tmp')
    # NOTE(review): decode_csv / is_valid run on workers and reference the local
    # `config` module; with no_save_main_session and only a requirements file,
    # confirm that module is importable on Dataflow workers.
    options = {
        'staging_location': os.path.join(temp_dir, 'staging'),
        'temp_location': temp_dir,
        'job_name': job_name,
        'project': config.PROJECT,
        'max_num_workers': 24,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        'requirements_file': 'requirements.txt'
    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)

    def fixed_columns(names, dtype):
        # One fixed-length column of shape [] per name.
        return {
            name: dataset_schema.ColumnSchema(
                dtype, [], dataset_schema.FixedColumnRepresentation())
            for name in names
        }

    # Raw input schema: string columns first, then numeric ones.
    raw_data_schema = {}
    if config.STRING_COLS:
        raw_data_schema.update(fixed_columns(config.STRING_COLS, tf.string))
    if config.NUMERIC_COLS:
        raw_data_schema.update(fixed_columns(config.NUMERIC_COLS, tf.float32))
    raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))

    def read_split(pipeline, phase):
        # Read, parse and validate one raw split. Step labels are prefixed with
        # the phase so they stay unique within the pipeline.
        source = get_dataset_path(phase=phase, on_cloud=on_cloud)
        return (
            pipeline
            | '{}_read'.format(phase) >> beam.io.ReadFromText(source)
            | '{}_decode'.format(phase) >> beam.Map(decode_csv)
            | '{}_filter'.format(phase) >> beam.Filter(is_valid))

    def write_split(pcoll, phase, schema):
        # Write one transformed split as gzipped TFRecords under OUTPUT_DIR/<phase>.
        return (
            pcoll
            | 'write_{}_data'.format(phase) >> tfrecordio.WriteToTFRecord(
                os.path.join(OUTPUT_DIR, phase),
                file_name_suffix='.gz',
                coder=example_proto_coder.ExampleProtoCoder(schema)))

    with beam.Pipeline(RUNNER, options=opts) as p:
        with beam_impl.Context(temp_dir=temp_dir):
            # save the raw data metadata
            _ = (
                raw_data_metadata
                | 'write_input_metadata' >> tft_beam_io.WriteMetadata(
                    os.path.join(OUTPUT_DIR, 'metadata/rawdata_metadata'),
                    pipeline=p))

            # Analyze + transform the training split; the resulting transform_fn
            # is what gets applied to eval and test.
            (transformed_train_data, transformed_metadata), transform_fn = (
                (read_split(p, 'train'), raw_data_metadata)
                | 'transform_train_data' >> beam_impl.AnalyzeAndTransformDataset(
                    preprocessing_fn=preprocess_tft))
            write_split(transformed_train_data, 'train', transformed_metadata.schema)

            # Apply (without re-analyzing) the training transform to eval, then test.
            for phase in ('eval', 'test'):
                transformed_data, _ = (
                    ((read_split(p, phase), raw_data_metadata), transform_fn)
                    | 'transform_{}_data'.format(phase) >> beam_impl.TransformDataset())
                write_split(transformed_data, phase, transformed_metadata.schema)

            # write transform function for serving
            _ = (
                transform_fn
                | 'write_transform_fn' >> transform_fn_io.WriteTransformFn(
                    os.path.join(OUTPUT_DIR, 'metadata'))
            )
      

# on_cloud=False: DirectRunner, reads data/split and writes ./data/tft (deleted first).
# on_cloud=True: DataflowRunner, reads and writes under gs://<config.BUCKET>/spam-classification/data/.
preprocess(on_cloud=False)