In [9]:
%%capture
# Install the pipeline dependencies; %%capture suppresses pip's (long) output.
# NOTE(review): the `apitools` PyPI package (0.1.4 in the listing below) is a different
# project from `google-apitools` (0.5.28, also listed) — confirm `apitools` is really needed.
# NOTE(review): six is pinned to 1.10, presumably for compatibility with another installed
# library — TODO confirm and record the reason.
pip install apache-beam[gcp]
pip install apitools
pip install six==1.10
# Note: May need to restart kernel after install

In [1]:
# Show the installed versions of Beam, TensorFlow and apitools so compatibility can be checked.
!pip list | grep "beam"
!pip list | grep "tensorflow"
!pip list | grep "apitools"

apache-beam (2.16.0)
tensorflow (1.8.0)
tensorflow-estimator (1.14.0)
tensorflow-hub (0.6.0)
tensorflow-metadata (0.14.0)
tensorflow-probability (0.7.0)
tensorflow-serving-api (1.14.0)
tensorflow-transform (0.14.0)
apitools (0.1.4)
google-apitools (0.5.28)


In [4]:
import tensorflow as tf
import apache_beam as beam

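#### Note: When using tf.Transform, make sure the installed TensorFlow and Apache Beam versions are compatible with the installed `tensorflow-transform` release; see the compatibility table at https://pypi.org/project/tensorflow-transform/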

### Pipeline steps

In [12]:
# Feature columns, listed in the same order as the SELECT list of get_source_query.
FEATURES = [
    "weight_pounds",
    "is_male",
    "mother_age",
    "mother_race",
    "plurality",
    "gestation_weeks",
]

def get_source_query(step, limit=100):
    """Return the BigQuery query that selects one split of the natality data.

    The splits are disjoint ranges of birth years:
    train = 1980-2004, eval = 2005-2007, test = 2008.

    Args:
        step: one of 'train', 'eval' or 'test'.
        limit: maximum number of rows returned. The default of 100 keeps the
            original small-sample behaviour; pass None to drop the LIMIT
            clause and read the whole split.

    Returns:
        The query string.

    Raises:
        ValueError: if ``step`` is not a known split name, or ``limit`` is negative.
    """
    year_ranges = {
        'train': (1980, 2004),
        'eval': (2005, 2007),
        'test': (2008, 2008),
    }
    # isinstance guard keeps unhashable inputs on the ValueError path instead of TypeError.
    if not isinstance(step, str) or step not in year_ranges:
        raise ValueError("step value of {} must be one of 'train', 'eval', 'test'".format(step))

    if limit is None:
        limit_clause = ""
    else:
        # Coerce to int so only a plain number can ever be spliced into the SQL text.
        limit = int(limit)
        if limit < 0:
            raise ValueError("limit must be a non-negative integer or None, got {}".format(limit))
        limit_clause = "LIMIT {}".format(limit)

    start_year, end_year = year_ranges[step]
    return """
    SELECT
      weight_pounds,
      is_male,
      mother_age,
      mother_race,
      plurality,
      gestation_weeks
    FROM
      publicdata.samples.natality
    WHERE year BETWEEN {} AND {}
      AND weight_pounds > 0
      AND mother_age > 0
      AND plurality > 0
      AND gestation_weeks > 0
      AND month > 0
    {}
    """.format(start_year, end_year, limit_clause)


def read_from_bq(pipeline, step):
    """Attach a read-and-clean branch for one data split to ``pipeline``.

    Rows come from the query built by get_source_query(step) and are mapped
    through prep_bq_row.

    Returns:
        A ``(raw_data, raw_metadata)`` tuple, where ``raw_data`` is the cleaned
        PCollection and ``raw_metadata`` is whatever create_raw_metadata() returns.

    NOTE(review): create_raw_metadata is not defined in the visible cells, and
    this helper has no caller there — confirm whether it is still needed.
    """
    query = get_source_query(step)
    read_label = '{} - Read Data from BigQuery'.format(step)
    clean_label = '{} - Clean up Data'.format(step)

    cleaned_rows = (
        pipeline
        | read_label >> beam.io.Read(
            beam.io.BigQuerySource(query=query, use_standard_sql=True))
        | clean_label >> beam.Map(prep_bq_row)
    )
    return cleaned_rows, create_raw_metadata()


def prep_bq_row(bq_row):
    """Convert one BigQuery result row into a dict of strings.

    Every value is passed through ``str()``. ``mother_race`` is translated from
    its numeric code to a readable name; codes missing from the table, a
    missing column, or None all become 'Unknown'. If the row has no
    ``mother_race`` key, one is added with the value 'Unknown'.
    """
    # The lookup table lives inside the function rather than at module level —
    # presumably so the function stays self-contained when Beam pickles it for
    # remote workers (TODO confirm).
    # NOTE(review): 'Samaon' looks like a misspelling of 'Samoan'. It is an
    # emitted category value, so correcting it would change the exported data —
    # confirm with downstream consumers before fixing.
    race_names = {
        1: 'White',
        2: 'Black',
        3: 'American Indian',
        4: 'Chinese',
        5: 'Japanese',
        6: 'Hawaiian',
        7: 'Filipino',
        18: 'Asian Indian',
        28: 'Korean',
        39: 'Samaon',
        48: 'Vietnamese',
    }

    cleaned = {column: str(bq_row[column]) for column in bq_row.keys()}

    race_code = bq_row['mother_race'] if 'mother_race' in bq_row else None
    cleaned['mother_race'] = race_names[race_code] if race_code in race_names else 'Unknown'
    return cleaned


def to_csv_string(bq_dict):
    """Serialize one cleaned row as a comma-separated line in fixed column order.

    Values are joined as-is, so they must already be strings (prep_bq_row
    produces such rows). A missing column raises KeyError. Extra keys are
    ignored. No CSV quoting or escaping is applied.
    """
    # Column order is spelled out locally instead of reading the module-level
    # FEATURES — presumably to keep the function self-contained when Beam
    # pickles it for workers (TODO confirm).
    column_order = ("weight_pounds", "is_male", "mother_age", "mother_race", "plurality", "gestation_weeks")
    return ",".join(bq_dict[column] for column in column_order)


def write_csv(transformed_data, location, step):
    """Write a PCollection of CSV lines to text files that start with a header row.

    Output paths start with ``os.path.join(location, step, step)`` and end in
    ``.csv``. The header is the six feature column names joined by commas.
    Nothing is returned.

    NOTE(review): no active caller in the visible cells.
    """
    column_names = ["weight_pounds", "is_male", "mother_age", "mother_race", "plurality", "gestation_weeks"]
    label = '{} - Write Transformed Data'.format(step)
    csv_sink = beam.io.WriteToText(
        file_path_prefix=os.path.join(location, step, step),
        file_name_suffix=".csv",
        header=",".join(column_names),
    )
    transformed_data | label >> csv_sink
    

### Define Pipeline

In [28]:
def run_transformation_pipeline(args):
    """Export the train/eval/test splits of the natality data from BigQuery to CSV.

    For each split the pipeline does four things:

    - reads rows using the query from get_source_query;
    - cleans them with prep_bq_row;
    - serializes them with to_csv_string;
    - writes text files whose paths start with
      ``os.path.join(transformed_data_location, step, step)`` and end in ``.csv``.

    The files are written without a header row.

    Args:
        args: settings dict. The keys 'runner', 'transformed_data_location',
            'transform_artifact_location' and 'temporary_dir' are read here
            (KeyError if any is missing). The whole dict is also forwarded to
            ``PipelineOptions`` as keyword arguments.
    """
    pipeline_options = beam.pipeline.PipelineOptions(flags=[], **args)

    runner = args['runner']
    transformed_data_location = args['transformed_data_location']
    transform_artifact_location = args['transform_artifact_location']
    temporary_dir = args['temporary_dir']

    print("Sink transformed data files location: {}".format(transformed_data_location))
    print("Sink transform artifact location: {}".format(transform_artifact_location))
    print("Temporary directory: {}".format(temporary_dir))
    print("Runner: {}".format(runner))

    with beam.Pipeline(runner, options=pipeline_options) as pipeline:
        for step in ("train", "eval", "test"):
            source_query = get_source_query(step)
            # Every transform label is prefixed with the split name, so the labels
            # differ across the three branches added to the same pipeline.
            (
                pipeline
                | '{} - Read Data from BigQuery'.format(step) >> beam.io.Read(
                    beam.io.BigQuerySource(query=source_query, use_standard_sql=True))
                | '{} - Clean up Data'.format(step) >> beam.Map(prep_bq_row)
                | '{} - Prepare for csv'.format(step) >> beam.Map(to_csv_string)
                | '{} - Write Transformed Data'.format(step) >> beam.io.WriteToText(
                    file_path_prefix=os.path.join(transformed_data_location, step, step),
                    file_name_suffix=".csv")
            )

In [29]:
import os
from datetime import datetime

# True: DirectRunner writing under the local ROOT_DIR; False: DataflowRunner writing to the GCS bucket.
RUN_LOCAL = False
PROJECT = 'dogwood-actor-255317'
BUCKET = 'mkneier-gcs-cloudml'
REGION = 'us-west1'
ROOT_DIR = 'bwt_data'

# Root for all pipeline outputs: a local directory or a folder in the bucket.
OUTPUT_DIR = ROOT_DIR if RUN_LOCAL else "gs://{}/{}".format(BUCKET, ROOT_DIR)
TRANSFORM_ARTIFACTS_DIR = os.path.join(OUTPUT_DIR, 'transform')
TRANSFORMED_DATA_DIR = os.path.join(OUTPUT_DIR, 'transformed')
TEMP_DIR = os.path.join(OUTPUT_DIR, 'tmp')

# Same truthiness test as OUTPUT_DIR, so the runner and output location always agree.
runner = 'DirectRunner' if RUN_LOCAL else 'DataflowRunner'

# The UTC timestamp suffix gives each launched job a distinct name.
job_name = 'preprocess-babyweight-data-tft-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))

args = {
    # Read directly by run_transformation_pipeline.
    'job_name': job_name,
    'runner': runner,
    'transformed_data_location': TRANSFORMED_DATA_DIR,
    'transform_artifact_location': TRANSFORM_ARTIFACTS_DIR,
    'temporary_dir': TEMP_DIR,
    # Not read by run_transformation_pipeline; only forwarded to PipelineOptions.
    'debug': False,

    # Beam / Dataflow pipeline options (the whole dict is passed to PipelineOptions).
    'project': PROJECT,
    'region': REGION,
    'staging_location': os.path.join(OUTPUT_DIR, 'staging'),
    'temp_location': TEMP_DIR,
    'worker_machine_type': 'n1-standard-1',
}

In [None]:
# Build and run the CSV export pipeline using the settings dict assembled in the previous cell.
run_transformation_pipeline(args)