![tracker](https://us-central1-vertex-ai-mlops-369716.cloudfunctions.net/pixel-tracking?path=statmike%2Fvertex-ai-mlops%2FMLOps%2FPipelines&file=Vertex+AI+Pipelines+-+Introduction.ipynb)
<!--- header table --->
<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/jbrache/vertex-ai-things/blob/main/MLOps/Pipelines/Private_PyPI/1_Vertex_AI_Pipelines_Introduction.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo">
      <br>Run in<br>Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https%3A%2F%2Fraw.githubusercontent.com%2Fjbrache%2Fvertex-ai-things%2Fmain%2FMLOps%2FPipelines%2FPrivate_PyPI%2F1_Vertex_AI_Pipelines_Introduction.ipynb">
      <img width="32px" src="https://lh3.googleusercontent.com/JmcxdQi-qOpctIvWKgPtrzZdJJK-J3sWE1RsfjZNwshCFgE_9fULcNpuXYTilIR2hjwN" alt="Google Cloud Colab Enterprise logo">
      <br>Run in<br>Colab Enterprise
    </a>
  </td>      
  <td style="text-align: center">
    <a href="https://github.com/jbrache/vertex-ai-things/blob/main/MLOps/Pipelines/Private_PyPI/1_Vertex_AI_Pipelines_Introduction.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      <br>View on<br>GitHub
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/jbrache/vertex-ai-things/main/MLOps/Pipelines/Private_PyPI/1_Vertex_AI_Pipelines_Introduction.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      <br>Open in<br>Vertex AI Workbench
    </a>
  </td>
</table>

---
This is part of a [series of notebook based workflows](https://github.com/statmike/vertex-ai-mlops/tree/main/MLOps/Pipelines) that teach all the ways to use pipelines within Vertex AI. Kudos to [statmike/vertex-ai-mlops](https://github.com/statmike/vertex-ai-mlops)!

---

# Vertex AI Pipelines - Introduction

**Note**: This sample has been modified from the [original sample](https://github.com/statmike/vertex-ai-mlops/blob/main/MLOps/Pipelines/Vertex%20AI%20Pipelines%20-%20Introduction.ipynb) (Kudos [statmike](https://github.com/statmike)!) to incorporate the use of a private Artifact Registry Python repository and a custom KFP base container image.

When an ML workflow has more than one step it can benefit from a pipeline.  [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction) is a managed service that executes [Kubeflow Pipelines (KFP)](https://www.kubeflow.org/docs/components/pipelines/v2/introduction/) and [TensorFlow Extended (TFX)](https://www.tensorflow.org/tfx/guide/understanding_tfx_pipelines) pipelines.  This introduction focuses on KFP because of its flexibility; Kubeflow was originally developed as a simplified way of running TFX on Kubernetes ([history of Kubeflow](https://www.kubeflow.org/docs/started/introduction/#history)).
This notebook-based workflow introduces KFP pipelines running on Vertex AI Pipelines and points to the additional, more detailed workflows in [this repository](https://github.com/statmike/vertex-ai-mlops/blob/main/MLOps/Pipelines) that build a deeper understanding.

<p align="center"><center>
    <img src="https://github.com/statmike/vertex-ai-mlops/blob/main/MLOps/resources/images/screenshots/pipelines/intro/overview.png?raw=1" width="75%">
</center><p>

---

**Pipelines** - The Concept

Pipelines are constructed from steps.  Each step is called a **task**, and each task is an instance of a **component**.

The tasks are connected by inputs and outputs.  This forms the execution graph of the pipeline.

Inputs and outputs can be made up of **parameters** (`str`, `int`, `float`, `bool`, `dict`, `list`) as well as **artifacts**.  Artifacts are objects with defined schemas for machine learning assets (datasets, models, etc.); they are automatically recorded in [Vertex AI ML Metadata](https://cloud.google.com/vertex-ai/docs/ml-metadata/introduction), which tracks their lineage.
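The execution order falls out of these input/output connections. As a conceptual sketch only (this is not how KFP or Vertex AI schedule tasks internally), Python's standard-library `graphlib` can order the two tasks used later in this notebook, where `example-artifact` consumes the output of `example-parameters`:

```python
from graphlib import TopologicalSorter

# Conceptual model: each task maps to the set of tasks whose outputs it consumes.
# Task names mirror the components defined later in this notebook.
dependencies = {
    "example-parameters": set(),
    "example-artifact": {"example-parameters"},  # uses the dict output of example-parameters
}

# A valid execution order runs every producer before its consumers.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Tasks with no dependency path between them have no required order relative to each other, which is what allows a pipeline to run independent steps concurrently.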

**Kubeflow Pipelines** - Constructing Pipelines

Pipelines are written in Python using the [`kfp` package](https://www.kubeflow.org/docs/components/pipelines/v2/installation/).  This SDK has decorators for multiple ways of writing [**components**](https://www.kubeflow.org/docs/components/pipelines/v2/components/) and constructing [**pipelines**](https://www.kubeflow.org/docs/components/pipelines/v2/pipelines/) from the components with features to control the execution and flow of the pipeline. Pipeline code is [**compiled**](https://www.kubeflow.org/docs/components/pipelines/v2/compile-a-pipeline/) into a YAML file using a single line command with the SDK.  

**Vertex AI Pipelines** - Executing Pipelines

The resulting YAML file is the input for [running the pipeline](https://cloud.google.com/vertex-ai/docs/pipelines/run-pipeline) with Vertex AI Pipelines.  The runs can even be scheduled with [Vertex AI Pipeline scheduler API](https://cloud.google.com/vertex-ai/docs/pipelines/schedule-pipeline-run).

The runs of a pipeline can be directly monitored, and compared, in the Vertex AI console:

**Pipeline Dashboard View In The Console: Overall Pipeline**
<p align="center"><center>
    <img src="https://github.com/statmike/vertex-ai-mlops/blob/main/MLOps/resources/images/screenshots/pipelines/intro/overview.png?raw=1" width="75%">
</center><p>

---
## Colab Setup

To run this notebook in Colab run the cells in this section.  Otherwise, skip this section.

This cell will authenticate to GCP (follow prompts in the popup).

In [None]:
PROJECT_ID = 'ds-dev-jb02-pypi' # replace with project ID

In [None]:
try:
    from google.colab import auth
    auth.authenticate_user()
    !gcloud config set project {PROJECT_ID}
    print('Colab authorized to GCP')
except Exception:
    print('Not a Colab Environment')
    pass

---
## Installs

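The list `packages` contains tuples of package import names and install names, with an optional minimum version.  If the import name is not found, the install name is used to install the package quietly for the current user.  If a minimum version is given and the installed version is older, the package is upgraded.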

In [None]:
# tuples of (import name, install name, min_version)
packages = [
    ('google.cloud.aiplatform', 'google-cloud-aiplatform', '1.51.0'),
    ('kfp', 'kfp'),
]

import importlib.util, importlib.metadata
install = False
for package in packages:
    if not importlib.util.find_spec(package[0]):
        print(f'installing package {package[1]}')
        install = True
        !pip install {package[1]} -U -q --user
    elif len(package) == 3:
        from packaging.version import Version
        # version() expects the distribution (install) name, not the import name;
        # Version() compares numerically, whereas plain string comparison is lexicographic
        if Version(importlib.metadata.version(package[1])) < Version(package[2]):
            print(f'updating package {package[1]}')
            install = True
            !pip install {package[1]} -U -q --user
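A minimum-version check has to compare versions numerically: comparing version strings directly is lexicographic, so `'1.9.0' < '1.51.0'` evaluates to `False`. `packaging.version.Version` handles full version strings; the standard-library sketch below uses a hypothetical `version_tuple` helper, limited to plain `X.Y.Z` releases, to show the difference:

```python
def version_tuple(version: str) -> tuple:
    """Hypothetical helper: convert a plain release string like '1.51.0' to (1, 51, 0).

    Only handles dot-separated integers; pre-release tags such as '2.0.0rc1'
    need packaging.version.Version instead.
    """
    return tuple(int(part) for part in version.split("."))


# String comparison is character by character, so '9' > '5' decides the result.
string_compare = "1.9.0" < "1.51.0"                                 # False (wrong)
numeric_compare = version_tuple("1.9.0") < version_tuple("1.51.0")  # True (correct)
print(string_compare, numeric_compare)
```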

### API Enablement

In [None]:
!gcloud services enable aiplatform.googleapis.com

### Restart Kernel (If Installs Occurred)

After the kernel restarts, continue with the cell that follows this one.

In [None]:
if install:
    import IPython
    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)
    IPython.display.display(IPython.display.Markdown("""<div class=\"alert alert-block alert-warning\">
        <b>⚠️ The kernel is going to restart. Please wait until it is finished before continuing to the next step. The previous cells do not need to be run again⚠️</b>
        </div>"""))

---
## Setup

Inputs

In [None]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
PROJECT_ID

In [None]:
REGION = 'us-central1'
SERIES = 'mlops'
EXPERIMENT = 'pipeline-intro'

# gcs bucket
GCS_BUCKET = PROJECT_ID

# Private Artifact Repository Python repo
PYTHON_REPO_PRIVATE="python-repo-vertex"

# Public PyPI Remote Repo
PYTHON_REPO_REMOTE="python-remote-repo"

# Training Container Image
PRIVATE_REPO = "kfp-base-images"
BASE_IMAGE = (
    f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{PRIVATE_REPO}/python-3.11-slim:latest"
)

In [None]:
############## Vertex AI Custom Job Service Account ###########################
PROJECT_NUMBER = !(gcloud projects describe $PROJECT_ID --format="value(projectNumber)")
PROJECT_NUMBER = PROJECT_NUMBER[0]

# Option 1: create a dedicated service account to use with Vertex AI Pipelines
SERVICE_ACCOUNT_NAME = "vertex-ai-sa"
SERVICE_ACCOUNT = f"{SERVICE_ACCOUNT_NAME}@{PROJECT_ID}.iam.gserviceaccount.com"

# Option 2: use the Compute Engine default service account instead
# SERVICE_ACCOUNT = f"{PROJECT_NUMBER}-compute@developer.gserviceaccount.com"

Packages

In [None]:
import os, time, importlib
from typing import NamedTuple
from IPython.display import Markdown as show_md

from google.cloud import aiplatform
import kfp

In [None]:
kfp.__version__

In [None]:
aiplatform.__version__

Clients

In [None]:
# vertex ai clients
aiplatform.init(project = PROJECT_ID, location = REGION)

parameters:

In [None]:
DIR = f"temp/{SERIES}-{EXPERIMENT}"

environment:

In [None]:
if not os.path.exists(DIR):
    os.makedirs(DIR)

### Service Account Creation and IAM Grants

Create Service Account (if needed)

In [None]:
!gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME \
  --description="Vertex AI Service account for Custom Training Jobs" \
  --display-name="vertex-ai-sa"

Grant the service account IAM roles on your project (if needed)

In [None]:
!gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$SERVICE_ACCOUNT" \
  --role="roles/aiplatform.user"

!gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$SERVICE_ACCOUNT" \
  --role="roles/artifactregistry.reader"

!gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$SERVICE_ACCOUNT" \
  --role="roles/monitoring.metricWriter"

!gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$SERVICE_ACCOUNT" \
  --role="roles/logging.logWriter"

!gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:$SERVICE_ACCOUNT" \
  --role="roles/storage.objectUser"
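The five grants above differ only in the role. A sketch that builds the same `gcloud` commands from a single list of roles, so the role set lives in one place; the project and service account values are placeholders, and the commands are only printed, not executed:

```python
import shlex

# Placeholder values; in the notebook these come from PROJECT_ID and SERVICE_ACCOUNT.
project_id = "my-project"
service_account = f"vertex-ai-sa@{project_id}.iam.gserviceaccount.com"

# Roles granted to the pipeline service account in the cell above.
roles = [
    "roles/aiplatform.user",
    "roles/artifactregistry.reader",
    "roles/monitoring.metricWriter",
    "roles/logging.logWriter",
    "roles/storage.objectUser",
]

# One argument list per role, matching the add-iam-policy-binding calls above.
commands = [
    [
        "gcloud", "projects", "add-iam-policy-binding", project_id,
        f"--member=serviceAccount:{service_account}",
        f"--role={role}",
    ]
    for role in roles
]

for cmd in commands:
    # shlex.join quotes each argument so the printed line can be pasted into a shell
    print(shlex.join(cmd))
```

To execute them from Python rather than print them, each list could be passed to `subprocess.run(cmd, check=True)`.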

---
## KFP Pipeline On Vertex AI Pipelines

An example workflow: a KFP pipeline constructed from custom components and run on Vertex AI Pipelines.

### Create Pipeline Components

These are lightweight Python components: each one is a self-contained Python function that KFP runs inside the specified base image.

A simple component that takes inputs of several data types and outputs a dictionary.

In [None]:
packages_to_install = ["pandas", "google-genai"]
# Test either repo: the private repo or the remote repo that proxies public PyPI
# PYTHON_REPO = PYTHON_REPO_PRIVATE
PYTHON_REPO = PYTHON_REPO_REMOTE
@kfp.dsl.component(
    #base_image = "python:3.11",
    base_image = BASE_IMAGE, # This previously configured image has keyring to authenticate to Artifact Registry
    install_kfp_package = False, # kfp must already be installed in BASE_IMAGE
    packages_to_install = packages_to_install,
    # the first URL is passed to pip as --index-url; any additional URLs become --extra-index-url
    pip_index_urls = [f"https://{REGION}-python.pkg.dev/{PROJECT_ID}/{PYTHON_REPO}/simple/"]
)
def example_parameters(
    in_str: str,
    in_int: int,
    in_float: float,
    in_bool: bool,
    in_list: list
) -> dict:
    results = dict(
        ex_str = in_str,
        ex_int = in_int,
        ex_float = in_float,
        ex_bool = in_bool,
        ex_list = in_list
    )
    return results
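The `pip_index_urls` argument points pip at Artifact Registry instead of public PyPI. Per the KFP documentation, the first URL is passed to pip as `--index-url` and any remaining URLs as `--extra-index-url`. The sketch below mirrors that documented mapping with a hypothetical `pip_index_flags` helper (it is not KFP's internal code) and builds an Artifact Registry URL in the `https://{LOCATION}-python.pkg.dev/{PROJECT}/{REPO}/simple/` form used above, with placeholder values:

```python
def artifact_registry_simple_url(location: str, project: str, repo: str) -> str:
    """Build the pip 'simple' index URL for an Artifact Registry Python repository."""
    return f"https://{location}-python.pkg.dev/{project}/{repo}/simple/"


def pip_index_flags(index_urls: list) -> list:
    """Hypothetical helper mirroring the documented pip_index_urls behavior:
    first URL -> --index-url, remaining URLs -> --extra-index-url."""
    if not index_urls:
        return []  # no override: pip falls back to public PyPI
    flags = ["--index-url", index_urls[0]]
    for url in index_urls[1:]:
        flags += ["--extra-index-url", url]
    return flags


url = artifact_registry_simple_url("us-central1", "my-project", "python-remote-repo")
print(pip_index_flags([url]))
# Repeating the same URL only adds a redundant --extra-index-url entry
print(pip_index_flags([url] * 2))
```

A single URL is therefore enough when all packages come from the same repository.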

A simple component that takes input parameters and outputs an artifact.

In [None]:
packages_to_install = ["pandas", "google-genai"]
# Test either repo: the private repo or the remote repo that proxies public PyPI
# PYTHON_REPO = PYTHON_REPO_PRIVATE
PYTHON_REPO = PYTHON_REPO_REMOTE
@kfp.dsl.component(
    #base_image = "python:3.11",
    base_image = BASE_IMAGE, # This previously configured image has keyring to authenticate to Artifact Registry
    install_kfp_package = False, # kfp must already be installed in BASE_IMAGE
    packages_to_install = packages_to_install,
    # the first URL is passed to pip as --index-url; any additional URLs become --extra-index-url
    pip_index_urls = [f"https://{REGION}-python.pkg.dev/{PROJECT_ID}/{PYTHON_REPO}/simple/"]
)
def example_artifact(
    metadata: dict
) -> kfp.dsl.Artifact:
    ex_artifact = kfp.dsl.Artifact(
        metadata = metadata,
        uri = 'https://www.kubeflow.org/docs/components/pipelines/v2/data-types/artifacts/#artifact-types'
    )
    return ex_artifact

### Create Pipeline

In [None]:
@kfp.dsl.pipeline(
    name = f'{SERIES}-{EXPERIMENT}',
    description = 'An introduction pipeline',
    pipeline_root = f'gs://{GCS_BUCKET}/{SERIES}/{EXPERIMENT}/pipeline_root'
)
def intro_pipeline(
    in_str: str,
    in_int: int,
    in_float: float,
    in_bool: bool,
    in_list: list
) -> dict:

    task1 = example_parameters(
        in_str = in_str,
        in_int = in_int,
        in_float = in_float,
        in_bool = in_bool,
        in_list = in_list
    )

    with kfp.dsl.If(in_bool == True, name = 'Proceed If True'):
        task2 = example_artifact(metadata = task1.output)

    return task1.output

### Compile Pipeline

In [None]:
kfp.compiler.Compiler().compile(
    pipeline_func = intro_pipeline,
    package_path = f'{DIR}/{SERIES}-{EXPERIMENT}.yaml'
)

### Create Pipeline Job (With Vertex AI SDK)

The compiled pipeline file can be submitted to run from either the console or the SDK (shown here).  See the [documentation](https://cloud.google.com/vertex-ai/docs/pipelines/run-pipeline#create_a_pipeline_run) for an overview of submitting runs with the console.


In [None]:
pipeline_job = aiplatform.PipelineJob(
    display_name = f"{SERIES}-{EXPERIMENT}",
    template_path = f"{DIR}/{SERIES}-{EXPERIMENT}.yaml",
    parameter_values = dict(
        in_str = 'An Example String',
        in_int = 45,
        in_float = 4.5,
        in_bool = True,
        in_list = [1, 27, 'Another String']
    ),
    pipeline_root = f'gs://{GCS_BUCKET}/{SERIES}/{EXPERIMENT}/pipeline_root',
    enable_caching = False # True (enabled), False (disable), None (defer to component level caching)
)
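`parameter_values` must supply a value of the expected type for each input in the pipeline signature. A hypothetical local pre-check (not part of the Vertex AI SDK) can catch mistakes before submission. It inspects an undecorated stand-in function with the same inputs as `intro_pipeline` and applies a strict `isinstance` check, so for example an `int` passed for a `float` input is flagged:

```python
import inspect


def intro_pipeline_signature(
    in_str: str, in_int: int, in_float: float, in_bool: bool, in_list: list
) -> dict:
    """Undecorated stand-in with the same inputs as intro_pipeline (never called)."""


def check_parameter_values(func, values: dict) -> list:
    """Hypothetical helper: list the problems with `values` for the inputs of `func`."""
    problems = []
    params = inspect.signature(func).parameters
    for name, param in params.items():
        if name not in values:
            problems.append(f"missing: {name}")
            continue
        value, expected = values[name], param.annotation
        # bool is a subclass of int, so reject it explicitly where a number is expected
        if expected in (int, float) and isinstance(value, bool):
            problems.append(f"{name}: got bool, expected {expected.__name__}")
        elif not isinstance(value, expected):
            problems.append(
                f"{name}: got {type(value).__name__}, expected {expected.__name__}"
            )
    problems += [f"unexpected: {name}" for name in values if name not in params]
    return problems


good = dict(in_str="An Example String", in_int=45, in_float=4.5, in_bool=True,
            in_list=[1, 27, "Another String"])
print(check_parameter_values(intro_pipeline_signature, good))
print(check_parameter_values(intro_pipeline_signature, {**good, "in_int": "45"}))
```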

### Submit Pipeline Job (On Vertex AI Pipelines)

In [None]:
# submit() starts the run without blocking; it does not return a value
pipeline_job.submit(
    service_account = SERVICE_ACCOUNT,
)

In [None]:
show_md(f'The Dashboard can be viewed here:\n{pipeline_job._dashboard_uri()}')

In [None]:
pipeline_job.wait()

---
## Results: Vertex AI Console And SDK

As soon as the job is submitted, the submission output includes a link to the dashboard view for Vertex AI Pipelines in the console.  Following this link, or navigating directly through Vertex AI, shows a dashboard view of the running pipeline with progress indicators for each task.  This information can also be retrieved with the Vertex AI SDK.  This section covers both approaches for:
- Pipeline Run
- Pipeline Run Task Level
- Pipeline IO Artifacts
- Artifact Lineage In Vertex AI ML Metadata

### Pipeline Run Overview
**Pipelines Runs List In The Console: Review All Runs And Select Pipeline Run**
<p><center>
    <img src="https://github.com/statmike/vertex-ai-mlops/blob/main/MLOps/resources/images/screenshots/pipelines/intro/runs.png?raw=1" width="75%">
</center><p>


**Pipeline Dashboard View In The Console: Overall Pipeline**
<p><center>
    <img src="https://github.com/statmike/vertex-ai-mlops/blob/main/MLOps/resources/images/screenshots/pipelines/intro/overview.png?raw=1" width="75%">
</center><p>

Note that:
- each node (task) is displayed
- the connections between nodes reflect the input/output dependencies
- each node's purpose is shown with an icon, and its status with an icon and color coding
- a summary of the pipeline run is provided on the right and includes all of the pipeline's input parameters

    
The same information can be retrieved with the Vertex AI SDK as follows.
- SDK Reference: [`aiplatform.PipelineJob`](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJob)

#### Retrieve Latest Run: By Display Name

In [None]:
aiplatform.PipelineJob.list(
    project = PROJECT_ID,
    location = REGION,
    filter = f'display_name="{SERIES}-{EXPERIMENT}"',
    order_by='update_time desc'
)[0]

#### Retrieve all runs to dataframe:
- SDK Reference: [`aiplatform.get_pipeline_df`](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform#google_cloud_aiplatform_get_pipeline_df)

In [None]:
aiplatform.get_pipeline_df(pipeline = f'{SERIES}-{EXPERIMENT}')

#### Review PipelineJob Object

In [None]:
pipeline_job

In [None]:
type(pipeline_job)

In [None]:
pipeline_job.state

In [None]:
pipeline_job.display_name

In [None]:
pipeline_job.name

In [None]:
pipeline_job.to_dict().keys()

In [None]:
pipeline_job.to_dict()['pipelineSpec'].keys()

### Pipeline Run: Task Overview

Each component used in the pipeline creates a task.  These tasks can be retrieved, reviewed, and examined for the input/output values of their parameters.

**Pipeline Dashboard View In The Console: Task Level Details**
<p><center>
    <img src="https://github.com/statmike/vertex-ai-mlops/blob/main/MLOps/resources/images/screenshots/pipelines/intro/overview.png?raw=1" width="75%">
</center><p>

#### Review Tasks with `PipelineJob.task_details`

In [None]:
[(task.task_name, task.state) for task in pipeline_job.task_details]

In [None]:
tasks = {task.task_name: task for task in pipeline_job.task_details}

In [None]:
#tasks['example-parameters'].execution.metadata

#### Review Tasks With `PipelineJob.gca_resource` As Dictionary with `.to_dict()`

In [None]:
pipeline_job.to_dict()['jobDetail']['taskDetails'][1]['execution']['metadata']
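Indexing `taskDetails` by position (`[1]`, `[3]`) depends on the order in which the service returns tasks. A sketch of a hypothetical `find_task_detail` helper that looks a task up by its `taskName` instead; the sample dictionary is a made-up, trimmed stand-in for `pipeline_job.to_dict()` with placeholder metadata, keeping only the keys this notebook reads:

```python
def find_task_detail(job_dict: dict, task_name: str) -> dict:
    """Hypothetical helper: return the taskDetails entry whose taskName matches."""
    for task in job_dict["jobDetail"]["taskDetails"]:
        if task.get("taskName") == task_name:
            return task
    raise KeyError(f"no task named {task_name!r}")


# Made-up, trimmed stand-in for pipeline_job.to_dict(); real output has many more keys.
sample_job_dict = {
    "jobDetail": {
        "taskDetails": [
            {"taskName": "mlops-pipeline-intro", "execution": {"metadata": {"note": "placeholder"}}},
            {"taskName": "example-parameters", "execution": {"metadata": {"note": "placeholder"}}},
        ]
    }
}

task = find_task_detail(sample_job_dict, "example-parameters")
print(task["execution"]["metadata"])
```

Against the real job, the equivalent call would be `find_task_detail(pipeline_job.to_dict(), 'example-parameters')['execution']['metadata']`.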

### Pipeline Run: `CustomJob` For Task:

Each task runs as a Vertex AI Training custom job.  The details of these jobs can also be reviewed directly in the console by clicking the 'VIEW JOB' link in the task overview panel, which opens the corresponding training job under Vertex AI Training:

**Vertex AI Training Job In The Console: Task Level Job Resources**
<p><center>
    <img src="https://github.com/statmike/vertex-ai-mlops/blob/main/MLOps/resources/images/screenshots/pipelines/intro/job.png?raw=1" width="75%">
</center><p>

#### Retrieve `CustomJob` Details Using The `PipelineJob` Object

In [None]:
task_job_id = pipeline_job.to_dict()['jobDetail']['taskDetails'][1]['executorDetail']['containerDetail']['mainJob']
task_job_id
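`mainJob` typically holds the full resource name of the task's custom job, of the form `projects/{PROJECT}/locations/{LOCATION}/customJobs/{JOB_ID}`. A sketch of a hypothetical `parse_resource_name` helper that splits such names into collection/ID pairs; the name below is a made-up example in that format:

```python
def parse_resource_name(resource_name: str) -> dict:
    """Hypothetical helper: split 'collection/id/collection/id/...' into a dict."""
    parts = resource_name.strip("/").split("/")
    if len(parts) % 2:
        raise ValueError(f"expected collection/id pairs: {resource_name!r}")
    # even positions are collection names, odd positions are the matching IDs
    return dict(zip(parts[0::2], parts[1::2]))


example = "projects/123456789/locations/us-central1/customJobs/987654321"
parsed = parse_resource_name(example)
print(parsed["locations"], parsed["customJobs"])
```

The same helper applies to ML Metadata artifact names, which use `metadataStores/{STORE}/artifacts/{ARTIFACT_ID}` segments after the location.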

In [None]:
task_job = aiplatform.CustomJob.get(resource_name = task_job_id)

In [None]:
type(task_job)

In [None]:
task_job

In [None]:
task_job.display_name

In [None]:
task_job.to_dict()

### Pipeline IO Artifacts

The pipeline task created from the `example_artifact` component returned an artifact (`kfp.dsl.Artifact`) as output.  Pipelines automatically store artifacts in Vertex AI ML Metadata.

**Pipeline Dashboard View In The Console: Artifact Info**
<p><center>
    <img src="https://github.com/statmike/vertex-ai-mlops/blob/main/MLOps/resources/images/screenshots/pipelines/intro/overview.png?raw=1" width="75%">
</center><p>

#### Retrieve Artifact Info From `PipelineJob`

In [None]:
pipeline_job.to_dict()['jobDetail']['taskDetails'][3]['execution']['metadata']

In [None]:
artifact_id = pipeline_job.to_dict()['jobDetail']['taskDetails'][3]['execution']['metadata']['vertex-ai-pipelines-artifact-argument-binding']['output:Output'][0]
artifact_id
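The artifact resource name sits under the `vertex-ai-pipelines-artifact-argument-binding` key of the task's execution metadata, keyed by `output:Output` for this component's single return value. A sketch of a hypothetical helper that collects every `output:` binding instead of hard-coding one key; the metadata dictionary is a made-up stand-in with a placeholder artifact name:

```python
BINDING_KEY = "vertex-ai-pipelines-artifact-argument-binding"
OUTPUT_PREFIX = "output:"


def output_artifact_names(execution_metadata: dict) -> dict:
    """Hypothetical helper: map each output name to its list of artifact resource names."""
    bindings = execution_metadata.get(BINDING_KEY, {})
    return {
        key[len(OUTPUT_PREFIX):]: names
        for key, names in bindings.items()
        if key.startswith(OUTPUT_PREFIX)
    }


# Made-up stand-in for a task's ['execution']['metadata'] dictionary.
sample_metadata = {
    BINDING_KEY: {
        "output:Output": [
            "projects/123456789/locations/us-central1/metadataStores/default/artifacts/placeholder-id"
        ]
    }
}
print(output_artifact_names(sample_metadata))
```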

In [None]:
artifact = aiplatform.Artifact.get(resource_id = artifact_id)

In [None]:
artifact.uri

In [None]:
artifact.metadata

#### Retrieve Pipeline Artifacts With The Vertex AI SDK

- [`aiplatform.Artifact()`](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.Artifact)

In [None]:
artifacts = aiplatform.Artifact.list(
    filter = "metadata.ex_bool.bool_value = true"
)
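The filter string addresses a metadata field as `metadata.<key>.<type>_value`. A sketch of a hypothetical `metadata_filter` helper that picks the value type from the Python value; it assumes `string_value` and `number_value` follow the same pattern as the `bool_value` used above, as described in the Vertex AI ML Metadata filtering documentation:

```python
import json


def metadata_filter(key: str, value) -> str:
    """Hypothetical helper: build an ML Metadata equality filter for one field."""
    # check bool before int/float because bool is a subclass of int
    if isinstance(value, bool):
        return f"metadata.{key}.bool_value = {str(value).lower()}"
    if isinstance(value, (int, float)):
        # metadata numbers are stored as doubles, so format ints as floats
        return f"metadata.{key}.number_value = {float(value)}"
    if isinstance(value, str):
        # json.dumps wraps the string in double quotes and escapes embedded quotes
        return f"metadata.{key}.string_value = {json.dumps(value)}"
    raise TypeError(f"unsupported type for {key}: {type(value).__name__}")


print(metadata_filter("ex_bool", True))
print(metadata_filter("ex_int", 45))
print(metadata_filter("ex_str", "An Example String"))
```

Several clauses can be joined with `AND` before being passed as `aiplatform.Artifact.list(filter = ...)`.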

In [None]:
artifacts

In [None]:
type(artifacts[0])

In [None]:
artifacts[0].metadata

In [None]:
artifacts[0].lineage_console_uri

### Pipeline IO Artifact Metadata Lineage

The console view of the artifact info includes a link to Vertex AI ML Metadata.  The SDK exposes the same link through the artifact's `lineage_console_uri` attribute, which opens the lineage view of the artifact in the Vertex AI ML Metadata console.


**Vertex AI ML Metadata Console: Artifact Lineage**
<p><center>
    <img src="https://github.com/statmike/vertex-ai-mlops/blob/main/MLOps/resources/images/screenshots/pipelines/intro/metadata.png?raw=1" width="75%">
</center><p>

In [None]:
artifact.lineage_console_uri