<a href="https://colab.research.google.com/github/silverstar0727/ML-Pipeline-Tutorial/blob/main/metadata_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 설치 및 라이브러리 임포트

In [1]:
# 해당 셀을 실행한 후에 반드시 "런타임 다시시작"을 해주세요
!pip install -q kfp

[K     |████████████████████████████████| 235kB 7.6MB/s 
[K     |████████████████████████████████| 133kB 12.4MB/s 
[K     |████████████████████████████████| 645kB 15.2MB/s 
[K     |████████████████████████████████| 112kB 32.1MB/s 
[K     |████████████████████████████████| 1.8MB 22.1MB/s 
[K     |████████████████████████████████| 61kB 8.5MB/s 
[K     |████████████████████████████████| 61kB 8.8MB/s 
[K     |████████████████████████████████| 61kB 7.4MB/s 
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
    Preparing wheel metadata ... [?25l[?25hdone
[K     |████████████████████████████████| 92kB 9.4MB/s 
[K     |████████████████████████████████| 1.0MB 42.0MB/s 
[K     |████████████████████████████████| 81kB 11.8MB/s 
[K     |████████████████████████████████| 71kB 10.5MB/s 
[?25h  Building wheel for docstring-parser (PEP 517) ... [?25l[?25hdone
  Building wheel for kfp (setup.py) ... [?25l[?25hdone
  B

In [2]:
from typing import NamedTuple
import json 

import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, ClassificationMetrics)
from kfp.v2.google.client import AIPlatformClient

## GCP 연결

In [3]:
# gcp 연결
from google.colab import auth as google_auth

google_auth.authenticate_user()

# 경로변수 설정
아래 항목들을 본인 환경에 맞게 수정해주세요.

* PROJECT_ID = <프로젝트 ID>
* REGION = <리전>
* BUCKET_NAME = <bucket 이름>
* USER = <user 이름>

In [10]:
from datetime import datetime

PROJECT_ID = 'mlops-210515'
REGION = "us-central1"

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET_NAME = "gs://pipeline-129332"

USER = "JeongMin-Do"
PIPELINE_ROOT = "{}/pipeline_root/{}".format(BUCKET_NAME, USER)

## Metadata pipeline

In [11]:
@component
def preprocess(
    # An input parameter of type string.
    message: str,
    # Use Output to get a metadata-rich handle to the output artifact
    # of type `Dataset`.
    output_dataset_one: Output[Dataset],
    # A locally accessible filepath for another output artifact of type
    # `Dataset`.
    output_dataset_two_path: OutputPath("Dataset"),
    # A locally accessible filepath for an output parameter of type string.
    output_parameter_path: OutputPath(str),
):
    """'Mock' preprocessing step.
    Writes out the passed in message to the output "Dataset"s and the output message.
    """
    output_dataset_one.metadata["hello"] = "there"
    # Use OutputArtifact.path to access a local file path for writing.
    # One can also use OutputArtifact.uri to access the actual URI file path.
    with open(output_dataset_one.path, "w") as f:
        f.write(message)

    # OutputPath is used to just pass the local file path of the output artifact
    # to the function.
    with open(output_dataset_two_path, "w") as f:
        f.write(message)

    with open(output_parameter_path, "w") as f:
        f.write(message)

In [12]:
@component(
    base_image="python:3.9",  # Use a different base image.
)
def train(
    # An input parameter of type string.
    message: str,
    # Use InputPath to get a locally accessible path for the input artifact
    # of type `Dataset`.
    dataset_one_path: InputPath("Dataset"),
    # Use InputArtifact to get a metadata-rich handle to the input artifact
    # of type `Dataset`.
    dataset_two: Input[Dataset],
    # Output artifact of type Model.
    imported_dataset: Input[Dataset],
    model: Output[Model],
    # An input parameter of type int with a default value.
    num_steps: int = 3,
    # Use NamedTuple to return either artifacts or parameters.
    # When returning artifacts like this, return the contents of
    # the artifact. The assumption here is that this return value
    # fits in memory.
) -> NamedTuple(
    "Outputs",
    [
        ("output_message", str),  # Return parameter.
        ("generic_artifact", Artifact),  # Return generic Artifact.
    ],
):
    """'Mock' Training step.
    Combines the contents of dataset_one and dataset_two into the
    output Model.
    Constructs a new output_message consisting of message repeated num_steps times.
    """

    # Directly access the passed in GCS URI as a local file (uses GCSFuse).
    with open(dataset_one_path, "r") as input_file:
        dataset_one_contents = input_file.read()

    # dataset_two is an Artifact handle. Use dataset_two.path to get a
    # local file path (uses GCSFuse).
    # Alternately, use dataset_two.uri to access the GCS URI directly.
    with open(dataset_two.path, "r") as input_file:
        dataset_two_contents = input_file.read()

    with open(model.path, "w") as f:
        f.write("My Model")

    with open(imported_dataset.path, "r") as f:
        data = f.read()
    print("Imported Dataset:", data)

    # Use model.get() to get a Model artifact, which has a .metadata dictionary
    # to store arbitrary metadata for the output artifact. This metadata will be
    # recorded in Managed Metadata and can be queried later. It will also show up
    # in the UI.
    model.metadata["accuracy"] = 0.9
    model.metadata["framework"] = "Tensorflow"
    model.metadata["time_to_train_in_seconds"] = 257

    artifact_contents = "{}\n{}".format(dataset_one_contents, dataset_two_contents)
    output_message = " ".join([message for _ in range(num_steps)])
    return (output_message, artifact_contents)

In [13]:
@component
def read_artifact_input(
    generic: Input[Artifact],
):
    with open(generic.path, "r") as input_file:
        generic_contents = input_file.read()
        print(f"generic contents: {generic_contents}")

In [14]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline. Use to determine the pipeline Context.
    name="metadata-pipeline-v2",
)
def pipeline(message: str):
    importer = kfp.dsl.importer(
        artifact_uri="gs://ml-pipeline-playground/shakespeare1.txt",
        artifact_class=Dataset,
        reimport=False,
    )
    preprocess_task = preprocess(message=message)
    train_task = train(
        dataset_one=preprocess_task.outputs["output_dataset_one"],
        dataset_two=preprocess_task.outputs["output_dataset_two"],
        imported_dataset=importer.output,
        message=preprocess_task.outputs["output_parameter"],
        num_steps=5,
    )
    read_task = read_artifact_input(  # noqa: F841
        train_task.outputs["generic_artifact"]
    )

In [15]:
compiler.Compiler().compile(
    pipeline_func = pipeline, package_path = "metadata-pipeline.json"
)

api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

response = api_client.create_run_from_job_spec(
    job_spec_path="metadata-pipeline.json", pipeline_root=PIPELINE_ROOT, parameter_values={"message": "Hello, World"},
)