In [1]:
import kfp

from typing import NamedTuple

from kfp.dsl import pipeline
from kfp.dsl import component
from kfp.dsl import OutputPath
from kfp.dsl import InputPath


from kfp.dsl import Output
from kfp.dsl import Metrics

from kfp import compiler
#from kfp.google.client import AIPlatformClient


from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

#from google_cloud_pipeline_components import aiplatform as gcc_aip

from google_cloud_pipeline_components.v1.model import ModelUploadOp

In [2]:
from google.oauth2 import service_account
import google.auth
from dotenv import load_dotenv
import os

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv('SERVICE_ACCOUNT_KEY')

# Set credentials as default for session
google.auth.default()

(<google.oauth2.credentials.Credentials at 0x131f92700>,
 'ml-pipelines-project-433602')

In [3]:
credentials, project_id = google.auth.default()

In [4]:
project_id

'ml-pipelines-project-433602'

In [5]:
!gcloud auth application-default print-access-token

ya29.a0AcM612zHX-oNPmShEz3ulVONnaCtD4lg21-0t5DOmIR_5jwrlY5eqyFCELpwUk5giJZF21lK5AA26wxRHoe2YkFHj75UTS0_PxjJd2Bw7gI7OB8ZiwuItRV85ivAxuWfXd5a5MbtrbgAsd_JQEVlLMMIKpE8ono-d3DMfXCcaCgYKAd4SARESFQHGX2MiexMIwGwb_d4NLdmbeZ3spQ0175


In [6]:
# !gcloud auth application-default login

In [7]:
# from google.cloud import storage
# client = storage.Client(credentials=credentials)
# # Try to access or create the bucket
# bucket_name = 'sb-test-bucket-name'
# bucket = client.bucket(bucket_name)

# if not bucket.exists():
#     bucket = client.create_bucket(bucket_name)
#     print(f'Bucket {bucket_name} created.')
# else:
#     print(f'Bucket {bucket_name} already exists.')

# print('Bucket status checked.')

In [8]:
PROJECT_ID = "ml-pipelines-project-433602"
PIPELINE_ROOT = "gs://sb-vertex-temp/"
REGION = "us-central1"

## Clients

In [9]:
aiplatform.init(project=PROJECT_ID,
                location=REGION)

# Pipeline Basic

## Components

In [10]:
@component()
def concat(a: str, b: str) -> str:
  return a + b

  return component_factory.create_component_from_func(


In [11]:
@component
def reverse(a: str)->NamedTuple("outputs", [("before", str), ("after", str)]):
  return a, a[::-1]

## Pipeline

In [12]:
@pipeline(name="basic-pipeline",
          pipeline_root=PIPELINE_ROOT + "basic-pipeline")
def basic_pipeline(a: str='stres', b: str='sed'):
    concat_task = concat(a=a, b=b)
    reverse_task = reverse(a=concat_task.output)

## Compile

In [13]:
compiler.Compiler().compile(
pipeline_func=basic_pipeline, package_path="basic_pipeline.json"
)

## Run

In [14]:
job = pipeline_jobs.PipelineJob(
    display_name="basic-pipeline",
    template_path="basic_pipeline.json",
    parameter_values={"a": "stres", "b": "sed"}
)

In [15]:
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/553515216022/locations/us-central1/pipelineJobs/basic-pipeline-20240825002256
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/553515216022/locations/us-central1/pipelineJobs/basic-pipeline-20240825002256')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/basic-pipeline-20240825002256?project=553515216022
PipelineJob run completed. Resource name: projects/553515216022/locations/us-central1/pipelineJobs/basic-pipeline-20240825002256


# Component Specification (function based component)

In [16]:
@component(output_component_file="concat_component.yaml")
def concat(a: str, b: str) -> str:
  return a + b

  @component(output_component_file="concat_component.yaml")
  def concat(a: str, b: str) -> str:
  return component_factory.create_component_from_func(


In [17]:
!cat ./concat_component.yaml

# PIPELINE DEFINITION
# Name: concat
# Inputs:
#    a: str
#    b: str
# Outputs:
#    Output: str
components:
  comp-concat:
    executorLabel: exec-concat
    inputDefinitions:
      parameters:
        a:
          parameterType: STRING
        b:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
deploymentSpec:
  executors:
    exec-concat:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - concat
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec


# Pipeline with GPU and machine type

In [18]:
@component(output_component_file="gpu_training.yaml",
           base_image="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-14.py310:latest")
def gpuTrainingFunc() -> bool:
  import logging
  import tensorflow as tf

  gpus = tf.config.list_physical_devices('GPU')

  for gpu in gpus:
    logging.info('Name: {} Type: {}'.format(gpu.name, gpu.device_type))

  return True

  @component(output_component_file="gpu_training.yaml",
  def gpuTrainingFunc() -> bool:


In [21]:
@pipeline(name="gpu-pipeline",
          pipeline_root=PIPELINE_ROOT + "gpu-pipeline")
def gpu_pipeline():
    gpuTraining = gpuTrainingFunc().add_node_selector_constraint("NVIDIA_TESLA_T4")
    #.add_node_selector_constraint(
    #    label_name="cloud.google.com/gke-accelerator",
    #    value="NVIDIA_TESLA_T4")

In [22]:
compiler.Compiler().compile(
  pipeline_func=gpu_pipeline, package_path="gpu_pipeline.json"
)

In [23]:
job = pipeline_jobs.PipelineJob(
   display_name="gpu-pipeline",
   template_path="gpu_pipeline.json"
)

In [24]:
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/553515216022/locations/us-central1/pipelineJobs/gpu-pipeline-20240825003347
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/553515216022/locations/us-central1/pipelineJobs/gpu-pipeline-20240825003347')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/gpu-pipeline-20240825003347?project=553515216022
PipelineJob projects/553515216022/locations/us-central1/pipelineJobs/gpu-pipeline-20240825003347 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/553515216022/locations/us-central1/pipelineJobs/gpu-pipeline-20240825003347 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/553515216022/locations/us-central1/pipelineJobs/gpu-pipeline-20240825003347 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/553515216022/locations/us-central1/pipelineJobs/gpu-pipeline-2024082

RuntimeError: Job failed with:
code: 9
message: " The DAG failed because some tasks failed. The failed tasks are: [gputrainingfunc].; Job (project_id = ml-pipelines-project-433602, job_id = 411723948770721792) is failed due to the above error.; Failed to handle the job: {project_number = 553515216022, job_id = 411723948770721792}"


# Schedule

In [None]:
from kfp.google.client import AIPlatformClient
api_client = AIPlatformClient(project_id=PROJECT_ID, region='us-central1')