<h1> Preprocessing using Dataflow </h1>

This notebook illustrates:
<ol>
<li> Creating datasets for Machine Learning using Dataflow
</ol>
<p>
Pandas is fine for experimenting, but to operationalize your workflow it is better to do the preprocessing in Apache Beam. Apache Beam also supports streaming, which helps if you need to preprocess data in flight.

Each learning objective will correspond to a __#TODO__ in this student lab notebook -- try to complete this notebook first and then review the [solution notebook](https://github.com/GoogleCloudPlatform/training-data-analyst/tree/master/courses/machine_learning/deepdive2/end_to_end_ml/solutions/preproc.ipynb).


In [1]:
!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst

In [2]:
!pip install --user google-cloud-bigquery==1.25.0

Collecting google-cloud-bigquery==1.25.0
  Downloading google_cloud_bigquery-1.25.0-py2.py3-none-any.whl (169 kB)
Collecting google-cloud-core<2.0dev,>=1.1.0
  Downloading google_cloud_core-1.7.2-py2.py3-none-any.whl (28 kB)
Collecting google-auth<2.0dev,>=1.9.0
  Downloading google_auth-1.35.0-py2.py3-none-any.whl (152 kB)
Collecting google-resumable-media<0.6dev,>=0.5.0
  Downloading google_resumable_media-0.5.1-py2.py3-none-any.whl (38 kB)
Collecting google-api-core<2.0dev,>=1.15.0
  Downloading google_api_core-1.31.5-py2.py3-none-any.whl (93 kB)
Installing collected pac

Kindly ignore the deprecation warnings and incompatibility errors related to google-cloud-storage.

In [3]:
!pip install --user apache-beam[interactive]==2.24.0

Collecting apache-beam[interactive]==2.24.0
  Downloading apache_beam-2.24.0-cp37-cp37m-manylinux2010_x86_64.whl (8.5 MB)
Collecting mock<3.0.0,>=1.0.1
  Downloading mock-2.0.0-py2.py3-none-any.whl (56 kB)
Collecting oauth2client<4,>=2.0.1
  Downloading oauth2client-3.0.0.tar.gz (77 kB)
  Preparing metadata (setup.py) ... done
Collecting typing-extensions<3.8.0,>=3.7.0
  Downloading typing_extensions-3.7.4.3-py3-none-any.whl (22 kB)
Collecting fastavro<0.24,>=0.21.4
  Downloading fastavro-0.23.6-cp37-cp37m-manylinux2010_x86_64.whl (1.4 MB)

**NOTE**: In the output of the above cell you can safely ignore any **WARNINGS** (in yellow text) related to "hdfscli", "hdfscli-avro", "pbr", "fastavro", "gen_client" and any **ERRORS** (in red text) related to "witwidget-gpu", "fairing", etc.

If you get any of the errors or warnings mentioned above, please rerun the above cell.

**Note**: Restart your kernel to use updated packages.

Make sure the Dataflow API is enabled by going to this [link](https://console.developers.google.com/apis/api/dataflow.googleapis.com). Ensure that you've installed Beam by importing it and printing the version number.

In [1]:
import apache_beam as beam
print(beam.__version__)

2.24.0


In [2]:
import tensorflow as tf
print("TensorFlow version: ", tf.version.VERSION)

TensorFlow version:  2.6.3


You may receive a `UserWarning` saying that the Apache Beam SDK for Python 3 is not yet fully supported. You can safely ignore it.

In [3]:
# change these to try this notebook out
BUCKET = 'qwiklabs-gcp-00-4fef3d844de3'
PROJECT = 'qwiklabs-gcp-00-4fef3d844de3'
REGION = 'australia-southeast1'

In [4]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [5]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi

<h2> Save the query from earlier </h2>

The data is natality data (a record of births in the US). Our goal is to predict a baby's weight given a number of factors about the pregnancy and the baby's mother. Later, we will split the data into training and eval datasets, using the hash of the year-month to do so.

In [6]:
# Create SQL query using natality data after the year 2000
query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
"""

In [7]:
# Call BigQuery and examine in dataframe
from google.cloud import bigquery
df = bigquery.Client().query(query + " LIMIT 100").to_dataframe()
df.head(7)

Unnamed: 0,weight_pounds,is_male,mother_age,plurality,gestation_weeks,hashmonth
0,7.063611,True,32,1,37.0,7108882242435606404
1,4.687028,True,30,3,33.0,-7170969733900686954
2,7.561856,True,20,1,39.0,6392072535155213407
3,7.561856,True,31,1,37.0,-2126480030009879160
4,7.312733,True,32,1,40.0,3408502330831153141
5,7.627994,False,30,1,40.0,-2126480030009879160
6,7.251004,True,33,1,37.0,3408502330831153141
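
To make the planned split concrete, the sketch below applies a 75/25 rule to the `hashmonth` values from the sample rows above. It assumes the rule `ABS(MOD(hashmonth, 4)) < 3` for training, which is the condition the Dataflow pipeline later in this notebook uses. The helper name `assign_split` is made up for illustration. BigQuery's `MOD` keeps the sign of the dividend while Python's `%` does not, so the sketch takes the absolute value first, which yields the same bucket as `ABS(MOD(...))`.

```python
def assign_split(hashmonth, buckets=4, train_buckets=3):
    """Mimic ABS(MOD(hashmonth, 4)) < 3 -> 'train', otherwise 'eval'."""
    # abs() first, then %, matches ABS(MOD(x, n)) for a sign-preserving MOD
    bucket = abs(hashmonth) % buckets
    return 'train' if bucket < train_buckets else 'eval'

# hashmonth values copied from the sample rows shown above
sample_hashmonths = [
    7108882242435606404,
    -7170969733900686954,
    6392072535155213407,
    -2126480030009879160,
    3408502330831153141,
]

splits = {h: assign_split(h) for h in sample_hashmonths}
for h, split in splits.items():
    print(h, split)
```

Because the hash depends only on the year and month, every record from a given month lands in the same split.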


<h2> Create ML dataset using Dataflow </h2>
Let's use Cloud Dataflow to read in the BigQuery data, do some preprocessing, and write it out as CSV files.

Instead of using Beam/Dataflow, we have three other options:
* Use Cloud Dataprep to visually author a Dataflow pipeline. Cloud Dataprep also allows us to explore the data, so we could have avoided much of the handcoding of Python/Seaborn calls above as well!
* Read from BigQuery directly using TensorFlow.
* Use the BigQuery console (http://bigquery.cloud.google.com) to run a Query and save the result as a CSV file. For larger datasets, you may have to select the option to "allow large results" and save the result into a CSV file on Google Cloud Storage. 

<p>

However, in this case, we want to do some preprocessing, modifying data so that we can simulate what is known if no *ultrasound* has been performed. If we didn't need preprocessing, we could have used the web console. Also, we prefer to script it out rather than run queries on the user interface, so we are using Cloud Dataflow for the preprocessing.
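
As a rough illustration of what simulating a missing ultrasound means, the sketch below derives two variants of a single record: one as it would look with an ultrasound (plurality known exactly) and one without (sex unknown, plurality reduced to single versus multiple). The function name `simulate_ultrasound_views` is hypothetical, and the sample record reuses values from the dataframe preview above; the label strings follow the ones used in this lab's `to_csv` function.

```python
import copy

PLURALITY_NAMES = ['Single(1)', 'Twins(2)', 'Triplets(3)',
                   'Quadruplets(4)', 'Quintuplets(5)']

def simulate_ultrasound_views(row):
    """Return (with_ultrasound, without_ultrasound) variants of one record."""
    with_us = copy.deepcopy(row)
    without_us = copy.deepcopy(row)

    # With an ultrasound, the exact plurality is known
    with_us['plurality'] = PLURALITY_NAMES[row['plurality'] - 1]

    # Without an ultrasound, the sex is unknown and plurality is coarse
    without_us['is_male'] = 'Unknown'
    without_us['plurality'] = 'Multiple(2)+' if row['plurality'] > 1 else 'Single(1)'
    return with_us, without_us

sample_row = {'weight_pounds': 4.687028, 'is_male': True, 'mother_age': 30,
              'plurality': 3, 'gestation_weeks': 33.0}
with_us, without_us = simulate_ultrasound_views(sample_row)
print(with_us)
print(without_us)
```

Each input record therefore contributes two training examples, so a model trained on the output can handle both situations.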

Note that after we launch this, the actual processing happens in the cloud. Go to the Dataflow section of the GCP web console and monitor the running job. It took about 20 minutes to complete.
<p>
If you wish to continue without doing this step, you can copy the preprocessed output:
<pre>
gsutil -m cp -r gs://cloud-training-demos/babyweight/preproc gs://your-bucket/
</pre>

**Lab Task #1:** Creating datasets for ML using Dataflow

In [15]:
# TODO 1
# TODO -- Your code here.
def to_csv(rowdict):
    import copy, hashlib # for key
    
    no_ultrasound = copy.deepcopy(rowdict)
    w_ultrasound = copy.deepcopy(rowdict)
    CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks'.split(',')

    # Change the plurality column to strings
    w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]
    no_ultrasound['is_male'] = 'Unknown'
    no_ultrasound['plurality'] = 'Multiple(2)+' if rowdict['plurality'] > 1 else 'Single(1)'

    for result in [no_ultrasound, w_ultrasound]:
      data = ','.join(str(result[k]) if k in result else 'None' for k in CSV_COLUMNS)
      # Hash the row contents to form a key. It is not strictly needed here, but it is useful
      # because the pipeline runs across multiple nodes and a unique value makes it possible
      # to identify where a row came from.
      key = hashlib.sha224(data.encode('utf-8')).hexdigest()
      yield str('{},{}'.format(data, key))


def preprocess(in_test_mode: bool):
  import shutil, os, subprocess, datetime
  job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
    
  if in_test_mode:
      print('Launching local job ... hang on')
      OUTPUT_DIR = './preproc'
      shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
      os.makedirs(OUTPUT_DIR)
  else:
      print('Launching Dataflow job {} ... hang on'.format(job_name))
      OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)
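      try:
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
      except subprocess.CalledProcessError:
        # gsutil exits non-zero when there is nothing to delete; safe to continue
        pass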

  options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
      'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
      'job_name': job_name,
      'project': PROJECT,
      'region': REGION,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True,
      'num_workers': 4,
      'max_num_workers': 5
  }
  opts = beam.pipeline.PipelineOptions(flags = [], **options)
  RUNNER = 'DirectRunner' if in_test_mode else 'DataflowRunner'
  p = beam.Pipeline(RUNNER, options = opts)
  query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
AND weight_pounds > 0
AND mother_age > 0
AND plurality > 0
AND gestation_weeks > 0
AND month > 0
    """

  if in_test_mode:
    query = query + ' LIMIT 100' 

  for step in ['train', 'eval']:
    selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query) if step == 'train' else \
        'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)
    (p 
     | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))
     | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
     | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
    )

  job = p.run()
  if in_test_mode:
    job.wait_until_finish()
    print("Done!")
  return
    
preprocess(in_test_mode = False)

Launching Dataflow job preprocess-babyweight-features-220228-085403 ... hang on
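
To show the shape of a single output line, here is a minimal standalone sketch of the serialization step: the selected columns joined with commas, followed by a SHA-224 hex digest of that string as a key. It mirrors the logic in `to_csv` above but is not the pipeline itself; `to_csv_line` and the sample record are illustrative names and values.

```python
import hashlib

CSV_COLUMNS = ['weight_pounds', 'is_male', 'mother_age', 'plurality', 'gestation_weeks']

def to_csv_line(record):
    """Serialize one record as 'col1,...,col5,key'."""
    # Missing columns are written as the literal string 'None'
    data = ','.join(str(record[k]) if k in record else 'None' for k in CSV_COLUMNS)
    # The key is a deterministic hash of the row contents
    key = hashlib.sha224(data.encode('utf-8')).hexdigest()
    return '{},{}'.format(data, key)

record = {'weight_pounds': 7.063611, 'is_male': True, 'mother_age': 32,
          'plurality': 'Single(1)', 'gestation_weeks': 37.0}
line = to_csv_line(record)
fields = line.split(',')
print(line)
```

Since the key is computed from the row contents alone, two records with identical column values receive the same key.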




The above step will take 20+ minutes. Go to the GCP web console, navigate to the Dataflow section and <b>wait for the job to finish</b> before you run the following step.

Please re-run the above cell if the job shows a <b>failed status</b> in the Dataflow console.

In [14]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*

CommandException: One or more URLs matched no objects.


CalledProcessError: Command 'b'gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*\n'' returned non-zero exit status 1.
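
The `*-00000*` pattern in the listing command targets the first shard of each output. The sketch below shows why, assuming Beam's default shard naming for `WriteToText` (a `-SSSSS-of-NNNNN` suffix appended to the file path prefix). The shard counts used here (3 and 2) are invented for illustration, since the runner chooses the real number, and `shard_names` is a hypothetical helper.

```python
import fnmatch

def shard_names(prefix, num_shards):
    """Build file names following the '-SSSSS-of-NNNNN' suffix pattern."""
    return ['{}-{:05d}-of-{:05d}'.format(prefix, i, num_shards)
            for i in range(num_shards)]

# Hypothetical shard counts; the runner decides the real ones
names = shard_names('train.csv', 3) + shard_names('eval.csv', 2)

# Same wildcard as the gsutil command, applied to the names locally
first_shards = fnmatch.filter(names, '*-00000*')
print(first_shards)
```

If no files with this suffix exist yet under the output directory, the wildcard has nothing to match.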

Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License