## Overview

The pipeline will:

1. Read Titanic data in Google Cloud Storage
2. Ingest the data into BigQuery using Dataflow
3. Train a logistic regression model to classify whether a passenger survived
4. Evaluate the model 

### Dataset
GCSに以下のバケットとフォルダを作成して、CSVファイルを配置してください。（このノートブックと同じディレクトリにある加工済みのtitanicデータ "train_processed.csv"です。追ってオリジナルのtitanicデータでもパイプラインが実行できるように修正予定）

gs://session11/titanic/train_processed.csv

Google Colab での実行を想定しています。
初期設定のあたりはこちらのノートブックを参考にしました。
- https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/google_cloud_pipeline_components_bqml_text.ipynb


In [None]:
import os

# Google Cloud Notebooks are detected through this marker file.
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# On Google Cloud Notebooks pip has to install with '--user'; everywhere else
# no extra flag is passed.
USER_FLAG = "--user" if IS_GOOGLE_CLOUD_NOTEBOOK else ""

# Only when the IS_TESTING environment variable is set: touch the
# google_api_core METADATA file (creates it if missing, otherwise updates its
# timestamp). NOTE(review): presumably a workaround for the automated test
# image — confirm it is still needed.
if os.getenv("IS_TESTING"):
    ! touch /builder/home/.local/lib/python3.9/site-packages/google_api_core-2.7.1.dist-info/METADATA

### Install additional packages


In [None]:
# Install pinned versions of the libraries used by this notebook.
# USER_FLAG is "--user" on Google Cloud Notebooks and empty elsewhere.
# NOTE(review): google_cloud_pipeline_components 1.0.1 may require a newer kfp
# than 1.8.2 — confirm that these pins are mutually compatible.
! pip3 install {USER_FLAG} --upgrade "apache-beam[gcp]==2.36.0"
! pip3 install {USER_FLAG} --upgrade "kfp==1.8.2"
! pip3 install {USER_FLAG} --upgrade "google-cloud-aiplatform==1.10.0"
! pip3 install {USER_FLAG} --upgrade "google_cloud_pipeline_components==1.0.1"

### Restart the kernel

パッケージのインストール後にカーネルを再起動する。

In [None]:
# Automatically restart kernel after installs
import os

# Restart the kernel so the freshly installed packages are picked up.
# The restart is skipped when the IS_TESTING environment variable is set.
if not os.getenv("IS_TESTING"):
    import IPython

    IPython.Application.instance().kernel.do_shutdown(True)

### Authenticate your Google Cloud account

Google Colabを使用する場合は認証が必要。

In [None]:
import os
import sys

# Google Cloud Notebooks are detected through this marker file.
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# If on Google Cloud Notebooks, then don't execute this code
if not IS_GOOGLE_CLOUD_NOTEBOOK:
    # On Colab the google.colab module is already loaded; use its interactive
    # user authentication.
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        # The '' below is a placeholder for the key file path.
        %env GOOGLE_APPLICATION_CREDENTIALS ''

### 環境変数の設定


In [None]:
import google.cloud.aiplatform as vertex_ai

# Values tagged with "@param" can be edited through the Colab form.
# PROJECT_NAME is assigned exactly once so that a value entered in the form is
# not overwritten by a later hard-coded assignment.
PROJECT_NAME = "ml-session"  # @param {type:"string"}
BUCKET_KEYWORD = "session11"  # @param {type:"string"}
REGION = "us-central1"  # @param {type:"string"}

# Root GCS bucket used for staging, pipeline artifacts and the ingest script.
ROOT_BUCKET = "gs://" + BUCKET_KEYWORD

# BigQuery dataset, table and model names used by the queries below.
BQ_DATASET = "session11"
BQ_TRAINING_TABLE = "train"
BQ_TESTING_TABLE = "test"
BQ_ML_MODEL = "model_titanic"

JOB_NAME = "titanic"

### Initialize client

In [None]:
# Configure the Vertex AI SDK with the project, region and staging bucket
# defined in the previous cell.
vertex_ai.init(project=PROJECT_NAME, location=REGION, staging_bucket=ROOT_BUCKET)

## Pipeline formalization

### BQML components

1) BigQuery で データセット session11、空のテーブル train, testを作成

2) Dataflow でGCSからCSVファイルをBigQuery の上記２テーブルへロードする

3) BigQuery MLでモデル作成

4) 予測

In [None]:
# 現時点のこのノートブックではtestデータ "gs://session11/titanic/test_processed.csv" は使用しません。（今後改良して使用する予定）
# ここはGCSからBigQuery へロードするための処理
# https://dev.classmethod.jp/articles/cloud-dataflow_gcs2bq_python/　からコピーした

%%writefile ingest_pipeline.py
import logging
import csv
from datetime import datetime, timezone, timedelta

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions


class BeamOptions:
    """Bundle the Apache Beam pipeline options used by the ingest job.

    The Google Cloud settings used to be hard-coded; they are now keyword
    parameters whose defaults are the previous values, so existing callers
    (``BeamOptions(runner)``) behave exactly as before.

    Args:
        runner: Name of the Beam runner, e.g. "DataflowRunner" or
            "DirectRunner".
        job_name: Job name set on the Google Cloud options.
        project: Google Cloud project id set on the Google Cloud options.
        temp_location: GCS URI where temporary files created during
            processing are stored.
        region: Region set on the Google Cloud options.
    """

    def __init__(
        self,
        runner,
        job_name="loadtobq",
        project="ml-session",
        temp_location="gs://session11/tmp",
        region="us-central1",
    ):
        # Root options object; the views below all write into it.
        self.options = PipelineOptions()

        # Google Cloud options
        self.gcloud_options = self.options.view_as(GoogleCloudOptions)
        self.gcloud_options.job_name = job_name
        self.gcloud_options.project = project
        self.gcloud_options.temp_location = temp_location
        self.gcloud_options.region = region

        # Setup options
        self.options.view_as(SetupOptions).save_main_session = True

        # Standard options
        self.options.view_as(StandardOptions).runner = runner



def parse_file(element):
    """Parse one CSV text line into a list of field strings.

    The header row has already been removed from the source file, so every
    line is treated as data.

    Args:
        element: A single line of CSV text.

    Returns:
        The first row produced by the CSV reader for ``element``, or ``None``
        if the reader produces no row.
    """
    rows = csv.reader(
        [element],
        quotechar='"',
        delimiter=",",
        quoting=csv.QUOTE_ALL,
        skipinitialspace=True,
    )
    return next(rows, None)


def convert_dict_train(element):
    """Map a parsed training row onto its column names.

    Args:
        element: Indexable row with at least ten fields, in the column order
            listed below.

    Returns:
        A dict keyed by column name; fields beyond the tenth are ignored.

    Raises:
        IndexError: If ``element`` has fewer than ten fields.
    """
    column_names = (
        "PassengerId",
        "Survived",
        "Pclass",
        "Sex",
        "Age",
        "Fare",
        "Embarked",
        "Title",
        "Age_Class",
        "IsAlone",
    )
    # Index explicitly (rather than zip) so short rows still raise IndexError.
    return {name: element[position] for position, name in enumerate(column_names)}

def convert_dict_test(element):
    """Map a parsed test row (no ``Survived`` column) onto its column names.

    Args:
        element: Indexable row with at least nine fields, in the column order
            listed below.

    Returns:
        A dict keyed by column name; fields beyond the ninth are ignored.

    Raises:
        IndexError: If ``element`` has fewer than nine fields.
    """
    column_names = (
        "PassengerId",
        "Pclass",
        "Sex",
        "Age",
        "Fare",
        "Embarked",
        "Title",
        "Age_Class",
        "IsAlone",
    )
    # Index explicitly (rather than zip) so short rows still raise IndexError.
    return {name: element[position] for position, name in enumerate(column_names)}

def get_bigquery_schema():
    """Build the BigQuery table schema used when writing the Titanic rows.

    Returns:
        A ``bigquery.TableSchema`` with ten fields, each of type 'integer'
        and mode 'nullable'.
    """
    column_names = (
        'PassengerId',
        'Survived',
        'Pclass',
        'Sex',
        'Age',
        'Fare',
        'Embarked',
        'Title',
        'Age_Class',
        'IsAlone',
    )

    schema = bigquery.TableSchema()
    for name in column_names:
        # Every column shares the same type and mode.
        field = bigquery.TableFieldSchema()
        field.name = name
        field.type = 'integer'
        field.mode = 'nullable'
        schema.fields.append(field)

    return schema


def run(flag="train", runner="DataflowRunner"):
    """Load one Titanic CSV file from GCS into a BigQuery table.

    Args:
        flag: "train" loads the training CSV into the "train" table; any
            other value loads the test CSV into the "test" table.
        runner: Beam runner name passed to ``BeamOptions``.
    """
    pipeline = beam.Pipeline(options=BeamOptions(runner).options)

    # Pick the source file, destination table and row converter for this run.
    if flag == "train":
        gcs_uri = "gs://session11/titanic/train_processed.csv"
        dest_table = "train"
        to_row = convert_dict_train
    else:
        gcs_uri = "gs://session11/titanic/test_processed.csv"
        dest_table = "test"
        to_row = convert_dict_test

    # Read the file from GCS, parse each CSV line and convert it to a dict.
    rows = (
        pipeline
        | "Read from GCS" >> beam.io.ReadFromText(gcs_uri)
        | "transform csv" >> beam.Map(parse_file)
        | "transform dict" >> beam.Map(to_row)
    )

    # Write the rows to BigQuery, replacing any existing table contents.
    rows | "Write to BigQuery" >> WriteToBigQuery(
        project="ml-session",
        dataset="session11",
        table=dest_table,
        schema=get_bigquery_schema(),
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
    )

    pipeline.run()


if __name__ == "__main__":
    # Show INFO-level log messages from the root logger.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Runs on Cloud Dataflow; pass runner="DirectRunner" to execute locally.
    run(flag="train", runner="DataflowRunner")

In [None]:
# Upload the generated Beam script to the "script/" folder of the root bucket.
!gsutil cp ingest_pipeline.py $ROOT_BUCKET/script/

#### Create BQ queries

In [None]:
# The test table is not used at the moment (planned for a later revision).

# DDL: create the dataset if it does not exist and (re)create the empty
# training and testing tables.
create_bq_dataset_table_query = f"""
CREATE SCHEMA IF NOT EXISTS `{PROJECT_NAME}.{BQ_DATASET}`;
CREATE OR REPLACE TABLE `{PROJECT_NAME}.{BQ_DATASET}.{BQ_TRAINING_TABLE}`
(
  PassengerId SMALLINT,
  Survived SMALLINT,
  Pclass SMALLINT,
  Sex SMALLINT,
  Age SMALLINT,
  Fare SMALLINT,
  Embarked SMALLINT,
  Title SMALLINT,
  Age_Class SMALLINT,
  IsAlone SMALLINT,
);
CREATE OR REPLACE TABLE `{PROJECT_NAME}.{BQ_DATASET}.{BQ_TESTING_TABLE}`
(
  PassengerId SMALLINT,
  -- Survived SMALLINT,
  Pclass SMALLINT,
  Sex SMALLINT,
  Age SMALLINT,
  Fare SMALLINT,
  Embarked SMALLINT,
  Title SMALLINT,
  Age_Class SMALLINT,
  IsAlone SMALLINT,
)
"""


# BigQuery ML: train a logistic regression model with Survived as the label,
# using the training-table rows whose PassengerId is between 1 and 712.
create_bq_model_query = f"""
CREATE or REPLACE MODEL `{PROJECT_NAME}.{BQ_DATASET}.{BQ_ML_MODEL}`
OPTIONS (
  model_type = 'logistic_reg',
  input_label_cols=['Survived']
  -- num_trials=20,
  -- max_parallel_trials=2
  ) AS (
SELECT
  Pclass, Title, isAlone, Age, Sex, Embarked, Fare, Age_Class, Survived
FROM
  `{PROJECT_NAME}.{BQ_DATASET}.{BQ_TRAINING_TABLE}`
WHERE PassengerId BETWEEN 1 AND 712
)
"""

# Prediction input: the training-table rows with PassengerId > 712 (the
# testing table is commented out inside the query).
create_bq_prediction_query = f"""
SELECT
 * 
FROM
  ML.PREDICT(MODEL `{PROJECT_NAME}.{BQ_DATASET}.{BQ_ML_MODEL}`, (
  SELECT
    PassengerId, Pclass, Title, Sex, Age, Fare, Embarked, IsAlone, Age_Class
  FROM
    --  `{PROJECT_NAME}.{BQ_DATASET}.{BQ_TESTING_TABLE}`
    `{PROJECT_NAME}.{BQ_DATASET}.{BQ_TRAINING_TABLE}`
  WHERE PassengerId > 712
  )
)
"""

# Evaluation input: the same PassengerId > 712 rows, including the Survived
# label.
create_bq_evaluate_query = f"""
SELECT
 * 
FROM
  ML.EVALUATE(MODEL `{PROJECT_NAME}.{BQ_DATASET}.{BQ_ML_MODEL}`, (
  SELECT
    Pclass, Title, Sex, Age, Fare, Embarked, IsAlone, Age_Class, Survived
  FROM
    `{PROJECT_NAME}.{BQ_DATASET}.{BQ_TRAINING_TABLE}`
  WHERE PassengerId > 712
  )
)
"""

### Build Pipeline

#### Create the pipeline

In [None]:
from kfp.v2 import dsl, compiler

@dsl.pipeline(name="mlops-bqml-titanic",
              description="A batch pipeline to generate RG model",
              pipeline_root=ROOT_BUCKET)
def pipeline(
    create_bq_dataset_table_query: str,
    python_file_path: str,
    temp_location: str = ROOT_BUCKET,
    project: str = PROJECT_NAME,
):
    """Create the tables, ingest the data, then train, evaluate and predict.

    Args:
        create_bq_dataset_table_query: SQL that creates the dataset and the
            empty training/testing tables.
        python_file_path: GCS URI of the Beam ingest script run by Dataflow.
        temp_location: GCS location passed to the Dataflow job.
        project: Google Cloud project in which the jobs run.
    """
    from google_cloud_pipeline_components.v1.bigquery import (
        BigqueryQueryJobOp, BigqueryCreateModelJobOp,
        BigqueryEvaluateModelJobOp,
        BigqueryPredictModelJobOp)
    from google_cloud_pipeline_components.v1.dataflow import \
        DataflowPythonJobOp
    from google_cloud_pipeline_components.v1.wait_gcp_resources import \
        WaitGcpResourcesOp

    # create the dataset, training and testing tables
    bq_tables_op = BigqueryQueryJobOp(
        query=create_bq_dataset_table_query,
        project=project,
        location="US",
    )

    # run dataflow job
    dataflow_python_training_data_op = DataflowPythonJobOp(
        python_module_path=python_file_path,
        project=project,
        temp_location=temp_location,
    ).after(bq_tables_op)

    # wait for the Dataflow job resources reported by the previous step
    dataflow_wait_training_data_op = WaitGcpResourcesOp(
        gcp_resources=dataflow_python_training_data_op.outputs["gcp_resources"]
    ).after(dataflow_python_training_data_op)

    # create the logistic regression model
    bq_model_op = BigqueryCreateModelJobOp(
        query=create_bq_model_query,
        project=project,
        location="US",
    ).after(dataflow_wait_training_data_op)

    # evaluate the logistic regression model on the held-out rows selected by
    # create_bq_evaluate_query (previously that query was defined but never
    # passed to the evaluate step)
    bq_evaluate_op = BigqueryEvaluateModelJobOp(
        project=project,
        location="US",
        model=bq_model_op.outputs["model"],
        query_statement=create_bq_evaluate_query,
        job_configuration_query={
            "destinationTable": {
                "projectId": PROJECT_NAME,
                "datasetId": BQ_DATASET,
                "tableId": "evaluation",
            }
        },
    ).after(bq_model_op)

    # simulate prediction
    BigqueryPredictModelJobOp(
        model=bq_model_op.outputs["model"],
        query_statement=create_bq_prediction_query,
        job_configuration_query={
            "destinationTable": {
                "projectId": PROJECT_NAME,
                "datasetId": BQ_DATASET,
                "tableId": "result",
            }
        },
        project=project,
        location="US",
    ).after(bq_evaluate_op)

## Compile and Run the pipeline

In [None]:
from pathlib import Path as path

# Local file that receives the compiled pipeline definition.
PIPELINE_PACKAGE = "mlops_bqml_titanic_pipeline.json"
# GCS URI of the ingest script uploaded to the root bucket.
PYTHON_FILE_URI = f"{ROOT_BUCKET}/script/ingest_pipeline.py"

pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(pipeline_func=pipeline, package_path=PIPELINE_PACKAGE)

In [None]:
# Print the compiled pipeline definition for inspection.
!cat mlops_bqml_titanic_pipeline.json

In [None]:
# Submit the compiled pipeline to Vertex AI Pipelines.
# The job is bound to its own name so that the `pipeline` function stays
# available and the compile cell can be re-run after this cell.
pipeline_job = vertex_ai.PipelineJob(
    display_name="data_preprocess",
    template_path=PIPELINE_PACKAGE,
    pipeline_root=ROOT_BUCKET,
    parameter_values={
        "create_bq_dataset_table_query": create_bq_dataset_table_query,
        "temp_location": ROOT_BUCKET,
        "python_file_path": PYTHON_FILE_URI,
    },
    # Caching is disabled for this job.
    enable_caching=False,
)

pipeline_job.run()

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.


In [None]:
# delete bucket
# WARNING: this removes everything under the root bucket recursively,
# including the uploaded CSV data and the ingest script.
! gsutil -m rm -r $ROOT_BUCKET

# delete dataset
# -r also removes the tables inside the dataset; -f skips the confirmation.
! bq rm -r -f -d $PROJECT_NAME:$BQ_DATASET