<a href="https://colab.research.google.com/github/nicanornicolas/AI-APP/blob/main/ML_Pipelines.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Installing Vertex AI SDK for Python and other required packages.

In [1]:
!pip install --upgrade --quiet \
    google-cloud-aiplatform \
    google-cloud-storage \
    kfp \
    google-cloud-pipeline-components \
    ydf \
    opencv-python-headless \
    thinc \
    numpy


Authenticating the environment on Google Colab
## Restart the runtime (Colab only)

To use the newly installed packages in this runtime, you must restart it.

In [2]:
import sys

if "google.colab" in sys.modules:

  import IPython

  # Restart the kernel so that the upgraded packages are loaded.
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

Authenticate the notebook environment.

## For Google Colab only.

In [1]:
import sys

if "google.colab" in sys.modules:

  from google.colab import auth

  auth.authenticate_user()

Set the Google Cloud Project information.

In [2]:
PROJECT_ID = "valiant-imagery-464509-h0"
LOCATION = "us-central1"  # Iowa

Creating a Cloud Storage bucket.
Note: For any user-provided strings that need to be unique (such as bucket names or model IDs), append "-unique" to the end so that proper testing can occur.

In [3]:
BUCKET_URI = f"gs://demo_bucket101-{PROJECT_ID}-unique"
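Because bucket names are global and have naming rules, it can help to check the name locally before calling gsutil. The following is a minimal sketch that checks only a simplified subset of the Cloud Storage naming rules (3 to 63 characters; lowercase letters, digits, dashes, underscores, and dots; must start and end with a letter or digit; no "goog" prefix and no "google" substring). The helper names `bucket_name_from_uri` and `looks_like_valid_bucket_name` are hypothetical, and the check is not a substitute for the server-side validation.

```python
import re

# Simplified subset of the Cloud Storage bucket naming rules. Assumption:
# dotted names longer than 63 characters and other edge cases are ignored.
_BUCKET_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_.-]{1,61}[a-z0-9]$")


def bucket_name_from_uri(bucket_uri: str) -> str:
    """Strip the gs:// scheme and any trailing slash from a bucket URI."""
    if not bucket_uri.startswith("gs://"):
        raise ValueError(f"Not a gs:// URI: {bucket_uri!r}")
    return bucket_uri[len("gs://"):].rstrip("/")


def looks_like_valid_bucket_name(name: str) -> bool:
    """Return True if the name passes the simplified checks above."""
    if not _BUCKET_NAME_RE.match(name):
        return False
    # Bucket names cannot begin with "goog" or contain "google".
    return not name.startswith("goog") and "google" not in name


name = bucket_name_from_uri("gs://demo_bucket101-valiant-imagery-464509-h0-unique")
print(name, looks_like_valid_bucket_name(name))
```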

## Create the Cloud Storage bucket (run this cell only if the bucket doesn't already exist).

In [4]:
! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}



Service Account.

In [5]:
SERVICE_ACCOUNT = "my-service-account"

## If you don't know your service account, run this cell to retrieve it with the gcloud command.

In [4]:
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "my-service-account"
):
    # Get your service account from gcloud
    if not IS_COLAB:
      shell_output = !gcloud auth list 2>/dev/null
      SERVICE_ACCOUNT = shell_output[2].replace("*","").strip()

    if IS_COLAB:
      shell_output = !gcloud projects describe $PROJECT_ID
      project_number = None
      for line in shell_output:
        if "projectNumber" in line:
          project_number = line.split(":")[1].strip().replace("'","")
          break
      if project_number:
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

Service Account: 102963417104-compute@developer.gserviceaccount.com
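The Colab branch above scans the `gcloud projects describe` output for the `projectNumber` line and builds the Compute Engine default service account from it. A minimal sketch of that same parsing logic as standalone, testable functions follows; the function names and the sample output lines are hypothetical. Matching the key exactly (rather than checking whether "projectNumber" appears anywhere in a line) avoids false matches on other fields.

```python
from typing import Iterable, Optional


def project_number_from_describe(lines: Iterable[str]) -> Optional[str]:
    """Extract the projectNumber value from `gcloud projects describe` output."""
    for line in lines:
        key, sep, value = line.partition(":")
        if sep and key.strip() == "projectNumber":
            # The value is printed as a quoted YAML string, e.g. '1234'.
            return value.strip().strip("'\"")
    return None


def default_compute_service_account(project_number: str) -> str:
    """Build the Compute Engine default service account email."""
    return f"{project_number}-compute@developer.gserviceaccount.com"


# Hypothetical sample of the describe output.
sample = [
    "lifecycleState: ACTIVE",
    "projectId: valiant-imagery-464509-h0",
    "projectNumber: '102963417104'",
]
print(default_compute_service_account(project_number_from_describe(sample)))
```

Alternatively, `gcloud projects describe $PROJECT_ID --format="value(projectNumber)"` prints only the number, which removes the need for parsing.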


Set service account access for Vertex AI Pipelines.
## Run these commands to grant your service account access to read and write pipeline artifacts in the bucket created in the previous step. You only need to run them once per service account.

In [6]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

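`gsutil iam ch` expects each binding as a single `MEMBER_TYPE:MEMBER:ROLE` argument, so a stray space after `serviceAccount:` splits it into two shell arguments and leaves the member without a role. The sketch below builds the commands programmatically and rejects whitespace inside the account name. The helper names are hypothetical; the roles are the two granted in the cells above.

```python
import shlex
from typing import List, Sequence

# The two roles granted in the cells above.
DEFAULT_ROLES = ("roles/storage.objectCreator", "roles/storage.objectViewer")


def iam_ch_binding(service_account: str, role: str) -> str:
    """Build the MEMBER:ROLE argument that `gsutil iam ch` expects."""
    service_account = service_account.strip()
    # Internal whitespace would split the binding into separate arguments.
    if not service_account or any(ch.isspace() for ch in service_account):
        raise ValueError(f"Invalid service account: {service_account!r}")
    return f"serviceAccount:{service_account}:{role}"


def iam_ch_commands(service_account: str, bucket_uri: str,
                    roles: Sequence[str] = DEFAULT_ROLES) -> List[str]:
    """Return one shell-quoted `gsutil iam ch` command per role."""
    return [
        shlex.join(["gsutil", "iam", "ch",
                    iam_ch_binding(service_account, role), bucket_uri])
        for role in roles
    ]


for cmd in iam_ch_commands("102963417104-compute@developer.gserviceaccount.com",
                           "gs://demo-bucket"):
    print(cmd)
```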


Set up variables

## Import libraries and define constants

In [7]:
from typing import NamedTuple

import kfp
from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, component)

## Vertex AI Pipelines constants.

Set up the following constant for Vertex AI Pipelines. PIPELINE_ROOT is the Cloud Storage path under which the pipeline writes its artifacts.

In [8]:
PIPELINE_ROOT = "{}/pipeline_root/shakespeare".format(BUCKET_URI)

## Initialize Vertex AI SDK for Python

In [9]:
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

## Define Python function-based pipeline components.
Define function-based components that consume parameters and produce (typed) Artifacts and parameters.
Functions can produce Artifacts in three ways:
  - Accept a local output path using OutputPath.
  - Accept an Output[Artifact] argument, which gives the function a handle to the output artifact and its metadata.
  - Return an Artifact (or Dataset, Model, Metrics, etc.) in a NamedTuple.

The components below demonstrate each of these options.
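To make the difference between the three styles concrete, here is a plain-Python analogy that does not use kfp at all. `FakeArtifact` and the `via_*` functions are hypothetical stand-ins: in a real component, KFP creates the paths and artifact handles and persists returned values; here the caller does that by hand so the logic can run locally.

```python
import os
import tempfile
from collections import namedtuple
from dataclasses import dataclass, field


@dataclass
class FakeArtifact:
    """Hypothetical stand-in for an Output[...] artifact handle."""
    path: str
    metadata: dict = field(default_factory=dict)


def via_output_path(output_path: str, message: str) -> None:
    # Style 1 (OutputPath): the function only receives a local file path.
    with open(output_path, "w") as f:
        f.write(message)


def via_artifact_handle(artifact: FakeArtifact, message: str) -> None:
    # Style 2 (Output[Artifact]): the handle exposes both .path and .metadata.
    artifact.metadata["length"] = len(message)
    with open(artifact.path, "w") as f:
        f.write(message)


def via_return_value(message: str):
    # Style 3 (NamedTuple return): values are returned; something outside
    # the function (the KFP executor, in a real pipeline) persists them.
    Outputs = namedtuple("Outputs", ["output_message", "artifact_contents"])
    return Outputs(message.upper(), message)


with tempfile.TemporaryDirectory() as tmp:
    handle = FakeArtifact(path=os.path.join(tmp, "dataset.txt"))
    via_artifact_handle(handle, "hello")
    print(handle.metadata)
```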

## Define the preprocess component.

The first component definition, preprocess, shows a component that outputs two Dataset Artifacts as well as an output parameter. (In this example, the datasets do not reflect real data.)

For the parameter output, you would typically use the approach shown here, the OutputPath type, for "larger" data. For small data, such as a short string, it may be more convenient to use a NamedTuple function output, as shown in the second component.

In [10]:
@component(base_image="python:3.9")
def preprocess(
    # An input parameter of type string.
    message: str,
    # Use Output to get a metadata-rich handle to the output artifact
    # of type Dataset.
    output_dataset_one: Output[Dataset],
    # A locally accessible filepath for another output artifact of type
    # Dataset.
    output_dataset_two_path: OutputPath("Dataset"),
    # A locally accessible filepath for an output parameter of type string.
    output_parameter_path: OutputPath(str),
):
  """'Mock' preprocessing step.
  Writes the passed-in message to both output Datasets and to the output parameter.
  """
  output_dataset_one.metadata["hello"] = "there"
  # Use the artifact handle's .path to get a local file path for writing.
  # The handle's .uri gives the underlying Cloud Storage URI instead.
  with open(output_dataset_one.path, "w") as f:
    f.write(message)

  # OutputPath is used to just pass the local file path of the output artifact
  # to the function.
  with open(output_dataset_two_path, "w") as f:
    f.write(message)

  # Output parameters are written as plain text to the OutputPath file.
  with open(output_parameter_path, "w") as f:
    f.write(message)

## Define the train component.

The second component definition, train, takes as input both an InputPath of type Dataset and an Input[Dataset] artifact handle (as well as parameter inputs and a third Dataset, imported_dataset, which the pipeline supplies from the importer). It uses the NamedTuple format for function output. As shown, these outputs can be Artifacts as well as parameters.

Additionally, this component writes some metrics as metadata on the model output Artifact. This information is displayed in the Google Cloud console when the pipeline runs.

In [11]:
@component(
    base_image="python:3.10", # Use a different base image.
)
def train(
    # An input parameter of type string.
    message: str,
    # Use InputPath to get a locally accessible path for the input artifact
    # of type Dataset.
    dataset_one_path: InputPath("Dataset"),
    # Use InputArtifact to get a metadata-rich handle to the input artifact
    # of type 'Dataset'.
    dataset_two: Input[Dataset],
    # Input artifact of type Dataset, supplied by the importer.
    imported_dataset: Input[Dataset],
    # Output artifact of type Model.
    model: Output[Model],
    # An input parameter of type int with a default value.
    num_steps: int = 3,
    # Use NamedTuple to return either artifacts or parameters.
    # When returning artifacts like this, return the contents of
    # the artifact. The assumption here is that this return value
    # fits in memory.
    ) -> NamedTuple(
    "Outputs",
    [
        ("output_message", str),  # Return parameter.
        ("generic_artifact", Artifact),  # Return generic Artifact.
    ],
):
    """'Mock' Training step.
    Combines the contents of dataset_one and dataset_two into the output Model.
    Constructs a new output_message consisting of message repeated num_steps times.
    """

    # Directly access the passed-in GCS URI as a local file (uses GCSFuse).
    with open(dataset_one_path, "r") as input_file:
      dataset_one_contents = input_file.read()

    # dataset_two is an Artifact handle. Use dataset_two.path to get a
    # local file path (uses GCSFuse).
    # Alternatively, use dataset_two.uri to access the GCS URI directly.
    with open(dataset_two.path, "r") as input_file:
      dataset_two_contents = input_file.read()

    # Combine both datasets and write them to the output Model artifact,
    # as the docstring describes.
    artifact_contents = "{}\n{}".format(dataset_one_contents, dataset_two_contents)
    with open(model.path, "w") as f:
      f.write(artifact_contents)

    with open(imported_dataset.path, "r") as f:
      data = f.read()
    print("Imported Dataset:", data)

    # model is an Output[Model] handle whose .metadata dictionary stores
    # arbitrary metadata for the output artifact. This metadata is recorded
    # in Vertex ML Metadata and can be queried later. It also shows up
    # in the Google Cloud console.
    model.metadata["accuracy"] = 0.9
    model.metadata["framework"] = "TensorFlow"
    model.metadata["time_to_train_in_seconds"] = 264

    output_message = " ".join([message for _ in range(num_steps)])
    # Return a generic Artifact that points at the same URI as the model.
    return (output_message, Artifact(uri=model.uri, metadata=model.metadata))
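The NamedTuple fields become named outputs of the task, which is why the pipeline can refer to `train_task.outputs["generic_artifact"]`. The sketch below mirrors only the parameter logic of train in plain Python (no KFP, no Cloud Storage) so you can see what output_message will contain for the run submitted later (message "Hello World", num_steps=10). `TrainOutputs` and `mock_train_outputs` are hypothetical names, and the artifact is represented by a bare URI string.

```python
from collections import namedtuple

# Hypothetical plain-Python mirror of train's return value; the field names
# match the component's NamedTuple annotation.
TrainOutputs = namedtuple("Outputs", ["output_message", "generic_artifact"])


def mock_train_outputs(message: str, num_steps: int = 3,
                       artifact_uri: str = "") -> TrainOutputs:
    """Reproduce train's parameter logic without KFP or Cloud Storage."""
    # Repeat the message num_steps times, separated by single spaces.
    output_message = " ".join([message for _ in range(num_steps)])
    # Stand-in for Artifact(uri=model.uri, ...): just carry the URI string.
    return TrainOutputs(output_message, artifact_uri)


outputs = mock_train_outputs("Hello World", num_steps=10)
print(outputs.output_message.count("Hello World"))  # 10
```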

## Define the read_artifact_input component.

Finally, you define a small component that takes as input the generic_artifact returned by the train component function, and reads and prints the Artifact's contents.

In [12]:
@component(base_image="python:3.9")
def read_artifact_input(
    generic_artifact: Input[Artifact],
):
    with open(generic_artifact.path, "r") as input_file:
      generic_contents = input_file.read()
    print("Generic Artifact Contents:", generic_contents)


## Define a pipeline that uses the components and the importer.

The next step is to define a pipeline that uses the components you have built and also shows how to use kfp.dsl.importer.

In this example, the importer creates a Dataset artifact from an existing URI.

Note that the train_task step takes as inputs three of the outputs of the preprocess_task step, as well as the output of the importer step. For the train step's message input, the pipeline passes the preprocess output_parameter_path output, which supplies the output string directly.

The read_task step takes as input the train_task generic artifact output.
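These data dependencies are what determine the order in which tasks run: a task starts only after the tasks whose outputs it consumes have finished, and tasks with no dependency on each other can run concurrently. The sketch below models the dependencies described above with Python's standard `graphlib` module to show the resulting stages. It is an illustration of dependency ordering, not of how the Vertex AI Pipelines backend actually schedules tasks; `PIPELINE_DEPENDENCIES` and `execution_stages` are hypothetical names.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Upstream dependencies of each task, read off from which outputs it consumes.
# The names mirror the variables in the pipeline function.
PIPELINE_DEPENDENCIES: Dict[str, Set[str]] = {
    "importer": set(),
    "preprocess_task": set(),
    "train_task": {"importer", "preprocess_task"},
    "read_task": {"train_task"},
}


def execution_stages(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into stages; tasks within a stage have no mutual dependency."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


print(execution_stages(PIPELINE_DEPENDENCIES))
# [['importer', 'preprocess_task'], ['train_task'], ['read_task']]
```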

In [13]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline. Use to determine the pipeline Context.
    name="metadata-pipeline-v2",
)
def pipeline(message: str):
    importer = kfp.dsl.importer(
        artifact_uri="gs://ml-pipeline-playground/shakespeare1.txt",
        artifact_class=Dataset,
        reimport=False,
    )
    preprocess_task = preprocess(message=message)
    train_task = train(
        dataset_one_path=preprocess_task.outputs["output_dataset_one"],
        dataset_two=preprocess_task.outputs["output_dataset_two_path"],
        imported_dataset=importer.output,
        message=preprocess_task.outputs["output_parameter_path"],
        num_steps=10,
    )
    read_task = read_artifact_input(
        generic_artifact=train_task.outputs["generic_artifact"])

## Compile the pipeline

In [14]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="lightweight_pipeline.yaml"
)

## Run the pipeline

In [15]:
DISPLAY_NAME = "shakespeare"

job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="lightweight_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"message": "Hello World"},
    enable_caching=False,
)

job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
