In [None]:
import kfp.dsl as dsl
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient
from kfp.v2.dsl import component

In [None]:
# Write the scores of a scikit-learn linear regression model to Cloud Storage.
# scikit-learn: https://scikit-learn.org/stable/
# The @component decorator packages the function as a pipeline component.
# Dependencies are declared via packages_to_install so that an installation
# failure stops the step instead of being silently ignored.
@component(
    packages_to_install=[
        "pandas",
        # The PyPI distribution is "scikit-learn" ("sklearn" is a deprecated
        # placeholder); load_boston was removed in scikit-learn 1.2.
        "scikit-learn<1.2",
        "fsspec",
        "gcsfs",
    ]
)
def out_eval(bucket: str):
    """Fit a linear regression on the Boston dataset and export its scores.

    The train and test scores (as returned by ``model.score``) are written
    as a one-row CSV to ``gs://<bucket>/test.csv``.

    Args:
        bucket: Name of the Cloud Storage bucket that receives ``test.csv``.
    """
    # Imports live inside the function body because the component is
    # packaged and executed on its own, outside this notebook.
    import pandas as pd
    from sklearn import linear_model
    from sklearn.datasets import load_boston
    from sklearn.model_selection import train_test_split

    boston = load_boston()
    x = boston.data  # explanatory variables
    y = boston.target  # target variable
    # Hold out 20% of the samples for testing.
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

    model = linear_model.LinearRegression()
    model.fit(x_train, y_train)

    scores = pd.DataFrame(
        {
            'train_score': [model.score(x_train, y_train)],
            'test_score': [model.score(x_test, y_test)],
        }
    )
    # pandas writes to gs:// paths through fsspec/gcsfs.
    scores.to_csv(f'gs://{bucket}/test.csv', index=False)

In [None]:
# Load the CSV that was written to Cloud Storage into BigQuery.
# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv?hl=ja#python
# The dependency is declared via packages_to_install so that an installation
# failure stops the step instead of surfacing later as an ImportError.
@component(packages_to_install=["google-cloud-bigquery"])
def out_bq(project_id: str, bucket: str):
    """Load ``gs://<bucket>/test.csv`` into ``<project_id>.vertexai.boston_eval``.

    The CSV is expected to have a header row followed by ``train_score`` and
    ``test_score`` float columns.

    Args:
        project_id: Google Cloud project that owns the BigQuery table.
        bucket: Name of the Cloud Storage bucket that holds ``test.csv``.
    """
    # Imported inside the function body because the component is packaged
    # and executed on its own, outside this notebook.
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    # NOTE(review): presumably the `vertexai` dataset must already exist — confirm.
    table_id = f'{project_id}.vertexai.boston_eval'
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("train_score", "FLOAT"),
            bigquery.SchemaField("test_score", "FLOAT"),
        ],
        skip_leading_rows=1,  # skip the CSV header row
        # Overwrite the table contents on every load.
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        source_format=bigquery.SourceFormat.CSV,
    )
    uri = f"gs://{bucket}/test.csv"

    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)

    # Block until the load job has finished.
    load_job.result()

In [None]:
# Reference: https://cloud.google.com/vertex-ai/docs/pipelines/run-pipeline
PROJECT_ID = 'xxx'  # project ID
GCS_BUCKET = 'yyy'  # bucket name

# Cloud Storage URI that the pipeline service account can access
root_path = f'gs://{GCS_BUCKET}/pipeline/'

# Build the pipeline
@dsl.pipeline(
    name='sample-pipeline',
    description='VertexAIサンプルパイプライン',
    pipeline_root=root_path,
)
def root_pipeline():
    # Instantiate one task per component. Declaring a task does not fix its
    # run order; the .after() call below makes the BigQuery load wait for
    # the evaluation step.
    load_task = out_bq(project_id=PROJECT_ID, bucket=GCS_BUCKET)
    eval_task = out_eval(bucket=GCS_BUCKET)
    load_task.after(eval_task)

# Compile the pipeline into a job spec file
PIPELINE_SPEC_PATH = 'sample_pipeline.json'
compiler.Compiler().compile(
    pipeline_func=root_pipeline,
    package_path=PIPELINE_SPEC_PATH,
)

# Client used to schedule the compiled pipeline
api_client = AIPlatformClient(project_id=PROJECT_ID, region='us-central1')

# Register a recurring run of the compiled job spec.
# NOTE(review): re-running this cell presumably registers another schedule — confirm.
api_client.create_schedule_from_job_spec(
    job_spec_path=PIPELINE_SPEC_PATH,
    schedule='0 10 * * *',  # cron schedule on which the pipeline runs
    time_zone='Asia/Tokyo',
    pipeline_root=root_path,
)