<h1> Preprocessing using Dataflow </h1>

This notebook illustrates:
<ol>
<li> Creating datasets for Machine Learning using Dataflow
</ol>
<p>
While Pandas is fine for experimenting, it is better to do preprocessing in Apache Beam when you operationalize your workflow. Apache Beam also supports streaming, which helps if you need to preprocess data in flight.

In [None]:
import apache_beam as beam
from google.cloud import bigquery
import datetime
print(beam.__version__)

In [None]:
# change these to try this notebook out
ACCOUNT = 'student-02-b97575b6b46d@qwiklabs.net'
SAC = 'qwik-jupyter-notebook-sac-f'
SAC_KEY_DESTINATION = '/media/mujahid7292/Data/Gcloud_Tem_SAC'
BUCKET = 'bucket-qwiklabs-gcp-02-3fc87b630759'
PROJECT = 'qwiklabs-gcp-02-3fc87b630759'
REGION = 'us-central1'

In [None]:
import os
os.environ['ACCOUNT'] = ACCOUNT
os.environ['SAC'] = SAC
os.environ['SAC_KEY_DESTINATION'] = SAC_KEY_DESTINATION
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

# Set Google Application Credentials

In [None]:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='{}/{}.json'.format(SAC_KEY_DESTINATION,SAC)

Check whether GOOGLE_APPLICATION_CREDENTIALS was set successfully outside the virtual environment

In [None]:
%%bash
set | grep GOOGLE_APPLICATION_CREDENTIALS 

In [None]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi

<h2> Save the query from earlier </h2>

The data is <a href='https://console.cloud.google.com/bigquery?GK=publicdata&page=table&t=natality&d=samples&p=publicdata&redirect_from_classic=true&project=corona-patient-finder&folder=&organizationId='>natality data</a> (a record of births in the US). My goal is to predict the baby's weight given a number of factors about the pregnancy and the baby's mother. Later, we will want to split the data into training and eval datasets. The hash of the year-month will be used for that, so all births from the same month end up in the same dataset.
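
To see why hashing the year-month gives a repeatable split, here is a minimal local sketch. It assumes a stand-in hash (`hashlib.sha256`, truncated to a signed 64-bit integer) in place of BigQuery's `FARM_FINGERPRINT`, so the values will not match what BigQuery returns. The helper names `hashmonth` and `split_for` are hypothetical, and the 3-to-1 bucket rule is just one way to get a roughly 75/25 split.

```python
import hashlib

def hashmonth(year, month):
    """Stand-in for FARM_FINGERPRINT(CONCAT(year, month)); NOT the same hash."""
    text = '{}{}'.format(year, month)
    digest = hashlib.sha256(text.encode('utf-8')).digest()
    # Interpret the first 8 bytes as a signed 64-bit integer, like an INT64 value.
    return int.from_bytes(digest[:8], byteorder='big', signed=True)

def split_for(year, month):
    """Assign a whole year-month to 'train' or 'eval' (roughly 3:1)."""
    return 'train' if abs(hashmonth(year, month)) % 4 < 3 else 'eval'

# Every record from the same year-month lands in the same split, run after run.
print(split_for(2003, 7), split_for(2003, 7))
```

Because the split depends only on the year and month, rerunning the query never moves a record from one dataset to the other.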

In [None]:
# Create SQL query using natality data after the year 2000
query="""
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks,
    FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(MONTH AS STRING))) AS hashmonth
FROM
    publicdata.samples.natality
WHERE year > 2000
"""

In [None]:
# Call Bigquery and examine in dataframe
df = bigquery.Client().query(query=query + "LIMIT 100").to_dataframe()
df.head()

In [None]:
df.size

In [None]:
df.to_csv('data.csv')

<h2> Create ML dataset using Dataflow </h2>
Let's use Cloud Dataflow to read in the BigQuery data, do some preprocessing, and write it out as CSV files.

Instead of using Beam/Dataflow, I had three other options:

* Use Cloud Dataprep to visually author a Dataflow pipeline. Cloud Dataprep also allows me to explore the data, so we could have avoided much of the handcoding of Python/Seaborn calls above as well!
* Read from BigQuery directly using TensorFlow.
* Use the BigQuery console (http://bigquery.cloud.google.com) to run a Query and save the result as a CSV file. For larger datasets, you may have to select the option to "allow large results" and save the result into a CSV file on Google Cloud Storage. 

<p>

However, in this case, I want to do some preprocessing, modifying data so that we can simulate what is known if no ultrasound has been performed. If I didn't need preprocessing, I could have used the web console. Also, I prefer to script it out rather than run queries on the user interface, so I am using Cloud Dataflow for the preprocessing.

Note that after you launch this, the actual processing happens in the cloud. Go to the Dataflow section of the GCP web console and monitor the running job. It took about 20 minutes for me.
<p>
If you wish to continue without doing this step, you can copy my preprocessed output:
<pre>
gsutil -m cp -r gs://cloud-training-demos/babyweight/preproc gs://your-bucket/
</pre>

In [None]:
def to_csv(rowdict):
    """
    Convert one BigQuery row (a dict) into two CSV lines:
    one without ultrasound information and one with it.
    """
    import hashlib, copy
    
    CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks'.split(',')
    
    # Create synthetic data, where we assume that no ultrasound has been performed
    # and so we don't know the sex of the baby. Let's assume that we can tell the
    # difference between a single baby and multiple babies without an ultrasound,
    # but that the exact number cannot be determined reliably without one.
    no_ultrasound = copy.deepcopy(rowdict)
    w_ultrasound = copy.deepcopy(rowdict)
    
    # Change the value of 'is_male' & 'plurality' column of no_ultrasound
    no_ultrasound['is_male'] = 'Unknown' # Means we don't have ultrasound data
    if rowdict['plurality']>1:
        no_ultrasound['plurality'] = 'Multiple(2+)' # We don't know how many, but know that more than one
    else:
        no_ultrasound['plurality'] = 'Single(1)'
    
    # Change the `plurality` column of w_ultrasound to string value
    w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]
    
    # Write out two rows for each input row, one with ultrasound and one without
    for result in [no_ultrasound, w_ultrasound]:
        data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
        key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # hash the columns to form a key
        yield str('{},{}'.format(data, key))

In [None]:
rowdict = {
    'weight_pounds':7,
    'is_male':True,
    'mother_age':28,
    'plurality':1,
    'gestation_weeks':47
}

In [None]:
data_gen = to_csv(rowdict)

In [None]:
next(data_gen)
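
Each emitted line ends with a SHA-224 key computed over the preceding columns. As a minimal sketch of how a consumer could check that, the snippet below rebuilds the first line that `to_csv` yields for the sample `rowdict` and verifies its key. The helpers `split_line` and `key_matches` are hypothetical names introduced only for this illustration.

```python
import hashlib

def split_line(line):
    """Split an emitted CSV line into (data, key); the key is the last field."""
    data, key = line.rsplit(',', 1)
    return data, key

def key_matches(line):
    """Return True if the trailing key is the SHA-224 hex digest of the data part."""
    data, key = split_line(line)
    return hashlib.sha224(data.encode('utf-8')).hexdigest() == key

# The "no ultrasound" row for the sample rowdict, built the same way to_csv builds it.
data = '7,Unknown,28,Single(1),47'
line = '{},{}'.format(data, hashlib.sha224(data.encode('utf-8')).hexdigest())
print(key_matches(line))
```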

In [None]:
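def preprocess(in_test_mode):
    """
    Launch the preprocessing pipeline: locally on a small sample if
    in_test_mode is True, otherwise as a Cloud Dataflow job.
    """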
    import shutil, os, subprocess
    
    # Use %y%m%d (year, month, day); %M is minutes and %D is not a portable date code.
    job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    
    if in_test_mode:
        print('Launching local job.....please hang on')
        OUTPUT_DIR = './preproc'
        shutil.rmtree(path=OUTPUT_DIR, ignore_errors=True)
        os.makedirs(name=OUTPUT_DIR)
    else:
        print('Launching Dataflow job - {} ....hang on.'.format(job_name))
        OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)
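        try:
            subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
        except (subprocess.CalledProcessError, OSError):
            # Nothing to delete (for example, on the first run); safe to continue.
            pass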
        
    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name':job_name,
        'project':PROJECT,
        'region':REGION,
        'teardown_policy':'TEARDOWN_ALWAYS',
        'no_save_main_session':True,
        'num_workers':4,
        'max_num_workers':5
    }
    
    # Create beam PipeLine Option object by passing above pipeline config dictionary.
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    
    # Choose the type of runner
    if in_test_mode:
        RUNNER = 'DirectRunner'
    else:
        RUNNER = 'DataflowRunner'
    
    # Now create the pipeline object using the appropriate runner
    pipeline = beam.Pipeline(runner=RUNNER, options=opts)
    
    # Now write our SQL query
    query = """
    SELECT
        weight_pounds,
        is_male,
        mother_age,
        plurality,
        gestation_weeks,
        FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(MONTH AS STRING))) AS hashmonth
    FROM
        publicdata.samples.natality
    WHERE year > 2000
    AND weight_pounds > 0
    AND mother_age > 0
    AND plurality > 0
    AND gestation_weeks > 0
    AND month > 0
    """
    
    if in_test_mode:
        query = query + " LIMIT 100"
    
    for step in ['train', 'eval']:
        if step == 'train':
            selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query)
            # The query above keeps 3 of the 4 hash buckets, i.e. roughly 75% of the data, for training
        else:
            selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)
            
        # Now create our pipeline
        (pipeline
         | '{}_read'.format(step) >> beam.io.Read(source=beam.io.BigQuerySource(query=selquery, use_standard_sql = True))
         | '{}_csv'.format(step) >> beam.FlatMap(fn=to_csv)
         | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}_csv'.format(step))))
        )
        
    # Now run our job
    job = pipeline.run()
    if in_test_mode:
        job.wait_until_finish()
        print('Done...')
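
The `ABS(MOD(hashmonth, 4))` filter used in the two queries can be mimicked locally. This sketch assumes that BigQuery's `MOD` returns a value with the same sign as its first argument (which is why the query wraps it in `ABS`), whereas Python's `%` is never negative for a positive divisor. The helpers `sql_mod`, `bucket` and `step_for` are hypothetical names used only here.

```python
def sql_mod(x, y):
    """Remainder with the sign of the dividend, like SQL MOD (truncated division)."""
    r = abs(x) % abs(y)
    return -r if x < 0 else r

def bucket(hashmonth):
    """Equivalent of ABS(MOD(hashmonth, 4)): a value in 0..3."""
    return abs(sql_mod(hashmonth, 4))

def step_for(hashmonth):
    """Buckets 0, 1 and 2 go to 'train'; bucket 3 goes to 'eval'."""
    return 'train' if bucket(hashmonth) < 3 else 'eval'

# Compare the SQL-style remainder with the resulting step for a few sample hashes.
for h in (-7, -4, 5, 11):
    print(h, sql_mod(h, 4), step_for(h))
```

Since three of the four buckets go to training, the split is roughly 75/25 as long as the hash values are spread evenly across buckets.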

In [None]:
preprocess(in_test_mode=True)

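With <code>in_test_mode=True</code>, the job above runs locally on a small sample and writes to <code>./preproc</code>. When launched on Dataflow (<code>in_test_mode=False</code>), the step will take 20+ minutes. Go to the GCP web console, navigate to the Dataflow section and <b>wait for the job to finish</b> before you run the following step, which lists the output in your bucket.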

In [None]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*
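
After a local test run, the output can also be inspected from Python. This is a minimal sketch, assuming the shards sit under `./preproc` and carry Beam's default shard suffix (for example `train_csv-00000-of-00001`). The helper `count_rows` is a hypothetical name, and it simply reports 0 when no matching files exist.

```python
import glob
import os

def count_rows(output_dir, step):
    """Count lines across all shards written for one step ('train' or 'eval')."""
    pattern = os.path.join(output_dir, '{}_csv-*'.format(step))
    total = 0
    for path in sorted(glob.glob(pattern)):
        with open(path) as f:
            total += sum(1 for _ in f)
    return total

for step in ('train', 'eval'):
    print(step, count_rows('./preproc', step))
```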