In [None]:
pip install --upgrade google-cloud-pipeline-components

## Google Cloud Pipeline Components
The Google Cloud Pipeline Components (GCPC) SDK provides a set of prebuilt Kubeflow Pipelines components that are production quality, performant, and easy to use.

For example, you can use these components to complete the following:
- Create a new dataset and load different data types into the dataset (image, tabular, text, or video).
- Export data from a dataset to Cloud Storage.
- Use AutoML to train a model using image, tabular, text, or video data.
- Run a custom training job using a custom container or a Python package.
- Upload an existing model to Vertex AI for batch prediction.
- Create a new endpoint and deploy a model to it for online predictions.  

Additionally, these prebuilt Google Cloud Pipeline Components are supported in Vertex AI Pipelines and offer the following benefits:
- **Easier debugging**: Show the underlying resources launched from the component for simplified debugging.
- **Standardized artifact types**: Provide consistent interfaces to use standard artifact types for input and output. These standard artifacts are tracked in Vertex ML Metadata, making it easier to analyze the lineage of your pipeline's artifacts.
- **Understand pipeline costs with billing labels**: Resource labels are automatically propagated to Google Cloud services generated by the Google Cloud Pipeline Components in your pipeline run. You can use billing labels along with Cloud Billing export to BigQuery to review the cost of your pipeline run.
- **Cost efficiencies***: Vertex AI Pipelines optimizes the execution of these components by launching the Google Cloud resources directly, without having to launch the container. This reduces startup latency and avoids the cost of a busy-waiting container.

> \* This feature applies to the following components only:
> - `CustomTrainingJobOp`
> - `WaitGcpResourcesOp` (for Dataflow)
> - `DataflowPythonJobOp`

**[Google Cloud Pipeline Components list](https://cloud.google.com/vertex-ai/docs/pipelines/gcpc-list)**

## Use Google Cloud Pipeline Components 
When you use Google Cloud Pipeline Components (GCPC), you can use the following Vertex AI and Google Cloud features to secure your components and artifacts.

### Specify a service account for a component
When you use a component, you can optionally specify a service account. Your component launches and acts with the permissions of this service account.

In [None]:
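from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp

# `training_job_run_op` and `endpoint_op` are upstream tasks defined earlier in the pipeline.
model_deploy_op = ModelDeployOp(
    model=training_job_run_op.outputs["model"],
    endpoint=endpoint_op.outputs["endpoint"],
    automatic_resources_min_replica_count=1,
    automatic_resources_max_replica_count=1,
    service_account="SERVICE_ACCOUNT_ID@PROJECT_ID.iam.gserviceaccount.com",
)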

### Use VPC Service Controls to prevent data exfiltration
VPC Service Controls can help you mitigate the risk of data exfiltration from Vertex AI Pipelines. When you use VPC Service Controls to create a service perimeter, resources and data that are created by Vertex AI Pipelines and the Google Cloud Pipeline Components are automatically protected. For example, when you use VPC Service Controls to protect your pipeline, the following artifacts can't leave your service perimeter:
- Training data for an AutoML model
- Models that you created
- Results from a batch prediction request

### Set up VPC Network Peering
You can configure Google Cloud Pipeline Components to peer with a Virtual Private Cloud by providing extra parameters.

In [None]:
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp

endpoint_create_op = EndpointCreateOp(
    project="PROJECT_ID",
    location="REGION",
    display_name="endpoint-display-name",
    network="NETWORK",  # The VPC network to peer with
)
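
The `network` value is typically the full Compute Engine network resource name, in the form `projects/PROJECT_NUMBER/global/networks/NETWORK_NAME`, where the project is identified by its number rather than its ID. The following is a minimal sketch of a helper that assembles that string. The function name and the sample values are hypothetical and aren't part of the SDK.

```python
def vpc_network_name(project_number: str, network: str) -> str:
    """Builds a full VPC network resource name (hypothetical helper)."""
    # The network name is expected to use the numeric project number,
    # so reject anything that looks like a project ID.
    if not project_number.isdigit():
        raise ValueError(
            f"Expected a numeric project number, got {project_number!r}"
        )
    return f"projects/{project_number}/global/networks/{network}"
```

You could then pass the result as the `network` argument, for example `network=vpc_network_name("123456789", "my-vpc")`.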

### Use customer-managed encryption keys (CMEK)
By default, Google Cloud automatically encrypts data at rest using encryption keys managed by Google. If you have specific compliance or regulatory requirements related to the keys that protect your data, you can use customer-managed encryption keys (CMEK) for your resources.

#### Configuring your component with CMEK
After you create a key ring and key in Cloud Key Management Service, and grant Vertex AI encrypter and decrypter permissions for your key, you can create a new CMEK-supported component by specifying your key as one of the create parameters.

In [None]:
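from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp

# `model_upload_op` is an upstream task defined earlier in the pipeline.
model_batch_predict_op = ModelBatchPredictOp(
    project="PROJECT_ID",
    job_display_name="JOB_DISPLAY_NAME",
    model=model_upload_op.outputs["model"],
    encryption_spec_key_name="projects/PROJECT_ID/locations/LOCATION_ID/keyRings/KEY_RING_NAME/cryptoKeys/KEY_NAME")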

> **Note**: Google Cloud components that aren't Vertex AI components might require additional permissions. For example, a BigQuery component might require encryption and decryption permission. In addition, the location of the CMEK key must be the same as the location of the component.
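
Because the key's location must match the component's location, it can help to check the key name before you submit the pipeline. The following is a minimal sketch that builds the key resource name in the format shown above and verifies its location. Both helper functions are hypothetical and aren't part of the SDK.

```python
import re

# Full Cloud KMS key resource name, in the format used in the example above.
_KEY_NAME_PATTERN = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/keyRings/(?P<key_ring>[^/]+)/cryptoKeys/(?P<key>[^/]+)$"
)


def cmek_key_name(project: str, location: str, key_ring: str, key: str) -> str:
    """Builds the full key resource name (hypothetical helper)."""
    return (
        f"projects/{project}/locations/{location}"
        f"/keyRings/{key_ring}/cryptoKeys/{key}"
    )


def check_key_location(key_name: str, component_location: str) -> None:
    """Raises ValueError if the key is malformed or in another location."""
    match = _KEY_NAME_PATTERN.match(key_name)
    if match is None:
        raise ValueError(f"Not a full Cloud KMS key name: {key_name!r}")
    if match.group("location") != component_location:
        raise ValueError(
            f"Key location {match.group('location')!r} doesn't match "
            f"component location {component_location!r}"
        )
```

Call `check_key_location(key_name, "REGION")` with the same region that you pass to the component as `location`.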

### Consume or produce artifacts in your component
The Google Cloud Pipeline Components SDK defines a set of ML metadata artifact types that serve as component input and output. Some Google Cloud Pipeline Components consume these artifacts as input or produce them as output.

#### Consume an artifact in component YAML
The artifact's metadata can serve as input to a component. To prepare an artifact to be consumed as input, you must extract it and put it in a component YAML file.

For example, the `ModelUploadOp` component generates a `google.VertexModel` artifact, which a `ModelDeployOp` component can consume. Use the following code in a component YAML file to retrieve the Vertex AI `Model` resource name from the inputs:  
`"model": "',"{{$.inputs.artifacts['model'].metadata['resourceName']}}", '"'`

#### Consume an artifact in a lightweight Python component

In [None]:
from kfp import dsl
from kfp.dsl import Artifact, Input

@dsl.component
def classification_model_eval_metrics(
    project: str,
    location: str,  # For example, "us-central1"
    model: Input[Artifact],
):
    # Consumes the `resourceName` metadata
    model_resource_path = model.metadata["resourceName"]
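
Once a component has read `resourceName`, it often needs the individual parts of that name. The following is a minimal sketch that splits a model resource name, assuming that it has the form `projects/PROJECT/locations/LOCATION/models/MODEL_ID`. The helper and the `ModelResource` type are hypothetical, and the sketch doesn't handle names with extra path segments.

```python
from typing import NamedTuple


class ModelResource(NamedTuple):
    project: str
    location: str
    model_id: str


def parse_model_resource_name(resource_name: str) -> ModelResource:
    """Splits a model resource name into its parts (hypothetical helper)."""
    parts = resource_name.split("/")
    # Expect exactly: projects/<p>/locations/<l>/models/<m>
    if (
        len(parts) != 6
        or parts[0] != "projects"
        or parts[2] != "locations"
        or parts[4] != "models"
    ):
        raise ValueError(f"Unexpected model resource name: {resource_name!r}")
    return ModelResource(project=parts[1], location=parts[3], model_id=parts[5])
```

In a lightweight component, define or import a helper like this inside the function body so that it's available when the component runs.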

### Create an ML artifact

#### Use an importer node
The following example creates an Importer node that registers a new artifact entry to Vertex ML Metadata. The importer node takes the artifact's URI and metadata as primitives and packages them into an artifact.

In [None]:
from google_cloud_pipeline_components import v1
from google_cloud_pipeline_components.types import artifact_types
from kfp.components import importer_node
from kfp import dsl

@dsl.pipeline(name=_PIPELINE_NAME)
def pipeline():
  # Using importer and UnmanagedContainerModel artifact for model upload
  # component.
  importer_spec = importer_node.importer(
      artifact_uri='gs://managed-pipeline-gcpc-e2e-test/automl-tabular/model',
      artifact_class=artifact_types.UnmanagedContainerModel,
      metadata={
          'containerSpec': {
              'imageUri':
                  'us-docker.pkg.dev/vertex-ai/automl-tabular/prediction-server:prod'
          }
      })

  # Consuming the UnmanagedContainerModel artifact from the previous step
  model_upload_with_artifact_op = v1.model.ModelUploadOp(
      project=_GCP_PROJECT_ID,
      location=_GCP_REGION,
      display_name=_MODEL_DISPLAY_NAME,
      unmanaged_container_model=importer_spec.outputs['artifact'])

#### Use Python function-based components
The following example shows how to output a Vertex ML Metadata artifact directly from a Python component.

In [None]:
from google_cloud_pipeline_components import v1
from kfp import dsl

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-aiplatform'],
)
# Note currently KFP SDK doesn't support outputting artifacts in `google` namespace.
# Use the base type dsl.Artifact instead.
def return_unmanaged_model(model: dsl.Output[dsl.Artifact]):
  model.metadata['containerSpec'] = {
      'imageUri':
          'us-docker.pkg.dev/vertex-ai/automl-tabular/prediction-server:prod'
  }
  model.uri = 'gs://automl-tabular-pipeline/automl-tabular/model'

@dsl.pipeline(name=_PIPELINE_NAME)
def pipeline():

  unmanaged_model_op = return_unmanaged_model()

  # Consuming the UnmanagedContainerModel artifact from the previous step
  model_upload_with_artifact_op = v1.model.ModelUploadOp(
      project=_GCP_PROJECT_ID,
      location=_GCP_REGION,
      display_name=_MODEL_DISPLAY_NAME,
      unmanaged_container_model=unmanaged_model_op.outputs['model'])

#### Use your own container-based component
The following example shows how to generate a `VertexBatchPredictionJob` artifact as output from a container-based component using the [artifact_types.py](https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/types/artifact_types.py) utility class.

In [None]:
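from google_cloud_pipeline_components.types.artifact_types import VertexBatchPredictionJob

# `get_job_response`, `vertex_uri_prefix`, `executor_input_json`, and
# `executor_output` are defined elsewhere in the container's code.
bp_job_artifact = VertexBatchPredictionJob(
    'batchpredictionjob', vertex_uri_prefix + get_job_response.name,
    get_job_response.name, get_job_response.output_info.bigquery_output_table,
    get_job_response.output_info.bigquery_output_dataset,
    get_job_response.output_info.gcs_output_directory)

# Look up the declared output artifact and write the populated artifact back.
output_artifacts = executor_input_json.get('outputs', {}).get('artifacts', {})
executor_output['artifacts'] = bp_job_artifact.to_executor_output_artifact(output_artifacts[bp_job_artifact.name])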
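
To make the data flow in the preceding fragment easier to follow, here is a standard-library sketch of the general idea: read the declared output artifacts from the executor input, fill in the URI and metadata for one of them, and return the result as the executor output. It assumes that the executor input lists output artifacts under `outputs.artifacts.<name>.artifacts`. The function name and the sample values are hypothetical, and this isn't the SDK's own implementation.

```python
import json


def update_output_artifact(
    executor_input: str, artifact_name: str, uri: str, metadata: dict
) -> dict:
    """Returns an executor-output dict with one output artifact filled in.

    Sketch only: the JSON layout is an assumption, not a documented contract.
    """
    executor_input_json = json.loads(executor_input)
    output_artifacts = executor_input_json.get("outputs", {}).get("artifacts", {})
    executor_output = {"artifacts": {}}
    for name, artifact_list in output_artifacts.items():
        runtime_artifacts = artifact_list.get("artifacts", [])
        if name == artifact_name and runtime_artifacts:
            # Copy the declared artifact, then attach the URI and metadata.
            updated = dict(runtime_artifacts[0])
            updated["uri"] = uri
            updated["metadata"] = metadata
            artifact_list = {"artifacts": [updated]}
        executor_output["artifacts"][name] = artifact_list
    return executor_output
```

A container would typically serialize the returned dictionary with `json.dumps` and write it to the executor output file that the pipeline backend provides.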

## Build your own pipeline components

### Write a component to show a Google Cloud console link
When you run a component, you often want to see not only the link to the component job being launched, but also the link to the underlying cloud resources, such as Vertex AI batch prediction jobs or Dataflow jobs.

The [`gcp_resources` proto](https://github.com/kubeflow/pipelines/tree/master/components/google-cloud/google_cloud_pipeline_components/proto) is a special parameter that you can use in your component so that the Google Cloud console can show a customized view of the resource's logs and status in the Vertex AI Pipelines console.

#### Output the gcp_resources parameter

##### Using a container-based component
First, define the `gcp_resources` output parameter in your component.

In [None]:
from typing import List

from google_cloud_pipeline_components import _image
from google_cloud_pipeline_components import _placeholders
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath


@container_component
def dataflow_python(
    python_module_path: str,
    temp_location: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    requirements_file_path: str = '',
    args: List[str] = [],
    project: str = _placeholders.PROJECT_ID_PLACEHOLDER,
):
  # fmt: off
  """Launch a self-executing Beam Python file on Google Cloud using the
  Dataflow Runner.

  Args:
      location: Location of the Dataflow job. If not set, defaults to `'us-central1'`.
      python_module_path: The GCS path to the Python file to run.
      temp_location: A GCS path for Dataflow to stage temporary job files created during the execution of the pipeline.
      requirements_file_path: The GCS path to the pip requirements file.
      args: The list of args to pass to the Python file. Can include additional parameters for the Dataflow Runner.
      project: Project to create the Dataflow job. Defaults to the project in which the PipelineJob is run.

  Returns:
      gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.v1.dataflow.dataflow_launcher',
      ],
      args=[
          '--project',
          project,
          '--location',
          location,
          '--python_module_path',
          python_module_path,
          '--temp_location',
          temp_location,
          '--requirements_file_path',
          requirements_file_path,
          '--args',
          args,
          '--gcp_resources',
          gcp_resources,
      ],
  )

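Next, inside the container, install the Google Cloud Pipeline Components package:  
`pip install --upgrade google-cloud-pipeline-components`   

Then, in your container code, populate a `GcpResources` proto and write it as JSON to the `gcp_resources` output path: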

In [None]:
from google.protobuf.json_format import MessageToJson
from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources

# Record the launched Dataflow job as a tracked resource.
dataflow_resources = GcpResources()
dr = dataflow_resources.resources.add()
dr.resource_type = 'DataflowJob'
dr.resource_uri = 'https://dataflow.googleapis.com/v1b3/projects/[your-project]/locations/us-east1/jobs/[dataflow-job-id]'

# `gcp_resources` is the output file path that the pipeline passes to the container.
with open(gcp_resources, 'w') as f:
    f.write(MessageToJson(dataflow_resources))
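
For reference, the serialized output is a small JSON document. The following standard-library sketch produces the same shape without the proto classes, assuming the default `MessageToJson` behavior of emitting lowerCamelCase field names (`resourceType`, `resourceUri`). The function names and sample values are hypothetical. Prefer the proto classes in real components so that the output stays in sync with the schema.

```python
import json


def gcp_resources_json(resource_type: str, resource_uri: str) -> str:
    """Serializes a single tracked resource (hypothetical helper)."""
    payload = {
        "resources": [
            {"resourceType": resource_type, "resourceUri": resource_uri}
        ]
    }
    return json.dumps(payload)


def write_gcp_resources(path: str, resource_type: str, resource_uri: str) -> None:
    """Writes the serialized resources to the component's output path."""
    with open(path, "w") as f:
        f.write(gcp_resources_json(resource_type, resource_uri))
```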

##### Using a Python component
Alternatively, you can return the gcp_resources output parameter as you would any string output parameter:

In [None]:
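from typing import NamedTuple

from kfp import dsl

@dsl.component(
    base_image='python:3.9',
    packages_to_install=['google-cloud-pipeline-components==2.19.0'],
)
def launch_dataflow_component(
    project: str, location: str
) -> NamedTuple('Outputs', [('gcp_resources', str)]):
  # Lightweight components must import their dependencies inside the function body.
  from google.protobuf.json_format import MessageToJson
  from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources

  # Launch the Dataflow job here, then record its ID.
  dataflow_job_id = '[dataflow-id]'  # Placeholder: replace with the launched job's ID.
  dataflow_resources = GcpResources()
  dr = dataflow_resources.resources.add()
  dr.resource_type = 'DataflowJob'
  dr.resource_uri = f'https://dataflow.googleapis.com/v1b3/projects/{project}/locations/{location}/jobs/{dataflow_job_id}'
  gcp_resources = MessageToJson(dataflow_resources)
  # Return a tuple so that the value maps to the `gcp_resources` output.
  return (gcp_resources,)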

##### Supported resource_type values
You can set the `resource_type` to be an arbitrary string, but only the following types have links in the Google Cloud console:
- BatchPredictionJob
- BigQueryJob
- CustomJob
- DataflowJob
- HyperparameterTuningJob

### Write a component to cancel the underlying resources
When a pipeline job is canceled, the underlying Google Cloud resources keep running by default. They are not canceled automatically. To change this behavior, attach a `SIGTERM` handler in your component that cancels the underlying resource. A good place to do this is just before a polling loop for a job that could run for a long time.

Cancellation has been implemented on several Google Cloud Pipeline Components, including:
- Batch prediction job
- BigQuery ML job
- Custom job
- Dataproc Serverless batch job
- Hyperparameter tuning job

For more information, including sample code that shows how to attach a `SIGTERM` handler, see the following GitHub links:
- https://github.com/kubeflow/pipelines/blob/google-cloud-pipeline-components-2.19.0/components/google-cloud/google_cloud_pipeline_components/container/utils/execution_context.py
- https://github.com/kubeflow/pipelines/blob/google-cloud-pipeline-components-2.19.0/components/google-cloud/google_cloud_pipeline_components/container/v1/gcp_launcher/job_remote_runner.py#L124

Consider the following when implementing your SIGTERM handler:
- Cancellation propagation works only after the component has been running for a few minutes. This is typically due to background startup tasks that need to be processed before the Python signal handlers are called.
- Some Google Cloud resources might not have cancellation implemented. For example, creating or deleting a Vertex AI Endpoint or Model could create a long-running operation that accepts a cancellation request through its REST API, but doesn't implement the cancellation operation itself.
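
The following is a minimal sketch of the pattern described above: install a `SIGTERM` handler just before a polling loop, and have the handler issue a cancellation request. It isn't the implementation from the linked files. The class name, the `get_state` and `cancel_job` callbacks, and the state strings are all hypothetical stand-ins for your own job client.

```python
import signal
import time
from typing import Callable


class CancelOnSigterm:
    """Calls `on_cancel` once when the process receives SIGTERM (sketch)."""

    def __init__(self, on_cancel: Callable[[], None]):
        self._on_cancel = on_cancel
        self._previous = None
        self.cancelled = False

    def handle(self, signum, frame):
        # Guard against repeated signals sending duplicate cancel requests.
        if not self.cancelled:
            self.cancelled = True
            self._on_cancel()

    def __enter__(self):
        # signal.signal() must be called from the main thread.
        self._previous = signal.signal(signal.SIGTERM, self.handle)
        return self

    def __exit__(self, exc_type, exc, tb):
        # Restore the earlier handler if Python knew about one.
        if self._previous is not None:
            signal.signal(signal.SIGTERM, self._previous)
        return False


def poll_until_done(
    get_state: Callable[[], str],
    cancel_job: Callable[[], None],
    interval_seconds: float = 30.0,
) -> str:
    """Polls a job until it reaches a terminal state (hypothetical states)."""
    terminal_states = {"SUCCEEDED", "FAILED", "CANCELLED"}
    with CancelOnSigterm(cancel_job):
        while True:
            state = get_state()
            if state in terminal_states:
                return state
            time.sleep(interval_seconds)
```

After a cancellation request, the loop keeps polling until the job itself reports a terminal state, because the request might take time to complete or might not be honored by every resource type.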