In [79]:
#import sys
#print(sys.executable)

In [1]:
import datetime
import importlib
import os
import shutil

import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components.v1.model import ModelExportOp, ModelUploadOp
from kfp import compiler, dsl

In [2]:
NOTEBOOK = 'pipeline_babyweight'
PROJECT = 'babyweight-mlops'
REGION = "us-central1"
BUCKET = 'mlops_exp_prod_babyweight'
APPNAME = "babyweight"
BQ_DATASET = "babyweight"

# Service-account key used by the Google Cloud client libraries.
# An already-exported GOOGLE_APPLICATION_CREDENTIALS takes precedence. Otherwise fall
# back to the key under ./auth, resolved against the current working directory
# (presumably the repository root, since `src.pipeline` is imported relative to it).
# This replaces a hardcoded, user-specific absolute path.
GOOGLE_APPLICATION_CREDENTIALS = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    os.path.abspath(os.path.join("auth", "babyweight-mlops-a0f3cc4a4260.json")),
)

# Expose the core settings to child processes / shell magics.
os.environ.update({
    "REGION": REGION,
    "PROJECT": PROJECT,
    "BUCKET": BUCKET,
    "GOOGLE_APPLICATION_CREDENTIALS": GOOGLE_APPLICATION_CREDENTIALS,
})

## 1. Construct the Pipeline

In [3]:
# Pipeline components live in src/pipeline. Reloading picks up edits to the
# component source without restarting the kernel. Only the data-preparation
# component is wired into the pipeline at this point.
from src.pipeline import zach_prepare_data_component

importlib.reload(zach_prepare_data_component)

<module 'src.pipeline.zach_prepare_data_component' from '/Users/zacharynguyen/Documents/GitHub/End-to-end-MLOps-with-VertexAI/src/pipeline/zach_prepare_data_component.py'>

In [4]:
# --- Pipeline storage / naming ------------------------------------------------
PIPELINE_URI = f"gs://{BUCKET}/{APPNAME}"
TRAINING_PIPELINE_DISPLAY_NAME = f"{APPNAME}-training-pipeline"

# --- Source data ----------------------------------------------------------------
SOURCE_BQ_TABLE_ID = "bigquery-public-data.samples.natality"
LIMIT = 25_000  # passed as `limit` to the data-prep component (presumably a row cap)

#TRAINING_BQ_TABLE_ID = f"{SOURCE_BQ_TABLE_ID}_prepped_limit_{LIMIT}" 
#DATASET_DISPLAY_NAME= f"babyweight_prepped_limit_{LIMIT}_dataset"

# --- BigQuery ML model ----------------------------------------------------------
BQML_MODEL_NAME = f"bqml_dnn_model_{APPNAME}"
BQML_MODEL_ID = ".".join([PROJECT, BQ_DATASET, BQML_MODEL_NAME])
BQML_TRAININGS_URI = PIPELINE_URI + "/bqml-trainings"
BQML_MODEL_OUTPUT_DIR = BQML_TRAININGS_URI + "/" + BQML_MODEL_NAME

# --- Custom-trained model -------------------------------------------------------
CUSTOM_MODEL_NAME = f"custom_dnn_model_{APPNAME}"
CUSTOM_TRAININGS_URI = PIPELINE_URI + "/custom-trainings"
CUSTOM_MODEL_OUTPUT_DIR = CUSTOM_TRAININGS_URI + "/" + CUSTOM_MODEL_NAME

TRAINING_REPLICA_COUNT = 1
TRAINING_MACHINE_TYPE = "n1-highmem-16"
# NOTE(review): K80 GPUs were slated for end of support on Google Cloud;
# confirm availability before re-enabling custom training.
TRAINING_ACCELERATOR_TYPE = "NVIDIA_TESLA_K80"
TRAINING_ACCELERATOR_COUNT = 4

#TENSORBOARD_INSTANCE = tb.resource_name

# Training hyperparameters, also bundled into PARAMETERS_DICT below.
NUM_EPOCHS = 10
BATCH_SIZE = 256
HIDDEN_UNITS = "128,64,16"
DROPOUT_RATE = 0.4
LEARNING_RATE = 0.01
NUM_EMBEDS = 14
PARAMETERS_DICT = dict(
    num_epochs=NUM_EPOCHS,
    batch_size=BATCH_SIZE,
    hidden_units=HIDDEN_UNITS,
    dropout_rate=DROPOUT_RATE,
    learning_rate=LEARNING_RATE,
    num_embeds=NUM_EMBEDS,
)

# Metric thresholds (presumably the max acceptable MSE for model selection — confirm).
THRESHOLDS_DICT = {"mse": 3.0}

# --- Serving --------------------------------------------------------------------
ENDPOINT_DISPLAY_NAME = f"endpoint_{APPNAME}"
DEPLOY_IMAGE = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-7:latest"
DEPLOY_MACHINE_TYPE = "n1-highmem-16"
DEPLOY_MIN_REPLICA_COUNT, DEPLOY_MAX_REPLICA_COUNT = 1, 2

#EXPLANATION_PARAMATERS = {"sampledShapleyAttribution": {"pathCount": 10}}
#EXPLANATION_METADATA_JSON_PATH = f"{CUSTOM_TRAININGS_URI[5:]}/explanation_metadata.json"

In [5]:
# NOTE(review): disabled custom-training wrapper, kept for later. Before re-enabling:
#   - `custom_model_component` is not currently imported;
#   - SERVICE_ACCOUNT is only assigned in a later cell (before the job is run);
#   - TENSORBOARD_INSTANCE is commented out and BASE_OUTPUT_DIR is never defined here;
#   - in google-cloud-pipeline-components 2.x this helper appears to be named
#     `create_custom_training_job_from_component` — confirm against the installed version.
#from google_cloud_pipeline_components.v1.custom_job import create_custom_training_job_op_from_component
#
#custom_model_training_op = create_custom_training_job_op_from_component(
#    component_spec=custom_model_component.train_evaluate_custom_model_op, 
#    replica_count=TRAINING_REPLICA_COUNT,
#    machine_type=TRAINING_MACHINE_TYPE,
#    accelerator_type=TRAINING_ACCELERATOR_TYPE,
#    accelerator_count=TRAINING_ACCELERATOR_COUNT,
#    service_account=SERVICE_ACCOUNT,
#    tensorboard=TENSORBOARD_INSTANCE,
#    base_output_directory=BASE_OUTPUT_DIR
#)

In [17]:
@dsl.pipeline(name=APPNAME, pipeline_root=PIPELINE_URI)
def vertex_ai_pipeline(
    project: str=PROJECT,
    region: str=REGION,
):
    """Prepare a BigQuery table from the natality sample and register it as a Vertex AI tabular dataset.

    Args:
        project: GCP project passed to both pipeline tasks.
        region: Region passed to the prep task and, as `location`, to dataset creation.
    """
    # Public GCPC v1 export, in the same style as the v1.model import at the top
    # of the notebook. This avoids depending on the package's internal module layout.
    from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp

    # Step 1: build the prepped BigQuery table from SOURCE_BQ_TABLE_ID.
    data_preparing_task = zach_prepare_data_component.bq_table_prep_op(
        project=project,
        region=region,
        source_bq_table_id=SOURCE_BQ_TABLE_ID,
        # NOTE(review): uses the module-level PROJECT, not the `project` pipeline
        # parameter, so overriding `project` at submit time does not move the
        # output dataset. Confirm this is intended.
        out_bq_dataset_id=f"{PROJECT}.{BQ_DATASET}",
        limit=LIMIT,
    ).set_display_name('prepped-bq-table-create')

    # Step 2: register the prepped table as a Vertex AI managed tabular dataset.
    # NOTE(review): assumes `prepped_data_uri` is a `bq://project.dataset.table`
    # URI, the form `bq_source` expects. Verify in the component.
    dataset_create_task = TabularDatasetCreateOp(  # kept named for wiring downstream steps
        project=project,
        location=region,
        display_name=f"{APPNAME}-DATASET",
        bq_source=data_preparing_task.outputs['prepped_data_uri'],
    )

## 2. Compile the Pipeline

In [18]:
# Create a new (local) directory to store the complied file
# Fresh local directory for the compiled pipeline package. It is wiped on every run
# so stale packages from earlier compiles never linger. Pure Python instead of
# `!rm -rf` / `!mkdir -p`, so it is portable and does not interpolate into a shell.
DIR = "temp"
shutil.rmtree(DIR, ignore_errors=True)
os.makedirs(DIR, exist_ok=True)

In [19]:
# Serialise the pipeline definition into the JSON package submitted below.
compiled_package = f"{DIR}/compiled_pipeline_package.json"
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(pipeline_func=vertex_ai_pipeline, package_path=compiled_package)

## 3. Execute the Pipeline on Vertex AI

In [20]:
# Session-wide defaults for the aiplatform calls that follow.
aiplatform.init(location=REGION, project=PROJECT)

In [21]:
# Runtime values for the pipeline's `project` / `region` parameters.
pipeline_parameters = {"project": PROJECT, "region": REGION}

pipeline_job = aiplatform.PipelineJob(
    display_name=APPNAME,
    template_path=compiled_package,
    pipeline_root=PIPELINE_URI,
    parameter_values=pipeline_parameters,
    # enable_caching is left unset; caching is on by default.
    labels={"notebook": NOTEBOOK},
)

In [22]:
# Service account the pipeline job runs as. Export SERVICE_ACCOUNT to use a
# different account without editing the notebook; otherwise the author's default applies.
SERVICE_ACCOUNT = os.environ.get(
    "SERVICE_ACCOUNT",
    "zacharynguyen@babyweight-mlops.iam.gserviceaccount.com",
)

In [23]:
# Submit the compiled job under SERVICE_ACCOUNT. The call blocks while the job runs,
# which is where the state-polling log below comes from.
# NOTE(review): PipelineJob.run() appears to return None, so `response` holds no
# job information. Inspect `pipeline_job` (e.g. its state) instead.
response = pipeline_job.run(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/288906550115/locations/us-central1/pipelineJobs/babyweight-20240224215543
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/288906550115/locations/us-central1/pipelineJobs/babyweight-20240224215543')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/babyweight-20240224215543?project=288906550115
PipelineJob projects/288906550115/locations/us-central1/pipelineJobs/babyweight-20240224215543 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/288906550115/locations/us-central1/pipelineJobs/babyweight-20240224215543 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/288906550115/locations/us-central1/pipelineJobs/babyweight-20240224215543 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/288906550115/locations/us-central1/pipelineJobs/babyweight-20240224215543 curren