In [1]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Next24: ML pipelines


<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/notebook_template.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/notebook_template.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/notebook_template.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
    </a>
  </td>                                                                                               
</table>

**_NOTE_**: This notebook has been tested in the following environment:

* Python version = 3.9

## Overview

This notebook shows how to run a simple ML pipeline, built with scikit-learn and XGBoost, on Vertex AI Pipelines.

### Objective

In this tutorial, you learn how to build ML pipelines interactively.

This tutorial uses the following Google Cloud ML services and resources:

- Vertex AI Pipelines
- Cloud Storage

The steps performed include:

- Build a data processing component
- Build a training component that also evaluates the model and logs its RMSE
- Build a KFP ML pipeline that uploads the trained model to Vertex AI

### Dataset

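The California housing dataset, loaded here with scikit-learn's `fetch_california_housing`, contains 1990 census data aggregated by California district. The target column, `MedHouseVal`, is the median house value of each district.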

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing),
and [Cloud Storage pricing](https://cloud.google.com/storage/pricing),
and use the [Pricing Calculator](https://cloud.google.com/products/calculator/) to generate a cost estimate based on your projected usage.

## Installation

Install the following packages required to execute this notebook.


In [2]:
! pip3 install --upgrade --quiet numpy pandas scikit-learn xgboost kfp google-cloud-aiplatform google-cloud-pipeline-components
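
After the install, it can help to confirm which versions were actually resolved, since the notebook was only tested in one environment. The following is a minimal sketch using only the standard library; the helper name `installed_versions` is hypothetical and not part of the original notebook.

```python
from importlib import metadata


def installed_versions(packages):
    """Return {package: version string, or None if the package is not installed}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Distribution names taken from the pip install command above.
REQUIRED = [
    "numpy", "pandas", "scikit-learn", "xgboost", "kfp",
    "google-cloud-aiplatform", "google-cloud-pipeline-components",
]

for name, version in installed_versions(REQUIRED).items():
    print(f"{name}: {version or 'NOT INSTALLED'}")
```

If a package shows as not installed right after the `pip3 install` cell, restarting the kernel is usually the first thing to try.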

### Colab only: Uncomment the following cell to restart the kernel.

In [3]:
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

## Before you begin

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

2. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

3. [Enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

4. If you are running this notebook locally, you need to install the [Cloud SDK](https://cloud.google.com/sdk).

#### Set your project ID

**If you don't know your project ID**, try the following:
* Run `gcloud config list`.
* Run `gcloud projects list`.
* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)

In [4]:
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

PROJECT_ID = "rick-and-nardy-demo"  # Author's demo project; replace with your own project ID

# Set the project id
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


#### Region

You can also change the `REGION` variable used by Vertex AI. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [5]:
REGION = "us-central1"  # @param {type: "string"}

### Authenticate your Google Cloud account

Depending on your Jupyter environment, you may have to manually authenticate. Follow the relevant instructions below.

**1. Vertex AI Workbench**
* Do nothing as you are already authenticated.

**2. Local JupyterLab instance, uncomment and run:**

In [6]:
# ! gcloud auth login

**3. Colab, uncomment and run:**

In [7]:
# from google.colab import auth
# auth.authenticate_user()

**4. Service account or other**
* See how to grant Cloud Storage permissions to your service account at https://cloud.google.com/storage/docs/gsutil/commands/iam#ch-examples.

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.


In [8]:
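import os

# Alternative naming schemes:
# BUCKET_URI = f"gs://your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}
# BUCKET_URI = f"gs://{PROJECT_ID}"

# USER may be unset in some environments; fall back to a fixed suffix in that case.
USER = os.getenv("USER", "user")
BUCKET_URI = f"gs://{PROJECT_ID}-{USER}-pipeline"
print(BUCKET_URI)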

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [9]:
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://rick-and-nardy-demo-ricc-pipeline/...
ServiceException: 409 A Cloud Storage bucket named 'rick-and-nardy-demo-ricc-pipeline' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.
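
The 409 response above means the name is already taken, possibly by a bucket you created in an earlier run. Before calling `gsutil mb`, a quick local check can at least rule out names that are malformed. This is a sketch of a simplified subset of the Cloud Storage naming rules, not a complete validator; the function name `is_plausible_bucket_name` is hypothetical.

```python
import re

# Simplified subset of the Cloud Storage bucket naming rules:
# 3-63 characters; lowercase letters, digits, '-', '_' and '.';
# must start and end with a letter or digit; no "goog" prefix, no "google".
_BUCKET_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]$")


def is_plausible_bucket_name(bucket_uri: str) -> bool:
    """Check a gs:// URI (or a bare bucket name) against the simplified rules above."""
    prefix = "gs://"
    name = bucket_uri[len(prefix):] if bucket_uri.startswith(prefix) else bucket_uri
    if name.startswith("goog") or "google" in name:
        return False
    return bool(_BUCKET_NAME_RE.match(name))


print(is_plausible_bucket_name("gs://rick-and-nardy-demo-ricc-pipeline"))  # True
print(is_plausible_bucket_name("gs://My_Bucket"))  # False (uppercase letters)
```

Passing this check does not guarantee the name is available, because bucket names must be globally unique.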


### Import libraries

In [10]:
import kfp
from kfp import dsl, compiler
from google.cloud import aiplatform
from kfp.dsl import importer_node
from google_cloud_pipeline_components.v1.model import ModelUploadOp, ModelGetOp
from google_cloud_pipeline_components.types import artifact_types



### Set variables

In [11]:
PIPELINE_ROOT = f"{BUCKET_URI}/california_pipeline_{USER}"
MODEL_PATH    = f"{PIPELINE_ROOT}/model_{USER}"
MODEL_NAME    = f"california_reg_model_by_{USER}"
DEMO_PIPELINE_NAME = "california-pipeline-riccardo"

### Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project.

In [12]:
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

### Create pipeline components

#### Data processing component

In [13]:
@dsl.component(base_image='python:3.10', packages_to_install=["numpy", "pandas", "scikit-learn"])
def data_preprocessing_op(processed_dataset: dsl.Output[dsl.Dataset]):

  from pathlib import Path as p
  from sklearn.datasets import fetch_california_housing
  from sklearn.impute import SimpleImputer
  from sklearn.preprocessing import StandardScaler
  import pandas as pd

  housing = fetch_california_housing(as_frame=True)
  housing_df = housing['frame']
  x_df = housing_df.drop('MedHouseVal', axis=1)
  y_df = housing_df[['MedHouseVal']]
  processed_x = SimpleImputer().fit_transform(x_df)
  processed_x = StandardScaler().fit_transform(processed_x)

  processed_x_df = pd.DataFrame(processed_x, columns=x_df.columns)
  housing_df = pd.merge(processed_x_df, y_df, left_index=True, right_index=True)

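  # Treat the artifact path as a directory, write the CSV into it,
  # then point the artifact at the CSV file itself.
  p(processed_dataset.path).mkdir(parents=True, exist_ok=True)
  processed_dataset_path = str(p(processed_dataset.path, "processed_dataset.csv"))
  housing_df.to_csv(processed_dataset_path, index=False)
  processed_dataset.path = processed_dataset_path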
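
To make the two preprocessing steps concrete, here is a standard-library sketch of what `SimpleImputer()` (default strategy: mean) followed by `StandardScaler()` does to a single column. It is an illustration only, not the scikit-learn implementation, and the function name `impute_and_standardize` is hypothetical.

```python
from statistics import fmean, pstdev


def impute_and_standardize(column):
    """Mean-impute missing values (None), then scale to zero mean and unit variance."""
    observed = [v for v in column if v is not None]
    mean = fmean(observed)
    filled = [mean if v is None else v for v in column]
    # StandardScaler divides by the population standard deviation (ddof=0).
    filled_mean = fmean(filled)
    std = pstdev(filled)
    if std == 0:
        std = 1.0  # constant column: values are centered but not rescaled
    return [(v - filled_mean) / std for v in filled]


print(impute_and_standardize([1.0, None, 3.0]))
```

The missing value is replaced by the column mean (2.0), so after scaling it lands at 0 and the other two values sit symmetrically around it.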

#### Training component

In [14]:
@dsl.component(base_image='python:3.10', packages_to_install=["numpy", "pandas", "scikit-learn", "xgboost"])
def training_op(params: dict , model_path: str, processed_dataset: dsl.Input[dsl.Dataset],
                trained_model: dsl.Output[dsl.Model], metrics: dsl.Output[dsl.Metrics]):

  from pathlib import Path as p
  import numpy as np
  import pandas as pd
  from sklearn.model_selection import train_test_split
  from xgboost import XGBRegressor
  from sklearn.metrics import mean_squared_error
  import joblib

  with open(processed_dataset.path, "r") as preprocessed_data:
      processed_df = pd.read_csv(preprocessed_data)

  x = processed_df.drop('MedHouseVal', axis=1)
  y = processed_df['MedHouseVal']
  X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.25, random_state=0)
  # Use the caller's hyperparameters when given, otherwise XGBoost's defaults.
  model = XGBRegressor(**params) if params else XGBRegressor()
  model.fit(X_train, y_train)
  y_pred = model.predict(X_test)
  rmse = round(np.sqrt(mean_squared_error(y_test, y_pred)), 3)

  metrics.log_metric("rmse", rmse)
  # Vertex AI mounts Cloud Storage buckets under /gcs/ through Cloud Storage FUSE.
  model_path = model_path.replace('gs://', '/gcs/')
  p(model_path).mkdir(parents=True, exist_ok=True)
  model_filepath = str(p(model_path, "model.joblib"))
  joblib.dump(model, model_filepath)
  trained_model.path = model_filepath
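
The training component writes the model through the Cloud Storage FUSE mount, which is why it rewrites the `gs://` prefix to `/gcs/`. The path mapping can be sketched in isolation as follows; the helper name and the bucket and folder names are hypothetical.

```python
from pathlib import PurePosixPath


def gcs_uri_to_fuse_path(uri: str) -> str:
    """Map gs://bucket/path to /gcs/bucket/path; leave any other path unchanged."""
    prefix = "gs://"
    if uri.startswith(prefix):
        return "/gcs/" + uri[len(prefix):]
    return uri


# Hypothetical bucket and folder names, for illustration only.
model_dir = gcs_uri_to_fuse_path(
    "gs://my-project-alice-pipeline/california_pipeline_alice/model_alice"
)
model_file = str(PurePosixPath(model_dir, "model.joblib"))
print(model_file)
# /gcs/my-project-alice-pipeline/california_pipeline_alice/model_alice/model.joblib
```

Unlike a bare `str.replace`, this version only rewrites the prefix when it appears at the start of the string.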

### Build the pipeline

In [15]:
@dsl.pipeline(
    name="california-demo-pipeline",
)
def pipeline(params: dict = None, model_path: str = None, model_name: str = "None"):

    """A demo pipeline."""

    preprocessing_data_task = data_preprocessing_op()

    training_task = training_op(params=params, model_path=model_path,
                                processed_dataset=preprocessing_data_task.outputs['processed_dataset']).after(preprocessing_data_task)

    with dsl.If(model_name == "None"):

      model_importer_task = importer_node.importer(
        artifact_uri=model_path,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-7:latest"
            }
        },
      ).after(training_task)

      model_upload_op = ModelUploadOp(
          display_name=MODEL_NAME,
          unmanaged_container_model=model_importer_task.outputs["artifact"],
          version_aliases=['v1']
      ).after(model_importer_task)

    with dsl.Else():

      model_importer_task = importer_node.importer(
        artifact_uri=model_path,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-7:latest"
            }
        },
      ).after(training_task)

      get_model_task = ModelGetOp(model_name=model_name).after(model_importer_task)

      model_upload_op = ModelUploadOp(
          display_name=MODEL_NAME,
          unmanaged_container_model=model_importer_task.outputs["artifact"],
          parent_model=get_model_task.outputs["model"],
          version_aliases=['v2']
      ).after(get_model_task)
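
The `dsl.If` / `dsl.Else` branches above differ only in whether the upload has a parent model and which version alias it gets. The decision can be summarized in plain Python as below. This is an illustrative sketch, not KFP code: in the real pipeline the condition is evaluated when the pipeline runs, and the function name `plan_model_upload` is hypothetical.

```python
def plan_model_upload(model_name: str = "None") -> dict:
    """Mirror the pipeline's If/Else: a new model (v1) or a new version of a parent (v2)."""
    if model_name == "None":
        # First run: no existing model, so upload a brand-new model.
        return {"parent_model": None, "version_aliases": ["v1"]}
    # Later runs: look up the existing model and upload a new version under it.
    return {"parent_model": model_name, "version_aliases": ["v2"]}


print(plan_model_upload())
print(plan_model_upload("1234567890"))  # hypothetical model ID
```

Note that the sentinel is the string `"None"`, not the Python value `None`, which matches the default of the `model_name` pipeline parameter.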

### Compile the pipeline

In [16]:
compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.yaml")

### Run the pipeline to train v1 of the model

In [17]:
#pipeline_name = f"california-demo-pipeline-{USER}"
#print(f"pipeline_name: {pipeline_name}")
job = aiplatform.PipelineJob(
    display_name=DEMO_PIPELINE_NAME,  # pipeline_name,
    template_path="pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={'model_path': MODEL_PATH},
    enable_caching=True
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/849075740253/locations/us-central1/pipelineJobs/california-demo-pipeline-20240321084726
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/849075740253/locations/us-central1/pipelineJobs/california-demo-pipeline-20240321084726')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/california-demo-pipeline-20240321084726?project=849075740253
PipelineJob projects/849075740253/locations/us-central1/pipelineJobs/california-demo-pipeline-20240321084726 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/849075740253/locations/us-central1/pipelineJobs/california-demo-pipeline-20240321084726 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/849075740253/locations/us-central1/pipelineJobs/california-demo-pipeline-20240321084726 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob proje

RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [training-op].; Job (project_id = rick-and-nardy-demo, job_id = 4571908161635614720) is failed due to the above error.; Failed to handle the job: {project_number = 849075740253, job_id = 4571908161635614720}"
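
When a run fails like this, the resource name printed in the log is the handle for finding the job again. It follows the pattern `projects/{project}/locations/{location}/pipelineJobs/{job_id}`, and a small standard-library sketch can split it into its parts. The helper name `parse_pipeline_job_name` is hypothetical.

```python
import re

_RESOURCE_RE = re.compile(
    r"^projects/(?P<project>[^/]+)"
    r"/locations/(?P<location>[^/]+)"
    r"/pipelineJobs/(?P<job_id>[^/]+)$"
)


def parse_pipeline_job_name(resource_name: str) -> dict:
    """Split a PipelineJob resource name into project, location and job ID."""
    match = _RESOURCE_RE.match(resource_name)
    if match is None:
        raise ValueError(f"Not a PipelineJob resource name: {resource_name!r}")
    return match.groupdict()


# Resource name copied from the log output above.
name = (
    "projects/849075740253/locations/us-central1/"
    "pipelineJobs/california-demo-pipeline-20240321084726"
)
print(parse_pipeline_job_name(name))
```

The full resource name can also be passed to `aiplatform.PipelineJob.get(...)`, as the log itself suggests, to inspect the failed `training-op` task in a later session.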


### Run the pipeline to train v2 of the model

In [None]:
model_list = aiplatform.Model.list(filter=f"display_name={MODEL_NAME}", order_by="create_time")
model_name = model_list[0].name
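
Indexing `model_list[0]` raises `IndexError` when no model matches, which happens if the v1 run did not get as far as uploading a model. A more defensive selection could look like the sketch below. It uses stand-in objects instead of real `aiplatform.Model` instances, and the helper name `pick_parent_model` is hypothetical.

```python
from types import SimpleNamespace


def pick_parent_model(models):
    """Return the name of the first listed model, or raise a clear error if none exist."""
    if not models:
        raise LookupError("No registered model found; run the v1 pipeline first.")
    return models[0].name


# Stand-ins for aiplatform.Model objects; only the .name attribute is used here.
fake_models = [SimpleNamespace(name="1111"), SimpleNamespace(name="2222")]
print(pick_parent_model(fake_models))  # 1111
```

In the notebook, the call would be `model_name = pick_parent_model(model_list)`.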

In [None]:
job = aiplatform.PipelineJob(
    display_name=DEMO_PIPELINE_NAME,  # pipeline_name, # "california-demo-pipeline",
    template_path="pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={'params': {'learning_rate': 0.0001,
                                 'n_estimators': 4000,
                                 'max_depth': 20,
                                 'random_state': 8},
                      'model_path': MODEL_PATH,
                      'model_name': model_name
                      },
    enable_caching=True
)

job.run()

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:


In [None]:
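import os

# Delete the model resources uploaded by the pipeline runs.
# This notebook does not deploy an endpoint, so there is none to delete.
delete_model = False
if delete_model or os.getenv("IS_TESTING"):
    for model in aiplatform.Model.list(filter=f"display_name={MODEL_NAME}"):
        model.delete()

# Delete the Cloud Storage bucket and every object in it
delete_bucket = False
if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil -m rm -r $BUCKET_URI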