In [13]:
from typing import NamedTuple

from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component
from kfp.v2.google.client import AIPlatformClient


In [14]:
import os

PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  vijaymohireproject


In [15]:
from datetime import datetime

# Second-resolution timestamp string (YYYYmmddHHMMSS) taken when this cell runs.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

In [16]:
# Cloud Storage bucket URI (gs://...) used by the gsutil commands in this notebook.
# NOTE(review): the trailing `# @param` markers look like notebook form-field
# annotations (e.g. Colab); keep them on the same line as the assignment.
BUCKET_NAME = "gs://vijaymohire_vertex_bucket"  # @param {type:"string"}
# Region name; referenced by the (commented-out) bucket-creation command.
REGION = "us-central1"  # @param {type:"string"}

In [17]:
!gsutil ls -al $BUCKET_NAME

         9  2021-08-13T08:04:43Z  gs://vijaymohire_vertex_bucket/Test.txt#1628841883110934  metageneration=1
TOTAL: 1 objects, 9 bytes (9 B)


In [26]:
#! gsutil mb -l $REGION $BUCKET_NAME


Creating gs://vijaymohire_vertex_bucket_new/...
AccessDeniedException: 403 vertexservice@vijaymohireproject.iam.gserviceaccount.com does not have storage.buckets.create access to the Google Cloud project.


In [18]:
@component(output_component_file="hw.yaml", base_image="python:3.9")
def hello_world(text: str) -> str:
    """Print ``text`` and return it unchanged.

    Declared as a pipeline component through ``@component``; the decorator is
    given ``hw.yaml`` as the component file to write and ``python:3.9`` as the
    base image.

    Args:
        text: The string to print.

    Returns:
        The same string that was passed in.
    """
    print(text)
    return text

In [19]:
@component(packages_to_install=["google-cloud-storage"])
def two_outputs(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("output_one", str),  # Return parameters
        ("output_two", str),
    ],
):
    """Build two strings derived from ``text``, print them, and return both.

    Args:
        text: The string embedded in each of the two outputs.

    Returns:
        A 2-tuple ``(output_one, output_two)`` matching the declared
        ``Outputs`` fields.
    """
    # Unused here on purpose: importing it only demonstrates that the package
    # named in `packages_to_install` is importable inside the component.
    from google.cloud import storage  # noqa: F401

    first, second = (
        f"output {label} from text: {text}" for label in ("one", "two")
    )
    print(f"output one: {first}; output_two: {second}")
    return (first, second)


@component
def consumer(text1: str, text2: str, text3: str):
    """Print the three received strings on one labelled line."""
    print("text1: {}; text2: {}; text3: {}".format(text1, text2, text3))

In [30]:
# Root location passed to `@dsl.pipeline(pipeline_root=...)`.
# Derived from BUCKET_NAME so that changing the bucket parameter is picked up
# here too, instead of repeating the bucket URI as a separate literal.
# For a per-user subfolder use, e.g.:
#   USER = "vijaymohire"  # <---CHANGE THIS
#   PIPELINE_ROOT = "{}/pipeline_root/{}".format(BUCKET_NAME, USER)
PIPELINE_ROOT = BUCKET_NAME

In [31]:
@dsl.pipeline(
    name="hello-world-v2",
    description="A simple intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def intro_pipeline(text: str = "hi there"):
    """Connect the three components into one pipeline.

    ``text`` is passed to both ``hello_world`` and ``two_outputs``; the single
    output of the former and the two named outputs of the latter are then
    passed to ``consumer``.
    """
    greeting = hello_world(text)
    pair = two_outputs(text)
    # The consumer task's handle is not needed afterwards, so it is not bound.
    consumer(
        greeting.output,
        pair.outputs["output_one"],
        pair.outputs["output_two"],
    )

In [32]:
from kfp.v2 import compiler  # noqa: F811

# Compile `intro_pipeline` and write the result to hw_pipeline_job.json.
compiler.Compiler().compile(
    package_path="hw_pipeline_job.json",
    pipeline_func=intro_pipeline,
)

In [33]:
from kfp.v2.google.client import AIPlatformClient  # noqa: F811

# Use the notebook-level PROJECT_ID and REGION instead of repeating literals.
# The previous hard-coded region 'uscentral1' was missing its hyphen and is
# not a valid region name; REGION holds "us-central1".
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

In [28]:
USER = "vijaymohire"  # <---CHANGE THIS
# Derive the root from BUCKET_NAME rather than repeating the bucket URI, so a
# change to the bucket parameter is reflected here.
# Alternative per-user root:
#   PIPELINE_ROOT = "{}/pipeline_root/{}".format(BUCKET_NAME, USER)
PIPELINE_ROOT = BUCKET_NAME
# Bare last expression so the notebook displays the resolved value.
PIPELINE_ROOT

'gs://vijaymohire_vertex_bucket'

In [35]:
# The run was set up through the Vertex AI Pipelines "Run" UI, so the client call below is left commented out.
# response = api_client.create_run_from_job_spec(
#     job_spec_path="hw_pipeline_job.json",
#     pipeline_root=PIPELINE_ROOT,  # this argument is necessary if you did not specify PIPELINE_ROOT as part of the pipeline definition.
#     parameter_values={
#         'gcs_output_directory': 1
#     }
# )

In [None]:
# Copyright 2020 Google 
# This code is licensed under the Apache License, Version 2.0. You may obtain a copy of this license in the LICENSE.txt file in the root directory of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
# Program executed by Bhadale IT in Google JupyterLab (https://www.bhadaleit.com).
# For more details on the code and tutorials, visit the Vertex AI website.