In [1]:
from typing import NamedTuple

import kfp
from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output, Dataset, Model,
                     component)
import os
# Path to a service-account key JSON file. Leave empty to keep whatever
# credentials the environment already provides: assigning '' would overwrite an
# existing GOOGLE_APPLICATION_CREDENTIALS, and google-auth treats a set-but-empty
# value as a key-file path that cannot be found.
SERVICE_ACCOUNT_KEY_PATH = ''  # Fill with Service Account
if SERVICE_ACCOUNT_KEY_PATH:
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = SERVICE_ACCOUNT_KEY_PATH

In [2]:
# GCP settings: fill these in before running the notebook.
PROJECT_ID = ''  # project id
REGION = 'asia-southeast1'
# By default the bucket is named after the project; replace with your own GCS path if it differs.
BUCKET_URI = "gs://" + f"{PROJECT_ID}"  # fill with gcs path

In [3]:
PIPELINE_NAME = ""  # pipeline name
# All pipeline artifacts are written under <bucket>/pipeline_root/taxi_trips.
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/taxi_trips"

In [4]:
# Set the default project, region and staging bucket for later aiplatform calls in this notebook.
aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_URI,
)

In [5]:
@component(
        packages_to_install=["google-cloud-bigquery", "google-cloud-storage", "pandas", "db-dtypes"]
)
def get_cleaned_data(
    bq_source: str,
    Clean_Data: Output[Dataset],
    project: str = "",
    location: str = "asia-southeast1",
) -> NamedTuple("Outputs", [("path", str)]):
    """Export the modelling columns of a BigQuery table to a CSV dataset artifact.

    Args:
        bq_source: Fully-qualified BigQuery table (``project.dataset.table``) to read.
        Clean_Data: Output dataset artifact; the CSV is written to ``<Clean_Data.path>.csv``.
        project: Project the BigQuery client runs under. An empty string passes
            ``None`` so the client uses the default project of the runtime credentials.
        location: BigQuery location the query job runs in.

    Returns:
        A one-field tuple ``(path,)`` holding the path of the written CSV file.
    """
    from google.cloud import bigquery

    client = bigquery.Client(project=project or None, location=location)
    query = f"""
        SELECT trip_seconds, trip_miles, pickup_community_area, dropoff_community_area, fare, tolls, extras, tips FROM `{bq_source}`
    """

    df = client.query(query).to_dataframe()
    csv_path = f'{Clean_Data.path}.csv'
    df.to_csv(csv_path, index=False)
    # The declared "path" output must be returned, otherwise the component fails.
    return (csv_path,)


In [6]:
@component(
        packages_to_install=["scikit-learn", "pandas", "google-cloud-storage", "fsspec", "gcsfs"]
)
def split_the_data(
    Clean_Data: Input[Dataset],
    Train_Data: Output[Dataset],
    Val_Data: Output[Dataset],
    test_size: float = 0.2,
    random_seed: int = 0,
):
    """Split the cleaned CSV into training and validation CSV artifacts.

    Args:
        Clean_Data: Input dataset artifact; reads ``<Clean_Data.path>.csv``.
        Train_Data: Output artifact; training rows go to ``<Train_Data.path>.csv``.
        Val_Data: Output artifact; held-out rows go to ``<Val_Data.path>.csv``.
        test_size: Fraction of rows held out for validation.
        random_seed: Seed for the shuffle. It makes the split reproducible, consistent
            with the seeded model construction and training steps.
    """
    from sklearn.model_selection import train_test_split
    import pandas as pd

    df = pd.read_csv(f'{Clean_Data.path}.csv')
    train, val = train_test_split(df, test_size=test_size, random_state=random_seed)
    train.to_csv(f'{Train_Data.path}.csv', index=False)
    val.to_csv(f'{Val_Data.path}.csv', index=False)

In [7]:
@component(
        base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-9:latest",
        packages_to_install=["google-cloud-storage", "fsspec", "gcsfs"]
)
def create_tensorflow_model(input_size: int,
                            hidden_layer_sizes: int,
                            output_size: int ,
                            activation_name: str,
                            output_activation_name: str,
                            Base_Model: Output[Model]):
    """Build an untrained Keras MLP with one hidden layer and save it as ``Base_Model``.

    Args:
        input_size: Number of input features.
        hidden_layer_sizes: Unit count of the single hidden Dense layer. Despite the
            plural name, this is one integer.
        output_size: Unit count of the output Dense layer.
        activation_name: Keras activation identifier for the hidden layer.
        output_activation_name: Keras activation identifier for the output layer.
        Base_Model: Output model artifact; the model is saved to its path.
    """
    import tensorflow as tf

    # Fixed global seed so the initial weights are the same on every run.
    tf.random.set_seed(seed=0)

    network = tf.keras.models.Sequential()
    network.add(tf.keras.Input(shape=(input_size,)))
    # Exactly one hidden layer.
    network.add(tf.keras.layers.Dense(units=hidden_layer_sizes, activation=activation_name))
    # The output layer uses the caller-supplied activation (it is not left linear by default).
    network.add(tf.keras.layers.Dense(units=output_size, activation=output_activation_name))

    save_path = f'{Base_Model.path}'
    tf.keras.models.save_model(network, save_path)

In [8]:
@component(
        base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-9:latest",
        packages_to_install=["google-cloud-storage", "fsspec", "gcsfs"]
)
def train_model(
        Base_Model: Input[Model],
        Train_Data: Input[Dataset],
        learning_rate:float,
        optimizer_name:str,
        loss_function_name:str,
        batch_size:int,
        label_column_name:str,
        random_seed:int,
        metric_names:str,
        number_of_epochs:int,
        Trained_Model: Output[Model]):
    """Compile and fit the base Keras model on the training CSV, then save it.

    Args:
        Base_Model: Untrained model artifact to load and train.
        Train_Data: Training dataset artifact; reads ``<Train_Data.path>.csv``.
        learning_rate: Learning rate placed in the optimizer config.
        optimizer_name: Keras optimizer class name (e.g. ``"Adadelta"``).
        loss_function_name: Keras loss identifier (e.g. ``"mean_squared_error"``).
        batch_size: Rows per training batch.
        label_column_name: CSV column used as the label; every other column is a feature.
        random_seed: Seed for TensorFlow's global RNG and for the dataset shuffle.
        metric_names: Comma-separated Keras metric identifiers (e.g. ``"mae,mse"``);
            an empty string means no extra metrics.
        number_of_epochs: Number of passes over the training data.
        Trained_Model: Output model artifact; the fitted model is saved to its path.
    """
    import tensorflow as tf

    # Seed from the component input so that random_seed actually controls training.
    tf.random.set_seed(seed=random_seed)
    keras_model = tf.keras.models.load_model(filepath=f'{Base_Model.path}')

    optimizer = tf.keras.optimizers.get({
        "class_name": optimizer_name,
        "config": {"learning_rate": learning_rate},
    })
    loss = tf.keras.losses.get(loss_function_name)

    training_dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=f'{Train_Data.path}.csv',
        batch_size=batch_size,
        label_name=label_column_name,
        header=True,
        # Need to specify num_epochs=1 otherwise the training becomes infinite
        num_epochs=1,
        shuffle=True,
        shuffle_seed=random_seed,
        # Malformed rows are skipped during training (evaluation uses ignore_errors=False).
        ignore_errors=True,
    )

    # Duplicated in test_model on purpose: each lightweight KFP component runs from its
    # own function source, so it cannot share helpers defined elsewhere in the notebook.
    def stack_feature_batches(features_batch, labels_batch):
        # Stack the individual feature columns into a single feature tensor. Every
        # column is cast to float32 first, because tf.stack needs a single dtype and
        # CSV columns may be parsed as int32 or float32.
        list_of_feature_batches = [tf.cast(x=feature_batch, dtype=tf.float32) for feature_batch in features_batch.values()]
        return tf.stack(list_of_feature_batches, axis=-1), labels_batch

    training_dataset = training_dataset.map(stack_feature_batches)

    # Keras expects a list of metric identifiers; "" (or only whitespace/commas) -> None.
    metrics = [name.strip() for name in metric_names.split(",") if name.strip()] or None
    keras_model.compile(
        optimizer=optimizer,
        loss=loss,
        metrics=metrics,
    )
    keras_model.fit(
        training_dataset,
        epochs=number_of_epochs,
    )

    tf.keras.models.save_model(keras_model, f'{Trained_Model.path}')

In [9]:
@component(
        base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-9:latest",
        packages_to_install=["numpy", "google-cloud-storage", "fsspec", "gcsfs"]
)
def test_model(
        Trained_Model: Input[Model],
        Val_Data: Input[Dataset],
        label_column_name:str,
        batch_size:int,
        metrics: Output[Metrics]):
    """Evaluate the trained model on the validation CSV and log regression metrics.

    Streams ``<Val_Data.path>.csv`` once, in file order. It accumulates MAE, MSE and
    RMSE over all batches and logs them to the ``metrics`` artifact.

    Args:
        Trained_Model: Trained model artifact, loaded with ``tf.saved_model.load``.
        Val_Data: Validation dataset artifact.
        label_column_name: CSV column holding the ground-truth label.
        batch_size: Rows per evaluation batch.
        metrics: Output metrics artifact that receives the three scores.
    """
    import tensorflow as tf

    loaded_model = tf.saved_model.load(export_dir=f'{Trained_Model.path}')

    validation_dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=f'{Val_Data.path}.csv',
        batch_size=batch_size,
        label_name=label_column_name,
        header=True,
        num_epochs=1,
        shuffle=False,
        ignore_errors=False,
    )

    def stack_feature_batches(features_batch, labels_batch):
        # Combine the per-column tensors into one feature tensor. Each column is cast
        # to float32 first because tf.stack rejects mixed dtypes (int32 vs float32).
        columns = [tf.cast(x=column, dtype=tf.float32) for column in features_batch.values()]
        return tf.stack(columns, axis=-1), labels_batch

    # Keys are the names logged to the Metrics artifact, in logging order.
    streaming_metrics = {
        "mean_absolute_error": tf.keras.metrics.MeanAbsoluteError(),
        "mean_squared_error": tf.keras.metrics.MeanSquaredError(),
        "root_mean_squared_error": tf.keras.metrics.RootMeanSquaredError(),
    }

    for features, labels in validation_dataset.map(stack_feature_batches):
        predictions = loaded_model(features)
        for metric in streaming_metrics.values():
            metric.update_state(labels, predictions)

    for metric_name, metric in streaming_metrics.items():
        metrics.log_metric(metric_name, float(metric.result().numpy()))


In [10]:
@component(
        base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-9:latest",
        packages_to_install=["google-cloud-aiplatform", "google-cloud-storage", "fsspec", "gcsfs"]
)
def upload_model_to_vertex_ai(
    Trained_Model: Input[Model],
    use_gpu: bool,
    display_name: str,
    description: str,
    project: str,
    location: str,
)-> NamedTuple("Outputs", [("resource_name", str)]):
    """Register the trained SavedModel in Vertex AI with the TF 2.9 serving container.

    Args:
        Trained_Model: Trained model artifact; its path is passed as the SavedModel dir.
        use_gpu: Whether to pick the GPU variant of the serving container.
        display_name: Display name of the registered model.
        description: Description of the registered model.
        project: GCP project to register the model in.
        location: Region to register the model in.

    Returns:
        A one-field tuple ``(resource_name,)`` with the registered model's resource name.
    """
    import json
    from google.cloud import aiplatform

    upload_kwargs = dict(
        saved_model_dir=f'{Trained_Model.path}',
        tensorflow_version="2.9",
        use_gpu=use_gpu,
        display_name=display_name,
        description=description,
        project=project,
        location=location,
    )
    registered_model = aiplatform.Model.upload_tensorflow_saved_model(**upload_kwargs)

    # Dump the registry entry to the component log for debugging.
    print(json.dumps(registered_model.to_dict(), indent=2))
    return (registered_model.resource_name,)
                    

In [11]:
@component(
        base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-9:latest",
        packages_to_install=["google-cloud-aiplatform", "google-cloud-storage", "fsspec", "gcsfs"]
)
def deploy_model_to_vertex_ai_endpoint(
    model_name: str,
    endpoint_name: str = None,
    machine_type: str = "n1-standard-2",
    min_replica_count: int = 1,
    max_replica_count: int = 1,
    accelerator_type: str = None,
    accelerator_count: int = None,
) -> NamedTuple("Outputs", [("endpoint_name", str)]):
    """Deploy a registered Vertex AI model to a new or existing endpoint.

    Args:
        model_name: Resource name of the registered model to deploy.
        endpoint_name: Existing endpoint to deploy to. If empty, a new endpoint is
            created in the model's project and location.
        machine_type: Machine type of the serving replicas.
        min_replica_count: Minimum number of serving replicas.
        max_replica_count: Maximum number of serving replicas.
        accelerator_type: Optional accelerator type for the replicas.
        accelerator_count: Optional number of accelerators per replica. This is an
            integer count; it was previously annotated ``str``, which ``Model.deploy``
            cannot accept.

    Returns:
        A one-field tuple ``(endpoint_name,)`` with the endpoint's resource name.
    """
    import json
    from google.cloud import aiplatform

    model = aiplatform.Model(model_name=model_name)

    if endpoint_name:
        endpoint = aiplatform.Endpoint(endpoint_name=endpoint_name)
    else:
        # Truncate to 118 chars so that name + "_endpoint" stays at most 127 chars,
        # presumably to fit Vertex AI's display-name length limit (verify).
        endpoint = aiplatform.Endpoint.create(
            display_name=model.display_name[:118] + "_endpoint",
            project=model.project,
            location=model.location,
        )

    endpoint = model.deploy(
        endpoint=endpoint,
        machine_type=machine_type,
        min_replica_count=min_replica_count,
        max_replica_count=max_replica_count,
        accelerator_type=accelerator_type,
        accelerator_count=accelerator_count,
    )

    print(json.dumps(endpoint.to_dict(), indent=2))
    return (endpoint.resource_name,)

In [12]:
@kfp.dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT)
def pipeline(
    bigquery_dataset: str,
    MODEL_DISPLAY_NAME: str,
    MACHINE_TYPE: str,
    project: str,
    gcp_region: str,
):
    """Taxi-tips regression: BigQuery -> split -> build -> train -> evaluate -> register -> deploy.

    Args:
        bigquery_dataset: Fully-qualified BigQuery table (``project.dataset.table``).
        MODEL_DISPLAY_NAME: Display name of the model registered in Vertex AI.
        MACHINE_TYPE: Machine type of the serving endpoint.
        project: GCP project the model is registered in.
        gcp_region: Region the model is registered in.
    """
    # Get the data
    get_cleaned_data_op = get_cleaned_data(bq_source=bigquery_dataset)

    # Split the data into train and validation sets
    splitted_data_op = split_the_data(Clean_Data=get_cleaned_data_op.outputs['Clean_Data'])

    # Build the TensorFlow model. input_size=7 is the 8 columns selected by
    # get_cleaned_data minus the 'tips' label.
    base_model = create_tensorflow_model(
        input_size=7,
        hidden_layer_sizes=10,
        output_size=1,
        activation_name='elu',
        # 'tips' is a continuous regression target (presumably dollars, not confined
        # to (0, 1)), so the output must be linear; a sigmoid would squash every
        # prediction into (0, 1).
        output_activation_name='linear',
    )

    # Train the model
    trained_model = train_model(Base_Model=base_model.outputs['Base_Model'],
                                Train_Data=splitted_data_op.outputs['Train_Data'],
                                learning_rate=0.1,
                                optimizer_name='Adadelta',
                                loss_function_name='mean_squared_error',
                                batch_size=32,
                                label_column_name='tips',
                                random_seed=0,
                                metric_names='',
                                number_of_epochs=1000)

    # Evaluate on the held-out split
    test_model_op = test_model(Trained_Model=trained_model.outputs['Trained_Model'],
                               Val_Data=splitted_data_op.outputs['Val_Data'],
                               label_column_name='tips',
                               batch_size=1)

    # Register the model, but only once evaluation has finished, so a failing
    # evaluation step prevents registration and deployment.
    deploy_to_model_registry = upload_model_to_vertex_ai(Trained_Model=trained_model.outputs['Trained_Model'],
                                                         use_gpu=False,
                                                         display_name=MODEL_DISPLAY_NAME,
                                                         description='',
                                                         project=project,
                                                         location=gcp_region)
    deploy_to_model_registry.after(test_model_op)

    # Deploy the registered model to an endpoint
    deploy_model_to_vertex_ai_endpoint(model_name=deploy_to_model_registry.outputs['resource_name'],
                                       machine_type=MACHINE_TYPE)

compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="demo-dataset-1-pipeline-ml.yaml",
)

## Run the Pipeline

In [13]:
import random
import string

# Short random id (not an RFC 4122 UUID) used to make the Vertex AI display names below unique.
def generate_uuid(length: int = 8) -> str:
    """Return ``length`` characters drawn with replacement from ``a-z`` and ``0-9``."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)


UUID = generate_uuid()

In [14]:
# Display names for the Vertex AI resources. The `# @param` comments are Colab
# form annotations, so keep them on the assignment lines.
PIPELINE_DISPLAY_NAME = "demo-dataset-1-loan-data-pipeline"  # @param {type:"string"}
DATASET_DISPLAY_NAME = "demo-dataset-1-loan-data-dataset"  # @param {type:"string"}
MODEL_DISPLAY_NAME = "demo-dataset-1-loan-data-model"  # @param {type:"string"}
TRAINING_DISPLAY_NAME = "demo-dataset-1-loan-data-training"  # @param {type:"string"}
ENDPOINT_DISPLAY_NAME = "demo-dataset-1-loan-data-endpoint"  # @param {type:"string"}

# Any name still at its placeholder value is swapped for a unique, per-run name
# built from UUID. Names the user edited are kept unchanged.
PIPELINE_DISPLAY_NAME = (
    f"pipeline_demo1_{UUID}"
    if PIPELINE_DISPLAY_NAME == "demo-dataset-1-loan-data-pipeline"
    else PIPELINE_DISPLAY_NAME
)
DATASET_DISPLAY_NAME = (
    f"dataset_demo1_{UUID}"
    if DATASET_DISPLAY_NAME == "demo-dataset-1-loan-data-dataset"
    else DATASET_DISPLAY_NAME
)
MODEL_DISPLAY_NAME = (
    f"model_demo1_{UUID}"
    if MODEL_DISPLAY_NAME == "demo-dataset-1-loan-data-model"
    else MODEL_DISPLAY_NAME
)
TRAINING_DISPLAY_NAME = (
    f"automl_training_demo1_{UUID}"
    if TRAINING_DISPLAY_NAME == "demo-dataset-1-loan-data-training"
    else TRAINING_DISPLAY_NAME
)
ENDPOINT_DISPLAY_NAME = (
    f"endpoint_demo1_{UUID}"
    if ENDPOINT_DISPLAY_NAME == "demo-dataset-1-loan-data-endpoint"
    else ENDPOINT_DISPLAY_NAME
)

# Machine type of the serving endpoint
MACHINE_TYPE = "n1-standard-2"

In [15]:
# Validate region of the given source (BigQuery) against region of the pipeline
from google.cloud import bigquery
import os

# Optional service-account key for this cell. When left empty, the credentials
# already in the environment are kept rather than being overwritten with ''.
BQ_SERVICE_ACCOUNT_KEY_PATH = ''  # fill with the service account
if BQ_SERVICE_ACCOUNT_KEY_PATH:
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = BQ_SERVICE_ACCOUNT_KEY_PATH

bq_source = ""  # bq source project_id.dataset.table

# Use the notebook's configured project/region; an empty PROJECT_ID falls back
# to the credentials' default project.
client = bigquery.Client(project=PROJECT_ID or None, location=REGION)
bq_region = client.get_table(bq_source).location.lower()
# Substring match. Presumably this lets a multi-region BigQuery location such as
# "us" match a pipeline region such as "us-central1" (verify for your setup).
if bq_region in REGION.lower():
    print(f"Region validated: {REGION}")
else:
    print(
        "Please make sure the region of BigQuery (source) and that of the pipeline are the same."
    )

Region validated: asia-southeast1


In [16]:
# Configure the pipeline run: bind the compiled template to this notebook's settings.
pipeline_parameters = {
    "project": PROJECT_ID,
    "gcp_region": REGION,
    "bigquery_dataset": f"{bq_source}",
    "MODEL_DISPLAY_NAME": MODEL_DISPLAY_NAME,
    "MACHINE_TYPE": MACHINE_TYPE,
}
job = aiplatform.PipelineJob(
    display_name=PIPELINE_DISPLAY_NAME,
    template_path="demo-dataset-1-pipeline-ml.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values=pipeline_parameters,
    # Re-run every step rather than reusing cached outputs from earlier runs.
    enable_caching=False,
)

In [17]:
# Submit the pipeline job under the given service account and stream its state updates.
job.run(
    service_account='FILL with Service Account email',
)

Creating PipelineJob
PipelineJob created. Resource name: projects/8457519537/locations/asia-southeast1/pipelineJobs/tabular-demo-dataset-1-training-singapore-20240620140420
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/8457519537/locations/asia-southeast1/pipelineJobs/tabular-demo-dataset-1-training-singapore-20240620140420')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/asia-southeast1/pipelines/runs/tabular-demo-dataset-1-training-singapore-20240620140420?project=8457519537
PipelineJob projects/8457519537/locations/asia-southeast1/pipelineJobs/tabular-demo-dataset-1-training-singapore-20240620140420 current state:
PipelineState.PIPELINE_STATE_PENDING
PipelineJob projects/8457519537/locations/asia-southeast1/pipelineJobs/tabular-demo-dataset-1-training-singapore-20240620140420 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/8457519537/locations/asia-southeast1/pipelineJobs/tabular-demo