In [None]:
# `import urllib` alone does not guarantee that the `urllib.parse` submodule is
# loaded; import it explicitly because `urllib.parse.urlencode` is used below.
import urllib.parse
from IPython.display import Markdown as md

### change to reflect your notebook
_nb_loc = "10_mlops/10a_mlpipeline.ipynb"
_nb_title = "ML Pipeline"

# Badge icons, listed in the same order as the entries of `_links`.
_icons = [
    "https://raw.githubusercontent.com/GoogleCloudPlatform/practical-ml-vision-book/master/logo-cloud.png",
    "https://www.tensorflow.org/images/colab_logo_32px.png",
    "https://www.tensorflow.org/images/GitHub-Mark-32px.png",
    "https://www.tensorflow.org/images/download_logo_32px.png",
]

# Link targets: Vertex AI Workbench deploy, Colab, GitHub source view, raw download.
_links = [
    "https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?"
    + urllib.parse.urlencode({
        "name": _nb_title,
        "download_url": "https://github.com/takumiohym/practical-ml-vision-book-ja/raw/master/" + _nb_loc,
    }),
    "https://colab.research.google.com/github/takumiohym/practical-ml-vision-book-ja/blob/master/{0}".format(_nb_loc),
    "https://github.com/takumiohym/practical-ml-vision-book-ja/blob/master/{0}".format(_nb_loc),
    "https://raw.githubusercontent.com/takumiohym/practical-ml-vision-book-ja/master/{0}".format(_nb_loc),
]

# Placeholders {0}-{3} receive the links and {4}-{7} the icons. The call is the
# last expression of the cell so the rendered Markdown becomes the cell output.
md("""<table class="tfo-notebook-buttons" align="left"><td><a target="_blank" href="{0}"><img src="{4}"/>Run in Vertex AI Workbench</a></td><td><a target="_blank" href="{1}"><img src="{5}" />Run in Google Colab</a></td><td><a target="_blank" href="{2}"><img src="{6}" />View source on GitHub</a></td><td><a href="{3}"><img src="{7}" />Download notebook</a></td></table><br/><br/>""".format(*_links, *_icons))

# 機械学習パイプライン  

このノートブックでは、花の分類モデルを作成するための一連のワークフローをパイプラインとして実行する方法を示します。

## 設定

In [None]:
%pip install --upgrade --user kfp google_cloud_pipeline_components

In [None]:
REGION = 'us-central1'  # Change as needed to a region where you have quota
# `!cmd` captures the command output as a list of lines; the first line is
# used as the project ID.
PROJECT = !gcloud config get-value project
PROJECT = PROJECT[0]
print(PROJECT)
# Export the settings as environment variables so that the shell cells in this
# notebook ($PROJECT, ${BUCKET}) and os.getenv("BUCKET") can read them.
%env PROJECT = {PROJECT}
%env REGION = {REGION}
# The bucket name is derived from the project ID.
BUCKET = PROJECT + "-flowers-pipeline"
%env BUCKET = {BUCKET}

In [None]:
!gsutil mb -l {REGION} gs://{BUCKET}

## コンテナをビルドする

In [None]:
# Runs the image build script located in the parent directory.
# NOTE(review): presumably this builds and pushes the
# gcr.io/$PROJECT/practical-ml-vision-book image referenced by
# components/create_dataset.yaml — confirm against the script.
!../build_docker_image.sh

## JPEGファイルをTF Recordsに変換するコンポーネント

In [None]:
%%bash
# Write the component spec for the dataset-creation step.
# The heredoc delimiter is intentionally unquoted so that the shell substitutes
# $PROJECT into the container image URI.
cat > components/create_dataset.yaml <<EOF
name: create_dataset
description: Converts JPEG files to TensorFlow Records using Dataflow or Apache Beam
inputs:
- {name: runner, type: String, default: 'DataflowRunner', description: 'DirectRunner or DataflowRunner'}
- {name: project_id, type: String, description: 'Project to bill Dataflow job to'}
- {name: region, type: String, description: 'Region to run Dataflow job in'}
- {name: input_csv, type: String, description: 'Path to CSV file'}
- {name: output_dir, type: String, description: 'Top-level directory for TF records'}
- {name: labels_dict, type: String, description: 'Dictionary file for class names'}
outputs:
- {name: tfrecords_topdir, type: String, description: 'Top-level directory for TF records'}
implementation:
  container:
    image: gcr.io/$PROJECT/practical-ml-vision-book:latest
    command: [
        'bash', '/src/practical-ml-vision-book/10_mlops/components/create_dataset.sh'
    ]
    args: [
        {inputValue: output_dir},
        {outputPath: tfrecords_topdir},
        '--all_data', {inputValue: input_csv},
        '--labels_file', {inputValue: labels_dict},
        '--project_id', {inputValue: project_id},
        '--output_dir', {inputValue: output_dir},
        '--runner', {inputValue: runner},
        '--region', {inputValue: region},
    ]
EOF

In [None]:
%%bash
# Stop at the first failing command (and on unset variables such as BUCKET)
# instead of continuing, e.g. building in the wrong directory after a failed cd
# or uploading a stale archive after a failed build.
set -euo pipefail

# Build the source distribution of the training package and upload it to the
# location the pipeline's worker pool spec reads it from.
cd ../07_training/serverlessml
python ./setup.py sdist --formats=gztar
gsutil cp ./dist/flowers-1.0.tar.gz "gs://${BUCKET}/model/"

In [None]:
# List the model/ prefix of the bucket, which is where the training package
# archive was copied to.
!gsutil ls gs://$BUCKET/model/

## パイプライン定義

In [None]:
import kfp
import kfp.v2.dsl as dsl
from kfp.v2.dsl import component
import json
import os
import datetime
from typing import NamedTuple

from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.aiplatform import (CustomPythonPackageTrainingJobRunOp,
                                                         ModelUploadOp,
                                                         EndpointCreateOp,
                                                         ModelDeployOp)

from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp

from kfp.v2.components import importer_node

# Timestamp string (YYYYmmdd-HHMMSS) taken at the moment this cell runs.
TIMESTAMP = f"{datetime.datetime.now():%Y%m%d-%H%M%S}"

# Component factory loaded from the YAML spec in the components/ directory.
create_dataset_op = kfp.components.load_component_from_file('components/create_dataset.yaml')

@component(base_image="python:3.8")
def construct_workerpool_spec_op(
    container_uri: str,
    machine_type: str,
    replica_count: int,
    accelerator_type: str,
    accelerator_count: int,
    bucket: str,
    timestamp: str,
    input_top_dir: str,
    num_epochs: int,
    distribute: str,
    pattern: str
) -> NamedTuple(
    "Outputs",
    [("workerpoolspec", list)]
):
    """Build the single-entry worker pool spec for the flowers training job.

    Args:
        container_uri: Executor image that runs the Python training package.
        machine_type: Machine type of the worker.
        replica_count: Number of replicas in the worker pool.
        accelerator_type: Accelerator type attached to the machine.
        accelerator_count: Number of accelerators attached to the machine.
        bucket: Bucket name (without the ``gs://`` prefix) that holds the
            training package and receives the job output.
        timestamp: Suffix of the job output directory.
        input_top_dir: Top-level input directory; surrounding whitespace is
            stripped before it is passed to the trainer.
        num_epochs: Forwarded to the trainer as ``--num_epochs``.
        distribute: Forwarded to the trainer as ``--distribute``.
        pattern: Forwarded to the trainer as ``--pattern``.

    Returns:
        A named tuple with one field, ``workerpoolspec``: a list containing a
        single worker pool spec dictionary.
    """
    # Imported inside the body: the component function is executed on its own
    # in the base image, so it cannot rely on imports made by the notebook.
    from collections import namedtuple

    # Command-line arguments for the flowers.classifier.train module.
    args = [
        f'--job_dir=gs://{bucket}/trained_model/{timestamp}',
        f'--input_topdir={input_top_dir.strip()}',
        f'--pattern={pattern}',
        f'--num_epochs={num_epochs}',
        f'--distribute={distribute}',
    ]

    # All keys use the camelCase REST field names; the accelerator fields were
    # previously spelled in snake_case, unlike every other key in this spec.
    worker_pool_specs = [
        {
            "pythonPackageSpec": {
                "args": args,
                "executorImageUri": container_uri,
                "packageUris": [f"gs://{bucket}/model/flowers-1.0.tar.gz"],
                "pythonModule": "flowers.classifier.train"
            },
            "replicaCount": replica_count,
            "machineSpec": {
                "machineType": machine_type,
                "acceleratorType": accelerator_type,
                "acceleratorCount": accelerator_count,
            },
        }
    ]

    output = namedtuple('Outputs', ['workerpoolspec'])

    return output(worker_pool_specs)


# pipeline_root is evaluated when this cell runs, so the BUCKET environment
# variable must already be set at that point.
@dsl.pipeline(
    name='flowers-transfer-learning-pipeline',
    description='End-to-end pipeline',
    pipeline_root=f'gs://{os.getenv("BUCKET")}/pipeline',
)
def flowerstxf_pipeline(
    project_id: str,
    bucket: str,
    region: str,
    timestamp: str
):
    """Create the dataset, train the model, upload it and deploy it to an endpoint.

    Args:
        project_id: Project used by the dataset, training, upload and endpoint steps.
        bucket: Bucket name (without the ``gs://`` prefix) used for the TF
            records, the training package and the trained model.
        region: Region passed to the dataset step and used as the location of
            the training job, the uploaded model and the endpoint.
        timestamp: Suffix that makes output paths and display names unique per run.
    """

    # Create dataset
    create_dataset = create_dataset_op(
        project_id=project_id,
        region=region,
        input_csv='gs://practical-ml-vision-book/flowers_5_jpeg/flower_photos/all_data.csv',
        output_dir=f'gs://{bucket}/data/flower_tfrecords/{timestamp}',
        labels_dict='gs://practical-ml-vision-book/flowers_5_jpeg/flower_photos/dict.txt'
    )

    # Build the worker pool spec in a component so that it can consume the
    # dataset step's output, which is only known at run time.
    construct_workerpool_spec = construct_workerpool_spec_op(
        container_uri='us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-8:latest',
        machine_type='n1-highmem-8',
        replica_count=1,
        accelerator_type='NVIDIA_TESLA_T4',
        accelerator_count=2,
        bucket=bucket,
        timestamp=timestamp,
        input_top_dir=create_dataset.outputs['tfrecords_topdir'],
        num_epochs=20,
        distribute='gpus_one_machine',
        pattern='-*'
    )

    # Train model
    # `location` is passed explicitly here and below: without it these ops use
    # their default location and ignore the `region` pipeline parameter.
    train_model = CustomTrainingJobOp(
        project=project_id,
        location=region,
        display_name=f'flowers_{timestamp}_gpus_one_machine',
        worker_pool_specs=construct_workerpool_spec.outputs['workerpoolspec']
    )

    # Deploy trained model
    # The upload has no data dependency on the training step (the artifact URI
    # is a plain string), so the ordering is enforced with .after().
    model_upload_op = ModelUploadOp(
        display_name=f"flower-model-{timestamp}",
        project=project_id,
        location=region,
        artifact_uri=f"gs://{bucket}/trained_model/{timestamp}/flowers_model",
        serving_container_image_uri="us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-8:latest",
    ).after(train_model)

    endpoint_create_op = EndpointCreateOp(
        project=project_id,
        location=region,
        display_name=f"flower-endpoint-{timestamp}",
    )

    ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name=f'flower_model_{timestamp}',
        dedicated_resources_machine_type="n1-standard-16",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )


## パイプラインのコンパイル

In [None]:
# File the compiled pipeline definition is written to.
PIPELINE_JSON = "flowerstxf_pipeline.json"

# Compile the pipeline function into the JSON file above.
_compiler = kfp.v2.compiler.Compiler()
_compiler.compile(
    pipeline_func=flowerstxf_pipeline,
    package_path=PIPELINE_JSON,
)

## Vertex AI Pipelinesへジョブを送信する

In [None]:
from google.cloud import aiplatform

# Take a fresh timestamp at submission time; it is passed to the pipeline as
# the `timestamp` parameter.
TIMESTAMP = f"{datetime.datetime.now():%Y%m%d-%H%M%S}"

aiplatform.init(project=PROJECT, location=REGION)

# Job created from the compiled pipeline file, with caching disabled.
pipeline = aiplatform.PipelineJob(
    display_name="flower_transferlearning_pipeline_cloud",
    template_path=PIPELINE_JSON,
    enable_caching=False,
    parameter_values={
        'project_id': PROJECT,
        'bucket': BUCKET,
        'region': REGION,
        'timestamp': TIMESTAMP,
    },
)

pipeline.run()

## License
Copyright 2022 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.