### What we will learn

- We will build a custom Kubeflow pipeline for classification
- We will show how to use Vertex AI Experiments

In [37]:
import kfp

In [38]:
print(f"{kfp.__version__}")  # record the installed KFP SDK version alongside the outputs

1.8.14


In [39]:
from dotenv import load_dotenv

# Populate os.environ from the local .env file; the cell displays whether anything was loaded.
load_dotenv(verbose=True, dotenv_path='.env')

True

In [40]:
import os

# Configuration read from the environment (populated from .env); missing keys become None.
BIGQUERY_PROJECT_ID = os.environ.get('BIGQUERY_PROJECT_ID')
BIGQUERY_DATASET = os.environ.get('BIGQUERY_DATASET')
BIGQUERY_DATASET_REGION = os.environ.get('BIGQUERY_DATASET_REGION')
BIGQUERY_TABLE = os.environ.get('BIGQUERY_TABLE')

VERTEXAI_PROJECT_ID = os.environ.get('VERTEXAI_PROJECT_ID')
VERTEXAI_REGION = os.environ.get('VERTEXAI_REGION')

BUCKET_NAME = os.environ.get('BUCKET_NAME')
BUCKET_URI = os.environ.get('BUCKET_URI')
BUCKET_REGION = os.environ.get('BUCKET_REGION')

PREFIX = os.environ.get('PREFIX')

# Echo the resolved configuration. Each label is printed next to its OWN variable
# (previously BUCKET_URI showed BUCKET_NAME and BUCKET_REGION showed VERTEXAI_REGION).
print("BIGQUERY_PROJECT_ID: ",BIGQUERY_PROJECT_ID)
print("BIGQUERY_DATASET: ",BIGQUERY_DATASET)
print("BIGQUERY_DATASET_REGION: ",BIGQUERY_DATASET_REGION)
print("BIGQUERY_TABLE: ",BIGQUERY_TABLE)

print("VERTEXAI_PROJECT_ID: ",VERTEXAI_PROJECT_ID)
print("VERTEXAI_REGION: ",VERTEXAI_REGION)

print("BUCKET_NAME: ",BUCKET_NAME)
print("BUCKET_URI: ",BUCKET_URI)
print("BUCKET_REGION: ",BUCKET_REGION)

PIPELINE_ROOT = 'gs://{}/pipeline_root'.format(BUCKET_NAME)

BIGQUERY_PROJECT_ID:  datafusionsbox
BIGQUERY_DATASET:  dataset4ccc
BIGQUERY_DATASET_REGION:  us
BIGQUERY_TABLE:  df_for_model_ccc_with_weights
VERTEXAI_PROJECT_ID:  datafusionsbox
VERTEXAI_REGION:  us-central1
BUCKET_NAME:  gcp-demo-ccc-vertexai
BUCKET_URI:  gcp-demo-ccc-vertexai
BUCKET_REGION:  us-central1


In [41]:
from kfp.v2.dsl import component, pipeline
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath)

In [42]:
@component(
  packages_to_install=["pandas","db-dtypes", "google-cloud-bigquery", "pyarrow"]
)
def stage(bq_projectid: str, bq_dataset: str, bq_table: str, output_dataset: OutputPath('staged_bq_table')):
    """Export an entire BigQuery table to a CSV file consumed by downstream steps.

    Args:
        bq_projectid: Project that owns the table (also used as the query's billing project).
        bq_dataset: BigQuery dataset name.
        bq_table: BigQuery table name.
        output_dataset: Local path (provided by KFP) where the CSV is written.
    """
    # Imports live inside the body: KFP ships only this function's source to the worker.
    from google.cloud import bigquery
    import google.auth

    # Authenticate with the worker's ambient credentials.
    auth_credentials, auth_project = google.auth.default()
    # auth_project can be None; an f-string avoids the str + None TypeError.
    print(f"Project: {auth_project}")
    client = bigquery.Client(project=bq_projectid, credentials=auth_credentials)

    # Backticks are required when an identifier contains '-' (common in project ids).
    query = f"SELECT * FROM `{bq_projectid}.{bq_dataset}.{bq_table}`"
    print(query)

    # Fetch the result set as a DataFrame.
    dataframe = client.query(query).to_dataframe()
    print(dataframe.head())

    # Write the CSV; the DataFrame index becomes the first column.
    dataframe.to_csv(output_dataset)

In [27]:
from kfp.v2 import dsl

# Decorate through the module attribute: the `def` below rebinds the bare name `pipeline`
# to this pipeline function, so `@pipeline(...)` would raise TypeError on a cell re-run.
@dsl.pipeline(name="wf-kubeflow-bq2gcs")
def pipeline(
    in_bq_projectid: str = 'defaultprojectid',
    in_bq_dataset: str = 'xxxx',
    in_bq_table: str = 'yyyy'
):
    """Single-step pipeline: stage a BigQuery table as a CSV artifact.

    Args:
        in_bq_projectid: Project owning the source table.
        in_bq_dataset: Dataset of the source table.
        in_bq_table: Source table name.
    """
    stagingTask = stage(bq_projectid=in_bq_projectid,
                        bq_dataset=in_bq_dataset,
                        bq_table=in_bq_table)

In [31]:
# The KFP v2 compiler rejects package paths that do not end in ".json" (despite the variable name).
dag_yaml_filename = "dag_kubeflow_bq2gcs.json"

In [32]:
from kfp.v2 import compiler

# Serialize the pipeline function into the JSON job spec that PipelineJob submits.
compiler.Compiler().compile(
    package_path=dag_yaml_filename,
    pipeline_func=pipeline,
)



In [33]:
# Runtime arguments for the bq2gcs pipeline; every key must be one of its parameter names.
PIPELINE_PARAMETERS = dict(
    in_bq_projectid=BIGQUERY_PROJECT_ID,
    in_bq_dataset=BIGQUERY_DATASET,
    in_bq_table=BIGQUERY_TABLE,
)

LABELS = dict()
ENABLE_CACHING = True

In [34]:
import os

from google.cloud import aiplatform

# Identity the pipeline runs as. Override with VERTEXAI_SERVICE_ACCOUNT (e.g. in .env)
# instead of editing the notebook; the previous hardcoded value is kept as the fallback.
SERVICE_ACCOUNT = os.environ.get(
    'VERTEXAI_SERVICE_ACCOUNT',
    "339239659794-compute@developer.gserviceaccount.com",
)

# pipeline_root is left unset here; PIPELINE_ROOT is available if an explicit root is required.
job = aiplatform.PipelineJob(display_name = "kfp_pipeline_bq2gcs",
                             template_path = dag_yaml_filename,
                             parameter_values = PIPELINE_PARAMETERS, ## must not contain parameters unknown to the pipeline
                             enable_caching = ENABLE_CACHING,
                             labels = LABELS,
                             project = VERTEXAI_PROJECT_ID,
                             location = VERTEXAI_REGION)

# Blocks until the pipeline run finishes, streaming its state.
job.run(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/339239659794/locations/us-central1/pipelineJobs/wf-kubeflow-bq2gcs-20230214142528
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/339239659794/locations/us-central1/pipelineJobs/wf-kubeflow-bq2gcs-20230214142528')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/wf-kubeflow-bq2gcs-20230214142528?project=339239659794
PipelineJob projects/339239659794/locations/us-central1/pipelineJobs/wf-kubeflow-bq2gcs-20230214142528 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/339239659794/locations/us-central1/pipelineJobs/wf-kubeflow-bq2gcs-20230214142528 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/339239659794/locations/us-central1/pipelineJobs/wf-kubeflow-bq2gcs-20230214142528 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/339239659794/locations/us-centra

In [43]:
@component(
  packages_to_install=["pandas","db-dtypes","scikit-learn", "google-cloud-bigquery", "pyarrow"]
)
def preprocess(staged_dataset: InputPath('staged_bq_table'), 
               staged_training_dataset: OutputPath('staged_training_dataset'), 
               staged_validation_dataset: OutputPath('staged_validation_dataset'), 
               staged_test_dataset: OutputPath('staged_test_dataset')):
    """Clean the staged table and split it into training, validation and test CSVs.

    The split is 80/20 train/test, then 80/20 of the remainder into train/validation
    (i.e. 64% / 16% / 20%), with a fixed random_state for reproducibility.

    Args:
        staged_dataset: CSV produced by ``stage`` (index stored in the first column).
        staged_training_dataset: Output path for the training split.
        staged_validation_dataset: Output path for the validation split.
        staged_test_dataset: Output path for the test split.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    _excluded_columns = ["synerise_client_id"]
    _target_column = "y_if_trans"
    # Not used in this step; the weight column is left in the split files as-is.
    _weight_column = "weight"

    # Read the parameter actually passed in (previously referenced an undefined name).
    dataset = pd.read_csv(staged_dataset, index_col=0)

    # drop() is not in-place: keep its result so the identifier column is really removed.
    dataset = dataset.drop(columns=_excluded_columns)

    # Split features and label from the same frame (the label must not be removed first).
    X = dataset.drop(columns=[_target_column])
    ## Feature engineering if any, e.g
    ## from sklearn.preprocessing import MinMaxScaler
    ## scaler = MinMaxScaler(feature_range = (0,1))
    ## scaler.fit(X)
    Y = dataset[_target_column]

    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=101)
    X_train, X_val, Y_train, Y_val = train_test_split(X_train, Y_train, test_size=0.2, random_state=101)

    # Re-attach the label column so each split file is self-contained.
    training_dataset = pd.concat([X_train, Y_train], axis=1)
    validation_dataset = pd.concat([X_val, Y_val], axis=1)
    test_dataset = pd.concat([X_test, Y_test], axis=1)

    training_dataset.to_csv(staged_training_dataset)
    validation_dataset.to_csv(staged_validation_dataset)
    test_dataset.to_csv(staged_test_dataset)
    

In [None]:
@component(
  packages_to_install=["pandas", "db-dtypes", "scikit-learn", "pyarrow",
                       "tensorflow", "keras-tuner", "google-cloud-aiplatform"]
)
def train(staged_training_dataset: InputPath('staged_training_dataset'), 
          staged_validation_dataset: InputPath('staged_validation_dataset'), 
          staged_test_dataset: InputPath('staged_test_dataset'),
          in_vertexai_experiment_name:str, 
          in_vertexai_region: str, 
          in_vertexai_projectid: str, 
          output_model: Output[Model]
         ):
    """Tune and train a binary classifier, evaluate it and log the run to Vertex AI Experiments.

    A KerasTuner Hyperband search picks the hyperparameters of a small dense network
    (objective: validation accuracy). The best configuration is retrained, scored on the
    held-out test split and saved to ``output_model``. Parameters and test metrics are
    logged as a Vertex AI Experiments run.

    Args:
        staged_training_dataset: Training CSV from ``preprocess`` (index in the first column).
        staged_validation_dataset: Validation CSV used during search and final fit.
        staged_test_dataset: Test CSV used only for the reported metrics.
        in_vertexai_experiment_name: Vertex AI experiment that receives the run.
        in_vertexai_region: Region passed to ``aiplatform.init``.
        in_vertexai_projectid: Project passed to ``aiplatform.init``.
        output_model: Artifact whose ``path`` receives the saved Keras model.
    """
    # All imports live inside the body: KFP ships only this function's source to the worker.
    from datetime import datetime

    import keras_tuner
    import pandas as pd
    import sklearn.metrics as metrics
    import tensorflow as tf
    from google.cloud import aiplatform

    _target_column = "y_if_trans"
    # NOTE(review): assumes `weight` holds per-row sample weights rather than a predictive
    # feature — confirm against how the source table builds it.
    _weight_column = "weight"
    _decision_threshold = 0.5

    _METRICS = [
      tf.keras.metrics.TruePositives(name='tp'),
      tf.keras.metrics.FalsePositives(name='fp'),
      tf.keras.metrics.TrueNegatives(name='tn'),
      tf.keras.metrics.FalseNegatives(name='fn'), 
      tf.keras.metrics.BinaryAccuracy(name='accuracy'),
      tf.keras.metrics.Precision(name='precision'),
      tf.keras.metrics.Recall(name='recall'),
      tf.keras.metrics.AUC(name='auc'),
      tf.keras.metrics.AUC(name='prc', curve='PR'), # precision-recall curve
    ]

    def _load_split(csv_path):
        """Read one staged CSV and return (features, labels, sample weights or None)."""
        # index_col=0: the writer stored the DataFrame index; without this it becomes a feature.
        frame = pd.read_csv(csv_path, index_col=0)
        labels = frame[_target_column]
        weights = frame[_weight_column] if _weight_column in frame.columns else None
        features = frame.drop(columns=[_target_column, _weight_column], errors="ignore")
        return features, labels, weights

    def build_model(hptune):
        """Build and compile the dense network for one KerasTuner hyperparameter sample."""
        model = tf.keras.Sequential()
        model.add(tf.keras.layers.Dense(units=128, activation="relu"))
        model.add(
            tf.keras.layers.Dense(
                # Tuned hyperparameters: hidden width and activation.
                units=hptune.Int("units", min_value=32, max_value=512, step=32),
                activation=hptune.Choice("activation", ["relu", "tanh"]),
            )
        )
        if hptune.Boolean("dropout"):
            model.add(tf.keras.layers.Dropout(rate=0.25))

        model.add(tf.keras.layers.Dense(1, activation="sigmoid"))
        learning_rate = hptune.Float("lr", min_value=1e-4, max_value=1e-2, sampling="log")

        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            loss=tf.keras.losses.BinaryCrossentropy(),
            metrics=_METRICS,
        )
        return model

    x_train, y_train, w_train = _load_split(staged_training_dataset)
    x_val, y_val, _ = _load_split(staged_validation_dataset)
    x_test, y_test, _ = _load_split(staged_test_dataset)

    # Hyperband search maximizing validation accuracy ("accuracy" is the BinaryAccuracy name above).
    tuner = keras_tuner.Hyperband(
        hypermodel=build_model,
        objective="val_accuracy",
        max_epochs=50,
        factor=3,
        hyperband_iterations=1,
        seed=None,
        hyperparameters=None,
        tune_new_entries=True,
        allow_new_entries=True,
    )

    stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)
    tuner.search(x_train, y_train, sample_weight=w_train, epochs=50,
                 validation_data=(x_val, y_val), callbacks=[stop_early])

    # Retrain a fresh model with the best hyperparameters found by the search.
    best_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)[0]
    hypermodel = tuner.hypermodel.build(best_hyperparameters)
    hypermodel.fit(x_train, y_train, sample_weight=w_train, epochs=50,
                   validation_data=(x_val, y_val))

    # Probabilities drive log-loss / ROC-AUC; thresholded labels drive accuracy/precision/recall.
    test_scores = hypermodel.predict(x_test).ravel()
    test_labels = (test_scores >= _decision_threshold).astype(int)

    # On Keras 2 (TF < 2.16) a path without an extension is written as a SavedModel directory.
    hypermodel.save(output_model.path)

    training_params = {
        'training_dataset': staged_training_dataset,
        'validation_dataset': staged_validation_dataset,
        'test_dataset': staged_test_dataset,
        'model_type': 'nn',
        'model_path': output_model.path,
        'hp_units': int(best_hyperparameters.get('units')),
        'hp_activation': str(best_hyperparameters.get('activation')),
        'hp_dropout': int(best_hyperparameters.get('dropout')),
        'hp_lr': float(best_hyperparameters.get('lr')),
    }

    # float(): Vertex AI metric values must be plain numbers, not numpy scalars.
    training_metrics = {
        'model_accuracy': float(metrics.accuracy_score(y_test, test_labels)),
        'model_precision': float(metrics.precision_score(y_test, test_labels, average='macro')),
        'model_recall': float(metrics.recall_score(y_test, test_labels, average='macro')),
        'model_logloss': float(metrics.log_loss(y_test, test_scores)),
        'model_auc_roc': float(metrics.roc_auc_score(y_test, test_scores)),
    }

    ## register hyperparameters and metrics as a Vertex AI experiment run
    aiplatform.init(
        project=in_vertexai_projectid,
        location=in_vertexai_region,
        experiment=in_vertexai_experiment_name,
    )

    run_id = f"run-{datetime.now().strftime('%Y%m%d%H%M%S')}"
    aiplatform.start_run(run_id)
    try:
        aiplatform.log_params(training_params)
        aiplatform.log_metrics(training_metrics)
    finally:
        # Always close the run so it is not left in a RUNNING state.
        aiplatform.end_run()

In [None]:
@component(
  packages_to_install=["pandas","db-dtypes","scikit-learn", "google-cloud-bigquery", "pyarrow"]
)
def gate(staged_training_dataset: InputPath('staged_training_dataset')): 
    """Placeholder for the `gate` step (presumably a pre-deployment quality check — TODO confirm).

    The original definition had no body and did not compile. This stub keeps the cell
    runnable and fails loudly if the component is wired into a pipeline before it is
    implemented.
    """
    raise NotImplementedError("gate component is not implemented yet")

In [None]:
@component(
   packages_to_install=["pandas", "google-cloud-aiplatform", "google-cloud-storage"]
)
def deploy():
    """Placeholder for the `deploy` step (TODO: implement).

    The original definition had no body and did not compile. This stub keeps the cell
    runnable and fails loudly if the component is wired into a pipeline before it is
    implemented.
    """
    raise NotImplementedError("deploy component is not implemented yet")

In [None]:
from kfp.dsl import pipeline
from kfp.dsl import Condition

# KFP v2 pipeline names must match ^[a-z0-9][a-z0-9-]{0,127}$ (no underscores).
@pipeline(name="wf-mlops-pipeline")
def pipeline(
    wf_bq_projectid: str = 'defaultprojectid',
    wf_bq_dataset: str = 'xxxx',
    wf_bq_table: str = 'yyyy',

    wf_vertexai_project_id: str = 'defaultprojectid',
    wf_vertexai_region: str = 'us-central1',
    wf_vertexai_experiment: str = 'defaultexperiment',
):
    """Stage a BigQuery table, split it, then tune/train a model tracked in Vertex AI Experiments.

    Args:
        wf_bq_projectid: Project owning the source table.
        wf_bq_dataset: Dataset of the source table.
        wf_bq_table: Source table name.
        wf_vertexai_project_id: Project used for Vertex AI Experiments logging.
        wf_vertexai_region: Region used for Vertex AI Experiments logging.
        wf_vertexai_experiment: Experiment name receiving the training run.
    """
    stagingTask = stage(bq_projectid = wf_bq_projectid,
                        bq_dataset   = wf_bq_dataset,
                        bq_table     = wf_bq_table)

    preprocessingTask = preprocess(staged_dataset = stagingTask.output)

    trainingTask = train(staged_training_dataset = preprocessingTask.outputs['staged_training_dataset'],
                         staged_validation_dataset = preprocessingTask.outputs['staged_validation_dataset'],
                         staged_test_dataset = preprocessingTask.outputs['staged_test_dataset'],
                         in_vertexai_experiment_name = wf_vertexai_experiment,
                         in_vertexai_region = wf_vertexai_region,
                         in_vertexai_projectid = wf_vertexai_project_id,
                        )

In [None]:
from kfp.v2 import compiler

# Serialize the current `pipeline` definition into the JSON job spec that PipelineJob submits.
compiler.Compiler().compile(
    package_path=dag_yaml_filename,
    pipeline_func=pipeline,
)

In [None]:
# Runtime arguments for the mlops pipeline. Keys must match its `wf_*` parameter names;
# Vertex AI rejects parameters the pipeline does not declare. Parameters not listed
# here fall back to their declared defaults.
PIPELINE_PARAMETERS = {
    "wf_bq_projectid":        BIGQUERY_PROJECT_ID,
    "wf_bq_dataset":          BIGQUERY_DATASET,
    "wf_bq_table":            BIGQUERY_TABLE,
    "wf_vertexai_project_id": VERTEXAI_PROJECT_ID,
}

LABELS = {}
ENABLE_CACHING=True

In [None]:
import os

from google.cloud import aiplatform

# Identity the pipeline runs as. Override with VERTEXAI_SERVICE_ACCOUNT (e.g. in .env)
# instead of editing the notebook; the previous hardcoded value is kept as the fallback.
SERVICE_ACCOUNT = os.environ.get(
    'VERTEXAI_SERVICE_ACCOUNT',
    "339239659794-compute@developer.gserviceaccount.com",
)

# Distinct display name so this run is not confused with the bq2gcs job in the console.
# pipeline_root is left unset here; PIPELINE_ROOT is available if an explicit root is required.
job = aiplatform.PipelineJob(display_name = "kfp_pipeline_mlops",
                             template_path = dag_yaml_filename,
                             parameter_values = PIPELINE_PARAMETERS, ## must not contain parameters unknown to the pipeline
                             enable_caching = ENABLE_CACHING,
                             labels = LABELS,
                             project = VERTEXAI_PROJECT_ID,
                             location = VERTEXAI_REGION)

# Blocks until the pipeline run finishes, streaming its state.
job.run(service_account=SERVICE_ACCOUNT)