In [1]:
# ! pip install google-cloud-pipeline-components --user
# ! pip install kfp --upgrade --user

In [2]:
from kfp.dsl import component
from typing import  NamedTuple,  List, Dict
import json
from kfp.dsl import Dataset, Input, Model, Output
import google.cloud.aiplatform as aip
from kfp import dsl
from kfp import compiler
from kfp.dsl import component, Artifact 
from typing import NamedTuple
import datetime
# Timestamp that tags this notebook run. It is embedded in PIPELINE_ROOT (a GCS
# prefix), so it uses only path-safe characters (no spaces or colons) and keeps
# an explicit "_" between date and time so they stay readable once in the path.
_run_started_at = datetime.datetime.now()
now = _run_started_at.strftime("%Y-%m-%d_%H-%M-%S.%f")
import time

# The same instant as `now`, as whole seconds since the Unix epoch (derived from
# one clock read so the two values always agree).
epoch_time = int(_run_started_at.timestamp())

#ML Metadata types https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-2.0.0/api/artifact_types.html
from google_cloud_pipeline_components.types.artifact_types import BQTable 
# We will also build a custom artifact

#### Set variables

In [65]:
# --- GCP resources -----------------------------------------------------------
PROJECT_ID = 'cpg-cdp'
BUCKET_NAME = 'jsw-artifact-' + PROJECT_ID
BUCKET = 'gs://' + BUCKET_NAME

# --- Data selection ----------------------------------------------------------
YEAR = 2016
DATASET_ID = 'google_trends_my_project'  # pre-created, empty BigQuery dataset
OUTPUT_TABLE_NAME = f'baseball-schedule-{YEAR}'  # table (re)built for YEAR
ARTIFACT_BLOB = f'artifacts/baseball-{YEAR}'  # object path of the CSV inside BUCKET_NAME

# --- Pipeline root -----------------------------------------------------------
# Suffixed with the `now` timestamp so each notebook run gets its own root;
# any spaces are stripped so the value is usable as a GCS prefix.
# NOTE(review): the "brandwatch_ingest" folder name looks carried over from a
# different pipeline; confirm it is intended for this baseball example.
PIPELINE_ROOT = f"{BUCKET}/pipeline_root/brandwatch_ingest_{now}".replace(' ', '')


#### Create a bucket to store the pandas artifact
This section demonstrates writing a custom (generic) artifact.

See [this guide](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-2.0.0/api/artifact_types.html) for documentation on the Google Cloud-specific artifact types and the platform metadata they carry.

Note that prebuilt (standard) components already produce the proper metadata; use this technique when building your own custom KFP components.

In [66]:
# !gsutil mb -l us-central1 $BUCKET 

#### Component to create a BQ Table

In [67]:
@component(
    base_image='python:3.9',
    # Canonical PyPI distribution names. google-cloud-storage is not needed
    # here: this component only talks to BigQuery.
    packages_to_install=[
        'google-cloud-bigquery',
        'google-cloud-pipeline-components',
    ],
)
def pull_baseball_data(
    year: int,
    project_id: str,
    dataset_id: str,
    output_table_name: str,
) -> NamedTuple('outputs', year=int, bq_table=Artifact):
    """Materialize one season of the public baseball schedule into a BigQuery table.

    Runs ``CREATE OR REPLACE TABLE`` over
    ``bigquery-public-data.baseball.schedules`` filtered to ``year``. It then
    emits a ``BQTable`` artifact that points at the new table, so downstream
    components can locate it through the artifact rather than through raw
    parameters.

    Args:
        year: Season to keep.
        project_id: Project that runs the query and owns the output table.
        dataset_id: Existing dataset that receives the output table.
        output_table_name: Table ID to create or replace.

    Returns:
        outputs.year: ``year`` passed through unchanged.
        outputs.bq_table: ``BQTable`` artifact for
            ``{project_id}.{dataset_id}.{output_table_name}`` (the output is
            annotated as the generic ``Artifact`` type).
    """
    from google.cloud import bigquery
    from google_cloud_pipeline_components.types.artifact_types import BQTable

    client = bigquery.Client(project=project_id)
    output_table_uri = f'{project_id}.{dataset_id}.{output_table_name}'
    # Backticks quote the full table path (the default table name contains
    # hyphens). int() guarantees only a number is spliced into the SQL.
    query = f"""CREATE OR REPLACE TABLE `{output_table_uri}` AS (
                SELECT * FROM `bigquery-public-data.baseball.schedules` WHERE year = {int(year)}
            )
            """
    # Block until the DDL job finishes so the table exists before the artifact
    # pointing at it is emitted.
    client.query(query).result()

    # Leave name=None so Vertex AI assigns the artifact's name in ML Metadata.
    bq_table = BQTable.create(
        name=None,
        project_id=project_id,
        dataset_id=dataset_id,
        table_id=output_table_name,
    )

    return (year, bq_table)

#### Component to consume the BQ Artifact and write a generic artifact to GCS

In [73]:
@component(
    base_image='python:3.9',
    # Canonical PyPI distribution names; pandas + db-dtypes are required by
    # BigQuery's to_dataframe().
    packages_to_install=[
        'google-cloud-bigquery',
        'google-cloud-storage',
        'pandas',
        'google-cloud-pipeline-components',
        'db-dtypes',
    ],
)
def save_schedule_to_gcs(
    bq_table: Input[BQTable],
    project_id: str,
    bucket_name: str,
    destination_blob_name: str,
) -> NamedTuple('outputs', csv_data=Artifact):
    """Export selected schedule columns from an upstream BigQuery table to a CSV in GCS.

    The source table is located through the ``bq_table`` artifact's metadata
    (``projectId`` / ``datasetId`` / ``tableId``), not through pipeline
    parameters.

    Args:
        bq_table: BigQuery table artifact produced by an upstream component.
        project_id: Project used for the BigQuery and Cloud Storage clients.
        bucket_name: Destination bucket (name only, no ``gs://``).
        destination_blob_name: Object path of the CSV inside the bucket.

    Returns:
        outputs.csv_data: generic ``Artifact`` whose URI is
            ``gs://{bucket_name}/{destination_blob_name}``.
    """
    from google.cloud import bigquery, storage
    from kfp.dsl import Artifact

    # Rebuild the fully qualified table ID from the artifact's properties.
    table_meta = bq_table.metadata
    input_table_uri = f"{table_meta['projectId']}.{table_meta['datasetId']}.{table_meta['tableId']}"
    query = f"SELECT gameId, homeTeamName, awayTeamName, startTime FROM `{input_table_uri}`"

    client = bigquery.Client(project=project_id)
    result_df = client.query(query).result().to_dataframe()

    storage_client = storage.Client(project=project_id)
    blob = storage_client.bucket(bucket_name).blob(destination_blob_name)
    # index=False: the DataFrame's RangeIndex is not data. Without it, the CSV
    # gains an unnamed leading column of row numbers.
    blob.upload_from_string(result_df.to_csv(index=False), content_type='text/csv')

    # name=None lets Vertex AI name the artifact; the URI records where the CSV lives.
    csv_data = Artifact(name=None, uri=f'gs://{bucket_name}/{destination_blob_name}')
    return (csv_data,)

In [74]:
VERSION = 'v0_2'
NAME = f'ml_metadata_simple_{VERSION}'

@dsl.pipeline(
    name=NAME,
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    year: int = YEAR,
    project_id: str = PROJECT_ID,
    dataset_id: str = DATASET_ID,
    output_table_name: str = OUTPUT_TABLE_NAME,
    bucket_name: str = BUCKET_NAME,
    destination_blob_name: str = ARTIFACT_BLOB
):
    """Build a per-season BigQuery table, then export part of it to GCS as CSV.

    The export task consumes the first task's ``bq_table`` output, which wires
    the two tasks together through an artifact rather than a plain parameter.
    """
    pull_task = pull_baseball_data(
        year=year,
        project_id=project_id,
        dataset_id=dataset_id,
        output_table_name=output_table_name,
    )
    pull_task.set_display_name("Pull Public Baseball Data Schedules")

    save_task = save_schedule_to_gcs(
        bq_table=pull_task.outputs['bq_table'],
        project_id=project_id,
        bucket_name=bucket_name,
        destination_blob_name=destination_blob_name,
    )
    save_task.set_display_name("Save the csv data to GCS")

In [75]:
from kfp import compiler

# Serialize the pipeline definition into a pipeline spec file; PipelineJob's
# template_path points at this same file.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="artifact_pipeline_example.json",
)

In [76]:
# Configure a Vertex AI pipeline run from the compiled spec. The parameter
# values mirror the pipeline's defaults but are passed explicitly so they can
# be changed here without recompiling.
job = aip.PipelineJob(
    display_name="Getting artifacts with baseball data",
    template_path="artifact_pipeline_example.json",
    pipeline_root=PIPELINE_ROOT,
    # Pin the run to the notebook's project rather than whatever default
    # project the aiplatform SDK picks up from the environment.
    project=PROJECT_ID,
    enable_caching=True,
    parameter_values={
        'year': YEAR,
        'project_id': PROJECT_ID,
        'dataset_id': DATASET_ID,
        'output_table_name': OUTPUT_TABLE_NAME,
        'bucket_name': BUCKET_NAME,
        'destination_blob_name': ARTIFACT_BLOB,
    },
    # "fast": once any task fails, stop scheduling new tasks (tasks already
    # running still finish). "slow" would keep running every task that can
    # still run, which is the opposite of stopping on failure.
    failure_policy="fast",
)
# NOTE(review): not referenced when the job is built or submitted; pass it as
# job.submit(service_account=SERVICE_ACCOUNT) if the run should execute as this account.
SERVICE_ACCOUNT = "social-pulse-pipeline@cpg-cdp.iam.gserviceaccount.com"


In [77]:
# Submit the configured PipelineJob to Vertex AI Pipelines; the log that follows
# shows the created run's resource name and console link.
# NOTE(review): no service_account argument is passed, so SERVICE_ACCOUNT is
# unused and the run presumably executes under the project's default service
# account — confirm, and use job.submit(service_account=SERVICE_ACCOUNT) if the
# dedicated account was intended.
job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/939655404703/locations/us-central1/pipelineJobs/ml-metadata-simple-v0-2-20231201002822
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/939655404703/locations/us-central1/pipelineJobs/ml-metadata-simple-v0-2-20231201002822')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/ml-metadata-simple-v0-2-20231201002822?project=939655404703
