<h1> Dataflowを使用した前処理 </h1>

このノートブックは次のことを示しています。
<ol>
<li> Dataflowを使用した機械学習用のデータセットの作成
</ol>
<p>
Pandasは実験には適していますが、ワークフローの運用化には、ApacheBeamで前処理を行うことをお勧めします。 Apache Beamはストリーミングも可能にするため、これは飛行中のデータを前処理する必要がある場合にも役立ちます。

各学習目標は、[学生ラボノートブック]（https://github.com/GoogleCloudPlatform/training-data-analyst/tree/master/courses/machine_learning/deepdive2/end_to_end_ml/labs/preproc）の__＃TODO__に対応します。 ipynb）-このソリューションノートブックを確認する前に、まずそのノートブックを完成させてください


In [1]:
!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst

In [2]:
!pip install --user google-cloud-bigquery==1.25.0

Collecting google-cloud-bigquery==1.25.0
  Downloading google_cloud_bigquery-1.25.0-py2.py3-none-any.whl (169 kB)
     |████████████████████████████████| 169 kB 8.2 MB/s            
[?25hCollecting google-resumable-media<0.6dev,>=0.5.0
  Downloading google_resumable_media-0.5.1-py2.py3-none-any.whl (38 kB)
Collecting google-auth<2.0dev,>=1.9.0
  Downloading google_auth-1.35.0-py2.py3-none-any.whl (152 kB)
     |████████████████████████████████| 152 kB 65.4 MB/s            
[?25hCollecting google-api-core<2.0dev,>=1.15.0
  Downloading google_api_core-1.31.4-py2.py3-none-any.whl (93 kB)
     |████████████████████████████████| 93 kB 2.6 MB/s             
Collecting google-cloud-core<2.0dev,>=1.1.0
  Downloading google_cloud_core-1.7.2-py2.py3-none-any.whl (28 kB)
Installing collected packages: google-auth, google-api-core, google-resumable-media, google-cloud-core, google-cloud-bigquery
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are 

google-cloud-storageに関連する非推奨の警告と非互換性エラーは無視してください。

In [1]:
!pip install --user apache-beam[interactive]==2.24.0

**注**：上記のセルの出力では、「hdfscli」、「hdfscli-avro」、「pbr」、「fastavro」、「」に関連する**警告**（黄色のテキスト）は無視しても問題ありません。 「witwidget-gpu」、「fairing」などに関連する「gen_client」および**エラー**（赤いテキスト）。

上記の関連するエラーまたは警告が表示された場合は、上記のセルを再実行してください。

**注**：更新されたパッケージを使用するには、カーネルを再起動してください。

この[リンク]（https://console.developers.google.com/apis/api/dataflow.googleapis.com）にアクセスして、DataflowAPIが有効になっていることを確認してください。 Beamをインポートし、バージョン番号を印刷して、Beamがインストールされていることを確認します。

In [1]:
import apache_beam as beam
print(beam.__version__)

2.34.0


In [2]:
import tensorflow as tf
print("TensorFlow version: ",tf.version.VERSION)


TensorFlow version:  2.6.2


Python3用のApacheBeam SDKについては、まだ完全にはサポートされていないため、「UserWarning」が表示される場合があります。これについては心配しないでください。

In [3]:
# これらを変更して、このノートブックを試してみてください
BUCKET = 'qwiklabs-gcp-02-81acbdc9387b'
PROJECT = 'qwiklabs-gcp-02-81acbdc9387b'
REGION = 'us-central1'

In [4]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [5]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
  gsutil mb -l ${REGION} gs://${BUCKET}
fi

<h2> 以前のクエリを保存します</h2>

データは出生率データ（米国での出生記録）です。私の目標は、妊娠と赤ちゃんの母親に関するいくつかの要因を考慮して、赤ちゃんの体重を予測することです。後で、データをトレーニングデータセットと評価データセットに分割する必要があります。そのために年月のハッシュが使用されます。

In [7]:
# 2000年以降の出生率データを使用してSQLクエリを作成する
query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
"""

In [8]:
# BigQueryを呼び出して、データフレームで調べます
from google.cloud import bigquery
df = bigquery.Client().query(query + " LIMIT 100").to_dataframe()
df.head()

Unnamed: 0,weight_pounds,is_male,mother_age,plurality,gestation_weeks,hashmonth
0,6.68662,True,18,1,43,8904940584331855459
1,9.360828,True,32,1,41,1088037545023002395
2,8.437091,False,30,1,39,5896567601480310696
3,6.124442,False,24,1,40,-6244544205302024223
4,7.12534,False,26,1,41,-8029892925374153452


<h2> Dataflowを使用してMLデータセットを作成する</h2>
Cloud Dataflowを使用してBigQueryデータを読み込み、前処理を行って、CSVファイルとして書き出しましょう。

Beam / Dataflowを使用する代わりに、他に3つのオプションがありました。

* Cloud Dataprepを使用して、Dataflowパイプラインを視覚的に作成します。 Cloud Dataprepを使用すると、データを探索することもできるため、上記のPython / Seaborn呼び出しのハンドコーディングの多くを回避することもできます。
* TensorFlowを使用してBigQueryから直接読み取ります。
* BigQueryコンソール（http://bigquery.cloud.google.com）を使用してクエリを実行し、結果をCSVファイルとして保存します。より大きなデータセットの場合、「大きな結果を許可する」オプションを選択し、その結果をGoogle CloudStorageのCSVファイルに保存する必要がある場合があります。
<p>

ただし、この場合、超音波が実行されていない場合に既知のことをシミュレートできるように、データを変更して前処理を行いたいと思います。前処理が必要なければ、Webコンソールを使用できたはずです。また、ユーザーインターフェイスでクエリを実行するよりも、スクリプトを作成する方が好きなので、前処理にCloudDataflowを使用しています。

これを起動した後、実際の処理はクラウド上で行われることに注意してください。 GCP Webコンソールの[データフロー]セクションに移動し、実行中のジョブを監視します。私にとっては約20分かかりました。
<p>
この手順を実行せずに続行する場合は、前処理された出力をコピーできます。
<pre>
gsutil -m cp -r gs://cloud-training-demos/babyweight/preproc gs://your-bucket/
</pre>

In [9]:
# TODO 1
import datetime, os

def to_csv(rowdict):
  # BQから列を引き出し、線を作成します
  import hashlib
  import copy
  CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks'.split(',')

  # 超音波が実行されていないと仮定して合成データを作成します
  # だから私たちは赤ちゃんの性別を知りません。違いがわかるとしましょう
  # 単一と複数の間ですが、正確な数を決定する際のエラー率
  # 超音波がないと難しいです。
  no_ultrasound = copy.deepcopy(rowdict)
  w_ultrasound = copy.deepcopy(rowdict)

  no_ultrasound['is_male'] = 'Unknown'
  if rowdict['plurality'] > 1:
    no_ultrasound['plurality'] = 'Multiple(2+)'
  else:
    no_ultrasound['plurality'] = 'Single(1)'

  # 複数の列を文字列に変更します
  w_ultrasound['plurality'] = ['Single(1)', 'Twins(2)', 'Triplets(3)', 'Quadruplets(4)', 'Quintuplets(5)'][rowdict['plurality'] - 1]

  # 入力行ごとに2つの行を書き出します。1つは超音波あり、もう1つは超音波なしです。
  for result in [no_ultrasound, w_ultrasound]:
    data = ','.join([str(result[k]) if k in result else 'None' for k in CSV_COLUMNS])
    key = hashlib.sha224(data.encode('utf-8')).hexdigest()  # 列をハッシュしてキーを形成します
    yield str('{},{}'.format(data, key))
  
def preprocess(in_test_mode):
  import shutil, os, subprocess
  job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  if in_test_mode:
      print('Launching local job ... hang on')
      OUTPUT_DIR = './preproc'
      shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
      os.makedirs(OUTPUT_DIR)
  else:
      print('Launching Dataflow job {} ... hang on'.format(job_name))
      OUTPUT_DIR = 'gs://{0}/babyweight/preproc/'.format(BUCKET)
      try:
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
      except:
        pass

  options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
      'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
      'job_name': job_name,
      'project': PROJECT,
      'region': REGION,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True,
      'num_workers': 4,
      'max_num_workers': 5
  }
  opts = beam.pipeline.PipelineOptions(flags = [], **options)
  if in_test_mode:
      RUNNER = 'DirectRunner'
  else:
      RUNNER = 'DataflowRunner'
  p = beam.Pipeline(RUNNER, options = opts)
  query = """
SELECT
  weight_pounds,
  is_male,
  mother_age,
  plurality,
  gestation_weeks,
  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth
FROM
  publicdata.samples.natality
WHERE year > 2000
AND weight_pounds > 0
AND mother_age > 0
AND plurality > 0
AND gestation_weeks > 0
AND month > 0
    """

  if in_test_mode:
    query = query + ' LIMIT 100' 

  for step in ['train', 'eval']:
    if step == 'train':
      selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query)
    else:
      selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)

    (p 
     | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))
     | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
     | '{}_out'.format(step) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step))))
    )

  job = p.run()
  if in_test_mode:
    job.wait_until_finish()
    print("Done!")
    
preprocess(in_test_mode = False)

Launching Dataflow job preprocess-babyweight-features-211127-035204 ... hang on


Removing gs://qwiklabs-gcp-02-81acbdc9387b/babyweight/preproc/tmp/preprocess-babyweight-features-211127-034607.1637984773.429562/#1637984783896328...
Removing gs://qwiklabs-gcp-02-81acbdc9387b/babyweight/preproc/tmp/preprocess-babyweight-features-211127-034607.1637984773.429562/dax-tmp-2021-11-26_19_46_16-13056023304912991580-S01-0-64f2da94ab634d58/#1637984784083537...
Removing gs://qwiklabs-gcp-02-81acbdc9387b/babyweight/preproc/tmp/preprocess-babyweight-features-211127-034607.1637984773.429562/dax-tmp-2021-11-26_19_46_16-13056023304912991580-S01-1-ab08b1a0616af2e5/#1637984783709002...
Removing gs://qwiklabs-gcp-02-81acbdc9387b/babyweight/preproc/tmp/preprocess-babyweight-features-211127-034607.1637984773.429562/dax-tmp-2021-11-26_19_46_16-13056023304912991580-S06-0-e95ecae89410e0d4/#1637984784206835...
Removing gs://qwiklabs-gcp-02-81acbdc9387b/babyweight/preproc/tmp/preprocess-babyweight-features-211127-034607.1637984773.429562/dax-tmp-2021-11-26_19_46_16-13056023304912991580-S23-0-

  temp_location = pcoll.pipeline.options.view_as(


上記の手順には20分以上かかります。次の手順を実行する前に、GCP Webコンソールに移動し、[データフロー]セクションに移動して、<b>ジョブが終了するのを待ちます</b>。

データフローUIコンソールでジョブの<b>失敗ステータス</ b>が表示された場合は、上記のセルを再実行してください。

In [10]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*

CommandException: One or more URLs matched no objects.


CalledProcessError: Command 'b'gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*\n'' returned non-zero exit status 1.

Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License