In [10]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade google-cloud-aiplatform {USER_FLAG} -q
! pip3 install -U google-cloud-storage {USER_FLAG} -q
! pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 1.8.14 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.
google-cloud-pipeline-components 1.0.24 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.[0m[31m
[0m

In [11]:
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Input,
    Output,
    Artifact,
    Dataset,
)

#### Pipeline Configurations

In [3]:
#The Google Cloud project that this pipeline runs in.
project_id = "your projectid"
# The region that this pipeline runs in
region = "us-west1"
# Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
pipeline_root_path = "your GCS bucket for pipeline root"

#### Create Pipeline Components

We can create a component from Python functions (inline) and from a container. We will first try inline python functions. 

Step 1: Define the python function

Step 2:  Use **kfp.components.create_component_from_func** build the component. This function takes four parameters.

**1.func**: The Python function to convert.

**2.base_image**: (Optional.) Specify the Docker container image to run this function in. 

**3.output_component_file**: (Optional.) Writes your component definition to a file. 

**4.packages_to_install**: (Optional.) A list of versioned Python packages to install before running your function.

Another thing we need to consider is passing parameters between components. We can pass simple parameters such as integer, string, tuple, dict, and list by values. To pass the large datasets or complex configurations, we can use files. We can annotate the Python function’s parameters to indicate input or output files for the component. 

Refer to  https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/ for more information.

#### Pipeline Component : Data Ingestion

In [4]:
@component
def add(a: float, b: float) -> float:
  '''Calculates sum of two arguments'''
  return a + b

In [5]:
@dsl.pipeline(
  name='addition-pipeline',
  description='An example pipeline that performs addition calculations.',
  pipeline_root='gs://my-pipeline-root/example-pipeline'
)
def add_pipeline(
  a: float=1,
  b: float=7,
):
  # Passes a pipeline parameter and a constant value to the `add` factory
  # function.
  first_add_task = add(a, 4)
  # Passes an output reference from `first_add_task` and a pipeline parameter
  # to the `add` factory function. For operations with a single return
  # value, the output reference can be accessed as `task.output` or
  # `task.outputs['output_name']`.
  second_add_task = add(first_add_task.output, b)

#### Compile the pipeline into a JSON file

In [6]:
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=add_pipeline,
        package_path='add_pipeline.json')



#### Submit the pipeline run

In [9]:
import google.cloud.aiplatform as aip

job = aip.PipelineJob(
    display_name="add-pipeline",
    template_path="add_pipeline.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'a':8,
        'b': 9
    }
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/958343845263/locations/us-central1/pipelineJobs/addition-pipeline-20221004084116
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/958343845263/locations/us-central1/pipelineJobs/addition-pipeline-20221004084116')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/addition-pipeline-20221004084116?project=958343845263
PipelineJob projects/958343845263/locations/us-central1/pipelineJobs/addition-pipeline-20221004084116 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/958343845263/locations/us-central1/pipelineJobs/addition-pipeline-20221004084116 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/958343845263/locations/us-central1/pipelineJobs/addition-pipeline-20221004084116 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/958343845263/locations/us-central1/pip