## 環境構築

### 注意点
本ノートブックは、講座の教材の手順に沿って、セル内のコードを1つずつ実行することを想定して作成されています。

途中のセルでカーネルを再起動するため、Run All 等のコマンドで連続実行しないようにしましょう。

In [2]:
# Install into the user's home directory (~/), which does not require admin rights
USER_FLAG = '--user'

# NOTE(review): pip itself is upgraded without USER_FLAG, unlike the packages below — confirm this is intended.
!pip3 install --upgrade pip
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
# kfp: Kubeflow Pipelines DSL package
!pip3 install {USER_FLAG} kfp==1.8.9
!pip3 install {USER_FLAG} google-cloud-pipeline-components==0.2.0



In [3]:
import os

# Restart the kernel so the packages installed above are picked up.
# Running this cell shuts the kernel down and it is restarted automatically.
if not os.getenv('IS_TESTING'):
    import IPython

    kernel = IPython.Application.instance().kernel
    kernel.do_shutdown(True)  # True -> restart after shutdown

In [2]:
# Verify that the packages were installed correctly
# **The KFP SDK version must be >= 1.8
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.9
google_cloud_pipeline_components version: 0.2.0


## 環境パラメータの設定

In [3]:
import os


PROJECT_ID = ''

# gcloud から Google Cloud プロジェクト ID を取得
if not os.getenv('IS_TESTING'):
    
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    
    PROJECT_ID = shell_output[0]
    print('Project ID: ', PROJECT_ID)


# バケット名を格納する変数を作成
# 違うバケットを使用する場合は適宜変更
BUCKET_NAME = 'gs://mlops-system-001'
print('BUCKET_NAME: ', BUCKET_NAME)

# アーティファクトが書き込まれるクラウドストレージのパス(PIPELINE_ROOT)設定
PATH = %env PATH
%env PATH = {PATH}:/home/jupyter/.local/bin

PIPELINE_ROOT = f'{BUCKET_NAME}/pipeline_root/'
print(f'PIPELINE_ROOT: {PIPELINE_ROOT}')

REGION = 'us-central1'
print(f'REGION: {REGION}')

# API service endpoint
# API_ENDPOINT: Dataset、Model、Job、Pipeline、Endpoint サービスの Vertex AI APIサービスエンドポイント
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)
print(f'API_ENDPOINT: {API_ENDPOINT}')

Project ID:  mlops-sandbox-348105
BUCKET_NAME:  gs://mlops-system-001
env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin
PIPELINE_ROOT: gs://mlops-system-001/pipeline_root/
REGION: us-central1
API_ENDPOINT: us-central1-aiplatform.googleapis.com


In [4]:
# バケットがまだ存在していない場合に限り、クラウドストレージのバケットを作成
# ! gsutil mb -l $REGION $BUCKET_NAME

In [5]:
# List the bucket contents to verify that the bucket exists and is accessible
! gsutil ls -al $BUCKET_NAME

                                 gs://mlops-system-001/pipeline_root/
                                 gs://mlops-system-001/sample_dataset/


## モジュールのインポート

In [6]:
import time
import matplotlib.pyplot as plt
import pandas as pd

# import kfp
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, \
                    Input, Metrics, Model, Output, InputPath, OutputPath, \
                    ClassificationMetrics

from google.cloud import aiplatform as aip
from google_cloud_pipeline_components import aiplatform as gcc_aip

from typing import Optional
from typing import NamedTuple

In [7]:
VER_NAME = 'v1'

# Display name made unique by appending the current Unix time in whole seconds.
DISPLAY_NAME = f'pug-image-clf-{VER_NAME}-{int(time.time())}'
print(DISPLAY_NAME)

# CSV listing the dataset's image paths together with their labels.
SRC_URI = f'{BUCKET_NAME}/sample_dataset/dog_cat_v1/image_labels.csv'

pug-image-clf-v1-1650754833


## コンポーネントを定義

### データセット作成コンポーネントを定義

In [8]:
@component(
    base_image='python:3.7',
    output_component_file=f'{VER_NAME}_create_and_import_dataset_img.yaml',
    packages_to_install=['google-cloud-aiplatform'],
)
def create_and_import_dataset_image(
    project_id: str,
    location: str,
    display_name: str,
    src_uri: str,
    sync: bool = True,
) -> NamedTuple(
    'Outputs',
    [
        ('dataset_img_resource_name', str), # Return parameter.
    ],
):
    """Create a Vertex AI image dataset and import labelled images into it.

    Args:
        project_id: Google Cloud project ID.
        location: Vertex AI region.
        display_name: Display name for the new dataset.
        src_uri: Cloud Storage URI of the import file,
            e.g. 'gs://bucket1/source1.csv'.
        sync: Passed to `ImageDataset.create`; the component waits for the
            dataset to be ready either way.

    Returns:
        A one-field tuple holding the dataset's resource name.
    """
    from google.cloud import aiplatform as aip

    print('Initializing Vertex AI')
    aip.init(project=project_id, location=location)

    # Import schema for single-label image classification.
    schema_uri = aip.schema.dataset.ioformat.image.single_label_classification
    image_dataset = aip.ImageDataset.create(
        display_name=display_name,
        gcs_source=src_uri,
        import_schema_uri=schema_uri,
        sync=sync,
    )
    image_dataset.wait()

    return (str(image_dataset.resource_name),)

### モデルの学習コンポーネントを定義

In [9]:
@component(
    base_image='python:3.7',
    output_component_file=f'{VER_NAME}_create_img_clf_training.yaml',
    packages_to_install=['google-cloud-aiplatform'],
)
def create_img_clf_training(
    project_id: str,
    location: str,
    display_name: str,
    dataset_img_resource_name: str, # resource_name of the dataset
    model_display_name: Optional[str] = None,
    model_type: str = 'CLOUD',
    prediction_type: str = 'classification',
    multi_label: bool = False,
    training_fraction_split: float = 0.8,
    validation_fraction_split: float = 0.1,
    test_fraction_split: float = 0.1,
    budget_milli_node_hours: int = 8000, # unit: milli node hours; for model_type CLOUD, [8000, 800000]
    disable_early_stopping: bool = False, # True disables early stopping
    sync: bool = True,
) -> NamedTuple(
    'Outputs',
    [
        ('model_resource_name', str), # Return parameter.
    ],
):
    """Train an AutoML image model on an existing Vertex AI image dataset.

    Args:
        project_id: Google Cloud project ID.
        location: Vertex AI region.
        display_name: Display name of the AutoML training job.
        dataset_img_resource_name: Resource name of the dataset to train on.
        model_display_name: Display name for the resulting model (optional).
        model_type, prediction_type, multi_label: Passed to
            `AutoMLImageTrainingJob`.
        training_fraction_split, validation_fraction_split,
        test_fraction_split: Fractions used to split the dataset.
        budget_milli_node_hours: Training budget in milli node hours.
        disable_early_stopping: True turns early stopping off.
        sync: Passed to `job.run`; the component waits for the model either way.

    Returns:
        A one-field tuple holding the trained model's resource name.
    """
    from google.cloud import aiplatform as aip

    print('Initializing Vertex AI')
    aip.init(project=project_id, location=location)

    # Load the already-created dataset by its resource name.
    image_dataset = aip.ImageDataset(dataset_img_resource_name)

    training_job = aip.AutoMLImageTrainingJob(
        display_name=display_name,
        model_type=model_type,
        prediction_type=prediction_type,
        multi_label=multi_label,
    )

    trained_model = training_job.run(
        dataset=image_dataset,
        model_display_name=model_display_name,
        training_fraction_split=training_fraction_split,
        validation_fraction_split=validation_fraction_split,
        test_fraction_split=test_fraction_split,
        budget_milli_node_hours=budget_milli_node_hours,
        disable_early_stopping=disable_early_stopping,
        sync=sync,
    )
    # To reuse an existing model instead of training, replace the run above with
    # aip.Model('projects/xxx/locations/us-central1/models/xxx').
    trained_model.wait()

    return (str(trained_model.resource_name),)

### モデルの性能評価コンポーネントを定義

In [10]:
@component(
    base_image = 'gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest',
    output_component_file = f'{VER_NAME}_clf_eval_component.yaml',
    packages_to_install = ['google-cloud-aiplatform'],
)
def clf_model_eval_metrics(
    project: str,
    location: str, 
    api_endpoint: str, 
    thresholds_dict_str: str,
    model_resource_name: str, # resource_name of the model
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple(
    'Outputs',
    [
        ('dep_decision', str), # Return parameter.
    ],
):
    """Fetch a model's evaluation, log it as artifacts, and decide on deployment.

    Args:
        project: Google Cloud project ID.
        location: Vertex AI region used as the SDK default.
        api_endpoint: Regional Vertex AI API endpoint for the ModelServiceClient.
        thresholds_dict_str: JSON object mapping a metric name to its minimum
            acceptable value, e.g. '{"auPrc": 0.95}'. Only 'auRoc' and 'auPrc'
            keys are checked; other keys are ignored.
        model_resource_name: Resource name of the trained Vertex AI model.
        metrics: Output artifact receiving the non-curve metrics as JSON strings.
        metricsc: Output artifact receiving the ROC curve and confusion matrix.

    Returns:
        A one-field tuple whose `dep_decision` is 'true' when every checked
        threshold is met, otherwise 'false'.

    Raises:
        RuntimeError: if the model has no evaluations.
    """
    import json
    import logging

    from google.cloud import aiplatform as aip

    def get_eval_info(client, model_name):
        """Return (first evaluation name, metrics dicts, metrics JSON strings)."""
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        eval_names = []
        metrics_list = []
        metrics_string_list = []

        for evaluation in response:
            print('model_evaluation')
            print(' name:', evaluation.name)
            print(' metrics_schema_uri:', evaluation.metrics_schema_uri)

            # Convert the underlying protobuf metrics into plain dicts/lists.
            eval_metrics = MessageToDict(evaluation._pb.metrics)
            for metric_name, metric_value in eval_metrics.items():
                logging.info('metric: %s, value: %s', metric_name, metric_value)

            eval_names.append(evaluation.name)
            metrics_list.append(eval_metrics)
            metrics_string_list.append(json.dumps(eval_metrics))

        # Without this guard an empty listing crashed with UnboundLocalError.
        if not metrics_list:
            raise RuntimeError(
                'No model evaluations found for {}'.format(model_name))

        # Every consumer uses metrics_list[0], so report that evaluation's name.
        return eval_names[0], metrics_list, metrics_string_list

    def log_metrics(metrics_list, metrics, metricsc):
        """Record the first evaluation's metrics on the output artifacts."""
        first_eval = metrics_list[0]
        test_confusion_matrix = first_eval['confusionMatrix']
        logging.info('rows: %s', test_confusion_matrix['rows'])

        # ROC curve from the per-threshold confidence metrics
        # ('recall' is the true positive rate).
        fpr = []
        tpr = []
        thresholds = []
        for item in first_eval['confidenceMetrics']:
            fpr.append(item.get('falsePositiveRate', 0.0))
            tpr.append(item.get('recall', 0.0))
            thresholds.append(item.get('confidenceThreshold', 0.0))
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # Confusion matrix labelled by the annotation spec display names.
        annotations = [spec['displayName'] for spec in test_confusion_matrix['annotationSpecs']]
        logging.info('confusion matrix annotations: %s', annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix['rows'],
        )

        # Log every other metric (except the per-threshold list) as a JSON string.
        for metric_name, metric_value in first_eval.items():
            if metric_name != 'confidenceMetrics':
                metrics.log_metric(metric_name, json.dumps(metric_value))

    def classification_thresholds_check(metrics_dict, thresholds_dict):
        """Return True if every 'auRoc'/'auPrc' threshold is met (other keys ignored)."""
        for k, v in thresholds_dict.items():
            logging.info('k %s, v %s', k, v)
            if k in ('auRoc', 'auPrc'):
                # Higher is better: a value below its threshold blocks deployment.
                if metrics_dict[k] < v:
                    logging.info('%s < %s; returning False', metrics_dict[k], v)
                    return False

        # All checked metrics met their thresholds: OK to deploy.
        logging.info('threshold checks passed.')
        return True

    # ------------------------------------------------------------------------------------------------
    logging.getLogger().setLevel(logging.INFO)

    # Pass `location` (previously unused) so SDK defaults match the target region.
    aip.init(project=project, location=location)

    logging.info('model path: %s', model_resource_name)
    client = aip.gapic.ModelServiceClient(client_options={'api_endpoint': api_endpoint})
    eval_name, metrics_list, _metrics_str_list = get_eval_info(client, model_resource_name)

    logging.info('got evaluation name: %s', eval_name)
    logging.info('got metrics list: %s', metrics_list)

    # Persist the evaluation values as pipeline output artifacts.
    log_metrics(metrics_list, metrics, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    dep_decision = 'true' if deploy else 'false'
    logging.info('deployment decision is %s', dep_decision)

    return (dep_decision,)

### エンドポイント作成コンポーネントを定義

In [11]:
@component(
    base_image='python:3.7',
    output_component_file = f'{VER_NAME}_create_endpoint.yaml',
    packages_to_install=['google-cloud-aiplatform'],
)
def create_endpoint(
    project_id: str,
    display_name: str,
    location: str,
    sync: bool = True,
) -> NamedTuple(
    'Outputs',
    [
        ('endpoint_resource_name', str), # Return parameter.
    ],
):
    """Create a Vertex AI endpoint and return its resource name.

    Args:
        project_id: Google Cloud project ID.
        display_name: Display name for the new endpoint.
        location: Region in which the endpoint is created.
        sync: Passed to `Endpoint.create`. It was previously accepted but
            ignored. The component waits for the endpoint either way.

    Returns:
        A one-field tuple holding the endpoint's resource name.
    """
    from google.cloud import aiplatform as aip

    endpoint = aip.Endpoint.create(
        display_name = display_name,
        project = project_id,
        location = location,
        sync = sync,
    )

    # Ensure the endpoint exists (even when sync=False) before reading its name.
    endpoint.wait()

    endpoint_resource_name = str(endpoint.resource_name)
    print(f'endpoint_resource_name ::: {endpoint_resource_name}')

    return (endpoint_resource_name, )

### エンドポイントにDeployするコンポーネントを定義

In [12]:
@component(
    base_image='python:3.7',
    output_component_file = f'{VER_NAME}_deploy_model.yaml',
    packages_to_install=['google-cloud-aiplatform'],
)
def deploy_model(
    project_id: str,
    display_name: str,
    location: str,
    model_resource_name: str, # resource_name of the model
    endpoint_resource_name: str, # resource_name of the endpoint
    sync: bool = True,
):
    """Deploy a trained Vertex AI model to an existing endpoint.

    Args:
        project_id: Google Cloud project ID.
        display_name: Display name given to the deployed model on the endpoint.
        location: Vertex AI region.
        model_resource_name: Resource name of the model to deploy.
        endpoint_resource_name: Resource name of the target endpoint.
        sync: Passed to `Endpoint.deploy`; the component waits for the
            deployment to finish either way.
    """
    from google.cloud import aiplatform as aip

    print('Initializing Vertex AI')
    aip.init(project=project_id, location=location)

    model = aip.Model(model_resource_name)
    endpoint = aip.Endpoint(endpoint_resource_name)

    # Forward display_name and sync so these component inputs actually take effect.
    endpoint.deploy(
        model,
        deployed_model_display_name=display_name,
        sync=sync,
    )
    # Block until an asynchronous deployment completes so the step does not
    # finish before the model is deployed.
    endpoint.wait()

## pipelineを定義

In [13]:
@pipeline(
    name=f'{VER_NAME}-pug-img-clf-pipeline',
    pipeline_root=PIPELINE_ROOT,
)
def pug_img_clf_pipeline(
    display_name: str,
    project_id: str,
    region: str,
    src_uri: str, # CSV with the dataset's image paths and labels
    api_endpoint: str = 'us-central1-aiplatform.googleapis.com',
    thresholds_dict_str: str = '{"auPrc": 0.95}', # metric thresholds that gate deployment
):
    """Import dataset -> AutoML training -> evaluation -> conditional deployment.

    Args:
        display_name: Display name used for the dataset, training job and endpoint.
        project_id: Google Cloud project ID.
        region: Vertex AI region for every step.
        src_uri: Cloud Storage URI of the CSV listing image paths and labels.
        api_endpoint: Regional Vertex AI API endpoint used by the evaluation step.
        thresholds_dict_str: JSON thresholds the evaluation must meet to deploy.
    """
    # 1. Create the image dataset from the labelled CSV.
    dataset_task = create_and_import_dataset_image(
        project_id=project_id,
        location=region,
        display_name=display_name,
        src_uri=src_uri,
    )

    # 2. Train the image classifier on that dataset.
    training_task = create_img_clf_training(
        project_id=project_id,
        location=region,
        display_name=display_name,
        dataset_img_resource_name=dataset_task.outputs['dataset_img_resource_name'],
    )
    model_resource_name = training_task.outputs['model_resource_name']

    # 3. Evaluate the model; its 'dep_decision' output drives the condition below.
    eval_task = clf_model_eval_metrics(
        project=project_id,
        location=region,
        api_endpoint=api_endpoint,
        thresholds_dict_str=thresholds_dict_str,
        model_resource_name=model_resource_name,
    )

    # 4. Only when the model is judged accurate enough: create an endpoint and deploy.
    with dsl.Condition(
        eval_task.outputs['dep_decision'] == 'true',
        name=f'{VER_NAME}_clf_deploy_decision',
    ):
        endpoint_task = create_endpoint(
            project_id=project_id,
            display_name=display_name,
            location=region,
        )
        deploy_model(
            project_id=project_id,
            display_name=display_name,
            location=region,
            model_resource_name=model_resource_name,
            endpoint_resource_name=endpoint_task.outputs['endpoint_resource_name'],
        )

## pipelineをコンパイル

In [14]:
# Compile the pipeline function into a JSON job spec for Vertex AI Pipelines.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pug_img_clf_pipeline,
    package_path=f'{VER_NAME}-pug-img-clf-pipeline.json',
)



## pipeline jobを実行

In [15]:
# Runtime parameter values for the compiled pipeline.
job_parameter_values = {
    'project_id': PROJECT_ID,
    'display_name': DISPLAY_NAME,
    'region': REGION,
    'src_uri': SRC_URI,
    'api_endpoint': API_ENDPOINT,
    'thresholds_dict_str': '{"auPrc": 0.95}',
}

pipeline_job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=f'{VER_NAME}-pug-img-clf-pipeline.json',
    pipeline_root=PIPELINE_ROOT,
    parameter_values=job_parameter_values,
    enable_caching=True,  # set to False to disable caching for this pipeline run
)

In [16]:
pipeline_job.submit() # submit the pipeline job to Vertex AI Pipelines

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/663579093435/locations/us-central1/pipelineJobs/v1-pug-img-clf-pipeline-20220423233449
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/663579093435/locations/us-central1/pipelineJobs/v1-pug-img-clf-pipeline-20220423233449')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/v1-pug-img-clf-pipeline-20220423233449?project=663579093435


## pipeline jobをスケジュール実行

In [None]:
from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)

schedule_parameter_values = {
    'project_id': PROJECT_ID,
    'display_name': DISPLAY_NAME,
    'region': REGION,
    'src_uri': SRC_URI,
    'api_endpoint': API_ENDPOINT,
    'thresholds_dict_str': '{"auPrc": 0.95}',
}

# Adjust the time zone and cron schedule as needed. '5 * * * *' runs at
# minute 5 of every hour. Each run creates a new dataset and training job,
# plus an endpoint when the threshold passes. Every run reuses the
# DISPLAY_NAME computed when this notebook ran.
response = api_client.create_schedule_from_job_spec(
    job_spec_path=f'{VER_NAME}-pug-img-clf-pipeline.json',
    schedule='5 * * * *',
    time_zone='Asia/Tokyo',
    parameter_values=schedule_parameter_values,
    # pipeline_root=PIPELINE_ROOT  # required only if the pipeline definition does not set PIPELINE_ROOT
)