### 오른쪽 상단에 Python 3이라고 되어 있더라도, 다시 클릭해서 python3을 선택해준다.
### 안그러면, import할때 에러 난다.

# Preprocessing Using Dataflow

In [2]:
#Ensure that we have Apache Beam version installed.
!pip freeze | grep apache-beam || sudo pip install apache-beam[gcp]==2.12.0

apache-beam==2.12.0


In [3]:
import tensorflow as tf
import apache_beam as beam
import shutil
import os
print(tf.__version__)

1.15.0


In [5]:
PROJECT = "qwiklabs-gcp-ml-49b827b781ab"  # Replace with your PROJECT
BUCKET = "qwiklabs-gcp-ml-49b827b781ab"  # Replace with your BUCKET
REGION = "us-central1"            # Choose an available region for Cloud MLE
TFVERSION = "1.14"                # TF version for CMLE to use

In [6]:
import os
os.environ["BUCKET"] = BUCKET
os.environ["PROJECT"] = PROJECT
os.environ["REGION"] = REGION

In [7]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
    gsutil mb -l ${REGION} gs://${BUCKET}
fi

# Save the query from earlier

In [8]:
# Create SQL query using natality data after the year 2000
query_string = """
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks,
    ABS(FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING)))) AS hashmonth
FROM
    publicdata.samples.natality
WHERE
    year > 2000
"""

In [9]:
# Call BigQuery and examine in dataframe
from google.cloud import bigquery
bq = bigquery.Client(project = PROJECT)

df = bq.query(query_string + "LIMIT 100").to_dataframe()
df.head()

Unnamed: 0,weight_pounds,is_male,mother_age,plurality,gestation_weeks,hashmonth
0,7.063611,True,32,1,37.0,7108882242435606404
1,4.687028,True,30,3,33.0,7170969733900686954
2,7.561856,True,20,1,39.0,6392072535155213407
3,7.561856,True,31,1,37.0,2126480030009879160
4,7.312733,True,32,1,40.0,3408502330831153141


# Create ML dataset using Dataflow

In [10]:
import apache_beam as beam
import datetime, os

def to_csv(rowdict):
    # Pull columns from BQ and create a line
    import hashlib
    import copy
    CSV_COLUMNS = "weight_pounds,is_male,mother_age,plurality,gestation_weeks".split(',')

    # Create synthetic data where we assume that no ultrasound has been performed
    # and so we don"t know sex of the baby. Let"s assume that we can tell the difference
    # between single and multiple, but that the errors rates in determining exact number
    # is difficult in the absence of an ultrasound.
    no_ultrasound = copy.deepcopy(rowdict)
    w_ultrasound = copy.deepcopy(rowdict)

    no_ultrasound["is_male"] = "Unknown"
    if rowdict["plurality"] > 1:
        no_ultrasound["plurality"] = "Multiple(2+)"
    else:
        no_ultrasound["plurality"] = "Single(1)"

    # Change the plurality column to strings
    w_ultrasound["plurality"] = ["Single(1)", "Twins(2)", "Triplets(3)", "Quadruplets(4)", "Quintuplets(5)"][rowdict["plurality"] - 1]

    # Write out two rows for each input row, one with ultrasound and one without
    for result in [no_ultrasound, w_ultrasound]:
        data = ','.join([str(result[k]) if k in result else "None" for k in CSV_COLUMNS])
        yield str("{}".format(data))
  
def preprocess(in_test_mode):
    import shutil, os, subprocess
    job_name = "preprocess-babyweight-features" + "-" + datetime.datetime.now().strftime("%y%m%d-%H%M%S")

    if in_test_mode:
        print("Launching local job ... hang on")
        OUTPUT_DIR = "./preproc"
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        os.makedirs(OUTPUT_DIR)
    else:
        print("Launching Dataflow job {} ... hang on".format(job_name))
        OUTPUT_DIR = "gs://{0}/babyweight/preproc/".format(BUCKET)
        try:
            subprocess.check_call("gsutil -m rm -r {}".format(OUTPUT_DIR).split())
        except:
            pass

    options = {
        "staging_location": os.path.join(OUTPUT_DIR, "tmp", "staging"),
        "temp_location": os.path.join(OUTPUT_DIR, "tmp"),
        "job_name": job_name,
        "project": PROJECT,
        "teardown_policy": "TEARDOWN_ALWAYS",
        "no_save_main_session": True
    }
    opts = beam.pipeline.PipelineOptions(flags = [], **options)
    if in_test_mode:
        RUNNER = "DirectRunner"
    else:
        RUNNER = "DataflowRunner"
        
    p = beam.Pipeline(RUNNER, options = opts)
    query = """
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks,
    ABS(FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING)))) AS hashmonth
FROM
    publicdata.samples.natality
WHERE
    year > 2000
    AND weight_pounds > 0
    AND mother_age > 0
    AND plurality > 0
    AND gestation_weeks > 0
    AND month > 0
"""

    if in_test_mode:
        query = query + " LIMIT 100" 

    for step in ["train", "eval"]:
        if step == "train":
            selquery = "SELECT * FROM ({}) WHERE MOD(hashmonth, 100) < 80".format(query)
        elif step == "eval":
            selquery = "SELECT * FROM ({}) WHERE MOD(hashmonth, 100) >= 80 AND MOD(hashmonth, 100) < 90".format(query)
        else: 
            selquery = "SELECT * FROM ({}) WHERE MOD(hashmonth, 100) >= 90".format(query)
        (p 
         | "{}_read".format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))
         | "{}_csv".format(step) >> beam.FlatMap(to_csv)
         | "{}_out".format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, "{}.csv".format(step))))
        )

    job = p.run()
    if in_test_mode:
        job.wait_until_finish()
        print("Done!")
    
preprocess(in_test_mode = False)

Launching Dataflow job preprocess-babyweight-features-191206-120841 ... hang on


<img src="dataflow_20191206_1.png" alt="Drawing" style="width: 500px;"/>

<img src="dataflow_20191206_2.png" alt="Drawing" style="width: 800px;"/>

# View results

In [11]:
!gsutil ls gs://$BUCKET/babyweight/preproc/*-00000*

gs://qwiklabs-gcp-ml-49b827b781ab/babyweight/preproc/eval.csv-00000-of-00009
