In [2]:
# Install dependencies: `pipenv update` runs `pipenv lock` and then `pipenv sync`
# (as reported in the command's own output), refreshing Pipfile.lock and
# installing from it.
!pipenv update

Running $ pipenv lock then $ pipenv sync.
Locking [packages] dependencies...
Building requirements...
Resolving dependencies...
✔ Success! Locking...
Locking [dev-packages] dependencies...
Building requirements...
Resolving dependencies...
✔ Success! Locking...
Updated Pipfile.lock (d6f9e44df7ae7e4a36f8b94f06c165d77d00edf5560a42bd71f34b0edb0b652a)!
Installing dependencies from Pipfile.lock (0b652a)...
To activate this project's virtualenv, run pipenv shell.
Alternatively, run a command inside the virtualenv with pipenv run.
All dependencies are now up-to-date!


# Library Imports 

In [3]:
# The KFP SDK version should be >=1.6
import kfp 
# !pip install -v https://files.pythonhosted.org/packages/65/75/8b706e1170e2c7b6242b1675259e47986bb4fc490f29387989a965972e6e/grpcio-1.44.0.tar.gz

import google.cloud.aiplatform as aip
import google_cloud_pipeline_components

# Report the installed version of each SDK imported above.
print(f'KFP SDK version: {kfp.__version__}')
print(f'AI Platform version: {aip.__version__}')
print(f'google_cloud_pipeline_components version: {google_cloud_pipeline_components.__version__}')

KFP SDK version: 1.8.19
AI Platform version: 1.22.0
google_cloud_pipeline_components version: 1.0.39


# Environment Configuration

In [None]:
import os

# --- Project settings ---
PROJECT_ID = "ground-zero-377715"  # @param {type:"string"}
PROJECT_NUMBER="245954991926"
# ! gcloud config set project $PROJECT_ID
# REGION = "us-central1"  # @param {type: "string"}
# Value of the GOOGLE_APPLICATION_CREDENTIALS environment variable (None when it is unset).
GOOGLE_APPLICATION_CREDENTIALS = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')

# --- Storage and identity ---
BUCKET_NAME = "ga-pipelines"  # @param {type:"string"}
# Derived from BUCKET_NAME so the bucket only has to be changed in one place.
BUCKET_URI = f"gs://{BUCKET_NAME}"
SERVICE_ACCOUNT = "sa-vertex-ai@ground-zero-377715.iam.gserviceaccount.com"  # @param {type:"string"}

# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI
# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

# Location under the bucket that is later passed to PipelineJob as pipeline_root.
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/minimal_pipeline"

# Initialize the Vertex AI SDK with the project and staging bucket defined above.
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)


In [None]:
# !gcloud iam service-accounts list

In [None]:
# Delegation to your service account

# !gcloud iam service-accounts add-iam-policy-binding "sa-vertex-ai@ground-zero-377715.iam.gserviceaccount.com" \
#    --member "user:muhammad.ahsan@ginkgo-analytics.com" \
#    --role "roles/iam.serviceAccountUser" \
#    --project="ground-zero-377715"

In [None]:
# Validate Access to Bucket
# !gsutil ls -al $BUCKET_URI

# Pipeline Architecture

### Pipeline -> Hello World

In [None]:
# Hello World Pipeline

from kfp.v2 import dsl
from kfp.v2 import compiler

from kfp.v2.dsl import component

@component
def hello_world(text: str) -> str:
    """Return the given string unchanged."""
    message = text
    return message

@dsl.pipeline(name='hello-world', description='A simple introduction to pipelines')
def pipeline_hello_world(text: str = 'hi there'):
    """Single-step pipeline that forwards the `text` parameter to `hello_world`."""
    # The task object is not needed afterwards, so it is not bound to a name.
    hello_world(text=text)

### Pipeline -> String Concat and Reverse

In [45]:
from typing import NamedTuple
@component(output_component_file="../pipelines/concat_op.json")
def concat_op(a: str, b: str) -> str:
    """Return `a` followed by `b`."""
    joined = a + b
    return joined

@component(output_component_file="../pipelines/reverse_op.json")
def reverse_op(a: str) -> NamedTuple("outputs", [("before", str),("after", str)]):
    """Return the input string together with its reversal.

    The two returned values line up with the declared outputs:
    `before` is `a` as given, `after` is `a` reversed.
    """
    before, after = a, a[::-1]
    return before, after


@dsl.pipeline(name='string-manipulator', description='A pipeline to concatenate two strings and finally reverse it')
def pipeline_string_manipulator(first_string: str = 'muhammad', second_string: str = 'ahsan'):
    """Concatenate two strings with `concat_op`, then feed the result to `reverse_op`.

    Args:
        first_string: Passed to `concat_op` as `a`.
        second_string: Passed to `concat_op` as `b`.
    """
    # Components are called with keyword arguments, matching pipeline_hello_world;
    # newer KFP SDK releases reject positional arguments when instantiating components.
    concat_task = concat_op(a=first_string, b=second_string)
    reverse_op(a=concat_task.output)


# Pipeline Compilation

In [None]:
# Compile the string-manipulator pipeline to a JSON package.
package_path = '../pipelines/string_manipulator_pl.json'

# Remove a stale package from a previous run. Guarded so that a fresh checkout
# (where the file does not exist yet) does not error, unlike a bare `rm`.
if os.path.exists(package_path):
    os.remove(package_path)

compiler.Compiler().compile(
    pipeline_func=pipeline_string_manipulator,
    package_path=package_path)

# Pipeline Execution

In [None]:
# Submit the pipeline job to Vertex AI.
# This cell was previously marked "NOT WORKING - SERVICE ACCOUNT ISSUE": job.run() was
# called without a service account, so SERVICE_ACCOUNT from the configuration cell was
# never used. It is now passed explicitly.
# NOTE(review): confirm SERVICE_ACCOUNT has the bucket/IAM bindings shown in the
# commented-out gsutil/gcloud commands earlier in the notebook.


from datetime import datetime

# Timestamp suffix gives each submission a distinct display name.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

DISPLAY_NAME = "hello_world_pipeline_" + TIMESTAMP

# NOTE(review): the compile cell above writes '../pipelines/string_manipulator_pl.json';
# no visible cell produces hello_world_pipeline.json — confirm this file exists.
job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="../pipelines/hello_world_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={},
)

job.run(service_account=SERVICE_ACCOUNT)