## Installation

Install the packages required for executing this notebook.

In [None]:
# Install the packages
! pip3 install --upgrade google-cloud-aiplatform \
                        google-cloud-storage \
                        'kfp<2' \
                        'google-cloud-pipeline-components<2'

In [2]:
# Automatically restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

Check the versions of the packages you installed. The KFP SDK version should be >=1.6 and, given the `kfp<2` pin above, below 2.

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.22
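
If you prefer to check the version constraint in code rather than by eye, the following is a minimal sketch. The helper names `parse_version` and `kfp_version_ok` are made up for illustration, and the parser assumes a plain numeric version string such as the one printed above (no pre-release suffixes).

```python
def parse_version(version: str) -> tuple:
    """Turn a dotted version string such as '1.8.22' into a tuple of ints."""
    return tuple(int(part) for part in version.split(".")[:3])


def kfp_version_ok(version: str) -> bool:
    """Return True if the version is at least 1.6 and below 2 (the `kfp<2` pin)."""
    return (1, 6) <= parse_version(version) < (2,)


print(kfp_version_ok("1.8.22"))  # True for the version reported above
```

In a notebook you could pass `kfp.__version__` to `kfp_version_ok` instead of a literal string.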


## Before you begin

#### Set your project ID

**If you don't know your project ID**, try the following:
* Run `gcloud config list`.
* Run `gcloud projects list`.
* See the support page: [Locate the project ID](https://support.google.com/googleapi/answer/7014113)

In [2]:
! gcloud config list

[core]
account = 665554893092-compute@developer.gserviceaccount.com
disable_usage_reporting = True
project = vertex-nirvana-poc

Your active configuration is: [default]


In [3]:
PROJECT_ID = "vertex-nirvana-poc"  # @param {type:"string"}

# Set the project id
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


#### Region

You can also change the `REGION` variable used by Vertex AI. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [4]:
REGION = "us-central1"  # @param {type: "string"}

### Authenticate your Google Cloud account

Depending on your Jupyter environment, you may have to manually authenticate. Follow the relevant instructions below.

**2. Local JupyterLab instance, uncomment and run:**


### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [5]:
BUCKET_URI = "gs://vertex-nirvana-poc"  # @param {type:"string"}

In [6]:
BUCKET = "gs://vertex-nirvana-poc/"

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [6]:
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://vertex-nirvana-poc/...
ServiceException: 409 A Cloud Storage bucket named 'vertex-nirvana-poc' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step -- you only need to run these once per service account.

In [7]:
SERVICE_ACCOUNT = "665554893092-compute@developer.gserviceaccount.com"

In [8]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

No changes made to gs://vertex-nirvana-poc/
No changes made to gs://vertex-nirvana-poc/


### Set up variables

Next, set up some variables used throughout the tutorial.

### Import libraries and define constants

In [9]:
from typing import NamedTuple

import google.cloud.aiplatform as aip
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component

#### Vertex AI constants

Set up the following constants for Vertex AI:

- `API_ENDPOINT`: The Vertex AI API service endpoint for `Dataset`, `Model`, `Job`, `Pipeline` and `Endpoint` services.

In [10]:
# API service endpoint
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)

#### Vertex AI Pipelines constants

Set up the following constants for Vertex AI Pipelines:

In [11]:
PIPELINE_ROOT = "{}/pipeline_root/intro".format(BUCKET_URI)
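
Because `BUCKET_URI` already carries the `gs://` scheme, paths built from it should not add the scheme or an extra slash again. A minimal sketch of a helper that joins path segments safely; `join_gcs_uri` is a hypothetical name, not part of any SDK used in this notebook.

```python
def join_gcs_uri(bucket_uri: str, *parts: str) -> str:
    """Join path segments onto a gs:// bucket URI without doubling slashes."""
    if not bucket_uri.startswith("gs://"):
        raise ValueError(f"expected a gs:// URI, got {bucket_uri!r}")
    # Drop trailing slashes on the bucket and surrounding slashes on each part
    segments = [bucket_uri.rstrip("/")]
    segments += [part.strip("/") for part in parts if part.strip("/")]
    return "/".join(segments)


pipeline_root = join_gcs_uri("gs://vertex-nirvana-poc/", "pipeline_root", "intro")
print(pipeline_root)  # gs://vertex-nirvana-poc/pipeline_root/intro
```

The same helper works whether or not the bucket variable ends with a slash, which is the difference between `BUCKET_URI` and `BUCKET` in this notebook.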

## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [12]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

In [13]:
from google.cloud import aiplatform as aip
from kfp.v2.dsl import (
    Artifact,
    Dataset,
    Input,
    Model,
    Output,
    ClassificationMetrics,
    component,
    pipeline,
)
from kfp.v2 import compiler

In [14]:
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp

In [15]:
from google_cloud_pipeline_components.v1.model import ModelUploadOp


In [16]:
PIPELINE_ROOT = "{}/pipeline_root/intro".format(BUCKET_URI)

In [17]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-aiplatform==1.20.0",
        "pandas",
        "fsspec",
        "gcsfs",
        "google-cloud-bigquery-storage",
        "pyarrow",
    ],
)
def data_download(
    project: str,
    location: str,
    feature_store: str,
    data_url: str,
    split_date: str,
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
):
    import pandas as pd
    import logging

    from google.cloud import aiplatform as aip

    logging.warning("Import file: %s", data_url)

    # Initialize Vertex AI client
    aip.init(project=project, location=location)

    # Initiate feature store and run batch serve request
    flight_delays_feature_store = aip.Featurestore(featurestore_name=feature_store)

    read_instances = pd.read_csv(data_url)
    read_instances["flight"] = read_instances["flight"].astype(str)
    read_instances["airport"] = read_instances["airport"].astype(str)
    read_instances["timestamp"] = pd.to_datetime(read_instances["timestamp"])

    # Read features into a dataframe
    data: pd.DataFrame = flight_delays_feature_store.batch_serve_to_df(
        serving_feature_ids={
            "flight": ["*"],
            "airport": ["*"],
        },
        read_instances_df=read_instances,
    )

    # Copy so the new "target" column is added to an independent frame, not a view of `data`
    completed_flights = data[~data["is_cancelled"]].copy()

    # Consider flights that arrive more than 15 min late as delayed
    completed_flights["target"] = completed_flights["arrival_delay_minutes"] > 15
    training_data = completed_flights.drop(
        columns=[
            "timestamp",
            "entity_type_flight",
            "is_cancelled",
            "arrival_delay_minutes",
            "origin_airport_id",
            "entity_type_airport",
        ]
    )

    test_data = training_data[completed_flights["timestamp"] >= split_date]
    training_data = training_data[completed_flights["timestamp"] < split_date]

    training_data.to_csv(dataset_train.path, index=False)
    test_data.to_csv(dataset_test.path, index=False)
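
The labelling and date-split rules inside `data_download` can be checked locally without a Feature Store. The following is a plain-Python sketch of the same rules, assuming rows are dictionaries; the function name `label_and_split`, the feature column `departure_delay_minutes`, and the sample rows are hypothetical.

```python
from datetime import date

DELAY_THRESHOLD_MINUTES = 15  # same threshold as in data_download


def label_and_split(rows, split_date):
    """Label completed flights and split them into train/test by date."""
    cutoff = date.fromisoformat(split_date)
    train, test = [], []
    for row in rows:
        if row["is_cancelled"]:
            continue  # cancelled flights are excluded, as in data_download
        example = {
            "departure_delay_minutes": row["departure_delay_minutes"],
            "target": row["arrival_delay_minutes"] > DELAY_THRESHOLD_MINUTES,
        }
        # Rows on or after the cutoff go to the test set, earlier rows to training
        (test if row["timestamp"] >= cutoff else train).append(example)
    return train, test


rows = [
    {"timestamp": date(2021, 12, 19), "is_cancelled": False,
     "departure_delay_minutes": 5, "arrival_delay_minutes": 20},
    {"timestamp": date(2021, 12, 20), "is_cancelled": False,
     "departure_delay_minutes": 0, "arrival_delay_minutes": 15},
    {"timestamp": date(2021, 12, 21), "is_cancelled": True,
     "departure_delay_minutes": 0, "arrival_delay_minutes": 0},
]
train, test = label_and_split(rows, "2021-12-20")
print(len(train), len(test))  # 1 1
```

Note that a flight arriving exactly 15 minutes late is labelled as not delayed, because the comparison is strict.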

In [18]:
@component(
    base_image="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
)
def model_train(
    dataset: Input[Dataset],
    model: Output[Artifact],
):
    import pandas as pd
    import pickle
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LogisticRegression

    data = pd.read_csv(dataset.path)
    X = data.drop(columns=["target"])
    y = data["target"]

    model_pipeline = Pipeline(
        [
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
            ("clf", LogisticRegression(random_state=42)),
        ]
    )

    model_pipeline.fit(X, y)

    model.metadata["framework"] = "scikit-learn"
    model.metadata["containerSpec"] = {
        "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
    }

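    import pathlib

    # Create the artifact directory (if missing) before writing the pickle into it
    pathlib.Path(model.path).mkdir(parents=True, exist_ok=True)
    file_name = model.path + "/model.pkl"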
    with open(file_name, "wb") as file:
        pickle.dump(model_pipeline, file)


In [19]:
@component(
    base_image="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
)
def model_evaluate(
    test_set: Input[Dataset],
    model: Input[Model],
    metrics: Output[ClassificationMetrics],
):
    import pandas as pd
    import pickle
    from sklearn.metrics import roc_curve, confusion_matrix

    data = pd.read_csv(test_set.path)[:1000]
    file_name = model.path + "/model.pkl"
    with open(file_name, "rb") as file:
        model_pipeline = pickle.load(file)

    X = data.drop(columns=["target"])
    y = data.target
    y_pred = model_pipeline.predict(X)

    y_scores = model_pipeline.predict_proba(X)[:, 1]
    fpr, tpr, thresholds = roc_curve(y_true=y, y_score=y_scores, pos_label=True)
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    metrics.log_confusion_matrix(
        ["False", "True"],
        confusion_matrix(y, y_pred).tolist(),
    )
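
`log_confusion_matrix` receives the category names `["False", "True"]` together with the matrix from scikit-learn's `confusion_matrix`, whose rows are the actual classes and whose columns are the predicted classes, in sorted label order. A minimal plain-Python sketch of that layout for a binary target; the helper name `confusion_matrix_2x2` and the sample labels are made up for illustration.

```python
def confusion_matrix_2x2(y_true, y_pred):
    """Count outcomes as [[TN, FP], [FN, TP]]: rows are actual, columns predicted."""
    matrix = [[0, 0], [0, 0]]
    for actual, predicted in zip(y_true, y_pred):
        # False maps to index 0 and True to index 1, matching ["False", "True"]
        matrix[int(actual)][int(predicted)] += 1
    return matrix


y_true = [False, False, True, True, True]
y_pred = [False, True, True, True, False]
print(confusion_matrix_2x2(y_true, y_pred))  # [[1, 1], [1, 2]]
```

Reading the result: one on-time flight was predicted on time, one on-time flight was predicted delayed, one delayed flight was missed, and two delayed flights were predicted correctly.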


In [20]:
# Define the workflow of the pipeline.
@dsl.pipeline(name="gcp-mlops-v0", pipeline_root=PIPELINE_ROOT)
def pipeline(
    training_data_url: str = f"{BUCKET_URI}/features/read_instances/-00000-of-00001.csv",
    test_split_date: str = "2021-12-20",
):
    data_op = data_download(
        project=PROJECT_ID,
        location=REGION,
        feature_store="flight_delays",
        data_url=training_data_url,
        split_date=test_split_date,
    )

    from google_cloud_pipeline_components.experimental.custom_job.utils import (
        create_custom_training_job_op_from_component,
    )

    custom_job_distributed_training_op = create_custom_training_job_op_from_component(
        model_train, replica_count=1
    )

    model_train_op = custom_job_distributed_training_op(
        dataset=data_op.outputs["dataset_train"],
        project=PROJECT_ID,
        location=REGION,
    )

    model_evaluate_op = model_evaluate(
        test_set=data_op.outputs["dataset_test"],
        model=model_train_op.outputs["model"],
    )

    model_upload_op = ModelUploadOp(
        project=PROJECT_ID,
        location=REGION,
        display_name="flight-delay-model",
        unmanaged_container_model=model_train_op.outputs["model"],
    ).after(model_evaluate_op)

    endpoint_create_op = EndpointCreateOp(
        project=PROJECT_ID,
        location=REGION,
        display_name="flight-delay-endpoint",
    )

    ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name="flight-delay-model",
        dedicated_resources_machine_type="n1-standard-4",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )


compiler.Compiler().compile(pipeline_func=pipeline, package_path="gcp-mlops-v0.json")

aip.init(project=PROJECT_ID, staging_bucket=BUCKET, location=REGION)

job = aip.PipelineJob(
    display_name="gcp-mlops-v0",
    template_path="gcp-mlops-v0.json",
    pipeline_root=PIPELINE_ROOT,
)

job.run(service_account=SERVICE_ACCOUNT)



Creating PipelineJob
PipelineJob created. Resource name: projects/665554893092/locations/us-central1/pipelineJobs/gcp-mlops-v0-20231011171506
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/665554893092/locations/us-central1/pipelineJobs/gcp-mlops-v0-20231011171506')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/gcp-mlops-v0-20231011171506?project=665554893092
PipelineJob projects/665554893092/locations/us-central1/pipelineJobs/gcp-mlops-v0-20231011171506 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/665554893092/locations/us-central1/pipelineJobs/gcp-mlops-v0-20231011171506 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/665554893092/locations/us-central1/pipelineJobs/gcp-mlops-v0-20231011171506 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/665554893092/locations/us-central1/pipelineJobs/gcp-mlops-v0-2023101

RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [data-download].; Job (project_id = vertex-nirvana-poc, job_id = 5409011718716129280) is failed due to the above error.; Failed to handle the job: {project_number = 665554893092, job_id = 5409011718716129280}"


### Define Python function-based pipeline components

In this tutorial, you define a simple pipeline that has three steps, where each step is defined as a component.

#### Define hello_world component

First, define a component based on a very simple Python function. It takes a string input parameter and returns that value as output.

Note the use of the `@component` decorator, which compiles the function to a KFP component when evaluated. For illustration, this example specifies a base image for the component (`python:3.9`) and a component YAML file, `hw.yaml`, to which the compiled component specification is written. (The default base image, `python:3.7`, would work just as well.)

In [18]:
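from kfp.v2.dsl import component


# With the `kfp<2` pin used in this notebook, function-based components
# are created with the decorator from `kfp.v2.dsl`.
@component(base_image="python:3.9")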
def conditional_branch_op(output: str) -> str:
    """Conditionally branch based on the output value."""
    # Define your condition logic here
    if output == "Hello, world!":
        return "condition_met"
    else:
        return "condition_not_met"


In [22]:
from typing import NamedTuple
import kfp
from kfp import dsl, components

# Define your component functions here

@components.func_to_container_op
def hello_world(text: str) -> str:
    """Return a greeting that wraps the given text."""
    return f"Hello, {text}!"


@dsl.pipeline(name='Conditional Hello World Pipeline')
def conditional_hello_world_pipeline(input_condition: str) -> NamedTuple(
    'Outputs',
    [
        ('step1_output', str),
        ('step2_output', str),
        ('step3_output', str),
        ('step4_output', str),
    ]
):
    # Step 1: Call the hello_world function with "world" input
    step1 = hello_world("world")

    # Step 2: Use the input_condition parameter to control the branching
    with dsl.Condition(input_condition == "condition_met"):
        # If the input_condition is "condition_met", execute this branch
        step2 = hello_world("Alice")

    with dsl.Condition(input_condition == "condition_met"):
        # If the input_condition is "condition_met", execute this branch
        step3 = hello_world("Bob")

    # Step 4: Always execute this step
    step4 = hello_world("OpenAI")

    return step1.output, step2.output, step3.output, step4.output


@dsl.pipeline(
    name="intro-pipeline-unique",
    description="A simple intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(text: str = "hello world"):
    hw_task = hello_world(text)
    # two_outputs_task = two_outputs(text)
    conditional_hello_world_pipeline_task = conditional_hello_world_pipeline(hw_task.output)
    # consumer_task = consumer(
    #     hw_task.output,
    #     two_outputs_task.outputs["output_one"],
    #     two_outputs_task.outputs["output_two"],
    # )

if __name__ == '__main__':
    compiler.Compiler().compile(pipeline_func=pipeline, package_path="Conditional_pipeline_job_with_caching.json")
    DISPLAY_NAME = "Conditional_pipeline_job_with_caching"
    job = aip.PipelineJob(
        display_name=DISPLAY_NAME,
        template_path="Conditional_pipeline_job_with_caching.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=True,
    )
    job.run()


Creating PipelineJob
PipelineJob created. Resource name: projects/665554893092/locations/us-central1/pipelineJobs/intro-pipeline-unique-20231004155310
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/665554893092/locations/us-central1/pipelineJobs/intro-pipeline-unique-20231004155310')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/intro-pipeline-unique-20231004155310?project=665554893092
PipelineJob run completed. Resource name: projects/665554893092/locations/us-central1/pipelineJobs/intro-pipeline-unique-20231004155310
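
To reason about which steps the conditional pipeline above executes for a given input, the comparisons can be traced in plain Python. This is only a local analogue of the two `dsl.Condition` blocks, not KFP code; the function name `trace_conditional_steps` is made up for illustration.

```python
def hello_world(text: str) -> str:
    """Same body as the hello_world component above."""
    return f"Hello, {text}!"


def trace_conditional_steps(input_condition: str) -> list:
    """List the steps that would run for a given input_condition value."""
    executed = ["step1"]  # step1 is unconditional
    if input_condition == "condition_met":
        executed.append("step2")
    if input_condition == "condition_met":  # same test as in the pipeline code
        executed.append("step3")
    executed.append("step4")  # step4 is unconditional
    return executed


# The outer pipeline passes the output of hello_world(text) as input_condition
print(trace_conditional_steps(hello_world("hello world")))  # ['step1', 'step4']
```

With the default `text` value, the string being compared is `"Hello, hello world!"`, so neither conditional branch matches in this trace.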


Compiling this component creates a [task factory function](https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/), called `hello_world`, that you can use to define a pipeline step.

While not shown here, if you want to share this component definition or use it in another context, you can also load it from its YAML file:
`hello_world_op = components.load_component_from_file('./hw.yaml')`.
You can also use the `load_component_from_url` method if your component YAML file is stored online. (For GitHub URLs, load the 'raw' file.)

#### Define two_outputs component

The `two_outputs` component demonstrates installing a package, in this case the `google-cloud-storage` package. Alternatively, you can specify a base image that includes the necessary installations.

*Note:* The component function won't actually use the package.

The `two_outputs` component returns two named outputs.

#### Define the consumer component

The third component, `consumer`, takes three string inputs and prints them out.

### Define a pipeline that uses the components

Next, define a pipeline that uses these three components.

By evaluating the component definitions above, you've created task factory functions that are used in the pipeline definition to create the pipeline steps.

The pipeline takes an input parameter, and passes that parameter as an argument to the first two pipeline steps (`hw_task` and `two_outputs_task`).

Then, the third pipeline step (`consumer_task`) consumes the outputs of the first and second steps.  Because the `hello_world` component definition just returns one unnamed output, you refer to it as `hw_task.output`.  The `two_outputs` task returns two named outputs, which you access as `two_outputs_task.outputs["<output_name>"]`.
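
Neither `two_outputs` nor `consumer` is defined in the cells above, so here is a minimal sketch of the plain Python functions they describe, before any `@component` decorator is applied. The output names `output_one` and `output_two` come from the commented-out pipeline code; the message strings, the `consumer` parameter names (`text1`, `text2`, `text3`), and its return value are assumptions made for illustration.

```python
from typing import NamedTuple


def two_outputs(
    text: str,
) -> NamedTuple("Outputs", [("output_one", str), ("output_two", str)]):
    """Return two named string outputs derived from the input text."""
    outputs = NamedTuple("Outputs", [("output_one", str), ("output_two", str)])
    return outputs(
        f"output one from text: {text}",
        f"output two from text: {text}",
    )


def consumer(text1: str, text2: str, text3: str) -> str:
    """Print the three inputs and return them joined, for easy checking."""
    print(f"text1: {text1}; text2: {text2}; text3: {text3}")
    return " | ".join([text1, text2, text3])


outs = two_outputs("hello world")
consumer("hello world", outs.output_one, outs.output_two)
```

Locally the named outputs are read as attributes (`outs.output_one`); inside a pipeline definition the equivalent access is `two_outputs_task.outputs["output_one"]`.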

*Note:* The `@dsl.pipeline` decorator defines the `PIPELINE_ROOT` Cloud Storage path to use. If you omit it there, you must specify it when creating the pipeline run.

## Compile the pipeline

Next, compile the pipeline.

## Run the pipeline

Next, run the pipeline.

Click on the generated link to see your run in the Cloud Console.

<!-- It should look something like this as it is running:

<a href="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" width="40%"/></a> -->

In the UI, many of the pipeline DAG nodes will expand or collapse when you click on them. Here is a partially-expanded view of the DAG (click image to see larger version).

<a href="https://storage.googleapis.com/amy-jo/images/mp/intro_pipeline.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/intro_pipeline.png" width="60%"/></a>

### Delete the pipeline job

You can delete the pipeline job with the method `delete()`.

In [27]:
job.delete()

## Specifying a service account to use for a pipeline run

By default, the [service account](https://cloud.google.com/iam/docs/service-accounts) used for your pipeline run is your [default Compute Engine service account](https://cloud.google.com/compute/docs/access/service-accounts#default_service_account).
However, you might want to run pipelines with a different set of permissions than those configured for your default service account (for example, a more restricted set).

To execute your pipeline using a different service account, give the new service account the correct permissions.

### Create a service account

Once your service account is configured, you pass it as the `service_account` argument to the `run` method of `PipelineJob`, as in the `job.run(service_account=SERVICE_ACCOUNT)` call earlier in this notebook. The pipeline job runs with the permissions of the given service account.

Learn about [creating and configuring a service account to work with Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/configure-project#service-account).

In [5]:
from typing import List
from pandas import DataFrame
from google.cloud import aiplatform


def ingest_features_df(
    project: str,
    location: str,
    featurestore_id: str,
    entity_type_id: str,
    features_ids: List[str],
    feature_time: str,
    features_df: DataFrame,
    entity_id_field: str
) -> aiplatform.featurestore.EntityType:
    """
    Ingests features into a Featurestore from a Pandas DataFrame.
    Args:
        project: The Google Cloud project ID.
        location: The Google Cloud location.
        featurestore_id: The Featurestore ID.
        entity_type_id: The Entity Type ID.
        features_ids: The list of Feature IDs.
        feature_time: The Feature timestamp.
        features_df: The Pandas DataFrame containing the features.
        entity_id_field: The Entity ID field.
    Returns:
        The EntityType that the features were ingested into.
    """
    # Initialize the Vertex SDK for Python
    aiplatform.init(project=project, location=location)

    # Get the entity type from an existing Featurestore
    entity_type = aiplatform.featurestore.EntityType(entity_type_id=entity_type_id,
                                                     featurestore_id=featurestore_id)
    # Ingest the features
    entity_type.ingest_from_df(
        feature_ids=features_ids,
        feature_time=feature_time,
        df_source=features_df,
        entity_id_field=entity_id_field
    )

    return entity_type

In [4]:
from typing import Dict, List
from pandas import DataFrame
from google.cloud import aiplatform


def batch_serve_features_df(
        project: str,
        location: str,
        featurestore_id: str,
        serving_feature_ids: Dict[str, List[str]],
        read_instances_df: DataFrame,
        pass_through_fields: List[str]) -> DataFrame:
    """
    Retrieves batch feature values from a Featurestore and returns them as a Pandas DataFrame.
    Args:
        project: The Google Cloud project ID.
        location: The Google Cloud location.
        featurestore_id: The Featurestore ID.
        serving_feature_ids: The dictionary of Entity Type IDs and Feature IDs to retrieve.
        read_instances_df: The Pandas DataFrame of read instances (entity IDs and timestamps) to serve features for.
        pass_through_fields: The fields from `read_instances_df` to pass through to the output unchanged, such as the label column.
    Returns:
        The Pandas DataFrame containing the dataset.
    """

    # Initialize the Vertex SDK for Python
    aiplatform.init(project=project, location=location)

    # Get an existing Featurestore
    featurestore = aiplatform.featurestore.Featurestore(featurestore_name=featurestore_id)

    # Get data with a point-in-time query from the Featurestore
    df = featurestore.batch_serve_to_df(
        serving_feature_ids=serving_feature_ids,
        read_instances_df=read_instances_df,
        pass_through_fields=pass_through_fields
    )

    return df

In [3]:
from kfp.v2.dsl import component
from typing import NamedTuple


@component(output_component_file="batch_serve_features_gcs.yaml",
           base_image="python:3.9",
           packages_to_install=["google-cloud-aiplatform"])
def batch_serve_features_gcs(feature_store_id: str,
                             gcs_destination_output_uri_prefix: str,
                             gcs_destination_type: str,
                             serving_feature_ids: str,
                             read_instances_uri: str,
                             project: str,
                             location: str) -> NamedTuple("Outputs", [("gcs_destination_output_uri_paths", str)]):
    # Import libraries
    import os
    from json import loads
    from google.cloud import aiplatform
    from google.cloud.aiplatform.featurestore import Featurestore

    # Initialize Vertex AI client
    aiplatform.init(project=project, location=location)

    # Initiate feature store and run batch serve request
    featurestore = Featurestore(featurestore_name=feature_store_id)

    # Serve features in batch on GCS
    serving_feature_ids = loads(serving_feature_ids)
    featurestore.batch_serve_to_gcs(
        gcs_destination_output_uri_prefix=gcs_destination_output_uri_prefix,
        gcs_destination_type=gcs_destination_type,
        serving_feature_ids=serving_feature_ids,
        read_instances_uri=read_instances_uri
    )

    # Store metadata
    # Cloud Storage URIs start with "gs://"; map the prefix to the /gcs/ mount path
    gcs_destination_output_path_prefix = gcs_destination_output_uri_prefix.replace("gs://", "/gcs/")
    gcs_destination_output_paths = os.path.join(gcs_destination_output_path_prefix, "*.csv")
    component_outputs = NamedTuple("Outputs",
                                   [("gcs_destination_output_uri_paths", str), ], )

    return component_outputs(gcs_destination_output_paths)
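
Because `serving_feature_ids` is declared as a `str` parameter and decoded with `loads` inside the component, the caller has to pass the mapping as a JSON string. A minimal sketch of that round trip, reusing the entity type IDs `flight` and `airport` from the `data_download` component earlier in this notebook:

```python
import json

# Mapping of entity type ID -> feature IDs ("*" selects all features)
serving_feature_ids = {"flight": ["*"], "airport": ["*"]}

# Caller side: serialize the dict so it can be passed as a string parameter
encoded = json.dumps(serving_feature_ids)
print(encoded)  # {"flight": ["*"], "airport": ["*"]}

# Component side: recover the dict, as `loads(serving_feature_ids)` does above
decoded = json.loads(encoded)
```

The decoded value is equal to the original dictionary, so the component receives the same mapping the caller built.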



In [2]:
from typing import List
from pandas import DataFrame
from google.cloud import aiplatform


def online_serve_feature_values(
        project: str,
        location: str,
        featurestore_id: str,
        entity_type_id: str,
        entity_ids: List[str],
        feature_ids: List[str]) -> DataFrame:
    """
    Retrieves online feature values from a Featurestore.
    Args:
        project: The Google Cloud project ID.
        location: The Google Cloud location.
        featurestore_id: The Featurestore ID.
        entity_type_id: The Entity Type ID.
        entity_ids: The list of Entity IDs.
        feature_ids: The list of Feature IDs.
    Returns:
        A Pandas DataFrame containing the feature values.
    """

    # Initialize the Vertex SDK for Python
    aiplatform.init(project=project, location=location)

    # Get the entity type from an existing Featurestore
    entity_type = aiplatform.featurestore.EntityType(entity_type_id=entity_type_id,
                                                     featurestore_id=featurestore_id)
    # Retrieve the feature values
    feature_values = entity_type.read(entity_ids=entity_ids, feature_ids=feature_ids)

    return feature_values

In [1]:
from google.cloud import aiplatform


def batch_serve_features_to_bq_sample(
    project: str,
    location: str,
    featurestore_name: str,
    bq_destination_output_uri: str,
    read_instances_uri: str,
    sync: bool = True,
):

    aiplatform.init(project=project, location=location)

    fs = aiplatform.featurestore.Featurestore(featurestore_name=featurestore_name)

    SERVING_FEATURE_IDS = {
        "users": ["age", "gender", "liked_genres","favorites"],
        "movies": ["title", "average_rating", "genres"],
    }

    fs.batch_serve_to_bq(
        bq_destination_output_uri=bq_destination_output_uri,
        serving_feature_ids=SERVING_FEATURE_IDS,
        read_instances_uri=read_instances_uri,
        sync=sync,
    )


In [9]:
PROJECT_ID

NameError: name 'PROJECT_ID' is not defined

In [12]:
! gcloud services enable compute.googleapis.com \
                       containerregistry.googleapis.com \
                       aiplatform.googleapis.com \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com 

Operation "operations/acat.p2-665554893092-2def6259-1b87-46c7-8d5e-ece13075da57" finished successfully.


In [18]:
! gsutil mb -c STANDARD -l us-central1 gs://vertex-nirvana-bucket/

Creating gs://vertex-nirvana-bucket/...
ServiceException: 409 A Cloud Storage bucket named 'vertex-nirvana-bucket' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [32]:
BUCKET = "gs://vertex-nirvana-bucket/"

In [20]:
from kfp.v2.dsl import (
    Dataset,
    Output,
    component,
)

@component(
    base_image="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-12:latest",
)
def data_download(
    data_url: str,
    split_date: str,
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
):
    import pandas as pd

    import logging

    logging.warning("Import file: %s", data_url)

    data = pd.read_csv(data_url)

    cancelled = (data["Cancelled"] > 0) | (data["Diverted"] > 0)
    completed_flights = data[~cancelled]

    # Copy so the "target" column is added to an independent frame, not a view
    training_data = completed_flights[["DepDelay", "TaxiOut", "Distance"]].copy()
    # Consider flights that arrive more than 15 min late as delayed
    training_data["target"] = completed_flights["ArrDelay"] > 15

    test_data = training_data[completed_flights["FlightDate"] >= split_date]
    training_data = training_data[completed_flights["FlightDate"] < split_date]

    training_data.to_csv(dataset_train.path, index=False)
    test_data.to_csv(dataset_test.path, index=False)

In [23]:
from kfp.v2.dsl import (
    Artifact,
    Dataset,
    Input,
    Output,
    component,
)

@component(
    base_image="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-12:latest",
)
def model_train(
    dataset: Input[Dataset],
    model: Output[Artifact],
):
    import pandas as pd
    import pickle
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LogisticRegression

    data = pd.read_csv(dataset.path)
    X = data.drop(columns=["target"])
    y = data["target"]

    model_pipeline = Pipeline(
        [
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
            ("clf", LogisticRegression(random_state=42)),
        ]
    )

    model_pipeline.fit(X, y)

    model.metadata["framework"] = "scikit-learn"
    model.metadata["containerSpec"] = {
        "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-12:latest"
    }

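    import pathlib

    # Create the artifact directory (if missing) before writing the pickle into it
    pathlib.Path(model.path).mkdir(parents=True, exist_ok=True)
    file_name = model.path + "/model.pkl"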
    with open(file_name, "wb") as file:
        pickle.dump(model_pipeline, file)

In [24]:
model.metadata['containerSpec'] = {
    'imageUri': "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-12:latest"
}

NameError: name 'model' is not defined

In [33]:
from kfp.v2.dsl import pipeline
from kfp.v2 import compiler

# The pipeline below calls these ops, so they must be imported
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
from google_cloud_pipeline_components.v1.model import ModelUploadOp

@pipeline(name="gcp-mlops-v0", pipeline_root=PIPELINE_ROOT)
def pipeline(
    training_data_url: str = f"{BUCKET.rstrip('/')}/data/2021/2012-01.csv",
    test_split_date: str = "2021-12-20",
):
    data_op = data_download(
        data_url=training_data_url,
        split_date=test_split_date
    )

    from google_cloud_pipeline_components.experimental.custom_job.utils import (
        create_custom_training_job_op_from_component,
    )

    custom_job_distributed_training_op = create_custom_training_job_op_from_component(
        model_train, replica_count=1
    )

    model_train_op = custom_job_distributed_training_op(
        dataset=data_op.outputs["dataset_train"],
        project=PROJECT_ID,
        location=REGION,
    )

    model_evaluate_op = model_evaluate(
        test_set=data_op.outputs["dataset_test"],
        model=model_train_op.outputs["model"],
    )

    model_upload_op = ModelUploadOp(
        project=PROJECT_ID,
        location=REGION,
        display_name="flight-delay-model",
        unmanaged_container_model=model_train_op.outputs["model"],
    ).after(model_evaluate_op)

    endpoint_create_op = EndpointCreateOp(
        project=PROJECT_ID,
        location=REGION,
        display_name="flight-delay-endpoint",
    )

    ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name="flight-delay-model",
        dedicated_resources_machine_type="n1-standard-4",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="gcp-mlops-v0.json"
)

ModuleNotFoundError: No module named 'google_cloud_pipeline_components'